40 |
|
* |
41 |
|
* <pre> {@code |
42 |
|
* class OneShotPublisher implements Publisher<Boolean> { |
43 |
< |
* final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based |
44 |
< |
* boolean subscribed = false; // true after first subscribe |
43 |
> |
* private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based |
44 |
> |
* private boolean subscribed = false; // true after first subscribe |
45 |
|
* public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { |
46 |
|
* if (subscribed) |
47 |
|
* subscriber.onError(new IllegalStateException()); // only one allowed |
51 |
|
* } |
52 |
|
* } |
53 |
|
* static class OneShotSubscription implements Subscription { |
54 |
< |
* final Subscriber<? super Boolean> subscriber; |
55 |
< |
* final ExecutorService executor; |
56 |
< |
* Future<?> future; // to allow cancellation |
57 |
< |
* boolean completed = false; |
54 |
> |
* private final Subscriber<? super Boolean> subscriber; |
55 |
> |
* private final ExecutorService executor; |
56 |
> |
* private Future<?> future; // to allow cancellation |
57 |
> |
* private boolean completed = false; |
58 |
|
* OneShotSubscription(Subscriber<? super Boolean> subscriber, |
59 |
|
* ExecutorService executor) { |
60 |
|
* this.subscriber = subscriber; |
108 |
|
* this.consumer = consumer; |
109 |
|
* } |
110 |
|
* public void onSubscribe(Subscription subscription) { |
111 |
< |
* (this.subscription = subscription).request(bufferSize); |
111 |
> |
* long initialRequestSize = bufferSize; |
112 |
|
* count = bufferSize - bufferSize / 2; // re-request when half consumed |
113 |
+ |
* (this.subscription = subscription).request(initialRequestSize); |
114 |
|
* } |
115 |
|
* public void onNext(T item) { |
116 |
|
* if (--count <= 0) |
309 |
|
this.requestSize = bufferSize; |
310 |
|
} |
311 |
|
public final void onSubscribe(Subscription subscription) { |
312 |
< |
(this.subscription = subscription).request(requestSize); |
313 |
< |
count = requestSize -= (requestSize >>> 1); |
312 |
> |
long rs = requestSize; |
313 |
> |
count = requestSize -= (rs >>> 1); |
314 |
> |
(this.subscription = subscription).request(rs); |
315 |
|
} |
316 |
|
public final void onError(Throwable ex) { |
317 |
|
status.completeExceptionally(ex); |
322 |
|
subscription.request(count = requestSize); |
323 |
|
accept(item); |
324 |
|
} catch (Throwable ex) { |
325 |
+ |
subscription.cancel(); |
326 |
|
status.completeExceptionally(ex); |
327 |
|
} |
328 |
|
} |
348 |
|
* signals {@code onComplete}, or completed exceptionally upon any |
349 |
|
* error, including an exception thrown by the Consumer (in which |
350 |
|
* case the subscription is cancelled if not already terminated). |
351 |
+ |
* Other attempts to cancel the CompletableFuture need not |
352 |
+ |
* cause the computation to terminate. |
353 |
|
* |
354 |
|
* @param <T> the published item type |
355 |
|
* @param bufferSize the request size for subscriptions |
414 |
|
* buffering. Returns a CompletableFuture that is completed |
415 |
|
* normally with the result of this function when the publisher |
416 |
|
* signals {@code onComplete}, or is completed exceptionally upon |
417 |
< |
* any error. |
417 |
> |
* any error. Other attempts to cancel the CompletableFuture need |
418 |
> |
* not cause the computation to terminate. |
419 |
|
* |
420 |
|
* <p><b>Preliminary release note:</b> Currently, this method |
421 |
|
* collects all items before executing the stream |