How do I implement producer-consumer with LinkedBlockingQueue?

The LinkedBlockingQueue in Java is an implementation of the BlockingQueue interface, which is well-suited for implementing the Producer-Consumer problem. It manages a thread-safe queue where producers can add elements and consumers can take elements, with built-in thread synchronization.

Here’s how you can implement a basic producer-consumer solution using LinkedBlockingQueue:


Example: Producer-Consumer with LinkedBlockingQueue

package org.kodejava.util.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerExample {

   public static void main(String[] args) {
      // Shared BlockingQueue with a capacity of 10
      BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

      // Create and start producer and consumer threads
      Thread producerThread = new Thread(new Producer(queue));
      Thread consumerThread = new Thread(new Consumer(queue));

      producerThread.start();
      consumerThread.start();
   }
}

// Producer class
class Producer implements Runnable {
   private final BlockingQueue<Integer> queue;

   public Producer(BlockingQueue<Integer> queue) {
      this.queue = queue;
   }

   @Override
   public void run() {
      try {
         for (int i = 0; i < 20; i++) { // Produce 20 items
            System.out.println("Producing: " + i);
            queue.put(i); // Adds an element to the queue, waits if full
            Thread.sleep(100); // Simulate production time
         }
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         System.out.println("Producer was interrupted");
      }
   }
}

// Consumer class
class Consumer implements Runnable {
   private final BlockingQueue<Integer> queue;

   public Consumer(BlockingQueue<Integer> queue) {
      this.queue = queue;
   }

   @Override
   public void run() {
      try {
         while (true) { // Consume indefinitely (or you can add a termination condition)
            Integer value = queue.take(); // Removes and retrieves the head of the queue, waits if empty
            System.out.println("Consuming: " + value);
            Thread.sleep(150); // Simulate consumption time
         }
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         System.out.println("Consumer was interrupted");
      }
   }
}

Explanation of Key Concepts

  1. Thread Safety: LinkedBlockingQueue ensures thread safety — no explicit synchronization is needed.
  2. Blocking Methods:
    • put(E e): Inserts an element into the queue, waiting if the queue is full.
    • take(): Retrieves and removes the next element from the queue, waiting if the queue is empty.
  3. Capacity: You can specify the queue’s maximum capacity to prevent overloading (in this example, it’s set to 10).
  4. Multi-threading:
    • Producer: Continuously adds elements to the queue until it reaches the specified capacity.
    • Consumer: Continuously retrieves and processes elements from the queue until it’s empty (or indefinitely, as shown).

Output

The output will interleave “Producing” and “Consuming” messages since the producer and consumer are running in separate threads:

Producing: 0
Consuming: 0
Producing: 1
Producing: 2
Consuming: 1
Producing: 3
...

Adding Multiple Producers and Consumers

You can easily extend this example to have multiple producers and consumers. For example:

Thread producer1 = new Thread(new Producer(queue));
Thread producer2 = new Thread(new Producer(queue));
Thread consumer1 = new Thread(new Consumer(queue));
Thread consumer2 = new Thread(new Consumer(queue));

producer1.start();
producer2.start();
consumer1.start();
consumer2.start();

With multiple producers and consumers, LinkedBlockingQueue automatically synchronizes all access.


This approach demonstrates how the LinkedBlockingQueue efficiently handles the producer-consumer problem without requiring explicit synchronization, making it a simple yet powerful tool for concurrent programming in Java.

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.