How do I use BlockingQueue to pass data between threads?

To use a BlockingQueue to pass data between threads in Java, you can follow these steps:

1. Understand BlockingQueue

BlockingQueue is part of the java.util.concurrent package and is designed for thread-safe communication between producer and consumer threads. Its put() and take() methods block the calling thread whenever the operation cannot proceed immediately:

  • put(E e): Blocks if the queue is full until space becomes available.
  • take(): Blocks if the queue is empty until an element becomes available.
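
To observe this behavior without hanging a thread, the following minimal sketch uses the timed variants offer(E, long, TimeUnit) and poll(long, TimeUnit), which wait for a limited time and then give up. The class name BlockingBehaviorSketch, the helper probe(), the capacity of 1, and the 100 ms timeout are illustrative assumptions, not part of the original example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingBehaviorSketch {

    // Hypothetical helper: returns a short report of what each call did.
    public static String probe() {
        // Capacity of 1 is an assumption chosen to fill the queue quickly.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put("first"); // Space is available, so this returns immediately

            // The queue is now full: put("second") would block indefinitely here.
            // The timed offer waits up to 100 ms, then returns false.
            boolean accepted = queue.offer("second", 100, TimeUnit.MILLISECONDS);

            String head = queue.take(); // An element is present, so this returns immediately

            // The queue is now empty: take() would block indefinitely here.
            // The timed poll waits up to 100 ms, then returns null.
            String next = queue.poll(100, TimeUnit.MILLISECONDS);

            return "accepted=" + accepted + ", head=" + head + ", next=" + next;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt status
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(probe());
        // prints: accepted=false, head=first, next=null
    }
}
```

The timed variants are useful when a thread should not wait forever; put() and take() remain the simplest choice when waiting indefinitely is acceptable.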

Common implementations of BlockingQueue include:

  • ArrayBlockingQueue: A fixed-capacity, bounded queue.
  • LinkedBlockingQueue: A linked-node queue, optionally bounded.
  • PriorityBlockingQueue: An unbounded queue ordered by priority. Because it has no capacity limit, put/offer/add never block; take still blocks while the queue is empty.
  • SynchronousQueue: A queue with no capacity, where put blocks until a take occurs (and vice versa).
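
The capacity differences between these implementations can be checked with remainingCapacity(). The sketch below is only an illustration: the class name QueueKindsSketch and the helpers freeSlots() and smallestFirst() are hypothetical names introduced here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueKindsSketch {

    // Hypothetical helper: number of free slots the given queue reports.
    public static int freeSlots(BlockingQueue<Integer> queue) {
        return queue.remainingCapacity();
    }

    // Hypothetical helper: inserts 3, 1, 2 and returns the element at the head.
    public static int smallestFirst() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.add(3);
        queue.add(1);
        queue.add(2);
        return queue.poll(); // Head is the least element by natural ordering
    }

    public static void main(String[] args) {
        // Bounded queues report the capacity passed to the constructor
        System.out.println(freeSlots(new ArrayBlockingQueue<>(5)));   // prints 5
        System.out.println(freeSlots(new LinkedBlockingQueue<>(5)));  // prints 5

        // Without a capacity argument, LinkedBlockingQueue is effectively unbounded
        System.out.println(freeSlots(new LinkedBlockingQueue<>()) == Integer.MAX_VALUE);   // prints true

        // PriorityBlockingQueue is always unbounded
        System.out.println(freeSlots(new PriorityBlockingQueue<>()) == Integer.MAX_VALUE); // prints true

        // SynchronousQueue holds no elements at all
        System.out.println(freeSlots(new SynchronousQueue<>()));      // prints 0

        System.out.println(smallestFirst());                          // prints 1
    }
}
```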

2. Example Setup for Producer-Consumer Pattern

Here’s an example to show how to use a BlockingQueue to pass data between producer and consumer threads:

Code Example

package org.kodejava.util.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {
    private static final int QUEUE_CAPACITY = 5;

    public static void main(String[] args) {
        // Instantiate a BlockingQueue with a capacity of 5
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

        // Create producer and consumer threads
        Thread producer = new Thread(new Producer(queue));
        Thread consumer = new Thread(new Consumer(queue));

        // Start threads
        producer.start();
        consumer.start();
    }
}

class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 1; i <= 10; i++) {
                System.out.println("Produced: " + i);
                queue.put(i); // Add item to the queue, blocks if full
                Thread.sleep(500); // Simulate production time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer item = queue.take(); // Take item from the queue, blocks if empty
                System.out.println("Consumed: " + item);
                Thread.sleep(1000); // Simulate consumption time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

3. How It Works

  1. BlockingQueue:
    • A LinkedBlockingQueue with a capacity of 5 is created.
    • The producer thread calls queue.put(i) to add elements to the queue. If the queue is full, the thread blocks until space becomes available.
    • The consumer thread calls queue.take() to retrieve elements. If the queue is empty, the thread blocks until an item is added.
  2. Producer:
    • Produces data (here, the integers 1 to 10) and adds each item to the queue.
    • Needs no explicit locking: put is thread-safe and blocks automatically while the queue is full.
  3. Consumer:
    • Retrieves items from the queue and processes them (here, by printing them).
    • Needs no explicit locking: take is thread-safe and blocks automatically while the queue is empty.

4. Key Points

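  • Thread Safety: BlockingQueue implementations are thread-safe, so individual operations such as put() and take() need no external synchronization. Bulk operations such as addAll() are not necessarily atomic.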
  • Automatic Blocking:
    • put() in the producer blocks if the queue is full.
    • take() in the consumer blocks if the queue is empty.
  • Stopping Mechanism:
    • In real-world applications, implement a stopping mechanism such as adding a “poison pill” (special object) to signal termination.

Example of Poison Pill:

// Producer: enqueue the poison pill after all real items have been produced.
// This assumes -1 is never a legitimate item.
queue.put(-1);

// Consumer: inside the while loop, right after take(), stop on the poison pill
if (item == -1) break;
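
Put together, the two fragments above might look like the following self-contained sketch. The class name PoisonPillSketch, the method runPipeline(), and the POISON_PILL constant are illustrative assumptions; the sentinel value -1 only works if it can never be a real item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonPillSketch {
    // Sentinel value; assumes -1 is never produced as a real item
    private static final int POISON_PILL = -1;

    // Hypothetical helper: runs one producer and one consumer,
    // and returns the items the consumer processed.
    public static List<Integer> runPipeline(int itemCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= itemCount; i++) {
                    queue.put(i); // Blocks while the queue is full
                }
                queue.put(POISON_PILL); // Signal that no more items will follow
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int item = queue.take(); // Blocks while the queue is empty
                    if (item == POISON_PILL) {
                        break; // Stop without processing the sentinel
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // After join, the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

With several consumers, the producer would need to enqueue one poison pill per consumer, since each pill is removed by the consumer that takes it.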

5. Execution Output

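If you execute the above example, the producer and consumer run concurrently and exchange items through the queue in a thread-safe manner. The exact interleaving depends on thread scheduling, so your output may differ slightly. Because the consumer loops forever, the program keeps running after the last item is consumed until you stop it. Sample output (first few lines):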

Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Produced: 4
Consumed: 3
Produced: 5

This demonstrates how BlockingQueue effectively facilitates communication between threads.
