33 |
|
* Subscriber}. It publishes items to the subscriber asynchronously, |
34 |
|
* normally using an {@link Executor}. For example, here is a very |
35 |
|
* simple publisher that only issues (when requested) a single {@code |
36 |
< |
* TRUE} item to any subscriber. Because each subscriber receives |
37 |
< |
* only the same single item, this class does not use buffering and |
38 |
< |
* ordering control required in most implementations (for example |
39 |
< |
* {@link SubmissionPublisher}). |
36 |
> |
* TRUE} item to a single subscriber. Because the subscriber receives |
37 |
> |
* only a single item, this class does not use buffering and ordering |
38 |
> |
* control required in most implementations (for example {@link |
39 |
> |
* SubmissionPublisher}). |
40 |
|
* |
41 |
|
* <pre> {@code |
42 |
|
* class OneShotPublisher implements Publisher<Boolean> { |
43 |
|
* final Executor executor = ForkJoinPool.commonPool(); // daemon-based |
44 |
< |
* public void subscribe(Subscriber<? super Boolean> subscriber) { |
44 |
> |
* boolean subscribed = false; // true after first subscribe |
45 |
> |
* public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { |
46 |
> |
* if (subscribed) |
47 |
> |
* subscriber.onError(new IllegalStateException()); // only one allowed |
48 |
> |
* else { |
49 |
> |
* subscribed = true; |
50 |
|
* subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); |
51 |
+ |
* } |
52 |
|
* } |
53 |
|
* static class OneShotSubscription implements Subscription { |
54 |
|
* final Subscriber<? super Boolean> subscriber; |
55 |
|
* final Executor executor; |
56 |
< |
* boolean completed; |
56 |
> |
* boolean completed = false; |
57 |
|
* OneShotSubscription(Subscriber<? super Boolean> subscriber, |
58 |
|
* Executor executor) { |
59 |
|
* this.subscriber = subscriber; |
60 |
|
* this.executor = executor; |
61 |
|
* } |
62 |
|
* public synchronized void request(long n) { |
63 |
< |
* if (n > 0 && !completed) { |
63 |
> |
* if (n != 0 && !completed) { |
64 |
|
* completed = true; |
65 |
|
* executor.execute(() -> { |
66 |
< |
* subscriber.onNext(Boolean.TRUE); |
67 |
< |
* subscriber.onComplete(); |
68 |
< |
* }); |
66 |
> |
* if (n < 0) |
67 |
> |
* subscriber.onError(new IllegalArgumentException()); |
68 |
> |
* else { |
69 |
> |
* subscriber.onNext(Boolean.TRUE); |
70 |
> |
* subscriber.onComplete(); |
71 |
> |
* }}); |
72 |
|
* } |
64 |
– |
* else if (n < 0) { |
65 |
– |
* completed = true; |
66 |
– |
* subscriber.onError(new IllegalArgumentException()); |
67 |
– |
} |
73 |
|
* } |
74 |
|
* public synchronized void cancel() { |
75 |
|
* completed = true; // ineffective if task already started |