How do I combine multiple CompletableFutures?

To combine multiple CompletableFuture objects in Java, you can use the following approaches, depending on your specific use case:


1. Combine Two Futures

If you have two CompletableFuture instances and need to combine their results, use the thenCombine method. It lets you specify how to merge the results of both futures.

Example:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task2");

// Combine their results
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
    return result1 + " & " + result2; // Merge both results
});

combinedFuture.thenAccept(result -> System.out.println("Combined Result: " + result));

Output:

Combined Result: Task1 & Task2

2. Wait for All Futures to Complete

If you want to wait for multiple CompletableFuture tasks to complete (without necessarily combining their results immediately), you can use the allOf method. This is useful when you want to ensure that all tasks are completed before proceeding further.

Example:

CompletableFuture<Void> allTasks = CompletableFuture.allOf(
    CompletableFuture.runAsync(() -> System.out.println("Task 1 completed")),
    CompletableFuture.runAsync(() -> System.out.println("Task 2 completed")),
    CompletableFuture.runAsync(() -> System.out.println("Task 3 completed"))
);

// Wait for all tasks to complete
allTasks.join();

System.out.println("All tasks completed.");

3. Combine Results of Multiple Futures

To combine the individual results of a list (or array) of CompletableFuture objects, you would use allOf in conjunction with further processing.

For example:

  1. Create a list/array of CompletableFuture.
  2. Use CompletableFuture.allOf() to wait for all of them.
  3. Extract and combine results using join.

Example:

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(() -> "Result1"),
    CompletableFuture.supplyAsync(() -> "Result2"),
    CompletableFuture.supplyAsync(() -> "Result3")
);

CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

// Gather results once all are completed
CompletableFuture<List<String>> resultFuture = allOf.thenApply(v ->
    futures.stream()
           .map(CompletableFuture::join) // Join and collect results
           .collect(Collectors.toList())
);

resultFuture.thenAccept(results -> System.out.println("Results: " + results));

Output:

Results: [Result1, Result2, Result3]

4. Wait for the First to Complete

If you are waiting for the first CompletableFuture to complete instead of all, use anyOf. It is useful when results from the first completed task suffice.

Example:

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(
    CompletableFuture.supplyAsync(() -> "First Result"),
    CompletableFuture.supplyAsync(() -> "Second Result"),
    CompletableFuture.supplyAsync(() -> "Third Result")
);

anyOf.thenAccept(result -> System.out.println("First completed task result: " + result));

Comparison of APIs for Combining Futures:

Method Purpose
thenCombine Combines two futures’ results into one.
thenCompose Chains dependent tasks (where one task’s result is input for the next).
allOf Waits for all futures in a list/array to complete but does not return their results.
anyOf Waits for the first future from a list/array to complete and returns its result.

By selecting the appropriate method (thenCombine, allOf, anyOf, or others), you can efficiently handle multiple asynchronous computations in Java with CompletableFuture.

How do I use CompletableFuture for async tasks?

CompletableFuture is a powerful tool in Java to implement asynchronous programming. It allows you to perform tasks in the background, chain multiple async tasks together, handle both success and failure scenarios, and combine multiple async computations.

Here’s a summary of how to use CompletableFuture with examples:


1. Run a task asynchronously

Use supplyAsync() (if the task produces a result) or runAsync() (if the task doesn’t return anything).

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Running task in background...");
});

Or with a result:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Hello, World!";
});

2. Process the result

You can process the result of a CompletableFuture using methods like thenAccept or thenApply.

future.thenApply(result -> {
    System.out.println("Received result: " + result);
    return result.toUpperCase();
}).thenAccept(uppercaseResult -> {
    System.out.println("Transformed result: " + uppercaseResult);
});

3. Combine multiple tasks

You can run multiple tasks in parallel and combine their results.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task2");

CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (task1Result, task2Result) -> {
    return task1Result + " and " + task2Result;
});

combinedFuture.thenAccept(result -> {
    System.out.println("Combined Result: " + result);
});

4. Wait for all tasks

If you have multiple tasks and want to wait for all of them to complete, use CompletableFuture.allOf.

CompletableFuture<Void> allTasks = CompletableFuture.allOf(
    CompletableFuture.runAsync(() -> System.out.println("Task 1 completed")),
    CompletableFuture.runAsync(() -> System.out.println("Task 2 completed"))
);

allTasks.join(); // Blocks the thread and waits for completion
System.out.println("All tasks completed.");

5. Handle errors

You can handle exceptions in async tasks using exceptionally().

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    throw new RuntimeException("Oops, something went wrong!");
}).exceptionally(ex -> {
    System.out.println("Error: " + ex.getMessage());
    return null;
});

6. Compose dependent tasks

Use thenCompose to chain dependent tasks where the second task depends on the result of the first one.

CompletableFuture.supplyAsync(() -> "Task 1 Result")
    .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " Task 2 Result"))
    .thenAccept(finalResult -> System.out.println("Final Result: " + finalResult));

7. Custom Executor

By default, CompletableFuture uses the ForkJoinPool.commonPool for async tasks. You can provide a custom executor for better control over threads.

ExecutorService executor = Executors.newFixedThreadPool(10);

CompletableFuture.runAsync(() -> {
    System.out.println("Running task on custom executor");
}, executor);

Key Methods in CompletableFuture

Method Description
runAsync Run a task asynchronously (does not return result).
supplyAsync Run a task asynchronously and return a result.
thenApply Transform the result of a CompletableFuture.
thenAccept Consumes the result of a CompletableFuture (no further processing).
thenCompose Chains dependent tasks where the next uses the result of the previous.
thenCombine Combines two CompletableFuture results.
allOf / anyOf Wait for all or any of multiple futures to complete.
exceptionally Handle exceptions thrown during the async computation.
join / get Block and wait for the task to complete and retrieve its result (not recommended for non-blocking).

Example Workflow with Pipeline

Here’s an example pipeline:

  1. Fetch data
  2. Process it
  3. Save it
  4. Notify the user
ExecutorService executor = Executors.newFixedThreadPool(4);

CompletableFuture.supplyAsync(() -> {
    System.out.println("Fetching data...");
    return "Raw Data";
}, executor).thenApply(data -> {
    System.out.println("Processing data...");
    return data.toUpperCase();
}).thenAccept(processedData -> {
    System.out.println("Saving: " + processedData);
}).thenRun(() -> {
    System.out.println("Notification: Done!");
}).exceptionally(ex -> {
    System.err.println("Pipeline failed due to: " + ex.getMessage());
    return null;
}).join();

Using CompletableFuture, you can build flexible, high-performance, and non-blocking applications.

How do I use the Java Util Concurrent Flow API?

The java.util.concurrent.Flow API, introduced in Java 9, is a low-level implementation of the Reactive Streams specification, providing an asynchronous, non-blocking framework for handling streams of data. It allows you to build reactive systems that support backpressure to handle large or variable amounts of data more efficiently.

Here’s a quick guide on how to use java.util.concurrent.Flow API:


Key Interfaces in java.util.concurrent.Flow

The API comprises four core interfaces:

  1. Flow.Publisher:
    Represents the producer of data. It publishes items to one or more subscribers.

  2. Flow.Subscriber:
    Represents the consumer of data. It subscribes to a publisher to receive data.

  3. Flow.Subscription:
    Represents a link between a publisher and a subscriber, allowing the subscriber to control how much data it receives (backpressure).

  4. Flow.Processor:
    Both a subscriber and a publisher, used to transform or process elements as they flow through the stream.


Implementation Workflow

To use the Flow API, you need to implement these interfaces. Below is a step-by-step explanation:

1. Create a Publisher

  • The Flow.Publisher interface has a single method subscribe(Subscriber<? super T> subscriber).
  • The publisher is responsible for connecting with subscribers and managing their subscriptions.
package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;

public class SimplePublisher implements Flow.Publisher<String> {
    private final String[] items = {"Item 1", "Item 2", "Item 3"};

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        SubscriptionImpl subscription = new SubscriptionImpl(subscriber, items);
        subscriber.onSubscribe(subscription);
    }

    private static class SubscriptionImpl implements Flow.Subscription {
        private final Flow.Subscriber<? super String> subscriber;
        private final String[] items;
        private int currentIndex = 0;
        private boolean canceled = false;

        public SubscriptionImpl(Flow.Subscriber<? super String> subscriber, String[] items) {
            this.subscriber = subscriber;
            this.items = items;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                subscriber.onError(new IllegalArgumentException("Must request a positive number of items"));
                return;
            }

            for (int i = 0; i < n && currentIndex < items.length; i++) {
                if (canceled) {
                    return;
                }
                subscriber.onNext(items[currentIndex++]);
            }

            if (currentIndex == items.length) {
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            canceled = true;
        }
    }
}

2. Create a Subscriber

  • Implement the Flow.Subscriber interface (four methods) for receiving events from a publisher:
    • onSubscribe(Flow.Subscription subscription): Receive the subscription. You must request data here.
    • onNext(T item): Handle the next item of the stream.
    • onError(Throwable throwable): Handle any errors.
    • onComplete(): Called when the publisher finishes sending data.
package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;

public class SimpleSubscriber implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("Subscribed!");
        subscription.request(1); // Request the first item
    }

    @Override
    public void onNext(String item) {
        System.out.println("Received: " + item);
        subscription.request(1); // Request the next item
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Complete!");
    }
}

3. Connect the Publisher to the Subscriber

  • Instantiate and link your Publisher and Subscriber.
package org.kodejava.util.concurrent;

public class FlowExample {
    public static void main(String[] args) {
        SimplePublisher publisher = new SimplePublisher();
        SimpleSubscriber subscriber = new SimpleSubscriber();

        publisher.subscribe(subscriber);
    }
}

4. (Optional) Create a Processor

  • A Processor acts as both a Subscriber to transform data from an upstream publisher and a Publisher to pass it downstream.
package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class UppercaseProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        submit(item.toUpperCase());
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
        close();
    }

    @Override
    public void onComplete() {
        System.out.println("Processing complete!");
        close();
    }
}

Example with SubmissionPublisher

Java also provides a SubmissionPublisher class, an implementation of Flow.Publisher, which simplifies creating Publishers.

package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class SubmissionPublisherExample {
    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                System.out.println("Subscribed!");
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("Received: " + item);
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Done!");
            }
        };

        publisher.subscribe(subscriber);

        System.out.println("Publishing data...");
        publisher.submit("Hello");
        publisher.submit("World");
        publisher.submit("!");

        Thread.sleep(100); // Allow time for processing
        publisher.close();
    }
}

Output of the Above Example

Publishing data...
Subscribed!
Received: Hello
Received: World
Received: !
Done!

Points to Remember

  1. Backpressure:
    • The subscriber can control how many items it wants to receive using the request() method of Flow.Subscription.
    • If the subscriber requests fewer items, the publisher will slow down and send only the requested number.
  2. Error Handling:
    • If something goes wrong, the onError() callback is invoked, allowing you to handle errors gracefully.
  3. Completion:
    • Once all elements are processed, the publisher calls onComplete() to indicate the sequence is finished.
  4. Threading:
    • The Flow API itself doesn’t mandate the use of specific threads for dealing with publishers/subscribers, but it’s often paired with asynchronous mechanisms (e.g., the SubmissionPublisher uses a default ForkJoinPool to process items).

By combining custom implementations with the provided SubmissionPublisher and additional libraries, you can build reactive systems that are both powerful and resource-efficient.

How do I use Map.forEach() for concise iteration?

The Map.forEach method in Java provides a concise and elegant way to iterate over all key-value pairs in a Map. This method accepts a lambda function (or method reference), which processes each entry in the map.

Here’s how you can use Map.forEach for concise iteration:

Syntax:

map.forEach((key, value) -> {
    // Your logic here
});

Example:

Suppose you have a map, and you want to print each key-value pair:

Map<String, Integer> map = new HashMap<>();
map.put("Apple", 10);
map.put("Orange", 20);
map.put("Banana", 30);

// Use forEach for iteration
map.forEach((key, value) -> System.out.println("Key: " + key + ", Value: " + value));

Explanation:

  1. Lambda Expression:
    • (key, value) are the parameters representing the key and the value of each entry in the map.
    • The code block after -> defines what happens for each entry in the map.
  2. Conciseness:
    • No need to use nested loops or explicitly retrieve entries from the map using entrySet or keySet.

Use Cases:

  • Logging or printing map entries.
  • Applying transformations (e.g., modifying values).
  • Collecting or filtering certain entries based on some condition.

Method Reference:

If your logic can be represented as a method, you can use a method reference:

map.forEach(System.out::println); // Prints entries like "Apple=10"

This keeps the code concise, readable, and functional.

How do I use Collectors.filtering() introduced in Java 9?

In Java 9, the Collectors.filtering method was introduced to the Stream API as part of java.util.stream.Collectors. It allows you to apply a filter to elements of a stream before collecting them into a downstream collector (e.g., toList, toSet, etc.).

This can be particularly useful when you want to filter elements as part of the data collection pipeline.


Syntax

static <T, A, R> Collector<T, ?, R> filtering(Predicate<? super T> predicate, Collector<? super T, A, R> downstream)
  • predicate: A filter condition to be applied (e.g., a lambda expression).
  • downstream: The collector that will gather the filtered elements (e.g., Collectors.toList()).

How It Works

  1. The filtering method applies the specified Predicate to filter the elements of the stream.
  2. Only the elements that match the predicate are passed to the downstream collector.
  3. The filtered results are then collected as specified by the downstream collector.

Usage Example

Here’s a basic example of using Collectors.filtering:

Collecting only even integers from a list:

package org.kodejava.util.stream;

import java.util.List;
import java.util.stream.Collectors;

public class FilteringExample {
    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Apply filtering before collecting to a list
        List<Integer> evenNumbers = numbers.stream()
                .collect(Collectors.filtering(n -> n % 2 == 0, Collectors.toList()));

        System.out.println("Even Numbers: " + evenNumbers);
    }
}

Output:

Even Numbers: [2, 4, 6, 8, 10]

Filtering with Downstream Grouping

You can use filtering in more complex collectors, such as those involving grouping. For example:

Grouping strings by their first character and filtering only strings longer than 3 characters:

package org.kodejava.util.stream;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FilteringWithGrouping {
    public static void main(String[] args) {
        List<String> words = List.of("apple", "ant", "banana", "bat", "cat", "car", "dog");

        // Group by the first character and filter words with length > 3
        Map<Character, List<String>> filteredWordsByGroup = words.stream()
                .collect(Collectors.groupingBy(
                        word -> word.charAt(0), // Grouping by the first character
                        Collectors.filtering(
                                word -> word.length() > 3, // Filter words with length > 3
                                Collectors.toList() // Collect filtered words into a list
                        )
                ));

        System.out.println("Filtered Words: " + filteredWordsByGroup);
    }
}

Output:

Filtered Words: {a=[apple], b=[banana], c=[cat, car], d=[dog]}

When to Use

Collectors.filtering is particularly useful for:

  1. Grouped collections: Applying a filter while grouping elements.
  2. Custom collections: Collecting filtered elements into different collection types without needing an intermediate filtered stream.
  3. Improved readability: Reduces the need for chaining multiple Stream.filter() calls in complex data processing.

Overall, Collectors.filtering makes streams more flexible and concise for advanced data collection scenarios!