How do I close or shut down a BlockingQueue?

In this example you'll learn how to close or shut down a BlockingQueue when no more elements are available in the queue. A BlockingQueue has no close or shutdown operation of its own, so we use a common strategy in a Producer-Consumer scenario: the Producer sends a marker object. This marker object, also known as the poison object, signals that the queue contains no more objects that need to be processed, which allows us to break out of the consumer thread's loop.

package org.kodejava.example.util.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueShutdown {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);

        MyDataProducer producer = new MyDataProducer(queue);
        MyDataConsumer consumer = new MyDataConsumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

Below is the Producer object that puts data into the queue. The string DONE is our marker object. It is the last item placed in the queue for the consumer to pick up.

package org.kodejava.example.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class MyDataProducer implements Runnable {
    private final BlockingQueue<String> queue;

    public MyDataProducer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("MyDataProducer.run");
        String[] data = {"D001", "D002", "D003", "D004", "D005", "DONE"};

        try {
            for (String element : data) {
                queue.put(element);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not silently lost.
            Thread.currentThread().interrupt();
        }
    }
}
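
One caveat with a plain string marker is that a real data item could happen to contain the same text. The sketch below shows one possible variation, not part of the original example: the marker is a dedicated String instance that the consumer compares by identity (==) instead of with equals. The class name PoisonPillSentinel, the constant POISON, and the helper drain are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillSentinel {
    // Hypothetical marker: new String(...) creates a distinct instance,
    // so no ordinary "DONE" literal is ever the same object as POISON.
    static final String POISON = new String("DONE");

    // Takes elements until the marker instance itself is seen.
    static List<String> drain(BlockingQueue<String> queue) {
        List<String> received = new ArrayList<>();
        try {
            while (true) {
                String element = queue.take();
                if (element == POISON) { // identity check, not equals()
                    break;
                }
                received.add(element);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);
        queue.put("D001");
        queue.put("DONE"); // ordinary data that merely looks like the marker
        queue.put(POISON); // the actual end-of-data signal
        System.out.println(drain(queue)); // prints [D001, DONE]
    }
}
```

The trade-off is that both sides must share the same constant, so this only works when producer and consumer run in the same JVM.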

The Consumer object loops to retrieve elements from the queue. When it retrieves the marker object, it breaks out of the loop and the thread ends.

package org.kodejava.example.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class MyDataConsumer implements Runnable {
    private final BlockingQueue<String> queue;

    public MyDataConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("MyDataConsumer.run");

        while (true) {
            try {
                String element = queue.take();
                if ("DONE".equals(element)) {
                    System.out.println("Exiting consumer thread, " +
                            "end of data reached.");
                    break;
                }
                System.out.println("Element = " + element);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop instead of looping again.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
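
The example above has a single consumer, so one marker is enough. With several consumers sharing the queue, each consumer removes the marker it sees, so a common approach is to put one marker per consumer. The following is a minimal sketch of that idea under stated assumptions: the class MultiConsumerShutdown and its run method are hypothetical names, and counting elements stands in for real processing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiConsumerShutdown {
    static final String DONE = "DONE";

    // Starts the given number of consumer threads, feeds them the data,
    // then sends one marker per consumer and waits for all of them to end.
    // Returns how many data elements were processed in total.
    static int run(String[] data, int consumers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);
        AtomicInteger processed = new AtomicInteger();
        Thread[] workers = new Thread[consumers];

        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    // Each consumer stops after taking exactly one marker.
                    while (!DONE.equals(queue.take())) {
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        try {
            for (String element : data) {
                queue.put(element);
            }
            for (int i = 0; i < consumers; i++) {
                queue.put(DONE); // one marker per consumer
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        String[] data = {"D001", "D002", "D003", "D004", "D005"};
        System.out.println("Processed = " + run(data, 3)); // prints Processed = 5
    }
}
```

Because ArrayBlockingQueue is FIFO and the markers are added last, every data element is taken before any consumer sees a marker.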
Wayan Saryada
