ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
Revision: 1.1
Committed: Thu Jan 15 15:58:31 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Log Message:
Initial commits

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Consumer;
9     import java.util.function.Function;
10     import java.util.ArrayList;
11     import java.util.stream.Stream;
12    
/**
 * Interrelated interfaces and static methods for establishing
 * flow-controlled components in which {@link Publisher Publishers}
 * produce items consumed by one or more {@link Subscriber
 * Subscribers}, each managed by a {@link Subscription
 * Subscription}. The use of flow control helps address common
 * resource issues in "push" based asynchronous systems.
 *
 * <p>These interfaces correspond to the <a
 * href="http://www.reactive-streams.org/"> reactive-streams</a>
 * specification. (<b>Preliminary release note:</b> This spec is
 * not yet finalized, so minor details could change.)
 *
 * <p><b>Preliminary release note:</b> This class may later include
 * methods supporting periodic events and/or async IO.
 *
 * @author Doug Lea
 * @since 1.9
 */
public final class Flow {

    private Flow() {} // uninstantiable

    /**
     * A producer of items (and related control messages) received by
     * Subscribers.  Each current {@link Subscriber} receives the same
     * items (via method onNext) in the same order, unless drops or
     * errors are encountered. If a Publisher encounters an error that
     * does not allow further items to be issued to a Subscriber, that
     * Subscriber receives onError, and then receives no further
     * messages.  Otherwise, if it is known that no further items will
     * be produced, each Subscriber receives onComplete.  Publishers
     * may vary in policy about whether drops (failures to issue an
     * item because of resource limitations) are treated as errors.
     * Publishers may also vary about whether Subscribers receive
     * items that were produced or available before they subscribed.
     *
     * @param <T> the published item type
     */
    @FunctionalInterface
    public static interface Publisher<T> {
        /**
         * Adds the given Subscriber if possible.  If already
         * subscribed, or the attempt to subscribe fails due to policy
         * violations or errors, the Subscriber's onError method is
         * invoked with an IllegalStateException.  Otherwise, upon
         * success, the Subscriber's onSubscribe method is invoked
         * with a new Subscription.  Subscribers may enable receiving
         * items by invoking the request method of this Subscription,
         * and may unsubscribe by invoking its cancel method.
         *
         * @param subscriber the subscriber
         * @throws NullPointerException if subscriber is null
         */
        public void subscribe(Subscriber<? super T> subscriber);
    }

    /**
     * A receiver of messages.  The methods in this interface must be
     * invoked sequentially by each Subscription, so are not required
     * to be thread-safe unless subscribing to multiple publishers.
     *
     * <p>Items (invocations of {@link #onNext}) are not issued unless
     * requested, but multiple items may be requested.  Many
     * Subscriber implementations can arrange this in the style of the
     * following example, where a request size of 1 single-steps, and
     * larger sizes (for example 64) usually allow for more efficient
     * overlapped processing.  (See also {@link Flow#consume(long,
     * Publisher, Consumer)} that automates a common case.)
     *
     * <pre> {@code
     * class SampleSubscriber<T> implements Subscriber<T> {
     *   final Consumer<? super T> consumer;
     *   Subscription subscription;
     *   final long requestSize;
     *   long count;
     *   SampleSubscriber(long requestSize, Consumer<? super T> consumer) {
     *     this.requestSize = requestSize;
     *     this.consumer = consumer;
     *   }
     *   public void onSubscribe(Subscription subscription) {
     *     count = requestSize / 2; // re-request when half consumed
     *     (this.subscription = subscription).request(requestSize);
     *   }
     *   public void onNext(T item) {
     *     if (--count <= 0)
     *       subscription.request(count = requestSize);
     *     consumer.accept(item);
     *   }
     *   public void onError(Throwable ex) { ex.printStackTrace(); }
     *   public void onComplete() {}
     * }}</pre>
     *
     * @param <T> the subscribed item type
     */
    public static interface Subscriber<T> {
        /**
         * Method invoked prior to invoking any other Subscriber
         * methods for the given Subscription. If this method throws
         * an exception, resulting behavior is not guaranteed, but may
         * cause the Subscription to be cancelled.
         *
         * <p>Typically, implementations of this method invoke the
         * subscription's request method to enable receiving items.
         *
         * @param subscription a new subscription
         */
        public void onSubscribe(Subscription subscription);

        /**
         * Method invoked with a Subscription's next item.  If this
         * method throws an exception, resulting behavior is not
         * guaranteed, but may cause the Subscription to be cancelled.
         *
         * @param item the item
         */
        public void onNext(T item);

        /**
         * Method invoked upon an unrecoverable error encountered by a
         * Publisher or Subscription, after which no other Subscriber
         * methods are invoked by the Subscription.  If this method
         * itself throws an exception, resulting behavior is
         * undefined.
         *
         * @param throwable the exception
         */
        public void onError(Throwable throwable);

        /**
         * Method invoked when it is known that no additional onNext
         * invocations will occur for a Subscription that is not
         * already terminated by error, after which no other
         * Subscriber methods are invoked by the Subscription.  If
         * this method itself throws an exception, resulting behavior
         * is undefined.
         */
        public void onComplete();
    }

    /**
     * Message control linking Publishers and Subscribers.
     * Subscribers receive items (via onNext) only when requested, and
     * may cancel at any time. The methods in this interface are
     * intended to be invoked only by their Subscribers.
     */
    public static interface Subscription {
        /**
         * Adds the given number {@code n} of items to the current
         * unfulfilled demand for this subscription.  If {@code n} is
         * negative, the Subscriber will receive an onError signal
         * with an IllegalArgumentException argument. Otherwise, the
         * Subscriber will receive up to {@code n} additional onNext
         * invocations (or fewer if terminated).
         *
         * @param n the increment of demand; a value of {@code
         * Long.MAX_VALUE} may be considered as effectively unbounded
         */
        public void request(long n);

        /**
         * Causes the Subscriber to (eventually) stop receiving onNext
         * messages.
         */
        public void cancel();
    }

    /**
     * A component that acts as both a Subscriber and Publisher.
     *
     * @param <T> the subscribed item type
     * @param <R> the published item type
     */
    public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

    // Support for static methods

    /** Request size used by the overloads that do not take one. */
    static final long DEFAULT_REQUEST_SIZE = 64L;

    /**
     * Base class for the subscribers created by the static methods of
     * this class. Each item is handed to {@link #accept}, and the
     * outcome is reported through the {@code status} future. Demand
     * is replenished in batches of {@code requestSize}.
     */
    static abstract class CompletableSubscriber<T,U> implements Subscriber<T>,
                                                                Consumer<T> {
        final CompletableFuture<U> status;
        Subscription subscription;
        final long requestSize;
        long count;                 // items left before the next request
        CompletableSubscriber(long requestSize,
                              CompletableFuture<U> status) {
            this.status = status;
            this.requestSize = requestSize;
            this.count = requestSize >>> 1; // re-request when half consumed
        }
        @Override
        public final void onSubscribe(Subscription subscription) {
            if (subscription == null)
                throw new NullPointerException();
            this.subscription = subscription;
            // Install the cancellation hook before issuing any demand.
            // It runs on any exceptional completion of status, which
            // includes cancellation of the future by its holder.
            status.exceptionally(ex -> { subscription.cancel(); return null;});
            // No demand is issued if status has already completed.
            if (!status.isDone())
                subscription.request(requestSize);
        }
        @Override
        public final void onError(Throwable ex) {
            if (ex == null)
                ex = new IllegalStateException("null onError argument");
            status.completeExceptionally(ex);
        }
        @Override
        public void onNext(T item) {
            Subscription s = subscription;
            if (s == null)
                status.completeExceptionally(
                    new IllegalStateException("onNext without subscription"));
            else if (!status.isDone()) {
                // Cancellation only takes effect eventually, so items may
                // still arrive after status has completed; the isDone
                // guard above drops them instead of processing them.
                try {
                    if (--count <= 0)
                        s.request(count = requestSize);
                    accept(item);
                } catch (Throwable ex) {
                    status.completeExceptionally(ex);
                }
            }
        }
    }

    /**
     * Subscriber used by {@code consume}: passes each item to the
     * given Consumer and completes {@code status} with null on
     * onComplete.
     */
    static final class ConsumeSubscriber<T> extends CompletableSubscriber<T,Void> {
        final Consumer<? super T> consumer;
        ConsumeSubscriber(long requestSize,
                          CompletableFuture<Void> status,
                          Consumer<? super T> consumer) {
            super(requestSize, status);
            this.consumer = consumer;
        }
        @Override
        public void accept(T item) { consumer.accept(item); }
        @Override
        public void onComplete() { status.complete(null); }
    }

    /**
     * Creates and subscribes a Subscriber that consumes all items
     * from the given publisher using the given Consumer function, and
     * using the given requestSize for buffering. Returns a
     * CompletableFuture that is completed normally when the publisher
     * signals onComplete, or completed exceptionally upon any error,
     * including an exception thrown by the Consumer (in which case
     * the subscription is cancelled if not already terminated).
     * Items delivered after the returned future has completed are
     * ignored.
     *
     * @param requestSize the request size for subscriptions
     * @param publisher the publisher
     * @param consumer the function applied to each onNext item
     * @return a CompletableFuture that is completed normally
     * when the publisher signals onComplete, and exceptionally
     * upon any error.
     * @throws NullPointerException if publisher or consumer are null
     * @throws IllegalArgumentException if requestSize not positive
     * @param <T> the published item type
     */
    public static <T> CompletableFuture<Void> consume(
        long requestSize, Publisher<T> publisher, Consumer<? super T> consumer) {
        if (requestSize <= 0L)
            throw new IllegalArgumentException("requestSize must be positive");
        if (publisher == null || consumer == null)
            throw new NullPointerException();
        CompletableFuture<Void> status = new CompletableFuture<>();
        publisher.subscribe(new ConsumeSubscriber<T>(
                                requestSize, status, consumer));
        return status;
    }

    /**
     * Equivalent to {@link #consume(long, Publisher, Consumer)}
     * with a request size of 64.
     *
     * @param publisher the publisher
     * @param consumer the function applied to each onNext item
     * @return a CompletableFuture that is completed normally
     * when the publisher signals onComplete, and exceptionally
     * upon any error.
     * @throws NullPointerException if publisher or consumer are null
     * @param <T> the published item type
     */
    public static <T> CompletableFuture<Void> consume(
        Publisher<T> publisher, Consumer<? super T> consumer) {
        return consume(DEFAULT_REQUEST_SIZE, publisher, consumer);
    }

    /**
     * Temporary implementation for Stream, collecting all items
     * and then applying stream operation.
     */
    static final class StreamSubscriber<T,R> extends CompletableSubscriber<T,R> {
        final Function<? super Stream<T>, ? extends R> fn;
        final ArrayList<T> items;
        StreamSubscriber(long requestSize,
                         CompletableFuture<R> status,
                         Function<? super Stream<T>, ? extends R> fn) {
            super(requestSize, status);
            this.fn = fn;
            this.items = new ArrayList<T>();
        }
        @Override
        public void accept(T item) { items.add(item); }
        @Override
        public void onComplete() {
            // Nothing to compute if status has already completed.
            if (!status.isDone()) {
                try {
                    status.complete(fn.apply(items.stream()));
                } catch (Throwable ex) {
                    // A failing stream function must surface through the
                    // future; otherwise it would never complete.
                    status.completeExceptionally(ex);
                }
            }
        }
    }

    /**
     * Creates and subscribes a Subscriber that applies the given
     * stream operation to items, and uses the given requestSize for
     * buffering. Returns a CompletableFuture that is completed
     * normally with the result of this function when the publisher
     * signals onComplete, or is completed exceptionally upon any
     * error, including an exception thrown by the stream function.
     *
     * <p><b>Preliminary release note:</b> Currently, this method
     * collects all items before executing the stream
     * computation. Improvements are pending Stream integration.
     *
     * @param requestSize the request size for subscriptions
     * @param publisher the publisher
     * @param streamFunction the operation on elements
     * @return a CompletableFuture that is completed normally with the
     * result of the given function as result when the publisher signals
     * onComplete, and exceptionally upon any error.
     * @throws NullPointerException if publisher or function are null
     * @throws IllegalArgumentException if requestSize not positive
     * @param <T> the published item type
     * @param <R> the result type of the stream function
     */
    public static <T,R> CompletableFuture<R> stream(
        long requestSize, Publisher<T> publisher,
        Function<? super Stream<T>, ? extends R> streamFunction) {
        if (requestSize <= 0L)
            throw new IllegalArgumentException("requestSize must be positive");
        if (publisher == null || streamFunction == null)
            throw new NullPointerException();
        CompletableFuture<R> status = new CompletableFuture<>();
        publisher.subscribe(new StreamSubscriber<T,R>(
                                requestSize, status, streamFunction));
        return status;
    }

    /**
     * Equivalent to {@link #stream(long, Publisher, Function)}
     * with a request size of 64.
     *
     * @param publisher the publisher
     * @param streamFunction the operation on elements
     * @return a CompletableFuture that is completed normally with the
     * result of the given function as result when the publisher signals
     * onComplete, and exceptionally upon any error.
     * @throws NullPointerException if publisher or function are null
     * @param <T> the published item type
     * @param <R> the result type of the stream function
     */
    public static <T,R> CompletableFuture<R> stream(
        Publisher<T> publisher,
        Function<? super Stream<T>,? extends R> streamFunction) {
        return stream(DEFAULT_REQUEST_SIZE, publisher, streamFunction);
    }


}