Asynchronous pipelines can be implemented efficiently in Java using CompletableFuture. Its ability to chain multiple steps through a functional-style API (thenApply, thenCompose, thenAccept, and so on) makes it a powerful tool for building non-blocking pipelines.
Here’s a step-by-step guide to implementing async pipelines using CompletableFuture chaining:
1. Basics of CompletableFuture
CompletableFuture is a class in java.util.concurrent that represents a future result of an asynchronous computation. You can chain computations, handle exceptions, and run them all asynchronously.
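As a minimal sketch of these basics, the snippet below starts one asynchronous computation, chains a transformation onto it, and recovers from a failure. The class and method names (CompletableFutureBasics, lengthOfGreeting, failingTask) are illustrative only and are not part of the pipeline built later.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureBasics {

    // Starts an asynchronous computation and chains one transformation onto it.
    static CompletableFuture<Integer> lengthOfGreeting() {
        return CompletableFuture
                .supplyAsync(() -> "hello")  // no executor given, so the default async pool is used
                .thenApply(String::length);  // runs once the string is available
    }

    // A failure is captured inside the future and can be replaced by a fallback value.
    static CompletableFuture<String> failingTask() {
        return CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .exceptionally(ex -> "fallback");
    }

    public static void main(String[] args) {
        System.out.println(lengthOfGreeting().join()); // 5
        System.out.println(failingTask().join());      // fallback
    }
}
```

Nothing here blocks until join() is called; up to that point the code only describes what should happen once each result becomes available.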
2. Example Async Pipeline Workflow
Let’s assume we have a pipeline with multiple steps:
- Fetch data from a remote source.
- Transform the data.
- Save the data to a database.
- Notify a user.
Each of these steps will be represented as a method returning a CompletableFuture.
3. Implementation
a) Define the Asynchronous Tasks
Each step in the pipeline is encapsulated as an asynchronous method using CompletableFuture.supplyAsync() or runAsync().
package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncPipeline {
    // Shared thread pool for all pipeline stages
    private static final ExecutorService executor = Executors.newFixedThreadPool(10);

    // 1. Fetch data from a remote source
    public CompletableFuture<String> fetchData() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000); // Simulating delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Restore the interrupt flag
                throw new IllegalStateException(e);
            }
            System.out.println("Fetched data");
            return "Data from remote source";
        }, executor);
    }

    // 2. Transform the data
    public CompletableFuture<String> transformData(String data) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Transforming data");
            return data.toUpperCase();
        }, executor);
    }

    // 3. Save the data to a database
    public CompletableFuture<Void> saveToDatabase(String transformedData) {
        return CompletableFuture.runAsync(() -> {
            System.out.println("Saving data to database: " + transformedData);
        }, executor);
    }

    // 4. Notify the user
    public CompletableFuture<Void> notifyUser() {
        return CompletableFuture.runAsync(() -> {
            System.out.println("User notified!");
        }, executor);
    }

    // Build the pipeline
    public void executePipeline() {
        fetchData()
                .thenCompose(this::transformData)   // Pass the result from "fetchData" to "transformData"
                .thenCompose(this::saveToDatabase)  // Then save transformed data to database
                .thenCompose(aVoid -> notifyUser()) // Finally notify the user
                .exceptionally(ex -> {              // Handle exceptions globally
                    System.err.println("Pipeline execution failed: " + ex.getMessage());
                    return null;
                })
                .join(); // Block and wait for the pipeline to complete
    }

    public static void main(String[] args) {
        try {
            new AsyncPipeline().executePipeline();
        } finally {
            // Without this, the pool's non-daemon threads keep the JVM alive
            executor.shutdown();
        }
    }
}
4. Explanation of the Code
- supplyAsync(): Used for methods that generate results, such as fetchData() and transformData().
- runAsync(): Used for methods that don’t produce results (return void), like saveToDatabase() and notifyUser().
- Chaining with thenCompose: thenCompose() is used for chaining tasks where each subsequent task depends on the result of the previous task.
- Error Handling with exceptionally: exceptionally() is used to handle any error in the pipeline and provide fallback logic.
- Thread Pool: You can specify a custom ExecutorService for better control over thread resources.
- join(): Blocks the main thread until the entire pipeline is complete.
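To make the error-handling behaviour concrete, here is a small sketch of what happens when an early stage fails. It assumes a hypothetical failing fetch step and records which stages ran; the class name PipelineFailureSketch and the event strings are made up for illustration. When a stage throws, the dependent stages are skipped, and the exception typically reaches exceptionally() wrapped in a CompletionException, so the sketch unwraps the cause before reporting it.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

class PipelineFailureSketch {

    // Runs a three-stage pipeline whose first stage fails and returns the recorded events.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();

        // Hypothetical fetch step that always fails.
        CompletableFuture<String> fetch = CompletableFuture.supplyAsync(() -> {
            events.add("fetch");
            throw new IllegalStateException("remote source unavailable");
        });

        fetch
                .thenCompose(data -> CompletableFuture.supplyAsync(() -> {
                    events.add("transform"); // skipped because fetch failed
                    return data.toUpperCase();
                }))
                .thenAccept(data -> events.add("save")) // skipped as well
                .exceptionally(ex -> {
                    // Dependent stages see the failure wrapped in CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause()
                            : ex;
                    events.add("failed: " + cause.getMessage());
                    return null;
                })
                .join();

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [fetch, failed: remote source unavailable]
    }
}
```

Because exceptionally() sits at the end of the chain, it acts as a single catch-all; placing it after an individual stage instead would let the pipeline continue with that stage's fallback value.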
5. Key Methods in CompletableFuture
Method | Description |
---|---|
supplyAsync() | Executes a task asynchronously and provides a result. |
runAsync() | Executes a task asynchronously without any result. |
thenApply() | Transforms the result of a CompletableFuture. |
thenCompose() | Chains a dependent CompletableFuture, useful when a task depends on the output of the previous. |
thenAccept() | Consumes the result of a task without returning a result itself. |
exceptionally() | Handles exceptions that occur during pipeline execution. |
allOf() | Combines multiple CompletableFutures and waits for all to complete. |
anyOf() | Completes when any of the provided CompletableFutures completes first. |
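The difference between thenApply(), thenCompose(), and thenAccept() is easiest to see side by side. The following sketch uses a hypothetical async helper, lookupName, purely for illustration; it is not part of the pipeline above.

```java
import java.util.concurrent.CompletableFuture;

class ChainingMethodsSketch {

    // Hypothetical asynchronous lookup, used only for illustration.
    static CompletableFuture<String> lookupName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // thenApply: the function returns a plain value.
    static CompletableFuture<Integer> doubled() {
        return CompletableFuture.supplyAsync(() -> 21).thenApply(n -> n * 2);
    }

    // thenCompose: the function returns a future, and the result is flattened.
    static CompletableFuture<String> composed() {
        return CompletableFuture.supplyAsync(() -> 7)
                .thenCompose(ChainingMethodsSketch::lookupName);
    }

    // thenApply with the same function leaves a nested future instead.
    static CompletableFuture<CompletableFuture<String>> nested() {
        return CompletableFuture.supplyAsync(() -> 7)
                .thenApply(ChainingMethodsSketch::lookupName);
    }

    // thenAccept: consumes the value and yields CompletableFuture<Void>.
    static CompletableFuture<Void> consumed(StringBuilder sink) {
        return composed().thenAccept(sink::append);
    }

    public static void main(String[] args) {
        System.out.println(doubled().join());       // 42
        System.out.println(composed().join());      // user-7
        System.out.println(nested().join().join()); // user-7
        StringBuilder sink = new StringBuilder();
        consumed(sink).join();
        System.out.println(sink);                   // user-7
    }
}
```

As a rule of thumb, reach for thenCompose() whenever the next step itself returns a CompletableFuture, as every step in the pipeline above does, and for thenApply() when the next step is a quick synchronous mapping.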
6. Advanced: Combining Multiple Futures
If there’s a need to combine results from multiple independent asynchronous tasks, you can use allOf() or anyOf().
Example with allOf:
public void fetchMultipleSources() {
    CompletableFuture<String> source1 = fetchSource1();
    CompletableFuture<String> source2 = fetchSource2();

    CompletableFuture<Void> combined = CompletableFuture.allOf(source1, source2)
            .thenRun(() -> {
                // Both futures have completed successfully here, so join() returns immediately
                System.out.println("Source1: " + source1.join());
                System.out.println("Source2: " + source2.join());
            });

    combined.join(); // Wait for the combined task to complete
}

private CompletableFuture<String> fetchSource1() {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("Fetching source 1...");
        return "Data from Source 1";
    });
}

private CompletableFuture<String> fetchSource2() {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("Fetching source 2...");
        return "Data from Source 2";
    });
}
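Two related patterns are sketched below under stated assumptions: collecting the results of allOf() into a list (allOf() itself only yields CompletableFuture<Void>), and using anyOf() to take whichever task finishes first. The names CombiningFuturesSketch, collectAll, and firstResponse, along with the "mirror" strings, are hypothetical, and the CountDownLatch exists only to make the demo deterministic by holding the slow task back.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

class CombiningFuturesSketch {

    // Waits for every future, then reads the results back in input order.
    static CompletableFuture<List<String>> collectAll(List<CompletableFuture<String>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // already complete, does not block
                        .collect(Collectors.toList()));
    }

    // Returns the result of whichever task completes first.
    static String firstResponse() {
        CountDownLatch release = new CountDownLatch(1);
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            try {
                release.await(); // held back until a winner has been chosen
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "slow mirror";
        });
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast mirror");

        // anyOf is typed as CompletableFuture<Object>, so the result needs a cast.
        Object winner = CompletableFuture.anyOf(slow, fast).join();
        release.countDown(); // let the slow task finish
        return (String) winner;
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> sources = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "Data from Source 1"),
                CompletableFuture.supplyAsync(() -> "Data from Source 2"));
        System.out.println(collectAll(sources).join()); // [Data from Source 1, Data from Source 2]
        System.out.println(firstResponse());            // fast mirror
    }
}
```

Note that anyOf() does not cancel the tasks that lose the race; they keep running unless they are cancelled explicitly.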
This approach allows building flexible and robust asynchronous pipelines while keeping the code clear and concise.