How do I implement async pipelines with CompletableFuture chaining?

Java's CompletableFuture is well suited to building asynchronous pipelines. Its chaining methods (thenApply, thenCompose, thenAccept, and others) let each step start as soon as the previous one completes, without blocking a thread in between.

Here’s a step-by-step guide to implementing async pipelines using CompletableFuture chaining:


1. Basics of CompletableFuture

CompletableFuture is a class in java.util.concurrent that represents a future result of an asynchronous computation. You can chain computations, handle exceptions, and run them all asynchronously.
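
As a minimal sketch of the idea, the snippet below starts one asynchronous computation and chains a single transformation onto it. The class name BasicChainSketch and the helper lengthOfGreeting are hypothetical names chosen for illustration only.

```java
import java.util.concurrent.CompletableFuture;

class BasicChainSketch {

    // Hypothetical helper: starts an async computation and chains one transformation.
    static CompletableFuture<Integer> lengthOfGreeting(String name) {
        return CompletableFuture
                .supplyAsync(() -> "Hello, " + name) // runs on the default async executor
                .thenApply(String::length);          // runs once the greeting is ready
    }

    public static void main(String[] args) {
        // join() blocks the calling thread until the result is available
        int length = lengthOfGreeting("Java").join();
        System.out.println(length); // "Hello, Java" has 11 characters
    }
}
```

Nothing blocks until join() is called; the chain itself only describes what should happen when each stage completes.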

2. Example Async Pipeline Workflow

Let’s assume we have a pipeline with multiple steps:

  1. Fetch data from a remote source.
  2. Transform the data.
  3. Save the data to a database.
  4. Notify a user.

Each of these steps will be represented as a method returning a CompletableFuture.


3. Implementation

a) Define the Asynchronous Tasks

Each step in the pipeline is encapsulated as an asynchronous method using CompletableFuture.supplyAsync() or runAsync().

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class AsyncPipeline {

   private static final ExecutorService executor = Executors.newFixedThreadPool(10); // Thread pool

   // 1. Fetch data from a remote source
   public CompletableFuture<String> fetchData() {
      return CompletableFuture.supplyAsync(() -> {
         try {
            Thread.sleep(1000); // Simulating delay
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag before failing
            throw new IllegalStateException(e);
         }
         System.out.println("Fetched data");
         return "Data from remote source";
      }, executor);
   }

   // 2. Transform the data
   public CompletableFuture<String> transformData(String data) {
      return CompletableFuture.supplyAsync(() -> {
         System.out.println("Transforming data");
         return data.toUpperCase();
      }, executor);
   }

   // 3. Save the data to a database
   public CompletableFuture<Void> saveToDatabase(String transformedData) {
      return CompletableFuture.runAsync(() -> {
         System.out.println("Saving data to database: " + transformedData);
      }, executor);
   }

   // 4. Notify the user
   public CompletableFuture<Void> notifyUser() {
      return CompletableFuture.runAsync(() -> {
         System.out.println("User notified!");
      }, executor);
   }

   // Build the pipeline
   public void executePipeline() {
      fetchData()
              .thenCompose(this::transformData) // Pass the result from "fetchData" to "transformData"
              .thenCompose(this::saveToDatabase) // Then save transformed data to database
              .thenCompose(aVoid -> notifyUser()) // Finally notify the user
              .exceptionally(ex -> { // Handle exceptions globally
                 System.err.println("Pipeline execution failed: " + ex.getMessage());
                 return null;
              })
              .join(); // Block and wait for the pipeline to complete
   }

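   public static void main(String[] args) {
      try {
         new AsyncPipeline().executePipeline();
      } finally {
         executor.shutdown(); // Pool threads are non-daemon; without this the JVM keeps running
      }
   }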
}

4. Explanation of the Code

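  1. supplyAsync():
    • Used for tasks that produce a result, such as fetchData() and transformData().
  2. runAsync():
    • Used for tasks that don’t produce a result, like saveToDatabase() and notifyUser(). These methods return CompletableFuture<Void>.
  3. Chaining with thenCompose:
    • thenCompose() is used when the next step itself returns a CompletableFuture. It passes the previous result to that step and flattens the outcome into a single future instead of a nested one.
  4. Error Handling with exceptionally:
    • exceptionally() runs only if an earlier stage failed. It receives the exception and returns a fallback value; here it logs the error and returns null.
  5. Thread Pool:
    • Passing a custom ExecutorService to supplyAsync() and runAsync() gives control over thread resources. Without one, tasks run on ForkJoinPool.commonPool(). A fixed thread pool uses non-daemon threads, so it must be shut down before the JVM can exit.
  6. join():
    • Blocks the calling thread until the entire pipeline is complete.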
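
To make the thenCompose point concrete, here is a small sketch comparing it with thenApply for a step that returns a future. The class ComposeVsApplySketch and the method shoutAsync are hypothetical stand-ins for a step such as transformData().

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsApplySketch {

    // Hypothetical async step: returns a future, like transformData() above.
    static CompletableFuture<String> shoutAsync(String text) {
        return CompletableFuture.supplyAsync(() -> text.toUpperCase());
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = CompletableFuture.completedFuture("data");

        // thenApply wraps whatever the function returns, so an async step nests the future.
        CompletableFuture<CompletableFuture<String>> nested =
                source.thenApply(ComposeVsApplySketch::shoutAsync);

        // thenCompose flattens the inner future into a single stage.
        CompletableFuture<String> flat =
                source.thenCompose(ComposeVsApplySketch::shoutAsync);

        System.out.println(nested.join().join()); // two joins needed
        System.out.println(flat.join());          // one join is enough
    }
}
```

As a rule of thumb, thenApply fits a plain function and thenCompose fits a function that returns another CompletableFuture.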

5. Key Methods in CompletableFuture

Method           Description
supplyAsync()    Runs a task asynchronously and completes with its result.
runAsync()       Runs a task asynchronously without producing a result.
thenApply()      Transforms the result with a synchronous function.
thenCompose()    Chains a function that itself returns a CompletableFuture and flattens the result.
thenAccept()     Consumes the result without returning a value.
exceptionally()  Supplies a fallback value when an earlier stage fails.
allOf()          Returns a CompletableFuture<Void> that completes when all given futures complete.
anyOf()          Returns a CompletableFuture<Object> that completes as soon as any given future completes.
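
The following sketch exercises thenApply, exceptionally, and thenAccept together, assuming a hypothetical parse step that rejects blank input. FallbackSketch, parse, and parseOrDefault are illustrative names, not part of the pipeline above.

```java
import java.util.concurrent.CompletableFuture;

class FallbackSketch {

    // Hypothetical step that fails for blank input.
    static CompletableFuture<String> parse(String input) {
        return CompletableFuture.supplyAsync(() -> {
            if (input.trim().isEmpty()) {
                throw new IllegalArgumentException("blank input");
            }
            return input.trim();
        });
    }

    // Recovers with a default value if any earlier stage failed.
    static CompletableFuture<String> parseOrDefault(String input) {
        return parse(input)
                .thenApply(String::toUpperCase)  // skipped when parse() fails
                .exceptionally(ex -> "DEFAULT"); // ex is usually a CompletionException wrapping the cause
    }

    public static void main(String[] args) {
        parseOrDefault(" ok ").thenAccept(System.out::println).join(); // prints OK
        parseOrDefault("   ").thenAccept(System.out::println).join(); // prints DEFAULT
    }
}
```

Because exceptionally sits at the end of the chain, it covers failures from every stage before it, which is the same placement used in executePipeline().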

6. Advanced: Combining Multiple Futures

If there’s a need to combine results from multiple independent asynchronous tasks, you can use allOf() or anyOf().

Example with allOf:

public void fetchMultipleSources() {
    CompletableFuture<String> source1 = fetchSource1();
    CompletableFuture<String> source2 = fetchSource2();

    CompletableFuture<Void> combined = CompletableFuture.allOf(source1, source2)
        .thenRun(() -> {
            // Both futures are complete here, so join() returns immediately
            // and needs no checked-exception handling
            System.out.println("Source1: " + source1.join());
            System.out.println("Source2: " + source2.join());
        });

    combined.join(); // Wait for the combined task to complete
}

private CompletableFuture<String> fetchSource1() {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("Fetching source 1...");
        return "Data from Source 1";
    });
}

private CompletableFuture<String> fetchSource2() {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("Fetching source 2...");
        return "Data from Source 2";
    });
}
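
Example with anyOf:

The sketch below shows the anyOf counterpart under simple assumptions: two hypothetical sources (named "mirror A" and "mirror B" here purely for illustration) answer after different simulated delays, and the pipeline continues with whichever finishes first.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AnyOfSketch {

    // Hypothetical source that answers after the given simulated delay.
    static CompletableFuture<String> fetchFrom(String name, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "Data from " + name;
        });
    }

    static String firstResponse() {
        CompletableFuture<String> fast = fetchFrom("mirror A", 50);
        CompletableFuture<String> slow = fetchFrom("mirror B", 1000);
        // anyOf returns CompletableFuture<Object>, so the result needs a cast
        return (String) CompletableFuture.anyOf(fast, slow).join();
    }

    public static void main(String[] args) {
        System.out.println(firstResponse()); // prints "Data from mirror A"
    }
}
```

Note that anyOf does not cancel the futures that lose the race; they keep running unless cancelled explicitly.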

This approach allows building flexible and robust asynchronous pipelines while keeping the code clear and concise.
