ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
Revision: 1.9
Committed: Thu Jan 15 18:46:08 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.8: +2 -1 lines
Log Message:
tidy imports

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.function.Consumer;
11 import java.util.function.Function;
12 import java.util.stream.Stream;
13
/**
 * Interrelated interfaces and static methods for establishing
 * flow-controlled components in which {@link Publisher Publishers}
 * produce items consumed by one or more {@link Subscriber
 * Subscribers}, each managed by a {@link Subscription
 * Subscription}. The use of flow control helps address common
 * resource issues in "push" based asynchronous systems.
 *
 * <p>These interfaces correspond to the <a
 * href="http://www.reactive-streams.org/"> reactive-streams</a>
 * specification. (<b>Preliminary release note:</b> This spec is
 * not yet finalized, so minor details could change.)
 *
 * <p><b>Preliminary release note:</b> This class may later include
 * methods supporting periodic events and/or async IO.
 *
 * @author Doug Lea
 * @since 1.9
 */
public final class Flow {

    private Flow() {} // uninstantiable

    /**
     * A producer of items (and related control messages) received by
     * Subscribers.  Each current {@link Subscriber} receives the same
     * items (via method onNext) in the same order, unless drops or
     * errors are encountered. If a Publisher encounters an error that
     * does not allow further items to be issued to a Subscriber, that
     * Subscriber receives onError, and then receives no further
     * messages.  Otherwise, if it is known that no further items will
     * be produced, each Subscriber receives onComplete.  Publishers
     * may vary in policy about whether drops (failures to issue an
     * item because of resource limitations) are treated as errors.
     * Publishers may also vary about whether Subscribers receive
     * items that were produced or available before they subscribed.
     *
     * @param <T> the published item type
     */
    @FunctionalInterface
    public static interface Publisher<T> {
        /**
         * Adds the given Subscriber if possible.  If already
         * subscribed, or the attempt to subscribe fails due to policy
         * violations or errors, the Subscriber's onError method is
         * invoked with an IllegalStateException.  Otherwise, upon
         * success, the Subscriber's onSubscribe method is invoked
         * with a new Subscription.  Subscribers may enable receiving
         * items by invoking the request method of this Subscription,
         * and may unsubscribe by invoking its cancel method.
         *
         * @param subscriber the subscriber
         * @throws NullPointerException if subscriber is null
         */
        public void subscribe(Subscriber<? super T> subscriber);
    }

    /**
     * A receiver of messages.  The methods in this interface must be
     * invoked sequentially by each Subscription, so are not required
     * to be thread-safe unless subscribing to multiple publishers.
     *
     * <p>Items (invocations of {@link #onNext}) are not issued unless
     * requested, but multiple items may be requested.  Many
     * Subscriber implementations can arrange this in the style of the
     * following example, where a request size of 1 single-steps, and
     * larger sizes (for example 64) usually allow for more efficient
     * overlapped processing.  (See also {@link #consume(long, Publisher,
     * Consumer)} that automates a common case.)
     *
     * <pre> {@code
     * class SampleSubscriber<T> implements Subscriber<T> {
     *   final Consumer<? super T> consumer;
     *   Subscription subscription;
     *   final long requestSize;
     *   long count;
     *   SampleSubscriber(long requestSize, Consumer<? super T> consumer) {
     *     this.requestSize = requestSize;
     *     this.consumer = consumer;
     *   }
     *   public void onSubscribe(Subscription subscription) {
     *     count = requestSize / 2; // re-request when half consumed
     *     (this.subscription = subscription).request(requestSize);
     *   }
     *   public void onNext(T item) {
     *     if (--count <= 0)
     *       subscription.request(count = requestSize);
     *     consumer.accept(item);
     *   }
     *   public void onError(Throwable ex) { ex.printStackTrace(); }
     *   public void onComplete() {}
     * }}</pre>
     *
     * @param <T> the subscribed item type
     */
    public static interface Subscriber<T> {
        /**
         * Method invoked prior to invoking any other Subscriber
         * methods for the given Subscription. If this method throws
         * an exception, resulting behavior is not guaranteed, but may
         * cause the Subscription to be cancelled.
         *
         * <p>Typically, implementations of this method invoke the
         * subscription's request method to enable receiving items.
         *
         * @param subscription a new subscription
         */
        public void onSubscribe(Subscription subscription);

        /**
         * Method invoked with a Subscription's next item.  If this
         * method throws an exception, resulting behavior is not
         * guaranteed, but may cause the Subscription to be cancelled.
         *
         * @param item the item
         */
        public void onNext(T item);

        /**
         * Method invoked upon an unrecoverable error encountered by a
         * Publisher or Subscription, after which no other Subscriber
         * methods are invoked by the Subscription.  If this method
         * itself throws an exception, resulting behavior is
         * undefined.
         *
         * @param throwable the exception
         */
        public void onError(Throwable throwable);

        /**
         * Method invoked when it is known that no additional onNext
         * invocations will occur for a Subscription that is not
         * already terminated by error, after which no other
         * Subscriber methods are invoked by the Subscription.  If
         * this method itself throws an exception, resulting behavior
         * is undefined.
         */
        public void onComplete();
    }

    /**
     * Message control linking Publishers and Subscribers.
     * Subscribers receive items (via onNext) only when requested, and
     * may cancel at any time. The methods in this interface are
     * intended to be invoked only by their Subscribers.
     */
    public static interface Subscription {
        /**
         * Adds the given number {@code n} of items to the current
         * unfulfilled demand for this subscription.  If {@code n} is
         * negative, the Subscriber will receive an onError signal
         * with an IllegalArgumentException argument. Otherwise, the
         * Subscriber will receive up to {@code n} additional onNext
         * invocations (or fewer if terminated).
         *
         * @param n the increment of demand; a value of {@code
         * Long.MAX_VALUE} may be considered as effectively unbounded
         */
        public void request(long n);

        /**
         * Causes the Subscriber to (eventually) stop receiving onNext
         * messages.
         */
        public void cancel();
    }

    /**
     * A component that acts as both a Subscriber and Publisher.
     *
     * @param <T> the subscribed item type
     * @param <R> the published item type
     */
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }

    // Support for static methods

    /** Request size used by the overloads that do not take one. */
    static final long DEFAULT_REQUEST_SIZE = 64L;

    /**
     * Base class for the Subscribers created by the static methods
     * of this class.  It requests items in batches of
     * {@code requestSize}, passes each received item to
     * {@link #accept}, and reports errors by exceptionally
     * completing the {@code status} future.  Subclasses supply
     * {@code accept} and {@code onComplete}.
     *
     * @param <T> the subscribed item type
     * @param <U> the result type of the status future
     */
    abstract static class CompletableSubscriber<T,U> implements Subscriber<T>,
                                                                Consumer<T> {
        /** Future through which the outcome is reported. */
        final CompletableFuture<U> status;
        /** Set by onSubscribe; null until then. */
        Subscription subscription;
        /** Number of items asked for by each request. */
        final long requestSize;
        /** Items remaining to be received before the next request. */
        long count;

        CompletableSubscriber(long requestSize,
                              CompletableFuture<U> status) {
            this.status = status;
            this.requestSize = requestSize;
            // Start at half a batch so that the first re-request is
            // issued when about half of the initial batch is consumed.
            this.count = requestSize >>> 1;
        }

        /**
         * Records the subscription, issues the initial request, and
         * arranges for the subscription to be cancelled if the
         * status future completes exceptionally.
         */
        @Override
        public final void onSubscribe(Subscription subscription) {
            (this.subscription = subscription).request(requestSize);
            status.exceptionally(ex -> { subscription.cancel(); return null; });
        }

        /**
         * Completes the status future exceptionally, substituting an
         * IllegalStateException for a null argument.
         */
        @Override
        public final void onError(Throwable ex) {
            if (ex == null)
                ex = new IllegalStateException("null onError argument");
            status.completeExceptionally(ex);
        }

        /**
         * Re-requests when the current batch is used up, then passes
         * the item to {@link #accept}.  Any exception thrown along
         * the way, or an invocation before onSubscribe, completes
         * the status future exceptionally.
         */
        @Override
        public void onNext(T item) {
            Subscription s = subscription;
            if (s == null)
                status.completeExceptionally(
                    new IllegalStateException("onNext without subscription"));
            else {
                try {
                    if (--count <= 0)
                        s.request(count = requestSize);
                    accept(item);
                } catch (Throwable ex) {
                    status.completeExceptionally(ex);
                }
            }
        }
    }

    /**
     * Subscriber used by the consume methods: forwards each item to
     * a Consumer and completes the status future with null on
     * onComplete.
     */
    static final class ConsumeSubscriber<T> extends CompletableSubscriber<T,Void> {
        final Consumer<? super T> consumer;
        ConsumeSubscriber(long requestSize,
                          CompletableFuture<Void> status,
                          Consumer<? super T> consumer) {
            super(requestSize, status);
            this.consumer = consumer;
        }
        @Override
        public void accept(T item) { consumer.accept(item); }
        @Override
        public void onComplete() { status.complete(null); }
    }

    /**
     * Creates and subscribes a Subscriber that consumes all items
     * from the given publisher using the given Consumer function, and
     * using the given requestSize for buffering. Returns a
     * CompletableFuture that is completed normally when the publisher
     * signals onComplete, or completed exceptionally upon any error,
     * including an exception thrown by the Consumer (in which case
     * the subscription is cancelled if not already terminated).
     *
     * @param <T> the published item type
     * @param requestSize the request size for subscriptions
     * @param publisher the publisher
     * @param consumer the function applied to each onNext item
     * @return a CompletableFuture that is completed normally
     * when the publisher signals onComplete, and exceptionally
     * upon any error
     * @throws NullPointerException if publisher or consumer are null
     * @throws IllegalArgumentException if requestSize not positive
     */
    public static <T> CompletableFuture<Void> consume(
        long requestSize, Publisher<T> publisher, Consumer<? super T> consumer) {
        if (requestSize <= 0L)
            throw new IllegalArgumentException("requestSize must be positive");
        if (publisher == null || consumer == null)
            throw new NullPointerException();
        CompletableFuture<Void> status = new CompletableFuture<>();
        publisher.subscribe(new ConsumeSubscriber<T>(
                                requestSize, status, consumer));
        return status;
    }

    /**
     * Equivalent to {@link #consume(long, Publisher, Consumer)}
     * with a request size of 64.
     *
     * @param <T> the published item type
     * @param publisher the publisher
     * @param consumer the function applied to each onNext item
     * @return a CompletableFuture that is completed normally
     * when the publisher signals onComplete, and exceptionally
     * upon any error
     * @throws NullPointerException if publisher or consumer are null
     */
    public static <T> CompletableFuture<Void> consume(
        Publisher<T> publisher, Consumer<? super T> consumer) {
        return consume(DEFAULT_REQUEST_SIZE, publisher, consumer);
    }

    /**
     * Temporary implementation for Stream, collecting all items
     * and then applying stream operation.
     */
    static final class StreamSubscriber<T,R> extends CompletableSubscriber<T,R> {
        final Function<? super Stream<T>, ? extends R> fn;
        final ArrayList<T> items;
        StreamSubscriber(long requestSize,
                         CompletableFuture<R> status,
                         Function<? super Stream<T>, ? extends R> fn) {
            super(requestSize, status);
            this.fn = fn;
            this.items = new ArrayList<>();
        }
        @Override
        public void accept(T item) { items.add(item); }

        /**
         * Applies the stream function to the collected items and
         * completes the status future with its result.  A failure of
         * the function is reported through the status future rather
         * than thrown back into the publisher, which would otherwise
         * leave the future returned by the stream methods forever
         * incomplete.
         */
        @Override
        public void onComplete() {
            R result;
            try {
                result = fn.apply(items.stream());
            } catch (Throwable ex) {
                status.completeExceptionally(ex);
                return;
            }
            status.complete(result);
        }
    }

    /**
     * Creates and subscribes a Subscriber that applies the given
     * stream operation to items, and uses the given requestSize for
     * buffering. Returns a CompletableFuture that is completed
     * normally with the result of this function when the publisher
     * signals onComplete, or is completed exceptionally upon any
     * error, including an exception thrown by the stream function.
     *
     * <p><b>Preliminary release note:</b> Currently, this method
     * collects all items before executing the stream
     * computation. Improvements are pending Stream integration.
     *
     * @param <T> the published item type
     * @param <R> the result type of the stream function
     * @param requestSize the request size for subscriptions
     * @param publisher the publisher
     * @param streamFunction the operation on elements
     * @return a CompletableFuture that is completed normally with the
     * result of the given function as result when the publisher signals
     * onComplete, and exceptionally upon any error
     * @throws NullPointerException if publisher or function are null
     * @throws IllegalArgumentException if requestSize not positive
     */
    public static <T,R> CompletableFuture<R> stream(
        long requestSize, Publisher<T> publisher,
        Function<? super Stream<T>, ? extends R> streamFunction) {
        if (requestSize <= 0L)
            throw new IllegalArgumentException("requestSize must be positive");
        if (publisher == null || streamFunction == null)
            throw new NullPointerException();
        CompletableFuture<R> status = new CompletableFuture<>();
        publisher.subscribe(new StreamSubscriber<T,R>(
                                requestSize, status, streamFunction));
        return status;
    }

    /**
     * Equivalent to {@link #stream(long, Publisher, Function)}
     * with a request size of 64.
     *
     * @param <T> the published item type
     * @param <R> the result type of the stream function
     * @param publisher the publisher
     * @param streamFunction the operation on elements
     * @return a CompletableFuture that is completed normally with the
     * result of the given function as result when the publisher signals
     * onComplete, and exceptionally upon any error
     * @throws NullPointerException if publisher or function are null
     */
    public static <T,R> CompletableFuture<R> stream(
        Publisher<T> publisher,
        Function<? super Stream<T>,? extends R> streamFunction) {
        return stream(DEFAULT_REQUEST_SIZE, publisher, streamFunction);
    }
}