How do I process tasks as they complete using CompletionService?

To process tasks as they complete in Java, use ExecutorCompletionService, the standard implementation of the CompletionService interface. It lets you submit tasks for execution and retrieve their results in order of completion rather than order of submission.

Key Steps for Using CompletionService:

  1. Create an ExecutorService: This handles the thread pooling for concurrent task execution.
  2. Create an ExecutorCompletionService: Wrap the ExecutorService in an ExecutorCompletionService.
  3. Submit Tasks: Use the submit method to submit tasks to the CompletionService.
  4. Process Results as They Complete: Retrieve each completed task's Future with take (blocking) or poll (non-blocking), then call get on it to read the result.

Example Code:

package org.kodejava.util.concurrent;

import java.util.concurrent.*;

public class CompletionServiceExample {

   public static void main(String[] args) throws InterruptedException {
      int numTasks = 5;

      // Step 1: Create an ExecutorService with fixed thread pool
      ExecutorService executorService = Executors.newFixedThreadPool(3);

      // Step 2: Create an ExecutorCompletionService
      CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);

      // Step 3: Submit tasks to the CompletionService
      for (int i = 0; i < numTasks; i++) {
         int taskId = i;
         completionService.submit(() -> {
            Thread.sleep((long) (Math.random() * 2000)); // Simulate work
            return "Result from Task " + taskId;
         });
      }

      // Step 4: Process tasks as they complete
      try {
         for (int i = 0; i < numTasks; i++) {
            try {
               // take() blocks until some task has finished, then returns its Future
               Future<String> resultFuture = completionService.take();
               // The Future is already done, so get() returns without waiting
               String result = resultFuture.get();
               System.out.println(result);
            } catch (ExecutionException e) {
               // The exception thrown by the task is wrapped as the cause
               System.err.println("Task execution failed: " + e.getCause());
            }
         }
      } finally {
         // Shut down the ExecutorService even if the main thread is interrupted
         executorService.shutdown();
      }
   }
}

Explanation of the Code:

  1. ExecutorService: A thread pool of 3 worker threads is created using Executors.newFixedThreadPool(3).
  2. ExecutorCompletionService: Wraps the ExecutorService to handle submission and retrieval of tasks.
  3. Submitting Tasks: Each task is submitted to the completionService, which hands it to the thread pool to run asynchronously in the background.
  4. Result Retrieval:
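    • The completionService.take() method blocks until a task has completed, then returns that task's Future. Because the Future is already done, calling get() on it does not wait.
    • completionService.poll() can be used for non-blocking retrieval: it returns immediately, and returns null if no task has completed yet. poll(timeout, unit) waits up to the given time before returning null.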
  5. Task Results in Completion Order: Results are processed as tasks complete, regardless of their submission order.
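The poll variant mentioned above can be sketched as follows. This is a minimal illustration, not part of the original example: the class name CompletionServicePollExample, the helper collectWithPoll, the sleep durations, and the 100 ms timeout are all assumptions chosen for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CompletionServicePollExample {

   // Hypothetical helper: submits numTasks tasks and collects their results
   // with poll(timeout, unit) instead of take().
   static List<String> collectWithPoll(int numTasks) {
      ExecutorService executorService = Executors.newFixedThreadPool(3);
      CompletionService<String> completionService =
            new ExecutorCompletionService<>(executorService);
      List<String> results = new ArrayList<>();
      try {
         for (int i = 0; i < numTasks; i++) {
            int taskId = i;
            completionService.submit(() -> {
               // Simulate work; earlier tasks sleep longer (arbitrary demo values)
               Thread.sleep(50L * (numTasks - taskId));
               return "Result from Task " + taskId;
            });
         }

         int received = 0;
         while (received < numTasks) {
            // Wait at most 100 ms for a completed task
            Future<String> future = completionService.poll(100, TimeUnit.MILLISECONDS);
            if (future == null) {
               // Nothing finished within the timeout; other work could be done here
               continue;
            }
            received++;
            try {
               results.add(future.get()); // Already completed, does not wait
            } catch (ExecutionException e) {
               results.add("Failed: " + e.getCause());
            }
         }
      } catch (InterruptedException e) {
         // Restore the interrupt flag and return whatever was collected so far
         Thread.currentThread().interrupt();
      } finally {
         executorService.shutdown();
      }
      return results;
   }

   public static void main(String[] args) {
      collectWithPoll(5).forEach(System.out::println);
   }
}
```

The loop counts only non-null futures, so a timeout never causes a result to be skipped. If the calling thread has nothing else to do between polls, take() is usually the simpler choice.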

When to Use CompletionService

  • When you want to process tasks as they finish, rather than waiting for all tasks to complete.
  • In scenarios where tasks may have uneven execution times, and you want to immediately handle the results of the fastest tasks.
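As one illustration of the second case, the sketch below takes the result of whichever task finishes first and cancels the rest. The class FirstResultExample, the helpers firstResult and demoTasks, and the sleep durations are hypothetical names and values used only for this example; it also assumes the task list is non-empty, since newFixedThreadPool rejects a size of zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FirstResultExample {

   // Hypothetical helper: returns the result of the first task to complete,
   // or null if that task failed or the calling thread was interrupted.
   // Assumes tasks is non-empty.
   static String firstResult(List<Callable<String>> tasks) {
      ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
      CompletionService<String> completionService =
            new ExecutorCompletionService<>(executorService);
      List<Future<String>> futures = new ArrayList<>();
      try {
         for (Callable<String> task : tasks) {
            futures.add(completionService.submit(task));
         }
         // take() returns the Future of whichever task completes first
         return completionService.take().get();
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return null;
      } catch (ExecutionException e) {
         // The first task to complete threw an exception
         return null;
      } finally {
         // Cancel the tasks that are still running; no effect on finished ones
         for (Future<String> future : futures) {
            future.cancel(true);
         }
         executorService.shutdown();
      }
   }

   // Two demo tasks with deliberately uneven execution times
   static List<Callable<String>> demoTasks() {
      List<Callable<String>> tasks = new ArrayList<>();
      tasks.add(() -> { Thread.sleep(500); return "slow"; });
      tasks.add(() -> { Thread.sleep(50); return "fast"; });
      return tasks;
   }

   public static void main(String[] args) {
      System.out.println(firstResult(demoTasks()));
   }
}
```

With these demo delays the fast task is expected to win even though it was submitted second. For the narrower case of wanting only the first successful result, ExecutorService.invokeAny may be a simpler alternative.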
