How do I use CountDownLatch for coordination?

A CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. It’s initialized with a given count, and the await() methods block until the current count reaches zero due to invocations of the countDown() method.

Here is a practical guide on how to coordinate threads using CountDownLatch.

1. Basic Coordination Pattern

The most common use case is having a main thread wait for several worker threads to finish their tasks.

package org.kodejava.util.concurrent;

import java.util.concurrent.CountDownLatch;

public class LatchCoordination {
    public static void main(String[] args) throws InterruptedException {
        int numberOfWorkers = 3;
        // 1. Initialize with the number of events to wait for
        CountDownLatch latch = new CountDownLatch(numberOfWorkers);

        for (int i = 0; i < numberOfWorkers; i++) {
            new Thread(new Worker(latch, "Worker-" + i)).start();
        }

        System.out.println("Main thread is waiting...");
        // 2. Wait until the count reaches zero
        latch.await();

        System.out.println("All workers finished. Main thread proceeding.");
    }
}

class Worker implements Runnable {
    private final CountDownLatch latch;
    private final String name;

    Worker(CountDownLatch latch, String name) {
        this.latch = latch;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            System.out.println(name + " is performing work...");
            Thread.sleep(1000); // Simulate work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // 3. Crucial: Always decrement the count in a finally block
            latch.countDown();
            System.out.println(name + " finished. Remaining: " + latch.getCount());
        }
    }
}

2. Key Methods to Remember

  • new CountDownLatch(int count): Sets the initial count. This count cannot be reset. If you need to reset the count, consider using a CyclicBarrier.
  • countDown(): Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
  • await(): Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted.
  • await(long timeout, TimeUnit unit): A bounded version that waits at most the given duration. It returns true if the count reached zero and false if the waiting time elapsed before the count reached zero.
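The timed await() is easiest to see with workers that may or may not finish in time. The following is a minimal sketch, not a definitive implementation: the class name TimedAwaitDemo, the method runWorkers, and all durations are hypothetical choices made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedAwaitDemo {

    // Starts `workers` threads that each sleep for workMillis, then waits at most
    // timeoutMillis for all of them. Returns true only if every worker counted
    // down before the timeout elapsed.
    static boolean runWorkers(int workers, long workMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(workMillis); // Simulate work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
            t.setDaemon(true); // Slow workers should not keep the JVM alive
            t.start();
        }
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Fast workers finished in time: " + runWorkers(3, 50, 2000));
        System.out.println("Slow workers finished in time: " + runWorkers(3, 2000, 100));
    }
}
```

Checking the boolean result lets the caller decide what to do on a timeout (log, retry, or give up) instead of blocking forever.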

3. Advanced Use: The “Starting Gun”

You can also use a CountDownLatch with a count of 1 as a “starting gun” to release many waiting threads at approximately the same moment (useful for load testing).

CountDownLatch startSignal = new CountDownLatch(1);

for (int i = 0; i < 10; i++) {
    new Thread(() -> {
        try {
            startSignal.await(); // All threads stall here
            // Do concurrent work...
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag and exit
        }
    }).start();
}

// All threads waiting on the latch are released when this is called
startSignal.countDown();
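The starting gun is often paired with a second latch so the main thread also knows when every worker has finished. The sketch below assumes a hypothetical class StartGateDemo and adds a third "ready" latch, which is an illustrative extra rather than part of the pattern above, so that the gate opens only after every worker has started.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class StartGateDemo {

    // Releases `threads` workers together and returns how many of them ran.
    static int runTogether(int threads) {
        CountDownLatch ready = new CountDownLatch(threads); // workers -> main: "started"
        CountDownLatch start = new CountDownLatch(1);       // main -> workers: "go"
        CountDownLatch done = new CountDownLatch(threads);  // workers -> main: "finished"
        AtomicInteger ran = new AtomicInteger();

        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                ready.countDown();
                try {
                    start.await();         // Park until the gate opens
                    ran.incrementAndGet(); // Stand-in for the real concurrent work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        try {
            ready.await();     // Every worker has started and is about to wait
            start.countDown(); // Open the gate
            done.await();      // Wait for all workers to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("Workers that ran: " + runTogether(10));
    }
}
```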

Best Practices

  • Always use finally: Place countDown() inside a finally block to ensure the latch is decremented even if a worker thread encounters an exception. Otherwise, the waiting thread might block indefinitely.
  • Don’t reuse: Once a CountDownLatch reaches zero, it cannot be reused. If your algorithm requires repeating the synchronization point, use a CyclicBarrier or Phaser.
  • Check the count: You can use latch.getCount() for debugging or monitoring, but don’t use it to make control flow decisions in production code: other threads may change the count immediately after you read it, so the value can already be stale.

How do I use CyclicBarrier for synchronization?

In Java’s concurrency utilities, CyclicBarrier is a synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. It is called “cyclic” because it can be re-used after the waiting threads are released.

Here is a breakdown of how to use it effectively.

1. Basic Concepts

  • Parties: The number of threads that must invoke await() before the barrier is tripped.
  • Barrier Action: An optional Runnable that is executed once per barrier point, after the last thread arrives, but before any threads are released.
  • await(): The core method threads call to wait. It blocks until all parties have arrived.

2. Implementation Steps

Create the Barrier

You initialize it with the number of participating threads.

import java.util.concurrent.CyclicBarrier;

// ...
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
    // This runs once all threads reach the barrier
    System.out.println("Barrier reached! Processing combined results...");
});

Define the Worker

Each thread performs its task and then calls barrier.await().

package org.kodejava.util.concurrent;

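import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class Task implements Runnable {
    private final CyclicBarrier barrier;

    Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " is working...");
            // Simulate work
            Thread.sleep(1000);

            System.out.println(Thread.currentThread().getName() + " waiting at barrier.");
            barrier.await(); // Thread blocks here until all parties arrive

            System.out.println(Thread.currentThread().getName() + " passed the barrier!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag
        } catch (BrokenBarrierException e) {
            // Another party was interrupted or timed out while waiting
            System.out.println(Thread.currentThread().getName() + " found the barrier broken.");
        }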
    }
}

3. Key Differences from CountDownLatch

While both coordinate threads, they have distinct use cases:

  • Reusability: A CyclicBarrier resets automatically once all parties have been released and can be used again (hence “cyclic”); it can also be reset explicitly with reset(). A CountDownLatch is one-shot: its count cannot be reset.
  • Waiting Mechanism: In a CyclicBarrier, the workers wait for each other. In a CountDownLatch, a main thread typically waits for workers to finish (workers don’t necessarily block).
  • Barrier Action: CyclicBarrier supports a custom action when the barrier trips; CountDownLatch does not.

4. Handling Broken Barriers

If a thread leaves the barrier prematurely (due to interruption, timeout, or failure), the barrier is considered “broken.” Any other threads waiting at the barrier will receive a BrokenBarrierException. You can check this status using barrier.isBroken().
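One way to observe a broken barrier is to let a timed await() give up. The sketch below is illustrative only: BrokenBarrierDemo and timeOutAt are hypothetical names, and the barrier deliberately expects two parties while only one thread arrives.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BrokenBarrierDemo {

    // Waits at the barrier for at most timeoutMillis and then reports whether
    // the barrier is broken.
    static boolean timeOutAt(CyclicBarrier barrier, long timeoutMillis) {
        try {
            barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("Timed out waiting for the other parties.");
        } catch (BrokenBarrierException e) {
            System.out.println("Barrier was broken by another party.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return barrier.isBroken();
    }

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(2); // Two parties, but only main arrives
        System.out.println("Broken after timeout: " + timeOutAt(barrier, 100));

        barrier.reset(); // Return the barrier to its initial, usable state
        System.out.println("Broken after reset: " + barrier.isBroken());
    }
}
```

A broken barrier stays broken until reset() is called, so recovery code usually needs to coordinate who performs the reset.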

Example Use Case

CyclicBarrier is ideal for parallel algorithms that involve multiple phases (iterative methods), where each phase must be completed by all threads before anyone starts the next phase.
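A phased computation can be sketched as a loop in which every worker calls await() at the end of each phase. The example below assumes a hypothetical class PhasedWorkDemo; the per-phase work is left as a comment, and the barrier action only counts completed phases.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class PhasedWorkDemo {

    // Runs `workers` threads through `phases` phases and returns how many times
    // the barrier action fired (once per completed phase).
    static int runPhases(int workers, int phases) {
        AtomicInteger completedPhases = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(workers, completedPhases::incrementAndGet);

        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                try {
                    for (int phase = 0; phase < phases; phase++) {
                        // ... do this worker's share of the current phase here ...
                        barrier.await(); // The same barrier is reused for every phase
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another worker failed; abandon the remaining phases
                }
            });
            threads[w].start();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return completedPhases.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed phases: " + runPhases(3, 4));
    }
}
```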

How do I use Semaphore for resource control?

A Semaphore is a synchronization mechanism that controls access to a shared resource by multiple threads. It maintains a set of permits, restricting how many threads can concurrently access a critical section or shared resource. If the permits are exhausted, additional threads will block until permits are released.

How to Use Semaphore for Resource Control

Here are the key steps for using a Semaphore:


1. Initialization

  • Permits: When creating Semaphore, specify the number of permits. This determines the maximum number of threads that can access the resource simultaneously.
  • Fairness: Optionally, you can specify a fairness policy (true for FIFO access to permits, false for default behavior).

    Example:

    Semaphore semaphore = new Semaphore(2, true); // 2 permits, FIFO fairness
    

2. Acquiring Permits

Threads must acquire permits before accessing the shared resource. The acquire() method blocks the thread if no permits are available.

  • Interruptible Acquire: acquire() blocks until a permit becomes available or the thread is interrupted, in which case it throws InterruptedException.
    semaphore.acquire();
    
  • Immediate Acquire: tryAcquire() attempts to acquire and doesn’t block. Returns true if successful, false otherwise.
    if (semaphore.tryAcquire()) {
        // Acquired permit
    }
    
  • Timed Acquire: tryAcquire(timeout, TimeUnit) waits for a permit for a specified amount of time before giving up.
    if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) {
        // Acquired permit
    }
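The non-blocking variants pair naturally with a fallback path. Here is a minimal sketch, assuming a hypothetical helper useIfAvailable in a hypothetical class TryAcquireDemo; the status strings are made up for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TryAcquireDemo {

    // Tries to take a permit within waitMillis and returns a short status string.
    static String useIfAvailable(Semaphore semaphore, long waitMillis) {
        boolean acquired;
        try {
            acquired = semaphore.tryAcquire(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        if (!acquired) {
            return "busy"; // Caller can retry later, queue the work, or fail fast
        }
        try {
            return "done"; // Stand-in for work on the shared resource
        } finally {
            semaphore.release(); // Release only because the acquire succeeded
        }
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(1);
        System.out.println(useIfAvailable(semaphore, 100));

        semaphore.acquireUninterruptibly(); // Hold the only permit
        System.out.println(useIfAvailable(semaphore, 100));
        semaphore.release();
    }
}
```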
    

3. Using the Shared Resource

After acquiring a permit, the thread performs its task within the critical section or accesses the shared resource.

Example:

// Critical section
System.out.println(Thread.currentThread().getName() + " is using the resource");

4. Releasing Permits

After completing the task, the thread should release the permit it acquired. This allows other threads to proceed.

  • Use release() to give up the permit:
    semaphore.release();
    

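If a thread fails to release its permit due to an exception or oversight, the permit is lost and other threads may block indefinitely waiting for it. Release in a finally block, and only after acquire() has actually succeeded.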


Example of Semaphore in Practice

Here’s a practical example:

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreExample {

    // Semaphore initialized with 2 permits (only 2 threads can access simultaneously).
    private static final Semaphore semaphore = new Semaphore(2);

    public static void main(String[] args) {
        // Create a thread pool with 5 threads.
        // Note: try-with-resources on ExecutorService requires Java 19 or later.
        try (ExecutorService executorService = Executors.newFixedThreadPool(5)) {

            // Let each thread try to acquire a permit and access a shared resource
            for (int i = 1; i <= 5; i++) {
                final int threadId = i;
                executorService.submit(() -> {
                    System.out.println("Thread " + threadId + " is trying to acquire a permit.");
                    try {
                        semaphore.acquire();
                    } catch (InterruptedException e) {
                        // No permit was acquired, so there is nothing to release
                        Thread.currentThread().interrupt();
                        return;
                    }

                    try {
                        System.out.println("Thread " + threadId + " has acquired a permit.");
                        Thread.sleep(2000);  // Simulate using the shared resource
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        System.out.println("Thread " + threadId + " is releasing the permit.");
                        semaphore.release();
                    }
                });
            }
        }
    }
}

Key Concepts of Semaphores

  1. Permits:
    • Semaphore tracks the number of remaining permits.
    • Initial permits are specified at the time of creation.
  2. Blocking vs Non-blocking Acquire:
    • Threads may block (acquire()), attempt immediate access (tryAcquire()), or timeout (tryAcquire(timeout)).
  3. Fairness:
    • Semaphore fairness ensures FIFO granting of permits if fairness is enabled.
  4. Common Usage Scenarios:
    • Throttling: Limit the number of threads accessing resources like database connections or file IO simultaneously.
    • Rate Limiting: Cap how many tasks or API calls are in flight at once. Limiting calls per unit of time additionally requires returning permits on a schedule.
  5. Thread-Safe: The semaphore internally ensures thread-safety using synchronization primitives.
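The throttling scenario can be checked by recording the highest number of tasks that were ever inside the guarded section at once. This is a sketch under stated assumptions: ThrottleDemo and peakConcurrency are hypothetical names, and the 50 ms sleep stands in for real work on the resource.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottleDemo {

    // Runs `tasks` jobs on a pool of `tasks` threads while a semaphore admits at
    // most `permits` of them at a time. Returns the highest concurrency observed.
    static int peakConcurrency(int tasks, int permits) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);

        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                semaphore.acquireUninterruptibly();
                try {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // Remember the maximum seen
                    Thread.sleep(50); // Simulate using the throttled resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrency: " + peakConcurrency(6, 2));
    }
}
```

Even though the pool has one thread per task, the observed peak never exceeds the number of permits.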


By following these steps, you can use a Semaphore to control access to a shared resource, bounding how many threads use it at once (a semaphore with a single permit gives mutual exclusion) while keeping the resource well utilized.

How do I use ForkJoinPool for recursive tasks?

ForkJoinPool in Java is a part of the java.util.concurrent package and is designed to efficiently execute recursive tasks using a work-stealing algorithm. It works particularly well for problems that can be split into smaller subproblems and then combined to form the final result, adhering to the divide-and-conquer paradigm.

Here’s how you can use ForkJoinPool for recursive tasks:


1. Define Recursive Behavior with RecursiveTask/RecursiveAction

The main entities to use with ForkJoinPool are:

  • RecursiveTask<T>: Returns a result.
  • RecursiveAction: Performs an action without returning a result.

You define the recursive logic within these classes by overriding the compute() method.


2. Implement Recursive Splitting

  • A base case is defined where small tasks are computed directly.
  • For larger tasks, the work is split into subtasks, and fork() is invoked to execute them asynchronously. Results are aggregated using join().

3. Run Tasks in a ForkJoinPool

The tasks are submitted to a ForkJoinPool. This pool can manage multiple tasks simultaneously and perform work-stealing to optimize performance.


Example: Parallel Sum Using RecursiveTask

package org.kodejava.util.concurrent;

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

// RecursiveTask to calculate the sum of an array
class ParallelSumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10; // Splitting threshold
    private final int[] arr;
    private final int start, end;

    public ParallelSumTask(int[] arr, int start, int end) {
        this.arr = arr;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;

        // Base case: if below a threshold, compute directly
        if (length <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        }

        // Recursive splitting
        int mid = start + length / 2;
        ParallelSumTask leftTask = new ParallelSumTask(arr, start, mid);
        ParallelSumTask rightTask = new ParallelSumTask(arr, mid, end);

        leftTask.fork();          // Fork the left task
        long rightResult = rightTask.compute();  // Compute the right task
        long leftResult = leftTask.join();       // Wait for the left task

        return leftResult + rightResult;        // Combine results
    }
}

public class ForkJoinExample {
    public static void main(String[] args) {
        int[] numbers = new int[100];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1; // Fill the array with 1 to 100
        }

        ForkJoinPool pool = new ForkJoinPool(); // Create ForkJoinPool
        ParallelSumTask task = new ParallelSumTask(numbers, 0, numbers.length);

        long result = pool.invoke(task); // Start the task and get the result
        System.out.println("Sum: " + result);
    }
}

Key Methods in ForkJoinTask

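  • fork(): Arranges for the task to run asynchronously in the pool the current task is running in, or in the common pool when called from outside a ForkJoinPool.
  • join(): Waits for the task to finish and retrieves its result.
  • invoke(): Runs the task, waits for it to complete, and returns its result; the effect is similar to fork() followed by join().
  • compute(): Declared by RecursiveTask and RecursiveAction rather than by ForkJoinTask itself; defines the logic for splitting and computing tasks.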
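For work that returns no result, RecursiveAction follows the same shape as the RecursiveTask example. The sketch below assumes a hypothetical class DoubleInPlaceAction and an arbitrary threshold of 1,000. Instead of calling fork() and join() by hand, it uses the static ForkJoinTask.invokeAll(...) helper, which runs both subtasks and returns once they have completed.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleInPlaceAction extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // Assumed cutoff; tune by measurement
    private final long[] data;
    private final int start, end;

    DoubleInPlaceAction(long[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        // Base case: small slices are processed directly
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                data[i] *= 2; // Each task touches only its own slice
            }
            return;
        }
        int mid = start + (end - start) / 2;
        // Run both halves and wait until both have completed
        invokeAll(new DoubleInPlaceAction(data, start, mid),
                  new DoubleInPlaceAction(data, mid, end));
    }

    // Convenience entry point: doubles every element using the common pool
    static void doubleAll(long[] data) {
        ForkJoinPool.commonPool().invoke(new DoubleInPlaceAction(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        Arrays.fill(data, 21);
        doubleAll(data);
        System.out.println("First: " + data[0] + ", last: " + data[data.length - 1]);
    }
}
```

Because the two halves never overlap, the subtasks need no locking even though they write to the same array.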

Advantages of ForkJoinPool

  1. Work-Stealing Algorithm: Idle threads steal tasks from the queues of busy threads, which helps keep the workload balanced.
  2. Efficient for Recursive Tasks: Particularly suited for algorithms like QuickSort, MergeSort, and calculations like Fibonacci or array sums.
  3. Dynamic Thread Management: ForkJoinPool manages the number of threads for optimal utilization based on available cores.

When to Use

  • Large, recursive tasks with problems that are computationally expensive.
  • Divide-and-conquer problems where each subproblem is independent after splitting.

Things to Consider

  • Splitting Threshold: Choosing a suitable threshold is crucial for balancing computation and task overhead.
  • Thread Contention: Ensure your tasks do not rely on a shared mutable state to avoid contention between threads.

How do I schedule tasks using ScheduledExecutorService?

To schedule tasks using ScheduledExecutorService in Java, follow these steps:

1. Create a ScheduledExecutorService

  • Use Executors.newScheduledThreadPool(int corePoolSize) to get an instance of ScheduledExecutorService.
    • corePoolSize: Number of threads to keep in the pool.

2. Schedule Tasks

The ScheduledExecutorService provides three main ways to schedule tasks:

  • schedule: Schedule a task to run after a specific delay.
    scheduler.schedule(() -> {
             System.out.println("Task executed after a delay");
         }, delay, TimeUnit.SECONDS);
    
    • delay: Time to wait before executing the task.
  • scheduleAtFixedRate: Schedule tasks to start at a fixed rate.
    scheduler.scheduleAtFixedRate(() -> {
             System.out.println("Task executed at a fixed rate");
         }, initialDelay, period, TimeUnit.SECONDS);
    
    • initialDelay: The delay before the first execution.
    • period: The interval between successive executions.
  • scheduleWithFixedDelay: Schedule tasks with a fixed delay between the end of one execution and the start of the next.
    scheduler.scheduleWithFixedDelay(() -> {
             System.out.println("Task executed with a delay");
         }, initialDelay, delay, TimeUnit.SECONDS);
    
    • delay: Time to wait between the previous task’s completion and the start of the next.

3. Shut Down the Scheduler

  • Always shut down the ScheduledExecutorService once tasks are no longer needed.
    • shutdown() to initiate an orderly shutdown.
    • shutdownNow() to attempt to stop running tasks (typically by interrupting them) and discard tasks that have not started yet.
    scheduler.shutdown();
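The two shutdown methods are commonly combined: request an orderly shutdown, wait for a grace period, and fall back to shutdownNow() only if tasks are still running. A minimal sketch, assuming a hypothetical helper shutdownGracefully and arbitrary timeouts:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class GracefulShutdownDemo {

    // Returns true if the executor terminated within the grace period,
    // false if shutdownNow() had to be called.
    static boolean shutdownGracefully(ExecutorService executor, long graceMillis) {
        executor.shutdown(); // Stop accepting new tasks
        try {
            if (executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            List<Runnable> neverRan = executor.shutdownNow(); // Interrupt running tasks
            System.out.println("Discarded " + neverRan.size() + " queued task(s)");
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.schedule(() -> System.out.println("One-time task executed"),
                50, TimeUnit.MILLISECONDS);
        System.out.println("Clean shutdown: " + shutdownGracefully(scheduler, 2000));
    }
}
```

With the default policies of the executor returned by newScheduledThreadPool, a one-time task that is already scheduled still runs after shutdown(), while periodic tasks are cancelled.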
    

Points to Remember:

  1. Thread Efficiency: Reuse threads from the pool to handle multiple tasks efficiently.
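  2. Exception Handling: If a periodic task (scheduleAtFixedRate or scheduleWithFixedDelay) throws an exception, all of its subsequent executions are suppressed, and the exception is reported only through the returned ScheduledFuture. The worker thread itself keeps running. Wrap the task body in try/catch if the task should keep running after a failure.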
  3. Fixed Rate vs Fixed Delay:
    • scheduleAtFixedRate: The interval is measured from the start of one task to the start of the next. If a run takes longer than the period, later runs start late rather than overlapping.
    • scheduleWithFixedDelay: The interval is measured from the end of one task to the start of the next.
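The effect of an uncaught exception on a periodic task can be seen by comparing a guarded and an unguarded version of the same task. This is an illustrative sketch: PeriodicFailureDemo and reachesTarget are hypothetical names, and the task is rigged to fail on its second run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicFailureDemo {

    // Schedules a task that throws on its second run and reports whether it
    // still reached `targetRuns` runs within waitMillis.
    static boolean reachesTarget(boolean guarded, int targetRuns, long waitMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch reached = new CountDownLatch(targetRuns);

        Runnable flaky = () -> {
            int run = runs.incrementAndGet();
            reached.countDown();
            if (run == 2) {
                throw new IllegalStateException("simulated failure on run 2");
            }
        };

        Runnable task;
        if (guarded) {
            // Catch the failure inside the task so later runs are not suppressed
            task = () -> {
                try {
                    flaky.run();
                } catch (RuntimeException e) {
                    System.out.println("Recovered from: " + e.getMessage());
                }
            };
        } else {
            task = flaky;
        }

        scheduler.scheduleAtFixedRate(task, 0, 20, TimeUnit.MILLISECONDS);
        try {
            return reached.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Guarded task kept running: " + reachesTarget(true, 5, 2000));
        System.out.println("Unguarded task kept running: " + reachesTarget(false, 5, 500));
    }
}
```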

Example:

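import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// ...
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);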

// Task to run after 2 seconds
scheduler.schedule(() -> System.out.println("One-time task executed"), 2, TimeUnit.SECONDS);

// Task to run initially after 1 second, then every 3 seconds
scheduler.scheduleAtFixedRate(() -> System.out.println("Fixed rate task"), 1, 3, TimeUnit.SECONDS);

// Task to run initially after 2 seconds, then with a delay of 4 seconds
scheduler.scheduleWithFixedDelay(() -> System.out.println("Fixed delay task"), 2, 4, TimeUnit.SECONDS);

// Shut down the executor after 15 seconds; by default, shutdown() also cancels the periodic tasks
scheduler.schedule(() -> {
    System.out.println("Shutting down the scheduler");
    scheduler.shutdown();
}, 15, TimeUnit.SECONDS);

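This example schedules a one-time task, a fixed-rate task, and a fixed-delay task on a single-threaded scheduler, then shuts the scheduler down after 15 seconds.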