ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
Revision: 1.14
Committed: Mon Jan 19 13:59:35 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.13: +68 -50 lines
Log Message:
Doc improvements

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.function.Consumer;
11 import java.util.function.Function;
12 import java.util.stream.Stream;
13
14 /**
15 * Interrelated interfaces and static methods for establishing
16 * flow-controlled components in which {@link Publisher Publishers}
17 * produce items consumed by one or more {@link Subscriber
18 * Subscribers}, each managed by a {@link Subscription
19 * Subscription}.
20 *
21 * <p>These interfaces correspond to the <a
22 * href="http://www.reactive-streams.org/"> reactive-streams</a>
23 * specification. They apply in both concurrent and distributed
24 * asynchronous settings: All (seven) methods are defined in {@code
25 * void} "oneway" message style. Communication relies on a simple form
26 * of flow control (method {@link Subscription#request}) that can be
27 * used to avoid resource management problems that may otherwise occur
28 * in "push" based systems.
29 *
30 * <p><b>Examples.</b> A {@link Publisher} usually defines its own
31 * {@link Subscription} implementation; constructing one in method
32 * {@code subscribe} and issuing it to the calling {@link
33 * Subscriber}. It publishes items to the subscriber asynchronously,
34 * normally using an {@link Executor}. For example, here is a very
35 * simple publisher that only issues (when requested) a single {@code
36 * TRUE} item to any subscriber. Because each subscriber receives
37 * only the same single item, this class does not need the buffering
38 * and ordering control required in most implementations (for
39 * example {@link SubmissionPublisher}).
40 *
41 * <pre> {@code
42 * class OneShotPublisher implements Publisher<Boolean> {
43 * final Executor executor = Executors.newSingleThreadExecutor();
44 * public void subscribe(Subscriber<? super Boolean> subscriber) {
45 * subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
46 * }
47 * static class OneShotSubscription implements Subscription {
48 * final Subscriber<? super Boolean> subscriber;
49 * final Executor executor;
50 * boolean completed;
51 * OneShotSubscription(Subscriber<? super Boolean> subscriber,
52 * Executor executor) {
53 * this.subscriber = subscriber;
54 * this.executor = executor;
55 * }
56 * public synchronized void request(long n) {
57 * if (n > 0 && !completed) {
58 * completed = true;
59 * executor.execute(() -> {
60 * subscriber.onNext(Boolean.TRUE);
61 * subscriber.onComplete();
62 * });
63 * }
64 * else if (n < 0) {
65 * completed = true;
66 * subscriber.onError(new IllegalArgumentException());
67 }
68 * }
69 * public synchronized void cancel() { completed = true; }
70 * }
71 * }}</pre>
72 *
73 * <p>A {@link Subscriber} arranges that items be requested and
74 * processed. Items (invocations of {@link Subscriber#onNext}) are
75 * not issued unless requested, but multiple items may be requested.
76 * Many Subscriber implementations can arrange this in the style of
77 * the following example, where a buffer size of 1 single-steps, and
78 * larger sizes usually allow for more efficient overlapped processing
79 * with less communication; for example with a value of 64, this keeps
80 * total outstanding requests between 32 and 64. (See also {@link
81 * #consume(long, Publisher, Consumer)} that automates a common case.)
82 * Because Subscriber method invocations for a given {@link
83 * Subscription} are strictly ordered, there is no need for these
84 * methods to use locks or volatiles unless a Subscriber maintains
85 * multiple Subscriptions (in which case it is better to instead
86 * define multiple Subscribers, each with its own Subscription).
87 *
88 * <pre> {@code
89 * class SampleSubscriber<T> implements Subscriber<T> {
90 * final Consumer<? super T> consumer;
91 * Subscription subscription;
92 * final long bufferSize;
93 * long count;
94 * SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
95 * this.bufferSize = bufferSize;
96 * this.consumer = consumer;
97 * }
98 * public void onSubscribe(Subscription subscription) {
99 * (this.subscription = subscription).request(bufferSize);
100 * count = bufferSize - bufferSize / 2; // re-request when half consumed
101 * }
102 * public void onNext(T item) {
103 * if (--count <= 0)
104 * subscription.request(count = bufferSize - bufferSize / 2);
105 * consumer.accept(item);
106 * }
107 * public void onError(Throwable ex) { ex.printStackTrace(); }
108 * public void onComplete() {}
109 * }}</pre>
110 *
111 * <p>When flow control is inapplicable, a subscriber may initially
112 * request an effectively unbounded number of items, as in:
113 *
114 * <pre> {@code
115 * class UnboundedSubscriber<T> implements Subscriber<T> {
116 * public void onSubscribe(Subscription subscription) {
117 * subscription.request(Long.MAX_VALUE); // effectively unbounded
118 * }
119 * public void onNext(T item) { use(item); }
120 * public void onError(Throwable ex) { ex.printStackTrace(); }
121 * public void onComplete() {}
122 * void use(T item) { ... }
123 * }}</pre>
124 *
125 * @author Doug Lea
126 * @since 1.9
127 */
128 public final class Flow {
129
130 private Flow() {} // uninstantiable
131
132 /**
133 * A producer of items (and related control messages) received by
134 * Subscribers. Each current {@link Subscriber} receives the same
135 * items (via method {@code onNext}) in the same order, unless
136 * drops or errors are encountered. If a Publisher encounters an
137 * error that does not allow further items to be issued to a
138 * Subscriber, that Subscriber receives {@code onError}, and then
139 * receives no further messages. Otherwise, if it is known that
140 * no further items will be produced, each Subscriber receives
141 * {@code onComplete}. Publishers ensure that Subscriber method
142 * invocations for each subscription are strictly ordered in <a
143 * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
144 * order.
145 *
146 * <p>Publishers may vary in policy about whether drops
147 * (failures to issue an item because of resource limitations) are
148 * treated as errors. Publishers may also vary about whether
149 * Subscribers receive items that were produced or available
150 * before they subscribed.
151 *
152 * @param <T> the published item type
153 */
154 @FunctionalInterface
155 public static interface Publisher<T> {
156 /**
157 * Adds the given Subscriber if possible. If already
158 * subscribed, or the attempt to subscribe fails due to policy
159 * violations or errors, the Subscriber's {@code onError}
160 * method is invoked with an {@link IllegalStateException}.
161 * Otherwise, the Subscriber's {@code onSubscribe} method is
162 * invoked with a new {@link Subscription}. Subscribers may
163 * enable receiving items by invoking the {@code request}
164 * method of this Subscription, and may unsubscribe by
165 * invoking its {@code cancel} method.
166 *
167 * @param subscriber the subscriber
168 * @throws NullPointerException if subscriber is null
169 */
170 public void subscribe(Subscriber<? super T> subscriber);
171 }
172
173 /**
174 * A receiver of messages. The methods in this interface are
175 * invoked in strict sequential order for each {@link
176 * Subscription}.
177 *
178 * @param <T> the subscribed item type
179 */
180 public static interface Subscriber<T> {
181 /**
182 * Method invoked prior to invoking any other Subscriber
183 * methods for the given Subscription. If this method throws
184 * an exception, resulting behavior is not guaranteed, but may
185 * cause the Subscription to be cancelled.
186 *
187 * <p>Typically, implementations of this method invoke the
188 * subscription's {@code request} method to enable receiving
189 * items.
190 *
191 * @param subscription a new subscription
192 */
193 public void onSubscribe(Subscription subscription);
194
195 /**
196 * Method invoked with a Subscription's next item. If this
197 * method throws an exception, resulting behavior is not
198 * guaranteed, but may cause the Subscription to be cancelled.
199 *
200 * @param item the item
201 */
202 public void onNext(T item);
203
204 /**
205 * Method invoked upon an unrecoverable error encountered by a
206 * Publisher or Subscription, after which no other Subscriber
207 * methods are invoked by the Subscription. If this method
208 * itself throws an exception, resulting behavior is
209 * undefined.
210 *
211 * @param throwable the exception
212 */
213 public void onError(Throwable throwable);
214
215 /**
216 * Method invoked when it is known that no additional {@code
217 * onNext} invocations will occur for a Subscription that is
218 * not already terminated by error, after which no other
219 * Subscriber methods are invoked by the Subscription. If
220 * this method throws an exception, resulting behavior is
221 * undefined.
222 */
223 public void onComplete();
224 }
225
226 /**
227 * Message control linking a {@link Publisher} and {@link
228 * Subscriber}. Subscribers receive items only when requested,
229 * and may cancel at any time. The methods in this interface are
230 * intended to be invoked only by their Subscribers; usages in
231 * other contexts have undefined effects.
232 */
233 public static interface Subscription {
234 /**
235 * Adds the given number {@code n} of items to the current
236 * unfulfilled demand for this subscription. If {@code n} is
237 * negative, the Subscriber will receive an {@code onError}
238 * signal with an {@link IllegalArgumentException}
239 * argument. Otherwise, the Subscriber will receive up to
240 * {@code n} additional {@code onNext} invocations (or fewer
241 * if terminated).
242 *
243 * @param n the increment of demand; a value of {@code
244 * Long.MAX_VALUE} may be considered as effectively unbounded
245 */
246 public void request(long n);
247
248 /**
249 * Causes the Subscriber to (eventually) stop receiving
250 * messages. Implementation is best-effort -- one or more
251 * messages may be received after invoking this method.
252 */
253 public void cancel();
254 }
255
256 /**
257 * A component that acts as both a Subscriber and Publisher.
258 *
259 * @param <T> the subscribed item type
260 * @param <R> the published item type
261 */
262 public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
263 }
264
265 // Support for static methods
266
267 static final long DEFAULT_BUFFER_SIZE = 64L;
268
269 abstract static class CompletableSubscriber<T,U> implements Subscriber<T>,
270 Consumer<T> {
271 final CompletableFuture<U> status;
272 Subscription subscription;
273 long requestSize;
274 long count;
275 CompletableSubscriber(long bufferSize, CompletableFuture<U> status) {
276 this.status = status;
277 this.requestSize = bufferSize;
278 }
279 public final void onSubscribe(Subscription subscription) {
280 (this.subscription = subscription).request(requestSize);
281 count = requestSize -= (requestSize >>> 1);
282 }
283 public final void onError(Throwable ex) {
284 status.completeExceptionally(ex);
285 }
286 public void onNext(T item) {
287 try {
288 if (--count <= 0)
289 subscription.request(count = requestSize);
290 accept(item);
291 } catch (Throwable ex) {
292 status.completeExceptionally(ex);
293 }
294 }
295 }
296
297 static final class ConsumeSubscriber<T> extends CompletableSubscriber<T,Void> {
298 final Consumer<? super T> consumer;
299 ConsumeSubscriber(long bufferSize,
300 CompletableFuture<Void> status,
301 Consumer<? super T> consumer) {
302 super(bufferSize, status);
303 this.consumer = consumer;
304 }
305 public void accept(T item) { consumer.accept(item); }
306 public void onComplete() { status.complete(null); }
307 }
308
309 /**
310 * Creates and subscribes a Subscriber that consumes all items
311 * from the given publisher using the given Consumer function, and
312 * using the given bufferSize for buffering. Returns a
313 * CompletableFuture that is completed normally when the publisher
314 * signals {@code onComplete}, or completed exceptionally upon any
315 * error, including an exception thrown by the Consumer (in which
316 * case the subscription is cancelled if not already terminated).
317 *
318 * @param <T> the published item type
319 * @param bufferSize the request size for subscriptions
320 * @param publisher the publisher
321 * @param consumer the function applied to each onNext item
322 * @return a CompletableFuture that is completed normally
323 * when the publisher signals onComplete, and exceptionally
324 * upon any error
325 * @throws NullPointerException if publisher or consumer are null
326 * @throws IllegalArgumentException if bufferSize not positive
327 */
328 public static <T> CompletableFuture<Void> consume(
329 long bufferSize, Publisher<T> publisher, Consumer<? super T> consumer) {
330 if (bufferSize <= 0L)
331 throw new IllegalArgumentException("bufferSize must be positive");
332 if (publisher == null || consumer == null)
333 throw new NullPointerException();
334 CompletableFuture<Void> status = new CompletableFuture<>();
335 publisher.subscribe(new ConsumeSubscriber<T>(
336 bufferSize, status, consumer));
337 return status;
338 }
339
340 /**
341 * Equivalent to {@link #consume(long, Publisher, Consumer)}
342 * with a buffer size of 64.
343 *
344 * @param <T> the published item type
345 * @param publisher the publisher
346 * @param consumer the function applied to each onNext item
347 * @return a CompletableFuture that is completed normally
348 * when the publisher signals onComplete, and exceptionally
349 * upon any error
350 * @throws NullPointerException if publisher or consumer are null
351 */
352 public static <T> CompletableFuture<Void> consume(
353 Publisher<T> publisher, Consumer<? super T> consumer) {
354 return consume(DEFAULT_BUFFER_SIZE, publisher, consumer);
355 }
356
357 /**
358 * Temporary implementation for Stream, collecting all items
359 * and then applying stream operation.
360 */
361 static final class StreamSubscriber<T,R> extends CompletableSubscriber<T,R> {
362 final Function<? super Stream<T>, ? extends R> fn;
363 final ArrayList<T> items;
364 StreamSubscriber(long bufferSize,
365 CompletableFuture<R> status,
366 Function<? super Stream<T>, ? extends R> fn) {
367 super(bufferSize, status);
368 this.fn = fn;
369 this.items = new ArrayList<T>();
370 }
371 public void accept(T item) { items.add(item); }
372 public void onComplete() { status.complete(fn.apply(items.stream())); }
373 }
374
375 /**
376 * Creates and subscribes a Subscriber that applies the given
377 * stream operation to items, and uses the given bufferSize for
378 * buffering. Returns a CompletableFuture that is completed
379 * normally with the result of this function when the publisher
380 * signals {@code onComplete}, or is completed exceptionally upon
381 * any error.
382 *
383 * <p><b>Preliminary release note:</b> Currently, this method
384 * collects all items before executing the stream
385 * computation. Improvements are pending Stream integration.
386 *
387 * @param <T> the published item type
388 * @param <R> the result type of the stream function
389 * @param bufferSize the request size for subscriptions
390 * @param publisher the publisher
391 * @param streamFunction the operation on elements
392 * @return a CompletableFuture that is completed normally with the
393 * result of the given function as result when the publisher signals
394 * onComplete, and exceptionally upon any error
395 * @throws NullPointerException if publisher or function are null
396 * @throws IllegalArgumentException if bufferSize not positive
397 */
398 public static <T,R> CompletableFuture<R> stream(
399 long bufferSize, Publisher<T> publisher,
400 Function<? super Stream<T>, ? extends R> streamFunction) {
401 if (bufferSize <= 0L)
402 throw new IllegalArgumentException("bufferSize must be positive");
403 if (publisher == null || streamFunction == null)
404 throw new NullPointerException();
405 CompletableFuture<R> status = new CompletableFuture<>();
406 publisher.subscribe(new StreamSubscriber<T,R>(
407 bufferSize, status, streamFunction));
408 return status;
409 }
410
411 /**
412 * Equivalent to {@link #stream(long, Publisher, Function)}
413 * with a buffer size of 64.
414 *
415 * @param <T> the published item type
416 * @param <R> the result type of the stream function
417 * @param publisher the publisher
418 * @param streamFunction the operation on elements
419 * @return a CompletableFuture that is completed normally with the
420 * result of the given function as result when the publisher signals
421 * onComplete, and exceptionally upon any error
422 * @throws NullPointerException if publisher or function are null
423 */
424 public static <T,R> CompletableFuture<R> stream(
425 Publisher<T> publisher,
426 Function<? super Stream<T>,? extends R> streamFunction) {
427 return stream(DEFAULT_BUFFER_SIZE, publisher, streamFunction);
428 }
429
430 }