ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/Flow.java (file contents):
Revision 1.9 by jsr166, Thu Jan 15 18:46:08 2015 UTC vs.
Revision 1.10 by dl, Sun Jan 18 23:06:59 2015 UTC

# Line 24 | Line 24 | import java.util.stream.Stream;
24   * specification. (<b>Preliminary release note:</b> This spec is
25   * not yet finalized, so minor details could change.)
26   *
27 < * <p><b>Preliminary release note:</b> This class may later include
28 < * methods supporting periodic events and/or async IO.
27 > * <p><b>Examples.</b> A {@link Publisher} usually defines its own
28 > * {@link Subscription} implementation; constructing one in method
29 > * {@code subscribe} and issuing it to the calling {@link
30 > * Subscriber}. It publishes items to the subscriber asynchronously,
31 > * normally using an {@link Executor}.  For example, here is a very
32 > * simple publisher that only issues (when requested) a single {@code
33 > * TRUE} item to each subscriber, then completes.  Because each
34 > * subscriber receives only the same single item, this class does not
35 > * need the buffering and ordering control required in most
36 > * implementations.
37 > *
38 > * <pre> {@code
39 > * class OneShotPublisher implements Publisher<Boolean> {
40 > *   final Executor executor = Executors.newSingleThreadExecutor();
41 > *   public void subscribe(Subscriber<? super Boolean> subscriber) {
42 > *       subscriber.onSubscribe(new OneShotSubscription(subscriber));
43 > *   }
44 > *   static class OneShotSubscription implements Subscription {
45 > *     final Subscriber<? super Boolean> subscriber;
46 > *     final Executor executor;
47 > *     boolean completed;
48 > *     OneShotSubscription(Subscriber<? super Boolean> subscriber,
49 > *                         Executor executor) {
50 > *       this.subscriber = subscriber;
51 > *       this.executor = executor;
52 > *     }
53 > *     public synchronized void request(long n) {
54 > *       if (n > 0 && !completed) {
55 > *         completed = true;
56 > *         executor.execute(() -> {
57 > *                   subscriber.onNext(Boolean.TRUE);
58 > *                   subscriber.onComplete();
59 > *               });
60 > *       }
61 > *       else if (n < 0) {
62 > *         completed = true;
63 > *         subscriber.onError(new IllegalArgumentException());
64 >         }
65 > *     }
66 > *     public synchronized void cancel() { completed = true; }
67 > *   }
68 > * }}</pre>
69 > *
70 > * <p> A {@link Subscriber} arranges that items be requested and
71 > * processed.  Items (invocations of {@link Subscriber#onNext}) are
72 > * not issued unless requested, but multiple items may be requested.
73 > * Many Subscriber implementations can arrange this in the style of
74 > * the following example, where a buffer size of 1 single-steps, and
75 > * larger sizes usually allow for more efficient overlapped processing
76 > * with less communication; for example with a value of 64, this keeps
77 > * total outstanding requests between 32 and 64.  (See also {@link
78 > * #consume(long, Publisher, Consumer)} that automates a common case.)
79 > *
80 > * <pre> {@code
81 > * class SampleSubscriber<T> implements Subscriber<T> {
82 > *   final Consumer<? super T> consumer;
83 > *   Subscription subscription;
84 > *   final long bufferSize;
85 > *   long count;
86 > *   SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
87 > *     this.bufferSize = bufferSize;
88 > *     this.consumer = consumer;
89 > *   }
90 > *   public void onSubscribe(Subscription subscription) {
91 > *     (this.subscription = subscription).request(bufferSize);
92 > *     count = bufferSize - bufferSize / 2; // re-request when half consumed
93 > *   }
94 > *   public void onNext(T item) {
95 > *     if (--count <= 0)
96 > *       subscription.request(count = bufferSize - bufferSize / 2);
97 > *     consumer.accept(item);
98 > *   }
99 > *   public void onError(Throwable ex) { ex.printStackTrace(); }
100 > *   public void onComplete() {}
101 > * }}</pre>
102   *
103   * @author Doug Lea
104   * @since 1.9
# Line 73 | Line 146 | public final class Flow {
146       * invoked sequentially by each Subscription, so are not required
147       * to be thread-safe unless subscribing to multiple publishers.
148       *
76     * <p>Items (invocations of {@link #onNext}) are not issued unless
77     * requested, but multiple items may be requested.  Many
78     * Subscriber implementations can arrange this in the style of the
79     * following example, where a request size of 1 single-steps, and
80     * larger sizes (for example 64) usually allow for more efficient
81     * overlapped processing.  (See also {@link #consume(long, Publisher,
82     * Consumer)} that automates a common case.)
83     *
84     * <pre> {@code
85     * class SampleSubscriber<T> implements Subscriber<T> {
86     *   final Consumer<? super T> consumer;
87     *   Subscription subscription;
88     *   final long requestSize;
89     *   long count;
90     *   SampleSubscriber(long requestSize, Consumer<? super T> consumer) {
91     *     this.requestSize = requestSize;
92     *     this.consumer = consumer;
93     *   }
94     *   public void onSubscribe(Subscription subscription) {
95     *     count = requestSize / 2; // re-request when half consumed
96     *     (this.subscription = subscription).request(requestSize);
97     *   }
98     *   public void onNext(T item) {
99     *     if (--count <= 0)
100     *       subscription.request(count = requestSize);
101     *     consumer.accept(item);
102     *   }
103     *   public void onError(Throwable ex) { ex.printStackTrace(); }
104     *   public void onComplete() {}
105     * }}</pre>
106     *
149       * @param <T> the subscribed item type
150       */
151      public static interface Subscriber<T> {
# Line 189 | Line 231 | public final class Flow {
231  
232      // Support for static methods
233  
234 <    static final long DEFAULT_REQUEST_SIZE = 64L;
234 >    static final long DEFAULT_BUFFER_SIZE = 64L;
235  
236      abstract static class CompletableSubscriber<T,U> implements Subscriber<T>,
237                                                                  Consumer<T> {
238          final CompletableFuture<U> status;
239          Subscription subscription;
240 <        final long requestSize;
240 >        long requestSize;
241          long count;
242 <        CompletableSubscriber(long requestSize,
201 <                              CompletableFuture<U> status) {
242 >        CompletableSubscriber(long bufferSize, CompletableFuture<U> status) {
243              this.status = status;
244 <            this.requestSize = requestSize;
204 <            this.count = requestSize >>> 1;
244 >            this.requestSize = bufferSize;
245          }
246          public final void onSubscribe(Subscription subscription) {
247              (this.subscription = subscription).request(requestSize);
248 <            status.exceptionally(ex -> { subscription.cancel(); return null;});
248 >            count = requestSize -= (requestSize >>> 1);
249          }
250          public final void onError(Throwable ex) {
211            if (ex == null)
212                ex = new IllegalStateException("null onError argument");
251              status.completeExceptionally(ex);
252          }
253          public void onNext(T item) {
254 <            Subscription s = subscription;
255 <            if (s == null)
256 <                status.completeExceptionally(
257 <                    new IllegalStateException("onNext without subscription"));
258 <            else {
259 <                try {
222 <                    if (--count <= 0)
223 <                        s.request(count = requestSize);
224 <                    accept(item);
225 <                } catch (Throwable ex) {
226 <                    status.completeExceptionally(ex);
227 <                }
254 >            try {
255 >                if (--count <= 0)
256 >                    subscription.request(count = requestSize);
257 >                accept(item);
258 >            } catch (Throwable ex) {
259 >                status.completeExceptionally(ex);
260              }
261          }
262      }
263  
264      static final class ConsumeSubscriber<T> extends CompletableSubscriber<T,Void> {
265          final Consumer<? super T> consumer;
266 <        ConsumeSubscriber(long requestSize,
266 >        ConsumeSubscriber(long bufferSize,
267                            CompletableFuture<Void> status,
268                            Consumer<? super T> consumer) {
269 <            super(requestSize, status);
269 >            super(bufferSize, status);
270              this.consumer = consumer;
271          }
272          public void accept(T item) { consumer.accept(item); }
# Line 244 | Line 276 | public final class Flow {
276      /**
277       * Creates and subscribes a Subscriber that consumes all items
278       * from the given publisher using the given Consumer function, and
279 <     * using the given requestSize for buffering. Returns a
279 >     * using the given bufferSize for buffering. Returns a
280       * CompletableFuture that is completed normally when the publisher
281       * signals onComplete, or completed exceptionally upon any error,
282       * including an exception thrown by the Consumer (in which case
283       * the subscription is cancelled if not already terminated).
284       *
285       * @param <T> the published item type
286 <     * @param requestSize the request size for subscriptions
286 >     * @param bufferSize the request size for subscriptions
287       * @param publisher the publisher
288       * @param consumer the function applied to each onNext item
289       * @return a CompletableFuture that is completed normally
290       * when the publisher signals onComplete, and exceptionally
291       * upon any error
292       * @throws NullPointerException if publisher or consumer are null
293 <     * @throws IllegalArgumentException if requestSize not positive
293 >     * @throws IllegalArgumentException if bufferSize not positive
294       */
295      public static <T> CompletableFuture<Void> consume(
296 <        long requestSize, Publisher<T> publisher, Consumer<? super T> consumer) {
297 <        if (requestSize <= 0L)
298 <            throw new IllegalArgumentException("requestSize must be positive");
296 >        long bufferSize, Publisher<T> publisher, Consumer<? super T> consumer) {
297 >        if (bufferSize <= 0L)
298 >            throw new IllegalArgumentException("bufferSize must be positive");
299          if (publisher == null || consumer == null)
300              throw new NullPointerException();
301          CompletableFuture<Void> status = new CompletableFuture<>();
302          publisher.subscribe(new ConsumeSubscriber<T>(
303 <                                requestSize, status, consumer));
303 >                                bufferSize, status, consumer));
304          return status;
305      }
306  
307      /**
308       * Equivalent to {@link #consume(long, Publisher, Consumer)}
309 <     * with a request size of 64.
309 >     * with a buffer size of 64.
310       *
311       * @param <T> the published item type
312       * @param publisher the publisher
# Line 286 | Line 318 | public final class Flow {
318       */
319      public static <T> CompletableFuture<Void> consume(
320          Publisher<T> publisher, Consumer<? super T> consumer) {
321 <        return consume(DEFAULT_REQUEST_SIZE, publisher, consumer);
321 >        return consume(DEFAULT_BUFFER_SIZE, publisher, consumer);
322      }
323  
324      /**
# Line 296 | Line 328 | public final class Flow {
328      static final class StreamSubscriber<T,R> extends CompletableSubscriber<T,R> {
329          final Function<? super Stream<T>, ? extends R> fn;
330          final ArrayList<T> items;
331 <        StreamSubscriber(long requestSize,
331 >        StreamSubscriber(long bufferSize,
332                           CompletableFuture<R> status,
333                           Function<? super Stream<T>, ? extends R> fn) {
334 <            super(requestSize, status);
334 >            super(bufferSize, status);
335              this.fn = fn;
336              this.items = new ArrayList<T>();
337          }
# Line 309 | Line 341 | public final class Flow {
341  
342      /**
343       * Creates and subscribes a Subscriber that applies the given
344 <     * stream operation to items, and uses the given requestSize for
344 >     * stream operation to items, and uses the given bufferSize for
345       * buffering. Returns a CompletableFuture that is completed
346       * normally with the result of this function when the publisher
347       * signals onComplete, or is completed exceptionally upon any
# Line 321 | Line 353 | public final class Flow {
353       *
354       * @param <T> the published item type
355       * @param <R> the result type of the stream function
356 <     * @param requestSize the request size for subscriptions
356 >     * @param bufferSize the request size for subscriptions
357       * @param publisher the publisher
358       * @param streamFunction the operation on elements
359       * @return a CompletableFuture that is completed normally with the
360       * result of the given function as result when the publisher signals
361       * onComplete, and exceptionally upon any error
362       * @throws NullPointerException if publisher or function are null
363 <     * @throws IllegalArgumentException if requestSize not positive
363 >     * @throws IllegalArgumentException if bufferSize not positive
364       */
365      public static <T,R> CompletableFuture<R> stream(
366 <        long requestSize, Publisher<T> publisher,
366 >        long bufferSize, Publisher<T> publisher,
367          Function<? super Stream<T>, ? extends R> streamFunction) {
368 <        if (requestSize <= 0L)
369 <            throw new IllegalArgumentException("requestSize must be positive");
368 >        if (bufferSize <= 0L)
369 >            throw new IllegalArgumentException("bufferSize must be positive");
370          if (publisher == null || streamFunction == null)
371              throw new NullPointerException();
372          CompletableFuture<R> status = new CompletableFuture<>();
373          publisher.subscribe(new StreamSubscriber<T,R>(
374 <                                requestSize, status, streamFunction));
374 >                                bufferSize, status, streamFunction));
375          return status;
376      }
377  
378      /**
379       * Equivalent to {@link #stream(long, Publisher, Function)}
380 <     * with a request size of 64.
380 >     * with a buffer size of 64.
381       *
382       * @param <T> the published item type
383       * @param <R> the result type of the stream function
# Line 359 | Line 391 | public final class Flow {
391      public static <T,R> CompletableFuture<R> stream(
392          Publisher<T> publisher,
393          Function<? super Stream<T>,? extends R> streamFunction) {
394 <        return stream(DEFAULT_REQUEST_SIZE, publisher, streamFunction);
394 >        return stream(DEFAULT_BUFFER_SIZE, publisher, streamFunction);
395      }
396 +
397   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines