To process tasks as they complete using a CompletionService in Java, you can take advantage of the ExecutorCompletionService. This class provides a mechanism to submit tasks for execution and retrieve their results in the order of completion, rather than the order of submission.
Key Steps for Using CompletionService:
- Create an ExecutorService: This handles the thread pooling for concurrent task execution.
- Create an ExecutorCompletionService: Wrap the ExecutorService in an ExecutorCompletionService.
- Submit Tasks: Use the submit method to submit tasks to the CompletionService.
- Process Results as They Complete: Retrieve the results using the poll or take methods of CompletionService.
Example Code:
    package org.kodejava.util.concurrent;

    import java.util.concurrent.*;

    public class CompletionServiceExample {
        public static void main(String[] args) throws InterruptedException {
            int numTasks = 5;

            // Step 1: Create an ExecutorService with a fixed thread pool
            ExecutorService executorService = Executors.newFixedThreadPool(3);

            // Step 2: Create an ExecutorCompletionService
            CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);

            try {
                // Step 3: Submit tasks to the CompletionService
                for (int i = 0; i < numTasks; i++) {
                    int taskId = i;
                    completionService.submit(() -> {
                        Thread.sleep((long) (Math.random() * 2000)); // Simulate work
                        return "Result from Task " + taskId;
                    });
                }

                // Step 4: Process tasks as they complete
                for (int i = 0; i < numTasks; i++) {
                    try {
                        Future<String> resultFuture = completionService.take(); // Blocks until the next task completes
                        String result = resultFuture.get(); // Returns at once: the task is already done
                        System.out.println(result);
                    } catch (ExecutionException e) {
                        // The exception thrown inside the task is available as the cause
                        System.err.println("Task execution failed: " + e.getCause());
                    }
                }
            } finally {
                // Shut down the ExecutorService even if the main thread is interrupted
                executorService.shutdown();
            }
        }
    }
Explanation of the Code:
- ExecutorService: A thread pool of 3 worker threads is created using Executors.newFixedThreadPool(3).
- ExecutorCompletionService: Wraps the ExecutorService to handle submission and retrieval of tasks.
- Submitting Tasks: Each task is submitted to completionService and runs asynchronously on one of the pool's threads.
- Result Retrieval:
  - The completionService.take() method blocks until the next completed task is available and returns its Future.
  - completionService.poll() can be used instead for non-blocking retrieval: it returns null immediately if no task has completed yet.
- Task Results in Completion Order: Results are processed as tasks complete, regardless of their submission order.
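As a complement to the take()-based loop, here is a minimal sketch of retrieval with poll. It uses the timed overload poll(long, TimeUnit), which waits up to the given time and returns null if nothing has completed. The class name PollWithTimeoutDemo, the helper methods drain and demo, and the 500 ms timeout are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PollWithTimeoutDemo {

    // Hypothetical helper: collects up to `expected` results and gives up
    // as soon as no task completes within timeoutMillis.
    static List<String> drain(CompletionService<String> service, int expected, long timeoutMillis) {
        List<String> results = new ArrayList<>();
        try {
            for (int i = 0; i < expected; i++) {
                // Waits at most timeoutMillis; returns null if nothing finished in time
                Future<String> done = service.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (done == null) {
                    break;
                }
                try {
                    results.add(done.get());
                } catch (ExecutionException e) {
                    results.add("failed: " + e.getCause());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt for the caller
        }
        return results;
    }

    // Hypothetical demo: one task finishes at once, the other outlasts the timeout.
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        try {
            service.submit(() -> "quick");
            service.submit(() -> {
                Thread.sleep(5000); // Deliberately longer than the timeout
                return "too slow";
            });
            return drain(service, 2, 500);
        } finally {
            pool.shutdownNow(); // Interrupts the task that is still sleeping
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With these delays, only the quick task's result should be collected before the timeout expires. The no-argument poll() behaves the same way but does not wait at all, which suits a loop that has other work to do between checks.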
When to Use CompletionService
- When you want to process tasks as they finish, rather than waiting for all tasks to complete.
- In scenarios where tasks may have uneven execution times, and you want to immediately handle the results of the fastest tasks.
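To illustrate the uneven-execution-time case, the following sketch submits a slow task first and a fast task second, then records the order in which their results come back. The class name CompletionOrderDemo, the helper slowTask, and the 500 ms and 50 ms delays are hypothetical choices for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {

    // Hypothetical task body: sleeps for delayMillis, then returns its label.
    static String slowTask(String label, long delayMillis) throws InterruptedException {
        Thread.sleep(delayMillis);
        return label;
    }

    // Submits "slow" before "fast" and returns the labels in completion order.
    static List<String> completionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        List<String> order = new ArrayList<>();
        try {
            service.submit(() -> slowTask("slow", 500));
            service.submit(() -> slowTask("fast", 50));
            for (int i = 0; i < 2; i++) {
                // take() hands back whichever task finished first
                order.add(service.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt for the caller
        } catch (ExecutionException e) {
            order.add("failed: " + e.getCause());
        } finally {
            pool.shutdown();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(completionOrder());
    }
}
```

Because both tasks run concurrently on the two pool threads, the fast result should arrive first even though it was submitted second. Iterating over the Futures in submission order would instead leave the fast result waiting behind the slow one.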
