How do I use CompletableFuture for reactive-style concurrency?

Using CompletableFuture in Java can be a powerful way to implement reactive-style concurrency. It provides a clean and functional way to perform asynchronous tasks, compose their results, and handle exceptions without blocking threads. Here’s an explanation with examples to guide you through its reactive-style usage:


Key Features of CompletableFuture

  1. Asynchronous execution – Run tasks on background threads.
  2. Chaining tasks – Perform dependent actions when a task completes using thenApply, thenAccept, thenCompose, etc.
  3. Combining tasks – Execute multiple tasks in parallel and combine their results using thenCombine, allOf, or anyOf.
  4. Exception handling – Handle errors gracefully with handle, exceptionally, or whenComplete.

Example Use Cases

1. Basic Asynchronous Execution

You can run a task asynchronously without blocking the main thread:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            // Simulate a long computation
            System.out.println("Running in background...");
            return "Result";
        }).thenAccept(result -> {
            // Use the result once completed
            System.out.println("Completed with: " + result);
        });

        System.out.println("Main thread is free to do other work...");
    }
}

Output:

Main thread is free to do other work...
Running in background...
Completed with: Result

2. Chaining Dependent Tasks

Reactive-style programming involves chaining tasks, which can be done with thenApply or thenCompose:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureChaining {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            // Fetch some data (simulate API/database call)
            return "Data from API";
        }).thenApply(data -> {
            // Transform the data
            return data.toUpperCase();
        }).thenAccept(processedData -> {
            // Use transformed data
            System.out.println("Processed Data: " + processedData);
        });
    }
}

3. Combining Multiple Async Tasks

To run multiple tasks in parallel and combine their results:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureCombine {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1 Result");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2 Result");

        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
            return result1 + " & " + result2;
        });

        combinedFuture.thenAccept(System.out::println);
    }
}

Output:

Task 1 Result & Task 2 Result

4. Waiting for All Tasks to Complete

If you need to wait for multiple independent tasks to complete:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;
import java.util.List;

public class CompletableFutureAllOf {
    public static void main(String[] args) {
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(
                CompletableFuture.runAsync(() -> System.out.println("Task 1")),
                CompletableFuture.runAsync(() -> System.out.println("Task 2")),
                CompletableFuture.runAsync(() -> System.out.println("Task 3"))
        );

        // Wait for all tasks to complete
        allTasks.join();
        System.out.println("All tasks completed.");
    }
}

5. Handling Exceptions

You can handle exceptions gracefully with methods like exceptionally, handle, or whenComplete:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExceptionHandling {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
                    // Simulate an error
                    if (true) throw new RuntimeException("Something went wrong!");
                    return "Task Result";
                })
                .exceptionally(ex -> {
                    System.out.println("Error: " + ex.getMessage());
                    return "Fallback Result";
                })
                .thenAccept(result -> System.out.println("Result: " + result));
    }
}

Output:

Error: Something went wrong!
Result: Fallback Result

6. Running Tasks in a Custom Executor

By default, CompletableFuture uses the common ForkJoinPool, but you can specify a custom executor:

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;

public class CustomExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        CompletableFuture.runAsync(() -> {
            System.out.println("Task executing in custom executor");
        }, executor).thenRun(() -> executor.shutdown());
    }
}

Summary of Key Methods

Method Purpose
supplyAsync(Supplier) Run a computation in another thread and return a result.
runAsync(Runnable) Run a computation without returning a result.
thenApply(Function) Transform result of the stage.
thenCompose(Function) Chain another async computation dependent on the previous one.
thenAccept(Consumer) Consume the result.
thenCombine(CompletableFuture, BiFunction) Combine results of two independent computations.
allOf(CompletableFuture...) Wait for all tasks to complete.
anyOf(CompletableFuture...) Return as soon as any task is complete.
exceptionally(Function) Handle exceptions and provide a fallback value.
handle(BiFunction) Process the result or handle exceptions.

Benefits of Using CompletableFuture for Reactive Programming

  • Non-blocking and efficient concurrency.
  • Easier composition of asynchronous operations compared to traditional threads.
  • Fine-grained exception handling and coordination of parallel tasks.
  • Works well with APIs like REST or streaming in a reactive pipeline.

By taking advantage of these features, you can implement clean, reactive, and efficient systems.

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.