/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.ArrayList;
import java.util.stream.Stream;
/**
* Interrelated interfaces and static methods for establishing
* flow-controlled components in which {@link Publisher Publishers}
* produce items consumed by one or more {@link Subscriber
* Subscribers}, each managed by a {@link Subscription
* Subscription}. The use of flow control helps address common
* resource issues in "push" based asynchronous systems.
*
* <p>These interfaces correspond to the reactive-streams
* specification. (<b>Preliminary release note:</b> This spec is
* not yet finalized, so minor details could change.)
*
* <p><b>Preliminary release note:</b> This class may later include
* methods supporting periodic events and/or async IO.
*
* @author Doug Lea
* @since 9
*/
public final class Flow {
/** Not instantiable: this class only hosts nested types and static methods. */
private Flow() {
}
/**
* A producer of items (and related control messages) received by
* Subscribers. Each current {@link Subscriber} receives the same
* items (via method onNext) in the same order, unless drops or
* errors are encountered. If a Publisher encounters an error that
* does not allow further items to be issued to a Subscriber, that
* Subscriber receives onError, and then receives no further
* messages. Otherwise, if it is known that no further items will
* be produced, each Subscriber receives onComplete. Publishers
* may vary in policy about whether drops (failures to issue an
* item because of resource limitations) are treated as errors.
* Publishers may also vary about whether Subscribers receive
* items that were produced or available before they subscribed.
*
* @param the published item type
*/
@FunctionalInterface
public static interface Publisher {
/**
* Adds the given Subscriber if possible. If already
* subscribed, or the attempt to subscribe fails due to policy
* violations or errors, the Subscriber's onError method is
* invoked with an IllegalStateException. Otherwise, upon
* success, the Subscriber's onSubscribe method is invoked
* with a new Subscription. Subscribers may enable receiving
* items by invoking the request method of this Subscription,
* and may unsubscribe by invoking its cancel method.
*
* @param subscriber the subscriber
* @throws NullPointerException if subscriber is null
*/
public void subscribe(Subscriber super T> subscriber);
}
/**
 * A receiver of messages.  The methods in this interface must be
 * invoked sequentially by each Subscription, so are not required
 * to be thread-safe unless subscribing to multiple publishers.
 *
 * <p>Items (invocations of {@link #onNext}) are not issued unless
 * requested, but multiple items may be requested.  Many
 * Subscriber implementations can arrange this in the style of the
 * following example, where a request size of 1 single-steps, and
 * larger sizes (for example 64) usually allow for more efficient
 * overlapped processing.  (See also {@link Flow#consume(long,
 * Publisher, Consumer)} that automates a common case.)
 *
 * <pre> {@code
 * class SampleSubscriber<T> implements Subscriber<T> {
 *   final Consumer<? super T> consumer;
 *   Subscription subscription;
 *   final long requestSize;
 *   long count;
 *   SampleSubscriber(long requestSize, Consumer<? super T> consumer) {
 *     this.requestSize = requestSize;
 *     this.consumer = consumer;
 *   }
 *   public void onSubscribe(Subscription subscription) {
 *     count = requestSize / 2; // re-request when half consumed
 *     (this.subscription = subscription).request(requestSize);
 *   }
 *   public void onNext(T item) {
 *     if (--count <= 0)
 *       subscription.request(count = requestSize);
 *     consumer.accept(item);
 *   }
 *   public void onError(Throwable ex) { ex.printStackTrace(); }
 *   public void onComplete() {}
 * }}</pre>
 *
 * @param <T> the subscribed item type
 */
public static interface Subscriber<T> {
    /**
     * Method invoked prior to invoking any other Subscriber
     * methods for the given Subscription. If this method throws
     * an exception, resulting behavior is not guaranteed, but may
     * cause the Subscription to be cancelled.
     *
     * <p>Typically, implementations of this method invoke the
     * subscription's {@code request} method to enable receiving
     * items.
     *
     * @param subscription a new subscription
     */
    public void onSubscribe(Subscription subscription);

    /**
     * Method invoked with a Subscription's next item.  If this
     * method throws an exception, resulting behavior is not
     * guaranteed, but may cause the Subscription to be cancelled.
     *
     * @param item the item
     */
    public void onNext(T item);

    /**
     * Method invoked upon an unrecoverable error encountered by a
     * Publisher or Subscription, after which no other Subscriber
     * methods are invoked by the Subscription.  If this method
     * itself throws an exception, resulting behavior is
     * undefined.
     *
     * @param throwable the exception
     */
    public void onError(Throwable throwable);

    /**
     * Method invoked when it is known that no additional
     * {@code onNext} invocations will occur for a Subscription
     * that is not already terminated by error, after which no
     * other Subscriber methods are invoked by the Subscription.
     * If this method itself throws an exception, resulting
     * behavior is undefined.
     */
    public void onComplete();
}
/**
 * The control link between one Publisher and one Subscriber.  A
 * Subscriber is sent items (through {@code onNext}) only after it
 * has asked for them, and it is free to cancel whenever it likes.
 * Only the owning Subscriber is expected to call the methods
 * declared here.
 */
public static interface Subscription {
    /**
     * Increases this subscription's outstanding (not yet
     * fulfilled) demand by {@code n} items.  A negative {@code n}
     * results in the Subscriber being signalled {@code onError}
     * with an {@link IllegalArgumentException}.  In all other
     * cases the Subscriber gets at most {@code n} further
     * {@code onNext} calls (fewer if the subscription terminates
     * first).
     *
     * @param n the amount of demand to add; {@code Long.MAX_VALUE}
     * may be treated as effectively unbounded
     */
    void request(long n);

    /**
     * Arranges for the Subscriber to (eventually) stop being sent
     * {@code onNext} messages.
     */
    void cancel();
}
/**
 * A component that acts as both a Subscriber and Publisher.
 *
 * @param <T> the subscribed item type
 * @param <R> the published item type
 */
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
// Support for static methods
// Request size used by the consume(Publisher, Consumer) and
// stream(Publisher, Function) overloads, which take no explicit
// requestSize argument.
static final long DEFAULT_REQUEST_SIZE = 64L;
abstract static class CompletableSubscriber implements Subscriber,
Consumer {
final CompletableFuture status;
Subscription subscription;
final long requestSize;
long count;
CompletableSubscriber(long requestSize,
CompletableFuture status) {
this.status = status;
this.requestSize = requestSize;
this.count = requestSize >>> 1;
}
public final void onSubscribe(Subscription subscription) {
(this.subscription = subscription).request(requestSize);
status.exceptionally(ex -> { subscription.cancel(); return null;});
}
public final void onError(Throwable ex) {
if (ex == null)
ex = new IllegalStateException("null onError argument");
status.completeExceptionally(ex);
}
public void onNext(T item) {
Subscription s = subscription;
if (s == null)
status.completeExceptionally(
new IllegalStateException("onNext without subscription"));
else {
try {
if (--count <= 0)
s.request(count = requestSize);
accept(item);
} catch (Throwable ex) {
status.completeExceptionally(ex);
}
}
}
}
static final class ConsumeSubscriber extends CompletableSubscriber {
final Consumer super T> consumer;
ConsumeSubscriber(long requestSize,
CompletableFuture status,
Consumer super T> consumer) {
super(requestSize, status);
this.consumer = consumer;
}
public void accept(T item) { consumer.accept(item); }
public void onComplete() { status.complete(null); }
}
/**
* Creates and subscribes a Subscriber that consumes all items
* from the given publisher using the given Consumer function, and
* using the given requestSize for buffering. Returns a
* CompletableFuture that is completed normally when the publisher
* signals onComplete, or completed exceptionally upon any error,
* including an exception thrown by the Consumer (in which case
* the subscription is cancelled if not already terminated).
*
* @param the published item type
* @param requestSize the request size for subscriptions
* @param publisher the publisher
* @param consumer the function applied to each onNext item
* @return a CompletableFuture that is completed normally
* when the publisher signals onComplete, and exceptionally
* upon any error
* @throws NullPointerException if publisher or consumer are null
* @throws IllegalArgumentException if requestSize not positive
*/
public static CompletableFuture consume(
long requestSize, Publisher publisher, Consumer super T> consumer) {
if (requestSize <= 0L)
throw new IllegalArgumentException("requestSize must be positive");
if (publisher == null || consumer == null)
throw new NullPointerException();
CompletableFuture status = new CompletableFuture<>();
publisher.subscribe(new ConsumeSubscriber(
requestSize, status, consumer));
return status;
}
/**
* Equivalent to {@link #consume(long, Publisher, Consumer)}
* with a request size of 64.
*
* @param the published item type
* @param publisher the publisher
* @param consumer the function applied to each onNext item
* @return a CompletableFuture that is completed normally
* when the publisher signals onComplete, and exceptionally
* upon any error
* @throws NullPointerException if publisher or consumer are null
*/
public static CompletableFuture consume(
Publisher publisher, Consumer super T> consumer) {
return consume(DEFAULT_REQUEST_SIZE, publisher, consumer);
}
/**
* Temporary implementation for Stream, collecting all items
* and then applying stream operation.
*/
static final class StreamSubscriber extends CompletableSubscriber {
final Function super Stream, ? extends R> fn;
final ArrayList items;
StreamSubscriber(long requestSize,
CompletableFuture status,
Function super Stream, ? extends R> fn) {
super(requestSize, status);
this.fn = fn;
this.items = new ArrayList();
}
public void accept(T item) { items.add(item); }
public void onComplete() { status.complete(fn.apply(items.stream())); }
}
/**
* Creates and subscribes a Subscriber that applies the given
* stream operation to items, and uses the given requestSize for
* buffering. Returns a CompletableFuture that is completed
* normally with the result of this function when the publisher
* signals onComplete, or is completed exceptionally upon any
* error.
*
* Preliminary release note: Currently, this method
* collects all items before executing the stream
* computation. Improvements are pending Stream integration.
*
* @param the published item type
* @param the result type of the stream function
* @param requestSize the request size for subscriptions
* @param publisher the publisher
* @param streamFunction the operation on elements
* @return a CompletableFuture that is completed normally with the
* result of the given function as result when the publisher signals
* onComplete, and exceptionally upon any error
* @throws NullPointerException if publisher or function are null
* @throws IllegalArgumentException if requestSize not positive
*/
public static CompletableFuture stream(
long requestSize, Publisher publisher,
Function super Stream, ? extends R> streamFunction) {
if (requestSize <= 0L)
throw new IllegalArgumentException("requestSize must be positive");
if (publisher == null || streamFunction == null)
throw new NullPointerException();
CompletableFuture status = new CompletableFuture<>();
publisher.subscribe(new StreamSubscriber(
requestSize, status, streamFunction));
return status;
}
/**
* Equivalent to {@link #stream(long, Publisher, Function)}
* with a request size of 64.
*
* @param the published item type
* @param the result type of the stream function
* @param publisher the publisher
* @param streamFunction the operation on elements
* @return a CompletableFuture that is completed normally with the
* result of the given function as result when the publisher signals
* onComplete, and exceptionally upon any error
* @throws NullPointerException if publisher or function are null
*/
public static CompletableFuture stream(
Publisher publisher,
Function super Stream,? extends R> streamFunction) {
return stream(DEFAULT_REQUEST_SIZE, publisher, streamFunction);
}
}