ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
Revision: 1.10
Committed: Sun Jan 18 23:06:59 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.9: +109 -76 lines
Log Message:
Improve wordings, add example

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.function.Consumer;
11 import java.util.function.Function;
12 import java.util.stream.Stream;
13
14 /**
15 * Interrelated interfaces and static methods for establishing
16 * flow-controlled components in which {@link Publisher Publishers}
17 * produce items consumed by one or more {@link Subscriber
18 * Subscribers}, each managed by a {@link Subscription
19 * Subscription}. The use of flow control helps address common
20 * resource issues in "push" based asynchronous systems.
21 *
22 * <p>These interfaces correspond to the <a
23 * href="http://www.reactive-streams.org/"> reactive-streams</a>
24 * specification. (<b>Preliminary release note:</b> This spec is
25 * not yet finalized, so minor details could change.)
26 *
27 * <p><b>Examples.</b> A {@link Publisher} usually defines its own
28 * {@link Subscription} implementation; constructing one in method
29 * {@code subscribe} and issuing it to the calling {@link
30 * Subscriber}. It publishes items to the subscriber asynchronously,
31 * normally using an {@link Executor}. For example, here is a very
32 * simple publisher that only issues (when requested) a single {@code
33 * TRUE} item to each subscriber, then completes. Because each
34 * subscriber receives only the same single item, this class does not
35 * need the buffering and ordering control required in most
36 * implementations.
37 *
38 * <pre> {@code
39 * class OneShotPublisher implements Publisher<Boolean> {
40 * final Executor executor = Executors.newSingleThreadExecutor();
41 * public void subscribe(Subscriber<? super Boolean> subscriber) {
42 * subscriber.onSubscribe(new OneShotSubscription(subscriber));
43 * }
44 * static class OneShotSubscription implements Subscription {
45 * final Subscriber<? super Boolean> subscriber;
46 * final Executor executor;
47 * boolean completed;
48 * OneShotSubscription(Subscriber<? super Boolean> subscriber,
49 * Executor executor) {
50 * this.subscriber = subscriber;
51 * this.executor = executor;
52 * }
53 * public synchronized void request(long n) {
54 * if (n > 0 && !completed) {
55 * completed = true;
56 * executor.execute(() -> {
57 * subscriber.onNext(Boolean.TRUE);
58 * subscriber.onComplete();
59 * });
60 * }
61 * else if (n < 0) {
62 * completed = true;
63 * subscriber.onError(new IllegalArgumentException());
64 }
65 * }
66 * public synchronized void cancel() { completed = true; }
67 * }
68 * }}</pre>
69 *
70 * <p> A {@link Subscriber} arranges that items be requested and
71 * processed. Items (invocations of {@link Subscriber#onNext}) are
72 * not issued unless requested, but multiple items may be requested.
73 * Many Subscriber implementations can arrange this in the style of
74 * the following example, where a buffer size of 1 single-steps, and
75 * larger sizes usually allow for more efficient overlapped processing
76 * with less communication; for example with a value of 64, this keeps
77 * total outstanding requests between 32 and 64. (See also {@link
78 * #consume(long, Publisher, Consumer)} that automates a common case.)
79 *
80 * <pre> {@code
81 * class SampleSubscriber<T> implements Subscriber<T> {
82 * final Consumer<? super T> consumer;
83 * Subscription subscription;
84 * final long bufferSize;
85 * long count;
86 * SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
87 * this.bufferSize = bufferSize;
88 * this.consumer = consumer;
89 * }
90 * public void onSubscribe(Subscription subscription) {
91 * (this.subscription = subscription).request(bufferSize);
92 * count = bufferSize - bufferSize / 2; // re-request when half consumed
93 * }
94 * public void onNext(T item) {
95 * if (--count <= 0)
96 * subscription.request(count = bufferSize - bufferSize / 2);
97 * consumer.accept(item);
98 * }
99 * public void onError(Throwable ex) { ex.printStackTrace(); }
100 * public void onComplete() {}
101 * }}</pre>
102 *
103 * @author Doug Lea
104 * @since 1.9
105 */
106 public final class Flow {
107
108 private Flow() {} // uninstantiable
109
110 /**
111 * A producer of items (and related control messages) received by
112 * Subscribers. Each current {@link Subscriber} receives the same
113 * items (via method onNext) in the same order, unless drops or
114 * errors are encountered. If a Publisher encounters an error that
115 * does not allow further items to be issued to a Subscriber, that
116 * Subscriber receives onError, and then receives no further
117 * messages. Otherwise, if it is known that no further items will
118 * be produced, each Subscriber receives onComplete. Publishers
119 * may vary in policy about whether drops (failures to issue an
120 * item because of resource limitations) are treated as errors.
121 * Publishers may also vary about whether Subscribers receive
122 * items that were produced or available before they subscribed.
123 *
124 * @param <T> the published item type
125 */
126 @FunctionalInterface
127 public static interface Publisher<T> {
128 /**
129 * Adds the given Subscriber if possible. If already
130 * subscribed, or the attempt to subscribe fails due to policy
131 * violations or errors, the Subscriber's onError method is
132 * invoked with an IllegalStateException. Otherwise, upon
133 * success, the Subscriber's onSubscribe method is invoked
134 * with a new Subscription. Subscribers may enable receiving
135 * items by invoking the request method of this Subscription,
136 * and may unsubscribe by invoking its cancel method.
137 *
138 * @param subscriber the subscriber
139 * @throws NullPointerException if subscriber is null
140 */
141 public void subscribe(Subscriber<? super T> subscriber);
142 }
143
144 /**
145 * A receiver of messages. The methods in this interface must be
146 * invoked sequentially by each Subscription, so are not required
147 * to be thread-safe unless subscribing to multiple publishers.
148 *
149 * @param <T> the subscribed item type
150 */
151 public static interface Subscriber<T> {
152 /**
153 * Method invoked prior to invoking any other Subscriber
154 * methods for the given Subscription. If this method throws
155 * an exception, resulting behavior is not guaranteed, but may
156 * cause the Subscription to be cancelled.
157 *
158 * <p>Typically, implementations of this method invoke the
159 * subscription's request method to enable receiving items.
160 *
161 * @param subscription a new subscription
162 */
163 public void onSubscribe(Subscription subscription);
164
165 /**
166 * Method invoked with a Subscription's next item. If this
167 * method throws an exception, resulting behavior is not
168 * guaranteed, but may cause the Subscription to be cancelled.
169 *
170 * @param item the item
171 */
172 public void onNext(T item);
173
174 /**
175 * Method invoked upon an unrecoverable error encountered by a
176 * Publisher or Subscription, after which no other Subscriber
177 * methods are invoked by the Subscription. If this method
178 * itself throws an exception, resulting behavior is
179 * undefined.
180 *
181 * @param throwable the exception
182 */
183 public void onError(Throwable throwable);
184
185 /**
186 * Method invoked when it is known that no additional onNext
187 * invocations will occur for a Subscription that is not
188 * already terminated by error, after which no other
189 * Subscriber methods are invoked by the Subscription. If
190 * this method itself throws an exception, resulting behavior
191 * is undefined.
192 */
193 public void onComplete();
194 }
195
196 /**
197 * Message control linking Publishers and Subscribers.
198 * Subscribers receive items (via onNext) only when requested, and
199 * may cancel at any time. The methods in this interface are
200 * intended to be invoked only by their Subscribers.
201 */
202 public static interface Subscription {
203 /**
204 * Adds the given number {@code n} of items to the current
205 * unfulfilled demand for this subscription. If {@code n} is
206 * negative, the Subscriber will receive an onError signal
207 * with an IllegalArgumentException argument. Otherwise, the
208 * Subscriber will receive up to {@code n} additional onNext
209 * invocations (or fewer if terminated).
210 *
211 * @param n the increment of demand; a value of {@code
212 * Long.MAX_VALUE} may be considered as effectively unbounded
213 */
214 public void request(long n);
215
216 /**
217 * Causes the Subscriber to (eventually) stop receiving onNext
218 * messages.
219 */
220 public void cancel();
221 }
222
223 /**
224 * A component that acts as both a Subscriber and Publisher.
225 *
226 * @param <T> the subscribed item type
227 * @param <R> the published item type
228 */
229 public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
230 }
231
232 // Support for static methods
233
234 static final long DEFAULT_BUFFER_SIZE = 64L;
235
236 abstract static class CompletableSubscriber<T,U> implements Subscriber<T>,
237 Consumer<T> {
238 final CompletableFuture<U> status;
239 Subscription subscription;
240 long requestSize;
241 long count;
242 CompletableSubscriber(long bufferSize, CompletableFuture<U> status) {
243 this.status = status;
244 this.requestSize = bufferSize;
245 }
246 public final void onSubscribe(Subscription subscription) {
247 (this.subscription = subscription).request(requestSize);
248 count = requestSize -= (requestSize >>> 1);
249 }
250 public final void onError(Throwable ex) {
251 status.completeExceptionally(ex);
252 }
253 public void onNext(T item) {
254 try {
255 if (--count <= 0)
256 subscription.request(count = requestSize);
257 accept(item);
258 } catch (Throwable ex) {
259 status.completeExceptionally(ex);
260 }
261 }
262 }
263
264 static final class ConsumeSubscriber<T> extends CompletableSubscriber<T,Void> {
265 final Consumer<? super T> consumer;
266 ConsumeSubscriber(long bufferSize,
267 CompletableFuture<Void> status,
268 Consumer<? super T> consumer) {
269 super(bufferSize, status);
270 this.consumer = consumer;
271 }
272 public void accept(T item) { consumer.accept(item); }
273 public void onComplete() { status.complete(null); }
274 }
275
276 /**
277 * Creates and subscribes a Subscriber that consumes all items
278 * from the given publisher using the given Consumer function, and
279 * using the given bufferSize for buffering. Returns a
280 * CompletableFuture that is completed normally when the publisher
281 * signals onComplete, or completed exceptionally upon any error,
282 * including an exception thrown by the Consumer (in which case
283 * the subscription is cancelled if not already terminated).
284 *
285 * @param <T> the published item type
286 * @param bufferSize the request size for subscriptions
287 * @param publisher the publisher
288 * @param consumer the function applied to each onNext item
289 * @return a CompletableFuture that is completed normally
290 * when the publisher signals onComplete, and exceptionally
291 * upon any error
292 * @throws NullPointerException if publisher or consumer are null
293 * @throws IllegalArgumentException if bufferSize not positive
294 */
295 public static <T> CompletableFuture<Void> consume(
296 long bufferSize, Publisher<T> publisher, Consumer<? super T> consumer) {
297 if (bufferSize <= 0L)
298 throw new IllegalArgumentException("bufferSize must be positive");
299 if (publisher == null || consumer == null)
300 throw new NullPointerException();
301 CompletableFuture<Void> status = new CompletableFuture<>();
302 publisher.subscribe(new ConsumeSubscriber<T>(
303 bufferSize, status, consumer));
304 return status;
305 }
306
307 /**
308 * Equivalent to {@link #consume(long, Publisher, Consumer)}
309 * with a buffer size of 64.
310 *
311 * @param <T> the published item type
312 * @param publisher the publisher
313 * @param consumer the function applied to each onNext item
314 * @return a CompletableFuture that is completed normally
315 * when the publisher signals onComplete, and exceptionally
316 * upon any error
317 * @throws NullPointerException if publisher or consumer are null
318 */
319 public static <T> CompletableFuture<Void> consume(
320 Publisher<T> publisher, Consumer<? super T> consumer) {
321 return consume(DEFAULT_BUFFER_SIZE, publisher, consumer);
322 }
323
324 /**
325 * Temporary implementation for Stream, collecting all items
326 * and then applying stream operation.
327 */
328 static final class StreamSubscriber<T,R> extends CompletableSubscriber<T,R> {
329 final Function<? super Stream<T>, ? extends R> fn;
330 final ArrayList<T> items;
331 StreamSubscriber(long bufferSize,
332 CompletableFuture<R> status,
333 Function<? super Stream<T>, ? extends R> fn) {
334 super(bufferSize, status);
335 this.fn = fn;
336 this.items = new ArrayList<T>();
337 }
338 public void accept(T item) { items.add(item); }
339 public void onComplete() { status.complete(fn.apply(items.stream())); }
340 }
341
342 /**
343 * Creates and subscribes a Subscriber that applies the given
344 * stream operation to items, and uses the given bufferSize for
345 * buffering. Returns a CompletableFuture that is completed
346 * normally with the result of this function when the publisher
347 * signals onComplete, or is completed exceptionally upon any
348 * error.
349 *
350 * <p><b>Preliminary release note:</b> Currently, this method
351 * collects all items before executing the stream
352 * computation. Improvements are pending Stream integration.
353 *
354 * @param <T> the published item type
355 * @param <R> the result type of the stream function
356 * @param bufferSize the request size for subscriptions
357 * @param publisher the publisher
358 * @param streamFunction the operation on elements
359 * @return a CompletableFuture that is completed normally with the
360 * result of the given function as result when the publisher signals
361 * onComplete, and exceptionally upon any error
362 * @throws NullPointerException if publisher or function are null
363 * @throws IllegalArgumentException if bufferSize not positive
364 */
365 public static <T,R> CompletableFuture<R> stream(
366 long bufferSize, Publisher<T> publisher,
367 Function<? super Stream<T>, ? extends R> streamFunction) {
368 if (bufferSize <= 0L)
369 throw new IllegalArgumentException("bufferSize must be positive");
370 if (publisher == null || streamFunction == null)
371 throw new NullPointerException();
372 CompletableFuture<R> status = new CompletableFuture<>();
373 publisher.subscribe(new StreamSubscriber<T,R>(
374 bufferSize, status, streamFunction));
375 return status;
376 }
377
378 /**
379 * Equivalent to {@link #stream(long, Publisher, Function)}
380 * with a buffer size of 64.
381 *
382 * @param <T> the published item type
383 * @param <R> the result type of the stream function
384 * @param publisher the publisher
385 * @param streamFunction the operation on elements
386 * @return a CompletableFuture that is completed normally with the
387 * result of the given function as result when the publisher signals
388 * onComplete, and exceptionally upon any error
389 * @throws NullPointerException if publisher or function are null
390 */
391 public static <T,R> CompletableFuture<R> stream(
392 Publisher<T> publisher,
393 Function<? super Stream<T>,? extends R> streamFunction) {
394 return stream(DEFAULT_BUFFER_SIZE, publisher, streamFunction);
395 }
396
397 }