Building a work-stealing pool in Java is straightforward because the ForkJoinPool class supports work-stealing natively. Work-stealing lets idle threads “steal” tasks from the queues of busy threads, which keeps all workers occupied and improves throughput.
Here’s how you can create and use a work-stealing pool with ForkJoinPool:
1. Understanding ForkJoinPool
- A ForkJoinPool is designed for tasks that can be recursively divided into smaller subtasks (the “fork” step). These subtasks can then be processed in parallel by multiple threads in the pool.
- Each worker thread has its own task queue. If a thread runs out of work, it can “steal” tasks from the queues of other threads (the “work-stealing” part).
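To make the two points above concrete, here is a minimal sketch that records which worker threads end up running the leaf subtasks. The class and method names (WorkerThreadsDemo, RecordingTask, run) are made up for this illustration. How many distinct threads show up depends on the machine and on scheduling, so no particular output should be expected.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class WorkerThreadsDemo {
    // Hypothetical task: halves a range until it is small, then records
    // the name of the thread that handled the leaf.
    static class RecordingTask extends RecursiveAction {
        private final int start, end;
        private final Set<String> threadNames;

        RecordingTask(int start, int end, Set<String> threadNames) {
            this.start = start;
            this.end = end;
            this.threadNames = threadNames;
        }

        @Override
        protected void compute() {
            if (end - start <= 1_000) {
                threadNames.add(Thread.currentThread().getName());
                return;
            }
            int mid = start + (end - start) / 2;
            RecordingTask left = new RecordingTask(start, mid, threadNames);
            RecordingTask right = new RecordingTask(mid, end, threadNames);
            left.fork();     // queued on this worker; an idle worker may steal it
            right.compute(); // processed by the current thread
            left.join();
        }
    }

    // Runs the task in a pool of the given parallelism and returns the
    // names of the threads that executed at least one leaf.
    static Set<String> run(int parallelism) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.invoke(new RecordingTask(0, 1_000_000, names));
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Threads that ran leaf tasks: " + run(4));
    }
}
```

If more than one thread name is printed, the extra threads got their work by taking forked subtasks that another worker had queued.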
2. Creating a ForkJoinPool
To create the pool:
- Use the ForkJoinPool constructor with a desired parallelism level (the target number of worker threads in the pool).
- You can also use ForkJoinPool.commonPool(), a shared instance available to your application.
Example:
int parallelism = Runtime.getRuntime().availableProcessors(); // Number of threads in the pool
ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);
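As a quick check, the sketch below creates a pool with an explicit parallelism level and prints the configured parallelism of both that pool and the common pool. The class name PoolCreationDemo and the helper newSizedPool are hypothetical names used only for this example.

```java
import java.util.concurrent.ForkJoinPool;

class PoolCreationDemo {
    // Hypothetical helper: creates a pool with the requested parallelism.
    static ForkJoinPool newSizedPool(int parallelism) {
        return new ForkJoinPool(parallelism);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        ForkJoinPool custom = newSizedPool(cores);
        ForkJoinPool shared = ForkJoinPool.commonPool();

        System.out.println("Custom pool parallelism: " + custom.getParallelism());
        System.out.println("Common pool parallelism: " + shared.getParallelism());

        // A pool you create yourself should be shut down when no longer needed.
        custom.shutdown();
    }
}
```

The common pool is managed by the JVM, so only the custom pool is shut down here.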
3. Submitting Tasks to the ForkJoinPool
Create tasks by extending RecursiveTask<T> for tasks that return a result, or RecursiveAction for tasks that do not return a result.
Both require you to implement the compute() method, which contains the logic for splitting and processing the work.
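For the no-result case, a RecursiveAction might look like the following sketch, which doubles every element of an array in place. The names RecursiveActionDemo, DoubleAction, and doubleAll, as well as the threshold value, are assumptions chosen for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class RecursiveActionDemo {
    // Hypothetical action: doubles data[start..end) in place.
    static class DoubleAction extends RecursiveAction {
        private static final int THRESHOLD = 1_000; // illustrative value
        private final int[] data;
        private final int start, end;

        DoubleAction(int[] data, int start, int end) {
            this.data = data;
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            if (end - start <= THRESHOLD) {
                // Small enough: do the work directly
                for (int i = start; i < end; i++) {
                    data[i] *= 2;
                }
            } else {
                // Too large: split in half and process both halves
                int mid = start + (end - start) / 2;
                DoubleAction left = new DoubleAction(data, start, mid);
                DoubleAction right = new DoubleAction(data, mid, end);
                left.fork();
                right.compute();
                left.join();
            }
        }
    }

    static void doubleAll(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleAction(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5};
        doubleAll(data);
        System.out.println(Arrays.toString(data)); // prints [2, 4, 6, 8, 10]
    }
}
```

Because compute() returns void, the two halves communicate only through the shared array, and each subtask touches a disjoint index range.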
4. Example: Using RecursiveTask
Here is an example of using a ForkJoinPool with work-stealing to calculate the sum of a large array:
package org.kodejava.util.concurrent;

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

public class WorkStealingExample {
    // RecursiveTask to compute the sum of an array
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // Threshold for splitting tasks
        private final int[] array;
        private final int start, end;

        public SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if ((end - start) <= THRESHOLD) {
                // Base case: process the task directly
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            } else {
                // Split task: fork/join
                int mid = start + (end - start) / 2; // Avoids int overflow for large indices
                SumTask leftTask = new SumTask(array, start, mid);
                SumTask rightTask = new SumTask(array, mid, end);
                // Fork the left subtask so another worker can pick it up
                leftTask.fork();
                Long rightResult = rightTask.compute(); // Process the right task directly
                Long leftResult = leftTask.join(); // Wait for the left task to complete
                // Combine the results
                return leftResult + rightResult;
            }
        }
    }

    public static void main(String[] args) {
        int[] array = new int[10_000_000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1; // Initialize array
        }
        long result;
        // Default pool size: available processors.
        // try-with-resources on a ForkJoinPool requires Java 19 or later;
        // on older versions, call pool.shutdown() in a finally block instead.
        try (ForkJoinPool pool = new ForkJoinPool()) {
            SumTask task = new SumTask(array, 0, array.length);
            // Submit and retrieve the result
            result = pool.invoke(task);
        }
        System.out.println("Total Sum: " + result);
    }
}
5. Key Points in the Example
- Threshold-Based Splitting:
  - The THRESHOLD constant defines the point at which tasks are small enough to process directly.
  - Larger tasks are split into smaller subtasks (forked) recursively.
- Fork/Join Paradigm:
  - fork(): Schedules a subtask for asynchronous execution.
  - compute(): Performs the computation directly or splits it into subtasks.
  - join(): Waits for a subtask’s result.
- Work-Stealing:
  - If a thread finishes its tasks early, it “steals” tasks from other busy threads, making use of all available processors efficiently.
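The fork(), compute(), join() sequence described above can also be written with the inherited invokeAll() method, which runs both subtasks and returns once they are complete. The following is a sketch of that variant, not a replacement for the example above. The class names InvokeAllSumDemo and SumTask and the helper sum are chosen for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class InvokeAllSumDemo {
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000;
        private final int[] array;
        private final int start, end;

        SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            }
            int mid = start + (end - start) / 2;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);
            // Runs both subtasks and returns when both are done
            invokeAll(left, right);
            // Both tasks are complete, so join() just returns their results
            return left.join() + right.join();
        }
    }

    static long sum(int[] array) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(array, 0, array.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] array = new int[10_000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1;
        }
        System.out.println(sum(array)); // prints 50005000
    }
}
```

Using invokeAll() avoids ordering mistakes such as calling join() on a task before computing the other half, which would serialize the work.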
6. Using the Common ForkJoinPool
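You can alternatively use the common pool (a shared ForkJoinPool):
ForkJoinPool.commonPool().invoke(task);
The common pool is created once per JVM and shared globally. By default its parallelism is one less than the number of available processors, since the thread that submits a task can also help execute it.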
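A self-contained sketch of running a task on the common pool follows. It prints the pool's configured parallelism so you can check the value on your own machine; the default can be overridden with the java.util.concurrent.ForkJoinPool.common.parallelism system property. The class names CommonPoolDemo and MaxTask and the helper max are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class CommonPoolDemo {
    // Hypothetical task: finds the largest value in array[start..end).
    static class MaxTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 1_000;
        private final int[] array;
        private final int start, end;

        MaxTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            if (end - start <= THRESHOLD) {
                int max = Integer.MIN_VALUE;
                for (int i = start; i < end; i++) {
                    max = Math.max(max, array[i]);
                }
                return max;
            }
            int mid = start + (end - start) / 2;
            MaxTask left = new MaxTask(array, start, mid);
            MaxTask right = new MaxTask(array, mid, end);
            left.fork();
            int rightMax = right.compute();
            int leftMax = left.join();
            return Math.max(leftMax, rightMax);
        }
    }

    static int max(int[] array) {
        // No pool is created or shut down here: the common pool is shared
        return ForkJoinPool.commonPool().invoke(new MaxTask(array, 0, array.length));
    }

    public static void main(String[] args) {
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());

        int[] array = new int[100_000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i;
        }
        System.out.println("Max: " + max(array)); // prints Max: 99999
    }
}
```

Since the common pool is shared with other code in the same JVM, long-running or blocking tasks are usually better placed in a dedicated pool.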
7. Tuning the ForkJoinPool
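You can fine-tune the pool with options such as:
- parallelism: The target number of worker threads, passed to the constructor.
- ForkJoinPool.ManagedBlocker: An interface for tasks that may block on external resources. Used with ForkJoinPool.managedBlock(), it lets the pool compensate for a blocked worker.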
Example:
ForkJoinPool pool = new ForkJoinPool(4); // Create a pool with 4 threads
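ManagedBlocker is less self-explanatory than parallelism, so here is a minimal sketch modeled on the queue-taking pattern from the ForkJoinPool documentation. It wraps a blocking queue read so the pool knows the worker may block. The names ManagedBlockerDemo, QueueTaker, and takeInsidePool are illustrative, and the 100 ms delay is an arbitrary choice for the demo.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;

class ManagedBlockerDemo {
    // Wraps a blocking take() so the pool can compensate while it waits.
    static class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        private final BlockingQueue<E> queue;
        private volatile E item;

        QueueTaker(BlockingQueue<E> queue) {
            this.queue = queue;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (item == null) {
                item = queue.take(); // may block the worker thread
            }
            return true; // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            // True if an item is already available without blocking
            return item != null || (item = queue.poll()) != null;
        }

        E getItem() {
            return item; // valid after managedBlock() returns
        }
    }

    // Takes one element from the queue from inside a pool task.
    static String takeInsidePool(BlockingQueue<String> queue) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.submit(() -> {
                QueueTaker<String> taker = new QueueTaker<>(queue);
                ForkJoinPool.managedBlock(taker);
                return taker.getItem();
            }).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Producer thread: delivers the item after a short delay
        new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.offer("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        System.out.println(takeInsidePool(queue)); // prints hello
    }
}
```

The pool calls isReleasable() first and only calls block() if the item is not yet available, which is why both methods check and set the item field.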
This approach enables parallel computation with efficient load balancing and idle-thread utilization via work-stealing.