How do I chain asynchronous calls using Java 11 HttpClient and CompletableFuture?

Java 11’s HttpClient.sendAsync method returns a CompletableFuture holding the HttpResponse, so asynchronous HTTP calls can be chained with the usual CompletableFuture composition methods such as thenApply, thenCompose, and thenAccept. Here’s a step-by-step example:


Key Concepts Used:

  1. CompletableFuture:
    • Allows for async processing and chaining of dependent tasks.
  2. HttpClient and HttpRequest:
    • The async calls are made using the HttpClient.sendAsync method.
  3. Chaining methods:
    • Use thenApply to transform a result synchronously, and thenCompose when the next step itself returns a CompletableFuture (for example, another sendAsync call).
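
The difference between thenApply and thenCompose is easiest to see in the resulting types. The following is a minimal sketch that uses no HTTP at all; lookupLength is a hypothetical stand-in for any method that returns a CompletableFuture, such as a second sendAsync call.

```java
import java.util.concurrent.CompletableFuture;

final class ApplyVsComposeSketch {

    // Hypothetical async step: stands in for any call that returns a CompletableFuture
    static CompletableFuture<Integer> lookupLength(String text) {
        return CompletableFuture.supplyAsync(text::length);
    }

    // thenApply with an async function wraps the result in a second future
    static CompletableFuture<CompletableFuture<Integer>> nested(String text) {
        return CompletableFuture.completedFuture(text)
                .thenApply(ApplyVsComposeSketch::lookupLength);
    }

    // thenCompose flattens it, so the chain continues with the plain value
    static CompletableFuture<Integer> flat(String text) {
        return CompletableFuture.completedFuture(text)
                .thenCompose(ApplyVsComposeSketch::lookupLength);
    }

    public static void main(String[] args) {
        System.out.println(nested("hello").join().join()); // two joins are needed
        System.out.println(flat("hello").join());          // one join is enough
    }
}
```

Both calls produce the same number; only the shape of the future differs, which is why thenCompose is the natural fit when one request depends on another.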

Example: Chaining Multiple HTTP Requests

Say we need to:

  1. Fetch data using one API.
  2. Use the response data to make another API call.
  3. Process the final response.

Here’s how you can do that:

package org.kodejava.net.http;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;

public class AsyncChainingExample {

   public static void main(String[] args) {
      HttpClient client = HttpClient.newHttpClient();

      // First API Request
      HttpRequest firstRequest = HttpRequest.newBuilder()
              .uri(URI.create("https://jsonplaceholder.typicode.com/posts/1"))
              .GET()
              .build();

      // First Async Call
      CompletableFuture<Void> future = client.sendAsync(firstRequest, HttpResponse.BodyHandlers.ofString())
              .thenApply(HttpResponse::body) // Extract body from response
              .thenCompose(body -> {
                 System.out.println("First API Response: " + body);

                 // Build the second request. A real application would parse the post id
                 // out of the first response body; it is hard-coded here for brevity.
                 String secondApiUri = "https://jsonplaceholder.typicode.com/comments?postId=1";
                 HttpRequest secondRequest = HttpRequest.newBuilder()
                         .uri(URI.create(secondApiUri))
                         .GET()
                         .build();

                 return client.sendAsync(secondRequest, HttpResponse.BodyHandlers.ofString());
              })
              .thenApply(HttpResponse::body) // Extract body from second response
              .thenAccept(secondResponse -> {
                 // Final result processing
                 System.out.println("Second API Response: " + secondResponse);
              });

      // Wait for all the tasks to complete
      future.join();
   }
}

Explanation of the Code:

  1. Create the HttpClient:
    • HttpClient.newHttpClient() initializes the HTTP client that will send requests asynchronously.
  2. First API Call:
    • The first API request (firstRequest) is created using HttpRequest.newBuilder.
    • Send the request asynchronously with:
    client.sendAsync(firstRequest, HttpResponse.BodyHandlers.ofString());
    
    • thenApply is used to extract the body of the response.
  3. Second API Call (Chained):
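    • In thenCompose, the code prepares and sends the second API request, so the second call starts only after the first one completes. Because the lambda returns a CompletableFuture, thenCompose flattens it and the chain continues with the second HttpResponse rather than a nested future.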
    • The response of this call is again processed by extracting the body.
  4. Response Processing:
    • thenAccept is used at the end of the chain to process the final response.
  5. Waiting for Completion:
    • Since the operations are asynchronous, future.join() blocks the main thread until all the chained calls complete. This chain has no error handler, so if any stage fails, join() throws a CompletionException.

Output:

Sample output from the above example (when run):

First API Response: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati",
  "body": "quia et suscipit..."
}

Second API Response: [
  {
    "postId": 1,
    "id": 1,
    "name": "id labore ex et quam laborum",
    ...
  },
  ...
]
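
The example above hard-codes postId=1 in the second URI. As a minimal sketch of deriving the second request from the first response, the code below runs the same thenCompose chain against a throwaway local server (the JDK’s built-in com.sun.net.httpserver.HttpServer), so it works without internet access. The server, its /latest-id and /items/ paths, and all class and method names are assumptions made up for this illustration; they are not part of the original example.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

final class DependentRequestSketch {

    // Writes a plain-text 200 response; only used by the throwaway server below
    static void reply(HttpExchange exchange, String body) throws IOException {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        exchange.sendResponseHeaders(200, bytes.length);
        try (OutputStream out = exchange.getResponseBody()) {
            out.write(bytes);
        }
    }

    // Hypothetical endpoints: /latest-id returns an id, /items/<id> returns that item
    static HttpServer startServer() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/latest-id", exchange -> reply(exchange, "42"));
        server.createContext("/items/", exchange -> {
            String path = exchange.getRequestURI().getPath();
            reply(exchange, "item-" + path.substring("/items/".length()));
        });
        server.start();
        return server;
    }

    // The second URI is built from the body of the first response
    static CompletableFuture<String> fetchLatestItem(HttpClient client, String baseUrl) {
        HttpRequest first = HttpRequest.newBuilder(URI.create(baseUrl + "/latest-id")).build();
        return client.sendAsync(first, HttpResponse.BodyHandlers.ofString())
                .thenApply(response -> response.body().trim())
                .thenCompose(id -> {
                    HttpRequest second = HttpRequest
                            .newBuilder(URI.create(baseUrl + "/items/" + id))
                            .build();
                    return client.sendAsync(second, HttpResponse.BodyHandlers.ofString());
                })
                .thenApply(HttpResponse::body);
    }

    // Starts the server, runs the chain once, and always stops the server again
    static String run() {
        try {
            HttpServer server = startServer();
            try {
                // HTTP/1.1 keeps the exchange with the simple test server straightforward
                HttpClient client = HttpClient.newBuilder()
                        .version(HttpClient.Version.HTTP_1_1)
                        .build();
                String baseUrl = "http://127.0.0.1:" + server.getAddress().getPort();
                return fetchLatestItem(client, baseUrl).join();
            } finally {
                server.stop(0);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a real JSON API, the lambda passed to thenCompose would parse the id out of the JSON body with a JSON library instead of calling trim() on a plain-text body.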

Key Functions Used in the Chain:

  1. thenApply(Function)
    • Transforms the result of the previous step (e.g., extract the body).
  2. thenCompose(Function)
    • Used for dependent async calls: the function returns another CompletableFuture, and thenCompose flattens it so the chain does not end up with a future nested inside a future.
  3. thenAccept(Consumer)
    • Consumes the result without returning anything.

Advantages of this Approach:

  • No need for manual thread management.
  • Non-blocking I/O.
  • Easily scalable chaining of async calls.

This is a modern, clean solution for handling asynchronous HTTP requests in Java using HttpClient and CompletableFuture.

How do I use Java 11 HttpClient to send asynchronous requests?

In Java 11, the HttpClient API provides a modern and user-friendly way to send both synchronous and asynchronous HTTP requests. To send asynchronous requests, you’ll use the sendAsync method, which returns a CompletableFuture.

Here’s how to use it:


Step-by-Step Guide to Sending Asynchronous Requests:

  1. Initialize the HttpClient:
    Use HttpClient.newHttpClient() or HttpClient.newBuilder() to create an instance. This is the central object for sending requests.
  2. Create an HttpRequest:
    Prepare your HTTP request using the HttpRequest class, where you can specify the URI, HTTP method, headers, body, etc.
  3. Send an Asynchronous Request with sendAsync:
    Call the sendAsync method of the HttpClient, passing the request and body handler as arguments. This returns a CompletableFuture, which allows you to perform non-blocking operations.
  4. Process the Response:
    Use the CompletableFuture chain methods, like thenApply and thenAccept, to process the response once it’s available.

Example: Sending an Asynchronous GET Request

package org.kodejava.net.http;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class AsyncHttpClientExample {

    public static void main(String[] args) {
        // Create HttpClient instance
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(10)) // Optional timeout
                .build();

        // Prepare HttpRequest
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://jsonplaceholder.typicode.com/posts"))
                .GET()
                .header("Accept", "application/json")
                .build();

        // Send asynchronous request
        CompletableFuture<HttpResponse<String>> futureResponse =
                client.sendAsync(request, HttpResponse.BodyHandlers.ofString());

        // Process response asynchronously and keep a reference to the last stage
        CompletableFuture<Void> done = futureResponse
                .thenApply(HttpResponse::body) // Extract the response body
                .thenAccept(System.out::println) // Print the body
                .exceptionally(ex -> {
                    System.err.println("Request failed: " + ex.getMessage());
                    return null;
                });

        // Do other tasks while the response is being fetched...
        System.out.println("Request is sent. Waiting for response...");

        // Wait for the whole chain (not just the request) so the program does not exit early
        // and a failed request is handled by exceptionally instead of being rethrown by join()
        done.join();
    }
}

Explanation:

  1. HttpClient.newBuilder():
    Returns a builder for the HttpClient; calling build() creates the instance. You can optionally configure timeouts, proxies, or redirect policies.
  2. HttpRequest.newBuilder():
    Returns a builder for the HTTP request; calling build() creates it. You specify the URI, headers, and HTTP method (e.g., GET, POST, etc.).
  3. sendAsync:
    Sends the request asynchronously. It accepts two arguments:

    • The HttpRequest object.
    • A BodyHandler to determine how the HTTP response body should be handled, such as ofString() for plain text.
  4. CompletableFuture Chain:
    • thenApply: Transforms the result once it becomes available.
    • thenAccept: Consumes the result of the future once it’s ready.
    • exceptionally: Handles exceptions thrown by any earlier stage in the chain.
  5. join():
    Blocks the main thread until the asynchronous operation is complete (used here to prevent premature termination of the program).
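
One detail worth knowing about exceptionally: when it is attached after a dependent stage such as thenApply, the exception it receives is a CompletionException that wraps the original failure. The sketch below simulates a failed request with CompletableFuture.failedFuture (available since Java 9) instead of a real network call; the unwrap helper and the class name are hypothetical names chosen for this illustration.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class UnwrapSketch {

    // Returns the underlying cause when the exception was wrapped by a dependent stage
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    // Simulates a failed request followed by a thenApply/exceptionally chain
    static String describeFailure() {
        CompletableFuture<String> failed =
                CompletableFuture.failedFuture(new IOException("connection refused"));
        return failed
                .thenApply(String::trim) // skipped because the source already failed
                .exceptionally(ex -> ex.getClass().getSimpleName()
                        + " -> " + unwrap(ex).getClass().getSimpleName()
                        + ": " + unwrap(ex).getMessage())
                .join();
    }

    public static void main(String[] args) {
        // prints "CompletionException -> IOException: connection refused"
        System.out.println(describeFailure());
    }
}
```

Unwrapping the cause this way makes it possible to react differently to, say, a timeout and a connection failure inside the same handler.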

Example: Sending an Asynchronous POST Request with JSON Body

package org.kodejava.net.http;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class AsyncPostExample {

    public static void main(String[] args) {
        // Create HttpClient
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(10))
                .build();

        // Prepare JSON body
        String jsonBody = "{ \"title\": \"foo\", \"body\": \"bar\", \"userId\": 1 }";

        // Create HttpRequest
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://jsonplaceholder.typicode.com/posts"))
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .header("Content-Type", "application/json")
                .build();

        // Send asynchronous POST request
        CompletableFuture<HttpResponse<String>> futureResponse =
                client.sendAsync(request, HttpResponse.BodyHandlers.ofString());

        // Handle response and keep a reference to the last stage of the chain
        CompletableFuture<Void> done = futureResponse
                .thenApply(HttpResponse::body)
                .thenAccept(System.out::println)
                .exceptionally(ex -> {
                    System.err.println("Error: " + ex.getMessage());
                    return null;
                });

        // Wait for the whole chain so the program does not exit before the response is handled
        System.out.println("POST request sent. Waiting for response...");
        done.join();
    }
}

Key Notes:

  • Thread-Safe HttpClient:
    The HttpClient instance is thread-safe and can be reused for multiple requests.
  • Non-blocking Nature:
    Asynchronous requests are non-blocking, so you can perform other tasks while waiting for the response.
  • Error Handling:
    Use the exceptionally method of the CompletableFuture to handle any errors during the request.
  • Keepalive:
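    By default, HttpClient keeps connections alive and reuses them for later requests to the same host, which avoids repeated connection setup.
  • Timeouts:
    Always configure timeouts to avoid waiting indefinitely: connectTimeout on the HttpClient builder limits connection setup, and timeout on the HttpRequest builder limits how long to wait for the response.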
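
As a minimal sketch of the timeout note above, the following configures both kinds of timeout. The durations and the helper method names are arbitrary choices for illustration; the main method only builds the objects and prints the configured values, so it does not send anything over the network.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

final class TimeoutSketch {

    // Limits how long the client waits while establishing a connection
    static HttpClient clientWithConnectTimeout() {
        return HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(10))
                .build();
    }

    // Limits how long to wait for the response to this particular request
    static HttpRequest requestWithTimeout(String url) {
        return HttpRequest.newBuilder()
                .uri(URI.create(url))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpClient client = clientWithConnectTimeout();
        HttpRequest request = requestWithTimeout("https://jsonplaceholder.typicode.com/posts");
        System.out.println("Connect timeout: " + client.connectTimeout().orElseThrow());
        System.out.println("Request timeout: " + request.timeout().orElseThrow());
    }
}
```

When the request timeout elapses, the future returned by sendAsync completes exceptionally with an HttpTimeoutException, which an exceptionally handler can then deal with.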

Using this approach, you can efficiently perform asynchronous HTTP communication with HttpClient.

How do I implement async pipelines with CompletableFuture chaining?

Asynchronous pipelines can be implemented efficiently in Java using CompletableFuture. The ability of CompletableFuture to chain multiple steps through its functional programming model (e.g., thenApply, thenCompose, thenAccept, etc.) makes it a powerful tool for building non-blocking pipelines.

Here’s a step-by-step guide to implement async pipelines using CompletableFuture chaining:


1. Basics of CompletableFuture

CompletableFuture is a class in java.util.concurrent that represents a future result of an asynchronous computation. You can chain computations, handle exceptions, and run them all asynchronously.

2. Example Async Pipeline Workflow

Let’s assume we have a pipeline with multiple steps:

  1. Fetch data from a remote source.
  2. Transform the data.
  3. Save the data to a database.
  4. Notify a user.

Each of these steps will be represented as a method returning a CompletableFuture.


3. Implementation

a) Define the Asynchronous Tasks

Each step in the pipeline is encapsulated as an asynchronous method using CompletableFuture.supplyAsync() or runAsync().

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class AsyncPipeline {

   private static final ExecutorService executor = Executors.newFixedThreadPool(10); // Thread pool

   // 1. Fetch data from a remote source
   public CompletableFuture<String> fetchData() {
      return CompletableFuture.supplyAsync(() -> {
         try {
            Thread.sleep(1000); // Simulating delay
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt status
            throw new IllegalStateException(e);
         }
         System.out.println("Fetched data");
         return "Data from remote source";
      }, executor);
   }

   // 2. Transform the data
   public CompletableFuture<String> transformData(String data) {
      return CompletableFuture.supplyAsync(() -> {
         System.out.println("Transforming data");
         return data.toUpperCase();
      }, executor);
   }

   // 3. Save the data to a database
   public CompletableFuture<Void> saveToDatabase(String transformedData) {
      return CompletableFuture.runAsync(() -> {
         System.out.println("Saving data to database: " + transformedData);
      }, executor);
   }

   // 4. Notify the user
   public CompletableFuture<Void> notifyUser() {
      return CompletableFuture.runAsync(() -> {
         System.out.println("User notified!");
      }, executor);
   }

   // Build the pipeline
   public void executePipeline() {
      fetchData()
              .thenCompose(this::transformData) // Pass the result from "fetchData" to "transformData"
              .thenCompose(this::saveToDatabase) // Then save transformed data to database
              .thenCompose(aVoid -> notifyUser()) // Finally notify the user
              .exceptionally(ex -> { // Handle exceptions globally
                 System.err.println("Pipeline execution failed: " + ex.getMessage());
                 return null;
              })
              .join(); // Block and wait for the pipeline to complete
   }

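   public static void main(String[] args) {
      try {
         new AsyncPipeline().executePipeline();
      } finally {
         // Without this, the pool's non-daemon threads keep the JVM alive
         executor.shutdown();
      }
   }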
}

4. Explanation of the Code

  1. supplyAsync():
    • Used for methods that generate results, such as fetchData() and transformData().
  2. runAsync():
    • Used for tasks that don’t produce a result (the method returns CompletableFuture<Void>), like saveToDatabase() and notifyUser().
  3. Chaining with thenCompose:
    • thenCompose() is used for chaining tasks where each subsequent task depends on the result of the previous task.
  4. Error Handling with exceptionally:
    • exceptionally() is used to handle any error in the pipeline and provide fallback logic.
  5. Thread Pool:
    • You can specify a custom ExecutorService for better control over thread resources. Shut it down when the work is done; otherwise its non-daemon threads keep the JVM running.
  6. join():
    • Blocks the main thread until the entire pipeline is complete.
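
To illustrate how a failure travels through such a pipeline, here is a minimal sketch in which the second step always throws. The stages after the failing one are skipped and the exceptionally handler at the end receives the error. The step names and the failing transform helper are hypothetical and only simulate the pipeline described above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class FailurePropagationSketch {

    // Simulated failure: this hypothetical transform step always throws
    static String transform(String data) {
        throw new IllegalStateException("transform failed");
    }

    // Runs a three-step chain and records which steps actually executed
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture
                .supplyAsync(() -> {
                    log.add("fetch");
                    return "data";
                })
                .thenApply(data -> {
                    log.add("transform");
                    return transform(data); // throws, so the chain fails here
                })
                .thenAccept(result -> log.add("save")) // skipped after the failure
                .exceptionally(ex -> {
                    // The exception arrives wrapped in a CompletionException
                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                    log.add("handled: " + cause.getMessage());
                    return null;
                })
                .join();
        return log;
    }

    public static void main(String[] args) {
        // prints [fetch, transform, handled: transform failed]
        System.out.println(run());
    }
}
```

Because the handler returns a value (null here), the final future completes normally and join() does not throw.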

5. Key Methods in CompletableFuture

Method          | Description
supplyAsync()   | Executes a task asynchronously and provides a result.
runAsync()      | Executes a task asynchronously without any result.
thenApply()     | Transforms the result of a CompletableFuture.
thenCompose()   | Chains a dependent CompletableFuture, useful when a task depends on the output of the previous.
thenAccept()    | Consumes the result of a task without returning a result itself.
exceptionally() | Handles exceptions that occur during pipeline execution.
allOf()         | Combines multiple CompletableFutures and waits for all to complete.
anyOf()         | Completes when any of the provided CompletableFutures completes first.

6. Advanced: Combining Multiple Futures

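If there’s a need to combine multiple independent asynchronous tasks, you can use allOf() to wait for all of them or anyOf() to react to the first one that completes. allOf() returns a CompletableFuture<Void>, so the individual results are read from the original futures.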

Example with allOf:

public void fetchMultipleSources() {
    CompletableFuture<String> source1 = fetchSource1();
    CompletableFuture<String> source2 = fetchSource2();

    CompletableFuture<Void> combined = CompletableFuture.allOf(source1, source2)
        .thenRun(() -> {
            // thenRun only runs after both futures completed successfully,
            // so join() returns immediately and needs no checked-exception handling
            System.out.println("Source1: " + source1.join());
            System.out.println("Source2: " + source2.join());
        });

    combined.join(); // Wait for the combined task to complete
}

private CompletableFuture<String> fetchSource1() {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("Fetching source 1...");
        return "Data from Source 1";
    });
}

private CompletableFuture<String> fetchSource2() {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("Fetching source 2...");
        return "Data from Source 2";
    });
}
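
For exactly two futures, thenCombine is often simpler than allOf because it hands both results to a single function. The sketch below shows that alongside anyOf, which the section mentions but does not demonstrate. The class and method names are hypothetical; a future that is never completed is used so that the anyOf result is predictable.

```java
import java.util.concurrent.CompletableFuture;

final class CombineSketch {

    // thenCombine merges the results of two independent futures into one value
    static CompletableFuture<String> combine(CompletableFuture<String> a,
                                             CompletableFuture<String> b) {
        return a.thenCombine(b, (left, right) -> left + " + " + right);
    }

    // anyOf completes with the result of whichever future finishes first
    static CompletableFuture<Object> first(CompletableFuture<?>... futures) {
        return CompletableFuture.anyOf(futures);
    }

    public static void main(String[] args) {
        CompletableFuture<String> source1 =
                CompletableFuture.supplyAsync(() -> "Data from Source 1");
        CompletableFuture<String> source2 =
                CompletableFuture.supplyAsync(() -> "Data from Source 2");
        System.out.println(combine(source1, source2).join());

        // A future that never completes cannot win, so the completed one is returned
        CompletableFuture<String> never = new CompletableFuture<>();
        System.out.println(first(never, CompletableFuture.completedFuture("fast")).join());
    }
}
```

Note that anyOf returns a CompletableFuture<Object>, so the caller has to cast the result if the concrete type matters.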

This approach allows building flexible and robust asynchronous pipelines while keeping the code clear and concise.