How do I use the Java Util Concurrent Flow API?

The java.util.concurrent.Flow API, introduced in Java 9, is the JDK's set of interfaces corresponding to the Reactive Streams specification. It defines a contract for asynchronous, non-blocking stream processing with backpressure, so that a consumer can limit how fast a producer sends it data. Flow itself contains only interfaces; within java.util.concurrent, the one concrete implementation is SubmissionPublisher.

Here’s a quick guide to using the java.util.concurrent.Flow API:


Key Interfaces in java.util.concurrent.Flow

The API comprises four core interfaces:

  1. Flow.Publisher:
    Represents the producer of data. It publishes items to one or more subscribers.

  2. Flow.Subscriber:
    Represents the consumer of data. It subscribes to a publisher to receive data.

  3. Flow.Subscription:
    Represents a link between a publisher and a subscriber, allowing the subscriber to control how much data it receives (backpressure).

  4. Flow.Processor:
    Both a subscriber and a publisher, used to transform or process elements as they flow through the stream.


Implementation Workflow

To use the Flow API, you need to implement these interfaces. Below is a step-by-step explanation:

1. Create a Publisher

  • The Flow.Publisher interface has a single method subscribe(Subscriber<? super T> subscriber).
  • The publisher is responsible for connecting with subscribers and managing their subscriptions.
package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;

public class SimplePublisher implements Flow.Publisher<String> {
    private final String[] items = {"Item 1", "Item 2", "Item 3"};

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        SubscriptionImpl subscription = new SubscriptionImpl(subscriber, items);
        subscriber.onSubscribe(subscription);
    }

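    private static class SubscriptionImpl implements Flow.Subscription {
        private final Flow.Subscriber<? super String> subscriber;
        private final String[] items;
        private int currentIndex = 0;
        private volatile boolean canceled = false;
        private boolean completed = false;

        public SubscriptionImpl(Flow.Subscriber<? super String> subscriber, String[] items) {
            this.subscriber = subscriber;
            this.items = items;
        }

        @Override
        public void request(long n) {
            // Ignore requests made after cancellation or completion
            if (canceled || completed) {
                return;
            }
            if (n <= 0) {
                canceled = true; // onError is terminal, so stop emitting
                subscriber.onError(new IllegalArgumentException("Must request a positive number of items"));
                return;
            }

            // onNext may call request() again, so this loop can be re-entered recursively
            for (long i = 0; i < n && currentIndex < items.length; i++) {
                if (canceled) {
                    return;
                }
                subscriber.onNext(items[currentIndex++]);
            }

            // Signal completion exactly once, even when request() was re-entered from onNext
            if (currentIndex == items.length && !completed && !canceled) {
                completed = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            canceled = true;
        }
    }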
}
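
The publisher above calls onNext from inside request, so a subscriber that requests more items from onNext re-enters request recursively. One common way to avoid that recursion is to record demand in a counter and emit from a single drain loop. The following is a minimal sketch of that idea, not a complete Reactive Streams implementation; ArrayPublisher, DrainingSubscription and demo are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical names for illustration; not part of the JDK or of the classes above.
final class ArrayPublisher implements Flow.Publisher<String> {
    private final String[] items;

    ArrayPublisher(String... items) {
        this.items = items.clone();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        subscriber.onSubscribe(new DrainingSubscription(subscriber, items));
    }

    private static final class DrainingSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super String> subscriber;
        private final String[] items;
        private final AtomicLong demand = new AtomicLong();    // requested but not yet delivered
        private final AtomicInteger wip = new AtomicInteger(); // non-zero while a drain loop is active
        private volatile boolean done;                         // set on cancel, error, or completion
        private int index;                                     // touched only by the active drain loop

        DrainingSubscription(Flow.Subscriber<? super String> subscriber, String[] items) {
            this.subscriber = subscriber;
            this.items = items;
        }

        @Override
        public void request(long n) {
            if (done) {
                return;
            }
            if (n <= 0) {
                done = true;
                subscriber.onError(new IllegalArgumentException("Must request a positive number of items"));
                return;
            }
            // Record the demand, capping at Long.MAX_VALUE instead of overflowing
            demand.getAndUpdate(d -> (d + n < 0) ? Long.MAX_VALUE : d + n);
            drain();
        }

        @Override
        public void cancel() {
            done = true;
        }

        private void drain() {
            // Only the call that moves wip from 0 to 1 emits; nested calls just leave a marker
            if (wip.getAndIncrement() != 0) {
                return;
            }
            int missed = 1;
            do {
                while (!done && index < items.length && demand.get() > 0) {
                    demand.decrementAndGet();
                    subscriber.onNext(items[index++]);
                }
                if (!done && index == items.length) {
                    done = true;
                    subscriber.onComplete();
                }
                missed = wip.addAndGet(-missed); // non-zero means request() ran in the meantime
            } while (missed != 0);
        }
    }

    // Records the signals seen by a subscriber that requests one item at a time
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        new ArrayPublisher("Item 1", "Item 2", "Item 3").subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(String item) { events.add(item); subscription.request(1); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }
}
```

Calling ArrayPublisher.demo() returns the three items followed by a single "complete" entry. The call stack stays flat no matter how many items the array holds, because nested request calls only add to the demand counter.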

2. Create a Subscriber

  • Implement the Flow.Subscriber interface (four methods) for receiving events from a publisher:
    • onSubscribe(Flow.Subscription subscription): Receive the subscription. No items are delivered until you call request(n) on it, so the first request is usually made here.
    • onNext(T item): Handle the next item of the stream.
    • onError(Throwable throwable): Handle any errors.
    • onComplete(): Called when the publisher finishes sending data.
package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;

public class SimpleSubscriber implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("Subscribed!");
        subscription.request(1); // Request the first item
    }

    @Override
    public void onNext(String item) {
        System.out.println("Received: " + item);
        subscription.request(1); // Request the next item
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Complete!");
    }
}
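
The subscriber above requests one item at a time. A subscriber can also request items in batches, which reduces the number of request calls while still bounding how much it has in flight. The sketch below assumes a batch size of 4 and uses SubmissionPublisher as the source; BatchSubscriber, result and demo are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical subscriber that asks for items in fixed-size batches.
final class BatchSubscriber<T> implements Flow.Subscriber<T> {
    private final int batchSize;
    private final List<T> received = new ArrayList<>();
    private final CompletableFuture<List<T>> result = new CompletableFuture<>();
    private Flow.Subscription subscription;
    private int remainingInBatch;

    BatchSubscriber(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        remainingInBatch = batchSize;
        subscription.request(batchSize); // nothing arrives until this call
    }

    @Override
    public void onNext(T item) {
        received.add(item);
        if (--remainingInBatch == 0) {   // batch used up: ask for the next one
            remainingInBatch = batchSize;
            subscription.request(batchSize);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        result.completeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        result.complete(List.copyOf(received));
    }

    // Completes with every received item once the publisher signals onComplete
    CompletableFuture<List<T>> result() {
        return result;
    }

    static List<Integer> demo() {
        BatchSubscriber<Integer> subscriber = new BatchSubscriber<>(4);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 10; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete after the buffered items have been delivered
        return subscriber.result().join();
    }
}
```

BatchSubscriber.demo() returns the numbers 1 to 10 in order. Exposing the outcome as a CompletableFuture lets the caller wait for completion without sleeping.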

3. Connect the Publisher to the Subscriber

  • Instantiate and link your Publisher and Subscriber.
package org.kodejava.util.concurrent;

public class FlowExample {
    public static void main(String[] args) {
        SimplePublisher publisher = new SimplePublisher();
        SimpleSubscriber subscriber = new SimpleSubscriber();

        publisher.subscribe(subscriber);
    }
}

4. (Optional) Create a Processor

  • A Processor acts as both a Subscriber to transform data from an upstream publisher and a Publisher to pass it downstream.
package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class UppercaseProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        submit(item.toUpperCase());
        subscription.request(1);
    }

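    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
        // Forward the failure downstream as onError rather than as a normal completion
        closeExceptionally(throwable);
    }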

    @Override
    public void onComplete() {
        System.out.println("Processing complete!");
        close();
    }
}
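
A processor does nothing until it is wired between a source and a downstream subscriber. The sketch below shows one way to do that wiring. It repeats a compact copy of the uppercase processor so that it compiles on its own; PipelineDemo, Upper, sink and run are hypothetical names, and forwarding errors with closeExceptionally is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class PipelineDemo {
    // Same idea as UppercaseProcessor, repeated here so the sketch compiles on its own
    static final class Upper extends SubmissionPublisher<String>
            implements Flow.Processor<String, String> {
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
        @Override public void onNext(String item) { submit(item.toUpperCase()); subscription.request(1); }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    static List<String> run() {
        List<String> results = new ArrayList<>();
        CompletableFuture<List<String>> done = new CompletableFuture<>();

        // Final consumer: accepts everything and reports the collected items on completion
        Flow.Subscriber<String> sink = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { results.add(item); }
            @Override public void onError(Throwable t) { done.completeExceptionally(t); }
            @Override public void onComplete() { done.complete(List.copyOf(results)); }
        };

        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        Upper processor = new Upper();
        processor.subscribe(sink);    // downstream first: processor -> sink
        source.subscribe(processor);  // then upstream: source -> processor

        source.submit("hello");
        source.submit("world");
        source.close();               // onComplete travels through the processor to the sink

        return done.join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [HELLO, WORLD]
    }
}
```

Subscribing the sink before connecting the source means the processor already has a downstream subscriber when the first item arrives.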

Example with SubmissionPublisher

Java also provides a SubmissionPublisher class, an implementation of Flow.Publisher, which simplifies creating Publishers.

package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class SubmissionPublisherExample {
    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                System.out.println("Subscribed!");
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("Received: " + item);
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Done!");
            }
        };

        publisher.subscribe(subscriber);

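        System.out.println("Publishing data...");
        publisher.submit("Hello");
        publisher.submit("World");
        publisher.submit("!");
        publisher.close(); // Signals onComplete after the buffered items are delivered

        // Delivery runs on daemon threads of the common pool, so keep main alive until it finishes
        Thread.sleep(100);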
    }
}

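Output of the Above Example

A typical run prints the following. Because onSubscribe is invoked asynchronously, "Subscribed!" may appear before or after "Publishing data...".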

Publishing data...
Subscribed!
Received: Hello
Received: World
Received: !
Done!
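
The Thread.sleep(100) in the example only guesses how long delivery takes. When the subscriber just needs to process every item, SubmissionPublisher.consume(Consumer) is a shorter alternative: it subscribes with unbounded demand and returns a CompletableFuture that completes when the publisher is closed. A minimal sketch, with ConsumeExample and run as hypothetical names:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

final class ConsumeExample {
    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done;

        // SubmissionPublisher is AutoCloseable, so try-with-resources calls close() for us
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            done = publisher.consume(item -> {
                System.out.println("Received: " + item);
                received.add(item);
            });
            publisher.submit("Hello");
            publisher.submit("World");
            publisher.submit("!");
        }

        done.join(); // wait for onComplete instead of sleeping
        return received;
    }

    public static void main(String[] args) {
        run();
        System.out.println("Done!");
    }
}
```

This variant gives up per-item backpressure, since consume requests everything up front, so it suits cases where the consumer is known to keep up.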

Points to Remember

  1. Backpressure:
    • The subscriber controls how many items it receives by calling request(n) on its Flow.Subscription.
    • A publisher must never deliver more items than have been requested. SubmissionPublisher buffers undelivered items per subscriber, and submit() blocks once that buffer is full.
  2. Error Handling:
    • If something goes wrong, the publisher calls onError(). This is a terminal signal: no onNext() or onComplete() calls follow it.
  3. Completion:
    • Once all elements have been delivered, the publisher calls onComplete() to indicate the sequence is finished.
  4. Threading:
    • The Flow API doesn’t mandate specific threads for publishers or subscribers, but it is usually paired with asynchronous execution. For example, SubmissionPublisher delivers items through ForkJoinPool.commonPool() by default, and a different Executor can be passed to its constructor.
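
To make the error-handling and completion points concrete, the sketch below ends a stream with SubmissionPublisher.closeExceptionally and records which signals the subscriber sees. TerminalSignalDemo and run are hypothetical names. The sketch deliberately makes no claim about how many of the submitted items arrive before the error.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class TerminalSignalDemo {
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> terminated = new CompletableFuture<>();

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) {
                events.add("error:" + t.getMessage());
                terminated.complete(null);
            }
            @Override public void onComplete() {
                events.add("complete");
                terminated.complete(null);
            }
        });

        publisher.submit(1);
        publisher.submit(2);
        // Terminates the stream with onError instead of onComplete
        publisher.closeExceptionally(new IllegalStateException("source failed"));

        terminated.join(); // wait for the terminal signal
        return List.copyOf(events);
    }
}
```

The last recorded event is "error:source failed" and "complete" never appears, because a stream ends with exactly one terminal signal.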

By combining custom implementations with the provided SubmissionPublisher, or with a library that implements Reactive Streams, you can build pipelines in which consumers rather than producers set the pace, which keeps memory use bounded under load.
