How do I build a work-stealing pool with ForkJoinPool?

Building a work-stealing pool with ForkJoinPool in Java is straightforward, because the ForkJoinPool class supports work-stealing natively. Work-stealing lets idle threads “steal” tasks from the queues of busy threads, which keeps all workers occupied and improves throughput.

Here’s how you can create and use a work-stealing pool with ForkJoinPool:


1. Understanding ForkJoinPool

  • A ForkJoinPool is designed for tasks that can be recursively divided into smaller subtasks (i.e., the “fork” step). These subtasks may then be processed in parallel by multiple threads in the pool.
  • If some threads are idle, they can “steal” tasks from the queues of other threads (i.e., the “work-stealing” part).

2. Creating a ForkJoinPool

To create the pool:

  • Use the ForkJoinPool constructor with the desired parallelism level (the target number of worker threads running in parallel).
  • Alternatively, use ForkJoinPool.commonPool(), a shared instance available to the whole application.

Example:

int parallelism = Runtime.getRuntime().availableProcessors(); // Number of threads in the pool
ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);

3. Submitting Tasks to the ForkJoinPool

Create tasks using RecursiveTask<T> for tasks that return a result, or RecursiveAction for tasks that do not return a result.

These tasks implement the compute() method, which contains the logic for splitting and processing the tasks.
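
The sum example further down uses RecursiveTask. For the RecursiveAction case, here is a minimal sketch that increments every element of an array in place. The class name IncrementActionSketch, the IncrementTask task, the incrementAll helper, and the threshold value are all hypothetical choices made for illustration:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class IncrementActionSketch {
   // Hypothetical task: adds 1 to every element of array[start, end) in place
   static class IncrementTask extends RecursiveAction {
      private static final int THRESHOLD = 1_000; // Illustrative cutoff, not a tuned value
      private final int[] array;
      private final int start, end;

      IncrementTask(int[] array, int start, int end) {
         this.array = array;
         this.start = start;
         this.end = end;
      }

      @Override
      protected void compute() {
         if (end - start <= THRESHOLD) {
            // Small enough: do the work directly
            for (int i = start; i < end; i++) {
               array[i]++;
            }
         } else {
            // Split in half; invokeAll forks both halves and waits for them
            int mid = start + (end - start) / 2;
            invokeAll(new IncrementTask(array, start, mid),
                      new IncrementTask(array, mid, end));
         }
      }
   }

   // Hypothetical helper: runs the action on a private pool, then shuts it down
   static int[] incrementAll(int[] array) {
      ForkJoinPool pool = new ForkJoinPool();
      try {
         pool.invoke(new IncrementTask(array, 0, array.length));
      } finally {
         pool.shutdown();
      }
      return array;
   }

   public static void main(String[] args) {
      int[] data = new int[10_000]; // All zeros initially
      incrementAll(data);
      System.out.println(data[0] + " " + data[data.length - 1]); // prints "1 1"
   }
}
```

Because a RecursiveAction returns nothing, its effect has to be visible through shared state, here the array itself. Each subtask writes only to its own index range, so no extra synchronization is needed.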


4. Example: Using RecursiveTask

Here is an example of using a ForkJoinPool with work-stealing to calculate the sum of a large array:

package org.kodejava.util.concurrent;

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

public class WorkStealingExample {
   // RecursiveTask to compute the sum of an array
   static class SumTask extends RecursiveTask<Long> {
      private static final int THRESHOLD = 1_000; // Threshold for splitting tasks
      private final int[] array;
      private final int start, end;

      public SumTask(int[] array, int start, int end) {
         this.array = array;
         this.start = start;
         this.end = end;
      }

      @Override
      protected Long compute() {
         if ((end - start) <= THRESHOLD) {
            // Base case: process the task directly
            long sum = 0;
            for (int i = start; i < end; i++) {
               sum += array[i];
            }
            return sum;
         } else {
            // Split task: fork/join
            int mid = start + (end - start) / 2; // Overflow-safe midpoint
            SumTask leftTask = new SumTask(array, start, mid);
            SumTask rightTask = new SumTask(array, mid, end);

            // Fork the subtasks
            leftTask.fork(); // Fork the left task
            Long rightResult = rightTask.compute(); // Process the right task directly
            Long leftResult = leftTask.join(); // Wait for the left task to complete

            // Combine the results
            return leftResult + rightResult;
         }
      }
   }

   public static void main(String[] args) {
      int[] array = new int[10_000_000];
      for (int i = 0; i < array.length; i++) {
         array[i] = i + 1; // Initialize array
      }

      long result;
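      // No-arg constructor: parallelism defaults to the number of available processors.
      // try-with-resources needs Java 19+; on older versions, call pool.shutdown() in a finally block.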
      try (ForkJoinPool pool = new ForkJoinPool()) {
         SumTask task = new SumTask(array, 0, array.length);

         // Submit and retrieve the result
         result = pool.invoke(task);
      }

      System.out.println("Total Sum: " + result);
   }
}

5. Key Points in the Example

  1. Threshold-Based Splitting:
    • The THRESHOLD constant defines at what point tasks are small enough to process directly.
    • Larger tasks are split into smaller subtasks (forked) recursively.
  2. Fork/Join Paradigm:
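    • fork(): Schedules the subtask for asynchronous execution, normally by pushing it onto the current worker thread’s queue, where an idle thread can steal it.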
    • compute(): Performs computation directly or splits into subtasks.
    • join(): Waits for a subtask’s result.
  3. Work-Stealing:
    • If a thread finishes its tasks early, it “steals” tasks from other busy threads, making use of all available processors efficiently.
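
One way to get a rough view of work-stealing is ForkJoinPool.getStealCount(), which returns an estimate of how many tasks were taken from another thread's queue. The sketch below is illustrative only: the class StealCountSketch, the RangeSumTask task, and the sumAndSteals helper are hypothetical names, and the steal count differs from run to run.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class StealCountSketch {
   // Hypothetical task: sums the integers in [start, end)
   static class RangeSumTask extends RecursiveTask<Long> {
      private static final int THRESHOLD = 1_000; // Illustrative cutoff
      private final int start, end;

      RangeSumTask(int start, int end) {
         this.start = start;
         this.end = end;
      }

      @Override
      protected Long compute() {
         if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
               sum += i;
            }
            return sum;
         }
         int mid = start + (end - start) / 2;
         RangeSumTask left = new RangeSumTask(start, mid);
         RangeSumTask right = new RangeSumTask(mid, end);
         left.fork();                      // Left half becomes available for stealing
         long rightSum = right.compute();  // Right half runs on the current thread
         return left.join() + rightSum;
      }
   }

   // Hypothetical helper: returns {sum of 0..n-1, estimated steal count}
   static long[] sumAndSteals(int n) {
      ForkJoinPool pool = new ForkJoinPool(4);
      try {
         long sum = pool.invoke(new RangeSumTask(0, n));
         return new long[] { sum, pool.getStealCount() };
      } finally {
         pool.shutdown();
      }
   }

   public static void main(String[] args) {
      long[] result = sumAndSteals(1_000_000);
      System.out.println("Sum: " + result[0]);
      // The steal count is only an estimate and varies between runs
      System.out.println("Estimated steals: " + result[1]);
   }
}
```

A steal count that stays near zero on a multi-core machine can suggest that the tasks are too coarse to be shared; lowering the threshold produces more subtasks for idle threads to pick up.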

6. Using the Common ForkJoinPool

You can alternatively use the common pool (a shared ForkJoinPool):

ForkJoinPool.commonPool().invoke(task);

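The common pool is shared across the whole JVM. By default, its parallelism is one less than the number of available processors, and it can be changed with the system property java.util.concurrent.ForkJoinPool.common.parallelism.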
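
To check what parallelism the common pool actually has on a given machine, you can query it at runtime. In this short sketch, the class name CommonPoolSketch and the commonParallelism helper are hypothetical; the printed values depend on the hardware and on any system property overrides.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolSketch {
   // Hypothetical helper: the target parallelism of the shared common pool
   static int commonParallelism() {
      return ForkJoinPool.getCommonPoolParallelism();
   }

   public static void main(String[] args) {
      int cores = Runtime.getRuntime().availableProcessors();
      System.out.println("Available processors: " + cores);
      System.out.println("Common pool parallelism: " + commonParallelism());
      // The same value is available from the pool instance itself
      System.out.println("Via instance: " + ForkJoinPool.commonPool().getParallelism());
   }
}
```

The common pool's parallelism is typically one less than the processor count, and it can be overridden at startup, for example with -Djava.util.concurrent.ForkJoinPool.common.parallelism=4.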


7. Tuning the ForkJoinPool

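You can fine-tune the pool in several ways, such as:

  • parallelism: The target number of worker threads, passed to the constructor.
  • ForkJoinPool.ManagedBlocker: An interface, used with ForkJoinPool.managedBlock(), that tells the pool a task is about to block (for example, on I/O or a lock) so it can add a compensating thread if needed.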

Example:

ForkJoinPool pool = new ForkJoinPool(4); // Create a pool with a parallelism level of 4
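
For ManagedBlocker, here is a minimal sketch that wraps a blocking queue read so the pool knows the worker may block. The names ManagedBlockerSketch, QueueTaker, and takeManaged are hypothetical; only the ManagedBlocker interface and ForkJoinPool.managedBlock() come from the JDK.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.LinkedBlockingQueue;

class ManagedBlockerSketch {
   // Hypothetical blocker: takes one item from a queue, blocking if necessary
   static class QueueTaker<T> implements ForkJoinPool.ManagedBlocker {
      private final BlockingQueue<T> queue;
      private T item;

      QueueTaker(BlockingQueue<T> queue) {
         this.queue = queue;
      }

      @Override
      public boolean block() throws InterruptedException {
         if (item == null) {
            item = queue.take(); // May block; the pool can compensate meanwhile
         }
         return true; // No further blocking is needed
      }

      @Override
      public boolean isReleasable() {
         // Try a non-blocking poll first so block() is only used when required
         return item != null || (item = queue.poll()) != null;
      }

      T getItem() {
         return item;
      }
   }

   // Hypothetical helper: a queue take that cooperates with the pool
   static <T> T takeManaged(BlockingQueue<T> queue) {
      QueueTaker<T> taker = new QueueTaker<>(queue);
      try {
         ForkJoinPool.managedBlock(taker);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException("Interrupted while waiting", e);
      }
      return taker.getItem();
   }

   public static void main(String[] args) {
      BlockingQueue<String> queue = new LinkedBlockingQueue<>();
      ForkJoinPool pool = new ForkJoinPool(2);
      try {
         Callable<String> job = () -> takeManaged(queue);
         ForkJoinTask<String> task = pool.submit(job);
         queue.add("payload"); // Release the waiting worker
         System.out.println(task.join()); // prints "payload"
      } finally {
         pool.shutdown();
      }
   }
}
```

Without managedBlock(), a worker stuck in queue.take() would simply reduce the pool's effective parallelism for as long as it waits.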

This approach enables parallel computation with efficient load balancing and good utilization of idle threads via work-stealing.
