ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
Revision: 1.7
Committed: Thu Jan 15 17:37:21 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.6: +4 -4 lines
Log Message:
javadoc punctuation

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Consumer;
9 import java.util.function.Function;
10 import java.util.ArrayList;
11 import java.util.stream.Stream;
12
13 /**
14 * Interrelated interfaces and static methods for establishing
15 * flow-controlled components in which {@link Publisher Publishers}
16 * produce items consumed by one or more {@link Subscriber
17 * Subscribers}, each managed by a {@link Subscription
18 * Subscription}. The use of flow control helps address common
19 * resource issues in "push" based asynchronous systems.
20 *
21 * <p>These interfaces correspond to the <a
22 * href="http://www.reactive-streams.org/"> reactive-streams</a>
23 * specification. (<b>Preliminary release note:</b> This spec is
24 * not yet finalized, so minor details could change.)
25 *
26 * <p><b>Preliminary release note:</b> This class may later include
27 * methods supporting periodic events and/or async IO.
28 *
29 * @author Doug Lea
30 * @since 1.9
31 */
32 public final class Flow {
33
34 private Flow() {} // uninstantiable
35
36 /**
37 * A producer of items (and related control messages) received by
38 * Subscribers. Each current {@link Subscriber} receives the same
39 * items (via method onNext) in the same order, unless drops or
40 * errors are encountered. If a Publisher encounters an error that
41 * does not allow further items to be issued to a Subscriber, that
42 * Subscriber receives onError, and then receives no further
43 * messages. Otherwise, if it is known that no further items will
44 * be produced, each Subscriber receives onComplete. Publishers
45 * may vary in policy about whether drops (failures to issue an
46 * item because of resource limitations) are treated as errors.
47 * Publishers may also vary about whether Subscribers receive
48 * items that were produced or available before they subscribed.
49 *
50 * @param <T> the published item type
51 */
52 @FunctionalInterface
53 public static interface Publisher<T> {
54 /**
55 * Adds the given Subscriber if possible. If already
56 * subscribed, or the attempt to subscribe fails due to policy
57 * violations or errors, the Subscriber's onError method is
58 * invoked with an IllegalStateException. Otherwise, upon
59 * success, the Subscriber's onSubscribe method is invoked
60 * with a new Subscription. Subscribers may enable receiving
61 * items by invoking the request method of this Subscription,
62 * and may unsubscribe by invoking its cancel method.
63 *
64 * @param subscriber the subscriber
65 * @throws NullPointerException if subscriber is null
66 */
67 public void subscribe(Subscriber<? super T> subscriber);
68 }
69
70 /**
71 * A receiver of messages. The methods in this interface must be
72 * invoked sequentially by each Subscription, so are not required
73 * to be thread-safe unless subscribing to multiple publishers.
74 *
75 * <p>Items (invocations of {@link #onNext}) are not issued unless
76 * requested, but multiple items may be requested. Many
77 * Subscriber implementations can arrange this in the style of the
78 * following example, where a request size of 1 single-steps, and
79 * larger sizes (for example 64) usually allow for more efficient
80 * overlapped processing. (See also {@link #consume(long, Publisher,
81 * Consumer)} that automates a common case.)
82 *
83 * <pre> {@code
84 * class SampleSubscriber<T> implements Subscriber<T> {
85 * final Consumer<? super T> consumer;
86 * Subscription subscription;
87 * final long requestSize;
88 * long count;
89 * SampleSubscriber(long requestSize, Consumer<? super T> consumer) {
90 * this.requestSize = requestSize;
91 * this.consumer = consumer;
92 * }
93 * public void onSubscribe(Subscription subscription) {
94 * count = requestSize / 2; // re-request when half consumed
95 * (this.subscription = subscription).request(requestSize);
96 * }
97 * public void onNext(T item) {
98 * if (--count <= 0)
99 * subscription.request(count = requestSize);
100 * consumer.accept(item);
101 * }
102 * public void onError(Throwable ex) { ex.printStackTrace(); }
103 * public void onComplete() {}
104 * }}</pre>
105 *
106 * @param <T> the subscribed item type
107 */
108 public static interface Subscriber<T> {
109 /**
110 * Method invoked prior to invoking any other Subscriber
111 * methods for the given Subscription. If this method throws
112 * an exception, resulting behavior is not guaranteed, but may
113 * cause the Subscription to be cancelled.
114 *
115 * <p>Typically, implementations of this method invoke the
116 * subscription's request method to enable receiving items.
117 *
118 * @param subscription a new subscription
119 */
120 public void onSubscribe(Subscription subscription);
121
122 /**
123 * Method invoked with a Subscription's next item. If this
124 * method throws an exception, resulting behavior is not
125 * guaranteed, but may cause the Subscription to be cancelled.
126 *
127 * @param item the item
128 */
129 public void onNext(T item);
130
131 /**
132 * Method invoked upon an unrecoverable error encountered by a
133 * Publisher or Subscription, after which no other Subscriber
134 * methods are invoked by the Subscription. If this method
135 * itself throws an exception, resulting behavior is
136 * undefined.
137 *
138 * @param throwable the exception
139 */
140 public void onError(Throwable throwable);
141
142 /**
143 * Method invoked when it is known that no additional onNext
144 * invocations will occur for a Subscription that is not
145 * already terminated by error, after which no other
146 * Subscriber methods are invoked by the Subscription. If
147 * this method itself throws an exception, resulting behavior
148 * is undefined.
149 */
150 public void onComplete();
151 }
152
153 /**
154 * Message control linking Publishers and Subscribers.
155 * Subscribers receive items (via onNext) only when requested, and
156 * may cancel at any time. The methods in this interface are
157 * intended to be invoked only by their Subscribers.
158 */
159 public static interface Subscription {
160 /**
161 * Adds the given number {@code n} of items to the current
162 * unfulfilled demand for this subscription. If {@code n} is
163 * negative, the Subscriber will receive an onError signal
164 * with an IllegalArgumentException argument. Otherwise, the
165 * Subscriber will receive up to {@code n} additional onNext
166 * invocations (or fewer if terminated).
167 *
168 * @param n the increment of demand; a value of {@code
169 * Long.MAX_VALUE} may be considered as effectively unbounded
170 */
171 public void request(long n);
172
173 /**
174 * Causes the Subscriber to (eventually) stop receiving onNext
175 * messages.
176 */
177 public void cancel();
178 }
179
180 /**
181 * A component that acts as both a Subscriber and Publisher.
182 *
183 * @param <T> the subscribed item type
184 * @param <R> the published item type
185 */
186 public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
187 }
188
189 // Support for static methods
190
191 static final long DEFAULT_REQUEST_SIZE = 64L;
192
193 abstract static class CompletableSubscriber<T,U> implements Subscriber<T>,
194 Consumer<T> {
195 final CompletableFuture<U> status;
196 Subscription subscription;
197 final long requestSize;
198 long count;
199 CompletableSubscriber(long requestSize,
200 CompletableFuture<U> status) {
201 this.status = status;
202 this.requestSize = requestSize;
203 this.count = requestSize >>> 1;
204 }
205 public final void onSubscribe(Subscription subscription) {
206 (this.subscription = subscription).request(requestSize);
207 status.exceptionally(ex -> { subscription.cancel(); return null;});
208 }
209 public final void onError(Throwable ex) {
210 if (ex == null)
211 ex = new IllegalStateException("null onError argument");
212 status.completeExceptionally(ex);
213 }
214 public void onNext(T item) {
215 Subscription s = subscription;
216 if (s == null)
217 status.completeExceptionally(
218 new IllegalStateException("onNext without subscription"));
219 else {
220 try {
221 if (--count <= 0)
222 s.request(count = requestSize);
223 accept(item);
224 } catch (Throwable ex) {
225 status.completeExceptionally(ex);
226 }
227 }
228 }
229 }
230
231 static final class ConsumeSubscriber<T> extends CompletableSubscriber<T,Void> {
232 final Consumer<? super T> consumer;
233 ConsumeSubscriber(long requestSize,
234 CompletableFuture<Void> status,
235 Consumer<? super T> consumer) {
236 super(requestSize, status);
237 this.consumer = consumer;
238 }
239 public void accept(T item) { consumer.accept(item); }
240 public void onComplete() { status.complete(null); }
241 }
242
243 /**
244 * Creates and subscribes a Subscriber that consumes all items
245 * from the given publisher using the given Consumer function, and
246 * using the given requestSize for buffering. Returns a
247 * CompletableFuture that is completed normally when the publisher
248 * signals onComplete, or completed exceptionally upon any error,
249 * including an exception thrown by the Consumer (in which case
250 * the subscription is cancelled if not already terminated).
251 *
252 * @param <T> the published item type
253 * @param requestSize the request size for subscriptions
254 * @param publisher the publisher
255 * @param consumer the function applied to each onNext item
256 * @return a CompletableFuture that is completed normally
257 * when the publisher signals onComplete, and exceptionally
258 * upon any error
259 * @throws NullPointerException if publisher or consumer are null
260 * @throws IllegalArgumentException if requestSize not positive
261 */
262 public static <T> CompletableFuture<Void> consume(
263 long requestSize, Publisher<T> publisher, Consumer<? super T> consumer) {
264 if (requestSize <= 0L)
265 throw new IllegalArgumentException("requestSize must be positive");
266 if (publisher == null || consumer == null)
267 throw new NullPointerException();
268 CompletableFuture<Void> status = new CompletableFuture<>();
269 publisher.subscribe(new ConsumeSubscriber<T>(
270 requestSize, status, consumer));
271 return status;
272 }
273
274 /**
275 * Equivalent to {@link #consume(long, Publisher, Consumer)}
276 * with a request size of 64.
277 *
278 * @param <T> the published item type
279 * @param publisher the publisher
280 * @param consumer the function applied to each onNext item
281 * @return a CompletableFuture that is completed normally
282 * when the publisher signals onComplete, and exceptionally
283 * upon any error
284 * @throws NullPointerException if publisher or consumer are null
285 */
286 public static <T> CompletableFuture<Void> consume(
287 Publisher<T> publisher, Consumer<? super T> consumer) {
288 return consume(DEFAULT_REQUEST_SIZE, publisher, consumer);
289 }
290
291 /**
292 * Temporary implementation for Stream, collecting all items
293 * and then applying stream operation.
294 */
295 static final class StreamSubscriber<T,R> extends CompletableSubscriber<T,R> {
296 final Function<? super Stream<T>, ? extends R> fn;
297 final ArrayList<T> items;
298 StreamSubscriber(long requestSize,
299 CompletableFuture<R> status,
300 Function<? super Stream<T>, ? extends R> fn) {
301 super(requestSize, status);
302 this.fn = fn;
303 this.items = new ArrayList<T>();
304 }
305 public void accept(T item) { items.add(item); }
306 public void onComplete() { status.complete(fn.apply(items.stream())); }
307 }
308
309 /**
310 * Creates and subscribes a Subscriber that applies the given
311 * stream operation to items, and uses the given requestSize for
312 * buffering. Returns a CompletableFuture that is completed
313 * normally with the result of this function when the publisher
314 * signals onComplete, or is completed exceptionally upon any
315 * error.
316 *
317 * <p><b>Preliminary release note:</b> Currently, this method
318 * collects all items before executing the stream
319 * computation. Improvements are pending Stream integration.
320 *
321 * @param <T> the published item type
322 * @param <R> the result type of the stream function
323 * @param requestSize the request size for subscriptions
324 * @param publisher the publisher
325 * @param streamFunction the operation on elements
326 * @return a CompletableFuture that is completed normally with the
327 * result of the given function as result when the publisher signals
328 * onComplete, and exceptionally upon any error
329 * @throws NullPointerException if publisher or function are null
330 * @throws IllegalArgumentException if requestSize not positive
331 */
332 public static <T,R> CompletableFuture<R> stream(
333 long requestSize, Publisher<T> publisher,
334 Function<? super Stream<T>, ? extends R> streamFunction) {
335 if (requestSize <= 0L)
336 throw new IllegalArgumentException("requestSize must be positive");
337 if (publisher == null || streamFunction == null)
338 throw new NullPointerException();
339 CompletableFuture<R> status = new CompletableFuture<>();
340 publisher.subscribe(new StreamSubscriber<T,R>(
341 requestSize, status, streamFunction));
342 return status;
343 }
344
345 /**
346 * Equivalent to {@link #stream(long, Publisher, Function)}
347 * with a request size of 64.
348 *
349 * @param <T> the published item type
350 * @param <R> the result type of the stream function
351 * @param publisher the publisher
352 * @param streamFunction the operation on elements
353 * @return a CompletableFuture that is completed normally with the
354 * result of the given function as result when the publisher signals
355 * onComplete, and exceptionally upon any error
356 * @throws NullPointerException if publisher or function are null
357 */
358 public static <T,R> CompletableFuture<R> stream(
359 Publisher<T> publisher,
360 Function<? super Stream<T>,? extends R> streamFunction) {
361 return stream(DEFAULT_REQUEST_SIZE, publisher, streamFunction);
362 }
363
364
365 }