ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/Flow.java (file contents):
Revision 1.22 by dl, Sun Jan 25 15:19:40 2015 UTC vs.
Revision 1.23 by dl, Fri Jul 24 18:51:27 2015 UTC

# Line 40 | Line 40 | import java.util.stream.Stream;
40   *
41   * <pre> {@code
42   * class OneShotPublisher implements Publisher<Boolean> {
43 < *   final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
44 < *   boolean subscribed = false; // true after first subscribe
43 > *   private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
44 > *   private boolean subscribed = false; // true after first subscribe
45   *   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
46   *     if (subscribed)
47   *        subscriber.onError(new IllegalStateException()); // only one allowed
# Line 51 | Line 51 | import java.util.stream.Stream;
51   *     }
52   *   }
53   *   static class OneShotSubscription implements Subscription {
54 < *     final Subscriber<? super Boolean> subscriber;
55 < *     final ExecutorService executor;
56 < *     Future<?> future; // to allow cancellation
57 < *     boolean completed = false;
54 > *     private final Subscriber<? super Boolean> subscriber;
55 > *     private final ExecutorService executor;
56 > *     private Future<?> future; // to allow cancellation
57 > *     private boolean completed = false;
58   *     OneShotSubscription(Subscriber<? super Boolean> subscriber,
59   *                         ExecutorService executor) {
60   *       this.subscriber = subscriber;
# Line 108 | Line 108 | import java.util.stream.Stream;
108   *     this.consumer = consumer;
109   *   }
110   *   public void onSubscribe(Subscription subscription) {
111 < *     (this.subscription = subscription).request(bufferSize);
111 > *     long initialRequestSize = bufferSize;
112   *     count = bufferSize - bufferSize / 2; // re-request when half consumed
113 + *     (this.subscription = subscription).request(initialRequestSize);
114   *   }
115   *   public void onNext(T item) {
116   *     if (--count <= 0)
# Line 308 | Line 309 | public final class Flow {
309              this.requestSize = bufferSize;
310          }
311          public final void onSubscribe(Subscription subscription) {
312 <            (this.subscription = subscription).request(requestSize);
313 <            count = requestSize -= (requestSize >>> 1);
312 >            long rs = requestSize;
313 >            count = requestSize -= (rs >>> 1);
314 >            (this.subscription = subscription).request(rs);
315          }
316          public final void onError(Throwable ex) {
317              status.completeExceptionally(ex);
# Line 320 | Line 322 | public final class Flow {
322                      subscription.request(count = requestSize);
323                  accept(item);
324              } catch (Throwable ex) {
325 +                subscription.cancel();
326                  status.completeExceptionally(ex);
327              }
328          }
# Line 345 | Line 348 | public final class Flow {
348       * signals {@code onComplete}, or completed exceptionally upon any
349       * error, including an exception thrown by the Consumer (in which
350       * case the subscription is cancelled if not already terminated).
351 +     * Other attempts to cancel the CompletableFuture need not
352 +     * cause the computation to terminate.
353       *
354       * @param <T> the published item type
355       * @param bufferSize the request size for subscriptions
# Line 409 | Line 414 | public final class Flow {
414       * buffering. Returns a CompletableFuture that is completed
415       * normally with the result of this function when the publisher
416       * signals {@code onComplete}, or is completed exceptionally upon
417 <     * any error.
417 >     * any error. Other attempts to cancel the CompletableFuture need
418 >     * not cause the computation to terminate.
419       *
420       * <p><b>Preliminary release note:</b> Currently, this method
421       * collects all items before executing the stream

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines