How do I use the PriorityBlockingQueue class?

This example demonstrates how to use the PriorityBlockingQueue class. PriorityBlockingQueue is one implementation of the BlockingQueue interface. It is an unbounded concurrent queue. Objects placed in this type of queue must implement the java.lang.Comparable interface, unless a Comparator is supplied to the queue's constructor. That ordering determines the priority of the elements inside the queue.

For simplicity, this example uses String objects as the elements placed in the queue. The String class implements the Comparable interface. When we run this example, it prints the names in the string array in alphabetical order, provided the producer has added all the names before the consumer starts taking them. The queue can only order the elements it holds at the moment of each take().

package org.kodejava.example.util.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueExample {
    public static void main(String[] args) {
        final String[] names =
                {"carol", "alice", "malory", "bob", "alex", "jacobs"};

        final BlockingQueue<String> queue = new PriorityBlockingQueue<>();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < names.length; i++) {
                    try {
                        queue.put(names[i]);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "Producer").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < names.length; i++) {
                        System.out.println(queue.take());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Consumer").start();
    }
}

A typical run of this code prints:

alex
alice
bob
carol
jacobs
malory
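
The priority does not have to come from Comparable. As a minimal sketch, assuming we want reverse alphabetical order, a Comparator can be passed to the PriorityBlockingQueue constructor. The class name PriorityComparatorSketch and the method drainInPriorityOrder are illustrative names, not part of the example above, and the sketch runs in a single thread so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityComparatorSketch {
    // Adds all names to a queue ordered by the given comparator,
    // then removes them one by one, highest priority first.
    public static List<String> drainInPriorityOrder(String[] names,
                                                    Comparator<String> order) {
        // The initial capacity (11) is only a sizing hint; the queue stays unbounded.
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(11, order);
        for (String name : names) {
            queue.offer(name); // never blocks because the queue is unbounded
        }

        List<String> result = new ArrayList<>();
        String head;
        // poll() returns null once the queue is empty.
        while ((head = queue.poll()) != null) {
            result.add(head);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] names = {"carol", "alice", "malory", "bob", "alex", "jacobs"};
        // Prints [malory, jacobs, carol, bob, alice, alex]
        System.out.println(drainInPriorityOrder(names, Comparator.reverseOrder()));
    }
}
```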

How do I use LinkedBlockingQueue class?

In the following code snippet you will learn how to use the LinkedBlockingQueue class. This class implements the BlockingQueue interface. We can create a bounded queue by specifying the queue capacity when constructing the object. If we do not specify a capacity, it defaults to Integer.MAX_VALUE.

The queue stores its elements as linked nodes in FIFO (First-In-First-Out) order. The head of the queue is the element that has been in the queue the longest, and the tail is the element that was added most recently.

Let’s see the code in action below:

package org.kodejava.example.util.concurrent;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueExample {
    public static void main(String[] args) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);

        // Producer Thread
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        String data = UUID.randomUUID().toString();
                        System.out.printf("[%s] PUT [%s].%n",
                                Thread.currentThread().getName(), data);
                        queue.put(data);
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "Producer").start();

        // Consumer-1 Thread
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        String data = queue.take();
                        System.out.printf("[%s] GET [%s].%n",
                                Thread.currentThread().getName(), data);
                        Thread.sleep(550);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "Consumer-1").start();

        // Consumer-2 Thread
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        String data = queue.take();
                        System.out.printf("[%s] GET [%s].%n",
                                Thread.currentThread().getName(), data);
                        Thread.sleep(750);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "Consumer-2").start();
    }
}
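
The capacity bound and the FIFO order described above can also be observed without any threads. The following is a small sketch, assuming a capacity of 2 chosen only for illustration; the class name BoundedFifoSketch and its methods are hypothetical. It uses offer() and poll(), which return immediately instead of blocking like put() and take().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedFifoSketch {
    // Fills a queue of capacity 2, tries to add a third element,
    // then removes the head and reports what happened.
    public static String boundedDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        boolean first = queue.offer("first");
        boolean second = queue.offer("second");
        // The queue is full, so offer() returns false instead of blocking.
        boolean third = queue.offer("third");
        // poll() removes the head, which is the element inserted first.
        String head = queue.poll();
        return first + "," + second + "," + third + "," + head
                + "," + queue.remainingCapacity();
    }

    // Capacity of a queue created without an explicit bound.
    public static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(boundedDemo());      // true,true,false,first,1
        System.out.println(defaultCapacity());  // 2147483647 (Integer.MAX_VALUE)
    }
}
```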

How do I implement a queue using DelayQueue?

The java.util.concurrent.DelayQueue class is an implementation of the BlockingQueue interface. Elements added to the queue must implement the java.util.concurrent.Delayed interface.

The queue is unbounded in size, so adding an element returns immediately. However, we can only take an element from the queue once its delay has expired. If multiple elements have expired delays, the element whose delay expired furthest in the past is taken first.

package org.kodejava.example.util.concurrent;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;

public class DelayQueueExample {
    public static void main(String[] args) {
        // Creates an instance of blocking queue using the DelayQueue.
        final BlockingQueue<DelayObject> queue = new DelayQueue<>();
        final Random random = new Random();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        // Put some Delayed object into the Queue.
                        int delay = random.nextInt(10000);
                        DelayObject object = new DelayObject(
                                UUID.randomUUID().toString(), delay);

                        System.out.printf("Put object = %s%n", object);
                        queue.put(object);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "Producer Thread").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        // Take elements out from the DelayQueue object.
                        DelayObject object = queue.take();
                        System.out.printf("[%s] - Take object = %s%n",
                                Thread.currentThread().getName(), object);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "Consumer Thread-1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        // Take elements out from the DelayQueue object.
                        DelayObject object = queue.take();
                        System.out.printf("[%s] - Take object = %s%n",
                                Thread.currentThread().getName(), object);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "Consumer Thread-2").start();
    }
}

Below is an implementation of the Delayed interface. The implementing class has to provide the getDelay(TimeUnit) and compareTo(Delayed) methods.

package org.kodejava.example.util.concurrent;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayObject implements Delayed  {
    private String data;
    private long startTime;

    public DelayObject(String data, long delay) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delay;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (this.startTime < ((DelayObject) o).startTime) {
            return -1;
        }
        if (this.startTime > ((DelayObject) o).startTime) {
            return 1;
        }
        return 0;
    }

    @Override
    public String toString() {
        return "{" +
                "data='" + data + '\'' +
                ", startTime=" + startTime +
                '}';
    }
}

Running this example produces output similar to the following:

Put object = {data='dfe77bb9-b2b4-41d2-8b32-30b3eadae4d0', startTime=1347895148107}
Put object = {data='8f7881b3-fc0d-4c4d-b7e5-e0c2db126fc6', startTime=1347895142511}
Put object = {data='c8aa530b-38bf-4045-bcfa-5917f04bebab', startTime=1347895143447}
Put object = {data='7cc45157-2f10-4f19-8758-b43b47339fb0', startTime=1347895142971}
Put object = {data='d286bdf0-10d2-4371-8b85-1a830146d500', startTime=1347895150111}
Put object = {data='e1e100e7-4aee-44e2-b41b-92f0e1eacbc1', startTime=1347895145007}
[Consumer Thread-2] - Take object = {data='8f7881b3-fc0d-4c4d-b7e5-e0c2db126fc6', startTime=1347895142511}
Put object = {data='a2bba93c-9a9e-404f-ac57-7429c3ee0798', startTime=1347895149480}
[Consumer Thread-1] - Take object = {data='7cc45157-2f10-4f19-8758-b43b47339fb0', startTime=1347895142971}
Put object = {data='bfb74a00-d93a-43dd-b777-5f563657948a', startTime=1347895144808}
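
To isolate the expiry rule from the producer and consumer threads, here is a minimal single-threaded sketch. It assumes a hypothetical Task element (not the DelayObject class above) with delays of 300 and 600 milliseconds chosen for illustration. The non-blocking poll() is expected to return null while no delay has expired, whereas take() waits for the element with the earliest expiry.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayExpirySketch {
    // Hypothetical minimal Delayed element used only for this sketch.
    static final class Task implements Delayed {
        final String name;
        final long readyAtNanos;

        Task(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the element has expired.
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns: whether the early poll() saw nothing, then the names in take() order.
    public static String[] demo() {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.put(new Task("slow", 600));
        queue.put(new Task("fast", 300));
        try {
            Task early = queue.poll();  // non-blocking; no delay has expired yet
            Task first = queue.take();  // blocks until the earliest delay expires
            Task second = queue.take(); // blocks until the next delay expires
            return new String[]{String.valueOf(early == null), first.name, second.name};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", demo()));
    }
}
```

Note that the insertion order ("slow" before "fast") does not matter; the queue orders elements by their remaining delay.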

How do I close or shutdown a BlockingQueue?

In this example you’ll learn how to close or shut down a BlockingQueue when no more elements are available in the queue. We will use a common strategy for the producer-consumer scenario: the producer sends a marker object. This marker object, also known as a poison object, signals that the queue contains no more objects that need to be processed, which allows us to end the loop in the consumer thread.

package org.kodejava.example.util.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueShutdown {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);

        MyDataProducer producer = new MyDataProducer(queue);
        MyDataConsumer consumer = new MyDataConsumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

Below is the Producer object that puts data into the queue. The string DONE is our marker object. It is the last item placed in the queue for the consumer to pick up.

package org.kodejava.example.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class MyDataProducer implements Runnable {
    BlockingQueue<String> queue;

    public MyDataProducer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("MyDataProducer.run");
        String[] data = {"D001", "D002", "D003", "D004", "D005", "DONE"};

        try {
            for (String element : data) {
                queue.put(element);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The Consumer object loops to retrieve elements from the queue. It breaks out of the loop and ends the thread when it retrieves the marker object from the queue.

package org.kodejava.example.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class MyDataConsumer implements Runnable {
    BlockingQueue<String> queue;

    public MyDataConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("MyDataConsumer.run");

        while (true) {
            try {
                String element = queue.take();
                if ("DONE".equals(element)) {
                    System.out.println("Exiting consumer thread, " +
                            "end of data reached.");
                    break;
                }
                System.out.println("Element = " + element);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
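
With a single consumer, one marker is enough. As a hedged sketch of how the same idea might extend to several consumers, the producer can enqueue one marker per consumer, since each consumer removes the marker it reads. The class name MultiConsumerShutdownSketch, the process method, and the consumer count of 3 are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiConsumerShutdownSketch {
    static final String POISON = "DONE";

    // Starts consumerCount consumers, feeds them the data followed by one
    // marker per consumer, and returns how many real elements were processed.
    public static int process(String[] data, int consumerCount) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);
        AtomicInteger processed = new AtomicInteger();
        Thread[] consumers = new Thread[consumerCount];

        for (int i = 0; i < consumerCount; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String element = queue.take();
                        if (POISON.equals(element)) {
                            return; // this consumer's marker: stop the thread
                        }
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer-" + i);
            consumers[i].start();
        }

        try {
            for (String element : data) {
                queue.put(element);
            }
            // One marker per consumer, placed after all real data.
            for (int i = 0; i < consumerCount; i++) {
                queue.put(POISON);
            }
            for (Thread consumer : consumers) {
                consumer.join(); // wait until every consumer has seen its marker
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while shutting down", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        String[] data = {"D001", "D002", "D003", "D004", "D005"};
        System.out.println("Processed = " + process(data, 3)); // Processed = 5
    }
}
```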

How do I use the ArrayBlockingQueue?

ArrayBlockingQueue is an implementation of java.util.concurrent.BlockingQueue that internally stores the queue elements in an array. It can hold as many elements as the capacity passed to its constructor. Once the capacity is set, it cannot be changed or resized.

The code snippet below demonstrates the ArrayBlockingQueue class. We initialize the queue so that its internal array can store a maximum of 64 elements.

package org.kodejava.example.util.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueExample {
    private BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<>(64);

    public static void main(String[] args) {
        new ArrayBlockingQueueExample().createProducerConsumer();
    }

    private void createProducerConsumer() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    System.out.println(Thread.currentThread().getName());
                    try {
                        sharedQueue.put("DATA");
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "Producer Thread").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    System.out.print(Thread.currentThread().getName() + "=> ");
                    try {
                        System.out.println(sharedQueue.take());
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "Consumer Thread-1").start();
    }
}
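
Because the capacity is fixed, it can help to see what the different insertion methods do once the array is full. The sketch below assumes a capacity of 2 and a 100 millisecond timeout, both chosen only for illustration; the class name FixedCapacitySketch is hypothetical. On a full queue, add() throws an IllegalStateException, the timed offer() gives up and returns false, and put() (used in the example above) would block until space becomes available.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class FixedCapacitySketch {
    // Fills a queue of capacity 2 and then tries two ways of adding a third element.
    public static String fillBeyondCapacity() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.add("A");
        queue.add("B");

        String addResult;
        try {
            queue.add("C"); // add() throws when the queue is full
            addResult = "added";
        } catch (IllegalStateException e) {
            addResult = "rejected";
        }

        boolean offered;
        try {
            // Waits up to 100 ms for free space, then returns false.
            offered = queue.offer("C", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            offered = false;
        }

        return addResult + "," + offered + "," + queue.size()
                + "," + queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(fillBeyondCapacity()); // rejected,false,2,0
    }
}
```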