ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.83
Committed: Fri Jan 17 18:43:11 2020 UTC (4 years, 4 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.82: +1 -1 lines
Log Message:
typos

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.invoke.MethodHandles;
10 import java.lang.invoke.VarHandle;
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.List;
14 import java.util.concurrent.locks.LockSupport;
15 import java.util.concurrent.locks.ReentrantLock;
16 import java.util.function.BiConsumer;
17 import java.util.function.BiPredicate;
18 import java.util.function.Consumer;
19 import static java.util.concurrent.Flow.Publisher;
20 import static java.util.concurrent.Flow.Subscriber;
21 import static java.util.concurrent.Flow.Subscription;
22
23 /**
24 * A {@link Flow.Publisher} that asynchronously issues submitted
25 * (non-null) items to current subscribers until it is closed. Each
26 * current subscriber receives newly submitted items in the same order
27 * unless drops or exceptions are encountered. Using a
28 * SubmissionPublisher allows item generators to act as compliant <a
29 * href="http://www.reactive-streams.org/"> reactive-streams</a>
30 * Publishers relying on drop handling and/or blocking for flow
31 * control.
32 *
33 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
34 * constructor for delivery to subscribers. The best choice of
35 * Executor depends on expected usage. If the generator(s) of
36 * submitted items run in separate threads, and the number of
37 * subscribers can be estimated, consider using a {@link
38 * Executors#newFixedThreadPool}. Otherwise consider using the
39 * default, normally the {@link ForkJoinPool#commonPool}.
40 *
41 * <p>Buffering allows producers and consumers to transiently operate
42 * at different rates. Each subscriber uses an independent buffer.
43 * Buffers are created upon first use and expanded as needed up to the
44 * given maximum. (The enforced capacity may be rounded up to the
45 * nearest power of two and/or bounded by the largest value supported
46 * by this implementation.) Invocations of {@link
47 * Flow.Subscription#request(long) request} do not directly result in
48 * buffer expansion, but risk saturation if unfilled requests exceed
49 * the maximum capacity. The default value of {@link
50 * Flow#defaultBufferSize()} may provide a useful starting point for
51 * choosing a capacity based on expected rates, resources, and usages.
52 *
53 * <p>A single SubmissionPublisher may be shared among multiple
54 * sources. Actions in a source thread prior to publishing an item or
55 * issuing a signal <a href="package-summary.html#MemoryVisibility">
56 * <i>happen-before</i></a> actions subsequent to the corresponding
57 * access by each subscriber. But reported estimates of lag and demand
58 * are designed for use in monitoring, not for synchronization
59 * control, and may reflect stale or inaccurate views of progress.
60 *
61 * <p>Publication methods support different policies about what to do
62 * when buffers are saturated. Method {@link #submit(Object) submit}
63 * blocks until resources are available. This is simplest, but least
64 * responsive. The {@code offer} methods may drop items (either
65 * immediately or with bounded timeout), but provide an opportunity to
66 * interpose a handler and then retry.
67 *
68 * <p>If any Subscriber method throws an exception, its subscription
69 * is cancelled. If a handler is supplied as a constructor argument,
70 * it is invoked before cancellation upon an exception in method
71 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
72 * {@link Flow.Subscriber#onSubscribe onSubscribe},
73 * {@link Flow.Subscriber#onError(Throwable) onError} and
74 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
75 * handled before cancellation. If the supplied Executor throws
76 * {@link RejectedExecutionException} (or any other RuntimeException
77 * or Error) when attempting to execute a task, or a drop handler
78 * throws an exception when processing a dropped item, then the
79 * exception is rethrown. In these cases, not all subscribers will
80 * have been issued the published item. It is usually good practice to
81 * {@link #closeExceptionally closeExceptionally} in these cases.
82 *
83 * <p>Method {@link #consume(Consumer)} simplifies support for a
84 * common case in which the only action of a subscriber is to request
85 * and process all items using a supplied function.
86 *
87 * <p>This class may also serve as a convenient base for subclasses
88 * that generate items, and use the methods in this class to publish
89 * them. For example here is a class that periodically publishes the
90 * items generated from a supplier. (In practice you might add methods
91 * to independently start and stop generation, to share Executors
92 * among publishers, and so on, or use a SubmissionPublisher as a
93 * component rather than a superclass.)
94 *
95 * <pre> {@code
96 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
97 * final ScheduledFuture<?> periodicTask;
98 * final ScheduledExecutorService scheduler;
99 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
100 * Supplier<? extends T> supplier,
101 * long period, TimeUnit unit) {
102 * super(executor, maxBufferCapacity);
103 * scheduler = new ScheduledThreadPoolExecutor(1);
104 * periodicTask = scheduler.scheduleAtFixedRate(
105 * () -> submit(supplier.get()), 0, period, unit);
106 * }
107 * public void close() {
108 * periodicTask.cancel(false);
109 * scheduler.shutdown();
110 * super.close();
111 * }
112 * }}</pre>
113 *
114 * <p>Here is an example of a {@link Flow.Processor} implementation.
115 * It uses single-step requests to its publisher for simplicity of
116 * illustration. A more adaptive version could monitor flow using the
117 * lag estimate returned from {@code submit}, along with other utility
118 * methods.
119 *
120 * <pre> {@code
121 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
122 * implements Flow.Processor<S,T> {
123 * final Function<? super S, ? extends T> function;
124 * Flow.Subscription subscription;
125 * TransformProcessor(Executor executor, int maxBufferCapacity,
126 * Function<? super S, ? extends T> function) {
127 * super(executor, maxBufferCapacity);
128 * this.function = function;
129 * }
130 * public void onSubscribe(Flow.Subscription subscription) {
131 * (this.subscription = subscription).request(1);
132 * }
133 * public void onNext(S item) {
134 * subscription.request(1);
135 * submit(function.apply(item));
136 * }
137 * public void onError(Throwable ex) { closeExceptionally(ex); }
138 * public void onComplete() { close(); }
139 * }}</pre>
140 *
141 * @param <T> the published item type
142 * @author Doug Lea
143 * @since 9
144 */
145 public class SubmissionPublisher<T> implements Publisher<T>,
146 AutoCloseable {
147 /*
148 * Most mechanics are handled by BufferedSubscription. This class
149 * mainly tracks subscribers and ensures sequentiality, by using
150 * locks across public methods, to ensure thread-safety in the
151 * presence of multiple sources and maintain acquire-release
152 * ordering around user operations. However, we also track whether
153 * there is only a single source, and if so streamline some buffer
154 * operations by avoiding some atomics.
155 */
156
157 /** The largest possible power of two array size. */
158 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
159
160 /**
161 * Initial buffer capacity used when maxBufferCapacity is
162 * greater. Must be a power of two.
163 */
164 static final int INITIAL_CAPACITY = 32;
165
166 /** Round capacity to power of 2, at most limit. */
167 static final int roundCapacity(int cap) {
168 int n = cap - 1;
169 n |= n >>> 1;
170 n |= n >>> 2;
171 n |= n >>> 4;
172 n |= n >>> 8;
173 n |= n >>> 16;
174 return (n <= 0) ? 1 : // at least 1
175 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
176 }
177
178 // default Executor setup; nearly the same as CompletableFuture
179
180 /**
181 * Default executor -- ForkJoinPool.commonPool() unless it cannot
182 * support parallelism.
183 */
184 private static final Executor ASYNC_POOL =
185 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
186 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
187
188 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
189 private static final class ThreadPerTaskExecutor implements Executor {
190 ThreadPerTaskExecutor() {} // prevent access constructor creation
191 public void execute(Runnable r) { new Thread(r).start(); }
192 }
193
194 /**
195 * Clients (BufferedSubscriptions) are maintained in a linked list
196 * (via their "next" fields). This works well for publish loops.
197 * It requires O(n) traversal to check for duplicate subscribers,
198 * but we expect that subscribing is much less common than
199 * publishing. Unsubscribing occurs only during traversal loops,
200 * when BufferedSubscription methods return negative values
201 * signifying that they have been closed. To reduce
202 * head-of-line blocking, submit and offer methods first call
203 * BufferedSubscription.offer on each subscriber, and place
204 * saturated ones in retries list (using nextRetry field), and
205 * retry, possibly blocking or dropping.
206 */
207 BufferedSubscription<T> clients;
208
209 /** Lock for exclusion across multiple sources */
210 final ReentrantLock lock;
211 /** Run status, updated only within locks */
212 volatile boolean closed;
213 /** Set true on first call to subscribe, to initialize possible owner */
214 boolean subscribed;
215 /** The first caller thread to subscribe, or null if thread ever changed */
216 Thread owner;
217 /** If non-null, the exception in closeExceptionally */
218 volatile Throwable closedException;
219
220 // Parameters for constructing BufferedSubscriptions
221 final Executor executor;
222 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
223 final int maxBufferCapacity;
224
225 /**
226 * Creates a new SubmissionPublisher using the given Executor for
227 * async delivery to subscribers, with the given maximum buffer size
228 * for each subscriber, and, if non-null, the given handler invoked
229 * when any Subscriber throws an exception in method {@link
230 * Flow.Subscriber#onNext(Object) onNext}.
231 *
232 * @param executor the executor to use for async delivery,
233 * supporting creation of at least one independent thread
234 * @param maxBufferCapacity the maximum capacity for each
235 * subscriber's buffer (the enforced capacity may be rounded up to
236 * the nearest power of two and/or bounded by the largest value
237 * supported by this implementation; method {@link #getMaxBufferCapacity}
238 * returns the actual value)
239 * @param handler if non-null, procedure to invoke upon exception
240 * thrown in method {@code onNext}
241 * @throws NullPointerException if executor is null
242 * @throws IllegalArgumentException if maxBufferCapacity not
243 * positive
244 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                           BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
    // Argument checks come first, in this order: executor, then capacity.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity < 1) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    // The stored capacity is the rounded value, not the raw argument.
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler;
    this.executor = executor;
    this.lock = new ReentrantLock();
}
256
257 /**
258 * Creates a new SubmissionPublisher using the given Executor for
259 * async delivery to subscribers, with the given maximum buffer size
260 * for each subscriber, and no handler for Subscriber exceptions in
261 * method {@link Flow.Subscriber#onNext(Object) onNext}.
262 *
263 * @param executor the executor to use for async delivery,
264 * supporting creation of at least one independent thread
265 * @param maxBufferCapacity the maximum capacity for each
266 * subscriber's buffer (the enforced capacity may be rounded up to
267 * the nearest power of two and/or bounded by the largest value
268 * supported by this implementation; method {@link #getMaxBufferCapacity}
269 * returns the actual value)
270 * @throws NullPointerException if executor is null
271 * @throws IllegalArgumentException if maxBufferCapacity not
272 * positive
273 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    // Delegate to the three-argument constructor with no onNext exception handler.
    this(executor,
         maxBufferCapacity,
         null);
}
277
278 /**
279 * Creates a new SubmissionPublisher using the {@link
280 * ForkJoinPool#commonPool()} for async delivery to subscribers
281 * (unless it does not support a parallelism level of at least two,
282 * in which case, a new Thread is created to run each task), with
283 * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
284 * handler for Subscriber exceptions in method {@link
285 * Flow.Subscriber#onNext(Object) onNext}.
286 */
public SubmissionPublisher() {
    // Default executor, default Flow buffer size, no onNext exception handler.
    this(ASYNC_POOL,
         Flow.defaultBufferSize(),
         null);
}
290
291 /**
292 * Adds the given Subscriber unless already subscribed. If already
293 * subscribed, the Subscriber's {@link
294 * Flow.Subscriber#onError(Throwable) onError} method is invoked on
295 * the existing subscription with an {@link IllegalStateException}.
296 * Otherwise, upon success, the Subscriber's {@link
297 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
298 * asynchronously with a new {@link Flow.Subscription}. If {@link
299 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
300 * subscription is cancelled. Otherwise, if this SubmissionPublisher
301 * was closed exceptionally, then the subscriber's {@link
302 * Flow.Subscriber#onError onError} method is invoked with the
303 * corresponding exception, or if closed without exception, the
304 * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
305 * method is invoked. Subscribers may enable receiving items by
306 * invoking the {@link Flow.Subscription#request(long) request}
307 * method of the new Subscription, and may unsubscribe by invoking
308 * its {@link Flow.Subscription#cancel() cancel} method.
309 *
310 * @param subscriber the subscriber
311 * @throws NullPointerException if subscriber is null
312 */
313 public void subscribe(Subscriber<? super T> subscriber) {
314 if (subscriber == null) throw new NullPointerException();
315 ReentrantLock lock = this.lock;
316 int max = maxBufferCapacity; // allocate initial array
317 Object[] array = new Object[max < INITIAL_CAPACITY ?
318 max : INITIAL_CAPACITY];
319 BufferedSubscription<T> subscription =
320 new BufferedSubscription<T>(subscriber, executor, onNextHandler,
321 array, max);
322 lock.lock();
323 try {
324 if (!subscribed) {
325 subscribed = true;
326 owner = Thread.currentThread();
327 }
328 for (BufferedSubscription<T> b = clients, pred = null;;) {
329 if (b == null) {
330 Throwable ex;
331 subscription.onSubscribe();
332 if ((ex = closedException) != null)
333 subscription.onError(ex);
334 else if (closed)
335 subscription.onComplete();
336 else if (pred == null)
337 clients = subscription;
338 else
339 pred.next = subscription;
340 break;
341 }
342 BufferedSubscription<T> next = b.next;
343 if (b.isClosed()) { // remove
344 b.next = null; // detach
345 if (pred == null)
346 clients = next;
347 else
348 pred.next = next;
349 }
350 else if (subscriber.equals(b.subscriber)) {
351 b.onError(new IllegalStateException("Duplicate subscribe"));
352 break;
353 }
354 else
355 pred = b;
356 b = next;
357 }
358 } finally {
359 lock.unlock();
360 }
361 }
362
363 /**
364 * Common implementation for all three forms of submit and offer.
365 * Acts as submit if nanos == Long.MAX_VALUE, else offer.
366 */
367 private int doOffer(T item, long nanos,
368 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
369 if (item == null) throw new NullPointerException();
370 int lag = 0;
371 boolean complete, unowned;
372 ReentrantLock lock = this.lock;
373 lock.lock();
374 try {
375 Thread t = Thread.currentThread(), o;
376 BufferedSubscription<T> b = clients;
377 if ((unowned = ((o = owner) != t)) && o != null)
378 owner = null; // disable bias
379 if (b == null)
380 complete = closed;
381 else {
382 complete = false;
383 boolean cleanMe = false;
384 BufferedSubscription<T> retries = null, rtail = null, next;
385 do {
386 next = b.next;
387 int stat = b.offer(item, unowned);
388 if (stat == 0) { // saturated; add to retry list
389 b.nextRetry = null; // avoid garbage on exceptions
390 if (rtail == null)
391 retries = b;
392 else
393 rtail.nextRetry = b;
394 rtail = b;
395 }
396 else if (stat < 0) // closed
397 cleanMe = true; // remove later
398 else if (stat > lag)
399 lag = stat;
400 } while ((b = next) != null);
401
402 if (retries != null || cleanMe)
403 lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
404 }
405 } finally {
406 lock.unlock();
407 }
408 if (complete)
409 throw new IllegalStateException("Closed");
410 else
411 return lag;
412 }
413
414 /**
415 * Helps, (timed) waits for, and/or drops buffers on list; returns
416 * lag or negative drops (for use in offer).
417 */
418 private int retryOffer(T item, long nanos,
419 BiPredicate<Subscriber<? super T>, ? super T> onDrop,
420 BufferedSubscription<T> retries, int lag,
421 boolean cleanMe) {
422 for (BufferedSubscription<T> r = retries; r != null;) {
423 BufferedSubscription<T> nextRetry = r.nextRetry;
424 r.nextRetry = null;
425 if (nanos > 0L)
426 r.awaitSpace(nanos);
427 int stat = r.retryOffer(item);
428 if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
429 stat = r.retryOffer(item);
430 if (stat == 0)
431 lag = (lag >= 0) ? -1 : lag - 1;
432 else if (stat < 0)
433 cleanMe = true;
434 else if (lag >= 0 && stat > lag)
435 lag = stat;
436 r = nextRetry;
437 }
438 if (cleanMe)
439 cleanAndCount();
440 return lag;
441 }
442
443 /**
444 * Returns current list count after removing closed subscribers.
445 * Call only while holding lock. Used mainly by retryOffer for
446 * cleanup.
447 */
448 private int cleanAndCount() {
449 int count = 0;
450 BufferedSubscription<T> pred = null, next;
451 for (BufferedSubscription<T> b = clients; b != null; b = next) {
452 next = b.next;
453 if (b.isClosed()) {
454 b.next = null;
455 if (pred == null)
456 clients = next;
457 else
458 pred.next = next;
459 }
460 else {
461 pred = b;
462 ++count;
463 }
464 }
465 return count;
466 }
467
468 /**
469 * Publishes the given item to each current subscriber by
470 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
471 * onNext} method, blocking uninterruptibly while resources for any
472 * subscriber are unavailable. This method returns an estimate of
473 * the maximum lag (number of items submitted but not yet consumed)
474 * among all current subscribers. This value is at least one
475 * (accounting for this submitted item) if there are any
476 * subscribers, else zero.
477 *
478 * <p>If the Executor for this publisher throws a
479 * RejectedExecutionException (or any other RuntimeException or
480 * Error) when attempting to asynchronously notify subscribers,
481 * then this exception is rethrown, in which case not all
482 * subscribers will have been issued this item.
483 *
484 * @param item the (non-null) item to publish
485 * @return the estimated maximum lag among subscribers
486 * @throws IllegalStateException if closed
487 * @throws NullPointerException if item is null
488 * @throws RejectedExecutionException if thrown by Executor
489 */
490 public int submit(T item) {
491 return doOffer(item, Long.MAX_VALUE, null);
492 }
493
494 /**
495 * Publishes the given item, if possible, to each current subscriber
496 * by asynchronously invoking its {@link
497 * Flow.Subscriber#onNext(Object) onNext} method. The item may be
498 * dropped by one or more subscribers if resource limits are
499 * exceeded, in which case the given handler (if non-null) is
500 * invoked, and if it returns true, retried once. Other calls to
501 * methods in this class by other threads are blocked while the
502 * handler is invoked. Unless recovery is assured, options are
503 * usually limited to logging the error and/or issuing an {@link
504 * Flow.Subscriber#onError(Throwable) onError} signal to the
505 * subscriber.
506 *
507 * <p>This method returns a status indicator: If negative, it
508 * represents the (negative) number of drops (failed attempts to
509 * issue the item to a subscriber). Otherwise it is an estimate of
510 * the maximum lag (number of items submitted but not yet
511 * consumed) among all current subscribers. This value is at least
512 * one (accounting for this submitted item) if there are any
513 * subscribers, else zero.
514 *
515 * <p>If the Executor for this publisher throws a
516 * RejectedExecutionException (or any other RuntimeException or
517 * Error) when attempting to asynchronously notify subscribers, or
518 * the drop handler throws an exception when processing a dropped
519 * item, then this exception is rethrown.
520 *
521 * @param item the (non-null) item to publish
522 * @param onDrop if non-null, the handler invoked upon a drop to a
523 * subscriber, with arguments of the subscriber and item; if it
524 * returns true, an offer is re-attempted (once)
525 * @return if negative, the (negative) number of drops; otherwise
526 * an estimate of maximum lag
527 * @throws IllegalStateException if closed
528 * @throws NullPointerException if item is null
529 * @throws RejectedExecutionException if thrown by Executor
530 */
531 public int offer(T item,
532 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
533 return doOffer(item, 0L, onDrop);
534 }
535
536 /**
537 * Publishes the given item, if possible, to each current subscriber
538 * by asynchronously invoking its {@link
539 * Flow.Subscriber#onNext(Object) onNext} method, blocking while
540 * resources for any subscription are unavailable, up to the
541 * specified timeout or until the caller thread is interrupted, at
542 * which point the given handler (if non-null) is invoked, and if it
543 * returns true, retried once. (The drop handler may distinguish
544 * timeouts from interrupts by checking whether the current thread
545 * is interrupted.) Other calls to methods in this class by other
546 * threads are blocked while the handler is invoked. Unless
547 * recovery is assured, options are usually limited to logging the
548 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
549 * onError} signal to the subscriber.
550 *
551 * <p>This method returns a status indicator: If negative, it
552 * represents the (negative) number of drops (failed attempts to
553 * issue the item to a subscriber). Otherwise it is an estimate of
554 * the maximum lag (number of items submitted but not yet
555 * consumed) among all current subscribers. This value is at least
556 * one (accounting for this submitted item) if there are any
557 * subscribers, else zero.
558 *
559 * <p>If the Executor for this publisher throws a
560 * RejectedExecutionException (or any other RuntimeException or
561 * Error) when attempting to asynchronously notify subscribers, or
562 * the drop handler throws an exception when processing a dropped
563 * item, then this exception is rethrown.
564 *
565 * @param item the (non-null) item to publish
566 * @param timeout how long to wait for resources for any subscriber
567 * before giving up, in units of {@code unit}
568 * @param unit a {@code TimeUnit} determining how to interpret the
569 * {@code timeout} parameter
570 * @param onDrop if non-null, the handler invoked upon a drop to a
571 * subscriber, with arguments of the subscriber and item; if it
572 * returns true, an offer is re-attempted (once)
573 * @return if negative, the (negative) number of drops; otherwise
574 * an estimate of maximum lag
575 * @throws IllegalStateException if closed
576 * @throws NullPointerException if item is null
577 * @throws RejectedExecutionException if thrown by Executor
578 */
579 public int offer(T item, long timeout, TimeUnit unit,
580 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
581 long nanos = unit.toNanos(timeout);
582 // distinguishes from untimed (only wrt interrupt policy)
583 if (nanos == Long.MAX_VALUE) --nanos;
584 return doOffer(item, nanos, onDrop);
585 }
586
587 /**
588 * Unless already closed, issues {@link
589 * Flow.Subscriber#onComplete() onComplete} signals to current
590 * subscribers, and disallows subsequent attempts to publish.
591 * Upon return, this method does <em>NOT</em> guarantee that all
592 * subscribers have yet completed.
593 */
594 public void close() {
595 ReentrantLock lock = this.lock;
596 if (!closed) {
597 BufferedSubscription<T> b;
598 lock.lock();
599 try {
600 // no need to re-check closed here
601 b = clients;
602 clients = null;
603 owner = null;
604 closed = true;
605 } finally {
606 lock.unlock();
607 }
608 while (b != null) {
609 BufferedSubscription<T> next = b.next;
610 b.next = null;
611 b.onComplete();
612 b = next;
613 }
614 }
615 }
616
617 /**
618 * Unless already closed, issues {@link
619 * Flow.Subscriber#onError(Throwable) onError} signals to current
620 * subscribers with the given error, and disallows subsequent
621 * attempts to publish. Future subscribers also receive the given
622 * error. Upon return, this method does <em>NOT</em> guarantee
623 * that all subscribers have yet completed.
624 *
625 * @param error the {@code onError} argument sent to subscribers
626 * @throws NullPointerException if error is null
627 */
628 public void closeExceptionally(Throwable error) {
629 if (error == null)
630 throw new NullPointerException();
631 ReentrantLock lock = this.lock;
632 if (!closed) {
633 BufferedSubscription<T> b;
634 lock.lock();
635 try {
636 b = clients;
637 if (!closed) { // don't clobber racing close
638 closedException = error;
639 clients = null;
640 owner = null;
641 closed = true;
642 }
643 } finally {
644 lock.unlock();
645 }
646 while (b != null) {
647 BufferedSubscription<T> next = b.next;
648 b.next = null;
649 b.onError(error);
650 b = next;
651 }
652 }
653 }
654
655 /**
656 * Returns true if this publisher is not accepting submissions.
657 *
658 * @return true if closed
659 */
660 public boolean isClosed() {
661 return closed;
662 }
663
664 /**
665 * Returns the exception associated with {@link
666 * #closeExceptionally(Throwable) closeExceptionally}, or null if
667 * not closed or if closed normally.
668 *
669 * @return the exception, or null if none
670 */
671 public Throwable getClosedException() {
672 return closedException;
673 }
674
675 /**
676 * Returns true if this publisher has any subscribers.
677 *
678 * @return true if this publisher has any subscribers
679 */
680 public boolean hasSubscribers() {
681 boolean nonEmpty = false;
682 ReentrantLock lock = this.lock;
683 lock.lock();
684 try {
685 for (BufferedSubscription<T> b = clients; b != null;) {
686 BufferedSubscription<T> next = b.next;
687 if (b.isClosed()) {
688 b.next = null;
689 b = clients = next;
690 }
691 else {
692 nonEmpty = true;
693 break;
694 }
695 }
696 } finally {
697 lock.unlock();
698 }
699 return nonEmpty;
700 }
701
702 /**
703 * Returns the number of current subscribers.
704 *
705 * @return the number of current subscribers
706 */
707 public int getNumberOfSubscribers() {
708 int n;
709 ReentrantLock lock = this.lock;
710 lock.lock();
711 try {
712 n = cleanAndCount();
713 } finally {
714 lock.unlock();
715 }
716 return n;
717 }
718
719 /**
720 * Returns the Executor used for asynchronous delivery.
721 *
722 * @return the Executor used for asynchronous delivery
723 */
724 public Executor getExecutor() {
725 return executor;
726 }
727
728 /**
729 * Returns the maximum per-subscriber buffer capacity.
730 *
731 * @return the maximum per-subscriber buffer capacity
732 */
733 public int getMaxBufferCapacity() {
734 return maxBufferCapacity;
735 }
736
737 /**
738 * Returns a list of current subscribers for monitoring and
739 * tracking purposes, not for invoking {@link Flow.Subscriber}
740 * methods on the subscribers.
741 *
742 * @return list of current subscribers
743 */
744 public List<Subscriber<? super T>> getSubscribers() {
745 ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
746 ReentrantLock lock = this.lock;
747 lock.lock();
748 try {
749 BufferedSubscription<T> pred = null, next;
750 for (BufferedSubscription<T> b = clients; b != null; b = next) {
751 next = b.next;
752 if (b.isClosed()) {
753 b.next = null;
754 if (pred == null)
755 clients = next;
756 else
757 pred.next = next;
758 }
759 else {
760 subs.add(b.subscriber);
761 pred = b;
762 }
763 }
764 } finally {
765 lock.unlock();
766 }
767 return subs;
768 }
769
770 /**
771 * Returns true if the given Subscriber is currently subscribed.
772 *
773 * @param subscriber the subscriber
774 * @return true if currently subscribed
775 * @throws NullPointerException if subscriber is null
776 */
777 public boolean isSubscribed(Subscriber<? super T> subscriber) {
778 if (subscriber == null) throw new NullPointerException();
779 boolean subscribed = false;
780 ReentrantLock lock = this.lock;
781 if (!closed) {
782 lock.lock();
783 try {
784 BufferedSubscription<T> pred = null, next;
785 for (BufferedSubscription<T> b = clients; b != null; b = next) {
786 next = b.next;
787 if (b.isClosed()) {
788 b.next = null;
789 if (pred == null)
790 clients = next;
791 else
792 pred.next = next;
793 }
794 else if (subscribed = subscriber.equals(b.subscriber))
795 break;
796 else
797 pred = b;
798 }
799 } finally {
800 lock.unlock();
801 }
802 }
803 return subscribed;
804 }
805
806 /**
807 * Returns an estimate of the minimum number of items requested
808 * (via {@link Flow.Subscription#request(long) request}) but not
809 * yet produced, among all current subscribers.
810 *
811 * @return the estimate, or zero if no subscribers
812 */
813 public long estimateMinimumDemand() {
814 long min = Long.MAX_VALUE;
815 boolean nonEmpty = false;
816 ReentrantLock lock = this.lock;
817 lock.lock();
818 try {
819 BufferedSubscription<T> pred = null, next;
820 for (BufferedSubscription<T> b = clients; b != null; b = next) {
821 int n; long d;
822 next = b.next;
823 if ((n = b.estimateLag()) < 0) {
824 b.next = null;
825 if (pred == null)
826 clients = next;
827 else
828 pred.next = next;
829 }
830 else {
831 if ((d = b.demand - n) < min)
832 min = d;
833 nonEmpty = true;
834 pred = b;
835 }
836 }
837 } finally {
838 lock.unlock();
839 }
840 return nonEmpty ? min : 0;
841 }
842
843 /**
844 * Returns an estimate of the maximum number of items produced but
845 * not yet consumed among all current subscribers.
846 *
847 * @return the estimate
848 */
849 public int estimateMaximumLag() {
850 int max = 0;
851 ReentrantLock lock = this.lock;
852 lock.lock();
853 try {
854 BufferedSubscription<T> pred = null, next;
855 for (BufferedSubscription<T> b = clients; b != null; b = next) {
856 int n;
857 next = b.next;
858 if ((n = b.estimateLag()) < 0) {
859 b.next = null;
860 if (pred == null)
861 clients = next;
862 else
863 pred.next = next;
864 }
865 else {
866 if (n > max)
867 max = n;
868 pred = b;
869 }
870 }
871 } finally {
872 lock.unlock();
873 }
874 return max;
875 }
876
877 /**
878 * Processes all published items using the given Consumer function.
879 * Returns a CompletableFuture that is completed normally when this
880 * publisher signals {@link Flow.Subscriber#onComplete()
881 * onComplete}, or completed exceptionally upon any error, or an
882 * exception is thrown by the Consumer, or the returned
883 * CompletableFuture is cancelled, in which case no further items
884 * are processed.
885 *
886 * @param consumer the function applied to each onNext item
887 * @return a CompletableFuture that is completed normally
888 * when the publisher signals onComplete, and exceptionally
889 * upon any error or cancellation
890 * @throws NullPointerException if consumer is null
891 */
892 public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
893 if (consumer == null)
894 throw new NullPointerException();
895 CompletableFuture<Void> status = new CompletableFuture<>();
896 subscribe(new ConsumerSubscriber<T>(status, consumer));
897 return status;
898 }
899
900 /** Subscriber for method consume */
901 static final class ConsumerSubscriber<T> implements Subscriber<T> {
902 final CompletableFuture<Void> status;
903 final Consumer<? super T> consumer;
904 Subscription subscription;
905 ConsumerSubscriber(CompletableFuture<Void> status,
906 Consumer<? super T> consumer) {
907 this.status = status; this.consumer = consumer;
908 }
909 public final void onSubscribe(Subscription subscription) {
910 this.subscription = subscription;
911 status.whenComplete((v, e) -> subscription.cancel());
912 if (!status.isDone())
913 subscription.request(Long.MAX_VALUE);
914 }
915 public final void onError(Throwable ex) {
916 status.completeExceptionally(ex);
917 }
918 public final void onComplete() {
919 status.complete(null);
920 }
921 public final void onNext(T item) {
922 try {
923 consumer.accept(item);
924 } catch (Throwable ex) {
925 subscription.cancel();
926 status.completeExceptionally(ex);
927 }
928 }
929 }
930
931 /**
932 * A task for consuming buffer items and signals, created and
933 * executed whenever they become available. A task consumes as
934 * many items/signals as possible before terminating, at which
935 * point another task is created when needed. The dual Runnable
936 * and ForkJoinTask declaration saves overhead when executed by
937 * ForkJoinPools, without impacting other kinds of Executors.
938 */
939 @SuppressWarnings("serial")
940 static final class ConsumerTask<T> extends ForkJoinTask<Void>
941 implements Runnable, CompletableFuture.AsynchronousCompletionTask {
942 final BufferedSubscription<T> consumer;
943 ConsumerTask(BufferedSubscription<T> consumer) {
944 this.consumer = consumer;
945 }
946 public final Void getRawResult() { return null; }
947 public final void setRawResult(Void v) {}
948 public final boolean exec() { consumer.consume(); return false; }
949 public final void run() { consumer.consume(); }
950 }
951
952 /**
953 * A resizable array-based ring buffer with integrated control to
954 * start a consumer task whenever items are available. The buffer
955 * algorithm is specialized for the case of at most one concurrent
956 * producer and consumer, and power of two buffer sizes. It relies
957 * primarily on atomic operations (CAS or getAndSet) at the next
958 * array slot to put or take an element, at the "tail" and "head"
959 * indices written only by the producer and consumer respectively.
960 *
961 * We ensure internally that there is at most one active consumer
962 * task at any given time. The publisher guarantees a single
963 * producer via its lock. Sync among producers and consumers
964 * relies on volatile fields "ctl", "demand", and "waiting" (along
965 * with element access). Other variables are accessed in plain
966 * mode, relying on outer ordering and exclusion, and/or enclosing
967 * them within other volatile accesses. Some atomic operations are
968 * avoided by tracking single threaded ownership by producers (in
969 * the style of biased locking).
970 *
971 * Execution control and protocol state are managed using field
972 * "ctl". Methods to subscribe, close, request, and cancel set
973 * ctl bits (mostly using atomic boolean method getAndBitwiseOr),
974 * and ensure that a task is running. (The corresponding consumer
975 * side actions are in method consume.) To avoid starting a new
976 * task on each action, ctl also includes a keep-alive bit
977 * (ACTIVE) that is refreshed if needed on producer actions.
978 * (Maintaining agreement about keep-alives requires most atomic
979 * updates to be full SC/Volatile strength, which is still much
980 * cheaper than using one task per item.) Error signals
981 * additionally null out items and/or fields to reduce termination
982 * latency. The cancel() method is supported by treating as ERROR
983 * but suppressing onError signal.
984 *
985 * Support for blocking also exploits the fact that there is only
986 * one possible waiter. ManagedBlocker-compatible control fields
987 * are placed in this class itself rather than in wait-nodes.
988 * Blocking control relies on the "waiting" and "waiter"
989 * fields. Producers set them before trying to block. Signalling
990 * unparks and clears fields. If the producer and/or consumer are
991 * using a ForkJoinPool, the producer attempts to help run
992 * consumer tasks via ForkJoinPool.helpAsyncBlocker before
993 * blocking.
994 *
995 * Usages of this class may encounter any of several forms of
996 * memory contention. We try to ameliorate across them without
997 * unduly impacting footprints in low-contention usages where it
998 * isn't needed. Buffer arrays start out small and grow only as
999 * needed. The class uses @Contended and heuristic field
1000 * declaration ordering to reduce false-sharing memory contention
1001 * across instances of BufferedSubscription (as in, multiple
1002 * subscribers per publisher). We additionally segregate some
1003 * fields that would otherwise nearly always encounter cache line
1004 * contention among producers and consumers. To reduce contention
1005 * across time (vs space), consumers only periodically update
1006 * other fields (see method takeItems), at the expense of possibly
1007 * staler reporting of lags and demand (bounded at 12.5% == 1/8
1008 * capacity) and possibly more atomic operations.
1009 *
1010 * Other forms of imbalance and slowdowns can occur during startup
1011 * when producer and consumer methods are compiled and/or memory
1012 * is allocated at different rates. This is ameliorated by
1013 * artificially subdividing some consumer methods, including
1014 * isolation of all subscriber callbacks. This code also includes
1015 * typical power-of-two array screening idioms to avoid compilers
1016 * generating traps, along with the usual SSA-based inline
1017 * assignment coding style. Also, all methods and fields have
1018 * default visibility to simplify usage by callers.
1019 */
@SuppressWarnings("serial")
@jdk.internal.vm.annotation.Contended
static final class BufferedSubscription<T>
    implements Subscription, ForkJoinPool.ManagedBlocker {
    long timeout;                  // Long.MAX_VALUE if untimed wait
    int head;                      // next position to take
    int tail;                      // next position to put
    final int maxCapacity;         // max buffer size
    volatile int ctl;              // atomic run state flags
    Object[] array;                // buffer
    final Subscriber<? super T> subscriber;
    final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
    Executor executor;             // null on error
    Thread waiter;                 // blocked producer thread
    Throwable pendingError;        // holds until onError issued
    BufferedSubscription<T> next;  // used only by publisher
    BufferedSubscription<T> nextRetry; // used only by publisher

    @jdk.internal.vm.annotation.Contended("c") // segregate
    volatile long demand;          // # unfilled requests
    @jdk.internal.vm.annotation.Contended("c")
    volatile int waiting;          // nonzero if producer blocked

    // ctl bit values
    static final int CLOSED   = 0x01;  // if set, other bits ignored
    static final int ACTIVE   = 0x02;  // keep-alive for consumer task
    static final int REQS     = 0x04;  // (possibly) nonzero demand
    static final int ERROR    = 0x08;  // issues onError when noticed
    static final int COMPLETE = 0x10;  // issues onComplete when done
    static final int RUN      = 0x20;  // task is or will be running
    static final int OPEN     = 0x40;  // true after subscribe

    static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel

    /**
     * Creates a subscription; simply records the arguments.
     *
     * @param subscriber the subscriber receiving signals
     * @param executor the executor used to run consumer tasks
     * @param onNextHandler if non-null, invoked when the
     * subscriber's onNext throws
     * @param array the initial buffer
     * @param maxBufferCapacity the capacity beyond which the
     * buffer is not grown
     */
    BufferedSubscription(Subscriber<? super T> subscriber,
                         Executor executor,
                         BiConsumer<? super Subscriber<? super T>,
                         ? super Throwable> onNextHandler,
                         Object[] array,
                         int maxBufferCapacity) {
        this.subscriber = subscriber;
        this.executor = executor;
        this.onNextHandler = onNextHandler;
        this.array = array;
        this.maxCapacity = maxBufferCapacity;
    }

    // Wrappers for some VarHandle methods

    /** Weak CAS of ctl from cmp to val; may fail spuriously. */
    final boolean weakCasCtl(int cmp, int val) {
        return CTL.weakCompareAndSet(this, cmp, val);
    }

    /** Atomically ORs bits into ctl, returning the previous value. */
    final int getAndBitwiseOrCtl(int bits) {
        return (int)CTL.getAndBitwiseOr(this, bits);
    }

    /** Atomically subtracts k from demand, returning the updated value. */
    final long subtractDemand(int k) {
        long n = (long)(-k);
        return n + (long)DEMAND.getAndAdd(this, n);
    }

    /** CAS of demand from cmp to val. */
    final boolean casDemand(long cmp, long val) {
        return DEMAND.compareAndSet(this, cmp, val);
    }

    // Utilities used by SubmissionPublisher

    /**
     * Returns true if closed (consumer task may still be running).
     */
    final boolean isClosed() {
        return (ctl & CLOSED) != 0;
    }

    /**
     * Returns estimated number of buffered items, or negative if
     * closed.
     */
    final int estimateLag() {
        int c = ctl, n = tail - head;
        // -1 when CLOSED; otherwise tail - head, clamped at zero
        return ((c & CLOSED) != 0) ? -1 : (n < 0) ? 0 : n;
    }

    // Methods for submitting items

    /**
     * Tries to add item and start consumer task if necessary.
     *
     * @param item the item to add
     * @param unowned if true, the slot is written with a volatile
     * CAS rather than a release-mode store (presumably set by the
     * publisher when producer ownership is not established;
     * confirm against caller)
     * @return negative if closed, 0 if saturated, else estimated lag
     */
    final int offer(T item, boolean unowned) {
        Object[] a;
        int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
        // i: slot index for tail; n: lag counting the item being added
        int t = tail, i = t & (cap - 1), n = t + 1 - head;
        if (cap > 0) {
            boolean added;
            if (n >= cap && cap < maxCapacity) // resize
                added = growAndOffer(item, a, t);
            else if (n >= cap || unowned)      // need volatile CAS
                added = QA.compareAndSet(a, i, null, item);
            else {                             // can use release mode
                QA.setRelease(a, i, item);
                added = true;
            }
            if (added) {
                tail = t + 1;
                stat = n;
            }
        }
        return startOnOffer(stat);
    }

    /**
     * Tries to expand buffer and add item, returning true on
     * success. Currently fails only if out of memory.
     *
     * @param item the item to add
     * @param a the current buffer
     * @param t the current tail index
     */
    final boolean growAndOffer(T item, Object[] a, int t) {
        int cap = 0, newCap = 0;
        Object[] newArray = null;
        // newCap > 0 check guards against overflow when doubling
        if (a != null && (cap = a.length) > 0 && (newCap = cap << 1) > 0) {
            try {
                newArray = new Object[newCap];
            } catch (OutOfMemoryError ex) {
                // newArray stays null; reported as failure below
            }
        }
        if (newArray == null)
            return false;
        else {                                // take and move items
            int newMask = newCap - 1;
            newArray[t-- & newMask] = item;   // new item goes at tail
            // walk backwards from tail - 1, atomically taking each
            // old slot; at most cap slots are visited
            for (int mask = cap - 1, k = mask; k >= 0; --k) {
                Object x = QA.getAndSet(a, t & mask, null);
                if (x == null)
                    break;                    // already consumed
                else
                    newArray[t-- & newMask] = x;
            }
            array = newArray;
            VarHandle.releaseFence();         // release array and slots
            return true;
        }
    }

    /**
     * Version of offer for retries (no resize or bias)
     *
     * @return negative if closed, 0 if the tail slot was still
     * occupied, else estimated lag
     */
    final int retryOffer(T item) {
        Object[] a;
        int stat = 0, t = tail, h = head, cap;
        if ((a = array) != null && (cap = a.length) > 0 &&
            QA.compareAndSet(a, (cap - 1) & t, null, item))
            stat = (tail = t + 1) - h;
        return startOnOffer(stat);
    }

    /**
     * Tries to start consumer task after offer.
     * @return negative if now closed, else argument
     */
    final int startOnOffer(int stat) {
        int c; // start or keep alive if requests exist and not active
        if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
            ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
            tryStart();
        else if ((c & CLOSED) != 0)
            stat = -1;
        return stat;
    }

    /**
     * Tries to start consumer task. Sets error state on failure.
     * Any RuntimeException or Error thrown while creating or
     * submitting the task is rethrown after ERROR and CLOSED are
     * set in ctl.
     */
    final void tryStart() {
        try {
            Executor e;
            ConsumerTask<T> task = new ConsumerTask<T>(this);
            if ((e = executor) != null)   // skip if disabled on error
                e.execute(task);
        } catch (RuntimeException | Error ex) {
            getAndBitwiseOrCtl(ERROR | CLOSED);
            throw ex;
        }
    }

    // Signals to consumer tasks

    /**
     * Sets the given control bits, starting task if not running or closed.
     * @param bits state bits, assumed to include RUN but not CLOSED
     */
    final void startOnSignal(int bits) {
        // the plain read of ctl skips the atomic update when all
        // requested bits are already set
        if ((ctl & bits) != bits &&
            (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
            tryStart();
    }

    /** Ensures a consumer task is running after subscription. */
    final void onSubscribe() {
        startOnSignal(RUN | ACTIVE);
    }

    /** Sets COMPLETE and ensures a consumer task is running. */
    final void onComplete() {
        startOnSignal(RUN | ACTIVE | COMPLETE);
    }

    /**
     * Sets ERROR and ensures a consumer task is running, or, if one
     * already was, clears the buffer.
     *
     * @param ex the error to report via onError, or null to close
     * without reporting (as done by cancel)
     */
    final void onError(Throwable ex) {
        int c; Object[] a;      // to null out buffer on async error
        if (ex != null)
            pendingError = ex;  // races are OK
        if (((c = getAndBitwiseOrCtl(ERROR | RUN | ACTIVE)) & CLOSED) == 0) {
            if ((c & RUN) == 0)
                tryStart();
            else if ((a = array) != null)
                Arrays.fill(a, null);
        }
    }

    /** Cancels by signalling an error with no exception to report. */
    public final void cancel() {
        onError(null);
    }

    /**
     * Adds n to demand (saturating at Long.MAX_VALUE) and ensures a
     * consumer task is running. A non-positive n is instead treated
     * as an error, reported with an IllegalArgumentException.
     */
    public final void request(long n) {
        if (n > 0L) {
            for (;;) {
                long p = demand, d = p + n;  // saturate
                // d < p means the addition overflowed
                if (casDemand(p, d < p ? Long.MAX_VALUE : d))
                    break;
            }
            startOnSignal(RUN | ACTIVE | REQS);
        }
        else
            onError(new IllegalArgumentException(
                        "non-positive subscription request"));
    }

    // Consumer task actions

    /**
     * Consumer loop, called from ConsumerTask, or indirectly when
     * helping during submit.
     */
    final void consume() {
        Subscriber<? super T> s;
        if ((s = subscriber) != null) {          // hoist checks
            subscribeOnOpen(s);
            long d = demand;
            for (int h = head, t = tail;;) {
                int c, taken; boolean empty;
                if (((c = ctl) & ERROR) != 0) {
                    closeOnError(s, null);
                    break;
                }
                else if ((taken = takeItems(s, d, h)) > 0) {
                    // publish progress: advance head, reduce demand
                    head = h += taken;
                    d = subtractDemand(taken);
                }
                else if ((d = demand) == 0L && (c & REQS) != 0)
                    weakCasCtl(c, c & ~REQS);    // exhausted demand
                else if (d != 0L && (c & REQS) == 0)
                    weakCasCtl(c, c | REQS);     // new demand
                else if (t == (t = tail)) {      // stability check
                    if ((empty = (t == h)) && (c & COMPLETE) != 0) {
                        closeOnComplete(s);      // end of stream
                        break;
                    }
                    else if (empty || d == 0L) {
                        // first pass clears ACTIVE; a later pass that
                        // finds ACTIVE still clear drops RUN and exits
                        int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
                        if (weakCasCtl(c, c & ~bit) && bit == RUN)
                            break;               // un-keep-alive or exit
                    }
                }
            }
        }
    }

    /**
     * Consumes some items until unavailable or bound or error.
     *
     * @param s subscriber
     * @param d current demand
     * @param h current head
     * @return number taken
     */
    final int takeItems(Subscriber<? super T> s, long d, int h) {
        Object[] a;
        int k = 0, cap;
        if ((a = array) != null && (cap = a.length) > 0) {
            int m = cap - 1, b = (m >>> 3) + 1; // max(1, cap/8)
            int n = (d < (long)b) ? (int)d : b; // bound by demand
            for (; k < n; ++h, ++k) {
                Object x = QA.getAndSet(a, h & m, null);
                if (waiting != 0)               // wake blocked producer
                    signalWaiter();
                if (x == null)
                    break;
                else if (!consumeNext(s, x))
                    break;
            }
        }
        return k;
    }

    /**
     * Issues subscriber.onNext for the given item.
     *
     * @return true if onNext returned normally (or s is null),
     * false if it threw, in which case handleOnNext has been called
     */
    final boolean consumeNext(Subscriber<? super T> s, Object x) {
        try {
            @SuppressWarnings("unchecked") T y = (T) x;
            if (s != null)
                s.onNext(y);
            return true;
        } catch (Throwable ex) {
            handleOnNext(s, ex);
            return false;
        }
    }

    /**
     * Processes exception in Subscriber.onNext: invokes the
     * onNextHandler, if any (ignoring anything it throws), then
     * closes with the exception.
     */
    final void handleOnNext(Subscriber<? super T> s, Throwable ex) {
        BiConsumer<? super Subscriber<? super T>, ? super Throwable> h;
        try {
            if ((h = onNextHandler) != null)
                h.accept(s, ex);
        } catch (Throwable ignore) {
        }
        closeOnError(s, ex);
    }

    /**
     * Issues subscriber.onSubscribe if this is first signal.
     */
    final void subscribeOnOpen(Subscriber<? super T> s) {
        // only the caller that actually sets OPEN issues the signal
        if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0)
            consumeSubscribe(s);
    }

    /**
     * Invokes subscriber.onSubscribe, closing with the exception
     * if it throws.
     */
    final void consumeSubscribe(Subscriber<? super T> s) {
        try {
            if (s != null) // ignore if disabled
                s.onSubscribe(this);
        } catch (Throwable ex) {
            closeOnError(s, ex);
        }
    }

    /**
     * Issues subscriber.onComplete unless already closed.
     */
    final void closeOnComplete(Subscriber<? super T> s) {
        if ((getAndBitwiseOrCtl(CLOSED) & CLOSED) == 0)
            consumeComplete(s);
    }

    /** Invokes subscriber.onComplete, ignoring anything it throws. */
    final void consumeComplete(Subscriber<? super T> s) {
        try {
            if (s != null)
                s.onComplete();
        } catch (Throwable ignore) {
        }
    }

    /**
     * Issues subscriber.onError, and unblocks producer if needed.
     * Does nothing if already closed.
     *
     * @param s subscriber
     * @param ex the exception, or null to use pendingError
     */
    final void closeOnError(Subscriber<? super T> s, Throwable ex) {
        if ((getAndBitwiseOrCtl(ERROR | CLOSED) & CLOSED) == 0) {
            if (ex == null)
                ex = pendingError;
            pendingError = null;                 // detach
            executor = null;                     // suppress racing start calls
            signalWaiter();
            consumeError(s, ex);
        }
    }

    /**
     * Invokes subscriber.onError, ignoring anything it throws.
     * Nothing is issued when ex is null, which is how a plain
     * cancel closes without an onError signal.
     */
    final void consumeError(Subscriber<? super T> s, Throwable ex) {
        try {
            if (ex != null && s != null)
                s.onError(ex);
        } catch (Throwable ignore) {
        }
    }

    // Blocking support

    /**
     * Unblocks waiting producer.
     */
    final void signalWaiter() {
        Thread w;
        waiting = 0;
        if ((w = waiter) != null)
            LockSupport.unpark(w);
    }

    /**
     * Returns true if closed or space available.
     * For ManagedBlocker.
     */
    public final boolean isReleasable() {
        Object[] a; int cap;
        // space is available when the slot at tail is empty
        return ((ctl & CLOSED) != 0 ||
                ((a = array) != null && (cap = a.length) > 0 &&
                 QA.getAcquire(a, (cap - 1) & tail) == null));
    }

    /**
     * Helps or blocks until timeout, closed, or space available.
     *
     * @param nanos the maximum wait in nanoseconds;
     * Long.MAX_VALUE for an untimed wait
     */
    final void awaitSpace(long nanos) {
        if (!isReleasable()) {
            // try helping before blocking
            ForkJoinPool.helpAsyncBlocker(executor, this);
            if (!isReleasable()) {
                timeout = nanos;   // read by block()
                try {
                    ForkJoinPool.managedBlock(this);
                } catch (InterruptedException ie) {
                    timeout = INTERRUPTED;
                }
                // block() also records interrupts via the sentinel;
                // either way, restore the thread's interrupt status
                if (timeout == INTERRUPTED)
                    Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks until closed, space available or timeout.
     * For ManagedBlocker.
     *
     * @return true always
     */
    public final boolean block() {
        long nanos = timeout;
        boolean timed = (nanos < Long.MAX_VALUE);
        long deadline = timed ? System.nanoTime() + nanos : 0L;
        while (!isReleasable()) {
            if (Thread.interrupted()) {
                // record the interrupt; only timed waits give up on it
                timeout = INTERRUPTED;
                if (timed)
                    break;
            }
            else if (timed && (nanos = deadline - System.nanoTime()) <= 0L)
                break;
            // one step per iteration, rechecking isReleasable between
            // steps: publish waiter, then set waiting, then park
            else if (waiter == null)
                waiter = Thread.currentThread();
            else if (waiting == 0)
                waiting = 1;
            else if (timed)
                LockSupport.parkNanos(this, nanos);
            else
                LockSupport.park(this);
        }
        waiter = null;
        waiting = 0;
        return true;
    }

    // VarHandle mechanics
    static final VarHandle CTL;
    static final VarHandle DEMAND;
    static final VarHandle QA;

    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
                                  int.class);
            DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
                                     long.class);
            QA = MethodHandles.arrayElementVarHandle(Object[].class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }
}
1495 }