How do I use ForkJoinPool for recursive tasks?

ForkJoinPool in Java is a part of the java.util.concurrent package and is designed to efficiently execute recursive tasks using a work-stealing algorithm. It works particularly well for problems that can be split into smaller subproblems and then combined to form the final result, adhering to the divide-and-conquer paradigm.

Here’s how you can use ForkJoinPool for recursive tasks:


1. Define Recursive Behavior with RecursiveTask/RecursiveAction

The main entities to use with ForkJoinPool are:

  • RecursiveTask<T>: Returns a result.
  • RecursiveAction: Performs an action without returning a result.

You define the recursive logic within these classes by overriding the compute() method.
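
Since RecursiveAction returns no result, it is easy to overlook. The following is a minimal sketch, assuming a hypothetical DoubleInPlaceAction class and an illustrative threshold of 4, that doubles every element of an array in place. It uses invokeAll(), which forks both subtasks and waits for them, rather than calling fork() and join() by hand.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example class: doubles every element of an array in place.
class DoubleInPlaceAction extends RecursiveAction {
    private static final int THRESHOLD = 4; // illustrative value only
    private final int[] data;
    private final int start, end;

    DoubleInPlaceAction(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            // Base case: the range is small enough to process directly
            for (int i = start; i < end; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = start + (end - start) / 2;
        // invokeAll runs both halves and returns once both have finished
        invokeAll(new DoubleInPlaceAction(data, start, mid),
                  new DoubleInPlaceAction(data, mid, end));
    }

    // Works on a copy so the caller's array is left untouched
    static int[] doubled(int[] input) {
        int[] copy = input.clone();
        ForkJoinPool.commonPool().invoke(new DoubleInPlaceAction(copy, 0, copy.length));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(doubled(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})));
    }
}
```

Each subtask writes to a disjoint slice of the array, so no synchronization is needed.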


2. Implement Recursive Splitting

  • A base case is defined where small tasks are computed directly.
  • For larger tasks, the work is split into subtasks, and fork() is invoked to execute them asynchronously. Results are aggregated using join().

3. Run Tasks in a ForkJoinPool

The tasks are submitted to a ForkJoinPool. This pool can manage multiple tasks simultaneously and perform work-stealing to optimize performance.
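
There is more than one way to hand a task to a pool. The sketch below compares three of them: a blocking invoke(), a submit() that returns a handle to the pending task, and the shared ForkJoinPool.commonPool(). The RangeSumTask class, the parallelism of 4, and the threshold of 1,000 are assumptions made for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class PoolSubmissionSketch {
    // Hypothetical task for illustration: sums the integers in [from, to)
    static class RangeSumTask extends RecursiveTask<Long> {
        private final long from, to;

        RangeSumTask(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 1_000) {
                long sum = 0;
                for (long i = from; i < to; i++) sum += i;
                return sum;
            }
            long mid = from + (to - from) / 2;
            RangeSumTask left = new RangeSumTask(from, mid);
            left.fork();
            long right = new RangeSumTask(mid, to).compute();
            return left.join() + right;
        }
    }

    // invoke(): blocks the caller until the result is ready
    static long viaInvoke() {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new RangeSumTask(1, 10_001));
        } finally {
            pool.shutdown();
        }
    }

    // submit(): returns immediately; the caller decides when to wait
    static long viaSubmit() {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            ForkJoinTask<Long> pending = pool.submit(new RangeSumTask(1, 10_001));
            return pending.join();
        } finally {
            pool.shutdown();
        }
    }

    // commonPool(): shared, JVM-wide pool that needs no explicit shutdown
    static long viaCommonPool() {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(1, 10_001));
    }

    public static void main(String[] args) {
        System.out.println(viaInvoke() + " " + viaSubmit() + " " + viaCommonPool());
    }
}
```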


Example: Parallel Sum Using RecursiveTask

package org.kodejava.util.concurrent;

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

// RecursiveTask to calculate the sum of an array
class ParallelSumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10; // Splitting threshold
    private final int[] arr;
    private final int start, end;

    public ParallelSumTask(int[] arr, int start, int end) {
        this.arr = arr;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;

        // Base case: if below a threshold, compute directly
        if (length <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        }

        // Recursive splitting
        int mid = start + length / 2;
        ParallelSumTask leftTask = new ParallelSumTask(arr, start, mid);
        ParallelSumTask rightTask = new ParallelSumTask(arr, mid, end);

        leftTask.fork();          // Fork the left task
        long rightResult = rightTask.compute();  // Compute the right task
        long leftResult = leftTask.join();       // Wait for the left task

        return leftResult + rightResult;        // Combine results
    }
}

public class ForkJoinExample {
    public static void main(String[] args) {
        int[] numbers = new int[100];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1; // Fill the array with 1 to 100
        }

        ForkJoinPool pool = new ForkJoinPool(); // Create ForkJoinPool
        ParallelSumTask task = new ParallelSumTask(numbers, 0, numbers.length);

        long result = pool.invoke(task); // Start the task and get the result
        System.out.println("Sum: " + result);
    }
}

Key Methods in ForkJoinTask

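  • fork(): Schedules the task for asynchronous execution in the pool the current task is running in (or in the common pool when called from outside any ForkJoinPool).
  • join(): Waits for the task to finish and retrieves its result.
  • invoke(): Starts the task, waits for it to finish if necessary, and returns its result; the effect is similar to fork() followed by join().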
  • compute(): Defines the logic for splitting and computation of tasks.

Advantages of ForkJoinPool

  1. Work-Stealing Algorithm: Idle threads steal tasks from busy threads, ensuring an even workload distribution.
  2. Efficient for Recursive Tasks: Particularly suited for algorithms like QuickSort, MergeSort, and calculations like Fibonacci or array sums.
  3. Dynamic Thread Management: By default, the parallelism of a ForkJoinPool equals the number of available processors, and the pool adds or suspends worker threads as needed to keep them busy.

When to Use

  • Large, computationally expensive problems that can be decomposed recursively.
  • Divide-and-conquer problems where each subproblem is independent after splitting.

Things to Consider

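  • Splitting Threshold: Choose the threshold so that each leaf task does enough work to outweigh the cost of creating and scheduling it. A threshold that is too small floods the pool with tiny tasks, while one that is too large limits parallelism.
  • Thread Contention: Ensure your tasks do not rely on shared mutable state, which would force synchronization and cause contention between threads.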

How do I schedule tasks using ScheduledExecutorService?

To schedule tasks using ScheduledExecutorService in Java, follow these steps:

1. Create a ScheduledExecutorService

  • Use Executors.newScheduledThreadPool(int corePoolSize) to get an instance of ScheduledExecutorService.
    • corePoolSize: Number of threads to keep in the pool.

2. Schedule Tasks

The ScheduledExecutorService provides three kinds of scheduling methods (schedule also has a Callable overload that returns a result):

  • schedule: Schedule a task to run after a specific delay.
    scheduler.schedule(() -> {
             System.out.println("Task executed after a delay");
         }, delay, TimeUnit.SECONDS);
    
    • delay: Time to wait before executing the task.
  • scheduleAtFixedRate: Schedule tasks to start at a fixed rate.
    scheduler.scheduleAtFixedRate(() -> {
             System.out.println("Task executed at a fixed rate");
         }, initialDelay, period, TimeUnit.SECONDS);
    
    • initialDelay: The delay before the first execution.
    • period: The interval between successive executions.
  • scheduleWithFixedDelay: Schedule tasks with a fixed delay between the end of one execution and the start of the next.
    scheduler.scheduleWithFixedDelay(() -> {
             System.out.println("Task executed with a delay");
         }, initialDelay, delay, TimeUnit.SECONDS);
    
    • delay: Time to wait between the previous task’s completion and the start of the next.
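
Each scheduling method returns a ScheduledFuture, which the snippets above discard. The sketch below shows two uses for it under assumed delays (100 ms and 20 ms) and a hypothetical class name: reading the result of a one-shot Callable, and cancelling a periodic task after it has run three times.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledFutureSketch {
    // One-shot Callable: the returned future carries the task's result
    static String runDelayed() throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<String> future =
                    scheduler.schedule(() -> "done", 100, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } finally {
            scheduler.shutdown();
        }
    }

    // Periodic task: the returned future is the handle used to stop it
    static boolean cancelAfterThreeRuns() throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            CountDownLatch threeRuns = new CountDownLatch(3);
            ScheduledFuture<?> periodic = scheduler.scheduleAtFixedRate(
                    threeRuns::countDown, 0, 20, TimeUnit.MILLISECONDS);
            boolean ranThreeTimes = threeRuns.await(5, TimeUnit.SECONDS);
            periodic.cancel(false); // stop future runs without interrupting a running one
            return ranThreeTimes && periodic.isCancelled();
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDelayed());
        System.out.println(cancelAfterThreeRuns());
    }
}
```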

3. Shut Down the Scheduler

  • Always shut down the ScheduledExecutorService once tasks are no longer needed.
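    • shutdown() initiates an orderly shutdown: no new tasks are accepted. With the default settings of the scheduler returned by Executors.newScheduledThreadPool, already-scheduled one-shot tasks still run, while periodic tasks are cancelled.
    • shutdownNow() attempts to stop running tasks by interrupting them and returns the tasks that never started; it cannot force a task that ignores interruption to stop.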
    scheduler.shutdown();
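
Calling shutdown() alone does not wait for anything. A common two-phase pattern, sketched here with a hypothetical helper name and caller-chosen timeouts, waits for a graceful finish and falls back to shutdownNow() only if that takes too long.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class GracefulShutdownSketch {
    // Returns true if the executor terminated within the allowed time
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // phase 1: stop accepting new tasks
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            executor.shutdownNow(); // phase 2: interrupt whatever is still running
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.schedule(() -> System.out.println("last task"), 50, TimeUnit.MILLISECONDS);
        System.out.println("terminated: " + shutdownGracefully(scheduler, 2, TimeUnit.SECONDS));
    }
}
```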
    

Points to Remember:

  1. Thread Efficiency: Reuse threads from the pool to handle multiple tasks efficiently.
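  2. Exception Handling: If any execution of a periodic task throws an exception, all subsequent executions of that task are suppressed, and the exception is only visible through the returned ScheduledFuture. Wrap the task body in a try/catch block if the task should keep running after a failure.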
  3. Fixed Rate vs Fixed Delay:
    • scheduleAtFixedRate: The interval is measured from the start of one task to the start of the next.
    • scheduleWithFixedDelay: The interval is measured from the end of one task to the start of the next.
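
The effect of an uncaught exception on a periodic task can be demonstrated with a small sketch. The class name, the 50 ms period, and the "boom" exception are assumptions for illustration. The first method shows that the failure surfaces through the ScheduledFuture and ends the schedule; the second shows that a try/catch inside the task keeps it running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PeriodicExceptionSketch {
    // An unguarded task: the first exception completes the future and stops the schedule
    static boolean unguardedTaskStops() throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                throw new IllegalStateException("boom");
            }, 0, 50, TimeUnit.MILLISECONDS);
            try {
                future.get(2, TimeUnit.SECONDS);
                return false; // not expected: a periodic task never completes normally
            } catch (ExecutionException e) {
                return e.getCause() instanceof IllegalStateException;
            } catch (TimeoutException e) {
                return false;
            }
        } finally {
            scheduler.shutdownNow();
        }
    }

    // A guarded task: the exception is caught inside the task, so it keeps being rescheduled
    static boolean guardedTaskKeepsRunning() throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch threeRuns = new CountDownLatch(3);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    threeRuns.countDown();
                    throw new IllegalStateException("boom");
                } catch (RuntimeException e) {
                    // In real code, log the failure here instead of ignoring it
                }
            }, 0, 50, TimeUnit.MILLISECONDS);
            return threeRuns.await(2, TimeUnit.SECONDS);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("unguarded stops: " + unguardedTaskStops());
        System.out.println("guarded keeps running: " + guardedTaskKeepsRunning());
    }
}
```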

Example:

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

// Task to run after 2 seconds
scheduler.schedule(() -> System.out.println("One-time task executed"), 2, TimeUnit.SECONDS);

// Task to run initially after 1 second, then every 3 seconds
scheduler.scheduleAtFixedRate(() -> System.out.println("Fixed rate task"), 1, 3, TimeUnit.SECONDS);

// Task to run initially after 2 seconds, then with a delay of 4 seconds
scheduler.scheduleWithFixedDelay(() -> System.out.println("Fixed delay task"), 2, 4, TimeUnit.SECONDS);

// Shutdown the executor after 15 seconds
scheduler.schedule(() -> {
    System.out.println("Shutting down the scheduler");
    scheduler.shutdown();
}, 15, TimeUnit.SECONDS);

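The example runs a one-time task, a fixed-rate task, and a fixed-delay task on a single scheduler thread, then shuts the scheduler down after 15 seconds. With the default settings, shutdown() cancels the two periodic tasks, so the program can exit.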

How do I use ExecutorService with virtual threads?

To use ExecutorService with virtual threads in Java, you can leverage the Executors.newVirtualThreadPerTaskExecutor() method. This method creates an ExecutorService where each task is executed on a new virtual thread, managed by the Java runtime. Here’s a step-by-step guide:


1. Dependencies & Setup

Ensure you are using Java 19 or newer. Virtual threads were introduced as a preview feature, but from Java 21 onward, they are part of the platform. You may need --enable-preview as a JVM option for Java 19 and 20.


2. Creating an ExecutorService with Virtual Threads

The Executors.newVirtualThreadPerTaskExecutor() method provides an easy way to create an executor service for virtual threads.

Example:

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadExample {
    public static void main(String[] args) {
        // Creates an ExecutorService with virtual threads
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Submitting tasks to executor
            executor.submit(() -> System.out.println("Task 1 on virtual thread"));
            executor.submit(() -> System.out.println("Task 2 on another virtual thread"));
        } // The executor is automatically closed after the try block
    }
}

In this example:

  • Each task runs in its own virtual thread, allowing them to scale efficiently.
  • The try-with-resources block calls close() on the executor, which waits for the submitted tasks to finish before the program continues.
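
To see the thread-per-task model at a larger scale, here is a sketch that fans out many blocking tasks at once. It assumes Java 21, a hypothetical class name, and a 50 ms sleep standing in for blocking I/O. Each task reports whether it ran on a virtual thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class VirtualThreadFanOutSketch {
    // Runs `count` blocking tasks and returns how many of them ran on a virtual thread
    static int runBlockingTasks(int count) throws Exception {
        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            tasks.add(() -> {
                Thread.sleep(50); // stands in for a blocking I/O call
                return Thread.currentThread().isVirtual();
            });
        }

        int virtualCount = 0;
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // invokeAll starts one virtual thread per task and waits for all of them
            for (Future<Boolean> result : executor.invokeAll(tasks)) {
                if (result.get()) {
                    virtualCount++;
                }
            }
        }
        return virtualCount;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Tasks on virtual threads: " + runBlockingTasks(1_000));
    }
}
```

Because the sleeping tasks do not occupy an OS thread while blocked, the 1,000 tasks overlap rather than queueing behind a fixed pool size.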

3. Advantages

  • Concurrency: High-concurrency tasks, such as I/O-bound operations, benefit from virtual threads.
  • Scalability: You rarely need to cap the number of threads, because virtual threads are scheduled by the JVM onto a small set of carrier threads instead of each occupying its own OS thread.
  • Simplicity: Virtual threads make it easier to adopt a thread-per-task model without resource overhead.

4. Important Use Cases

  • Concurrent workloads like handling multiple incoming web requests.
  • Tasks that rely on blocking operations, such as database access or network I/O.

5. Combining with Structured Concurrency (Optional)

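Structured concurrency treats a group of related tasks as a single unit of work with a shared lifecycle. The dedicated StructuredTaskScope API is still a preview feature in Java 21, so the snippet below does not use it. Instead, it gets a similar scoping effect from try-with-resources, which waits for all submitted tasks before the executor is closed: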

Example:

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StructuredConcurrencyExample {
    public static void main(String[] args) throws Exception {
        // Using an ExecutorService with virtual threads
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            var task1 = executor.submit(() -> {
                Thread.sleep(500); // Simulating a long-running task
                return "Result from task 1";
            });

            var task2 = executor.submit(() -> {
                Thread.sleep(300); // Another long-running task
                return "Result from task 2";
            });

            // Getting results from tasks
            System.out.println(task1.get());
            System.out.println(task2.get());
        }
    }
}

6. Considerations

  • Resource Efficiency: Virtual threads work well for blocking I/O tasks. However, for CPU-bound tasks, you’re limited by the number of available processors.
  • Preview Feature (If Applicable): Ensure you run the program with --enable-preview if using a preview version of Java.

Virtual threads simplify multithreaded programming while improving scalability for blocking workloads. Transitioning existing code is often straightforward because virtual threads use the familiar Thread and ExecutorService APIs, although code that blocks inside synchronized blocks (which pins the carrier thread in Java 21) or relies heavily on thread-local state deserves a review first.

How do I combine multiple CompletableFutures?

To combine multiple CompletableFuture objects in Java, you can use the following approaches, depending on your specific use case:


1. Combine Two Futures

If you have two CompletableFuture instances and need to combine their results, use the thenCombine method. It lets you specify how to merge the results of both futures.

Example:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task2");

// Combine their results
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
    return result1 + " & " + result2; // Merge both results
});

// join() keeps the calling thread alive until the result has been printed
combinedFuture.thenAccept(result -> System.out.println("Combined Result: " + result)).join();

Output:

Combined Result: Task1 & Task2

2. Wait for All Futures to Complete

If you want to wait for multiple CompletableFuture tasks to complete (without necessarily combining their results immediately), you can use the allOf method. This is useful when you want to ensure that all tasks are completed before proceeding further.

Example:

CompletableFuture<Void> allTasks = CompletableFuture.allOf(
    CompletableFuture.runAsync(() -> System.out.println("Task 1 completed")),
    CompletableFuture.runAsync(() -> System.out.println("Task 2 completed")),
    CompletableFuture.runAsync(() -> System.out.println("Task 3 completed"))
);

// Wait for all tasks to complete
allTasks.join();

System.out.println("All tasks completed.");

3. Combine Results of Multiple Futures

To combine the individual results of a list (or array) of CompletableFuture objects, you would use allOf in conjunction with further processing.

For example:

  1. Create a list/array of CompletableFuture.
  2. Use CompletableFuture.allOf() to wait for all of them.
  3. Extract and combine results using join.

Example:

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(() -> "Result1"),
    CompletableFuture.supplyAsync(() -> "Result2"),
    CompletableFuture.supplyAsync(() -> "Result3")
);

CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

// Gather results once all are completed
CompletableFuture<List<String>> resultFuture = allOf.thenApply(v ->
    futures.stream()
           .map(CompletableFuture::join) // Join and collect results
           .collect(Collectors.toList())
);

// join() keeps the calling thread alive until the results have been printed
resultFuture.thenAccept(results -> System.out.println("Results: " + results)).join();

Output:

Results: [Result1, Result2, Result3]
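
The allOf-then-join pattern above can be wrapped in a reusable generic helper. This is a minimal sketch; the class and method names (AllOfHelperSketch, allAsList) are hypothetical and not part of the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfHelperSketch {
    // Turns a list of futures into one future holding the list of results, in the same order
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // does not block: every future is already complete
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "Result1"),
                CompletableFuture.supplyAsync(() -> "Result2"),
                CompletableFuture.supplyAsync(() -> "Result3"));
        System.out.println(allAsList(futures).join());
    }
}
```

If any input future fails, the combined future completes exceptionally as well, so callers only need to handle errors in one place.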

4. Wait for the First to Complete

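If you only need the first CompletableFuture to complete rather than all of them, use anyOf. This is useful when the result of whichever task finishes first is sufficient. Note that anyOf returns a CompletableFuture<Object>, so the result usually needs a cast.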

Example:

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(
    CompletableFuture.supplyAsync(() -> "First Result"),
    CompletableFuture.supplyAsync(() -> "Second Result"),
    CompletableFuture.supplyAsync(() -> "Third Result")
);

anyOf.thenAccept(result -> System.out.println("First completed task result: " + result));
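
With three equally fast suppliers, which one wins is not predictable. The sketch below makes the outcome deterministic for illustration by pairing an already completed future with one that is never completed; the class name is hypothetical. It also shows the cast needed to get a typed result.

```java
import java.util.concurrent.CompletableFuture;

class AnyOfSketch {
    static String firstResult() {
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> never = new CompletableFuture<>(); // deliberately never completed

        // anyOf completes as soon as any input completes
        CompletableFuture<Object> first = CompletableFuture.anyOf(never, fast);

        // The static type is Object, so a cast is needed to get the String back
        return (String) first.join();
    }

    public static void main(String[] args) {
        System.out.println("First completed task result: " + firstResult());
    }
}
```

Note that anyOf does not cancel the futures that lose the race; they keep running unless you cancel them yourself.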

Comparison of APIs for Combining Futures:

| Method | Purpose |
|---|---|
| thenCombine | Combines two futures’ results into one. |
| thenCompose | Chains dependent tasks (where one task’s result is input for the next). |
| allOf | Waits for all futures in a list/array to complete but does not return their results. |
| anyOf | Waits for the first future from a list/array to complete and returns its result. |
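
The difference between thenCompose and thenCombine is easiest to see side by side. In this sketch, findUserId and loadProfile are hypothetical stand-ins for real asynchronous lookups: thenCompose is used when the second call needs the first result, and thenCombine when the two calls are independent.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsCombineSketch {
    // Hypothetical async lookup: derives a fake id from the name length
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(name::length);
    }

    // Hypothetical async lookup that needs the id from the previous step
    static CompletableFuture<String> loadProfile(int userId) {
        return CompletableFuture.supplyAsync(() -> "profile-" + userId);
    }

    // Dependent steps: the second call cannot start until the first result is known
    static String dependent() {
        return findUserId("alice")
                .thenCompose(ComposeVsCombineSketch::loadProfile)
                .join();
    }

    // Independent steps: both calls run concurrently and are merged at the end
    static String independent() {
        CompletableFuture<Integer> first = findUserId("alice");
        CompletableFuture<Integer> second = findUserId("bob");
        return first.thenCombine(second, (a, b) -> a + "+" + b).join();
    }

    public static void main(String[] args) {
        System.out.println(dependent());   // prints "profile-5"
        System.out.println(independent()); // prints "5+3"
    }
}
```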

By selecting the appropriate method (thenCombine, allOf, anyOf, or others), you can efficiently handle multiple asynchronous computations in Java with CompletableFuture.

How do I use CompletableFuture for async tasks?

CompletableFuture is a powerful tool in Java to implement asynchronous programming. It allows you to perform tasks in the background, chain multiple async tasks together, handle both success and failure scenarios, and combine multiple async computations.

Here’s a summary of how to use CompletableFuture with examples:


1. Run a task asynchronously

Use supplyAsync() (if the task produces a result) or runAsync() (if the task doesn’t return anything).

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Running task in background...");
});

Or with a result:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Hello, World!";
});

2. Process the result

You can process the result of a CompletableFuture using methods like thenAccept or thenApply.

future.thenApply(result -> {
    System.out.println("Received result: " + result);
    return result.toUpperCase();
}).thenAccept(uppercaseResult -> {
    System.out.println("Transformed result: " + uppercaseResult);
});

3. Combine multiple tasks

You can run multiple tasks in parallel and combine their results.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task2");

CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (task1Result, task2Result) -> {
    return task1Result + " and " + task2Result;
});

combinedFuture.thenAccept(result -> {
    System.out.println("Combined Result: " + result);
});

4. Wait for all tasks

If you have multiple tasks and want to wait for all of them to complete, use CompletableFuture.allOf.

CompletableFuture<Void> allTasks = CompletableFuture.allOf(
    CompletableFuture.runAsync(() -> System.out.println("Task 1 completed")),
    CompletableFuture.runAsync(() -> System.out.println("Task 2 completed"))
);

allTasks.join(); // Blocks the thread and waits for completion
System.out.println("All tasks completed.");

5. Handle errors

You can handle exceptions in async tasks using exceptionally().

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    throw new RuntimeException("Oops, something went wrong!");
}).exceptionally(ex -> {
    // ex is typically a CompletionException wrapping the original; use ex.getCause() to reach it
    System.out.println("Error: " + ex.getMessage());
    return null;
});
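
When both the success and the failure path need to produce a value, handle() is an alternative to exceptionally(). The following sketch, with a hypothetical class name and fallback message, also unwraps the CompletionException that usually wraps the original exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ErrorHandlingSketch {
    static String loadOrFallback(boolean fail) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("lookup failed");
                    }
                    return "ok";
                })
                // handle() always runs: exactly one of value and ex is meaningful
                .handle((value, ex) -> {
                    if (ex == null) {
                        return value;
                    }
                    // Unwrap the CompletionException to reach the original cause
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause()
                            : ex;
                    return "fallback (" + cause.getMessage() + ")";
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(loadOrFallback(false)); // prints "ok"
        System.out.println(loadOrFallback(true));  // prints "fallback (lookup failed)"
    }
}
```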

6. Compose dependent tasks

Use thenCompose to chain dependent tasks where the second task depends on the result of the first one.

CompletableFuture.supplyAsync(() -> "Task 1 Result")
    .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " Task 2 Result"))
    .thenAccept(finalResult -> System.out.println("Final Result: " + finalResult));

7. Custom Executor

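By default, CompletableFuture runs async tasks on ForkJoinPool.commonPool(). You can pass a custom executor for better control over threads; remember to shut it down once it is no longer needed, because its threads are not daemon threads by default.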

ExecutorService executor = Executors.newFixedThreadPool(10);

CompletableFuture.runAsync(() -> {
    System.out.println("Running task on custom executor");
}, executor);
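
One way to confirm that a task really runs on the custom executor is to give its threads a recognizable name. This is a sketch under assumptions: the "io-worker" name, the pool size of 2, and the class name are all illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorSketch {
    static String runOnCustomExecutor() {
        // A thread factory that names the pool's threads (hypothetical name)
        ExecutorService executor = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "io-worker");
            thread.setDaemon(true);
            return thread;
        });
        try {
            // The second argument routes the task to the custom executor
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown(); // release the pool once it is no longer needed
        }
    }

    public static void main(String[] args) {
        System.out.println("Ran on: " + runOnCustomExecutor()); // prints "Ran on: io-worker"
    }
}
```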

Key Methods in CompletableFuture

| Method | Description |
|---|---|
| runAsync | Runs a task asynchronously; does not return a result. |
| supplyAsync | Runs a task asynchronously and returns a result. |
| thenApply | Transforms the result of a CompletableFuture. |
| thenAccept | Consumes the result of a CompletableFuture without producing a new value. |
| thenCompose | Chains dependent tasks where the next uses the result of the previous. |
| thenCombine | Combines the results of two CompletableFutures. |
| allOf / anyOf | Wait for all or any of multiple futures to complete. |
| exceptionally | Handles exceptions thrown during the async computation. |
| join / get | Block until the task completes and return its result; avoid them in code that is meant to stay non-blocking. |
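
join and get differ mainly in how they report failures, and get also has a timed variant. The sketch below uses manually completed futures and a hypothetical class name to show both points: join throws an unchecked CompletionException, get throws a checked ExecutionException, and get with a timeout throws TimeoutException if the result is not ready in time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class JoinVsGetSketch {
    // Reports which exception type each blocking call throws for a failed future
    static String describeFailures() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("bad input"));

        String viaJoin = "no exception";
        try {
            failed.join();
        } catch (CompletionException e) { // unchecked
            viaJoin = "CompletionException";
        }

        String viaGet = "no exception";
        try {
            failed.get();
        } catch (ExecutionException e) { // checked
            viaGet = "ExecutionException";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            viaGet = "interrupted";
        }
        return viaJoin + "/" + viaGet;
    }

    // get(timeout, unit) gives up waiting instead of blocking forever
    static boolean timesOut() throws Exception {
        CompletableFuture<String> never = new CompletableFuture<>(); // deliberately never completed
        try {
            never.get(100, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describeFailures()); // prints "CompletionException/ExecutionException"
        System.out.println(timesOut());         // prints "true"
    }
}
```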

Example Workflow with Pipeline

Here’s an example pipeline:

  1. Fetch data
  2. Process it
  3. Save it
  4. Notify the user

ExecutorService executor = Executors.newFixedThreadPool(4);

CompletableFuture.supplyAsync(() -> {
    System.out.println("Fetching data...");
    return "Raw Data";
}, executor).thenApply(data -> {
    System.out.println("Processing data...");
    return data.toUpperCase();
}).thenAccept(processedData -> {
    System.out.println("Saving: " + processedData);
}).thenRun(() -> {
    System.out.println("Notification: Done!");
}).exceptionally(ex -> {
    System.err.println("Pipeline failed due to: " + ex.getMessage());
    return null;
}).join();

executor.shutdown(); // Release the pool's non-daemon threads so the JVM can exit

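Using CompletableFuture, you can compose asynchronous pipelines whose stages run without blocking a thread between them, as long as join() or get() is called only where a result is actually needed.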