How do I implement a queue using the DelayQueue?

The java.util.concurrent.DelayQueue class is an implementation of the BlockingQueue interface. Elements added to the queue must implement the java.util.concurrent.Delayed interface.

The queue is unbounded in size, so adding an element returns immediately. We can only take an element from the queue once its delay has expired. If multiple elements have expired delays, the element whose delay expired furthest in the past is taken first.

package org.kodejava.util.concurrent;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;

public class DelayQueueExample {
    public static void main(String[] args) {
        // Creates an instance of blocking queue using the DelayQueue.
        final BlockingQueue<DelayObject> queue = new DelayQueue<>();
        final Random random = new Random();

        new Thread(() -> {
            while (true) {
                try {
                    // Put some Delayed object into the Queue.
                    int delay = random.nextInt(10000);
                    DelayObject object = new DelayObject(
                            UUID.randomUUID().toString(), delay);

                    System.out.printf("Put object = %s%n", object);
                    queue.put(object);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Producer Thread").start();

        new Thread(() -> {
            while (true) {
                try {
                    // Take elements out from the DelayQueue object.
                    DelayObject object = queue.take();
                    System.out.printf("[%s] - Take object = %s%n",
                            Thread.currentThread().getName(), object);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Consumer Thread-1").start();

        new Thread(() -> {
            while (true) {
                try {
                    // Take elements out from the DelayQueue object.
                    DelayObject object = queue.take();
                    System.out.printf("[%s] - Take object = %s%n",
                            Thread.currentThread().getName(), object);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Consumer Thread-2").start();
    }
}

Below is an implementation of the Delayed interface. The implementation class has to implement the getDelay(TimeUnit) and compareTo(Delayed) methods.

package org.kodejava.util.concurrent;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayObject implements Delayed {
    private final String data;
    private final long startTime;

    public DelayObject(String data, long delay) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delay;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.startTime, ((DelayObject) o).startTime);
    }

    @Override
    public String toString() {
        return "{" +
                "data='" + data + '\'' +
                ", startTime=" + startTime +
                '}';
    }
}
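
The DelayObject above measures time with System.currentTimeMillis() and casts the argument of compareTo to DelayObject. As a possible variation, the sketch below uses System.nanoTime(), which is not affected by wall-clock adjustments, and falls back to comparing remaining delays when the other element is a different Delayed type. The class name NanoDelayObject and its getData() method are hypothetical and not part of the original example.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of DelayObject; names are illustrative only.
class NanoDelayObject implements Delayed {
    private final String data;
    // Absolute expiry on the System.nanoTime() time line.
    private final long expiresAtNanos;

    NanoDelayObject(String data, long delayMillis) {
        this.data = data;
        this.expiresAtNanos = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    String getData() {
        return data;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Zero or negative means the delay has expired.
        return unit.convert(expiresAtNanos - System.nanoTime(),
                TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof NanoDelayObject) {
            // Subtract first, then compare with zero, as recommended
            // for values returned by System.nanoTime().
            long diff = expiresAtNanos
                    - ((NanoDelayObject) other).expiresAtNanos;
            return Long.compare(diff, 0L);
        }
        // Any other Delayed type: compare the remaining delays.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                other.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public String toString() {
        return "{data='" + data + "'}";
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<NanoDelayObject> queue = new DelayQueue<>();
        queue.put(new NanoDelayObject("slow", 300));
        queue.put(new NanoDelayObject("fast", 100));
        // take() blocks until the earliest delay has expired.
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}
```

Running the main method prints the "fast" element first and the "slow" element second, even though "slow" was added first.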

Running this example gives you output similar to the following:

Put object = {data='ff217d8f-30c7-4ff6-b107-f5e6b13b5cf1', startTime=1635830327667}
Put object = {data='ad9b7f64-04f6-45ba-8b4a-6d9d25785303', startTime=1635830326742}
Put object = {data='7e439219-486a-473f-b093-2ada26b682f4', startTime=1635830328540}
Put object = {data='1fd6f3d2-60b1-4986-a8d0-ceb0863b30a7', startTime=1635830328498}
[Consumer Thread-1] - Take object = {data='ad9b7f64-04f6-45ba-8b4a-6d9d25785303', startTime=1635830326742}
Put object = {data='9d8811a2-5f2a-487b-a229-11ad8ed54515', startTime=1635830330952}
Put object = {data='baf72a83-0242-42cb-93db-9ef08da5f78d', startTime=1635830336088}
[Consumer Thread-2] - Take object = {data='ff217d8f-30c7-4ff6-b107-f5e6b13b5cf1', startTime=1635830327667}
Put object = {data='9d4d14c5-0777-4675-aa67-a49a0519d9b2', startTime=1635830328952}
Put object = {data='c4979d10-4a11-44a8-b79c-ff31309e968b', startTime=1635830331430}
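
Because the delays in the example are random, the order in the output changes on every run. The following sketch makes the ordering rule easier to see by using fixed delays, two of which are already expired when the elements are added. The names ExpiryOrderSketch, Item and drain() are hypothetical; it uses the non-blocking poll() method instead of take() so that it finishes right away.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class ExpiryOrderSketch {
    // Minimal Delayed element, written only for this sketch.
    static class Item implements Delayed {
        final String name;
        final long expiresAtNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expiresAtNanos = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(),
                    TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns the names in the order the queue hands them out.
    static List<String> drain() {
        DelayQueue<Item> queue = new DelayQueue<>();
        // Negative delays: both elements are already expired when added.
        queue.put(new Item("expired 1s ago", -1000));
        queue.put(new Item("expired 5s ago", -5000));
        // Still pending, so poll() must not return it yet.
        queue.put(new Item("expires in 60s", 60_000));

        List<String> order = new ArrayList<>();
        Item item;
        // poll() returns null when no element has an expired delay.
        while ((item = queue.poll()) != null) {
            order.add(item.name);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drain());
    }
}
```

The program prints [expired 5s ago, expired 1s ago]: the element that expired furthest in the past comes out first, and the element that is still waiting stays in the queue.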

How do I close or shut down a BlockingQueue?

In this example you’ll learn how to close or shut down a BlockingQueue when no more elements are available in the queue. We will use a common strategy in the Producer-Consumer scenario: the Producer sends a marker object. This marker object, also known as the poison object, signals that the queue contains no more objects to process, which allows us to break out of the consumer thread's loop.

package org.kodejava.util.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueShutdown {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);

        MyDataProducer producer = new MyDataProducer(queue);
        MyDataConsumer consumer = new MyDataConsumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

Below is the Producer object that puts data into the queue. The string DONE is our marker object. It is the last item placed in the queue for the consumer to pick up.

package org.kodejava.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class MyDataProducer implements Runnable {
    BlockingQueue<String> queue;

    public MyDataProducer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("MyDataProducer.run");
        String[] data = {"D001", "D002", "D003", "D004", "D005", "DONE"};

        try {
            for (String element : data) {
                queue.put(element);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The Consumer object loops to retrieve elements from the queue. It breaks the loop and ends the thread when it receives the marker object from the queue.

package org.kodejava.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class MyDataConsumer implements Runnable {
    BlockingQueue<String> queue;

    public MyDataConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("MyDataConsumer.run");

        while (true) {
            try {
                String element = queue.take();
                if ("DONE".equals(element)) {
                    System.out.println("Exiting consumer thread, " +
                            "end of data reached.");
                    break;
                }
                System.out.println("Element = " + element);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
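
The example above has a single consumer. With several consumers, one marker object stops only the consumer that happens to take it. A common variation, sketched below under that assumption, is to put one marker per consumer into the queue. The names PoisonPillSketch, POISON and run() are hypothetical and not part of the original example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class PoisonPillSketch {
    // Marker value, same role as "DONE" in the example above.
    static final String POISON = "DONE";

    // Runs one producer (the calling thread) and the given number of
    // consumers; returns how many real elements were consumed.
    static int run(int consumerCount) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);
        AtomicInteger consumed = new AtomicInteger();

        Thread[] consumers = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String element = queue.take();
                        if (POISON.equals(element)) {
                            break; // this consumer is done
                        }
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and let the thread end.
                    Thread.currentThread().interrupt();
                }
            }, "Consumer-" + i);
            consumers[i].start();
        }

        String[] data = {"D001", "D002", "D003", "D004", "D005"};
        for (String element : data) {
            queue.put(element);
        }
        // One marker per consumer, so every consumer takes exactly one.
        for (int i = 0; i < consumerCount; i++) {
            queue.put(POISON);
        }

        // Wait until every consumer has seen its marker.
        for (Thread consumer : consumers) {
            consumer.join();
        }
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Consumed = " + run(2));
    }
}
```

Because the markers are placed after all data elements and the queue is FIFO, every data element is taken before any consumer stops, so the program prints Consumed = 5.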

How do I use the ArrayBlockingQueue?

ArrayBlockingQueue is one implementation of the java.util.concurrent.BlockingQueue interface, which internally stores the queue elements in an array. The ArrayBlockingQueue can hold at most the number of elements specified in the constructor when the object is created. Once its size is defined, it cannot be changed or resized.

The code snippet below demonstrates the ArrayBlockingQueue class. We initialize the queue so that its internal array can store a maximum of 64 elements.

package org.kodejava.util.concurrent;

import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueExample {
    private final BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<>(64);

    public static void main(String[] args) {
        new ArrayBlockingQueueExample().createProducerConsumer();
    }

    private void createProducerConsumer() {
        new Thread(() -> {
            while (true) {
                System.out.println(Thread.currentThread().getName());
                try {
                    sharedQueue.put("DATA-" + UUID.randomUUID());
                    Thread.sleep(250);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Producer Thread").start();

        new Thread(() -> {
            while (true) {
                System.out.print(Thread.currentThread().getName() + " => ");
                try {
                    System.out.println(sharedQueue.take());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Consumer Thread-1").start();
    }
}
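
To see the fixed capacity in isolation, here is a small single-threaded sketch. It uses a capacity of 2 instead of 64 to keep the output short, and the non-blocking offer() method so that the program does not hang when the queue is full. The names FixedCapacitySketch and countAccepted() are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class FixedCapacitySketch {
    // Offers every element to a queue of the given capacity and
    // returns how many of them were accepted.
    static int countAccepted(int capacity, String... elements) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (String element : elements) {
            // offer() returns false instead of blocking when the queue is full.
            if (queue.offer(element)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.offer("A");
        queue.offer("B");
        // The queue is full now, no slot is left.
        System.out.println("remaining = " + queue.remainingCapacity());
        System.out.println("offer C   = " + queue.offer("C"));
        // Removing the head frees one slot again.
        queue.poll();
        System.out.println("remaining = " + queue.remainingCapacity());
        System.out.println("accepted  = " + countAccepted(2, "A", "B", "C"));
    }
}
```

The program reports a remaining capacity of 0, a rejected offer (false), a remaining capacity of 1 after the poll, and 2 accepted elements out of 3.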

How do I use the BlockingQueue object?

A java.util.concurrent.BlockingQueue is an interface that extends java.util.Queue and adds support for blocking operations. A retrieval operation waits (blocks) until the queue becomes non-empty, and a storing operation waits (blocks) until the queue has space available.

Several implementations of the java.util.concurrent.BlockingQueue interface are available, including the following classes: ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, and SynchronousQueue.

Below is an example of how to use the BlockingQueue. In the example we use the ArrayBlockingQueue implementation of the interface. The example creates a separate thread for the Producer and the Consumer objects. Both threads use a shared blocking queue, where the Producer object stores some elements and the Consumer object tries to retrieve them.

package org.kodejava.util.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer, "Producer").start();
        new Thread(consumer, "Consumer").start();
    }
}

Here is the Producer class. We define an array of strings and use a for-loop to iterate over the array and store each element in the queue. We do this by calling the put() method of the BlockingQueue. The put() method blocks if there is no space available in the queue. The call to Thread.sleep() here causes the Consumer to block while it waits for an object to become available in the queue.

package org.kodejava.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    BlockingQueue<String> queue;

    Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("Producer.run");
        String[] data = {"D001", "D002", "D003", "D004", "D005"};

        try {
            for (String element : data) {
                queue.put(element);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The Consumer object loops to retrieve elements from the queue. In this example we retrieve an element using the take() method of the BlockingQueue and print it to the console.

package org.kodejava.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    BlockingQueue<String> queue;

    Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("Consumer.run");

        while (true) {
            System.out.println("Reading queue...");

            try {
                System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
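
Note that the Consumer above loops forever, so the program keeps running after the Producer has finished. One possible way to stop such a consumer, sketched here as an assumption rather than as part of the original example, is to interrupt its thread: take() throws an InterruptedException when the waiting thread is interrupted, and we let that exception end the loop. The names InterruptibleConsumerSketch and run() are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

class InterruptibleConsumerSketch {
    // Starts a consumer, feeds it three elements, then interrupts it.
    // Returns the elements the consumer has seen.
    static List<String> run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);
        List<String> seen = new CopyOnWriteArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    // Blocks while the queue is empty; throws
                    // InterruptedException if the thread is interrupted.
                    seen.add(queue.take());
                }
            } catch (InterruptedException e) {
                // The try block wraps the loop, so the loop ends here.
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        consumer.start();

        String[] data = {"D001", "D002", "D003"};
        for (String element : data) {
            queue.put(element);
        }
        // Wait until the consumer has processed everything.
        while (seen.size() < data.length) {
            Thread.sleep(10);
        }
        consumer.interrupt();
        consumer.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The important difference from the Consumer class is the position of the try block: it surrounds the whole loop instead of sitting inside it, so an interrupt ends the thread instead of being printed and ignored.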

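The BlockingQueue has four different sets of methods for storing and retrieving elements in the queue. Each set behaves differently when the requested operation cannot be completed immediately: one throws an exception, one returns a special value (null or false), one blocks until the operation can succeed, and one blocks for at most a given time before giving up.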

Operation   Throws exception   Special value   Blocks   Times out
Insert      add(e)             offer(e)        put(e)   offer(e, time, unit)
Remove      remove()           poll()          take()   poll(time, unit)
Examine     element()          peek()          n/a      n/a
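
The sketch below exercises the methods from the table on a queue with a capacity of 1 and records what each call does on an empty or a full queue. The blocking put() and take() calls are left out of the failing cases because they would wait forever in a single thread. The names MethodSetsSketch and probe() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class MethodSetsSketch {
    // Calls each method in a situation where it cannot succeed
    // and records the observed behavior.
    static List<String> probe() throws InterruptedException {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        // Empty queue: the "Remove" and "Examine" rows.
        log.add("poll() = " + queue.poll());
        log.add("poll(time, unit) = "
                + queue.poll(50, TimeUnit.MILLISECONDS));
        log.add("peek() = " + queue.peek());
        try {
            queue.remove();
        } catch (NoSuchElementException e) {
            log.add("remove() threw NoSuchElementException");
        }
        try {
            queue.element();
        } catch (NoSuchElementException e) {
            log.add("element() threw NoSuchElementException");
        }

        // Full queue (capacity 1): the "Insert" row.
        queue.put("A"); // succeeds at once, the queue was empty
        log.add("offer(e) = " + queue.offer("B"));
        log.add("offer(e, time, unit) = "
                + queue.offer("B", 50, TimeUnit.MILLISECONDS));
        try {
            queue.add("B");
        } catch (IllegalStateException e) {
            log.add("add(e) threw IllegalStateException");
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String line : probe()) {
            System.out.println(line);
        }
    }
}
```

On the empty queue the poll and peek calls return null while remove() and element() throw. On the full queue both offer variants return false while add(e) throws.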

How do I create a JSP error page to handle exceptions?

In this example you will learn how to handle exceptions in a JSP page. JSP has a built-in mechanism for error handling: a special page that can be used to handle every error in the web application. To define a page as an error page, we use the page directive and set its isErrorPage attribute to true.

Here is an example of a JSP error page:

<%@ page contentType="text/html;charset=UTF-8" %>
<%@ page isErrorPage="true" %>
<!DOCTYPE html>
<html lang="en">
<head>
    <title>Error Page</title>
</head>
<body>
<h1>An error has occurred.</h1>

<div style="color: #F00;">
    Error message: <%= exception.toString() %>
</div>
</body>
</html>

We have defined the error page. The next step is to tell other JSP pages to use the error page when an uncaught exception occurs. To do this we again use the page directive, setting its errorPage attribute to point to the error page. In the example below we set it to /errorPage.jsp.

If we access errorTest.jsp, shown in the snippet below, it throws an exception because it tries to convert an invalid string into a number. Because the page does not handle the exception itself, the error page comes up and shows the exception message.

<%@ page contentType="text/html;charset=UTF-8" %>
<%@ page errorPage="/errorPage.jsp" %>
<html lang="en">
<head>
    <title>My Sample Page</title>
</head>
<body>
<h1>This page throws an error:</h1>

<%
    int number = Integer.parseInt("Hello, World!");
%>
</body>
</html>

Figure: JSP Error Page Demo
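
To see which exception the scriptlet in errorTest.jsp raises without deploying a web application, we can run the same Integer.parseInt() call in a plain Java class. This is only a sketch: the names ParseFailureSketch and describeFailure() are hypothetical, and it assumes that the error page receives the original exception, so the returned string is roughly what <%= exception.toString() %> would display.

```java
class ParseFailureSketch {
    // Returns the exception's toString() text when the input is not
    // a valid integer, or null when parsing succeeds.
    static String describeFailure(String input) {
        try {
            Integer.parseInt(input);
            return null;
        } catch (NumberFormatException e) {
            return e.toString();
        }
    }

    public static void main(String[] args) {
        // Same invalid input as in errorTest.jsp.
        System.out.println(describeFailure("Hello, World!"));
        // A valid number does not raise an exception.
        System.out.println(describeFailure("42"));
    }
}
```

The first call reports a java.lang.NumberFormatException that mentions the offending input string, and the second call prints null because "42" parses without error.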