ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/Flow.java (file contents):
Revision 1.15 by dl, Mon Jan 19 15:41:22 2015 UTC vs.
Revision 1.16 by dl, Mon Jan 19 18:06:23 2015 UTC

# Line 33 | Line 33 | import java.util.stream.Stream;
33   * Subscriber}. It publishes items to the subscriber asynchronously,
34   * normally using an {@link Executor}.  For example, here is a very
35   * simple publisher that only issues (when requested) a single {@code
36 < * TRUE} item to any subscriber.  Because each subscriber receives
37 < * only the same single item, this class does not use buffering and
38 < * ordering control required in most implementations (for example
39 < * {@link SubmissionPublisher}).
36 > * TRUE} item to a single subscriber.  Because the subscriber receives
37 > * only a single item, this class does not use buffering and ordering
38 > * control required in most implementations (for example {@link
39 > * SubmissionPublisher}).
40   *
41   * <pre> {@code
42   * class OneShotPublisher implements Publisher<Boolean> {
43   *   final Executor executor = ForkJoinPool.commonPool(); // daemon-based
44 < *   public void subscribe(Subscriber<? super Boolean> subscriber) {
44 > *   boolean subscribed = false; // true after first subscribe
45 > *   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
46 > *     if (subscribed)
47 > *        subscriber.onError(new IllegalStateException()); // only one allowed
48 > *     else {
49 > *       subscribed = true;
50   *       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
51 + *     }
52   *   }
53   *   static class OneShotSubscription implements Subscription {
54   *     final Subscriber<? super Boolean> subscriber;
55   *     final Executor executor;
56 < *     boolean completed;
56 > *     boolean completed = false;
57   *     OneShotSubscription(Subscriber<? super Boolean> subscriber,
58   *                         Executor executor) {
59   *       this.subscriber = subscriber;
60   *       this.executor = executor;
61   *     }
62   *     public synchronized void request(long n) {
63 < *       if (n > 0 && !completed) {
63 > *       if (n != 0 && !completed) {
64   *         completed = true;
65   *         executor.execute(() -> {
66 < *                   subscriber.onNext(Boolean.TRUE);
67 < *                   subscriber.onComplete();
68 < *               });
66 > *           if (n < 0)
67 > *             subscriber.onError(new IllegalArgumentException());
68 > *           else {
69 > *             subscriber.onNext(Boolean.TRUE);
70 > *             subscriber.onComplete();
71 > *           }});
72   *       }
64 *       else if (n < 0) {
65 *         completed = true;
66 *         subscriber.onError(new IllegalArgumentException());
67         }
73   *     }
74   *     public synchronized void cancel() {
75   *       completed = true; // ineffective if task already started

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines