How do I implement async pipelines with CompletableFuture chaining?

Asynchronous pipelines can be implemented efficiently in Java using CompletableFuture. The ability of CompletableFuture to chain multiple steps through its functional programming model (e.g., thenApply, thenCompose, thenAccept, etc.) makes it a powerful tool for building non-blocking pipelines.

Here’s a step-by-step guide to implement async pipelines using CompletableFuture chaining:


1. Basics of CompletableFuture

CompletableFuture is a class in java.util.concurrent that represents a future result of an asynchronous computation. You can chain computations, handle exceptions, and run them all asynchronously.

2. Example Async Pipeline Workflow

Let’s assume we have a pipeline with multiple steps:

  1. Fetch data from a remote source.
  2. Transform the data.
  3. Save the data to a database.
  4. Notify a user.

Each of these steps will be represented as a method returning a CompletableFuture.


3. Implementation

a) Define the Asynchronous Tasks

Each step in the pipeline is encapsulated as an asynchronous method using CompletableFuture.supplyAsync() or runAsync().

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class AsyncPipeline {

   private static final ExecutorService executor = Executors.newFixedThreadPool(10); // Thread pool

   // 1. Fetch data from a remote source
   public CompletableFuture<String> fetchData() {
      return CompletableFuture.supplyAsync(() -> {
         try {
            Thread.sleep(1000); // Simulating delay
         } catch (InterruptedException e) {
            throw new IllegalStateException(e);
         }
         System.out.println("Fetched data");
         return "Data from remote source";
      }, executor);
   }

   // 2. Transform the data
   public CompletableFuture<String> transformData(String data) {
      return CompletableFuture.supplyAsync(() -> {
         System.out.println("Transforming data");
         return data.toUpperCase();
      }, executor);
   }

   // 3. Save the data to a database
   public CompletableFuture<Void> saveToDatabase(String transformedData) {
      return CompletableFuture.runAsync(() -> {
         System.out.println("Saving data to database: " + transformedData);
      }, executor);
   }

   // 4. Notify the user
   public CompletableFuture<Void> notifyUser() {
      return CompletableFuture.runAsync(() -> {
         System.out.println("User notified!");
      }, executor);
   }

   // Build the pipeline
   public void executePipeline() {
      fetchData()
              .thenCompose(this::transformData) // Pass the result from "fetchData" to "transformData"
              .thenCompose(this::saveToDatabase) // Then save transformed data to database
              .thenCompose(aVoid -> notifyUser()) // Finally notify the user
              .exceptionally(ex -> { // Handle exceptions globally
                 System.err.println("Pipeline execution failed: " + ex.getMessage());
                 return null;
              })
              .join(); // Block and wait for the pipeline to complete
   }

   public static void main(String[] args) {
      new AsyncPipeline().executePipeline();
   }
}

4. Explanation of the Code

  1. supplyAsync():
    • Used for methods that generate results, such as fetchData() and transformData().
  2. runAsync():
    • Used for methods that don’t produce results (return void), like saveToDatabase() and notifyUser().
  3. Chaining with thenCompose:
    • thenCompose() is used for chaining tasks where each subsequent task depends on the result of the previous task.
  4. Error Handling with exceptionally:
    • exceptionally() is used to handle any error in the pipeline and provide fallback logic.
  5. Thread Pool:
    • You can specify a custom ExecutorService for better control over thread resources.
  6. join():
    • Blocks the main thread until the entire pipeline is complete.

5. Key Methods in CompletableFuture

Method Description
supplyAsync() Executes a task asynchronously and provides a result.
runAsync() Executes a task asynchronously without any result.
thenApply() Transforms the result of a CompletableFuture.
thenCompose() Chains a dependent CompletableFuture, useful when a task depends on the output of the previous.
thenAccept() Consumes the result of a task without returning a result itself.
exceptionally() Handles exceptions that occur during pipeline execution.
allOf() Combines multiple CompletableFutures and waits for all to complete.
anyOf() Completes when any of the provided CompletableFutures completes first.

6. Advanced: Combining Multiple Futures

If there’s a need to combine results from multiple independent asynchronous tasks, you can use allOf() or anyOf().

Example with allOf:

public void fetchMultipleSources() {
    CompletableFuture<String> source1 = fetchSource1();
    CompletableFuture<String> source2 = fetchSource2();

    CompletableFuture<Void> combined = CompletableFuture.allOf(source1, source2)
        .thenRun(() -> {
            try {
                // Retrieve results once all futures are complete
                System.out.println("Source1: " + source1.get());
                System.out.println("Source2: " + source2.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

    combined.join(); // Wait for the combined task to complete
}

private CompletableFuture<String> fetchSource1() {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("Fetching source 1...");
        return "Data from Source 1";
    });
}

private CompletableFuture<String> fetchSource2() {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("Fetching source 2...");
        return "Data from Source 2";
    });
}

This approach allows building flexible and robust asynchronous pipelines while keeping the code clear and concise.

How do I use CompletableFuture for reactive-style concurrency?

Using CompletableFuture in Java can be a powerful way to implement reactive-style concurrency. It provides a clean and functional way to perform asynchronous tasks, compose their results, and handle exceptions without blocking threads. Here’s an explanation with examples to guide you through its reactive-style usage:


Key Features of CompletableFuture

  1. Asynchronous execution – Run tasks on background threads.
  2. Chaining tasks – Perform dependent actions when a task completes using thenApply, thenAccept, thenCompose, etc.
  3. Combining tasks – Execute multiple tasks in parallel and combine their results using thenCombine, allOf, or anyOf.
  4. Exception handling – Handle errors gracefully with handle, exceptionally, or whenComplete.

Example Use Cases

1. Basic Asynchronous Execution

You can run a task asynchronously without blocking the main thread:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            // Simulate a long computation
            System.out.println("Running in background...");
            return "Result";
        }).thenAccept(result -> {
            // Use the result once completed
            System.out.println("Completed with: " + result);
        });

        System.out.println("Main thread is free to do other work...");
    }
}

Output:

Main thread is free to do other work...
Running in background...
Completed with: Result

2. Chaining Dependent Tasks

Reactive-style programming involves chaining tasks, which can be done with thenApply or thenCompose:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureChaining {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            // Fetch some data (simulate API/database call)
            return "Data from API";
        }).thenApply(data -> {
            // Transform the data
            return data.toUpperCase();
        }).thenAccept(processedData -> {
            // Use transformed data
            System.out.println("Processed Data: " + processedData);
        });
    }
}

3. Combining Multiple Async Tasks

To run multiple tasks in parallel and combine their results:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureCombine {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1 Result");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2 Result");

        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
            return result1 + " & " + result2;
        });

        combinedFuture.thenAccept(System.out::println);
    }
}

Output:

Task 1 Result & Task 2 Result

4. Waiting for All Tasks to Complete

If you need to wait for multiple independent tasks to complete:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;
import java.util.List;

public class CompletableFutureAllOf {
    public static void main(String[] args) {
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(
                CompletableFuture.runAsync(() -> System.out.println("Task 1")),
                CompletableFuture.runAsync(() -> System.out.println("Task 2")),
                CompletableFuture.runAsync(() -> System.out.println("Task 3"))
        );

        // Wait for all tasks to complete
        allTasks.join();
        System.out.println("All tasks completed.");
    }
}

5. Handling Exceptions

You can handle exceptions gracefully with methods like exceptionally, handle, or whenComplete:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExceptionHandling {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
                    // Simulate an error
                    if (true) throw new RuntimeException("Something went wrong!");
                    return "Task Result";
                })
                .exceptionally(ex -> {
                    System.out.println("Error: " + ex.getMessage());
                    return "Fallback Result";
                })
                .thenAccept(result -> System.out.println("Result: " + result));
    }
}

Output:

Error: Something went wrong!
Result: Fallback Result

6. Running Tasks in a Custom Executor

By default, CompletableFuture uses the common ForkJoinPool, but you can specify a custom executor:

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;

public class CustomExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        CompletableFuture.runAsync(() -> {
            System.out.println("Task executing in custom executor");
        }, executor).thenRun(() -> executor.shutdown());
    }
}

Summary of Key Methods

Method Purpose
supplyAsync(Supplier) Run a computation in another thread and return a result.
runAsync(Runnable) Run a computation without returning a result.
thenApply(Function) Transform result of the stage.
thenCompose(Function) Chain another async computation dependent on the previous one.
thenAccept(Consumer) Consume the result.
thenCombine(CompletableFuture, BiFunction) Combine results of two independent computations.
allOf(CompletableFuture...) Wait for all tasks to complete.
anyOf(CompletableFuture...) Return as soon as any task is complete.
exceptionally(Function) Handle exceptions and provide a fallback value.
handle(BiFunction) Process the result or handle exceptions.

Benefits of Using CompletableFuture for Reactive Programming

  • Non-blocking and efficient concurrency.
  • Easier composition of asynchronous operations compared to traditional threads.
  • Fine-grained exception handling and coordination of parallel tasks.
  • Works well with APIs like REST or streaming in a reactive pipeline.

By taking advantage of these features, you can implement clean, reactive, and efficient systems.