How do I use CyclicBarrier for synchronization?

In Java’s concurrency utilities, CyclicBarrier is a synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. It is called “cyclic” because it can be re-used after the waiting threads are released.

Here is a breakdown of how to use it effectively.

1. Basic Concepts

  • Parties: The number of threads that must invoke await() before the barrier is tripped.
  • Barrier Action: An optional Runnable that is executed once per barrier point, after the last thread arrives, but before any threads are released.
  • await(): The core method threads call to wait. It blocks until all parties have arrived.

2. Implementation Steps

Create the Barrier

You initialize it with the number of participating threads.

import java.util.concurrent.CyclicBarrier;

// ...
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
    // This runs once all threads reach the barrier
    System.out.println("Barrier reached! Processing combined results...");
});

Define the Worker

Each thread performs its task and then calls barrier.await().

package org.kodejava.util.concurrent;

import java.util.concurrent.CyclicBarrier;

class Task implements Runnable {
    private final CyclicBarrier barrier;

    Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " is working...");
            // Simulate work
            Thread.sleep(1000);

            System.out.println(Thread.currentThread().getName() + " waiting at barrier.");
            barrier.await(); // Thread blocks here

            System.out.println(Thread.currentThread().getName() + " passed the barrier!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. Key Differences from CountDownLatch

While both coordinate threads, they have distinct use cases:

  • Reusability: A CyclicBarrier can be reset and used again (hence “cyclic”). A CountDownLatch count cannot be reset once it reaches zero.
  • Waiting Mechanism: In a CyclicBarrier, the workers wait for each other. In a CountDownLatch, a main thread typically waits for workers to finish (workers don’t necessarily block).
  • Barrier Action: CyclicBarrier supports a custom action when the barrier trips; CountDownLatch does not.

4. Handling Broken Barriers

If a thread leaves the barrier prematurely (due to interruption, timeout, or failure), the barrier is considered “broken.” Any other threads waiting at the barrier will receive a BrokenBarrierException. You can check this status using barrier.isBroken().

Example Use Case

CyclicBarrier is ideal for parallel algorithms that involve multiple phases (iterative methods), where each phase must be completed by all threads before anyone starts the next phase.

How do I synchronize phases of execution with CyclicBarrier?

The CyclicBarrier class in Java allows you to synchronize phases or threads at a common point. It is particularly useful when you have multiple threads working on subtasks that need to wait for each other to proceed to the next phase of their work.

Key Features of CyclicBarrier

  • Reusable: The barrier can be reused once all threads have reached the barrier.
  • Action on Barrier Completion: You can specify a barrier action (a task to run only once by one of the threads) that gets executed when all threads reach the barrier.

How CyclicBarrier Works

  • A CyclicBarrier is initialized with a specific number of parties (threads) that must reach the barrier before they are permitted to proceed.
  • When a thread reaches the barrier, it calls the await() method.
  • The thread is blocked until all the required threads reach the barrier (i.e., call await()).
  • Once all threads reach the barrier:
    • Optionally, the barrier action (if defined) is executed by one thread.
    • All threads are released to continue execution.

Example: Synchronizing Multiple Threads with CyclicBarrier

Here’s an example of synchronizing threads using CyclicBarrier. In this case, multiple worker threads perform some task in phases, and all must wait for one another at the end of each phase before proceeding.

package org.kodejava.util.concurrent;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {

   public static void main(String[] args) {
      // Number of threads (parties) to synchronize
      int numThreads = 3;

      // Create a CyclicBarrier with a barrier action
      CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
         System.out.println("All threads have reached the barrier. Proceeding to the next phase...");
      });

      // Create and start worker threads
      for (int i = 0; i < numThreads; i++) {
         new Thread(new Worker(barrier), "Thread " + (i + 1)).start();
      }
   }

   static class Worker implements Runnable {
      private final CyclicBarrier barrier;

      public Worker(CyclicBarrier barrier) {
         this.barrier = barrier;
      }

      @Override
      public void run() {
         try {
            System.out.println(Thread.currentThread().getName() + " is performing the first phase of task...");
            Thread.sleep((long) (Math.random() * 3000)); // Simulate work
            System.out.println(Thread.currentThread().getName() + " has finished the first phase. Waiting at the barrier...");
            barrier.await(); // Wait for other threads to reach the barrier

            System.out.println(Thread.currentThread().getName() + " is performing the second phase of task...");
            Thread.sleep((long) (Math.random() * 3000)); // Simulate work
            System.out.println(Thread.currentThread().getName() + " has finished the second phase. Waiting at the barrier...");
            barrier.await(); // Wait for other threads at the next barrier

            System.out.println(Thread.currentThread().getName() + " has completed all phases.");
         } catch (Exception e) {
            e.printStackTrace();
         }
      }
   }
}

Explanation of Code:

  1. Barrier Creation:
    • new CyclicBarrier(numThreads, action):
      • numThreads: Number of threads involved in synchronization.
      • action: A Runnable task that executes after all threads reach the barrier.
  2. Phase Execution:
    • Each thread performs its task and then calls barrier.await() to wait for others.
    • When all threads have called await(), the barrier opens, the optional action (if defined) executes, and threads proceed.
  3. Random Delays:
    • Simulated with Thread.sleep((long) (Math.random() * 3000)) to illustrate different thread run times.
  4. Multiple Phases:
    • The example includes two phases of execution, and the barrier synchronizes threads at the end of each phase.

Output (Example Output):

Thread 2 is performing the first phase of task...
Thread 1 is performing the first phase of task...
Thread 3 is performing the first phase of task...
Thread 2 has finished the first phase. Waiting at the barrier...
Thread 1 has finished the first phase. Waiting at the barrier...
Thread 3 has finished the first phase. Waiting at the barrier...
All threads have reached the barrier. Proceeding to the next phase...
Thread 2 is performing the second phase of task...
Thread 3 is performing the second phase of task...
Thread 1 is performing the second phase of task...
Thread 2 has finished the second phase. Waiting at the barrier...
Thread 3 has finished the second phase. Waiting at the barrier...
Thread 1 has finished the second phase. Waiting at the barrier...
All threads have reached the barrier. Proceeding to the next phase...
Thread 2 has completed all phases.
Thread 1 has completed all phases.
Thread 3 has completed all phases.

Keynotes:

  1. Thread Releasing:
    • All threads are released simultaneously when all of them reach the barrier.
  2. BarrierAction Execution:
    • The Runnable passed to the CyclicBarrier constructor (optional) is run by one of the threads before proceeding.
  3. Reuse:
    • The CyclicBarrier resets automatically after releasing the threads, so it can be reused for the next phase.
  4. Exceptions:
    • If one thread fails (e.g., throws an exception during await()), the barrier is broken, and other threads waiting at that barrier will also throw a BrokenBarrierException.

This implementation is widely used in parallel processing scenarios where tasks are executed in phases and synchronized at specific points.