How do I use ExecutorService with virtual threads?

To use ExecutorService with virtual threads in Java, you can leverage the Executors.newVirtualThreadPerTaskExecutor() method. This method creates an ExecutorService where each task is executed on a new virtual thread, managed by the Java runtime. Here’s a step-by-step guide:


1. Dependencies & Setup

Ensure you are using Java 19 or newer. Virtual threads were a preview feature in Java 19 and 20 and became a permanent part of the platform in Java 21. On Java 19 and 20 you need to compile and run with --enable-preview; on Java 21 and later no flag is required.
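As a quick sanity check, a program can inspect the running JVM's feature version before relying on virtual threads. This is only a sketch: the class and method names are hypothetical and not part of any JDK API; the only JDK call used is Runtime.version().feature().

```java
// Hypothetical helper: reports how virtual threads are available for a given Java feature version.
class VirtualThreadSupport {
    // Virtual threads are a final (non-preview) feature from Java 21 onward.
    static boolean isFinalFeature(int featureVersion) {
        return featureVersion >= 21;
    }

    // Java 19 and 20 ship virtual threads only as a preview feature.
    static boolean needsEnablePreview(int featureVersion) {
        return featureVersion == 19 || featureVersion == 20;
    }

    public static void main(String[] args) {
        int feature = Runtime.version().feature(); // e.g. 21 on a JDK 21 build
        System.out.println("Java feature version: " + feature);
        System.out.println("Virtual threads final: " + isFinalFeature(feature));
        System.out.println("Needs --enable-preview: " + needsEnablePreview(feature));
    }
}
```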


2. Creating an ExecutorService with Virtual Threads

The Executors.newVirtualThreadPerTaskExecutor() method provides an easy way to create an executor service for virtual threads.

Example:

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadExample {
    public static void main(String[] args) {
        // Creates an ExecutorService with virtual threads
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Submitting tasks to executor
            executor.submit(() -> System.out.println("Task 1 on virtual thread"));
            executor.submit(() -> System.out.println("Task 2 on another virtual thread"));
        } // close() is called here: it waits for submitted tasks to finish, then shuts the executor down
    }
}

In this example:

  • Each task runs in its own virtual thread, allowing them to scale efficiently.
  • The try-with-resources block calls close() on the executor, which waits for the submitted tasks to finish and then shuts the executor down.
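To check the first point yourself, each task can report whether it is running on a virtual thread. The following is a minimal sketch assuming Java 21 or later; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class VirtualThreadCheck {
    // Runs `count` tasks and records, per task, whether it ran on a virtual thread.
    static List<Boolean> runAndCheck(int count) {
        Callable<Boolean> task = () -> Thread.currentThread().isVirtual();
        List<Future<Boolean>> futures = new ArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < count; i++) {
                futures.add(executor.submit(task));
            }
        } // close() returns only after every submitted task has finished

        List<Boolean> results = new ArrayList<>();
        for (Future<Boolean> future : futures) {
            results.add(future.resultNow()); // safe: all tasks are already complete
        }
        return results;
    }

    public static void main(String[] args) {
        List<Boolean> results = runAndCheck(5);
        System.out.println("All on virtual threads: " + !results.contains(false));
    }
}
```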

3. Advantages

  • Concurrency: High-concurrency tasks, such as I/O-bound operations, benefit from virtual threads.
  • Scalability: You rarely need to cap the number of threads, since a virtual thread does not tie up a dedicated OS thread.
  • Simplicity: Virtual threads make it easier to adopt a thread-per-task model without resource overhead.

4. Important Use Cases

  • Concurrent workloads like handling multiple incoming web requests.
  • Tasks that rely on blocking operations, such as database access or network I/O.
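The blocking-I/O use case can be sketched as a fan-out of many blocking calls, one virtual thread each. This assumes Java 21 or later; fetch() is a hypothetical stand-in that sleeps instead of performing real database or network I/O.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingFanOut {
    // Hypothetical stand-in for a blocking call such as a database query or an HTTP request.
    static String fetch(int id) throws InterruptedException {
        Thread.sleep(100); // blocks only this virtual thread
        return "response-" + id;
    }

    static List<String> fetchAll(int requests) {
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            final int id = i;
            tasks.add(() -> fetch(id));
        }
        List<String> responses = new ArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // invokeAll blocks until all tasks are done and returns futures in task order
            for (Future<String> future : executor.invokeAll(tasks)) {
                responses.add(future.resultNow());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        }
        return responses;
    }

    public static void main(String[] args) {
        List<String> responses = fetchAll(1_000);
        System.out.println(responses.size() + " responses, first = " + responses.get(0));
    }
}
```

Because each sleeping task occupies only a virtual thread, the 1,000 calls overlap instead of queuing behind a fixed-size pool.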

5. Combining with Structured Concurrency (Optional)

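Structured concurrency ties the lifetime of subtasks to the block that started them. The dedicated StructuredTaskScope API is still a preview feature as of Java 21, so the snippet below does not use it; instead it approximates the same idea with a virtual-thread executor in a try-with-resources block, which does not exit until both tasks have finished: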

Example:

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StructuredConcurrencyExample {
    public static void main(String[] args) throws Exception {
        // Using an ExecutorService with virtual threads
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            var task1 = executor.submit(() -> {
                Thread.sleep(500); // Simulating a long-running task
                return "Result from task 1";
            });

            var task2 = executor.submit(() -> {
                Thread.sleep(300); // Another long-running task
                return "Result from task 2";
            });

            // Getting results from tasks
            System.out.println(task1.get());
            System.out.println(task2.get());
        }
    }
}

6. Considerations

  • Resource Efficiency: Virtual threads work well for blocking I/O tasks. However, for CPU-bound tasks, you’re limited by the number of available processors.
  • Preview Feature (If Applicable): On Java 19 or 20, compile and run the program with --enable-preview; this is not needed on Java 21 or later.

Virtual threads offer a significant leap in simplifying multithreaded programming while improving scalability. Transitioning to virtual threads in most legacy multithreaded systems is straightforward because they integrate seamlessly with the existing threading APIs.

How do I combine multiple CompletableFutures?

To combine multiple CompletableFuture objects in Java, you can use the following approaches, depending on your specific use case:


1. Combine Two Futures

If you have two CompletableFuture instances and need to combine their results, use the thenCombine method. It lets you specify how to merge the results of both futures.

Example:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task2");

// Combine their results
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
    return result1 + " & " + result2; // Merge both results
});

combinedFuture.thenAccept(result -> System.out.println("Combined Result: " + result))
              .join(); // Block so a short-lived main thread does not exit before the result is printed

Output:

Combined Result: Task1 & Task2

2. Wait for All Futures to Complete

If you want to wait for multiple CompletableFuture tasks to complete (without necessarily combining their results immediately), you can use the allOf method. This is useful when you want to ensure that all tasks are completed before proceeding further.

Example:

CompletableFuture<Void> allTasks = CompletableFuture.allOf(
    CompletableFuture.runAsync(() -> System.out.println("Task 1 completed")),
    CompletableFuture.runAsync(() -> System.out.println("Task 2 completed")),
    CompletableFuture.runAsync(() -> System.out.println("Task 3 completed"))
);

// Wait for all tasks to complete
allTasks.join();

System.out.println("All tasks completed.");

3. Combine Results of Multiple Futures

To combine the individual results of a list (or array) of CompletableFuture objects, you would use allOf in conjunction with further processing.

For example:

  1. Create a list/array of CompletableFuture.
  2. Use CompletableFuture.allOf() to wait for all of them.
  3. Extract and combine results using join.

Example:

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(() -> "Result1"),
    CompletableFuture.supplyAsync(() -> "Result2"),
    CompletableFuture.supplyAsync(() -> "Result3")
);

CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

// Gather results once all are completed
CompletableFuture<List<String>> resultFuture = allOf.thenApply(v ->
    futures.stream()
           .map(CompletableFuture::join) // Join and collect results
           .collect(Collectors.toList())
);

resultFuture.thenAccept(results -> System.out.println("Results: " + results))
            .join(); // Block so a short-lived main thread does not exit before the results are printed

Output:

Results: [Result1, Result2, Result3]
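The three steps above can be wrapped in a small reusable helper. This is a sketch, not a JDK method: allAsList is a hypothetical name, and it assumes Java 16 or later for Stream.toList().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FutureUtils {
    // Hypothetical helper (not part of the JDK): turns a list of futures into a future of a list.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join) // does not block: every future is already complete
                .toList());
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> prices = List.of(
            CompletableFuture.supplyAsync(() -> 10),
            CompletableFuture.supplyAsync(() -> 20),
            CompletableFuture.supplyAsync(() -> 30));

        int total = allAsList(prices).join().stream().mapToInt(Integer::intValue).sum();
        System.out.println("Total: " + total); // prints "Total: 60"
    }
}
```

The resulting list keeps the order of the input list, regardless of which future finished first.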

4. Wait for the First to Complete

If you are waiting for the first CompletableFuture to complete instead of all, use anyOf. It is useful when results from the first completed task suffice.

Example:

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(
    CompletableFuture.supplyAsync(() -> "First Result"),
    CompletableFuture.supplyAsync(() -> "Second Result"),
    CompletableFuture.supplyAsync(() -> "Third Result")
);

anyOf.thenAccept(result -> System.out.println("First completed task result: " + result))
     .join(); // Block so a short-lived main thread does not exit before the result is printed
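Because anyOf returns a CompletableFuture<Object>, callers usually cast the result back to the expected type. One possible way to do that is sketched below; firstOf is a hypothetical helper, not a JDK method.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FirstResult {
    // Hypothetical helper: wraps anyOf and casts the Object result back to the common type.
    static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures, Class<T> type) {
        return CompletableFuture
            .anyOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(type::cast);
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed in this demo
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast mirror");

        String winner = firstOf(List.of(slow, fast), String.class).join();
        System.out.println("Winner: " + winner); // prints "Winner: fast mirror"
    }
}
```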

Comparison of APIs for Combining Futures:

Method         Purpose
thenCombine    Combines two futures’ results into one.
thenCompose    Chains dependent tasks (where one task’s result is input for the next).
allOf          Waits for all futures in a list/array to complete but does not return their results.
anyOf          Waits for the first future from a list/array to complete and returns its result.

By selecting the appropriate method (thenCombine, allOf, anyOf, or others), you can efficiently handle multiple asynchronous computations in Java with CompletableFuture.
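The difference between thenCompose (dependent steps) and thenCombine (independent steps) is easiest to see side by side. In this sketch the lookup methods and their values are hypothetical placeholders for real asynchronous calls.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsCombine {
    // Hypothetical async lookups; the names and returned values are illustrative only.
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    static CompletableFuture<String> findOrders(int userId) {
        return CompletableFuture.supplyAsync(() -> "orders-for-" + userId);
    }

    static CompletableFuture<Double> findDiscount() {
        return CompletableFuture.supplyAsync(() -> 0.1);
    }

    static String report(String name) {
        return findUserId(name)
            .thenCompose(ComposeVsCombine::findOrders) // dependent: needs the user id first
            .thenCombine(findDiscount(),               // independent: can run in parallel
                (orders, discount) -> orders + " @ " + discount)
            .join();
    }

    public static void main(String[] args) {
        System.out.println(report("alice")); // prints "orders-for-5 @ 0.1"
    }
}
```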

How do I use CompletableFuture for async tasks?

CompletableFuture is a powerful tool in Java to implement asynchronous programming. It allows you to perform tasks in the background, chain multiple async tasks together, handle both success and failure scenarios, and combine multiple async computations.

Here’s a summary of how to use CompletableFuture with examples:


1. Run a task asynchronously

Use supplyAsync() (if the task produces a result) or runAsync() (if the task doesn’t return anything).

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Running task in background...");
});

Or with a result:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Hello, World!";
});

2. Process the result

You can process the result of a CompletableFuture using methods like thenAccept or thenApply.

future.thenApply(result -> {
    System.out.println("Received result: " + result);
    return result.toUpperCase();
}).thenAccept(uppercaseResult -> {
    System.out.println("Transformed result: " + uppercaseResult);
});

3. Combine multiple tasks

You can run multiple tasks in parallel and combine their results.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task2");

CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (task1Result, task2Result) -> {
    return task1Result + " and " + task2Result;
});

combinedFuture.thenAccept(result -> {
    System.out.println("Combined Result: " + result);
});

4. Wait for all tasks

If you have multiple tasks and want to wait for all of them to complete, use CompletableFuture.allOf.

CompletableFuture<Void> allTasks = CompletableFuture.allOf(
    CompletableFuture.runAsync(() -> System.out.println("Task 1 completed")),
    CompletableFuture.runAsync(() -> System.out.println("Task 2 completed"))
);

allTasks.join(); // Blocks the thread and waits for completion
System.out.println("All tasks completed.");

5. Handle errors

You can handle exceptions in async tasks using exceptionally().

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    throw new RuntimeException("Oops, something went wrong!");
}).exceptionally(ex -> {
    // ex is a CompletionException here; call ex.getCause() to reach the original RuntimeException
    System.out.println("Error: " + ex.getMessage());
    return null;
});
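When you need to handle both the success and the failure path in one place, handle() is an alternative to exceptionally(). The sketch below also unwraps the CompletionException that CompletableFuture puts around failures from async stages; parseAsync and rootCause are hypothetical helper names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ErrorHandling {
    // Returns the original exception if it was wrapped in a CompletionException.
    static Throwable rootCause(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    // Hypothetical async parser used only for this illustration.
    static CompletableFuture<String> parseAsync(String input) {
        return CompletableFuture
            .supplyAsync(() -> Integer.parseInt(input))
            .handle((value, ex) -> ex == null
                ? "Parsed " + value
                : "Failed: " + rootCause(ex).getClass().getSimpleName());
    }

    public static void main(String[] args) {
        System.out.println(parseAsync("42").join());   // prints "Parsed 42"
        System.out.println(parseAsync("oops").join()); // prints "Failed: NumberFormatException"
    }
}
```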

6. Compose dependent tasks

Use thenCompose to chain dependent tasks where the second task depends on the result of the first one.

CompletableFuture.supplyAsync(() -> "Task 1 Result")
    .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " Task 2 Result"))
    .thenAccept(finalResult -> System.out.println("Final Result: " + finalResult));

7. Custom Executor

By default, CompletableFuture uses the ForkJoinPool.commonPool for async tasks. You can provide a custom executor for better control over threads.

ExecutorService executor = Executors.newFixedThreadPool(10);

CompletableFuture.runAsync(() -> {
    System.out.println("Running task on custom executor");
}, executor).join();

executor.shutdown(); // Pool threads are non-daemon: shut the pool down so the JVM can exit

Key Methods in CompletableFuture

Method           Description
runAsync         Runs a task asynchronously (does not return a result).
supplyAsync      Runs a task asynchronously and returns a result.
thenApply        Transforms the result of a CompletableFuture.
thenAccept       Consumes the result of a CompletableFuture (returns no value).
thenCompose      Chains dependent tasks where the next uses the result of the previous.
thenCombine      Combines the results of two CompletableFutures.
allOf / anyOf    Wait for all or any of multiple futures to complete.
exceptionally    Handles exceptions thrown during the async computation.
join / get       Block until the task completes and return its result (avoid on threads that must not block).
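The last row groups join and get together, but they report failures differently: join throws an unchecked CompletionException, while get throws checked exceptions and can take a timeout. A minimal sketch with hypothetical helper names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class JoinVsGet {
    static String describeJoin(CompletableFuture<String> future) {
        try {
            return future.join(); // unchecked: failures surface as CompletionException
        } catch (CompletionException e) {
            return "join failed: " + e.getCause().getMessage();
        }
    }

    static String describeGet(CompletableFuture<String> future) {
        try {
            return future.get(200, TimeUnit.MILLISECONDS); // checked exceptions, optional timeout
        } catch (ExecutionException e) {
            return "get failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "get timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed =
            CompletableFuture.failedFuture(new IllegalStateException("boom"));
        System.out.println(describeJoin(failed));                   // prints "join failed: boom"
        System.out.println(describeGet(failed));                    // prints "get failed: boom"
        System.out.println(describeGet(new CompletableFuture<>())); // prints "get timed out"
    }
}
```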

Example Workflow with Pipeline

Here’s an example pipeline:

  1. Fetch data
  2. Process it
  3. Save it
  4. Notify the user

ExecutorService executor = Executors.newFixedThreadPool(4);

CompletableFuture.supplyAsync(() -> {
    System.out.println("Fetching data...");
    return "Raw Data";
}, executor).thenApply(data -> {
    System.out.println("Processing data...");
    return data.toUpperCase();
}).thenAccept(processedData -> {
    System.out.println("Saving: " + processedData);
}).thenRun(() -> {
    System.out.println("Notification: Done!");
}).exceptionally(ex -> {
    System.err.println("Pipeline failed due to: " + ex.getMessage());
    return null;
}).join();

executor.shutdown(); // Release the pool's non-daemon threads so the JVM can exit

Using CompletableFuture, you can build flexible, high-performance, and non-blocking applications.

How do I use the Virtual Threads API from Project Loom?

The Virtual Threads API is part of Project Loom in the Java platform. Virtual threads became available as a preview feature in Java 19 and were finalized in Java 21. They are lightweight threads that behave like traditional (platform) threads but are much cheaper in memory and scheduling cost, because they are scheduled by the Java runtime rather than the operating system. This makes it practical to run very large numbers of threads, even millions.

Here’s how you can use the Virtual Threads API:

1. Enable Virtual Threads

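Virtual threads are a preview feature in Java 19 and 20 and a standard feature from Java 21 onward. To use them:

  • Ensure that you’re using a compatible version of Java (Java 19 or later; Java 21 or later is recommended).
  • On Java 19 or 20 only, compile with --enable-preview (together with --release) and run with --enable-preview. No flag is needed on Java 21 or later.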

2. Creating Virtual Threads

Java provides the java.lang.Thread class and the Executors utility to work with virtual threads. For example:

Creating a Virtual Thread

You can create and start a virtual thread like this:

Thread.startVirtualThread(() -> {
    System.out.println("This is a virtual thread!");
});
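If you need a named thread or want to wait for it to finish, the Thread.ofVirtual() builder is an alternative to startVirtualThread. A minimal sketch assuming Java 21 or later; the thread name "worker-1" and the class name are arbitrary.

```java
class NamedVirtualThread {
    // Starts one named virtual thread, waits for it, and returns what the thread observed.
    static String describe() {
        String[] seen = new String[1];
        Thread thread = Thread.ofVirtual()
            .name("worker-1") // arbitrary name chosen for this example
            .start(() -> {
                Thread current = Thread.currentThread();
                seen[0] = current.getName() + " virtual=" + current.isVirtual();
            });
        try {
            thread.join(); // join() also makes the write to seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "worker-1 virtual=true"
    }
}
```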

Using Virtual Threads with Executors

The Executors.newVirtualThreadPerTaskExecutor() method creates an ExecutorService that launches a new virtual thread for each task:

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> System.out.println("Task on a virtual thread"));
    executor.submit(() -> System.out.println("Another virtual thread task"));
}

3. Advantages of Virtual Threads

  1. Lightweight: Virtual threads are less resource-intensive because the Java runtime schedules them onto a small pool of carrier (platform) threads instead of giving each one its own OS thread. Millions of threads can be created.
  2. Cheap blocking: When a virtual thread blocks on I/O, the runtime typically unmounts it from its carrier thread, so the underlying OS thread is free to run other virtual threads. This makes them very efficient for I/O-intensive workloads like web servers or concurrent network communication.
  3. Easier Scaling: They simplify concurrent programming by allowing you to continue using the familiar thread-per-task model without worrying about resource limits.
  4. Works with Existing Code: Virtual threads integrate well with existing Java APIs like java.util.concurrent.

4. When to Use Virtual Threads

Virtual threads are ideal for:

  • Concurrent I/O tasks like HTTP servers or database connections.
  • High-concurrency environments where traditional threads might run out of OS resources.
  • Migrating legacy multithreaded code to take advantage of better scalability.
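To get a feel for the high-concurrency case, the sketch below starts 10,000 virtual threads that each block briefly. It assumes Java 21 or later, and the sleep is a stand-in for real blocking I/O.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ManyVirtualThreads {
    // Starts `count` virtual threads that each block briefly, then returns how many finished.
    static int runBlockingTasks(int count) {
        AtomicInteger finished = new AtomicInteger();
        List<Thread> threads = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            threads.add(Thread.startVirtualThread(() -> {
                try {
                    Thread.sleep(50); // simulated blocking I/O
                    finished.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Finished: " + runBlockingTasks(10_000)); // prints "Finished: 10000"
    }
}
```

Creating the same number of platform threads would be far more expensive, since each one reserves an OS thread and its stack.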

5. Example: HTTP Server with Virtual Threads

Here’s a minimal example showcasing how to use virtual threads for handling multiple HTTP requests:

import java.net.ServerSocket;
import java.net.Socket;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.io.PrintWriter;

public class VirtualThreadHttpServer {
    public static void main(String[] args) throws Exception {
        try (var serverSocket = new ServerSocket(8080)) {
            while (true) {
                Socket clientSocket = serverSocket.accept();
                Thread.startVirtualThread(() -> handleClient(clientSocket));
            }
        }
    }

    private static void handleClient(Socket clientSocket) {
        try (var in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
             var out = new PrintWriter(clientSocket.getOutputStream(), true)) {
            out.println("Hello from the Virtual Thread server!");
            String input;
            while ((input = in.readLine()) != null) {
                System.out.println("Received: " + input);
                if ("exit".equalsIgnoreCase(input)) {
                    break;
                }
                out.println("You said: " + input);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

This makes use of virtual threads to handle each incoming socket connection, which scales efficiently for high-concurrency workloads.

6. Integration with Structured Concurrency

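Virtual threads can be combined with structured concurrency (the StructuredTaskScope API, still a preview feature in Java 21) for safer and more manageable multithreading. Structured concurrency ties the lifetime of child tasks to the code block that started them.

Example of a scoped executor (this approximates structured concurrency with try-with-resources; it does not use StructuredTaskScope):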

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StructuredConcurrencyExample {
    public static void main(String[] args) throws Exception {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            var future1 = executor.submit(() -> {
                Thread.sleep(500);
                return "Task 1 completed";
            });

            var future2 = executor.submit(() -> {
                Thread.sleep(300);
                return "Task 2 completed";
            });

            // Wait for results
            System.out.println(future1.get());
            System.out.println(future2.get());
        }
    }
}

Key notes:

  • Virtual threads require few changes to application logic. Code written against Thread and ExecutorService can usually switch to virtual threads by changing only how the threads are created.
  • They simplify thread management while keeping throughput high for blocking I/O operations.

Limitations:

  • Virtual threads won’t improve performance for CPU-bound tasks; you still need to consider the number of logical CPUs in your system.
  • On Java 19 and 20, virtual threads are a preview feature and require --enable-preview. They are final from Java 21 onward and need no flag.

How do I use the Java Util Concurrent Flow API?

The java.util.concurrent.Flow API, introduced in Java 9, is a set of interfaces that corresponds to the Reactive Streams specification. It defines a standard contract for asynchronous, non-blocking stream processing and lets you build reactive systems that use backpressure to cope with large or variable amounts of data.

Here’s a quick guide on how to use java.util.concurrent.Flow API:


Key Interfaces in java.util.concurrent.Flow

The API comprises four core interfaces:

  1. Flow.Publisher:
    Represents the producer of data. It publishes items to one or more subscribers.

  2. Flow.Subscriber:
    Represents the consumer of data. It subscribes to a publisher to receive data.

  3. Flow.Subscription:
    Represents a link between a publisher and a subscriber, allowing the subscriber to control how much data it receives (backpressure).

  4. Flow.Processor:
    Both a subscriber and a publisher, used to transform or process elements as they flow through the stream.


Implementation Workflow

To use the Flow API, you need to implement these interfaces. Below is a step-by-step explanation:

1. Create a Publisher

  • The Flow.Publisher interface has a single method subscribe(Subscriber<? super T> subscriber).
  • The publisher is responsible for connecting with subscribers and managing their subscriptions.

package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;

public class SimplePublisher implements Flow.Publisher<String> {
    private final String[] items = {"Item 1", "Item 2", "Item 3"};

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        SubscriptionImpl subscription = new SubscriptionImpl(subscriber, items);
        subscriber.onSubscribe(subscription);
    }

    private static class SubscriptionImpl implements Flow.Subscription {
        private final Flow.Subscriber<? super String> subscriber;
        private final String[] items;
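        private int currentIndex = 0;
        private boolean canceled = false;
        private boolean completed = false; // Ensures onComplete() is signalled at most once

        public SubscriptionImpl(Flow.Subscriber<? super String> subscriber, String[] items) {
            this.subscriber = subscriber;
            this.items = items;
        }

        @Override
        public void request(long n) {
            if (canceled || completed) {
                return;
            }
            if (n <= 0) {
                canceled = true; // No further signals may follow onError()
                subscriber.onError(new IllegalArgumentException("Must request a positive number of items"));
                return;
            }

            // onNext() may call request() re-entrantly, so the state is re-checked on every iteration
            for (long i = 0; i < n && currentIndex < items.length; i++) {
                if (canceled) {
                    return;
                }
                subscriber.onNext(items[currentIndex++]);
            }

            if (currentIndex == items.length && !completed) {
                completed = true;
                subscriber.onComplete();
            }
        }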

        @Override
        public void cancel() {
            canceled = true;
        }
    }
}

2. Create a Subscriber

  • Implement the Flow.Subscriber interface (four methods) for receiving events from a publisher:
    • onSubscribe(Flow.Subscription subscription): Receive the subscription. You must request data here.
    • onNext(T item): Handle the next item of the stream.
    • onError(Throwable throwable): Handle any errors.
    • onComplete(): Called when the publisher finishes sending data.

package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;

public class SimpleSubscriber implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("Subscribed!");
        subscription.request(1); // Request the first item
    }

    @Override
    public void onNext(String item) {
        System.out.println("Received: " + item);
        subscription.request(1); // Request the next item
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Complete!");
    }
}

3. Connect the Publisher to the Subscriber

  • Instantiate and link your Publisher and Subscriber.

package org.kodejava.util.concurrent;

public class FlowExample {
    public static void main(String[] args) {
        SimplePublisher publisher = new SimplePublisher();
        SimpleSubscriber subscriber = new SimpleSubscriber();

        publisher.subscribe(subscriber);
    }
}

4. (Optional) Create a Processor

  • A Processor acts as both a Subscriber to transform data from an upstream publisher and a Publisher to pass it downstream.

package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class UppercaseProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        submit(item.toUpperCase());
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
        closeExceptionally(throwable); // Propagate the failure to downstream subscribers
    }

    @Override
    public void onComplete() {
        System.out.println("Processing complete!");
        close();
    }
}
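A processor only does something once it sits between a publisher and a subscriber. The sketch below shows one way to wire such a pipeline; to stay self-contained it declares a compact Upper class that mirrors the UppercaseProcessor above, and it uses SubmissionPublisher.consume() as the downstream subscriber. The class and method names are illustrative.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class ProcessorPipeline {
    // Compact stand-in for UppercaseProcessor so that this sketch compiles on its own.
    static class Upper extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
        @Override public void onNext(String item) { submit(item.toUpperCase()); subscription.request(1); }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    static List<String> run(List<String> inputs) {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        Upper processor = new Upper();

        // Downstream: consume() subscribes a simple consumer and completes when the processor closes
        CompletableFuture<Void> done = processor.consume(received::add);
        source.subscribe(processor); // Upstream: source -> processor

        inputs.forEach(source::submit);
        source.close(); // leads to processor.onComplete(), which closes the processor
        done.join();    // wait until the downstream consumer has received everything
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("hello", "world"))); // prints "[HELLO, WORLD]"
    }
}
```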

Example with SubmissionPublisher

Java also provides a SubmissionPublisher class, an implementation of Flow.Publisher, which simplifies creating Publishers.

package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class SubmissionPublisherExample {
    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                System.out.println("Subscribed!");
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("Received: " + item);
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Done!");
            }
        };

        publisher.subscribe(subscriber);

        System.out.println("Publishing data...");
        publisher.submit("Hello");
        publisher.submit("World");
        publisher.submit("!");

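        publisher.close(); // Signals onComplete() after the buffered items have been delivered
        Thread.sleep(100); // Delivery runs on daemon threads, so keep main alive briefly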
    }
}

Output of the Above Example (the first two lines may appear in either order, because onSubscribe() is invoked asynchronously)

Publishing data...
Subscribed!
Received: Hello
Received: World
Received: !
Done!

Points to Remember

  1. Backpressure:
    • The subscriber can control how many items it wants to receive using the request() method of Flow.Subscription.
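    • If the subscriber requests fewer items, the publisher must hold back: it may deliver no more than the requested number. SubmissionPublisher buffers the rest, and submit() blocks once a subscriber’s buffer is full.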
  2. Error Handling:
    • If something goes wrong, the onError() callback is invoked, allowing you to handle errors gracefully.
  3. Completion:
    • Once all elements are processed, the publisher calls onComplete() to indicate the sequence is finished.
  4. Threading:
    • The Flow API itself doesn’t mandate the use of specific threads for dealing with publishers/subscribers, but it’s often paired with asynchronous mechanisms (e.g., SubmissionPublisher delivers items via ForkJoinPool.commonPool() by default, or via an Executor passed to its constructor).
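The backpressure point can be observed directly: a subscriber that requests only a fixed number of items receives exactly that many, even if the publisher submits more. The following is a sketch with hypothetical class names; the short sleep is just a grace period during which extra items would show up if demand were ignored.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BoundedDemand {
    // Subscriber that asks for exactly `demand` items up front and never requests more.
    static class LimitedSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch satisfied;
        private final int demand;

        LimitedSubscriber(int demand) {
            this.demand = demand;
            this.satisfied = new CountDownLatch(demand);
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
        @Override public void onNext(Integer item) { received.add(item); satisfied.countDown(); }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { }
    }

    static List<Integer> run(int demand, int published) {
        LimitedSubscriber subscriber = new LimitedSubscriber(demand);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= published; i++) {
                publisher.submit(i); // buffered; delivered only while demand is outstanding
            }
            subscriber.satisfied.await(2, TimeUnit.SECONDS);
            Thread.sleep(100); // grace period: items beyond the demand would arrive here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run(3, 10)); // prints "[1, 2, 3]"
    }
}
```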

By combining custom implementations with the provided SubmissionPublisher and additional libraries, you can build reactive systems that are both powerful and resource-efficient.