How do I use CompletableFuture for reactive-style concurrency?

Using CompletableFuture in Java can be a powerful way to implement reactive-style concurrency. It provides a clean and functional way to perform asynchronous tasks, compose their results, and handle exceptions without blocking threads. Here’s an explanation with examples to guide you through its reactive-style usage:


Key Features of CompletableFuture

  1. Asynchronous execution – Run tasks on background threads.
  2. Chaining tasks – Perform dependent actions when a task completes using thenApply, thenAccept, thenCompose, etc.
  3. Combining tasks – Execute multiple tasks in parallel and combine their results using thenCombine, allOf, or anyOf.
  4. Exception handling – Handle errors gracefully with handle, exceptionally, or whenComplete.

Example Use Cases

1. Basic Asynchronous Execution

You can run a task asynchronously without blocking the main thread:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            // Simulate a long computation
            System.out.println("Running in background...");
            return "Result";
        }).thenAccept(result -> {
            // Use the result once completed
            System.out.println("Completed with: " + result);
        });

        System.out.println("Main thread is free to do other work...");

        // The common pool uses daemon threads, so wait before main exits
        future.join();
    }
}

Possible output (the first two lines may appear in either order):

Main thread is free to do other work...
Running in background...
Completed with: Result

2. Chaining Dependent Tasks

Reactive-style programming involves chaining tasks. Use thenApply to transform a result synchronously, and thenCompose when the next step itself returns a CompletableFuture:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureChaining {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            // Fetch some data (simulate API/database call)
            return "Data from API";
        }).thenApply(data -> {
            // Transform the data
            return data.toUpperCase();
        }).thenAccept(processedData -> {
            // Use transformed data
            System.out.println("Processed Data: " + processedData);
        }).join(); // Wait so the daemon pool threads can finish before main exits
    }
}
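
The chain above uses only thenApply. As a minimal sketch of thenCompose, the example below chains two asynchronous steps where the second depends on the result of the first. The methods findUserId and loadProfile are hypothetical stand-ins for remote calls and are not part of any library:

```java
import java.util.concurrent.CompletableFuture;

class ThenComposeSketch {

    // Hypothetical async lookup; stands in for a remote call
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    // Hypothetical second async call that depends on the first result
    static CompletableFuture<String> loadProfile(int userId) {
        return CompletableFuture.supplyAsync(() -> "Profile #" + userId);
    }

    static CompletableFuture<String> profileFor(String name) {
        // thenCompose flattens the nested future; with thenApply the result
        // type would be CompletableFuture<CompletableFuture<String>>
        return findUserId(name).thenCompose(id -> loadProfile(id));
    }

    public static void main(String[] args) {
        System.out.println(profileFor("alice").join()); // prints "Profile #5"
    }
}
```

Here join() is used only so that the demo prints before the JVM exits; inside a longer pipeline you would normally keep chaining instead of blocking.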

3. Combining Multiple Async Tasks

To run multiple tasks in parallel and combine their results:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureCombine {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1 Result");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2 Result");

        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
            return result1 + " & " + result2;
        });

        // join() keeps main alive until the combined result has been printed
        combinedFuture.thenAccept(System.out::println).join();
    }
}

Output:

Task 1 Result & Task 2 Result

4. Waiting for All Tasks to Complete

If you need to wait for multiple independent tasks to complete:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureAllOf {
    public static void main(String[] args) {
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(
                CompletableFuture.runAsync(() -> System.out.println("Task 1")),
                CompletableFuture.runAsync(() -> System.out.println("Task 2")),
                CompletableFuture.runAsync(() -> System.out.println("Task 3"))
        );

        // Wait for all tasks to complete
        allTasks.join();
        System.out.println("All tasks completed.");
    }
}
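
allOf itself completes with no value (CompletableFuture<Void>), so when the tasks return results you still have to read them from the individual futures. One possible way to do that is sketched below; the helper name allResults is an assumption made for this example:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfResultsSketch {

    // Completes with every result, in the order the futures were supplied
    static <T> CompletableFuture<List<T>> allResults(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // All inputs are done at this point, so join() does not block
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.supplyAsync(() -> "Task 1"),
                CompletableFuture.supplyAsync(() -> "Task 2"),
                CompletableFuture.supplyAsync(() -> "Task 3"));

        System.out.println(allResults(futures).join()); // prints [Task 1, Task 2, Task 3]
    }
}
```

If any input future fails, the combined future fails as well, so this sketch suits cases where every result is required.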

5. Handling Exceptions

You can handle exceptions gracefully with methods like exceptionally, handle, or whenComplete:

package org.kodejava.util.concurrent;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExceptionHandling {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
                    // Simulate an error
                    if (true) throw new RuntimeException("Something went wrong!");
                    return "Task Result";
                })
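                .exceptionally(ex -> {
                    // Async failures arrive wrapped in CompletionException
                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                    System.out.println("Error: " + cause.getMessage());
                    return "Fallback Result";
                })
                .thenAccept(result -> System.out.println("Result: " + result))
                .join(); // Wait so the output appears before main exits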
    }
}

Output:

Error: Something went wrong!
Result: Fallback Result
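
The example above shows exceptionally; handle is similar but runs for both outcomes, receiving either the result or the exception. A minimal sketch follows, where parseAsync and describe are hypothetical names used only for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HandleSketch {

    // Hypothetical async parser used only for illustration
    static CompletableFuture<Integer> parseAsync(String text) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(text));
    }

    static CompletableFuture<String> describe(String text) {
        // handle receives (value, null) on success and (null, exception) on failure
        return parseAsync(text).handle((value, ex) -> {
            if (ex == null) {
                return "Parsed " + value;
            }
            // Failures from an async stage arrive wrapped in CompletionException
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            return "Failed: " + cause.getClass().getSimpleName();
        });
    }

    public static void main(String[] args) {
        System.out.println(describe("42").join());   // prints "Parsed 42"
        System.out.println(describe("oops").join()); // prints "Failed: NumberFormatException"
    }
}
```

whenComplete takes the same two arguments but cannot change the outcome, which makes it a better fit for logging or cleanup than for fallbacks.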

6. Running Tasks in a Custom Executor

By default, CompletableFuture uses the common ForkJoinPool, but you can specify a custom executor:

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;

public class CustomExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        CompletableFuture.runAsync(() -> {
            System.out.println("Task executing in custom executor");
        }, executor).whenComplete((result, ex) -> executor.shutdown()); // Shut down even if the task fails
    }
}

Summary of Key Methods

Method                                    Purpose
supplyAsync(Supplier)                     Run a computation in another thread and return a result.
runAsync(Runnable)                        Run a computation without returning a result.
thenApply(Function)                       Transform the result of the stage.
thenCompose(Function)                     Chain another async computation that depends on the previous one.
thenAccept(Consumer)                      Consume the result.
thenCombine(CompletionStage, BiFunction)  Combine the results of two independent computations.
allOf(CompletableFuture...)               Complete when all of the given futures complete.
anyOf(CompletableFuture...)               Complete as soon as any of the given futures completes.
exceptionally(Function)                   Handle an exception and provide a fallback value.
handle(BiFunction)                        Process the result or handle the exception.
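
anyOf appears in the table but not in the examples above. The sketch below races two sources and keeps whichever answers first; the firstOf helper and the "mirror" strings are assumptions made for this example, and the delay is simulated with CompletableFuture.delayedExecutor (Java 9+):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AnyOfSketch {

    // Completes with the value of whichever future finishes first
    static CompletableFuture<String> firstOf(CompletableFuture<String> a,
                                             CompletableFuture<String> b) {
        // anyOf is untyped (CompletableFuture<Object>), so cast the winner back
        return CompletableFuture.anyOf(a, b).thenApply(result -> (String) result);
    }

    public static void main(String[] args) {
        // The slow source only starts after a one-second delay
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(
                () -> "slow mirror",
                CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast mirror");

        // The fast source is expected to win the race here
        System.out.println("Winner: " + firstOf(slow, fast).join());
    }
}
```

Note that anyOf does not cancel the losing future; it keeps running unless you cancel it yourself.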

Benefits of Using CompletableFuture for Reactive Programming

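  • Non-blocking composition: callbacks run when a stage completes, so no thread has to wait on get().
  • Easier composition of asynchronous operations compared to managing threads and Future objects by hand.
  • Fine-grained exception handling and coordination of parallel tasks.
  • Fits asynchronous I/O such as REST calls; for example, java.net.http.HttpClient.sendAsync returns a CompletableFuture.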

By taking advantage of these features, you can implement clean, reactive, and efficient systems.

How do I configure a custom thread factory for better debugging?

Configuring a custom thread factory can enhance debugging by customizing the naming and behavior of threads you create for your application. By providing meaningful names to threads and optionally logging their creation, you can significantly simplify debugging and profiling, especially in multi-threaded environments.

Here’s how you can configure a custom thread factory in Java:


Steps to Configure a Custom Thread Factory

  1. Implement a Custom ThreadFactory
    Create a custom class that implements the java.util.concurrent.ThreadFactory interface.

  2. Customize Thread Creation
    Override the newThread() method to provide specific thread naming, priorities, daemon flags, or other settings.

  3. Make the Threads Traceable
    Use meaningful thread names (e.g., include a prefix to indicate the purpose), which can be extremely helpful in logs during debugging.


Example of a Custom Thread Factory

Below is a code example of a custom thread factory:

package org.kodejava.util.concurrent;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class DebuggableThreadFactory implements ThreadFactory {

   private final String threadNamePrefix;
   private final boolean daemon;
   private final int threadPriority;
   private final AtomicInteger threadCount = new AtomicInteger(1);

   public DebuggableThreadFactory(String threadNamePrefix, boolean daemon, int threadPriority) {
      this.threadNamePrefix = threadNamePrefix != null ? threadNamePrefix : "Thread";
      this.daemon = daemon;
      this.threadPriority = threadPriority;
   }

   @Override
   public Thread newThread(Runnable r) {
      String threadName = threadNamePrefix + "-" + threadCount.getAndIncrement();
      Thread thread = new Thread(r, threadName);
      thread.setDaemon(daemon);
      thread.setPriority(threadPriority);

      // For debugging, log thread creation
      System.out.println("Created thread: " + thread.getName() +
                         ", Daemon: " + daemon +
                         ", Priority: " + thread.getPriority());
      return thread;
   }
}

How to Use the Custom Thread Factory

You can use this custom thread factory to create executor services or individual threads:

Using with an ExecutorService:

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
   public static void main(String[] args) {
      DebuggableThreadFactory threadFactory =
              new DebuggableThreadFactory("Worker", false, Thread.NORM_PRIORITY);

      // ExecutorService is AutoCloseable since Java 19; close() shuts down and waits for submitted tasks
      try (ExecutorService executorService = Executors.newFixedThreadPool(5, threadFactory)) {
         executorService.submit(() -> System.out.println("Task executed by: " + Thread.currentThread().getName()));
      }
   }
}

Creating Individual Threads:

package org.kodejava.util.concurrent;

public class Main {
   public static void main(String[] args) throws InterruptedException {
      DebuggableThreadFactory threadFactory =
              new DebuggableThreadFactory("CustomThread", true, Thread.MAX_PRIORITY);

      Thread customThread = threadFactory.newThread(() -> {
         System.out.println("Running in: " + Thread.currentThread().getName());
      });

      customThread.start();
      // The thread is a daemon, so wait for it; otherwise the JVM may exit before it prints
      customThread.join();
   }
}

Key Features of the Example

  1. Thread Naming:
    • Threads are named with a prefix and a counter (Worker-1, Worker-2, etc.).
    • Helps identify which thread is handling which task during debugging.
  2. Daemon Threads:
    • You can optionally configure threads as daemon or non-daemon.
    • Daemon threads do not prevent the JVM from exiting.
  3. Thread Priority:
    • You can set thread priorities (e.g., Thread.NORM_PRIORITY, Thread.MAX_PRIORITY, etc.).
  4. Debugging Logs:
    • Logs thread creation for visibility.
  5. Atomic Counter:
    • An AtomicInteger keeps the counter thread-safe, so generated thread names stay unique.

Further Improvements

  • Custom Uncaught Exception Handlers:
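    Set an uncaught exception handler to report exceptions that escape a thread. With an executor, this applies to tasks started with execute(); submit() captures the exception in the returned Future instead: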

    thread.setUncaughtExceptionHandler((t, e) -> {
      System.err.println("Uncaught exception in thread " + t.getName() + ": " + e.getMessage());
    });
    
  • Thread Context Information:
    Consider associating thread-local variables to store additional debugging details when necessary.
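
As a rough sketch of the first improvement, the factory below names its threads and installs an uncaught exception handler, then runs one failing task through execute(). The class name, the "Worker" prefix, and the latch used to wait for the handler are all choices made for this example:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class HandlerFactorySketch {

    // Runs one failing task and returns what the uncaught exception handler recorded
    static String runFailingTask() {
        CountDownLatch handled = new CountDownLatch(1);
        AtomicReference<String> failure = new AtomicReference<>("handler not called");
        AtomicInteger counter = new AtomicInteger(1);

        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "Worker-" + counter.getAndIncrement());
            thread.setUncaughtExceptionHandler((t, e) -> {
                failure.set(t.getName() + ": " + e.getMessage());
                handled.countDown();
            });
            return thread;
        };

        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        try {
            // execute() lets the exception escape to the handler;
            // submit() would store it in the returned Future instead
            executor.execute(() -> {
                throw new IllegalStateException("boom");
            });
            handled.await(5, TimeUnit.SECONDS); // Wait until the handler has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(runFailingTask()); // prints "Worker-1: boom"
    }
}
```

In a real application the handler would typically write to your logging framework rather than store a string.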

By using this approach, you’ll gain greater control over thread behavior and be better equipped for debugging multi-threaded applications.

How do I process tasks as they complete using CompletionService?

To process tasks as they complete using a CompletionService in Java, you can take advantage of the ExecutorCompletionService. This class provides a mechanism to submit tasks for execution and retrieve their results in the order of completion, rather than the order of submission.

Key Steps for Using CompletionService:

  1. Create an ExecutorService: This handles the thread pooling for concurrent task execution.
  2. Create an ExecutorCompletionService: Wrap the ExecutorService in an ExecutorCompletionService.
  3. Submit Tasks: Use the submit method to submit tasks to the CompletionService.
  4. Process Results as They Complete: Retrieve the results using the poll or take methods of CompletionService.

Example Code:

package org.kodejava.util.concurrent;

import java.util.concurrent.*;

public class CompletionServiceExample {

   public static void main(String[] args) throws InterruptedException {
      int numTasks = 5;

      // Step 1: Create an ExecutorService with fixed thread pool
      ExecutorService executorService = Executors.newFixedThreadPool(3);

      // Step 2: Create an ExecutorCompletionService
      CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);

      // Step 3: Submit tasks to the CompletionService
      for (int i = 0; i < numTasks; i++) {
         int taskId = i;
         completionService.submit(() -> {
            Thread.sleep((long) (Math.random() * 2000)); // Simulate work
            return "Result from Task " + taskId;
         });
      }

      // Step 4: Process tasks as they complete
      for (int i = 0; i < numTasks; i++) {
         try {
            Future<String> resultFuture = completionService.take(); // Blocks until some task has completed
            String result = resultFuture.get(); // Returns immediately because the task is already done
            System.out.println(result);
         } catch (ExecutionException e) {
            System.err.println("Task execution failed: " + e.getMessage());
         }
      }

      // Shutdown the ExecutorService
      executorService.shutdown();
   }
}

Explanation of the Code:

  1. ExecutorService: A thread pool of 3 worker threads is created using Executors.newFixedThreadPool(3).
  2. ExecutorCompletionService: Wraps the ExecutorService to handle submission and retrieval of tasks.
  3. Submitting Tasks: Each task is submitted to the completionService and runs asynchronously on the thread pool.
  4. Result Retrieval:
    • The completionService.take() method blocks until the next completed task is available and returns its Future.
    • completionService.poll() can be used for non-blocking retrieval: it returns null if no task has completed yet, and poll(timeout, unit) waits up to the given time.
  5. Task Results in Completion Order: Results are processed as tasks complete, regardless of their submission order.
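
As a sketch of the poll variant mentioned above, the example below stops collecting once no task finishes within a time limit and then interrupts whatever is still running. The collect helper, the pool size, and the timings are assumptions chosen for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PollTimeoutSketch {

    // Collects results in completion order, giving up once nothing finishes within the timeout
    static List<String> collect(List<Callable<String>> tasks, long timeoutMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<String> service = new ExecutorCompletionService<>(executor);
        List<String> results = new ArrayList<>();
        try {
            for (Callable<String> task : tasks) {
                service.submit(task);
            }
            for (int i = 0; i < tasks.size(); i++) {
                // poll(timeout, unit) returns null if no task completes in time
                Future<String> done = service.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (done == null) {
                    break;
                }
                try {
                    results.add(done.get());
                } catch (ExecutionException e) {
                    results.add("failed: " + e.getCause());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow(); // Interrupt tasks that are still running
        }
        return results;
    }

    public static void main(String[] args) {
        Callable<String> quick = () -> "quick";
        Callable<String> slow = () -> {
            Thread.sleep(5_000); // Far longer than the 500 ms limit below
            return "slow";
        };
        System.out.println(collect(List.of(quick, slow), 500)); // prints [quick]
    }
}
```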

When to Use CompletionService

  • When you want to process tasks as they finish, rather than waiting for all tasks to complete.
  • In scenarios where tasks may have uneven execution times, and you want to immediately handle the results of the fastest tasks.

How do I cancel long-running tasks in ExecutorService?

To cancel long-running tasks in an ExecutorService, you can use the Future object returned when you submit a task and invoke its cancel method. Below are the steps and some important considerations for canceling tasks:

1. Submit Tasks to the ExecutorService

When you submit a task to an ExecutorService, it returns a Future object that can be used to monitor the task’s progress and cancel it if needed.

ExecutorService executor = Executors.newFixedThreadPool(2);

Future<?> future = executor.submit(() -> {
    // Simulate a long-running task
    try {
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("Running...");
            Thread.sleep(1000);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Restore the interrupted status
        System.out.println("Task interrupted.");
    }
});

2. Cancel the Task

To cancel the task, invoke the cancel method on the Future object:

// Cancel the task after 5 seconds
Thread.sleep(5000); // Simulating some delay
boolean wasCancelled = future.cancel(true); // true means interrupt if running

System.out.println("Task cancelled: " + wasCancelled);

  • cancel(true) attempts to stop the task by interrupting the thread that runs it. This only works if the task checks its interrupted status (via Thread.interrupted() or Thread.currentThread().isInterrupted()) or blocks in an interruptible call, and then terminates gracefully.
  • cancel(false) does not interrupt a running task; it only prevents the task from starting if it has not begun yet.

3. Handle Interruption Gracefully

For the cancellation to work, ensure that the task checks the interrupted status and responds accordingly. The task should periodically call Thread.interrupted() (which also clears the flag) or Thread.currentThread().isInterrupted() (which leaves it set) to detect interruptions.

try {
    while (!Thread.currentThread().isInterrupted()) {
        // Simulate work
        System.out.println("Working...");
        Thread.sleep(1000); // This can throw InterruptedException
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // Re-set the interrupted status
    System.out.println("Task interrupted and exiting.");
}

4. Shutdown the ExecutorService

Once you’re done submitting tasks, shut down the ExecutorService to release resources:

executor.shutdown(); // Stop accepting new tasks; already submitted tasks still run
try {
    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        executor.shutdownNow(); // Interrupt running tasks and discard queued ones
    }
} catch (InterruptedException e) {
    executor.shutdownNow(); // Force an immediate shutdown
    Thread.currentThread().interrupt(); // Reset the interrupted status
}

Key Notes

  • Interruptible Tasks: The tasks you submit must be designed to handle interruptions for cancellation to effectively work. For example, long-running loops or blocking calls should handle the interrupted status.
  • Blocking Methods: If the task is waiting on a blocking call (e.g., Thread.sleep(), Object.wait(), Future.get()), calling cancel(true) will usually interrupt these methods.
  • Non-Interruptible Work: If the task is not interruptible (e.g., performing intensive computations without checking the interrupted flag), cancel(true) will not have an immediate effect.
  • Future API: You can also check the status of a task using methods like isDone(), isCancelled() before or after attempting to cancel it.
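
Putting the steps together, a self-contained sketch might look like the following. The two latches are an addition for this example: they make sure the task is already running before it is cancelled and let the caller observe that it really exited:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelSketch {

    // Starts an interruptible task, cancels it, and reports what happened
    static String runAndCancel() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        Thread.sleep(100); // Interruptible blocking call
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Restore the flag
                } finally {
                    exited.countDown();
                }
            });

            started.await();                        // Make sure the task is running
            boolean accepted = future.cancel(true); // Interrupt the worker thread
            boolean stopped = exited.await(2, TimeUnit.SECONDS);
            return "cancel=" + accepted + ", isCancelled=" + future.isCancelled()
                    + ", taskExited=" + stopped;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted while waiting";
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "cancel=true, isCancelled=true, taskExited=true"
        System.out.println(runAndCancel());
    }
}
```

After a successful cancel, calling get() on the Future throws CancellationException, so callers that still read the result need to handle that case.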

This approach ensures that your long-running task can be terminated gracefully and that the executor's resources are released.

How do I control access to resources using Semaphore?

To control access to shared resources in a multithreaded environment, you can use a Semaphore from the java.util.concurrent package. A semaphore manages a fixed number of permits that limit how many threads can access a shared resource at the same time. Threads acquire a permit before accessing the resource and release it when they are done, which keeps access controlled.

Here’s how you can use a semaphore to control access to resources:

1. Key Points About Semaphore:

  • Permits: The semaphore holds a set number of permits, which represent the number of threads that can access the resource concurrently.
  • Acquire/Release:
    • A thread must acquire a permit using the acquire() method to access the resource.
    • It must release the permit using release() after finishing its access to the resource.
  • Blocking Behavior: If no permits are available, the acquiring thread will block until a permit is released by another thread.
  • Fairness: You can construct a semaphore in a fair mode to ensure that waiting threads acquire permits in the order they requested them.

2. Example: Semaphore with Limited Access to Resources

Here is a simple example where a semaphore is used to control access to a shared resource (e.g., a connection pool or a printer):

package org.kodejava.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreExample {

    // Semaphore initialized with 2 permits (only 2 threads can access simultaneously).
    private static final Semaphore semaphore = new Semaphore(2);

    public static void main(String[] args) {
        // Create a thread pool with 5 threads
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // Simulate 5 threads trying to access the shared resource
        for (int i = 1; i <= 5; i++) {
            final int threadId = i;
            executorService.submit(() -> {
                try {
                    // Try to acquire a permit
                    System.out.println("Thread " + threadId + " is trying to acquire a permit.");
                    semaphore.acquire();  // Blocks if no permit is available
                } catch (InterruptedException e) {
                    // No permit was acquired, so there is nothing to release
                    Thread.currentThread().interrupt();
                    return;
                }

                try {
                    // Access the shared resource
                    System.out.println("Thread " + threadId + " has acquired a permit.");
                    Thread.sleep(2000);  // Simulate the resource usage
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // Release the permit after use
                    System.out.println("Thread " + threadId + " is releasing the permit.");
                    semaphore.release();
                }
            });
        }

        executorService.shutdown();
    }
}

3. How It Works:

  1. Initialization: The semaphore is initialized with a number of permits (new Semaphore(2)), allowing only 2 threads to access the resource concurrently.
  2. Acquire: A thread attempts to access the resource by calling semaphore.acquire(). If permits are unavailable, the thread is blocked until a permit is released by another thread.
  3. Critical Section: Once the permit is acquired, it enters the critical section and uses the resource.
  4. Release: After the thread is done using the resource, it calls semaphore.release() to return a permit, allowing other threads to acquire it.

4. Output Example:

When you run the example above, you might see an output like this (the exact interleaving varies between runs). All five threads request a permit almost immediately, but only 2 hold one at any time:

Thread 1 is trying to acquire a permit.
Thread 2 is trying to acquire a permit.
Thread 3 is trying to acquire a permit.
Thread 1 has acquired a permit.
Thread 2 has acquired a permit.
Thread 4 is trying to acquire a permit.
Thread 5 is trying to acquire a permit.
Thread 1 is releasing the permit.
Thread 2 is releasing the permit.
Thread 3 has acquired a permit.
Thread 4 has acquired a permit.
Thread 3 is releasing the permit.
Thread 4 is releasing the permit.
Thread 5 has acquired a permit.
Thread 5 is releasing the permit.

Here, only 2 threads are allowed to acquire permits at a time, while others are blocked until permits are released.
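
If a thread should give up instead of blocking indefinitely, Semaphore also offers tryAcquire with a timeout. The following is a minimal sketch; the useResource helper and its return strings are assumptions made for this example:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TryAcquireSketch {

    // Waits up to waitMillis for a permit and reports whether the resource was used
    static String useResource(Semaphore permits, long waitMillis) {
        boolean acquired = false;
        try {
            acquired = permits.tryAcquire(waitMillis, TimeUnit.MILLISECONDS);
            if (!acquired) {
                return "busy"; // Timed out without getting a permit
            }
            return "used"; // Work with the resource would go here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            if (acquired) {
                permits.release(); // Release only what was actually acquired
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(useResource(new Semaphore(1), 100)); // prints "used"
        System.out.println(useResource(new Semaphore(0), 100)); // prints "busy"
    }
}
```

Tracking the acquired flag keeps the permit count correct: a thread that timed out or was interrupted never releases a permit it does not hold.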

5. Fair Ordering:

If you want the semaphore to provide fairness (FIFO order), you can use the constructor:

Semaphore semaphore = new Semaphore(2, true);

The second argument (true) enables fair ordering, making sure the threads acquire permits in the order they requested them.

6. Use Cases:

  • Database Connection Pools: Managing the number of simultaneous connections to a database.
  • Printers: Limiting how many jobs can access a shared printer.
  • Concurrency Limiting: Capping the number of threads that process tasks at the same time in high-volume systems.