ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.76
Committed: Mon Nov 27 15:15:32 2017 UTC (6 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.75: +11 -7 lines
Log Message:
Touch-ups

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.invoke.MethodHandles;
10 import java.lang.invoke.VarHandle;
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.List;
14 import java.util.concurrent.locks.LockSupport;
15 import java.util.function.BiConsumer;
16 import java.util.function.BiPredicate;
17 import java.util.function.Consumer;
18 import static java.util.concurrent.Flow.Publisher;
19 import static java.util.concurrent.Flow.Subscriber;
20 import static java.util.concurrent.Flow.Subscription;
21
22 /**
23 * A {@link Flow.Publisher} that asynchronously issues submitted
24 * (non-null) items to current subscribers until it is closed. Each
25 * current subscriber receives newly submitted items in the same order
26 * unless drops or exceptions are encountered. Using a
27 * SubmissionPublisher allows item generators to act as compliant <a
28 * href="http://www.reactive-streams.org/"> reactive-streams</a>
29 * Publishers relying on drop handling and/or blocking for flow
30 * control.
31 *
32 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
33 * constructor for delivery to subscribers. The best choice of
34 * Executor depends on expected usage. If the generator(s) of
35 * submitted items run in separate threads, and the number of
36 * subscribers can be estimated, consider using a {@link
37 * Executors#newFixedThreadPool}. Otherwise consider using the
38 * default, normally the {@link ForkJoinPool#commonPool}.
39 *
40 * <p>Buffering allows producers and consumers to transiently operate
41 * at different rates. Each subscriber uses an independent buffer.
42 * Buffers are created upon first use and expanded as needed up to the
43 * given maximum. (The enforced capacity may be rounded up to the
44 * nearest power of two and/or bounded by the largest value supported
45 * by this implementation.) Invocations of {@link
46 * Flow.Subscription#request(long) request} do not directly result in
47 * buffer expansion, but risk saturation if unfilled requests exceed
48 * the maximum capacity. The default value of {@link
49 * Flow#defaultBufferSize()} may provide a useful starting point for
50 * choosing a capacity based on expected rates, resources, and usages.
51 *
52 * <p>A single SubmissionPublisher may be shared among multiple
53 * sources. Actions in a source thread prior to publishing an item or
54 * issuing a signal <a href="package-summary.html#MemoryVisibility">
55 * <i>happen-before</i></a> actions subsequent to the corresponding
56 * access by each subscriber. But reported estimates of lag and demand
57 * are designed for use in monitoring, not for synchronization
58 * control, and may reflect stale or inaccurate views of progress.
59 *
60 * <p>Publication methods support different policies about what to do
61 * when buffers are saturated. Method {@link #submit(Object) submit}
62 * blocks until resources are available. This is simplest, but least
63 * responsive. The {@code offer} methods may drop items (either
64 * immediately or with bounded timeout), but provide an opportunity to
65 * interpose a handler and then retry.
66 *
67 * <p>If any Subscriber method throws an exception, its subscription
68 * is cancelled. If a handler is supplied as a constructor argument,
69 * it is invoked before cancellation upon an exception in method
70 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
71 * {@link Flow.Subscriber#onSubscribe onSubscribe},
72 * {@link Flow.Subscriber#onError(Throwable) onError} and
73 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
74 * handled before cancellation. If the supplied Executor throws
75 * {@link RejectedExecutionException} (or any other RuntimeException
76 * or Error) when attempting to execute a task, or a drop handler
77 * throws an exception when processing a dropped item, then the
78 * exception is rethrown. In these cases, not all subscribers will
79 * have been issued the published item. It is usually good practice to
80 * {@link #closeExceptionally closeExceptionally} in these cases.
81 *
82 * <p>Method {@link #consume(Consumer)} simplifies support for a
83 * common case in which the only action of a subscriber is to request
84 * and process all items using a supplied function.
85 *
86 * <p>This class may also serve as a convenient base for subclasses
87 * that generate items, and use the methods in this class to publish
88 * them. For example here is a class that periodically publishes the
89 * items generated from a supplier. (In practice you might add methods
90 * to independently start and stop generation, to share Executors
91 * among publishers, and so on, or use a SubmissionPublisher as a
92 * component rather than a superclass.)
93 *
94 * <pre> {@code
95 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
96 * final ScheduledFuture<?> periodicTask;
97 * final ScheduledExecutorService scheduler;
98 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
99 * Supplier<? extends T> supplier,
100 * long period, TimeUnit unit) {
101 * super(executor, maxBufferCapacity);
102 * scheduler = new ScheduledThreadPoolExecutor(1);
103 * periodicTask = scheduler.scheduleAtFixedRate(
104 * () -> submit(supplier.get()), 0, period, unit);
105 * }
106 * public void close() {
107 * periodicTask.cancel(false);
108 * scheduler.shutdown();
109 * super.close();
110 * }
111 * }}</pre>
112 *
113 * <p>Here is an example of a {@link Flow.Processor} implementation.
114 * It uses single-step requests to its publisher for simplicity of
115 * illustration. A more adaptive version could monitor flow using the
116 * lag estimate returned from {@code submit}, along with other utility
117 * methods.
118 *
119 * <pre> {@code
120 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
121 * implements Flow.Processor<S,T> {
122 * final Function<? super S, ? extends T> function;
123 * Flow.Subscription subscription;
124 * TransformProcessor(Executor executor, int maxBufferCapacity,
125 * Function<? super S, ? extends T> function) {
126 * super(executor, maxBufferCapacity);
127 * this.function = function;
128 * }
129 * public void onSubscribe(Flow.Subscription subscription) {
130 * (this.subscription = subscription).request(1);
131 * }
132 * public void onNext(S item) {
133 * subscription.request(1);
134 * submit(function.apply(item));
135 * }
136 * public void onError(Throwable ex) { closeExceptionally(ex); }
137 * public void onComplete() { close(); }
138 * }}</pre>
139 *
140 * @param <T> the published item type
141 * @author Doug Lea
142 * @since 9
143 */
144 public class SubmissionPublisher<T> implements Publisher<T>,
145 AutoCloseable {
146 /*
147 * Most mechanics are handled by BufferedSubscription. This class
148 * mainly tracks subscribers and ensures sequentiality, by using
149 * built-in synchronization locks across public methods. Using
150 * built-in locks works well in the most typical case in which
151 * only one thread submits items. We extend this idea in
152 * submission methods by detecting single-ownership to reduce
153 * producer-consumer synchronization strength.
154 */
155
156 /** The largest possible power of two array size. */
157 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
158
159 /**
160 * Initial buffer capacity used when maxBufferCapacity is
161 * greater. Must be a power of two.
162 */
163 static final int INITIAL_CAPACITY = 32;
164
165 /** Round capacity to power of 2, at most limit. */
166 static final int roundCapacity(int cap) {
167 int n = cap - 1;
168 n |= n >>> 1;
169 n |= n >>> 2;
170 n |= n >>> 4;
171 n |= n >>> 8;
172 n |= n >>> 16;
173 return (n <= 0) ? 1 : // at least 1
174 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
175 }
176
177 // default Executor setup; nearly the same as CompletableFuture
178
179 /**
180 * Default executor -- ForkJoinPool.commonPool() unless it cannot
181 * support parallelism.
182 */
183 private static final Executor ASYNC_POOL =
184 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
185 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
186
187 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
188 private static final class ThreadPerTaskExecutor implements Executor {
189 ThreadPerTaskExecutor() {} // prevent access constructor creation
190 public void execute(Runnable r) { new Thread(r).start(); }
191 }
192
/**
 * Clients (BufferedSubscriptions) are maintained in a linked list
 * (via their "next" fields). This works well for publish loops.
 * It requires O(n) traversal to check for duplicate subscribers,
 * but we expect that subscribing is much less common than
 * publishing. Unsubscribing occurs only during traversal loops,
 * when BufferedSubscription methods return negative values
 * signifying that they have been closed. To reduce
 * head-of-line blocking, submit and offer methods first call
 * BufferedSubscription.offer on each subscriber, place
 * saturated ones in a retries list (using the nextRetry field),
 * and then retry, possibly blocking or dropping.
 */
BufferedSubscription<T> clients;

/** Run status, updated only within locks */
volatile boolean closed;
/** Set true on first call to subscribe, to initialize possible owner */
boolean subscribed;
/**
 * The first caller thread to subscribe, or null if thread ever
 * changed. Also reset to null when the publisher is closed.
 */
Thread owner;
/** If non-null, the exception in closeExceptionally */
volatile Throwable closedException;

// Parameters for constructing BufferedSubscriptions
/** Executor handed to each new BufferedSubscription */
final Executor executor;
/** Handler (possibly null) supplied at construction for onNext exceptions */
final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
/** Per-subscriber maximum buffer capacity, after rounding by roundCapacity */
final int maxBufferCapacity;
221
/**
 * Creates a new SubmissionPublisher that delivers asynchronously
 * to subscribers using the given Executor, bounds each
 * subscriber's buffer by the given maximum size, and, if the
 * handler is non-null, invokes it whenever a Subscriber throws an
 * exception in method {@link Flow.Subscriber#onNext(Object) onNext}.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the enforced capacity may be rounded up to
 * the nearest power of two and/or bounded by the largest value
 * supported by this implementation; method {@link #getMaxBufferCapacity}
 * returns the actual value)
 * @param handler if non-null, procedure to invoke upon exception
 * thrown in method {@code onNext}
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if maxBufferCapacity not
 * positive
 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                           BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
    // Argument checks come first, in this order, so the null check wins.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity <= 0) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler;
    this.executor = executor;
}
252
/**
 * Creates a new SubmissionPublisher that delivers asynchronously
 * to subscribers using the given Executor and bounds each
 * subscriber's buffer by the given maximum size, without any
 * handler for Subscriber exceptions in method {@link
 * Flow.Subscriber#onNext(Object) onNext}.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the enforced capacity may be rounded up to
 * the nearest power of two and/or bounded by the largest value
 * supported by this implementation; method {@link #getMaxBufferCapacity}
 * returns the actual value)
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if maxBufferCapacity not
 * positive
 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    // Delegates with a null onNext handler.
    this(executor, maxBufferCapacity, null);
}
273
/**
 * Creates a new SubmissionPublisher with default settings: async
 * delivery via the {@link ForkJoinPool#commonPool()} (unless it
 * does not support a parallelism level of at least two, in which
 * case a new Thread is created to run each task), a maximum
 * buffer capacity of {@link Flow#defaultBufferSize}, and no
 * handler for Subscriber exceptions in method {@link
 * Flow.Subscriber#onNext(Object) onNext}.
 */
public SubmissionPublisher() {
    // Default executor, default buffer size, no onNext handler.
    this(ASYNC_POOL, Flow.defaultBufferSize(), null);
}
286
287 /**
288 * Adds the given Subscriber unless already subscribed. If already
289 * subscribed, the Subscriber's {@link
290 * Flow.Subscriber#onError(Throwable) onError} method is invoked on
291 * the existing subscription with an {@link IllegalStateException}.
292 * Otherwise, upon success, the Subscriber's {@link
293 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
294 * asynchronously with a new {@link Flow.Subscription}. If {@link
295 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
296 * subscription is cancelled. Otherwise, if this SubmissionPublisher
297 * was closed exceptionally, then the subscriber's {@link
298 * Flow.Subscriber#onError onError} method is invoked with the
299 * corresponding exception, or if closed without exception, the
300 * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
301 * method is invoked. Subscribers may enable receiving items by
302 * invoking the {@link Flow.Subscription#request(long) request}
303 * method of the new Subscription, and may unsubscribe by invoking
304 * its {@link Flow.Subscription#cancel() cancel} method.
305 *
306 * @param subscriber the subscriber
307 * @throws NullPointerException if subscriber is null
308 */
309 public void subscribe(Subscriber<? super T> subscriber) {
310 if (subscriber == null) throw new NullPointerException();
311 int max = maxBufferCapacity; // allocate initial array
312 Object[] array = new Object[max < INITIAL_CAPACITY ?
313 max : INITIAL_CAPACITY];
314 BufferedSubscription<T> subscription =
315 new BufferedSubscription<T>(subscriber, executor, onNextHandler,
316 array, max);
317 synchronized (this) {
318 if (!subscribed) {
319 subscribed = true;
320 owner = Thread.currentThread();
321 }
322 for (BufferedSubscription<T> b = clients, pred = null;;) {
323 if (b == null) {
324 Throwable ex;
325 subscription.onSubscribe();
326 if ((ex = closedException) != null)
327 subscription.onError(ex);
328 else if (closed)
329 subscription.onComplete();
330 else if (pred == null)
331 clients = subscription;
332 else
333 pred.next = subscription;
334 break;
335 }
336 BufferedSubscription<T> next = b.next;
337 if (b.isClosed()) { // remove
338 b.next = null; // detach
339 if (pred == null)
340 clients = next;
341 else
342 pred.next = next;
343 }
344 else if (subscriber.equals(b.subscriber)) {
345 b.onError(new IllegalStateException("Duplicate subscribe"));
346 break;
347 }
348 else
349 pred = b;
350 b = next;
351 }
352 }
353 }
354
355 /**
356 * Common implementation for all three forms of submit and offer.
357 * Acts as submit if nanos == Long.MAX_VALUE, else offer.
358 */
359 private int doOffer(T item, long nanos,
360 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
361 if (item == null) throw new NullPointerException();
362 int lag = 0;
363 boolean complete, unowned;
364 synchronized (this) {
365 Thread t = Thread.currentThread(), o;
366 BufferedSubscription<T> b = clients;
367 if ((unowned = ((o = owner) != t)) && o != null)
368 owner = null; // disable bias
369 if (b == null)
370 complete = closed;
371 else {
372 complete = false;
373 boolean cleanMe = false;
374 BufferedSubscription<T> retries = null, rtail = null, next;
375 do {
376 next = b.next;
377 int stat = b.offer(item, unowned);
378 if (stat == 0) { // saturated; add to retry list
379 b.nextRetry = null; // avoid garbage on exceptions
380 if (rtail == null)
381 retries = b;
382 else
383 rtail.nextRetry = b;
384 rtail = b;
385 }
386 else if (stat < 0) // closed
387 cleanMe = true; // remove later
388 else if (stat > lag)
389 lag = stat;
390 } while ((b = next) != null);
391
392 if (retries != null || cleanMe)
393 lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
394 }
395 }
396 if (complete)
397 throw new IllegalStateException("Closed");
398 else
399 return lag;
400 }
401
402 /**
403 * Helps, (timed) waits for, and/or drops buffers on list; returns
404 * lag or negative drops (for use in offer).
405 */
406 private int retryOffer(T item, long nanos,
407 BiPredicate<Subscriber<? super T>, ? super T> onDrop,
408 BufferedSubscription<T> retries, int lag,
409 boolean cleanMe) {
410 for (BufferedSubscription<T> r = retries; r != null;) {
411 BufferedSubscription<T> nextRetry = r.nextRetry;
412 r.nextRetry = null;
413 if (nanos > 0L)
414 r.awaitSpace(nanos);
415 int stat = r.retryOffer(item);
416 if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
417 stat = r.retryOffer(item);
418 if (stat == 0)
419 lag = (lag >= 0) ? -1 : lag - 1;
420 else if (stat < 0)
421 cleanMe = true;
422 else if (lag >= 0 && stat > lag)
423 lag = stat;
424 r = nextRetry;
425 }
426 if (cleanMe)
427 cleanAndCount();
428 return lag;
429 }
430
431 /**
432 * Returns current list count after removing closed subscribers.
433 * Call only while holding lock. Used mainly by retryOffer for
434 * cleanup.
435 */
436 private int cleanAndCount() {
437 int count = 0;
438 BufferedSubscription<T> pred = null, next;
439 for (BufferedSubscription<T> b = clients; b != null; b = next) {
440 next = b.next;
441 if (b.isClosed()) {
442 b.next = null;
443 if (pred == null)
444 clients = next;
445 else
446 pred.next = next;
447 }
448 else {
449 pred = b;
450 ++count;
451 }
452 }
453 return count;
454 }
455
456 /**
457 * Publishes the given item to each current subscriber by
458 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
459 * onNext} method, blocking uninterruptibly while resources for any
460 * subscriber are unavailable. This method returns an estimate of
461 * the maximum lag (number of items submitted but not yet consumed)
462 * among all current subscribers. This value is at least one
463 * (accounting for this submitted item) if there are any
464 * subscribers, else zero.
465 *
466 * <p>If the Executor for this publisher throws a
467 * RejectedExecutionException (or any other RuntimeException or
468 * Error) when attempting to asynchronously notify subscribers,
469 * then this exception is rethrown, in which case not all
470 * subscribers will have been issued this item.
471 *
472 * @param item the (non-null) item to publish
473 * @return the estimated maximum lag among subscribers
474 * @throws IllegalStateException if closed
475 * @throws NullPointerException if item is null
476 * @throws RejectedExecutionException if thrown by Executor
477 */
478 public int submit(T item) {
479 return doOffer(item, Long.MAX_VALUE, null);
480 }
481
482 /**
483 * Publishes the given item, if possible, to each current subscriber
484 * by asynchronously invoking its {@link
485 * Flow.Subscriber#onNext(Object) onNext} method. The item may be
486 * dropped by one or more subscribers if resource limits are
487 * exceeded, in which case the given handler (if non-null) is
488 * invoked, and if it returns true, retried once. Other calls to
489 * methods in this class by other threads are blocked while the
490 * handler is invoked. Unless recovery is assured, options are
491 * usually limited to logging the error and/or issuing an {@link
492 * Flow.Subscriber#onError(Throwable) onError} signal to the
493 * subscriber.
494 *
495 * <p>This method returns a status indicator: If negative, it
496 * represents the (negative) number of drops (failed attempts to
497 * issue the item to a subscriber). Otherwise it is an estimate of
498 * the maximum lag (number of items submitted but not yet
499 * consumed) among all current subscribers. This value is at least
500 * one (accounting for this submitted item) if there are any
501 * subscribers, else zero.
502 *
503 * <p>If the Executor for this publisher throws a
504 * RejectedExecutionException (or any other RuntimeException or
505 * Error) when attempting to asynchronously notify subscribers, or
506 * the drop handler throws an exception when processing a dropped
507 * item, then this exception is rethrown.
508 *
509 * @param item the (non-null) item to publish
510 * @param onDrop if non-null, the handler invoked upon a drop to a
511 * subscriber, with arguments of the subscriber and item; if it
512 * returns true, an offer is re-attempted (once)
513 * @return if negative, the (negative) number of drops; otherwise
514 * an estimate of maximum lag
515 * @throws IllegalStateException if closed
516 * @throws NullPointerException if item is null
517 * @throws RejectedExecutionException if thrown by Executor
518 */
519 public int offer(T item,
520 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
521 return doOffer(item, 0L, onDrop);
522 }
523
524 /**
525 * Publishes the given item, if possible, to each current subscriber
526 * by asynchronously invoking its {@link
527 * Flow.Subscriber#onNext(Object) onNext} method, blocking while
528 * resources for any subscription are unavailable, up to the
529 * specified timeout or until the caller thread is interrupted, at
530 * which point the given handler (if non-null) is invoked, and if it
531 * returns true, retried once. (The drop handler may distinguish
532 * timeouts from interrupts by checking whether the current thread
533 * is interrupted.) Other calls to methods in this class by other
534 * threads are blocked while the handler is invoked. Unless
535 * recovery is assured, options are usually limited to logging the
536 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
537 * onError} signal to the subscriber.
538 *
539 * <p>This method returns a status indicator: If negative, it
540 * represents the (negative) number of drops (failed attempts to
541 * issue the item to a subscriber). Otherwise it is an estimate of
542 * the maximum lag (number of items submitted but not yet
543 * consumed) among all current subscribers. This value is at least
544 * one (accounting for this submitted item) if there are any
545 * subscribers, else zero.
546 *
547 * <p>If the Executor for this publisher throws a
548 * RejectedExecutionException (or any other RuntimeException or
549 * Error) when attempting to asynchronously notify subscribers, or
550 * the drop handler throws an exception when processing a dropped
551 * item, then this exception is rethrown.
552 *
553 * @param item the (non-null) item to publish
554 * @param timeout how long to wait for resources for any subscriber
555 * before giving up, in units of {@code unit}
556 * @param unit a {@code TimeUnit} determining how to interpret the
557 * {@code timeout} parameter
558 * @param onDrop if non-null, the handler invoked upon a drop to a
559 * subscriber, with arguments of the subscriber and item; if it
560 * returns true, an offer is re-attempted (once)
561 * @return if negative, the (negative) number of drops; otherwise
562 * an estimate of maximum lag
563 * @throws IllegalStateException if closed
564 * @throws NullPointerException if item is null
565 * @throws RejectedExecutionException if thrown by Executor
566 */
567 public int offer(T item, long timeout, TimeUnit unit,
568 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
569 long nanos = unit.toNanos(timeout);
570 // distinguishes from untimed (only wrt interrupt policy)
571 if (nanos == Long.MAX_VALUE) --nanos;
572 return doOffer(item, nanos, onDrop);
573 }
574
575 /**
576 * Unless already closed, issues {@link
577 * Flow.Subscriber#onComplete() onComplete} signals to current
578 * subscribers, and disallows subsequent attempts to publish.
579 * Upon return, this method does <em>NOT</em> guarantee that all
580 * subscribers have yet completed.
581 */
582 public void close() {
583 if (!closed) {
584 BufferedSubscription<T> b;
585 synchronized (this) {
586 // no need to re-check closed here
587 b = clients;
588 clients = null;
589 owner = null;
590 closed = true;
591 }
592 while (b != null) {
593 BufferedSubscription<T> next = b.next;
594 b.next = null;
595 b.onComplete();
596 b = next;
597 }
598 }
599 }
600
601 /**
602 * Unless already closed, issues {@link
603 * Flow.Subscriber#onError(Throwable) onError} signals to current
604 * subscribers with the given error, and disallows subsequent
605 * attempts to publish. Future subscribers also receive the given
606 * error. Upon return, this method does <em>NOT</em> guarantee
607 * that all subscribers have yet completed.
608 *
609 * @param error the {@code onError} argument sent to subscribers
610 * @throws NullPointerException if error is null
611 */
612 public void closeExceptionally(Throwable error) {
613 if (error == null)
614 throw new NullPointerException();
615 if (!closed) {
616 BufferedSubscription<T> b;
617 synchronized (this) {
618 b = clients;
619 if (!closed) { // don't clobber racing close
620 closedException = error;
621 clients = null;
622 owner = null;
623 closed = true;
624 }
625 }
626 while (b != null) {
627 BufferedSubscription<T> next = b.next;
628 b.next = null;
629 b.onError(error);
630 b = next;
631 }
632 }
633 }
634
635 /**
636 * Returns true if this publisher is not accepting submissions.
637 *
638 * @return true if closed
639 */
640 public boolean isClosed() {
641 return closed;
642 }
643
644 /**
645 * Returns the exception associated with {@link
646 * #closeExceptionally(Throwable) closeExceptionally}, or null if
647 * not closed or if closed normally.
648 *
649 * @return the exception, or null if none
650 */
651 public Throwable getClosedException() {
652 return closedException;
653 }
654
655 /**
656 * Returns true if this publisher has any subscribers.
657 *
658 * @return true if this publisher has any subscribers
659 */
660 public boolean hasSubscribers() {
661 boolean nonEmpty = false;
662 synchronized (this) {
663 for (BufferedSubscription<T> b = clients; b != null;) {
664 BufferedSubscription<T> next = b.next;
665 if (b.isClosed()) {
666 b.next = null;
667 b = clients = next;
668 }
669 else {
670 nonEmpty = true;
671 break;
672 }
673 }
674 }
675 return nonEmpty;
676 }
677
678 /**
679 * Returns the number of current subscribers.
680 *
681 * @return the number of current subscribers
682 */
683 public int getNumberOfSubscribers() {
684 synchronized (this) {
685 return cleanAndCount();
686 }
687 }
688
689 /**
690 * Returns the Executor used for asynchronous delivery.
691 *
692 * @return the Executor used for asynchronous delivery
693 */
694 public Executor getExecutor() {
695 return executor;
696 }
697
698 /**
699 * Returns the maximum per-subscriber buffer capacity.
700 *
701 * @return the maximum per-subscriber buffer capacity
702 */
703 public int getMaxBufferCapacity() {
704 return maxBufferCapacity;
705 }
706
707 /**
708 * Returns a list of current subscribers for monitoring and
709 * tracking purposes, not for invoking {@link Flow.Subscriber}
710 * methods on the subscribers.
711 *
712 * @return list of current subscribers
713 */
714 public List<Subscriber<? super T>> getSubscribers() {
715 ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
716 synchronized (this) {
717 BufferedSubscription<T> pred = null, next;
718 for (BufferedSubscription<T> b = clients; b != null; b = next) {
719 next = b.next;
720 if (b.isClosed()) {
721 b.next = null;
722 if (pred == null)
723 clients = next;
724 else
725 pred.next = next;
726 }
727 else
728 subs.add(b.subscriber);
729 }
730 }
731 return subs;
732 }
733
734 /**
735 * Returns true if the given Subscriber is currently subscribed.
736 *
737 * @param subscriber the subscriber
738 * @return true if currently subscribed
739 * @throws NullPointerException if subscriber is null
740 */
741 public boolean isSubscribed(Subscriber<? super T> subscriber) {
742 if (subscriber == null) throw new NullPointerException();
743 if (!closed) {
744 synchronized (this) {
745 BufferedSubscription<T> pred = null, next;
746 for (BufferedSubscription<T> b = clients; b != null; b = next) {
747 next = b.next;
748 if (b.isClosed()) {
749 b.next = null;
750 if (pred == null)
751 clients = next;
752 else
753 pred.next = next;
754 }
755 else if (subscriber.equals(b.subscriber))
756 return true;
757 else
758 pred = b;
759 }
760 }
761 }
762 return false;
763 }
764
765 /**
766 * Returns an estimate of the minimum number of items requested
767 * (via {@link Flow.Subscription#request(long) request}) but not
768 * yet produced, among all current subscribers.
769 *
770 * @return the estimate, or zero if no subscribers
771 */
772 public long estimateMinimumDemand() {
773 long min = Long.MAX_VALUE;
774 boolean nonEmpty = false;
775 synchronized (this) {
776 BufferedSubscription<T> pred = null, next;
777 for (BufferedSubscription<T> b = clients; b != null; b = next) {
778 int n; long d;
779 next = b.next;
780 if ((n = b.estimateLag()) < 0) {
781 b.next = null;
782 if (pred == null)
783 clients = next;
784 else
785 pred.next = next;
786 }
787 else {
788 if ((d = b.demand - n) < min)
789 min = d;
790 nonEmpty = true;
791 pred = b;
792 }
793 }
794 }
795 return nonEmpty ? min : 0;
796 }
797
798 /**
799 * Returns an estimate of the maximum number of items produced but
800 * not yet consumed among all current subscribers.
801 *
802 * @return the estimate
803 */
804 public int estimateMaximumLag() {
805 int max = 0;
806 synchronized (this) {
807 BufferedSubscription<T> pred = null, next;
808 for (BufferedSubscription<T> b = clients; b != null; b = next) {
809 int n;
810 next = b.next;
811 if ((n = b.estimateLag()) < 0) {
812 b.next = null;
813 if (pred == null)
814 clients = next;
815 else
816 pred.next = next;
817 }
818 else {
819 if (n > max)
820 max = n;
821 pred = b;
822 }
823 }
824 }
825 return max;
826 }
827
828 /**
829 * Processes all published items using the given Consumer function.
830 * Returns a CompletableFuture that is completed normally when this
831 * publisher signals {@link Flow.Subscriber#onComplete()
832 * onComplete}, or completed exceptionally upon any error, or an
833 * exception is thrown by the Consumer, or the returned
834 * CompletableFuture is cancelled, in which case no further items
835 * are processed.
836 *
837 * @param consumer the function applied to each onNext item
838 * @return a CompletableFuture that is completed normally
839 * when the publisher signals onComplete, and exceptionally
840 * upon any error or cancellation
841 * @throws NullPointerException if consumer is null
842 */
843 public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
844 if (consumer == null)
845 throw new NullPointerException();
846 CompletableFuture<Void> status = new CompletableFuture<>();
847 subscribe(new ConsumerSubscriber<T>(status, consumer));
848 return status;
849 }
850
851 /** Subscriber for method consume */
852 static final class ConsumerSubscriber<T> implements Subscriber<T> {
853 final CompletableFuture<Void> status;
854 final Consumer<? super T> consumer;
855 Subscription subscription;
856 ConsumerSubscriber(CompletableFuture<Void> status,
857 Consumer<? super T> consumer) {
858 this.status = status; this.consumer = consumer;
859 }
860 public final void onSubscribe(Subscription subscription) {
861 this.subscription = subscription;
862 status.whenComplete((v, e) -> subscription.cancel());
863 if (!status.isDone())
864 subscription.request(Long.MAX_VALUE);
865 }
866 public final void onError(Throwable ex) {
867 status.completeExceptionally(ex);
868 }
869 public final void onComplete() {
870 status.complete(null);
871 }
872 public final void onNext(T item) {
873 try {
874 consumer.accept(item);
875 } catch (Throwable ex) {
876 subscription.cancel();
877 status.completeExceptionally(ex);
878 }
879 }
880 }
881
882 /**
883 * A task for consuming buffer items and signals, created and
884 * executed whenever they become available. A task consumes as
885 * many items/signals as possible before terminating, at which
886 * point another task is created when needed. The dual Runnable
887 * and ForkJoinTask declaration saves overhead when executed by
888 * ForkJoinPools, without impacting other kinds of Executors.
889 */
890 @SuppressWarnings("serial")
891 static final class ConsumerTask<T> extends ForkJoinTask<Void>
892 implements Runnable, CompletableFuture.AsynchronousCompletionTask {
893 final BufferedSubscription<T> consumer;
894 ConsumerTask(BufferedSubscription<T> consumer) {
895 this.consumer = consumer;
896 }
897 public final Void getRawResult() { return null; }
898 public final void setRawResult(Void v) {}
899 public final boolean exec() { consumer.consume(); return false; }
900 public final void run() { consumer.consume(); }
901 }
902
903 /**
904 * A resizable array-based ring buffer with integrated control to
905 * start a consumer task whenever items are available. The buffer
906 * algorithm is specialized for the case of at most one concurrent
907 * producer and consumer, and power of two buffer sizes. It relies
908 * primarily on atomic operations (CAS or getAndSet) at the next
909 * array slot to put or take an element, at the "tail" and "head"
910 * indices written only by the producer and consumer respectively.
911 *
912 * We ensure internally that there is at most one active consumer
913 * task at any given time. The publisher guarantees a single
914 * producer via its lock. Sync among producers and consumers
915 * relies on volatile fields "ctl", "demand", and "waiting" (along
916 * with element access). Other variables are accessed in plain
917 * mode, relying on outer ordering and exclusion, and/or enclosing
918 * them within other volatile accesses. Some atomic operations are
919 * avoided by tracking single threaded ownership by producers (in
920 * the style of biased locking).
921 *
922 * Execution control and protocol state are managed using field
923 * "ctl". Methods to subscribe, close, request, and cancel set
924 * ctl bits (mostly using atomic boolean method getAndBitwiseOr),
925 * and ensure that a task is running. (The corresponding consumer
926 * side actions are in method consume.) To avoid starting a new
927 * task on each action, ctl also includes a keep-alive bit
928 * (ACTIVE) that is refreshed if needed on producer actions.
929 * (Maintaining agreement about keep-alives requires most atomic
930 * updates to be full SC/Volatile strength, which is still much
931 * cheaper than using one task per item.) Error signals
932 * additionally null out items and/or fields to reduce termination
933 * latency. The cancel() method is supported by treating as ERROR
934 * but suppressing onError signal.
935 *
936 * Support for blocking also exploits the fact that there is only
937 * one possible waiter. ManagedBlocker-compatible control fields
938 * are placed in this class itself rather than in wait-nodes.
939 * Blocking control relies on the "waiting" and "waiter"
940 * fields. Producers set them before trying to block. Signalling
941 * unparks and clears fields. If the producer and/or consumer are
942 * using a ForkJoinPool, the producer attempts to help run
943 * consumer tasks via ForkJoinPool.helpAsyncBlocker before
944 * blocking.
945 *
946 * Usages of this class may encounter any of several forms of
947 * memory contention. We try to ameliorate across them without
948 * unduly impacting footprints in low-contention usages where it
949 * isn't needed. Buffer arrays start out small and grow only as
950 * needed. The class uses @Contended and heuristic field
951 * declaration ordering to reduce false-sharing memory contention
952 * across instances of BufferedSubscription (as in, multiple
953 * subscribers per publisher). We additionally segregate some
954 * fields that would otherwise nearly always encounter cache line
955 * contention among producers and consumers. To reduce contention
956 * across time (vs space), consumers only periodically update
957 * other fields (see method takeItems), at the expense of possibly
958 * staler reporting of lags and demand (bounded at 12.5% == 1/8
959 * capacity) and possibly more atomic operations.
960 *
961 * Other forms of imbalance and slowdowns can occur during startup
962 * when producer and consumer methods are compiled and/or memory
963 * is allocated at different rates. This is ameliorated by
964 * artificially subdividing some consumer methods, including
965 * isolation of all subscriber callbacks. This code also includes
966 * typical power-of-two array screening idioms to avoid compilers
967 * generating traps, along with the usual SSA-based inline
968 * assignment coding style. Also, methods and fields have default
969 * visibility (except interface methods) to simplify usage by callers.
970 */
971 @SuppressWarnings("serial")
972 @jdk.internal.vm.annotation.Contended
973 static final class BufferedSubscription<T>
974 implements Subscription, ForkJoinPool.ManagedBlocker {
975 // Order-sensitive field declarations
976 long timeout; // Long.MAX_VALUE if untimed wait
977 int head; // next position to take
978 int tail; // next position to put
979 final int maxCapacity; // max buffer size
980 volatile int ctl; // atomic run state flags
981 Object[] array; // buffer
982 final Subscriber<? super T> subscriber; // target of consumer-side callbacks
983 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler; // applied when onNext throws; may be null
984 Executor executor; // null on error
985 Thread waiter; // blocked producer thread
986 Throwable pendingError; // holds until onError issued
987 BufferedSubscription<T> next; // used only by publisher
988 BufferedSubscription<T> nextRetry; // used only by publisher
989
990 @jdk.internal.vm.annotation.Contended("c") // segregate
991 volatile long demand; // # unfilled requests
992 @jdk.internal.vm.annotation.Contended("c")
993 volatile int waiting; // nonzero if producer blocked
994
995 // ctl bit values
996 static final int CLOSED = 0x01; // if set, other bits ignored
997 static final int ACTIVE = 0x02; // keep-alive for consumer task
998 static final int REQS = 0x04; // (possibly) nonzero demand
999 static final int ERROR = 0x08; // issues onError when noticed
1000 static final int COMPLETE = 0x10; // issues onComplete when done
1001 static final int RUN = 0x20; // task is or will be running
1002 static final int OPEN = 0x40; // true after subscribe
1003
1004 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
1005
/**
 * Creates a subscription for the given subscriber. The array
 * argument becomes the initial buffer, and maxBufferCapacity is
 * the bound checked by method offer before growing the buffer.
 */
1006 BufferedSubscription(Subscriber<? super T> subscriber,
1007 Executor executor,
1008 BiConsumer<? super Subscriber<? super T>,
1009 ? super Throwable> onNextHandler,
1010 Object[] array,
1011 int maxBufferCapacity) {
1012 this.subscriber = subscriber;
1013 this.executor = executor;
1014 this.onNextHandler = onNextHandler;
1015 this.array = array;
1016 this.maxCapacity = maxBufferCapacity;
1017 }
1018
1019 // Wrappers for some VarHandle methods
1020
/** Weak CAS of ctl from cmp to val; returns true if the update was made. */
1021 final boolean weakCasCtl(int cmp, int val) {
1022 return CTL.weakCompareAndSet(this, cmp, val);
1023 }
1024
/** Atomically ORs the given bits into ctl, returning the previous ctl value. */
1025 final int getAndBitwiseOrCtl(int bits) {
1026 return (int)CTL.getAndBitwiseOr(this, bits);
1027 }
1028
/** Atomically subtracts k from demand, returning the updated demand. */
1029 final long subtractDemand(int k) {
1030 long n = (long)(-k);
1031 return n + (long)DEMAND.getAndAdd(this, n); // previous value + (-k)
1032 }
1033
/** CAS of demand from cmp to val; returns true if the update was made. */
1034 final boolean casDemand(long cmp, long val) {
1035 return DEMAND.compareAndSet(this, cmp, val);
1036 }
1037
1038 // Utilities used by SubmissionPublisher
1039
1040 /**
1041 * Returns true if closed (consumer task may still be running).
1042 */
1043 final boolean isClosed() {
1044 return (ctl & CLOSED) != 0;
1045 }
1046
1047 /**
1048 * Returns estimated number of buffered items, or negative if
1049 * closed. A negative tail - head difference is reported as zero.
1050 */
1051 final int estimateLag() {
1052 int c = ctl, n = tail - head;
1053 return ((c & CLOSED) != 0) ? -1 : (n < 0) ? 0 : n;
1054 }
1055
1056 // Methods for submitting items
1057
1058 /**
1059 * Tries to add item and start consumer task if necessary.
 * @param item the item to add
 * @param unowned if true, the slot is written with a volatile CAS
 * rather than a release-mode store (presumably set when the caller
 * is not the tracked owner thread -- confirm against the publisher)
1060 * @return negative if closed, 0 if saturated, else estimated lag
1061 */
1062 final int offer(T item, boolean unowned) {
1063 Object[] a;
1064 int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
1065 int t = tail, i = t & (cap - 1), n = t + 1 - head;
1066 if (cap > 0) {
1067 boolean added;
1068 if (n >= cap && cap < maxCapacity) // resize
1069 added = growAndoffer(item, a, t);
1070 else if (n >= cap || unowned) // need volatile CAS
1071 added = QA.compareAndSet(a, i, null, item);
1072 else { // can use release mode
1073 QA.setRelease(a, i, item);
1074 added = true;
1075 }
1076 if (added) {
1077 tail = t + 1;
1078 stat = n; // lag estimate including the new item
1079 }
1080 }
1081 return startOnOffer(stat);
1082 }
1083
1084 /**
1085 * Tries to expand buffer and add item, returning true on
1086 * success. Currently fails only if out of memory.
 * The new array has twice the old length. The item is stored
 * first, then existing elements are moved from newest to oldest.
1087 */
1088 final boolean growAndoffer(T item, Object[] a, int t) {
1089 int cap = 0, newCap = 0;
1090 Object[] newArray = null;
1091 if (a != null && (cap = a.length) > 0 && (newCap = cap << 1) > 0) {
1092 try {
1093 newArray = new Object[newCap];
1094 } catch (OutOfMemoryError ex) {
// deliberately ignored: newArray stays null and false is returned
1095 }
1096 }
1097 if (newArray == null)
1098 return false;
1099 else { // take and move items
1100 int newMask = newCap - 1;
1101 newArray[t-- & newMask] = item;
1102 for (int mask = cap - 1, k = mask; k >= 0; --k) {
1103 Object x = QA.getAndSet(a, t & mask, null);
1104 if (x == null)
1105 break; // already consumed
1106 else
1107 newArray[t-- & newMask] = x;
1108 }
1109 array = newArray;
1110 VarHandle.releaseFence(); // release array and slots
1111 return true;
1112 }
1113 }
1114
1115 /**
1116 * Version of offer for retries (no resize or bias)
1117 */
1118 final int retryOffer(T item) {
1119 Object[] a;
1120 int stat = 0, t = tail, h = head, cap;
1121 if ((a = array) != null && (cap = a.length) > 0 &&
1122 QA.compareAndSet(a, (cap - 1) & t, null, item))
1123 stat = (tail = t + 1) - h;
1124 return startOnOffer(stat);
1125 }
1126
1127 /**
1128 * Tries to start consumer task after offer.
1129 * @return negative if now closed, else argument
1130 */
1131 final int startOnOffer(int stat) {
1132 int c; // start or keep alive if requests exist and not active
1133 if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
1134 ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
1135 tryStart();
1136 else if ((c & CLOSED) != 0)
1137 stat = -1;
1138 return stat;
1139 }
1140
1141 /**
1142 * Tries to start consumer task. Sets error state on failure.
 * Any RuntimeException or Error from task creation or from
 * Executor.execute is rethrown after ERROR and CLOSED are set.
1143 */
1144 final void tryStart() {
1145 try {
1146 Executor e;
1147 ConsumerTask<T> task = new ConsumerTask<T>(this);
1148 if ((e = executor) != null) // skip if disabled on error
1149 e.execute(task);
1150 } catch (RuntimeException | Error ex) {
1151 getAndBitwiseOrCtl(ERROR | CLOSED);
1152 throw ex;
1153 }
1154 }
1155
1156 // Signals to consumer tasks
1157
1158 /**
1159 * Sets the given control bits, starting task if not running or closed.
1160 * @param bits state bits, assumed to include RUN but not CLOSED
1161 */
1162 final void startOnSignal(int bits) {
1163 if ((ctl & bits) != bits && // skip the atomic update if all bits already set
1164 (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
1165 tryStart();
1166 }
1167
/**
 * Ensures a consumer task runs; method consume issues the
 * subscriber's onSubscribe via subscribeOnOpen.
 */
1168 final void onSubscribe() {
1169 startOnSignal(RUN | ACTIVE);
1170 }
1171
/**
 * Sets COMPLETE and ensures a consumer task runs; method consume
 * issues onComplete once it finds the buffer empty.
 */
1172 final void onComplete() {
1173 startOnSignal(RUN | ACTIVE | COMPLETE);
1174 }
1175
/**
 * Records the given exception (if non-null) as pendingError and
 * sets ERROR. Unless already closed, starts a consumer task if
 * none was marked running; otherwise clears the buffer.
 */
1176 final void onError(Throwable ex) {
1177 int c; Object[] a; // to null out buffer on async error
1178 if (ex != null)
1179 pendingError = ex; // races are OK
1180 if (((c = getAndBitwiseOrCtl(ERROR | RUN | ACTIVE)) & CLOSED) == 0) {
1181 if ((c & RUN) == 0)
1182 tryStart();
1183 else if ((a = array) != null)
1184 Arrays.fill(a, null);
1185 }
1186 }
1187
/**
 * Cancels by signalling an error with no exception. No onError
 * callback results unless a pendingError was separately recorded
 * (see closeOnError and consumeError).
 */
1188 public final void cancel() {
1189 onError(null);
1190 }
1191
/**
 * Adds n to demand, saturating at Long.MAX_VALUE on overflow, then
 * sets REQS and ensures a consumer task runs. A non-positive n is
 * treated as an error carrying an IllegalArgumentException.
 */
1192 public final void request(long n) {
1193 if (n > 0L) {
1194 for (;;) {
1195 long p = demand, d = p + n; // saturate
1196 if (casDemand(p, d < p ? Long.MAX_VALUE : d))
1197 break;
1198 }
1199 startOnSignal(RUN | ACTIVE | REQS);
1200 }
1201 else
1202 onError(new IllegalArgumentException(
1203 "non-positive subscription request"));
1204 }
1205
1206 // Consumer task actions
1207
1208 /**
1209 * Consumer loop, called from ConsumerTask, or indirectly when
1210 * helping during submit.
1211 */
1212 final void consume() {
1213 Subscriber<? super T> s;
1214 if ((s = subscriber) != null) { // hoist checks
1215 subscribeOnOpen(s);
1216 long d = demand;
1217 for (int h = head, t = tail;;) {
1218 int c, taken; boolean empty;
1219 if (((c = ctl) & ERROR) != 0) {
1220 closeOnError(s, null); // uses pendingError, if any
1221 break;
1222 }
1223 else if ((taken = takeItems(s, d, h)) > 0) {
1224 head = h += taken; // publish progress once per batch
1225 d = subtractDemand(taken);
1226 }
1227 else if ((empty = (t == h)) && (c & COMPLETE) != 0) {
1228 closeOnComplete(s); // end of stream
1229 break;
1230 }
1231 else if ((d = demand) == 0L && (c & REQS) != 0)
1232 weakCasCtl(c, c & ~REQS); // exhausted demand
1233 else if (d != 0L && (c & REQS) == 0)
1234 weakCasCtl(c, c | REQS); // new demand
1235 else if (t == (t = tail) && (empty || d == 0L)) {
// clear ACTIVE on one pass; exit only when a later pass clears RUN
1236 int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
1237 if (weakCasCtl(c, c & ~bit) && bit == RUN)
1238 break; // un-keep-alive or exit
1239 }
1240 }
1241 }
1242 }
1243
1244 /**
1245 * Consumes some items until unavailable or bound or error.
 * At most max(1, capacity/8) items, and no more than the given
 * demand, are taken per call.
1246 *
1247 * @param s subscriber
1248 * @param d current demand
1249 * @param h current head
1250 * @return number taken
1251 */
1252 final int takeItems(Subscriber<? super T> s, long d, int h) {
1253 Object[] a;
1254 int k = 0, cap;
1255 if ((a = array) != null && (cap = a.length) > 0) {
1256 int m = cap - 1, b = (m >>> 3) + 1; // max(1, cap/8)
1257 int n = (d < (long)b) ? (int)d : b;
1258 for (; k < n; ++h, ++k) {
1259 Object x = QA.getAndSet(a, h & m, null);
1260 if (waiting != 0)
1261 signalWaiter();
1262 if (x == null)
1263 break;
1264 else if (!consumeNext(s, x))
1265 break;
1266 }
1267 }
1268 return k;
1269 }
1270
/**
 * Issues subscriber.onNext for the given item. Returns true on
 * normal return; on any Throwable, hands it to handleOnNext and
 * returns false.
 */
1271 final boolean consumeNext(Subscriber<? super T> s, Object x) {
1272 try {
1273 @SuppressWarnings("unchecked") T y = (T) x;
1274 if (s != null)
1275 s.onNext(y);
1276 return true;
1277 } catch (Throwable ex) {
1278 handleOnNext(s, ex);
1279 return false;
1280 }
1281 }
1282
1283 /**
1284 * Processes exception in Subscriber.onNext.
 * Applies onNextHandler (if non-null), ignoring anything it
 * throws, then closes with the original exception.
1285 */
1286 final void handleOnNext(Subscriber<? super T> s, Throwable ex) {
1287 BiConsumer<? super Subscriber<? super T>, ? super Throwable> h;
1288 try {
1289 if ((h = onNextHandler) != null)
1290 h.accept(s, ex);
1291 } catch (Throwable ignore) {
1292 }
1293 closeOnError(s, ex);
1294 }
1295
1296 /**
1297 * Issues subscriber.onSubscribe if this is first signal.
1298 */
1299 final void subscribeOnOpen(Subscriber<? super T> s) {
1300 if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0)
1301 consumeSubscribe(s);
1302 }
1303
/**
 * Invokes subscriber.onSubscribe with this subscription; any
 * Throwable it throws closes the subscription via closeOnError.
 */
1304 final void consumeSubscribe(Subscriber<? super T> s) {
1305 try {
1306 if (s != null) // ignore if disabled
1307 s.onSubscribe(this);
1308 } catch (Throwable ex) {
1309 closeOnError(s, ex);
1310 }
1311 }
1312
1313 /**
1314 * Issues subscriber.onComplete unless already closed.
1315 */
1316 final void closeOnComplete(Subscriber<? super T> s) {
1317 if ((getAndBitwiseOrCtl(CLOSED) & CLOSED) == 0)
1318 consumeComplete(s);
1319 }
1320
/** Invokes subscriber.onComplete, ignoring anything it throws. */
1321 final void consumeComplete(Subscriber<? super T> s) {
1322 try {
1323 if (s != null)
1324 s.onComplete();
1325 } catch (Throwable ignore) {
1326 }
1327 }
1328
1329 /**
1330 * Issues subscriber.onError, and unblocks producer if needed.
 * Does nothing if already closed. If ex is null, pendingError
 * (possibly also null) is used instead.
1331 */
1332 final void closeOnError(Subscriber<? super T> s, Throwable ex) {
1333 if ((getAndBitwiseOrCtl(ERROR | CLOSED) & CLOSED) == 0) {
1334 if (ex == null)
1335 ex = pendingError;
1336 pendingError = null; // detach
1337 executor = null; // suppress racing start calls
1338 signalWaiter();
1339 consumeError(s, ex);
1340 }
1341 }
1342
/**
 * Invokes subscriber.onError if the exception is non-null,
 * ignoring anything it throws.
 */
1343 final void consumeError(Subscriber<? super T> s, Throwable ex) {
1344 try {
1345 if (ex != null && s != null)
1346 s.onError(ex);
1347 } catch (Throwable ignore) {
1348 }
1349 }
1350
1351 // Blocking support
1352
1353 /**
1354 * Unblocks waiting producer.
1355 */
1356 final void signalWaiter() {
1357 Thread w;
1358 waiting = 0;
1359 if ((w = waiter) != null)
1360 LockSupport.unpark(w);
1361 }
1362
1363 /**
1364 * Returns true if closed or space available.
1365 * For ManagedBlocker.
 * Space is available when the slot at the tail index is null.
1366 */
1367 public final boolean isReleasable() {
1368 Object[] a; int cap;
1369 return ((ctl & CLOSED) != 0 ||
1370 ((a = array) != null && (cap = a.length) > 0 &&
1371 QA.getAcquire(a, (cap - 1) & tail) == null));
1372 }
1373
1374 /**
1375 * Helps or blocks until timeout, closed, or space available.
 * If the wait was interrupted, the caller's interrupt status is
 * set on return.
1376 */
1377 final void awaitSpace(long nanos) {
1378 if (!isReleasable()) {
1379 ForkJoinPool.helpAsyncBlocker(executor, this);
1380 if (!isReleasable()) {
1381 timeout = nanos;
1382 try {
1383 ForkJoinPool.managedBlock(this);
1384 } catch (InterruptedException ie) {
1385 timeout = INTERRUPTED;
1386 }
1387 if (timeout == INTERRUPTED) // set above or by block()
1388 Thread.currentThread().interrupt();
1389 }
1390 }
1391 }
1392
1393 /**
1394 * Blocks until closed, space available or timeout.
1395 * For ManagedBlocker.
 * Always returns true. An interrupt is recorded by setting timeout
 * to INTERRUPTED; it ends the wait only when the wait is timed.
1396 */
1397 public final boolean block() {
1398 long nanos = timeout;
1399 boolean timed = (nanos < Long.MAX_VALUE);
1400 long deadline = timed ? System.nanoTime() + nanos : 0L;
1401 while (!isReleasable()) {
1402 if (Thread.interrupted()) {
1403 timeout = INTERRUPTED;
1404 if (timed)
1405 break;
1406 }
1407 else if (timed && (nanos = deadline - System.nanoTime()) <= 0L)
1408 break;
// set waiter, then waiting, on separate passes (rechecking isReleasable) before parking
1409 else if (waiter == null)
1410 waiter = Thread.currentThread();
1411 else if (waiting == 0)
1412 waiting = 1;
1413 else if (timed)
1414 LockSupport.parkNanos(this, nanos);
1415 else
1416 LockSupport.park(this);
1417 }
1418 waiter = null;
1419 waiting = 0;
1420 return true;
1421 }
1422
1423 // VarHandle mechanics
1424 static final VarHandle CTL; // field "ctl"
1425 static final VarHandle DEMAND; // field "demand"
1426 static final VarHandle QA; // Object[] element access
1427
1428 static {
1429 try {
1430 MethodHandles.Lookup l = MethodHandles.lookup();
1431 CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
1432 int.class);
1433 DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
1434 long.class);
1435 QA = MethodHandles.arrayElementVarHandle(Object[].class);
1436 } catch (ReflectiveOperationException e) {
1437 throw new Error(e);
1438 }
1439
1440 // Reduce the risk of rare disastrous classloading in first call to
1441 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1442 Class<?> ensureLoaded = LockSupport.class;
1443 }
1444 }
1445 }