How do I safely share data between threads with ConcurrentHashMap?

When working with a multithreaded application, ConcurrentHashMap is a great choice for safely sharing data between threads. It is a thread-safe version of a HashMap that provides high concurrency for both retrieval and updates. Here are some guidelines to safely use a ConcurrentHashMap in a multithreaded environment:


1. Use Thread-Safe Access Operations

ConcurrentHashMap ensures that individual operations such as put(), get(), remove(), and containsKey() are thread-safe. Unlike HashMap, you can safely call these methods concurrently from multiple threads without additional synchronization.

package org.kodejava.util.concurrent;

import java.util.concurrent.ConcurrentHashMap;

public class ExampleConcurrentHashMap {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("key1", 1);

        // Reading and updating the map from multiple threads
        Runnable task = () -> {
            System.out.println(Thread.currentThread().getName());
            Integer value = map.get("key1");
            if (value != null) {
                map.put("key1", value + 1);
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);

        t1.start();
        t2.start();
    }
}

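Each individual put() and get() call in this code is thread-safe, so the map itself is never corrupted. Note, however, that the get-then-put sequence inside the task is not atomic: both threads can read the same value, and one of the two increments can be lost. The next section explains how to avoid this.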


2. Avoid Compound Operations

While individual operations like put() and get() are thread-safe, compound operations (operations that consist of multiple actions, e.g., check-then-act) are not atomic by default. For example, the following code might fail in a multithreaded scenario:

if (!map.containsKey("key")) {  // Thread 1 might pass this check
    map.put("key", 42);         // Thread 2 might also pass this check before Thread 1 puts the value
}

To perform compound operations atomically, use methods provided by ConcurrentHashMap, such as putIfAbsent(), compute(), or merge().

Example: Use putIfAbsent

map.putIfAbsent("key", 42); // Ensures that "key" is inserted only if it isn't already present

Example: Use compute

map.compute("key", (k, v) -> (v == null) ? 1 : v + 1);
// Safely updates the value of "key" atomically

Example: Use merge

map.merge("key", 1, Integer::sum);
// Combines a new value with the existing value of "key" in a thread-safe manner
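
To make the difference concrete, here is a minimal sketch of a shared counter that several threads increment through merge(). The class name MergeCounterDemo, the method countWithMerge, and the key "hits" are made up for this illustration. Because merge() performs the read and the write as one atomic step, no increment should be lost.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; the class, method, and key names are illustrative.
class MergeCounterDemo {

    // Starts the given number of threads; each one increments the same key
    // incrementsPerThread times using the atomic merge() method.
    static int countWithMerge(int threads, int incrementsPerThread) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    map.merge("hits", 1, Integer::sum);
                }
            });
            workers[i].start();
        }
        // Wait for every worker before reading the final count.
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return map.getOrDefault("hits", 0);
    }

    public static void main(String[] args) {
        System.out.println(countWithMerge(4, 10_000)); // prints 40000
    }
}
```

Replacing the merge() call with a separate get() followed by put() would make the total unpredictable, which is exactly the check-then-act problem described above.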

3. Leverage Concurrent Iteration

ConcurrentHashMap allows thread-safe iteration over its entries using iterators. However, note that these iterators are weakly consistent: they reflect the state of the map at some point at or since the creation of the iterator. Changes made to the map by other threads after the iterator is created will not cause a ConcurrentModificationException, but they may or may not be seen during iteration.

Safe Iteration Example

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key1", 1);
map.put("key2", 2);

map.forEach((key, value) -> {
    System.out.println(key + ": " + value);
});

Iterating and updating simultaneously can still be done safely through operations like compute() or computeIfPresent() within the iteration.
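
As a small sketch of this behavior, the following hypothetical class removes every entry while iterating over the key set. The class and method names are invented for the example. With a plain HashMap this pattern would typically fail with ConcurrentModificationException; with ConcurrentHashMap the loop completes normally.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; names are illustrative only.
class WeaklyConsistentIterationDemo {

    // Removes each entry from the map while iterating over its key set and
    // returns the number of entries left afterwards.
    static int removeWhileIterating() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        for (int i = 1; i <= 5; i++) {
            map.put("key" + i, i);
        }
        // The iterator tolerates modification of the map during traversal.
        for (String key : map.keySet()) {
            map.remove(key);
        }
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println("Entries left: " + removeWhileIterating());
    }
}
```

Here only the key that was just returned is removed, so every key is still visited once. When other threads add or remove entries during the loop, the iteration may or may not see those changes.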


4. Understand Default Concurrency Level

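In Java 7 and earlier, ConcurrentHashMap partitioned the map into segments internally to reduce contention among threads, and the concurrency level set the number of segments. Since Java 8 the implementation locks individual hash bins instead, and the concurrencyLevel constructor argument is kept only as a sizing hint. The default constructor is sufficient for most use cases.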

Custom Concurrency Level Example:

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(16, 0.75f, 32);
// 32 is the concurrency level: the estimated number of concurrently updating threads (only a sizing hint in Java 8 and later)

5. Use Bulk Operations for Performance

ConcurrentHashMap includes bulk operations like forEach(), reduce(), and search(). These operations are implemented to efficiently work with large volumes of data in a concurrent environment.

Example: Use forEach

map.forEach(1, (key, value) -> {
    System.out.println(key + ": " + value);
});
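// The first parameter is parallelismThreshold: the estimated number of elements needed for the
// operation to run in parallel. A value of 1 asks for maximum parallelism; Long.MAX_VALUE keeps it sequential.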

Example: Use reduce

Integer sum = map.reduceValues(1, Integer::sum);
System.out.println("Sum of all values: " + sum);
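
The search() operation is mentioned above but not shown. The sketch below assumes a hypothetical stock map and a helper named findKeyAbove. The search function returns a non-null result for a match and null to continue searching; search() returns the first non-null result it encounters, or null if there is none.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; the map contents and names are illustrative.
class BulkSearchDemo {

    // Returns a key whose value exceeds the limit, or null if there is none.
    static String findKeyAbove(ConcurrentHashMap<String, Integer> map, int limit) {
        // Long.MAX_VALUE as the threshold keeps the search sequential.
        return map.search(Long.MAX_VALUE,
                (key, value) -> value > limit ? key : null);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> stock = new ConcurrentHashMap<>();
        stock.put("apples", 5);
        stock.put("pears", 120);
        stock.put("plums", 40);
        System.out.println(findKeyAbove(stock, 100)); // prints pears
        System.out.println(findKeyAbove(stock, 500)); // prints null
    }
}
```

If several entries match, any one of them may be returned, so search() suits "find any" questions rather than "find the first in some order".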

6. Avoid Manual Synchronization

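Avoid wrapping ConcurrentHashMap calls in explicit locks such as synchronized or ReentrantLock. An external lock does not prevent other threads from accessing the map directly, so it protects nothing unless every access goes through the same lock, and it adds contention and a risk of deadlock. Instead, rely on the built-in atomic methods provided by the class.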
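
One common reason people reach for a lock is one-time initialization of a value per key. The following sketch, with an invented ComputeIfAbsentDemo class and a made-up "config" key, uses computeIfAbsent() for that instead. It counts how often the loader function runs when several threads ask for the same key at once.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the cache, key, and loader are illustrative.
class ComputeIfAbsentDemo {

    // Lets several threads request the same key and returns how many times
    // the loader function actually ran.
    static int loadConcurrently(int threads) {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        AtomicInteger loaderCalls = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() ->
                    cache.computeIfAbsent("config", key -> {
                        loaderCalls.incrementAndGet(); // count each invocation
                        return "value-for-" + key;
                    }));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return loaderCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("Loader ran " + loadConcurrently(8) + " time(s)"); // prints Loader ran 1 time(s)
    }
}
```

The function passed to computeIfAbsent() should be short and must not modify other mappings of the same map, because other updates to that bin wait while it runs.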


7. Be Aware of Null Restrictions

Unlike HashMap, ConcurrentHashMap does not support null keys or null values. If you try to use null, it will throw a NullPointerException. Use valid non-null keys and values at all times.
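
The short sketch below shows the restriction in action and one way to work around the need for "no value" entries. The class NullHandlingDemo, the nickname example, and the "(none)" fallback are assumptions made for illustration only.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; names and the fallback string are illustrative.
class NullHandlingDemo {

    // Returns true if the map rejects a null value with NullPointerException.
    static boolean rejectsNullValue() {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        try {
            map.put("nickname", null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Instead of storing null, leave the key out and supply a fallback on read.
    static String nicknameOf(ConcurrentHashMap<String, String> map, String user) {
        return map.getOrDefault(user, "(none)");
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> nicknames = new ConcurrentHashMap<>();
        nicknames.put("alice", "ally");
        System.out.println(rejectsNullValue());              // prints true
        System.out.println(nicknameOf(nicknames, "alice"));  // prints ally
        System.out.println(nicknameOf(nicknames, "bob"));    // prints (none)
    }
}
```

Because null is never a stored value, a null result from get() always means that the key is absent, which removes an ambiguity that would be hard to resolve safely when other threads are changing the map.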


Conclusion

ConcurrentHashMap is a powerful and flexible tool for managing shared data across multiple threads. To use it safely and efficiently:

  1. Use atomic methods like putIfAbsent, compute, or merge for compound operations.
  2. Avoid manual synchronization.
  3. Leverage bulk operations for large datasets.
  4. Handle data consistently without assuming atomicity for compound actions unless explicitly supported by the API.

By following these guidelines, you can minimize race conditions and improve the safety and performance of your multithreaded application.

How do I schedule tasks using ScheduledExecutorService?

The ScheduledExecutorService is a Java concurrency utility used for scheduling tasks to run after a delay or to execute periodically. Introduced in Java 5 as part of the java.util.concurrent package, it provides flexible scheduling functionality.
Here’s how you can use it:

1. Getting an Instance of ScheduledExecutorService

You can obtain an instance using the Executors factory class:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Single-threaded scheduled executor
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);  

2. Methods to Schedule Tasks

A) Schedule a Task with a Delay

To schedule a task to execute once after a specified delay:

import java.util.concurrent.TimeUnit;

scheduler.schedule(() -> {
    System.out.println("Task executed after delay");
}, 5, TimeUnit.SECONDS);

In this example:

  • A task will run after a delay of 5 seconds.

B) Schedule a Task at Fixed Rate

To schedule a task to run repeatedly at a fixed rate, starting after an initial delay:

scheduler.scheduleAtFixedRate(() -> {
    System.out.println("Task executed at fixed rate");
}, 2, 3, TimeUnit.SECONDS);

In this example:

  • The task will first execute 2 seconds after scheduling.
  • Subsequent executions are scheduled every 3 seconds, measured from the first execution. If a run takes longer than 3 seconds, the next run starts late; runs of the same task never overlap.

C) Schedule a Task with Fixed Delay

To schedule a task to run repeatedly with a fixed delay between the completion of one execution and the start of the next:

scheduler.scheduleWithFixedDelay(() -> {
    System.out.println("Task executed with fixed delay");
}, 2, 3, TimeUnit.SECONDS);

In this example:

  • The task will first execute 2 seconds after scheduling.
  • Subsequent executions will occur 3 seconds after the previous task finishes.
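
All three scheduling methods return a ScheduledFuture, which can be used to cancel a task without shutting down the whole scheduler. The sketch below is one possible way to stop a periodic task after it has run a few times. The class CancelPeriodicTaskDemo, the 10 millisecond period, and the 5 second timeout are arbitrary choices for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and timings are illustrative.
class CancelPeriodicTaskDemo {

    // Runs a periodic task until it has executed at least `runs` times, then
    // cancels it. Returns true if the runs happened and the task is cancelled.
    static boolean runThenCancel(int runs) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch done = new CountDownLatch(runs);
        try {
            // Each execution counts the latch down by one.
            ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(
                    done::countDown, 0, 10, TimeUnit.MILLISECONDS);
            boolean reached = done.await(5, TimeUnit.SECONDS);
            handle.cancel(false); // false: do not interrupt a run in progress
            return reached && handle.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Cancelled after 3 runs: " + runThenCancel(3));
    }
}
```

Cancelling through the returned handle stops only that task, so other tasks on the same scheduler keep running until the scheduler itself is shut down.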

3. Shutting Down the Scheduler

It’s important to properly shut down the scheduler to release resources when it is no longer needed:

scheduler.shutdown();

To wait for previously submitted tasks to finish after calling shutdown():

try {
    if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
        scheduler.shutdownNow();  // Cancel remaining tasks if they don't complete within 60 seconds
    }
} catch (InterruptedException e) {
    scheduler.shutdownNow();
    Thread.currentThread().interrupt();  // Preserve the interrupt status
}

4. Example: Complete Program

Here’s a complete program demonstrating all of the above:

package org.kodejava.util.concurrent;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorExample {
    public static void main(String[] args) {
        // Create a ScheduledExecutorService with a single thread
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        // Schedule a task to run after a delay
        scheduler.schedule(() -> System.out.println("Task1 executed after 5 seconds"), 5, TimeUnit.SECONDS);

        // Schedule a task to run periodically at a fixed rate
        scheduler.scheduleAtFixedRate(() -> System.out.println("Task2 executed at fixed rate"), 2, 3, TimeUnit.SECONDS);

        // Schedule a task to run periodically with a fixed delay
        scheduler.scheduleWithFixedDelay(() -> System.out.println("Task3 executed with fixed delay"), 2, 5, TimeUnit.SECONDS);

        // Shut down the scheduler after some time for demonstration
        scheduler.schedule(() -> {
            System.out.println("Shutting down scheduler...");
            scheduler.shutdown();
        }, 20, TimeUnit.SECONDS);
    }
}

Key Points to Remember

  • Use scheduleAtFixedRate for periodic tasks that should start at a consistent interval; a run that exceeds the period delays the following runs instead of overlapping them.
  • Use scheduleWithFixedDelay when the delay between task executions must consider the runtime of the previous task.
  • Always shut down the ScheduledExecutorService to release resources.

How do I create a thread pool with Executors in Java?

In Java, the java.util.concurrent.Executors class provides factory methods for creating and managing thread pools easily. Below are common ways to create a thread pool using Executors:

1. Fixed Thread Pool

A fixed thread pool contains a fixed number of threads. This is useful when you want to limit the number of concurrently running threads: tasks submitted while all threads are busy wait in a queue until a thread becomes free.

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // Create a fixed thread pool with 3 threads
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            fixedThreadPool.execute(() -> {
                System.out.println("Task " + taskId + " is running in thread " + Thread.currentThread().getName());
            });
        }

        // Shutdown the pool after task submission
        fixedThreadPool.shutdown();
    }
}

2. Cached Thread Pool

A cached thread pool creates new threads as needed and reuses previously constructed threads (if available). This is suitable for executing many short-lived asynchronous tasks.

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        // Create a cached thread pool
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            cachedThreadPool.execute(() -> {
                System.out.println("Task " + taskId + " is running in thread " + Thread.currentThread().getName());
            });
        }

        // Shutdown the pool after task submission
        cachedThreadPool.shutdown();
    }
}

3. Single Thread Executor

A single-threaded executor ensures that tasks are executed sequentially, one at a time, in a single thread.

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        // Create a single-threaded executor
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            singleThreadExecutor.execute(() -> {
                System.out.println("Task " + taskId + " is running in thread " + Thread.currentThread().getName());
            });
        }

        // Shutdown the pool after task submission
        singleThreadExecutor.shutdown();
    }
}

4. Scheduled Thread Pool

A scheduled thread pool is used to schedule tasks to run after a delay or periodically.

package org.kodejava.util.concurrent;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        // Create a scheduled thread pool with 2 threads
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);

        // Schedule a task to run after a 3-second delay
        scheduledThreadPool.schedule(() -> {
            System.out.println("Task is running after a delay in thread " + Thread.currentThread().getName());
        }, 3, TimeUnit.SECONDS);

        // Schedule a repeating task to run every 2 seconds
        scheduledThreadPool.scheduleAtFixedRate(() -> {
            System.out.println("Repeating task is running in thread " + Thread.currentThread().getName());
        }, 1, 2, TimeUnit.SECONDS);

        // Optionally, shutdown the pool after some time (e.g., 10 seconds)
        scheduledThreadPool.schedule(() -> scheduledThreadPool.shutdown(), 10, TimeUnit.SECONDS);
    }
}

5. Custom Thread Pool

For more advanced needs, you can use ThreadPoolExecutor directly to fine-tune the behavior of the thread pool.

package org.kodejava.util.concurrent;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // Create a custom thread pool with 2 core threads, 4 maximum threads, and a 10-task queue
        ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
                2, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            customThreadPool.execute(() -> {
                System.out.println("Task " + taskId + " is running in thread " + Thread.currentThread().getName());
            });
        }

        // Shutdown the pool after task submission
        customThreadPool.shutdown();
    }
}
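
When tuning a ThreadPoolExecutor it helps to know the order in which it uses its resources: new tasks first start core threads, then wait in the queue, then start extra threads up to the maximum, and are rejected only when the queue is full and the maximum is reached. The following sketch uses a deliberately tiny pool (one thread, queue capacity one) to make the rejection visible. The class RejectionDemo and its sizes are assumptions chosen for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; pool sizes are chosen to force rejections.
class RejectionDemo {

    // Submits `tasks` blocking tasks to a pool with one thread and a queue
    // of capacity one, and returns how many submissions were rejected.
    static int countRejected(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // default policy (AbortPolicy) throws
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("Rejected: " + countRejected(4)); // prints Rejected: 2
    }
}
```

The first task occupies the only thread, the second waits in the queue, and the remaining two are rejected. A different RejectedExecutionHandler, such as ThreadPoolExecutor.CallerRunsPolicy, can be passed to the constructor when throwing is not the desired behavior.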

Key Points:

  • shutdown(): Prevents new tasks from being submitted to the thread pool and initiates an orderly shutdown.
  • shutdownNow(): Attempts to stop all actively executing tasks and halts the processing of waiting tasks.
  • newFixedThreadPool(): Creates a pool of a fixed number of threads.
  • newCachedThreadPool(): Creates a pool with potentially unlimited threads.
  • newSingleThreadExecutor(): Creates a single-threaded pool.
  • newScheduledThreadPool(): Creates a pool for scheduling tasks.

By using thread pools, you can effectively manage system resources and control the level of concurrency in your applications.

How do I use ExecutorService to run tasks in Java?

In Java, the ExecutorService interface is part of the java.util.concurrent package and provides a higher-level replacement for managing threads and tasks. It simplifies the execution of tasks in a multithreaded environment by abstracting thread creation and management.

Here’s how you can use ExecutorService to run tasks in Java:


1. Creating an ExecutorService

You can create an instance of ExecutorService using the factory methods provided by the Executors class. Some common options are:

  • Single-threaded pool:
    ExecutorService executor = Executors.newSingleThreadExecutor();
    
  • Fixed-size thread pool:
    ExecutorService executor = Executors.newFixedThreadPool(4); // 4 threads in the pool
    
  • Cached thread pool (dynamic sizing):
    ExecutorService executor = Executors.newCachedThreadPool();
    
  • Scheduled thread pool (for tasks that need scheduling or delayed execution):
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    

2. Submitting Tasks

You can submit tasks (runnable or callable) to the ExecutorService for execution:

  • Using Runnable:
    The Runnable interface doesn’t return a result or throw checked exceptions.

    executor.submit(() -> {
      System.out.println("Running a task in thread: " + Thread.currentThread().getName());
    });
    
  • Using Callable:
    The Callable interface allows the task to return a result and throw exceptions.

    Future<Integer> future = executor.submit(() -> {
      System.out.println("Calculating result in " + Thread.currentThread().getName());
      return 42; // returning a result
    });
    
    // Retrieve the result
    try {
      Integer result = future.get();
      System.out.println("Result: " + result);
    } catch (Exception e) {
      e.printStackTrace();
    }
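
When there are several Callable tasks, invokeAll() submits them together and returns one Future per task, in the same order as the input. The sketch below is a minimal illustration with a made-up InvokeAllDemo class that squares a list of numbers on a small pool and adds up the results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names and pool size are illustrative.
class InvokeAllDemo {

    // Squares each input on a small pool and returns the sum of the squares.
    static int sumOfSquares(List<Integer> inputs) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : inputs) {
                tasks.add(() -> n * n);
            }
            int sum = 0;
            // invokeAll blocks until every task has completed.
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3, 4))); // prints 30
    }
}
```

An exception thrown inside a task does not surface until get() is called, where it arrives wrapped in an ExecutionException.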
    

3. Shutting Down the ExecutorService

You need to shut down the ExecutorService once you’ve completed submitting tasks:

  • Graceful shutdown:
    This stops accepting new tasks and allows previously submitted tasks, including those still waiting in the queue, to complete.

    executor.shutdown();
    try {
      if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
          executor.shutdownNow(); // Force shutdown if timeout happens
      }
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt(); // Preserve the interrupt status
    }
    
  • Forceful shutdown:
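    This attempts to stop running tasks, typically by interrupting them, and returns the tasks that were still waiting to start. Tasks that ignore interruption may keep running.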

    executor.shutdownNow();
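
The list returned by shutdownNow() is easy to overlook. The sketch below, built around an invented ShutdownNowDemo class, keeps a single worker busy, queues a few more tasks behind it, and then reports how many of them never started.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; names and timings are illustrative.
class ShutdownNowDemo {

    // Starts one long-running task, queues `queued` more behind it, then
    // calls shutdownNow(). Returns how many tasks never started.
    static int countNeverStarted(int queued) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        executor.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // simulate long work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        });
        for (int i = 0; i < queued; i++) {
            executor.execute(() -> System.out.println("never printed"));
        }
        try {
            started.await(); // make sure the first task is running
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Interrupts the running task and hands back the queued ones.
        List<Runnable> pending = executor.shutdownNow();
        return pending.size();
    }

    public static void main(String[] args) {
        System.out.println("Never started: " + countNeverStarted(2)); // prints Never started: 2
    }
}
```

The running task ends promptly only because it reacts to interruption. A task that swallows InterruptedException and carries on would keep the thread alive after shutdownNow().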
    

4. Example: Submitting Multiple Tasks

package org.kodejava.util.concurrent;

import java.util.concurrent.*;

public class ExecutorServiceExample {
    public static void main(String[] args) {
        // Create a fixed thread pool with 3 threads
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // Submit Runnable tasks
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // Simulate work
                } catch (InterruptedException e) {
                    System.err.println("Task " + taskId + " was interrupted!");
                    Thread.currentThread().interrupt(); // Preserve the interrupt status
                }
            });
        }

        // Shutdown the executor gracefully
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // Force shutdown if tasks exceed timeout
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // Preserve the interrupt status
        }

        System.out.println("All tasks finished.");
    }
}

5. Choosing Between Runnable and Callable

  • Use Runnable when your task does not need to return a result.
  • Use Callable when your task needs to return a result or throw checked exceptions.

Advanced Features

If you need to manage periodic tasks or delayed execution, use ScheduledExecutorService:

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

// Schedule a task to run after a delay
scheduler.schedule(() -> System.out.println("Task executed after delay"), 3, TimeUnit.SECONDS);

// Schedule a task to run repeatedly at fixed intervals
scheduler.scheduleAtFixedRate(() -> System.out.println("Recurring task"), 1, 5, TimeUnit.SECONDS);

Summary

  1. Create an ExecutorService instance (e.g., fixed thread pool, cached thread pool).
  2. Submit tasks (Runnable or Callable) using submit().
  3. Shut down the executor service gracefully (shutdown() and awaitTermination()).
  4. Use Callable and Future for tasks that need to return results.

This abstraction helps manage your threads efficiently and avoids the complexities of low-level thread creation and management.

How do I use PriorityBlockingQueue class?

This example demonstrates how to use the PriorityBlockingQueue class. PriorityBlockingQueue is one implementation of the BlockingQueue interface. It is an unbounded concurrent queue. Objects placed in this type of queue must implement the java.lang.Comparable interface, unless a Comparator is passed to the queue's constructor. The Comparable implementation defines the priority order of the elements in the queue.

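For simplicity, this example uses String objects as the elements to be placed in the queue. The String class implements the Comparable interface, so the queue always hands out the alphabetically smallest name it currently holds. Because the producer and the consumer run concurrently, the output is fully alphabetical only when all names are queued before the consumer starts taking them, which is usually the case here because the producer is started first and has very little work to do.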

package org.kodejava.util.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueExample {
    public static void main(String[] args) {
        final String[] names =
                {"carol", "alice", "malory", "bob", "alex", "jacobs"};

        final BlockingQueue<String> queue = new PriorityBlockingQueue<>();

        new Thread(() -> {
            for (String name : names) {
                try {
                    queue.put(name);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Producer").start();

        new Thread(() -> {
            try {
                for (int i = 0; i < names.length; i++) {
                    System.out.println(queue.take());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Consumer").start();
    }
}

A typical run prints:

alex
alice
bob
carol
jacobs
malory
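
When the natural ordering is not the priority you want, a Comparator can be passed to the constructor instead. The sketch below reuses the names from the example and orders them shortest-first, breaking ties alphabetically. The class ComparatorQueueDemo and the ordering rule are assumptions made for illustration. It fills the queue before draining it in a single thread, so the order is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; the ordering rule is illustrative.
class ComparatorQueueDemo {

    // Queues the names, then drains them shortest-first; names of equal
    // length come out in alphabetical order.
    static List<String> drainByLength(List<String> names) {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(
                11, byLength.thenComparing(Comparator.naturalOrder()));
        queue.addAll(names);
        List<String> ordered = new ArrayList<>();
        String next;
        // poll() returns null once the queue is empty.
        while ((next = queue.poll()) != null) {
            ordered.add(next);
        }
        return ordered;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                "carol", "alice", "malory", "bob", "alex", "jacobs");
        System.out.println(drainByLength(names));
        // prints [bob, alex, alice, carol, jacobs, malory]
    }
}
```

The first constructor argument is only the initial capacity (11 is the default used by the no-argument constructor); the queue still grows as needed.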