40 |
|
* |
41 |
|
* <pre> {@code |
42 |
|
* class OneShotPublisher implements Publisher<Boolean> { |
43 |
< |
* final Executor executor = ForkJoinPool.commonPool(); // daemon-based |
43 |
> |
* final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based |
44 |
|
* boolean subscribed = false; // true after first subscribe |
45 |
|
* public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { |
46 |
|
* if (subscribed) |
52 |
|
* } |
53 |
|
* static class OneShotSubscription implements Subscription { |
54 |
|
* final Subscriber<? super Boolean> subscriber; |
55 |
< |
* final Executor executor; |
55 |
> |
* final ExecutorService executor; |
56 |
> |
* Future<?> future; // to allow cancellation |
57 |
|
* boolean completed = false; |
58 |
|
* OneShotSubscription(Subscriber<? super Boolean> subscriber, |
59 |
< |
* Executor executor) { |
59 |
> |
* ExecutorService executor) { |
60 |
|
* this.subscriber = subscriber; |
61 |
|
* this.executor = executor; |
62 |
|
* } |
63 |
|
* public synchronized void request(long n) { |
64 |
|
* if (n != 0 && !completed) { |
65 |
|
* completed = true; |
66 |
< |
* executor.execute(() -> { |
66 |
> |
* future = executor.submit(() -> { |
67 |
|
* if (n < 0) |
68 |
|
* subscriber.onError(new IllegalArgumentException()); |
69 |
|
* else { |
73 |
|
* } |
74 |
|
* } |
75 |
|
* public synchronized void cancel() { |
76 |
< |
* completed = true; // ineffective if task already started |
76 |
> |
* completed = true; |
77 |
> |
* if (future != null) future.cancel(false); |
78 |
|
* } |
79 |
|
* } |
80 |
|
* }}</pre> |