ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.69
Committed: Fri Mar 3 00:44:56 2017 UTC (7 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.68: +2 -1 lines
Log Message:
Ensure subscriber consistency about close vs closeExceptionally

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.invoke.MethodHandles;
10 import java.lang.invoke.VarHandle;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.concurrent.locks.LockSupport;
14 import java.util.function.BiConsumer;
15 import java.util.function.BiPredicate;
16 import java.util.function.Consumer;
17
18 /**
19 * A {@link Flow.Publisher} that asynchronously issues submitted
20 * (non-null) items to current subscribers until it is closed. Each
21 * current subscriber receives newly submitted items in the same order
22 * unless drops or exceptions are encountered. Using a
23 * SubmissionPublisher allows item generators to act as compliant <a
24 * href="http://www.reactive-streams.org/"> reactive-streams</a>
25 * Publishers relying on drop handling and/or blocking for flow
26 * control.
27 *
28 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
29 * constructor for delivery to subscribers. The best choice of
30 * Executor depends on expected usage. If the generator(s) of
31 * submitted items run in separate threads, and the number of
32 * subscribers can be estimated, consider using a {@link
33 * Executors#newFixedThreadPool}. Otherwise consider using the
34 * default, normally the {@link ForkJoinPool#commonPool}.
35 *
36 * <p>Buffering allows producers and consumers to transiently operate
37 * at different rates. Each subscriber uses an independent buffer.
38 * Buffers are created upon first use and expanded as needed up to the
39 * given maximum. (The enforced capacity may be rounded up to the
40 * nearest power of two and/or bounded by the largest value supported
41 * by this implementation.) Invocations of {@link
42 * Flow.Subscription#request(long) request} do not directly result in
43 * buffer expansion, but risk saturation if unfilled requests exceed
44 * the maximum capacity. The default value of {@link
45 * Flow#defaultBufferSize()} may provide a useful starting point for
46 * choosing a capacity based on expected rates, resources, and usages.
47 *
48 * <p>Publication methods support different policies about what to do
49 * when buffers are saturated. Method {@link #submit(Object) submit}
50 * blocks until resources are available. This is simplest, but least
51 * responsive. The {@code offer} methods may drop items (either
52 * immediately or with bounded timeout), but provide an opportunity to
53 * interpose a handler and then retry.
54 *
55 * <p>If any Subscriber method throws an exception, its subscription
56 * is cancelled. If a handler is supplied as a constructor argument,
57 * it is invoked before cancellation upon an exception in method
58 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
59 * {@link Flow.Subscriber#onSubscribe onSubscribe},
60 * {@link Flow.Subscriber#onError(Throwable) onError} and
61 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
62 * handled before cancellation. If the supplied Executor throws
63 * {@link RejectedExecutionException} (or any other RuntimeException
64 * or Error) when attempting to execute a task, or a drop handler
65 * throws an exception when processing a dropped item, then the
66 * exception is rethrown. In these cases, not all subscribers will
67 * have been issued the published item. It is usually good practice to
68 * {@link #closeExceptionally closeExceptionally} in these cases.
69 *
70 * <p>Method {@link #consume(Consumer)} simplifies support for a
71 * common case in which the only action of a subscriber is to request
72 * and process all items using a supplied function.
73 *
74 * <p>This class may also serve as a convenient base for subclasses
75 * that generate items, and use the methods in this class to publish
76 * them. For example here is a class that periodically publishes the
77 * items generated from a supplier. (In practice you might add methods
78 * to independently start and stop generation, to share Executors
79 * among publishers, and so on, or use a SubmissionPublisher as a
80 * component rather than a superclass.)
81 *
82 * <pre> {@code
83 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
84 * final ScheduledFuture<?> periodicTask;
85 * final ScheduledExecutorService scheduler;
86 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
87 * Supplier<? extends T> supplier,
88 * long period, TimeUnit unit) {
89 * super(executor, maxBufferCapacity);
90 * scheduler = new ScheduledThreadPoolExecutor(1);
91 * periodicTask = scheduler.scheduleAtFixedRate(
92 * () -> submit(supplier.get()), 0, period, unit);
93 * }
94 * public void close() {
95 * periodicTask.cancel(false);
96 * scheduler.shutdown();
97 * super.close();
98 * }
99 * }}</pre>
100 *
101 * <p>Here is an example of a {@link Flow.Processor} implementation.
102 * It uses single-step requests to its publisher for simplicity of
103 * illustration. A more adaptive version could monitor flow using the
104 * lag estimate returned from {@code submit}, along with other utility
105 * methods.
106 *
107 * <pre> {@code
108 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
109 * implements Flow.Processor<S,T> {
110 * final Function<? super S, ? extends T> function;
111 * Flow.Subscription subscription;
112 * TransformProcessor(Executor executor, int maxBufferCapacity,
113 * Function<? super S, ? extends T> function) {
114 * super(executor, maxBufferCapacity);
115 * this.function = function;
116 * }
117 * public void onSubscribe(Flow.Subscription subscription) {
118 * (this.subscription = subscription).request(1);
119 * }
120 * public void onNext(S item) {
121 * subscription.request(1);
122 * submit(function.apply(item));
123 * }
124 * public void onError(Throwable ex) { closeExceptionally(ex); }
125 * public void onComplete() { close(); }
126 * }}</pre>
127 *
128 * @param <T> the published item type
129 * @author Doug Lea
130 * @since 9
131 */
132 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
133 AutoCloseable {
134 /*
135 * Most mechanics are handled by BufferedSubscription. This class
136 * mainly tracks subscribers and ensures sequentiality, by using
137 * built-in synchronization locks across public methods. (Using
138 * built-in locks works well in the most typical case in which
139 * only one thread submits items).
140 */
141
/**
 * The largest possible power of two array size. {@code roundCapacity}
 * clamps its result to this value.
 */
static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
144
145 /** Round capacity to power of 2, at most limit. */
146 static final int roundCapacity(int cap) {
147 int n = cap - 1;
148 n |= n >>> 1;
149 n |= n >>> 2;
150 n |= n >>> 4;
151 n |= n >>> 8;
152 n |= n >>> 16;
153 return (n <= 0) ? 1 : // at least 1
154 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
155 }
156
// default Executor setup; nearly the same as CompletableFuture

/**
 * Default executor, used by the no-argument constructor:
 * ForkJoinPool.commonPool() when its reported parallelism is
 * greater than one, otherwise a ThreadPerTaskExecutor (one new
 * thread per task).
 */
private static final Executor ASYNC_POOL =
    (ForkJoinPool.getCommonPoolParallelism() > 1) ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
166
167 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
168 private static final class ThreadPerTaskExecutor implements Executor {
169 ThreadPerTaskExecutor() {} // prevent access constructor creation
170 public void execute(Runnable r) { new Thread(r).start(); }
171 }
172
/**
 * Head of the list of clients. Clients (BufferedSubscriptions) are
 * maintained in a linked list (via their "next" fields). This works
 * well for publish loops. It requires O(n) traversal to check for
 * duplicate subscribers, but we expect that subscribing is much less
 * common than publishing. Unsubscribing occurs only during traversal
 * loops, when BufferedSubscription methods return negative values
 * signifying that they have been disabled. To reduce head-of-line
 * blocking, submit and offer methods first call
 * BufferedSubscription.offer on each subscriber, and place
 * saturated ones in retries list (using nextRetry field), and
 * retry, possibly blocking or dropping.
 */
BufferedSubscription<T> clients;

/**
 * Run status, updated only within locks. Declared volatile because
 * several methods read it without holding the lock.
 */
volatile boolean closed;
/**
 * If non-null, the exception passed to closeExceptionally. It is
 * recorded only when the publisher was not already closed.
 */
volatile Throwable closedException;

// Parameters for constructing BufferedSubscriptions

/** Executor handed to each new BufferedSubscription. */
final Executor executor;
/** Handler supplied at construction for onNext exceptions; may be null. */
final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
/** Per-subscriber maximum buffer capacity, as rounded by roundCapacity. */
final int maxBufferCapacity;
197
/**
 * Creates a new SubmissionPublisher that delivers to subscribers
 * asynchronously via the given Executor, gives each subscriber a
 * buffer of at most the given maximum size, and, when a non-null
 * handler is supplied, invokes that handler whenever a Subscriber
 * throws an exception in method {@link
 * Flow.Subscriber#onNext(Object) onNext}.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the enforced capacity may be rounded up to
 * the nearest power of two and/or bounded by the largest value
 * supported by this implementation; method {@link #getMaxBufferCapacity}
 * returns the actual value)
 * @param handler if non-null, procedure to invoke upon exception
 * thrown in method {@code onNext}
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if maxBufferCapacity not
 * positive
 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                           BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
    // The null check comes first so that it wins when both arguments are bad.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (!(maxBufferCapacity > 0)) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler;
    this.executor = executor;
}
228
/**
 * Creates a new SubmissionPublisher using the given Executor for
 * async delivery to subscribers, with the given maximum buffer size
 * for each subscriber, and no handler for Subscriber exceptions in
 * method {@link Flow.Subscriber#onNext(Object) onNext}. Equivalent
 * to calling the three-argument constructor with a null handler.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the enforced capacity may be rounded up to
 * the nearest power of two and/or bounded by the largest value
 * supported by this implementation; method {@link #getMaxBufferCapacity}
 * returns the actual value)
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if maxBufferCapacity not
 * positive
 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    this(executor, maxBufferCapacity, null);
}
249
/**
 * Creates a new SubmissionPublisher using the {@link
 * ForkJoinPool#commonPool()} for async delivery to subscribers
 * (unless it does not support a parallelism level of at least two,
 * in which case, a new Thread is created to run each task), with
 * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
 * handler for Subscriber exceptions in method {@link
 * Flow.Subscriber#onNext(Object) onNext}.
 */
public SubmissionPublisher() {
    // ASYNC_POOL encodes the commonPool-or-thread-per-task choice.
    this(ASYNC_POOL, Flow.defaultBufferSize(), null);
}
262
263 /**
264 * Adds the given Subscriber unless already subscribed. If already
265 * subscribed, the Subscriber's {@link
266 * Flow.Subscriber#onError(Throwable) onError} method is invoked on
267 * the existing subscription with an {@link IllegalStateException}.
268 * Otherwise, upon success, the Subscriber's {@link
269 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
270 * asynchronously with a new {@link Flow.Subscription}. If {@link
271 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
272 * subscription is cancelled. Otherwise, if this SubmissionPublisher
273 * was closed exceptionally, then the subscriber's {@link
274 * Flow.Subscriber#onError onError} method is invoked with the
275 * corresponding exception, or if closed without exception, the
276 * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
277 * method is invoked. Subscribers may enable receiving items by
278 * invoking the {@link Flow.Subscription#request(long) request}
279 * method of the new Subscription, and may unsubscribe by invoking
280 * its {@link Flow.Subscription#cancel() cancel} method.
281 *
282 * @param subscriber the subscriber
283 * @throws NullPointerException if subscriber is null
284 */
285 public void subscribe(Flow.Subscriber<? super T> subscriber) {
286 if (subscriber == null) throw new NullPointerException();
287 BufferedSubscription<T> subscription =
288 new BufferedSubscription<T>(subscriber, executor,
289 onNextHandler, maxBufferCapacity);
290 synchronized (this) {
291 for (BufferedSubscription<T> b = clients, pred = null;;) {
292 if (b == null) {
293 Throwable ex;
294 subscription.onSubscribe();
295 if ((ex = closedException) != null)
296 subscription.onError(ex);
297 else if (closed)
298 subscription.onComplete();
299 else if (pred == null)
300 clients = subscription;
301 else
302 pred.next = subscription;
303 break;
304 }
305 BufferedSubscription<T> next = b.next;
306 if (b.isDisabled()) { // remove
307 b.next = null; // detach
308 if (pred == null)
309 clients = next;
310 else
311 pred.next = next;
312 }
313 else if (subscriber.equals(b.subscriber)) {
314 b.onError(new IllegalStateException("Duplicate subscribe"));
315 break;
316 }
317 else
318 pred = b;
319 b = next;
320 }
321 }
322 }
323
324 /**
325 * Publishes the given item to each current subscriber by
326 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
327 * onNext} method, blocking uninterruptibly while resources for any
328 * subscriber are unavailable. This method returns an estimate of
329 * the maximum lag (number of items submitted but not yet consumed)
330 * among all current subscribers. This value is at least one
331 * (accounting for this submitted item) if there are any
332 * subscribers, else zero.
333 *
334 * <p>If the Executor for this publisher throws a
335 * RejectedExecutionException (or any other RuntimeException or
336 * Error) when attempting to asynchronously notify subscribers,
337 * then this exception is rethrown, in which case not all
338 * subscribers will have been issued this item.
339 *
340 * @param item the (non-null) item to publish
341 * @return the estimated maximum lag among subscribers
342 * @throws IllegalStateException if closed
343 * @throws NullPointerException if item is null
344 * @throws RejectedExecutionException if thrown by Executor
345 */
346 public int submit(T item) {
347 if (item == null) throw new NullPointerException();
348 int lag = 0;
349 boolean complete;
350 synchronized (this) {
351 complete = closed;
352 BufferedSubscription<T> b = clients;
353 if (!complete) {
354 BufferedSubscription<T> pred = null, r = null, rtail = null;
355 while (b != null) {
356 BufferedSubscription<T> next = b.next;
357 int stat = b.offer(item);
358 if (stat < 0) { // disabled
359 b.next = null;
360 if (pred == null)
361 clients = next;
362 else
363 pred.next = next;
364 }
365 else {
366 if (stat > lag)
367 lag = stat;
368 else if (stat == 0) { // place on retry list
369 b.nextRetry = null;
370 if (rtail == null)
371 r = b;
372 else
373 rtail.nextRetry = b;
374 rtail = b;
375 }
376 pred = b;
377 }
378 b = next;
379 }
380 while (r != null) {
381 BufferedSubscription<T> nextRetry = r.nextRetry;
382 r.nextRetry = null;
383 int stat = r.submit(item);
384 if (stat > lag)
385 lag = stat;
386 else if (stat < 0 && clients == r)
387 clients = r.next; // postpone internal unsubscribes
388 r = nextRetry;
389 }
390 }
391 }
392 if (complete)
393 throw new IllegalStateException("Closed");
394 else
395 return lag;
396 }
397
/**
 * Publishes the given item, if possible, to each current subscriber
 * by asynchronously invoking its {@link
 * Flow.Subscriber#onNext(Object) onNext} method. The item may be
 * dropped by one or more subscribers if resource limits are
 * exceeded, in which case the given handler (if non-null) is
 * invoked, and if it returns true, retried once. Other calls to
 * methods in this class by other threads are blocked while the
 * handler is invoked. Unless recovery is assured, options are
 * usually limited to logging the error and/or issuing an {@link
 * Flow.Subscriber#onError(Throwable) onError} signal to the
 * subscriber.
 *
 * <p>This method returns a status indicator: If negative, it
 * represents the (negative) number of drops (failed attempts to
 * issue the item to a subscriber). Otherwise it is an estimate of
 * the maximum lag (number of items submitted but not yet
 * consumed) among all current subscribers. This value is at least
 * one (accounting for this submitted item) if there are any
 * subscribers, else zero.
 *
 * <p>If the Executor for this publisher throws a
 * RejectedExecutionException (or any other RuntimeException or
 * Error) when attempting to asynchronously notify subscribers, or
 * the drop handler throws an exception when processing a dropped
 * item, then this exception is rethrown.
 *
 * @param item the (non-null) item to publish
 * @param onDrop if non-null, the handler invoked upon a drop to a
 * subscriber, with arguments of the subscriber and item; if it
 * returns true, an offer is re-attempted (once)
 * @return if negative, the (negative) number of drops; otherwise
 * an estimate of maximum lag
 * @throws IllegalStateException if closed
 * @throws NullPointerException if item is null
 * @throws RejectedExecutionException if thrown by Executor
 */
public int offer(T item,
                 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
    // A zero timeout selects the untimed path of the shared implementation.
    return doOffer(0L, item, onDrop);
}
439
/**
 * Publishes the given item, if possible, to each current subscriber
 * by asynchronously invoking its {@link
 * Flow.Subscriber#onNext(Object) onNext} method, blocking while
 * resources for any subscription are unavailable, up to the
 * specified timeout or until the caller thread is interrupted, at
 * which point the given handler (if non-null) is invoked, and if it
 * returns true, retried once. (The drop handler may distinguish
 * timeouts from interrupts by checking whether the current thread
 * is interrupted.) Other calls to methods in this class by other
 * threads are blocked while the handler is invoked. Unless
 * recovery is assured, options are usually limited to logging the
 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
 * onError} signal to the subscriber.
 *
 * <p>A timeout that converts to zero or fewer nanoseconds takes the
 * same non-waiting retry path as the untimed {@code offer}.
 *
 * <p>This method returns a status indicator: If negative, it
 * represents the (negative) number of drops (failed attempts to
 * issue the item to a subscriber). Otherwise it is an estimate of
 * the maximum lag (number of items submitted but not yet
 * consumed) among all current subscribers. This value is at least
 * one (accounting for this submitted item) if there are any
 * subscribers, else zero.
 *
 * <p>If the Executor for this publisher throws a
 * RejectedExecutionException (or any other RuntimeException or
 * Error) when attempting to asynchronously notify subscribers, or
 * the drop handler throws an exception when processing a dropped
 * item, then this exception is rethrown.
 *
 * @param item the (non-null) item to publish
 * @param timeout how long to wait for resources for any subscriber
 * before giving up, in units of {@code unit}
 * @param unit a {@code TimeUnit} determining how to interpret the
 * {@code timeout} parameter
 * @param onDrop if non-null, the handler invoked upon a drop to a
 * subscriber, with arguments of the subscriber and item; if it
 * returns true, an offer is re-attempted (once)
 * @return if negative, the (negative) number of drops; otherwise
 * an estimate of maximum lag
 * @throws IllegalStateException if closed
 * @throws NullPointerException if item or unit is null
 * @throws RejectedExecutionException if thrown by Executor
 */
public int offer(T item, long timeout, TimeUnit unit,
                 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
    // The timeout is converted to nanoseconds before delegating.
    return doOffer(unit.toNanos(timeout), item, onDrop);
}
487
488 /** Common implementation for both forms of offer */
489 final int doOffer(long nanos, T item,
490 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
491 if (item == null) throw new NullPointerException();
492 int lag = 0, drops = 0;
493 boolean complete;
494 synchronized (this) {
495 complete = closed;
496 BufferedSubscription<T> b = clients;
497 if (!complete) {
498 BufferedSubscription<T> pred = null, r = null, rtail = null;
499 while (b != null) {
500 BufferedSubscription<T> next = b.next;
501 int stat = b.offer(item);
502 if (stat < 0) {
503 b.next = null;
504 if (pred == null)
505 clients = next;
506 else
507 pred.next = next;
508 }
509 else {
510 if (stat > lag)
511 lag = stat;
512 else if (stat == 0) {
513 b.nextRetry = null;
514 if (rtail == null)
515 r = b;
516 else
517 rtail.nextRetry = b;
518 rtail = b;
519 }
520 else if (stat > lag)
521 lag = stat;
522 pred = b;
523 }
524 b = next;
525 }
526 while (r != null) {
527 BufferedSubscription<T> nextRetry = r.nextRetry;
528 r.nextRetry = null;
529 int stat = (nanos > 0L)
530 ? r.timedOffer(item, nanos)
531 : r.offer(item);
532 if (stat == 0 && onDrop != null &&
533 onDrop.test(r.subscriber, item))
534 stat = r.offer(item);
535 if (stat == 0)
536 ++drops;
537 else if (stat > lag)
538 lag = stat;
539 else if (stat < 0 && clients == r)
540 clients = r.next;
541 r = nextRetry;
542 }
543 }
544 }
545 if (complete)
546 throw new IllegalStateException("Closed");
547 else
548 return (drops > 0) ? -drops : lag;
549 }
550
551 /**
552 * Unless already closed, issues {@link
553 * Flow.Subscriber#onComplete() onComplete} signals to current
554 * subscribers, and disallows subsequent attempts to publish.
555 * Upon return, this method does <em>NOT</em> guarantee that all
556 * subscribers have yet completed.
557 */
558 public void close() {
559 if (!closed) {
560 BufferedSubscription<T> b;
561 synchronized (this) {
562 b = clients;
563 clients = null;
564 closed = true;
565 }
566 while (b != null) {
567 BufferedSubscription<T> next = b.next;
568 b.next = null;
569 b.onComplete();
570 b = next;
571 }
572 }
573 }
574
575 /**
576 * Unless already closed, issues {@link
577 * Flow.Subscriber#onError(Throwable) onError} signals to current
578 * subscribers with the given error, and disallows subsequent
579 * attempts to publish. Future subscribers also receive the given
580 * error. Upon return, this method does <em>NOT</em> guarantee
581 * that all subscribers have yet completed.
582 *
583 * @param error the {@code onError} argument sent to subscribers
584 * @throws NullPointerException if error is null
585 */
586 public void closeExceptionally(Throwable error) {
587 if (error == null)
588 throw new NullPointerException();
589 if (!closed) {
590 BufferedSubscription<T> b;
591 synchronized (this) {
592 b = clients;
593 clients = null;
594 if (!closed) // don't override plain close
595 closedException = error;
596 closed = true;
597 }
598 while (b != null) {
599 BufferedSubscription<T> next = b.next;
600 b.next = null;
601 b.onError(error);
602 b = next;
603 }
604 }
605 }
606
/**
 * Returns true if this publisher is not accepting submissions.
 * Reads the volatile {@code closed} flag without locking.
 *
 * @return true if closed
 */
public boolean isClosed() {
    return closed;
}
615
/**
 * Returns the exception associated with {@link
 * #closeExceptionally(Throwable) closeExceptionally}, or null if
 * not closed or if closed normally. Reads the volatile
 * {@code closedException} field without locking.
 *
 * @return the exception, or null if none
 */
public Throwable getClosedException() {
    return closedException;
}
626
627 /**
628 * Returns true if this publisher has any subscribers.
629 *
630 * @return true if this publisher has any subscribers
631 */
632 public boolean hasSubscribers() {
633 boolean nonEmpty = false;
634 if (!closed) {
635 synchronized (this) {
636 for (BufferedSubscription<T> b = clients; b != null;) {
637 BufferedSubscription<T> next = b.next;
638 if (b.isDisabled()) {
639 b.next = null;
640 b = clients = next;
641 }
642 else {
643 nonEmpty = true;
644 break;
645 }
646 }
647 }
648 }
649 return nonEmpty;
650 }
651
652 /**
653 * Returns the number of current subscribers.
654 *
655 * @return the number of current subscribers
656 */
657 public int getNumberOfSubscribers() {
658 int count = 0;
659 if (!closed) {
660 synchronized (this) {
661 BufferedSubscription<T> pred = null, next;
662 for (BufferedSubscription<T> b = clients; b != null; b = next) {
663 next = b.next;
664 if (b.isDisabled()) {
665 b.next = null;
666 if (pred == null)
667 clients = next;
668 else
669 pred.next = next;
670 }
671 else {
672 pred = b;
673 ++count;
674 }
675 }
676 }
677 }
678 return count;
679 }
680
/**
 * Returns the Executor used for asynchronous delivery, as supplied
 * at construction.
 *
 * @return the Executor used for asynchronous delivery
 */
public Executor getExecutor() {
    return executor;
}
689
/**
 * Returns the maximum per-subscriber buffer capacity. This is the
 * value computed at construction, after rounding, which may differ
 * from the requested capacity.
 *
 * @return the maximum per-subscriber buffer capacity
 */
public int getMaxBufferCapacity() {
    return maxBufferCapacity;
}
698
699 /**
700 * Returns a list of current subscribers for monitoring and
701 * tracking purposes, not for invoking {@link Flow.Subscriber}
702 * methods on the subscribers.
703 *
704 * @return list of current subscribers
705 */
706 public List<Flow.Subscriber<? super T>> getSubscribers() {
707 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
708 synchronized (this) {
709 BufferedSubscription<T> pred = null, next;
710 for (BufferedSubscription<T> b = clients; b != null; b = next) {
711 next = b.next;
712 if (b.isDisabled()) {
713 b.next = null;
714 if (pred == null)
715 clients = next;
716 else
717 pred.next = next;
718 }
719 else
720 subs.add(b.subscriber);
721 }
722 }
723 return subs;
724 }
725
726 /**
727 * Returns true if the given Subscriber is currently subscribed.
728 *
729 * @param subscriber the subscriber
730 * @return true if currently subscribed
731 * @throws NullPointerException if subscriber is null
732 */
733 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
734 if (subscriber == null) throw new NullPointerException();
735 if (!closed) {
736 synchronized (this) {
737 BufferedSubscription<T> pred = null, next;
738 for (BufferedSubscription<T> b = clients; b != null; b = next) {
739 next = b.next;
740 if (b.isDisabled()) {
741 b.next = null;
742 if (pred == null)
743 clients = next;
744 else
745 pred.next = next;
746 }
747 else if (subscriber.equals(b.subscriber))
748 return true;
749 else
750 pred = b;
751 }
752 }
753 }
754 return false;
755 }
756
757 /**
758 * Returns an estimate of the minimum number of items requested
759 * (via {@link Flow.Subscription#request(long) request}) but not
760 * yet produced, among all current subscribers.
761 *
762 * @return the estimate, or zero if no subscribers
763 */
764 public long estimateMinimumDemand() {
765 long min = Long.MAX_VALUE;
766 boolean nonEmpty = false;
767 synchronized (this) {
768 BufferedSubscription<T> pred = null, next;
769 for (BufferedSubscription<T> b = clients; b != null; b = next) {
770 int n; long d;
771 next = b.next;
772 if ((n = b.estimateLag()) < 0) {
773 b.next = null;
774 if (pred == null)
775 clients = next;
776 else
777 pred.next = next;
778 }
779 else {
780 if ((d = b.demand - n) < min)
781 min = d;
782 nonEmpty = true;
783 pred = b;
784 }
785 }
786 }
787 return nonEmpty ? min : 0;
788 }
789
790 /**
791 * Returns an estimate of the maximum number of items produced but
792 * not yet consumed among all current subscribers.
793 *
794 * @return the estimate
795 */
796 public int estimateMaximumLag() {
797 int max = 0;
798 synchronized (this) {
799 BufferedSubscription<T> pred = null, next;
800 for (BufferedSubscription<T> b = clients; b != null; b = next) {
801 int n;
802 next = b.next;
803 if ((n = b.estimateLag()) < 0) {
804 b.next = null;
805 if (pred == null)
806 clients = next;
807 else
808 pred.next = next;
809 }
810 else {
811 if (n > max)
812 max = n;
813 pred = b;
814 }
815 }
816 }
817 return max;
818 }
819
820 /**
821 * Processes all published items using the given Consumer function.
822 * Returns a CompletableFuture that is completed normally when this
823 * publisher signals {@link Flow.Subscriber#onComplete()
824 * onComplete}, or completed exceptionally upon any error, or an
825 * exception is thrown by the Consumer, or the returned
826 * CompletableFuture is cancelled, in which case no further items
827 * are processed.
828 *
829 * @param consumer the function applied to each onNext item
830 * @return a CompletableFuture that is completed normally
831 * when the publisher signals onComplete, and exceptionally
832 * upon any error or cancellation
833 * @throws NullPointerException if consumer is null
834 */
835 public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
836 if (consumer == null)
837 throw new NullPointerException();
838 CompletableFuture<Void> status = new CompletableFuture<>();
839 subscribe(new ConsumerSubscriber<T>(status, consumer));
840 return status;
841 }
842
843 /** Subscriber for method consume */
844 private static final class ConsumerSubscriber<T>
845 implements Flow.Subscriber<T> {
846 final CompletableFuture<Void> status;
847 final Consumer<? super T> consumer;
848 Flow.Subscription subscription;
849 ConsumerSubscriber(CompletableFuture<Void> status,
850 Consumer<? super T> consumer) {
851 this.status = status; this.consumer = consumer;
852 }
853 public final void onSubscribe(Flow.Subscription subscription) {
854 this.subscription = subscription;
855 status.whenComplete((v, e) -> subscription.cancel());
856 if (!status.isDone())
857 subscription.request(Long.MAX_VALUE);
858 }
859 public final void onError(Throwable ex) {
860 status.completeExceptionally(ex);
861 }
862 public final void onComplete() {
863 status.complete(null);
864 }
865 public final void onNext(T item) {
866 try {
867 consumer.accept(item);
868 } catch (Throwable ex) {
869 subscription.cancel();
870 status.completeExceptionally(ex);
871 }
872 }
873 }
874
875 /**
876 * A task for consuming buffer items and signals, created and
877 * executed whenever they become available. A task consumes as
878 * many items/signals as possible before terminating, at which
879 * point another task is created when needed. The dual Runnable
880 * and ForkJoinTask declaration saves overhead when executed by
881 * ForkJoinPools, without impacting other kinds of Executors.
882 */
883 @SuppressWarnings("serial")
884 static final class ConsumerTask<T> extends ForkJoinTask<Void>
885 implements Runnable, CompletableFuture.AsynchronousCompletionTask {
886 final BufferedSubscription<T> consumer;
887 ConsumerTask(BufferedSubscription<T> consumer) {
888 this.consumer = consumer;
889 }
890 public final Void getRawResult() { return null; }
891 public final void setRawResult(Void v) {}
892 public final boolean exec() { consumer.consume(); return false; }
893 public final void run() { consumer.consume(); }
894 }
895
896 /**
897 * A bounded (ring) buffer with integrated control to start a
898 * consumer task whenever items are available. The buffer
899 * algorithm is similar to one used inside ForkJoinPool (see its
900 * internal documentation for details) specialized for the case of
901 * at most one concurrent producer and consumer, and power of two
902 * buffer sizes. This allows methods to operate without locks even
903 * while supporting resizing, blocking, task-triggering, and
904 * garbage-free buffers (nulling out elements when consumed),
905 * although supporting these does impose a bit of overhead
906 * compared to plain fixed-size ring buffers.
907 *
908 * The publisher guarantees a single producer via its lock. We
909 * ensure in this class that there is at most one consumer. The
910 * request and cancel methods must be fully thread-safe but are
911 * coded to exploit the most common case in which they are only
912 * called by consumers (usually within onNext).
913 *
914 * Execution control is managed using the ACTIVE ctl bit. We
915 * ensure that a task is active when consumable items (and
916 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
917 * there is demand (unfilled requests). This is complicated on
918 * the creation side by the possibility of exceptions when trying
919 * to execute tasks. These eventually force DISABLED state, but
920 * sometimes not directly. On the task side, termination (clearing
921 * ACTIVE) that would otherwise race with producers or request()
922 * calls uses the CONSUME keep-alive bit to force a recheck.
923 *
924 * The ctl field also manages run state. When DISABLED, no further
925 * updates are possible. Disabling may be preceded by setting
926 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
927 * case the associated Subscriber methods are invoked, possibly
928 * synchronously if there is no active consumer task (including
929 * cases where execute() failed). The cancel() method is supported
930 * by treating as ERROR but suppressing onError signal.
931 *
932 * Support for blocking also exploits the fact that there is only
933 * one possible waiter. ManagedBlocker-compatible control fields
934 * are placed in this class itself rather than in wait-nodes.
935 * Blocking control relies on the "waiter" field. Producers set
936 * the field before trying to block, but must then recheck (via
937 * offer) before parking. Signalling then just unparks and clears
938 * waiter field. If the producer and/or consumer are using a
939 * ForkJoinPool, the producer attempts to help run consumer tasks
940 * via ForkJoinPool.helpAsyncBlocker before blocking.
941 *
942 * This class uses @Contended and heuristic field declaration
943 * ordering to reduce false-sharing-based memory contention among
944 * instances of BufferedSubscription, but it does not currently
945 * attempt to avoid memory contention among buffers. This field
946 * and element packing can hurt performance especially when each
947 * publisher has only one client operating at a high rate.
948 * Addressing this may require allocating substantially more space
949 * than users expect.
950 */
951 @SuppressWarnings("serial")
952 @jdk.internal.vm.annotation.Contended
953 private static final class BufferedSubscription<T>
954 implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
// Order-sensitive field declarations
long timeout;                      // > 0 if timed wait; set to INTERRUPTED
                                   // when a blocked put is interrupted
volatile long demand;              // # unfilled requests
int maxCapacity;                   // reduced on OOME
int putStat;                       // offer result for ManagedBlocker
volatile int ctl;                  // atomic run state flags
volatile int head;                 // next position to take
int tail;                          // next position to put
Object[] array;                    // buffer: null if disabled
Flow.Subscriber<? super T> subscriber; // null if disabled
Executor executor;                 // null if disabled
// Invoked by handleOnNext (if non-null) when Subscriber.onNext throws
BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
volatile Throwable pendingError;   // holds until onError issued
volatile Thread waiter;            // blocked producer thread
T putItem;                         // for offer within ManagedBlocker
BufferedSubscription<T> next;      // used only by publisher
BufferedSubscription<T> nextRetry; // used only by publisher

// ctl values (bit flags, combined with | and tested with &)
static final int ACTIVE    = 0x01; // consumer task active
static final int CONSUME   = 0x02; // keep-alive for consumer task
static final int DISABLED  = 0x04; // final state
static final int ERROR     = 0x08; // signal onError then disable
static final int SUBSCRIBE = 0x10; // signal onSubscribe
static final int COMPLETE  = 0x20; // signal onComplete when done

// Stored in timeout when a wait is interrupted; submit and
// timedOffer re-assert the thread's interrupt status when
// timeout < 0.
static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel

/**
 * Initial buffer capacity used when maxBufferCapacity is
 * greater. Must be a power of two.
 */
static final int DEFAULT_INITIAL_CAP = 32;
988
989 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
990 Executor executor,
991 BiConsumer<? super Flow.Subscriber<? super T>,
992 ? super Throwable> onNextHandler,
993 int maxBufferCapacity) {
994 this.subscriber = subscriber;
995 this.executor = executor;
996 this.onNextHandler = onNextHandler;
997 this.maxCapacity = maxBufferCapacity;
998 this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
999 (maxBufferCapacity < 2 ? // at least 2 slots
1000 2 : maxBufferCapacity) :
1001 DEFAULT_INITIAL_CAP];
1002 }
1003
1004 final boolean isDisabled() {
1005 return ctl == DISABLED;
1006 }
1007
1008 /**
1009 * Returns estimated number of buffered items, or -1 if
1010 * disabled.
1011 */
1012 final int estimateLag() {
1013 int n;
1014 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
1015 }
1016
1017 /**
1018 * Tries to add item and start consumer task if necessary.
1019 * @return -1 if disabled, 0 if dropped, else estimated lag
1020 */
1021 final int offer(T item) {
1022 int h = head, t = tail, cap, size, stat;
1023 Object[] a = array;
1024 if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
1025 a[(cap - 1) & t] = item; // relaxed writes OK
1026 tail = t + 1;
1027 stat = size;
1028 }
1029 else
1030 stat = growAndAdd(a, item);
1031 return (stat > 0 &&
1032 (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
1033 startOnOffer(stat) : stat;
1034 }
1035
/**
 * Tries to create or expand buffer, then adds item if possible.
 * Called from offer when the fast path finds no array or no room.
 *
 * @param a the array as read by the caller (may be null)
 * @param item the item to add
 * @return -1 if ERROR or DISABLED is set, 0 if the item could
 * not be added (at maxCapacity, capacity overflow, or allocation
 * failure), else a positive lag estimate
 */
private int growAndAdd(Object[] a, T item) {
    boolean alloc;
    int cap, stat;
    if ((ctl & (ERROR | DISABLED)) != 0) {
        // terminating or terminated: refuse the item
        cap = 0;
        stat = -1;
        alloc = false;
    }
    else if (a == null || (cap = a.length) <= 0) {
        // no usable buffer: allocate one (newCap becomes 1 below)
        cap = 0;
        stat = 1;
        alloc = true;
    }
    else {
        VarHandle.fullFence();                   // recheck
        int h = head, t = tail, size = t + 1 - h;
        if (cap >= size) {
            // room appeared since the caller's check
            a[(cap - 1) & t] = item;
            tail = t + 1;
            stat = size;
            alloc = false;
        }
        else if (cap >= maxCapacity) {
            stat = 0;                            // cannot grow
            alloc = false;
        }
        else {
            stat = cap + 1;
            alloc = true;
        }
    }
    if (alloc) {
        int newCap = (cap > 0) ? cap << 1 : 1;   // double, or start at 1
        if (newCap <= cap)                       // shift overflowed
            stat = 0;
        else {
            Object[] newArray = null;
            try {
                newArray = new Object[newCap];
            } catch (Throwable ex) {             // try to cope with OOME
            }
            if (newArray == null) {
                if (cap > 0)
                    maxCapacity = cap;           // avoid continuous failure
                stat = 0;
            }
            else {
                // The new array is installed before elements are
                // moved; each old slot is claimed with a CAS to
                // null so that an element is taken either by this
                // transfer or by the consumer, never both.
                array = newArray;
                int t = tail;
                int newMask = newCap - 1;
                if (a != null && cap > 0) {
                    int mask = cap - 1;
                    for (int j = head; j != t; ++j) {
                        int k = j & mask;
                        Object x = QA.getAcquire(a, k);
                        if (x != null &&         // races with consumer
                            QA.compareAndSet(a, k, x, null))
                            newArray[j & newMask] = x;
                    }
                }
                newArray[t & newMask] = item;
                tail = t + 1;
            }
        }
    }
    return stat;
}
1106
1107 /**
1108 * Spins/helps/blocks while offer returns 0. Called only if
1109 * initial offer return 0.
1110 */
1111 final int submit(T item) {
1112 int stat;
1113 if ((stat = offer(item)) == 0) {
1114 putItem = item;
1115 timeout = 0L;
1116 putStat = 0;
1117 ForkJoinPool.helpAsyncBlocker(executor, this);
1118 if ((stat = putStat) == 0) {
1119 try {
1120 ForkJoinPool.managedBlock(this);
1121 } catch (InterruptedException ie) {
1122 timeout = INTERRUPTED;
1123 }
1124 stat = putStat;
1125 }
1126 if (timeout < 0L)
1127 Thread.currentThread().interrupt();
1128 }
1129 return stat;
1130 }
1131
1132 /**
1133 * Timeout version; similar to submit.
1134 */
1135 final int timedOffer(T item, long nanos) {
1136 int stat;
1137 if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
1138 putItem = item;
1139 putStat = 0;
1140 ForkJoinPool.helpAsyncBlocker(executor, this);
1141 if ((stat = putStat) == 0) {
1142 try {
1143 ForkJoinPool.managedBlock(this);
1144 } catch (InterruptedException ie) {
1145 timeout = INTERRUPTED;
1146 }
1147 stat = putStat;
1148 }
1149 if (timeout < 0L)
1150 Thread.currentThread().interrupt();
1151 }
1152 return stat;
1153 }
1154
1155 /**
1156 * Tries to start consumer task after offer.
1157 * @return -1 if now disabled, else argument
1158 */
1159 private int startOnOffer(int stat) {
1160 for (;;) {
1161 Executor e; int c;
1162 if ((c = ctl) == DISABLED || (e = executor) == null) {
1163 stat = -1;
1164 break;
1165 }
1166 else if ((c & ACTIVE) != 0) { // ensure keep-alive
1167 if ((c & CONSUME) != 0 ||
1168 CTL.compareAndSet(this, c, c | CONSUME))
1169 break;
1170 }
1171 else if (demand == 0L || tail == head)
1172 break;
1173 else if (CTL.compareAndSet(this, c, c | (ACTIVE | CONSUME))) {
1174 try {
1175 e.execute(new ConsumerTask<T>(this));
1176 break;
1177 } catch (RuntimeException | Error ex) { // back out
1178 do {} while (((c = ctl) & DISABLED) == 0 &&
1179 (c & ACTIVE) != 0 &&
1180 !CTL.weakCompareAndSet
1181 (this, c, c & ~ACTIVE));
1182 throw ex;
1183 }
1184 }
1185 }
1186 return stat;
1187 }
1188
/**
 * Clears the waiter field and unparks the given thread.
 *
 * @param w the thread to unpark, normally the value just read
 * from the waiter field
 */
private void signalWaiter(Thread w) {
    waiter = null;              // clear first, then wake
    LockSupport.unpark(w);      // release producer
}
1193
1194 /**
1195 * Nulls out most fields, mainly to avoid garbage retention
1196 * until publisher unsubscribes, but also to help cleanly stop
1197 * upon error by nulling required components.
1198 */
1199 private void detach() {
1200 Thread w = waiter;
1201 executor = null;
1202 subscriber = null;
1203 pendingError = null;
1204 signalWaiter(w);
1205 }
1206
1207 /**
1208 * Issues error signal, asynchronously if a task is running,
1209 * else synchronously.
1210 */
1211 final void onError(Throwable ex) {
1212 for (int c;;) {
1213 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1214 break;
1215 else if ((c & ACTIVE) != 0) {
1216 pendingError = ex;
1217 if (CTL.compareAndSet(this, c, c | ERROR))
1218 break; // cause consumer task to exit
1219 }
1220 else if (CTL.compareAndSet(this, c, DISABLED)) {
1221 Flow.Subscriber<? super T> s = subscriber;
1222 if (s != null && ex != null) {
1223 try {
1224 s.onError(ex);
1225 } catch (Throwable ignore) {
1226 }
1227 }
1228 detach();
1229 break;
1230 }
1231 }
1232 }
1233
1234 /**
1235 * Tries to start consumer task upon a signal or request;
1236 * disables on failure.
1237 */
1238 private void startOrDisable() {
1239 Executor e;
1240 if ((e = executor) != null) { // skip if already disabled
1241 try {
1242 e.execute(new ConsumerTask<T>(this));
1243 } catch (Throwable ex) { // back out and force signal
1244 for (int c;;) {
1245 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1246 break;
1247 if (CTL.compareAndSet(this, c, c & ~ACTIVE)) {
1248 onError(ex);
1249 break;
1250 }
1251 }
1252 }
1253 }
1254 }
1255
1256 final void onComplete() {
1257 for (int c;;) {
1258 if ((c = ctl) == DISABLED)
1259 break;
1260 if (CTL.compareAndSet(this, c,
1261 c | (ACTIVE | CONSUME | COMPLETE))) {
1262 if ((c & ACTIVE) == 0)
1263 startOrDisable();
1264 break;
1265 }
1266 }
1267 }
1268
1269 final void onSubscribe() {
1270 for (int c;;) {
1271 if ((c = ctl) == DISABLED)
1272 break;
1273 if (CTL.compareAndSet(this, c,
1274 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1275 if ((c & ACTIVE) == 0)
1276 startOrDisable();
1277 break;
1278 }
1279 }
1280 }
1281
1282 /**
1283 * Causes consumer task to exit if active (without reporting
1284 * onError unless there is already a pending error), and
1285 * disables.
1286 */
1287 public void cancel() {
1288 for (int c;;) {
1289 if ((c = ctl) == DISABLED)
1290 break;
1291 else if ((c & ACTIVE) != 0) {
1292 if (CTL.compareAndSet(this, c,
1293 c | (CONSUME | ERROR)))
1294 break;
1295 }
1296 else if (CTL.compareAndSet(this, c, DISABLED)) {
1297 detach();
1298 break;
1299 }
1300 }
1301 }
1302
1303 /**
1304 * Adds to demand and possibly starts task.
1305 */
1306 public void request(long n) {
1307 if (n > 0L) {
1308 for (;;) {
1309 long prev = demand, d;
1310 if ((d = prev + n) < prev) // saturate
1311 d = Long.MAX_VALUE;
1312 if (DEMAND.compareAndSet(this, prev, d)) {
1313 for (int c, h;;) {
1314 if ((c = ctl) == DISABLED)
1315 break;
1316 else if ((c & ACTIVE) != 0) {
1317 if ((c & CONSUME) != 0 ||
1318 CTL.compareAndSet(this, c, c | CONSUME))
1319 break;
1320 }
1321 else if ((h = head) != tail) {
1322 if (CTL.compareAndSet(this, c,
1323 c | (ACTIVE|CONSUME))) {
1324 startOrDisable();
1325 break;
1326 }
1327 }
1328 else if (head == h && tail == h)
1329 break; // else stale
1330 if (demand == 0L)
1331 break;
1332 }
1333 break;
1334 }
1335 }
1336 }
1337 else if (n < 0L)
1338 onError(new IllegalArgumentException(
1339 "negative subscription request"));
1340 }
1341
1342 public final boolean isReleasable() { // for ManagedBlocker
1343 T item = putItem;
1344 if (item != null) {
1345 if ((putStat = offer(item)) == 0)
1346 return false;
1347 putItem = null;
1348 }
1349 return true;
1350 }
1351
/**
 * ManagedBlocker method: repeatedly offers the pending item (if
 * any), parking between attempts, until an offer returns nonzero
 * or, for timed waits, the time elapses or the thread is
 * interrupted. The last offer result is left in putStat.
 *
 * @return true, always
 */
public final boolean block() { // for ManagedBlocker
    T item = putItem;
    if (item != null) {
        putItem = null;
        long nanos = timeout;
        // deadline is meaningful only for timed waits (nanos > 0)
        long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
        while ((putStat = offer(item)) == 0) {
            if (Thread.interrupted()) {
                // Record the interrupt; only timed waits give up,
                // untimed waits keep trying.
                timeout = INTERRUPTED;
                if (nanos > 0L)
                    break;
            }
            else if (nanos > 0L &&
                     (nanos = deadline - System.nanoTime()) <= 0L)
                break;                      // timed out
            else if (waiter == null)
                // Publish this thread, then loop to re-offer
                // before actually parking.
                waiter = Thread.currentThread();
            else {
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    LockSupport.park(this);
                waiter = null;
            }
        }
    }
    waiter = null;
    return true;
}
1381
/**
 * Consumer loop, called from ConsumerTask, or indirectly
 * when helping during submit. Each iteration handles, in
 * priority order: control bits (ERROR, SUBSCRIBE, DISABLED),
 * an apparently empty buffer, zero demand, or else takes one
 * item and passes it to the subscriber's onNext. The loop
 * exits when checkControl, checkEmpty or checkDemand returns
 * false. Does nothing if the subscriber field is null.
 */
final void consume() {
    Flow.Subscriber<? super T> s;
    int h = head;                            // local copy of take index
    if ((s = subscriber) != null) {          // else disabled
        for (;;) {
            long d = demand;
            int c; Object[] a; int n, i; Object x; Thread w;
            if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
                if (!checkControl(s, c))
                    break;
            }
            else if ((a = array) == null || h == tail ||
                     (n = a.length) == 0 ||
                     (x = QA.getAcquire(a, i = (n - 1) & h)) == null) {
                // no array, no items, or slot not (yet) filled
                if (!checkEmpty(s, c))
                    break;
            }
            else if (d == 0L) {
                if (!checkDemand(c))
                    break;
            }
            else if (((c & CONSUME) != 0 ||
                      CTL.compareAndSet(this, c, c | CONSUME)) &&
                     QA.compareAndSet(a, i, x, null)) {
                // Slot claimed by nulling it: advance head, use up
                // one unit of demand, and wake any waiting producer
                // before calling the subscriber.
                HEAD.setRelease(this, ++h);
                DEMAND.getAndAdd(this, -1L);
                if ((w = waiter) != null)
                    signalWaiter(w);
                try {
                    @SuppressWarnings("unchecked") T y = (T) x;
                    s.onNext(y);
                } catch (Throwable ex) {
                    handleOnNext(s, ex);
                }
            }
        }
    }
}
1424
1425 /**
1426 * Responds to control events in consume().
1427 */
1428 private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
1429 boolean stat = true;
1430 if ((c & SUBSCRIBE) != 0) {
1431 if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) {
1432 try {
1433 if (s != null)
1434 s.onSubscribe(this);
1435 } catch (Throwable ex) {
1436 onError(ex);
1437 }
1438 }
1439 }
1440 else if ((c & ERROR) != 0) {
1441 Throwable ex = pendingError;
1442 ctl = DISABLED; // no need for CAS
1443 if (ex != null) { // null if errorless cancel
1444 try {
1445 if (s != null)
1446 s.onError(ex);
1447 } catch (Throwable ignore) {
1448 }
1449 }
1450 }
1451 else {
1452 detach();
1453 stat = false;
1454 }
1455 return stat;
1456 }
1457
1458 /**
1459 * Responds to apparent emptiness in consume().
1460 */
1461 private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1462 boolean stat = true;
1463 if (head == tail) {
1464 if ((c & CONSUME) != 0)
1465 CTL.compareAndSet(this, c, c & ~CONSUME);
1466 else if ((c & COMPLETE) != 0) {
1467 if (CTL.compareAndSet(this, c, DISABLED)) {
1468 try {
1469 if (s != null)
1470 s.onComplete();
1471 } catch (Throwable ignore) {
1472 }
1473 }
1474 }
1475 else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
1476 stat = false;
1477 }
1478 return stat;
1479 }
1480
1481 /**
1482 * Responds to apparent zero demand in consume().
1483 */
1484 private boolean checkDemand(int c) {
1485 boolean stat = true;
1486 if (demand == 0L) {
1487 if ((c & CONSUME) != 0)
1488 CTL.compareAndSet(this, c, c & ~CONSUME);
1489 else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
1490 stat = false;
1491 }
1492 return stat;
1493 }
1494
1495 /**
1496 * Processes exception in Subscriber.onNext.
1497 */
1498 private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1499 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1500 if ((h = onNextHandler) != null) {
1501 try {
1502 h.accept(s, ex);
1503 } catch (Throwable ignore) {
1504 }
1505 }
1506 onError(ex);
1507 }
1508
// VarHandle mechanics
// Handles for atomic and ordered access to the ctl, tail, head and
// demand fields, and to elements of the Object[] buffer.
private static final VarHandle CTL;
private static final VarHandle TAIL;   // NOTE(review): not referenced by the methods of this class; confirm whether it is still needed
private static final VarHandle HEAD;
private static final VarHandle DEMAND;
private static final VarHandle QA;     // buffer array elements

static {
    try {
        MethodHandles.Lookup l = MethodHandles.lookup();
        CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
                              int.class);
        TAIL = l.findVarHandle(BufferedSubscription.class, "tail",
                               int.class);
        HEAD = l.findVarHandle(BufferedSubscription.class, "head",
                               int.class);
        DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
                                 long.class);
        QA = MethodHandles.arrayElementVarHandle(Object[].class);
    } catch (ReflectiveOperationException e) {
        // A failed lookup is unrecoverable; fail class initialization.
        throw new Error(e);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}
1536 }
1537 }