How do I synchronize phases of execution with CyclicBarrier?

The CyclicBarrier class in Java allows you to synchronize phases or threads at a common point. It is particularly useful when you have multiple threads working on subtasks that need to wait for each other to proceed to the next phase of their work.

Key Features of CyclicBarrier

  • Reusable: The barrier can be reused once all threads have reached the barrier.
  • Action on Barrier Completion: You can specify a barrier action (a task to run only once by one of the threads) that gets executed when all threads reach the barrier.

How CyclicBarrier Works

  • A CyclicBarrier is initialized with a specific number of parties (threads) that must reach the barrier before they are permitted to proceed.
  • When a thread reaches the barrier, it calls the await() method.
  • The thread is blocked until all the required threads reach the barrier (i.e., call await()).
  • Once all threads reach the barrier:
    • Optionally, the barrier action (if defined) is executed by one thread.
    • All threads are released to continue execution.

Example: Synchronizing Multiple Threads with CyclicBarrier

Here’s an example of synchronizing threads using CyclicBarrier. In this case, multiple worker threads perform some task in phases, and all must wait for one another at the end of each phase before proceeding.

package org.kodejava.util.concurrent;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {

   public static void main(String[] args) {
      // Number of threads (parties) to synchronize
      int numThreads = 3;

      // Create a CyclicBarrier with a barrier action
      CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
         System.out.println("All threads have reached the barrier. Proceeding to the next phase...");
      });

      // Create and start worker threads
      for (int i = 0; i < numThreads; i++) {
         new Thread(new Worker(barrier), "Thread " + (i + 1)).start();
      }
   }

   static class Worker implements Runnable {
      private final CyclicBarrier barrier;

      public Worker(CyclicBarrier barrier) {
         this.barrier = barrier;
      }

      @Override
      public void run() {
         try {
            System.out.println(Thread.currentThread().getName() + " is performing the first phase of task...");
            Thread.sleep((long) (Math.random() * 3000)); // Simulate work
            System.out.println(Thread.currentThread().getName() + " has finished the first phase. Waiting at the barrier...");
            barrier.await(); // Wait for other threads to reach the barrier

            System.out.println(Thread.currentThread().getName() + " is performing the second phase of task...");
            Thread.sleep((long) (Math.random() * 3000)); // Simulate work
            System.out.println(Thread.currentThread().getName() + " has finished the second phase. Waiting at the barrier...");
            barrier.await(); // Wait for other threads at the next barrier

            System.out.println(Thread.currentThread().getName() + " has completed all phases.");
         } catch (Exception e) {
            e.printStackTrace();
         }
      }
   }
}

Explanation of Code:

  1. Barrier Creation:
    • new CyclicBarrier(numThreads, action):
      • numThreads: Number of threads involved in synchronization.
      • action: A Runnable task that executes after all threads reach the barrier.
  2. Phase Execution:
    • Each thread performs its task and then calls barrier.await() to wait for others.
    • When all threads have called await(), the barrier opens, the optional action (if defined) executes, and threads proceed.
  3. Random Delays:
    • Simulated with Thread.sleep((long) (Math.random() * 3000)) to illustrate different thread run times.
  4. Multiple Phases:
    • The example includes two phases of execution, and the barrier synchronizes threads at the end of each phase.

Output (Example Output):

Thread 2 is performing the first phase of task...
Thread 1 is performing the first phase of task...
Thread 3 is performing the first phase of task...
Thread 2 has finished the first phase. Waiting at the barrier...
Thread 1 has finished the first phase. Waiting at the barrier...
Thread 3 has finished the first phase. Waiting at the barrier...
All threads have reached the barrier. Proceeding to the next phase...
Thread 2 is performing the second phase of task...
Thread 3 is performing the second phase of task...
Thread 1 is performing the second phase of task...
Thread 2 has finished the second phase. Waiting at the barrier...
Thread 3 has finished the second phase. Waiting at the barrier...
Thread 1 has finished the second phase. Waiting at the barrier...
All threads have reached the barrier. Proceeding to the next phase...
Thread 2 has completed all phases.
Thread 1 has completed all phases.
Thread 3 has completed all phases.

Keynotes:

  1. Thread Releasing:
    • All threads are released simultaneously when all of them reach the barrier.
  2. BarrierAction Execution:
    • The Runnable passed to the CyclicBarrier constructor (optional) is run by one of the threads before proceeding.
  3. Reuse:
    • The CyclicBarrier resets automatically after releasing the threads, so it can be reused for the next phase.
  4. Exceptions:
    • If one thread fails (e.g., throws an exception during await()), the barrier is broken, and other threads waiting at that barrier will also throw a BrokenBarrierException.

This implementation is widely used in parallel processing scenarios where tasks are executed in phases and synchronized at specific points.

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.