ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/Flow.java (file contents):
Revision 1.16 by dl, Mon Jan 19 18:06:23 2015 UTC vs.
Revision 1.17 by dl, Mon Jan 19 18:15:09 2015 UTC

# Line 40 | Line 40 | import java.util.stream.Stream;
40   *
41   * <pre> {@code
42   * class OneShotPublisher implements Publisher<Boolean> {
43 < *   final Executor executor = ForkJoinPool.commonPool(); // daemon-based
43 > *   final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
44   *   boolean subscribed = false; // true after first subscribe
45   *   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
46   *     if (subscribed)
# Line 52 | Line 52 | import java.util.stream.Stream;
52   *   }
53   *   static class OneShotSubscription implements Subscription {
54   *     final Subscriber<? super Boolean> subscriber;
55 < *     final Executor executor;
55 > *     final ExecutorService executor;
56 > *     Future<?> future; // to allow cancellation
57   *     boolean completed = false;
58   *     OneShotSubscription(Subscriber<? super Boolean> subscriber,
59 < *                         Executor executor) {
59 > *                         ExecutorService executor) {
60   *       this.subscriber = subscriber;
61   *       this.executor = executor;
62   *     }
63   *     public synchronized void request(long n) {
64   *       if (n != 0 && !completed) {
65   *         completed = true;
66 < *         executor.execute(() -> {
66 > *         future = executor.submit(() -> {
67   *           if (n < 0)
68   *             subscriber.onError(new IllegalArgumentException());
69   *           else {
# Line 72 | Line 73 | import java.util.stream.Stream;
73   *       }
74   *     }
75   *     public synchronized void cancel() {
76 < *       completed = true; // ineffective if task already started
76 > *       completed = true;
77 > *       if (future != null) future.cancel(false);
78   *     }
79   *   }
80   * }}</pre>

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines