How do I use ConcurrentHasMap forEach() method?

The forEach() method in ConcurrentHashMap is used for iteration over the entries in the map. The method takes a BiConsumer as an argument, which is a functional interface that represents an operation that accepts two input arguments and returns no result.

Here’s an example of how to use forEach() with a ConcurrentHashMap:

package org.kodejava.util.concurrent;

import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapForEachExample {
    public static void main(String[] args) {
        // Create a new ConcurrentHashMap
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // Add some key-value pairs
        map.put("One", 1);
        map.put("Two", 2);
        map.put("Three", 3);
        map.put("Four", 4);

        // Use forEach to iterate over the ConcurrentHashMap.
        // The BiConsumer takes a key (k) and value (v), and we're
        // just printing them here.
        map.forEach((k, v) -> System.out.println("Key: " + k + ", Value: " + v));
    }
}

Output:

Key: One, Value: 1
Key: Four, Value: 4
Key: Two, Value: 2
Key: Three, Value: 3

In the above example, forEach() is used to iterate over the entries of the map. For each entry, the key and value are printed. The forEach() method is often more convenient to use than an iterator, especially when you’re only performing a single operation (like print) for each entry in the map.

What is ConcurrentHasMap and how do I use it in Java?

ConcurrentHashMap is a class in Java that implements the ConcurrentMap interface. It is part of the Java Collection Framework and extends the AbstractMap class.

ConcurrentHashMap is thread-safe, which means it is designed to support high concurrency levels by handling multiple threads concurrently without any inconsistencies. It allows multiple threads to perform retrieve (get) and update (insert & delete) operations. Internally, ConcurrentHashMap uses concepts of Segmentation to store data which allows higher degree of concurrency.

Here is an example of how to use ConcurrentHashMap in Java:

package org.kodejava.util;

import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapExample {
    public static void main(String[] args) {
        // Create a ConcurrentHashMap instance
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // Add elements
        map.put("One", 1);
        map.put("Two", 2);
        map.put("Three", 3);

        // Retrieve elements
        Integer one = map.get("One");
        System.out.println("Retrieved value for 'One': " + one);

        // Remove an element
        map.remove("Two");

        // Print all elements
        map.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}

Output:

Retrieved value for 'One': 1
One = 1
Three = 3

In this example, we’re creating a ConcurrentHashMap, adding some elements to it, retrieving an element, removing an element, and finally printing all the elements.

One thing to note is that while ConcurrentHashMap allows multiple threads to read and write concurrently, a get() operation might not reflect the latest put() operation, since it might be looking at a previous segment. Further thread synchronization mechanisms might be necessary depending on your exact use case.

Also, worth mentioning, null values and null keys are not permitted in ConcurrentHashMap to prevent ambiguities and potential errors in multithreaded contexts. If you try to use null, ConcurrentHashMap will throw a NullPointerException.

Here’s an example demonstrating the usage of ConcurrentHashMap in a multithreaded context:

package org.kodejava.util;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentHashMapThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // Create a ThreadPool with 5 threads
        try (ExecutorService executor = Executors.newFixedThreadPool(5)) {

            // Runnable task to increment a value in the map
            Runnable task = () -> {
                for (int i = 0; i < 10; i++) {
                    map.compute("TestKey", (key, value) -> {
                        if (value == null) {
                            return 1;
                        } else {
                            return value + 1;
                        }
                    });
                }
            };

            // Submit the task to each thread in the pool
            for (int i = 0; i < 5; i++) {
                executor.submit(task);
            }

            // Shut down the executor and wait for tasks to complete
            executor.shutdown();
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        }

        System.out.println("Final value for 'TestKey': " + map.get("TestKey"));
    }
}

Output:

Final value for 'TestKey': 50

In this example, we’re creating a ConcurrentHashMap and a thread pool with ExecutorService. We’re then defining a Runnable task, which increments the value of the “TestKey” key in the map 10 times.

The task uses ConcurrentHashMap‘s compute() method, which is atomic, meaning that the retrieval and update of the value is done as a single operation that cannot be interleaved with other operations. We then submit the task to each of the five threads in our thread pool. After all threads have completed their tasks, we retrieve and print the final value of “TestKey”.

If everything works correctly, the output should be “Final value for ‘TestKey’: 50”, because we have 5 threads each incrementing the value 10 times. This demonstrates the thread-safety of ConcurrentHashMap, as the compute() operation is done atomically and many threads were able to modify the map simultaneously without causing inconsistencies. If we were using a plain HashMap instead, we could not guarantee this would be the case.

How do I use PriorityBlockingQueue class?

This example demonstrate how to use the PriorityBlockingQueue class. The PriorityBlockingQueue is one implementation of the BlockingQueue interface. It is an unbounded concurrent queue. The object place in this type of queue must implement the java.lang.Comparable interface. The Comparable interface defines how the order priority of the elements inside this queue.

For simplicity, in this example we use strings object as the elements to be placed in the queue. The String class implements the comparable interface. Running this example will print out the names in the string array in alphabetical orders.

package org.kodejava.util.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueExample {
    public static void main(String[] args) {
        final String[] names =
                {"carol", "alice", "malory", "bob", "alex", "jacobs"};

        final BlockingQueue<String> queue = new PriorityBlockingQueue<>();

        new Thread(() -> {
            for (String name : names) {
                try {
                    queue.put(name);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Producer").start();

        new Thread(() -> {
            try {
                for (int i = 0; i < names.length; i++) {
                    System.out.println(queue.take());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Consumer").start();
    }
}

This code print out:

alex
alice
bob
carol
jacobs
malory

How do I use LinkedBlockingQueue class?

In the following code snippet you will learn how to use the LinkedBlockingQueue class. This class implements the BlockingQueue interface. We can create a bounded queue by specifying the queue capacity at the object construction. If we do not define the capacity, the upper bound is limited to the size of Integer.MAX_VALUE.

The data in the queue represented as a linked node in a FIFO (First-In-First-Out) order. The head element of the queue is the longest element placed in the queue and the tail element is the latest element added to the queue.

Let’s see the code in action below:

package org.kodejava.util.concurrent;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueExample {
    public static void main(String[] args) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);

        // Producer Tread
        new Thread(() -> {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    System.out.printf("[%s] PUT [%s].%n",
                            Thread.currentThread().getName(), data);
                    queue.put(data);
                    Thread.sleep(250);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Producer").start();

        // Consumer-1 Thread
        new Thread(() -> {
            while (true) {
                try {
                    String data = queue.take();
                    System.out.printf("[%s] GET [%s].%n",
                            Thread.currentThread().getName(), data);
                    Thread.sleep(550);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Consumer-1").start();

        // Consumer-2 Thread
        new Thread(() -> {
            while (true) {
                try {
                    String data = queue.take();
                    System.out.printf("[%s] GET [%s].%n",
                            Thread.currentThread().getName(), data);
                    Thread.sleep(750);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Consumer-2").start();
    }
}

The output of the code snippet above:

[Producer] PUT [34418601-19cf-41fc-aecc-a0fa7caa72bb].
[Consumer-1] GET [34418601-19cf-41fc-aecc-a0fa7caa72bb].
[Producer] PUT [f2050eaa-c575-4faf-89e8-1ad0e3fbce0e].
[Consumer-2] GET [f2050eaa-c575-4faf-89e8-1ad0e3fbce0e].
[Producer] PUT [2f24985b-009b-44c3-9cec-7a066aa08ecf].
[Consumer-1] GET [2f24985b-009b-44c3-9cec-7a066aa08ecf].
[Producer] PUT [b4332823-c1a2-4587-bf5f-1f67407d6945].
[Consumer-2] GET [b4332823-c1a2-4587-bf5f-1f67407d6945].
[Producer] PUT [33edcd78-1b14-4913-afe2-7f44d76db50b].
[Consumer-1] GET [33edcd78-1b14-4913-afe2-7f44d76db50b].

How to implement queue using the DelayQueue?

The java.util.concurrent.DelayQueue class in an implementation of the BlockingQueue interface. Elements added to the queue must implement the java.util.concurrent.Delayed interface.

The queue is unbound in size, enabling adds to return immediately, we can only take an element from the queue when the delay time has expired. If multiple elements have expired delays, the element with the longest delay expiration will be taken first.

package org.kodejava.util.concurrent;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;

public class DelayQueueExample {
    public static void main(String[] args) {
        // Creates an instance of blocking queue using the DelayQueue.
        final BlockingQueue<DelayObject> queue = new DelayQueue<>();
        final Random random = new Random();

        new Thread(() -> {
            while (true) {
                try {
                    // Put some Delayed object into the Queue.
                    int delay = random.nextInt(10000);
                    DelayObject object = new DelayObject(
                            UUID.randomUUID().toString(), delay);

                    System.out.printf("Put object = %s%n", object);
                    queue.put(object);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Producer Thread").start();

        new Thread(() -> {
            while (true) {
                try {
                    // Take elements out from the DelayQueue object.
                    DelayObject object = queue.take();
                    System.out.printf("[%s] - Take object = %s%n",
                            Thread.currentThread().getName(), object);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Consumer Thread-1").start();

        new Thread(() -> {
            while (true) {
                try {
                    // Take elements out from the DelayQueue object.
                    DelayObject object = queue.take();
                    System.out.printf("[%s] - Take object = %s%n",
                            Thread.currentThread().getName(), object);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Consumer Thread-2").start();
    }
}

Below is an implementation of the Delayed interface. In the implementation class we have to implement the getDelay(TimeUnit) and the compareTo(Object) methods.

package org.kodejava.util.concurrent;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayObject implements Delayed {
    private final String data;
    private final long startTime;

    public DelayObject(String data, long delay) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delay;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.startTime, ((DelayObject) o).startTime);
    }

    @Override
    public String toString() {
        return "{" +
                "data='" + data + '\'' +
                ", startTime=" + startTime +
                '}';
    }
}

Running this example give you some kind of the following output:

Put object = {data='ff217d8f-30c7-4ff6-b107-f5e6b13b5cf1', startTime=1635830327667}
Put object = {data='ad9b7f64-04f6-45ba-8b4a-6d9d25785303', startTime=1635830326742}
Put object = {data='7e439219-486a-473f-b093-2ada26b682f4', startTime=1635830328540}
Put object = {data='1fd6f3d2-60b1-4986-a8d0-ceb0863b30a7', startTime=1635830328498}
[Consumer Thread-1] - Take object = {data='ad9b7f64-04f6-45ba-8b4a-6d9d25785303', startTime=1635830326742}
Put object = {data='9d8811a2-5f2a-487b-a229-11ad8ed54515', startTime=1635830330952}
Put object = {data='baf72a83-0242-42cb-93db-9ef08da5f78d', startTime=1635830336088}
[Consumer Thread-2] - Take object = {data='ff217d8f-30c7-4ff6-b107-f5e6b13b5cf1', startTime=1635830327667}
Put object = {data='9d4d14c5-0777-4675-aa67-a49a0519d9b2', startTime=1635830328952}
Put object = {data='c4979d10-4a11-44a8-b79c-ff31309e968b', startTime=1635830331430}