To use a BlockingQueue to pass data between threads in Java, you can follow these steps:
1. Understand BlockingQueue
BlockingQueue is part of the java.util.concurrent package and is designed for thread-safe communication between producer and consumer threads. It provides methods such as put() and take(), which handle blocking behavior:
- put(E e): Blocks if the queue is full until space becomes available.
- take(): Blocks if the queue is empty until an element becomes available.
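To make the blocking contract concrete, here is a minimal sketch that places put() and take() next to their non-blocking and timed relatives, offer() and poll(), which are also part of the BlockingQueue interface. The class name PutTakeVariantsSketch, the capacity of 2, and the 100 ms timeouts are assumptions chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class PutTakeVariantsSketch {

    // Fills a capacity-2 queue, then records what the non-blocking and timed calls return.
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        queue.put("a"); // space is available, so put() returns immediately
        queue.put("b"); // the queue is now full; one more put() would block here

        // offer() never blocks: it reports failure instead of waiting.
        boolean accepted = queue.offer("c");

        // The timed offer waits up to 100 ms for space, then gives up.
        boolean acceptedLater = queue.offer("c", 100, TimeUnit.MILLISECONDS);

        String first = queue.take();  // "a" (FIFO order), returns immediately
        String second = queue.take(); // "b"; the queue is now empty

        // The timed poll waits up to 100 ms for an element, then returns null.
        String none = queue.poll(100, TimeUnit.MILLISECONDS);

        return accepted + " " + acceptedLater + " " + first + " " + second + " " + none;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "false false a b null"
    }
}
```

Use put() and take() when a thread should simply wait; the timed variants are a reasonable choice when a thread must stay responsive and decide for itself what to do after a timeout.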
Common implementations of BlockingQueue include:
- ArrayBlockingQueue: A fixed-capacity, bounded queue.
- LinkedBlockingQueue: A linked-node queue, optionally bounded.
- PriorityBlockingQueue: An unbounded, priority-ordered queue. Because there is no capacity limit, put, offer, and add never block; take still blocks while the queue is empty.
- SynchronousQueue: A queue with no capacity, where put blocks until a take occurs (and vice versa).
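The differences between these implementations can be observed directly. The sketch below is an illustration under assumed names (QueueFlavorsSketch and its helper methods are hypothetical): it asks each queue how many elements it can accept without blocking, checks the ordering of a PriorityBlockingQueue, and tries a hand-off on a SynchronousQueue with no consumer waiting.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical class name, used only for this illustration.
class QueueFlavorsSketch {

    // remainingCapacity() reports how many elements can be added without blocking.
    static int[] capacities() {
        BlockingQueue<Integer> array = new ArrayBlockingQueue<>(3);
        BlockingQueue<Integer> linked = new LinkedBlockingQueue<>(); // no bound given
        BlockingQueue<Integer> priority = new PriorityBlockingQueue<>();
        BlockingQueue<Integer> handoff = new SynchronousQueue<>();
        return new int[] {
            array.remainingCapacity(),    // 3
            linked.remainingCapacity(),   // Integer.MAX_VALUE
            priority.remainingCapacity(), // Integer.MAX_VALUE (unbounded)
            handoff.remainingCapacity()   // 0 (no internal storage)
        };
    }

    // Elements come out in natural order, not insertion order.
    static String priorityOrder() throws InterruptedException {
        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.put(3);
        queue.put(1);
        queue.put(2);
        return queue.take() + "," + queue.take() + "," + queue.take();
    }

    // With no thread waiting in take(), a non-blocking offer() cannot hand off its element.
    static boolean handoffWithoutTaker() {
        BlockingQueue<Integer> handoff = new SynchronousQueue<>();
        return handoff.offer(1);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(capacities()));
        System.out.println(priorityOrder());       // prints "1,2,3"
        System.out.println(handoffWithoutTaker()); // prints "false"
    }
}
```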
2. Example Setup for Producer-Consumer Pattern
Here’s an example that shows how to use a BlockingQueue to pass data between producer and consumer threads:
Code Example
package org.kodejava.util.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {
    private static final int QUEUE_CAPACITY = 5;

    public static void main(String[] args) {
        // Instantiate a BlockingQueue with a capacity of 5
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

        // Create producer and consumer threads
        Thread producer = new Thread(new Producer(queue));
        Thread consumer = new Thread(new Consumer(queue));

        // Start threads
        producer.start();
        consumer.start();
    }
}

class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 1; i <= 10; i++) {
                System.out.println("Produced: " + i);
                queue.put(i); // Add item to the queue, blocks if full
                Thread.sleep(500); // Simulate production time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer item = queue.take(); // Take item from the queue, blocks if empty
                System.out.println("Consumed: " + item);
                Thread.sleep(1000); // Simulate consumption time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
3. How It Works
- BlockingQueue:
  - A LinkedBlockingQueue with a capacity of 5 is created.
  - The producer thread calls queue.put(i) to add elements to the queue. If the queue is full, the thread blocks until space becomes available.
  - The consumer thread calls queue.take() to retrieve elements. If the queue is empty, the thread blocks until an item is added.
- Producer:
  - Produces data (e.g., numbers) and adds them to the queue.
  - The put method ensures thread safety and blocks automatically when the queue is full.
- Consumer:
  - Retrieves data from the queue and processes it.
  - The take method ensures thread safety and blocks automatically when the queue is empty.
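The blocking described above can be observed from outside the blocked thread. The following sketch is a rough illustration, not part of the original example: the class name BlockedPutSketch, the capacity of 1, and the 2-second deadline are assumptions. It fills a queue, starts a producer that calls put() on the full queue, waits until that thread is parked, and then frees a slot with take().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical class name, used only for this illustration.
class BlockedPutSketch {

    static String run() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.put(1); // the single slot is now taken

        Thread producer = new Thread(() -> {
            try {
                queue.put(2); // blocks: there is no free slot yet
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Give the producer up to 2 seconds to park inside put().
        long deadline = System.nanoTime() + 2_000_000_000L;
        while (producer.getState() != Thread.State.WAITING
                && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State whileFull = producer.getState();

        Integer head = queue.take(); // frees the slot, which wakes the producer
        producer.join();             // put(2) has completed once join() returns

        // peek() shows the element the producer managed to add after being unblocked.
        return whileFull + " " + head + " " + queue.peek();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

On a standard JDK with ordinary platform threads, the producer is expected to be reported as WAITING while the queue is full. After the take(), the value 1 has been removed and 2 sits at the head of the queue.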
4. Key Points
- Thread Safety: BlockingQueue handles all necessary synchronization internally.
- Automatic Blocking:
  - put() in the producer blocks if the queue is full.
  - take() in the consumer blocks if the queue is empty.
- Stopping Mechanism:
  - In real-world applications, implement a stopping mechanism such as adding a “poison pill” (special object) to signal termination.
Example of Poison Pill:
// Add poison pill to queue after all items are produced
queue.put(-1);
// Consumer stops processing when it encounters the poison pill
if (item == -1) break;
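Put together, the two fragments might look like the following self-contained sketch. It assumes that -1 never occurs as a real work item. The class name PoisonPillSketch and the decision to collect results in a list are illustrative choices, not part of the original example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class name, used only for this illustration.
class PoisonPillSketch {
    // Sentinel value; this sketch assumes -1 is never a real work item.
    private static final Integer POISON_PILL = -1;

    static List<Integer> run() throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
        List<Integer> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.put(i);
                }
                queue.put(POISON_PILL); // signal that nothing more will arrive
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer item = queue.take();
                    if (item.equals(POISON_PILL)) {
                        break; // leave the loop so the thread can finish
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join(), the consumer's writes to the list are visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"
    }
}
```

With several consumers, one common variation is to enqueue one pill per consumer so that each of them sees its own termination signal.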
5. Execution Output
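If you execute the above example, the producer and consumer run concurrently and the queue hands each item safely from one thread to the other. The consumer takes twice as long per item as the producer, so the producer gradually pulls ahead. The exact interleaving of lines can vary between runs. Note that the consumer loops forever: once the last item has been consumed it stays blocked in take(), so the program does not exit on its own. The beginning of a typical run looks like this: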
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Produced: 4
Consumed: 3
Produced: 5
This demonstrates how BlockingQueue effectively facilitates communication between threads.