ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.66
Committed: Tue Dec 6 00:29:08 2016 UTC (7 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.65: +11 -11 lines
Log Message:
Ensure onSubscribe occurs before onError

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.invoke.MethodHandles;
10 import java.lang.invoke.VarHandle;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.concurrent.locks.LockSupport;
14 import java.util.function.BiConsumer;
15 import java.util.function.BiPredicate;
16 import java.util.function.Consumer;
17
18 /**
19 * A {@link Flow.Publisher} that asynchronously issues submitted
20 * (non-null) items to current subscribers until it is closed. Each
21 * current subscriber receives newly submitted items in the same order
22 * unless drops or exceptions are encountered. Using a
23 * SubmissionPublisher allows item generators to act as compliant <a
24 * href="http://www.reactive-streams.org/"> reactive-streams</a>
25 * Publishers relying on drop handling and/or blocking for flow
26 * control.
27 *
28 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
29 * constructor for delivery to subscribers. The best choice of
30 * Executor depends on expected usage. If the generator(s) of
31 * submitted items run in separate threads, and the number of
32 * subscribers can be estimated, consider using a {@link
33 * Executors#newFixedThreadPool}. Otherwise consider using the
34 * default, normally the {@link ForkJoinPool#commonPool}.
35 *
36 * <p>Buffering allows producers and consumers to transiently operate
37 * at different rates. Each subscriber uses an independent buffer.
38 * Buffers are created upon first use and expanded as needed up to the
39 * given maximum. (The enforced capacity may be rounded up to the
40 * nearest power of two and/or bounded by the largest value supported
41 * by this implementation.) Invocations of {@link
42 * Flow.Subscription#request(long) request} do not directly result in
43 * buffer expansion, but risk saturation if unfilled requests exceed
44 * the maximum capacity. The default value of {@link
45 * Flow#defaultBufferSize()} may provide a useful starting point for
46 * choosing a capacity based on expected rates, resources, and usages.
47 *
48 * <p>Publication methods support different policies about what to do
49 * when buffers are saturated. Method {@link #submit(Object) submit}
50 * blocks until resources are available. This is simplest, but least
51 * responsive. The {@code offer} methods may drop items (either
52 * immediately or with bounded timeout), but provide an opportunity to
53 * interpose a handler and then retry.
54 *
55 * <p>If any Subscriber method throws an exception, its subscription
56 * is cancelled. If a handler is supplied as a constructor argument,
57 * it is invoked before cancellation upon an exception in method
58 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
59 * {@link Flow.Subscriber#onSubscribe onSubscribe},
60 * {@link Flow.Subscriber#onError(Throwable) onError} and
61 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
62 * handled before cancellation. If the supplied Executor throws
63 * {@link RejectedExecutionException} (or any other RuntimeException
64 * or Error) when attempting to execute a task, or a drop handler
65 * throws an exception when processing a dropped item, then the
66 * exception is rethrown. In these cases, not all subscribers will
67 * have been issued the published item. It is usually good practice to
68 * {@link #closeExceptionally closeExceptionally} in these cases.
69 *
70 * <p>Method {@link #consume(Consumer)} simplifies support for a
71 * common case in which the only action of a subscriber is to request
72 * and process all items using a supplied function.
73 *
74 * <p>This class may also serve as a convenient base for subclasses
75 * that generate items, and use the methods in this class to publish
76 * them. For example here is a class that periodically publishes the
77 * items generated from a supplier. (In practice you might add methods
78 * to independently start and stop generation, to share Executors
79 * among publishers, and so on, or use a SubmissionPublisher as a
80 * component rather than a superclass.)
81 *
82 * <pre> {@code
83 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
84 * final ScheduledFuture<?> periodicTask;
85 * final ScheduledExecutorService scheduler;
86 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
87 * Supplier<? extends T> supplier,
88 * long period, TimeUnit unit) {
89 * super(executor, maxBufferCapacity);
90 * scheduler = new ScheduledThreadPoolExecutor(1);
91 * periodicTask = scheduler.scheduleAtFixedRate(
92 * () -> submit(supplier.get()), 0, period, unit);
93 * }
94 * public void close() {
95 * periodicTask.cancel(false);
96 * scheduler.shutdown();
97 * super.close();
98 * }
99 * }}</pre>
100 *
101 * <p>Here is an example of a {@link Flow.Processor} implementation.
102 * It uses single-step requests to its publisher for simplicity of
103 * illustration. A more adaptive version could monitor flow using the
104 * lag estimate returned from {@code submit}, along with other utility
105 * methods.
106 *
107 * <pre> {@code
108 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
109 * implements Flow.Processor<S,T> {
110 * final Function<? super S, ? extends T> function;
111 * Flow.Subscription subscription;
112 * TransformProcessor(Executor executor, int maxBufferCapacity,
113 * Function<? super S, ? extends T> function) {
114 * super(executor, maxBufferCapacity);
115 * this.function = function;
116 * }
117 * public void onSubscribe(Flow.Subscription subscription) {
118 * (this.subscription = subscription).request(1);
119 * }
120 * public void onNext(S item) {
121 * subscription.request(1);
122 * submit(function.apply(item));
123 * }
124 * public void onError(Throwable ex) { closeExceptionally(ex); }
125 * public void onComplete() { close(); }
126 * }}</pre>
127 *
128 * @param <T> the published item type
129 * @author Doug Lea
130 * @since 9
131 */
132 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
133 AutoCloseable {
134 /*
135 * Most mechanics are handled by BufferedSubscription. This class
136 * mainly tracks subscribers and ensures sequentiality, by using
137 * built-in synchronization locks across public methods. (Using
138 * built-in locks works well in the most typical case in which
139 * only one thread submits items).
140 */
141
142 /** The largest possible power of two array size. */
143 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
144
145 /** Round capacity to power of 2, at most limit. */
146 static final int roundCapacity(int cap) {
147 int n = cap - 1;
148 n |= n >>> 1;
149 n |= n >>> 2;
150 n |= n >>> 4;
151 n |= n >>> 8;
152 n |= n >>> 16;
153 return (n <= 0) ? 1 : // at least 1
154 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
155 }
156
157 // default Executor setup; nearly the same as CompletableFuture
158
159 /**
160 * Default executor -- ForkJoinPool.commonPool() unless it cannot
161 * support parallelism.
162 */
163 private static final Executor ASYNC_POOL =
164 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
165 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
166
167 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
168 private static final class ThreadPerTaskExecutor implements Executor {
169 public void execute(Runnable r) { new Thread(r).start(); }
170 }
171
172 /**
173 * Clients (BufferedSubscriptions) are maintained in a linked list
174 * (via their "next" fields). This works well for publish loops.
175 * It requires O(n) traversal to check for duplicate subscribers,
176 * but we expect that subscribing is much less common than
177 * publishing. Unsubscribing occurs only during traversal loops,
178 * when BufferedSubscription methods return negative values
179 * signifying that they have been disabled. To reduce
180 * head-of-line blocking, submit and offer methods first call
181 * BufferedSubscription.offer on each subscriber, and place
182 * saturated ones in retries list (using nextRetry field), and
183 * retry, possibly blocking or dropping.
184 */
185 BufferedSubscription<T> clients;
186
187 /** Run status, updated only within locks */
188 volatile boolean closed;
189 /** If non-null, the exception in closeExceptionally */
190 volatile Throwable closedException;
191
192 // Parameters for constructing BufferedSubscriptions
193 final Executor executor;
194 final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
195 final int maxBufferCapacity;
196
197 /**
198 * Creates a new SubmissionPublisher using the given Executor for
199 * async delivery to subscribers, with the given maximum buffer size
200 * for each subscriber, and, if non-null, the given handler invoked
201 * when any Subscriber throws an exception in method {@link
202 * Flow.Subscriber#onNext(Object) onNext}.
203 *
204 * @param executor the executor to use for async delivery,
205 * supporting creation of at least one independent thread
206 * @param maxBufferCapacity the maximum capacity for each
207 * subscriber's buffer (the enforced capacity may be rounded up to
208 * the nearest power of two and/or bounded by the largest value
209 * supported by this implementation; method {@link #getMaxBufferCapacity}
210 * returns the actual value)
211 * @param handler if non-null, procedure to invoke upon exception
212 * thrown in method {@code onNext}
213 * @throws NullPointerException if executor is null
214 * @throws IllegalArgumentException if maxBufferCapacity not
215 * positive
216 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                           BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
    // Argument checks: a null executor is reported before a bad capacity.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity <= 0) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    // Store the rounded (power-of-two, bounded) capacity, not the raw argument.
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler;   // null means no onNext exception handler
    this.executor = executor;
}
227
228 /**
229 * Creates a new SubmissionPublisher using the given Executor for
230 * async delivery to subscribers, with the given maximum buffer size
231 * for each subscriber, and no handler for Subscriber exceptions in
232 * method {@link Flow.Subscriber#onNext(Object) onNext}.
233 *
234 * @param executor the executor to use for async delivery,
235 * supporting creation of at least one independent thread
236 * @param maxBufferCapacity the maximum capacity for each
237 * subscriber's buffer (the enforced capacity may be rounded up to
238 * the nearest power of two and/or bounded by the largest value
239 * supported by this implementation; method {@link #getMaxBufferCapacity}
240 * returns the actual value)
241 * @throws NullPointerException if executor is null
242 * @throws IllegalArgumentException if maxBufferCapacity not
243 * positive
244 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    // Same as the three-argument constructor with no onNext handler.
    this(executor, maxBufferCapacity, null);
}
248
249 /**
250 * Creates a new SubmissionPublisher using the {@link
251 * ForkJoinPool#commonPool()} for async delivery to subscribers
252 * (unless it does not support a parallelism level of at least two,
253 * in which case, a new Thread is created to run each task), with
254 * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
255 * handler for Subscriber exceptions in method {@link
256 * Flow.Subscriber#onNext(Object) onNext}.
257 */
public SubmissionPublisher() {
    // Default executor, default Flow buffer size, no onNext handler.
    this(ASYNC_POOL, Flow.defaultBufferSize(), null);
}
261
262 /**
263 * Adds the given Subscriber unless already subscribed. If already
264 * subscribed, the Subscriber's {@link
265 * Flow.Subscriber#onError(Throwable) onError} method is invoked on
266 * the existing subscription with an {@link IllegalStateException}.
267 * Otherwise, upon success, the Subscriber's {@link
268 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
269 * asynchronously with a new {@link Flow.Subscription}. If {@link
270 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
271 * subscription is cancelled. Otherwise, if this SubmissionPublisher
272 * was closed exceptionally, then the subscriber's {@link
273 * Flow.Subscriber#onError onError} method is invoked with the
274 * corresponding exception, or if closed without exception, the
275 * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
276 * method is invoked. Subscribers may enable receiving items by
277 * invoking the {@link Flow.Subscription#request(long) request}
278 * method of the new Subscription, and may unsubscribe by invoking
279 * its {@link Flow.Subscription#cancel() cancel} method.
280 *
281 * @param subscriber the subscriber
282 * @throws NullPointerException if subscriber is null
283 */
284 public void subscribe(Flow.Subscriber<? super T> subscriber) {
285 if (subscriber == null) throw new NullPointerException();
286 BufferedSubscription<T> subscription =
287 new BufferedSubscription<T>(subscriber, executor,
288 onNextHandler, maxBufferCapacity);
289 synchronized (this) {
290 for (BufferedSubscription<T> b = clients, pred = null;;) {
291 if (b == null) {
292 Throwable ex;
293 subscription.onSubscribe();
294 if ((ex = closedException) != null)
295 subscription.onError(ex);
296 else if (closed)
297 subscription.onComplete();
298 else if (pred == null)
299 clients = subscription;
300 else
301 pred.next = subscription;
302 break;
303 }
304 BufferedSubscription<T> next = b.next;
305 if (b.isDisabled()) { // remove
306 b.next = null; // detach
307 if (pred == null)
308 clients = next;
309 else
310 pred.next = next;
311 }
312 else if (subscriber.equals(b.subscriber)) {
313 b.onError(new IllegalStateException("Duplicate subscribe"));
314 break;
315 }
316 else
317 pred = b;
318 b = next;
319 }
320 }
321 }
322
323 /**
324 * Publishes the given item to each current subscriber by
325 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
326 * onNext} method, blocking uninterruptibly while resources for any
327 * subscriber are unavailable. This method returns an estimate of
328 * the maximum lag (number of items submitted but not yet consumed)
329 * among all current subscribers. This value is at least one
330 * (accounting for this submitted item) if there are any
331 * subscribers, else zero.
332 *
333 * <p>If the Executor for this publisher throws a
334 * RejectedExecutionException (or any other RuntimeException or
335 * Error) when attempting to asynchronously notify subscribers,
336 * then this exception is rethrown, in which case not all
337 * subscribers will have been issued this item.
338 *
339 * @param item the (non-null) item to publish
340 * @return the estimated maximum lag among subscribers
341 * @throws IllegalStateException if closed
342 * @throws NullPointerException if item is null
343 * @throws RejectedExecutionException if thrown by Executor
344 */
345 public int submit(T item) {
346 if (item == null) throw new NullPointerException();
347 int lag = 0;
348 boolean complete;
349 synchronized (this) {
350 complete = closed;
351 BufferedSubscription<T> b = clients;
352 if (!complete) {
353 BufferedSubscription<T> pred = null, r = null, rtail = null;
354 while (b != null) {
355 BufferedSubscription<T> next = b.next;
356 int stat = b.offer(item);
357 if (stat < 0) { // disabled
358 b.next = null;
359 if (pred == null)
360 clients = next;
361 else
362 pred.next = next;
363 }
364 else {
365 if (stat > lag)
366 lag = stat;
367 else if (stat == 0) { // place on retry list
368 b.nextRetry = null;
369 if (rtail == null)
370 r = b;
371 else
372 rtail.nextRetry = b;
373 rtail = b;
374 }
375 pred = b;
376 }
377 b = next;
378 }
379 while (r != null) {
380 BufferedSubscription<T> nextRetry = r.nextRetry;
381 r.nextRetry = null;
382 int stat = r.submit(item);
383 if (stat > lag)
384 lag = stat;
385 else if (stat < 0 && clients == r)
386 clients = r.next; // postpone internal unsubscribes
387 r = nextRetry;
388 }
389 }
390 }
391 if (complete)
392 throw new IllegalStateException("Closed");
393 else
394 return lag;
395 }
396
397 /**
398 * Publishes the given item, if possible, to each current subscriber
399 * by asynchronously invoking its {@link
400 * Flow.Subscriber#onNext(Object) onNext} method. The item may be
401 * dropped by one or more subscribers if resource limits are
402 * exceeded, in which case the given handler (if non-null) is
403 * invoked, and if it returns true, retried once. Other calls to
404 * methods in this class by other threads are blocked while the
405 * handler is invoked. Unless recovery is assured, options are
406 * usually limited to logging the error and/or issuing an {@link
407 * Flow.Subscriber#onError(Throwable) onError} signal to the
408 * subscriber.
409 *
410 * <p>This method returns a status indicator: If negative, it
411 * represents the (negative) number of drops (failed attempts to
412 * issue the item to a subscriber). Otherwise it is an estimate of
413 * the maximum lag (number of items submitted but not yet
414 * consumed) among all current subscribers. This value is at least
415 * one (accounting for this submitted item) if there are any
416 * subscribers, else zero.
417 *
418 * <p>If the Executor for this publisher throws a
419 * RejectedExecutionException (or any other RuntimeException or
420 * Error) when attempting to asynchronously notify subscribers, or
421 * the drop handler throws an exception when processing a dropped
422 * item, then this exception is rethrown.
423 *
424 * @param item the (non-null) item to publish
425 * @param onDrop if non-null, the handler invoked upon a drop to a
426 * subscriber, with arguments of the subscriber and item; if it
427 * returns true, an offer is re-attempted (once)
428 * @return if negative, the (negative) number of drops; otherwise
429 * an estimate of maximum lag
430 * @throws IllegalStateException if closed
431 * @throws NullPointerException if item is null
432 * @throws RejectedExecutionException if thrown by Executor
433 */
434 public int offer(T item,
435 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
436 return doOffer(0L, item, onDrop);
437 }
438
439 /**
440 * Publishes the given item, if possible, to each current subscriber
441 * by asynchronously invoking its {@link
442 * Flow.Subscriber#onNext(Object) onNext} method, blocking while
443 * resources for any subscription are unavailable, up to the
444 * specified timeout or until the caller thread is interrupted, at
445 * which point the given handler (if non-null) is invoked, and if it
446 * returns true, retried once. (The drop handler may distinguish
447 * timeouts from interrupts by checking whether the current thread
448 * is interrupted.) Other calls to methods in this class by other
449 * threads are blocked while the handler is invoked. Unless
450 * recovery is assured, options are usually limited to logging the
451 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
452 * onError} signal to the subscriber.
453 *
454 * <p>This method returns a status indicator: If negative, it
455 * represents the (negative) number of drops (failed attempts to
456 * issue the item to a subscriber). Otherwise it is an estimate of
457 * the maximum lag (number of items submitted but not yet
458 * consumed) among all current subscribers. This value is at least
459 * one (accounting for this submitted item) if there are any
460 * subscribers, else zero.
461 *
462 * <p>If the Executor for this publisher throws a
463 * RejectedExecutionException (or any other RuntimeException or
464 * Error) when attempting to asynchronously notify subscribers, or
465 * the drop handler throws an exception when processing a dropped
466 * item, then this exception is rethrown.
467 *
468 * @param item the (non-null) item to publish
469 * @param timeout how long to wait for resources for any subscriber
470 * before giving up, in units of {@code unit}
471 * @param unit a {@code TimeUnit} determining how to interpret the
472 * {@code timeout} parameter
473 * @param onDrop if non-null, the handler invoked upon a drop to a
474 * subscriber, with arguments of the subscriber and item; if it
475 * returns true, an offer is re-attempted (once)
476 * @return if negative, the (negative) number of drops; otherwise
477 * an estimate of maximum lag
478 * @throws IllegalStateException if closed
479 * @throws NullPointerException if item is null
480 * @throws RejectedExecutionException if thrown by Executor
481 */
482 public int offer(T item, long timeout, TimeUnit unit,
483 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
484 return doOffer(unit.toNanos(timeout), item, onDrop);
485 }
486
487 /** Common implementation for both forms of offer */
488 final int doOffer(long nanos, T item,
489 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
490 if (item == null) throw new NullPointerException();
491 int lag = 0, drops = 0;
492 boolean complete;
493 synchronized (this) {
494 complete = closed;
495 BufferedSubscription<T> b = clients;
496 if (!complete) {
497 BufferedSubscription<T> pred = null, r = null, rtail = null;
498 while (b != null) {
499 BufferedSubscription<T> next = b.next;
500 int stat = b.offer(item);
501 if (stat < 0) {
502 b.next = null;
503 if (pred == null)
504 clients = next;
505 else
506 pred.next = next;
507 }
508 else {
509 if (stat > lag)
510 lag = stat;
511 else if (stat == 0) {
512 b.nextRetry = null;
513 if (rtail == null)
514 r = b;
515 else
516 rtail.nextRetry = b;
517 rtail = b;
518 }
519 else if (stat > lag)
520 lag = stat;
521 pred = b;
522 }
523 b = next;
524 }
525 while (r != null) {
526 BufferedSubscription<T> nextRetry = r.nextRetry;
527 r.nextRetry = null;
528 int stat = (nanos > 0L)
529 ? r.timedOffer(item, nanos)
530 : r.offer(item);
531 if (stat == 0 && onDrop != null &&
532 onDrop.test(r.subscriber, item))
533 stat = r.offer(item);
534 if (stat == 0)
535 ++drops;
536 else if (stat > lag)
537 lag = stat;
538 else if (stat < 0 && clients == r)
539 clients = r.next;
540 r = nextRetry;
541 }
542 }
543 }
544 if (complete)
545 throw new IllegalStateException("Closed");
546 else
547 return (drops > 0) ? -drops : lag;
548 }
549
550 /**
551 * Unless already closed, issues {@link
552 * Flow.Subscriber#onComplete() onComplete} signals to current
553 * subscribers, and disallows subsequent attempts to publish.
554 * Upon return, this method does <em>NOT</em> guarantee that all
555 * subscribers have yet completed.
556 */
557 public void close() {
558 if (!closed) {
559 BufferedSubscription<T> b;
560 synchronized (this) {
561 b = clients;
562 clients = null;
563 closed = true;
564 }
565 while (b != null) {
566 BufferedSubscription<T> next = b.next;
567 b.next = null;
568 b.onComplete();
569 b = next;
570 }
571 }
572 }
573
574 /**
575 * Unless already closed, issues {@link
576 * Flow.Subscriber#onError(Throwable) onError} signals to current
577 * subscribers with the given error, and disallows subsequent
578 * attempts to publish. Future subscribers also receive the given
579 * error. Upon return, this method does <em>NOT</em> guarantee
580 * that all subscribers have yet completed.
581 *
582 * @param error the {@code onError} argument sent to subscribers
583 * @throws NullPointerException if error is null
584 */
585 public void closeExceptionally(Throwable error) {
586 if (error == null)
587 throw new NullPointerException();
588 if (!closed) {
589 BufferedSubscription<T> b;
590 synchronized (this) {
591 b = clients;
592 clients = null;
593 closed = true;
594 closedException = error;
595 }
596 while (b != null) {
597 BufferedSubscription<T> next = b.next;
598 b.next = null;
599 b.onError(error);
600 b = next;
601 }
602 }
603 }
604
605 /**
606 * Returns true if this publisher is not accepting submissions.
607 *
608 * @return true if closed
609 */
610 public boolean isClosed() {
611 return closed;
612 }
613
614 /**
615 * Returns the exception associated with {@link
616 * #closeExceptionally(Throwable) closeExceptionally}, or null if
617 * not closed or if closed normally.
618 *
619 * @return the exception, or null if none
620 */
621 public Throwable getClosedException() {
622 return closedException;
623 }
624
625 /**
626 * Returns true if this publisher has any subscribers.
627 *
628 * @return true if this publisher has any subscribers
629 */
630 public boolean hasSubscribers() {
631 boolean nonEmpty = false;
632 if (!closed) {
633 synchronized (this) {
634 for (BufferedSubscription<T> b = clients; b != null;) {
635 BufferedSubscription<T> next = b.next;
636 if (b.isDisabled()) {
637 b.next = null;
638 b = clients = next;
639 }
640 else {
641 nonEmpty = true;
642 break;
643 }
644 }
645 }
646 }
647 return nonEmpty;
648 }
649
650 /**
651 * Returns the number of current subscribers.
652 *
653 * @return the number of current subscribers
654 */
655 public int getNumberOfSubscribers() {
656 int count = 0;
657 if (!closed) {
658 synchronized (this) {
659 BufferedSubscription<T> pred = null, next;
660 for (BufferedSubscription<T> b = clients; b != null; b = next) {
661 next = b.next;
662 if (b.isDisabled()) {
663 b.next = null;
664 if (pred == null)
665 clients = next;
666 else
667 pred.next = next;
668 }
669 else {
670 pred = b;
671 ++count;
672 }
673 }
674 }
675 }
676 return count;
677 }
678
679 /**
680 * Returns the Executor used for asynchronous delivery.
681 *
682 * @return the Executor used for asynchronous delivery
683 */
684 public Executor getExecutor() {
685 return executor;
686 }
687
688 /**
689 * Returns the maximum per-subscriber buffer capacity.
690 *
691 * @return the maximum per-subscriber buffer capacity
692 */
693 public int getMaxBufferCapacity() {
694 return maxBufferCapacity;
695 }
696
697 /**
698 * Returns a list of current subscribers for monitoring and
699 * tracking purposes, not for invoking {@link Flow.Subscriber}
700 * methods on the subscribers.
701 *
702 * @return list of current subscribers
703 */
704 public List<Flow.Subscriber<? super T>> getSubscribers() {
705 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
706 synchronized (this) {
707 BufferedSubscription<T> pred = null, next;
708 for (BufferedSubscription<T> b = clients; b != null; b = next) {
709 next = b.next;
710 if (b.isDisabled()) {
711 b.next = null;
712 if (pred == null)
713 clients = next;
714 else
715 pred.next = next;
716 }
717 else
718 subs.add(b.subscriber);
719 }
720 }
721 return subs;
722 }
723
724 /**
725 * Returns true if the given Subscriber is currently subscribed.
726 *
727 * @param subscriber the subscriber
728 * @return true if currently subscribed
729 * @throws NullPointerException if subscriber is null
730 */
731 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
732 if (subscriber == null) throw new NullPointerException();
733 if (!closed) {
734 synchronized (this) {
735 BufferedSubscription<T> pred = null, next;
736 for (BufferedSubscription<T> b = clients; b != null; b = next) {
737 next = b.next;
738 if (b.isDisabled()) {
739 b.next = null;
740 if (pred == null)
741 clients = next;
742 else
743 pred.next = next;
744 }
745 else if (subscriber.equals(b.subscriber))
746 return true;
747 else
748 pred = b;
749 }
750 }
751 }
752 return false;
753 }
754
755 /**
756 * Returns an estimate of the minimum number of items requested
757 * (via {@link Flow.Subscription#request(long) request}) but not
758 * yet produced, among all current subscribers.
759 *
760 * @return the estimate, or zero if no subscribers
761 */
762 public long estimateMinimumDemand() {
763 long min = Long.MAX_VALUE;
764 boolean nonEmpty = false;
765 synchronized (this) {
766 BufferedSubscription<T> pred = null, next;
767 for (BufferedSubscription<T> b = clients; b != null; b = next) {
768 int n; long d;
769 next = b.next;
770 if ((n = b.estimateLag()) < 0) {
771 b.next = null;
772 if (pred == null)
773 clients = next;
774 else
775 pred.next = next;
776 }
777 else {
778 if ((d = b.demand - n) < min)
779 min = d;
780 nonEmpty = true;
781 pred = b;
782 }
783 }
784 }
785 return nonEmpty ? min : 0;
786 }
787
788 /**
789 * Returns an estimate of the maximum number of items produced but
790 * not yet consumed among all current subscribers.
791 *
792 * @return the estimate
793 */
794 public int estimateMaximumLag() {
795 int max = 0;
796 synchronized (this) {
797 BufferedSubscription<T> pred = null, next;
798 for (BufferedSubscription<T> b = clients; b != null; b = next) {
799 int n;
800 next = b.next;
801 if ((n = b.estimateLag()) < 0) {
802 b.next = null;
803 if (pred == null)
804 clients = next;
805 else
806 pred.next = next;
807 }
808 else {
809 if (n > max)
810 max = n;
811 pred = b;
812 }
813 }
814 }
815 return max;
816 }
817
818 /**
819 * Processes all published items using the given Consumer function.
820 * Returns a CompletableFuture that is completed normally when this
821 * publisher signals {@link Flow.Subscriber#onComplete()
822 * onComplete}, or completed exceptionally upon any error, or an
823 * exception is thrown by the Consumer, or the returned
824 * CompletableFuture is cancelled, in which case no further items
825 * are processed.
826 *
827 * @param consumer the function applied to each onNext item
828 * @return a CompletableFuture that is completed normally
829 * when the publisher signals onComplete, and exceptionally
830 * upon any error or cancellation
831 * @throws NullPointerException if consumer is null
832 */
833 public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
834 if (consumer == null)
835 throw new NullPointerException();
836 CompletableFuture<Void> status = new CompletableFuture<>();
837 subscribe(new ConsumerSubscriber<T>(status, consumer));
838 return status;
839 }
840
841 /** Subscriber for method consume */
842 private static final class ConsumerSubscriber<T>
843 implements Flow.Subscriber<T> {
844 final CompletableFuture<Void> status;
845 final Consumer<? super T> consumer;
846 Flow.Subscription subscription;
847 ConsumerSubscriber(CompletableFuture<Void> status,
848 Consumer<? super T> consumer) {
849 this.status = status; this.consumer = consumer;
850 }
851 public final void onSubscribe(Flow.Subscription subscription) {
852 this.subscription = subscription;
853 status.whenComplete((v, e) -> subscription.cancel());
854 if (!status.isDone())
855 subscription.request(Long.MAX_VALUE);
856 }
857 public final void onError(Throwable ex) {
858 status.completeExceptionally(ex);
859 }
860 public final void onComplete() {
861 status.complete(null);
862 }
863 public final void onNext(T item) {
864 try {
865 consumer.accept(item);
866 } catch (Throwable ex) {
867 subscription.cancel();
868 status.completeExceptionally(ex);
869 }
870 }
871 }
872
873 /**
874 * A task for consuming buffer items and signals, created and
875 * executed whenever they become available. A task consumes as
876 * many items/signals as possible before terminating, at which
877 * point another task is created when needed. The dual Runnable
878 * and ForkJoinTask declaration saves overhead when executed by
879 * ForkJoinPools, without impacting other kinds of Executors.
880 */
881 @SuppressWarnings("serial")
882 static final class ConsumerTask<T> extends ForkJoinTask<Void>
883 implements Runnable, CompletableFuture.AsynchronousCompletionTask {
884 final BufferedSubscription<T> consumer;
885 ConsumerTask(BufferedSubscription<T> consumer) {
886 this.consumer = consumer;
887 }
888 public final Void getRawResult() { return null; }
889 public final void setRawResult(Void v) {}
890 public final boolean exec() { consumer.consume(); return false; }
891 public final void run() { consumer.consume(); }
892 }
893
894 /**
895 * A bounded (ring) buffer with integrated control to start a
896 * consumer task whenever items are available. The buffer
897 * algorithm is similar to one used inside ForkJoinPool (see its
898 * internal documentation for details) specialized for the case of
899 * at most one concurrent producer and consumer, and power of two
900 * buffer sizes. This allows methods to operate without locks even
901 * while supporting resizing, blocking, task-triggering, and
902 * garbage-free buffers (nulling out elements when consumed),
903 * although supporting these does impose a bit of overhead
904 * compared to plain fixed-size ring buffers.
905 *
906 * The publisher guarantees a single producer via its lock. We
907 * ensure in this class that there is at most one consumer. The
908 * request and cancel methods must be fully thread-safe but are
909 * coded to exploit the most common case in which they are only
910 * called by consumers (usually within onNext).
911 *
912 * Execution control is managed using the ACTIVE ctl bit. We
913 * ensure that a task is active when consumable items (and
914 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
915 * there is demand (unfilled requests). This is complicated on
916 * the creation side by the possibility of exceptions when trying
917 * to execute tasks. These eventually force DISABLED state, but
918 * sometimes not directly. On the task side, termination (clearing
919 * ACTIVE) that would otherwise race with producers or request()
920 * calls uses the CONSUME keep-alive bit to force a recheck.
921 *
922 * The ctl field also manages run state. When DISABLED, no further
923 * updates are possible. Disabling may be preceded by setting
924 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
925 * case the associated Subscriber methods are invoked, possibly
926 * synchronously if there is no active consumer task (including
927 * cases where execute() failed). The cancel() method is supported
928 * by treating as ERROR but suppressing onError signal.
929 *
930 * Support for blocking also exploits the fact that there is only
931 * one possible waiter. ManagedBlocker-compatible control fields
932 * are placed in this class itself rather than in wait-nodes.
933 * Blocking control relies on the "waiter" field. Producers set
934 * the field before trying to block, but must then recheck (via
935 * offer) before parking. Signalling then just unparks and clears
936 * waiter field. If the producer and/or consumer are using a
937 * ForkJoinPool, the producer attempts to help run consumer tasks
938 * via ForkJoinPool.helpAsyncBlocker before blocking.
939 *
940 * This class uses @Contended and heuristic field declaration
941 * ordering to reduce false-sharing-based memory contention among
942 * instances of BufferedSubscription, but it does not currently
943 * attempt to avoid memory contention among buffers. This field
944 * and element packing can hurt performance especially when each
945 * publisher has only one client operating at a high rate.
946 * Addressing this may require allocating substantially more space
947 * than users expect.
948 */
949 @SuppressWarnings("serial")
950 @jdk.internal.vm.annotation.Contended
951 private static final class BufferedSubscription<T>
952 implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
// Order-sensitive field declarations
long timeout;                      // > 0 if timed wait; INTERRUPTED once the producer was interrupted
volatile long demand;              // # unfilled requests
int maxCapacity;                   // reduced on OOME
int putStat;                       // offer result for ManagedBlocker
volatile int ctl;                  // atomic run state flags (see "ctl values" below)
volatile int head;                 // next position to take
int tail;                          // next position to put
Object[] array;                    // buffer: null if disabled
Flow.Subscriber<? super T> subscriber; // null if disabled
Executor executor;                 // null if disabled
// Invoked (if non-null) with the subscriber and the exception when
// the subscriber's onNext throws
BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
volatile Throwable pendingError;   // holds until onError issued
volatile Thread waiter;            // blocked producer thread
T putItem;                         // for offer within ManagedBlocker
BufferedSubscription<T> next;      // used only by publisher
BufferedSubscription<T> nextRetry; // used only by publisher

// ctl values (bit flags; DISABLED alone is the terminal state)
static final int ACTIVE    = 0x01; // consumer task active
static final int CONSUME   = 0x02; // keep-alive for consumer task
static final int DISABLED  = 0x04; // final state
static final int ERROR     = 0x08; // signal onError then disable
static final int SUBSCRIBE = 0x10; // signal onSubscribe
static final int COMPLETE  = 0x20; // signal onComplete when done

// Value stored in the timeout field to record that a blocking
// offer was interrupted (distinguishes interrupt from timeout)
static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel

/**
 * Initial buffer capacity used when maxBufferCapacity is
 * greater. Must be a power of two.
 */
static final int DEFAULT_INITIAL_CAP = 32;
986
987 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
988 Executor executor,
989 BiConsumer<? super Flow.Subscriber<? super T>,
990 ? super Throwable> onNextHandler,
991 int maxBufferCapacity) {
992 this.subscriber = subscriber;
993 this.executor = executor;
994 this.onNextHandler = onNextHandler;
995 this.maxCapacity = maxBufferCapacity;
996 this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
997 (maxBufferCapacity < 2 ? // at least 2 slots
998 2 : maxBufferCapacity) :
999 DEFAULT_INITIAL_CAP];
1000 }
1001
1002 final boolean isDisabled() {
1003 return ctl == DISABLED;
1004 }
1005
1006 /**
1007 * Returns estimated number of buffered items, or -1 if
1008 * disabled.
1009 */
1010 final int estimateLag() {
1011 int n;
1012 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
1013 }
1014
1015 /**
1016 * Tries to add item and start consumer task if necessary.
1017 * @return -1 if disabled, 0 if dropped, else estimated lag
1018 */
1019 final int offer(T item) {
1020 int h = head, t = tail, cap, size, stat;
1021 Object[] a = array;
1022 if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
1023 a[(cap - 1) & t] = item; // relaxed writes OK
1024 tail = t + 1;
1025 stat = size;
1026 }
1027 else
1028 stat = growAndAdd(a, item);
1029 return (stat > 0 &&
1030 (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
1031 startOnOffer(stat) : stat;
1032 }
1033
/**
 * Tries to create or expand buffer, then adds item if possible.
 *
 * Called from offer when the fast path could not store the item.
 *
 * @param a the array observed by the caller (may be null)
 * @param item the item to add
 * @return -1 if ctl has ERROR or DISABLED set; 0 if the item was
 * not added (buffer full at maxCapacity, doubling the capacity
 * overflowed, or allocation failed); else a positive value: the
 * buffer size after the add, or the old capacity plus one when
 * the buffer was created or grown
 */
private int growAndAdd(Object[] a, T item) {
    boolean alloc;                           // true if a new array must be allocated
    int cap, stat;
    if ((ctl & (ERROR | DISABLED)) != 0) {   // terminating: refuse item
        cap = 0;
        stat = -1;
        alloc = false;
    }
    else if (a == null || (cap = a.length) <= 0) { // no usable buffer yet
        cap = 0;
        stat = 1;
        alloc = true;
    }
    else {
        VarHandle.fullFence();               // recheck
        // Re-read indices after the fence; there may now be room
        int h = head, t = tail, size = t + 1 - h;
        if (cap >= size) {
            a[(cap - 1) & t] = item;
            tail = t + 1;
            stat = size;
            alloc = false;
        }
        else if (cap >= maxCapacity) {
            stat = 0;                        // cannot grow
            alloc = false;
        }
        else {
            stat = cap + 1;
            alloc = true;
        }
    }
    if (alloc) {
        int newCap = (cap > 0) ? cap << 1 : 1;
        if (newCap <= cap)                   // shift overflowed
            stat = 0;
        else {
            Object[] newArray = null;
            try {
                newArray = new Object[newCap];
            } catch (Throwable ex) {         // try to cope with OOME
            }
            if (newArray == null) {
                if (cap > 0)
                    maxCapacity = cap;       // avoid continuous failure
                stat = 0;
            }
            else {
                array = newArray;
                int t = tail;
                int newMask = newCap - 1;
                if (a != null && cap > 0) {
                    int mask = cap - 1;
                    // Move each remaining element to the new array.
                    // An element is transferred only if this thread
                    // wins the CAS that nulls the old slot.
                    for (int j = head; j != t; ++j) {
                        int k = j & mask;
                        Object x = QA.getAcquire(a, k);
                        if (x != null &&     // races with consumer
                            QA.compareAndSet(a, k, x, null))
                            newArray[j & newMask] = x;
                    }
                }
                newArray[t & newMask] = item;
                tail = t + 1;
            }
        }
    }
    return stat;
}
1104
1105 /**
1106 * Spins/helps/blocks while offer returns 0. Called only if
1107 * initial offer return 0.
1108 */
1109 final int submit(T item) {
1110 int stat;
1111 if ((stat = offer(item)) == 0) {
1112 putItem = item;
1113 timeout = 0L;
1114 putStat = 0;
1115 ForkJoinPool.helpAsyncBlocker(executor, this);
1116 if ((stat = putStat) == 0) {
1117 try {
1118 ForkJoinPool.managedBlock(this);
1119 } catch (InterruptedException ie) {
1120 timeout = INTERRUPTED;
1121 }
1122 stat = putStat;
1123 }
1124 if (timeout < 0L)
1125 Thread.currentThread().interrupt();
1126 }
1127 return stat;
1128 }
1129
1130 /**
1131 * Timeout version; similar to submit.
1132 */
1133 final int timedOffer(T item, long nanos) {
1134 int stat;
1135 if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
1136 putItem = item;
1137 putStat = 0;
1138 ForkJoinPool.helpAsyncBlocker(executor, this);
1139 if ((stat = putStat) == 0) {
1140 try {
1141 ForkJoinPool.managedBlock(this);
1142 } catch (InterruptedException ie) {
1143 timeout = INTERRUPTED;
1144 }
1145 stat = putStat;
1146 }
1147 if (timeout < 0L)
1148 Thread.currentThread().interrupt();
1149 }
1150 return stat;
1151 }
1152
/**
 * Tries to start consumer task after offer.
 *
 * If a task is already active, only ensures the CONSUME
 * keep-alive bit is set. If no task is active, one is started
 * only when there is demand and the buffer is non-empty. If the
 * executor rejects the task, the ACTIVE bit is cleared again and
 * the exception is rethrown to the caller.
 *
 * @param stat the status computed by offer
 * @return -1 if now disabled, else argument
 */
private int startOnOffer(int stat) {
    for (;;) {
        Executor e; int c;
        if ((c = ctl) == DISABLED || (e = executor) == null) {
            stat = -1;
            break;
        }
        else if ((c & ACTIVE) != 0) {        // ensure keep-alive
            if ((c & CONSUME) != 0 ||
                CTL.compareAndSet(this, c, c | CONSUME))
                break;
        }
        else if (demand == 0L || tail == head) // nothing deliverable now
            break;
        else if (CTL.compareAndSet(this, c, c | (ACTIVE | CONSUME))) {
            try {
                e.execute(new ConsumerTask<T>(this));
                break;
            } catch (RuntimeException | Error ex) { // back out
                // Clear ACTIVE unless ctl became DISABLED or ACTIVE
                // was already cleared; retry on CAS failure
                do {} while (((c = ctl) & DISABLED) == 0 &&
                             (c & ACTIVE) != 0 &&
                             !CTL.weakCompareAndSet
                             (this, c, c & ~ACTIVE));
                throw ex;
            }
        }
    }
    return stat;
}
1186
/**
 * Clears the waiter field and then unparks the given thread.
 *
 * @param w the thread to unpark, as previously read from waiter
 */
private void signalWaiter(Thread w) {
    waiter = null;                      // cleared before the unpark
    LockSupport.unpark(w);              // release producer
}
1191
1192 /**
1193 * Nulls out most fields, mainly to avoid garbage retention
1194 * until publisher unsubscribes, but also to help cleanly stop
1195 * upon error by nulling required components.
1196 */
1197 private void detach() {
1198 Thread w = waiter;
1199 executor = null;
1200 subscriber = null;
1201 pendingError = null;
1202 signalWaiter(w);
1203 }
1204
/**
 * Issues error signal, asynchronously if a task is running,
 * else synchronously.
 *
 * Does nothing if ctl already has ERROR or DISABLED set. When a
 * consumer task is active, the exception is stored in pendingError
 * and the ERROR bit is set. Otherwise ctl is switched to DISABLED
 * and the subscriber's onError is called directly (skipped when
 * ex is null), followed by detach().
 *
 * @param ex the exception to report; null suppresses the
 * synchronous onError call
 */
final void onError(Throwable ex) {
    for (int c;;) {
        if (((c = ctl) & (ERROR | DISABLED)) != 0)
            break;
        else if ((c & ACTIVE) != 0) {
            pendingError = ex;               // written before ERROR is set in ctl
            if (CTL.compareAndSet(this, c, c | ERROR))
                break;                       // cause consumer task to exit
        }
        else if (CTL.compareAndSet(this, c, DISABLED)) {
            Flow.Subscriber<? super T> s = subscriber;
            if (s != null && ex != null) {
                try {
                    s.onError(ex);
                } catch (Throwable ignore) { // exceptions thrown by onError are dropped
                }
            }
            detach();
            break;
        }
    }
}
1231
/**
 * Tries to start consumer task upon a signal or request;
 * disables on failure.
 *
 * Callers in this class set the ACTIVE bit before calling. If
 * execute() throws, ACTIVE is cleared again and the exception is
 * passed to onError.
 */
private void startOrDisable() {
    Executor e;
    if ((e = executor) != null) {            // skip if already disabled
        try {
            e.execute(new ConsumerTask<T>(this));
        } catch (Throwable ex) {             // back out and force signal
            for (int c;;) {
                // Nothing to undo if disabled or ACTIVE already clear
                if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
                    break;
                if (CTL.compareAndSet(this, c, c & ~ACTIVE)) {
                    onError(ex);
                    break;
                }
            }
        }
    }
}
1253
1254 final void onComplete() {
1255 for (int c;;) {
1256 if ((c = ctl) == DISABLED)
1257 break;
1258 if (CTL.compareAndSet(this, c,
1259 c | (ACTIVE | CONSUME | COMPLETE))) {
1260 if ((c & ACTIVE) == 0)
1261 startOrDisable();
1262 break;
1263 }
1264 }
1265 }
1266
1267 final void onSubscribe() {
1268 for (int c;;) {
1269 if ((c = ctl) == DISABLED)
1270 break;
1271 if (CTL.compareAndSet(this, c,
1272 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1273 if ((c & ACTIVE) == 0)
1274 startOrDisable();
1275 break;
1276 }
1277 }
1278 }
1279
1280 /**
1281 * Causes consumer task to exit if active (without reporting
1282 * onError unless there is already a pending error), and
1283 * disables.
1284 */
1285 public void cancel() {
1286 for (int c;;) {
1287 if ((c = ctl) == DISABLED)
1288 break;
1289 else if ((c & ACTIVE) != 0) {
1290 if (CTL.compareAndSet(this, c,
1291 c | (CONSUME | ERROR)))
1292 break;
1293 }
1294 else if (CTL.compareAndSet(this, c, DISABLED)) {
1295 detach();
1296 break;
1297 }
1298 }
1299 }
1300
1301 /**
1302 * Adds to demand and possibly starts task.
1303 */
1304 public void request(long n) {
1305 if (n > 0L) {
1306 for (;;) {
1307 long prev = demand, d;
1308 if ((d = prev + n) < prev) // saturate
1309 d = Long.MAX_VALUE;
1310 if (DEMAND.compareAndSet(this, prev, d)) {
1311 for (int c, h;;) {
1312 if ((c = ctl) == DISABLED)
1313 break;
1314 else if ((c & ACTIVE) != 0) {
1315 if ((c & CONSUME) != 0 ||
1316 CTL.compareAndSet(this, c, c | CONSUME))
1317 break;
1318 }
1319 else if ((h = head) != tail) {
1320 if (CTL.compareAndSet(this, c,
1321 c | (ACTIVE|CONSUME))) {
1322 startOrDisable();
1323 break;
1324 }
1325 }
1326 else if (head == h && tail == h)
1327 break; // else stale
1328 if (demand == 0L)
1329 break;
1330 }
1331 break;
1332 }
1333 }
1334 }
1335 else if (n < 0L)
1336 onError(new IllegalArgumentException(
1337 "negative subscription request"));
1338 }
1339
1340 public final boolean isReleasable() { // for ManagedBlocker
1341 T item = putItem;
1342 if (item != null) {
1343 if ((putStat = offer(item)) == 0)
1344 return false;
1345 putItem = null;
1346 }
1347 return true;
1348 }
1349
/**
 * ManagedBlocker method: repeatedly offers the pending item,
 * parking between attempts, until offer returns non-zero, a timed
 * wait expires, or a timed wait is interrupted.
 *
 * An interrupt is recorded by setting timeout to INTERRUPTED. It
 * ends only timed waits; an untimed wait keeps retrying. The
 * result of the last offer is left in putStat.
 *
 * @return true always
 */
public final boolean block() { // for ManagedBlocker
    T item = putItem;
    if (item != null) {
        putItem = null;
        long nanos = timeout;
        long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
        while ((putStat = offer(item)) == 0) {
            if (Thread.interrupted()) {
                timeout = INTERRUPTED;
                if (nanos > 0L)              // only timed waits stop on interrupt
                    break;
            }
            else if (nanos > 0L &&
                     (nanos = deadline - System.nanoTime()) <= 0L)
                break;                       // timed out
            else if (waiter == null)
                // Publish this thread first; the loop then re-offers
                // before actually parking
                waiter = Thread.currentThread();
            else {
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    LockSupport.park(this);
                waiter = null;
            }
        }
    }
    waiter = null;
    return true;
}
1379
/**
 * Consumer loop, called from ConsumerTask, or indirectly
 * when helping during submit.
 *
 * Each iteration handles, in priority order: pending control
 * bits (ERROR, SUBSCRIBE, DISABLED) via checkControl; an
 * apparently empty buffer via checkEmpty; zero demand via
 * checkDemand; otherwise it takes one item, decrements demand,
 * wakes any waiting producer and calls the subscriber's onNext.
 * The loop ends when one of the check methods returns false.
 */
final void consume() {
    Flow.Subscriber<? super T> s;
    int h = head;
    if ((s = subscriber) != null) {          // else disabled
        for (;;) {
            long d = demand;
            int c; Object[] a; int n, i; Object x; Thread w;
            if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
                if (!checkControl(s, c))
                    break;
            }
            else if ((a = array) == null || h == tail ||
                     (n = a.length) == 0 ||
                     (x = QA.getAcquire(a, i = (n - 1) & h)) == null) {
                if (!checkEmpty(s, c))
                    break;
            }
            else if (d == 0L) {
                if (!checkDemand(c))
                    break;
            }
            // Ensure the CONSUME bit is set, then claim the slot by
            // CASing it to null; on failure just retry the loop
            else if (((c & CONSUME) != 0 ||
                      CTL.compareAndSet(this, c, c | CONSUME)) &&
                     QA.compareAndSet(a, i, x, null)) {
                HEAD.setRelease(this, ++h);
                DEMAND.getAndAdd(this, -1L);
                if ((w = waiter) != null)    // a slot was freed
                    signalWaiter(w);
                try {
                    @SuppressWarnings("unchecked") T y = (T) x;
                    s.onNext(y);
                } catch (Throwable ex) {
                    handleOnNext(s, ex);
                }
            }
        }
    }
}
1422
1423 /**
1424 * Responds to control events in consume().
1425 */
1426 private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
1427 boolean stat = true;
1428 if ((c & SUBSCRIBE) != 0) {
1429 if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) {
1430 try {
1431 if (s != null)
1432 s.onSubscribe(this);
1433 } catch (Throwable ex) {
1434 onError(ex);
1435 }
1436 }
1437 }
1438 else if ((c & ERROR) != 0) {
1439 Throwable ex = pendingError;
1440 ctl = DISABLED; // no need for CAS
1441 if (ex != null) { // null if errorless cancel
1442 try {
1443 if (s != null)
1444 s.onError(ex);
1445 } catch (Throwable ignore) {
1446 }
1447 }
1448 }
1449 else {
1450 detach();
1451 stat = false;
1452 }
1453 return stat;
1454 }
1455
1456 /**
1457 * Responds to apparent emptiness in consume().
1458 */
1459 private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1460 boolean stat = true;
1461 if (head == tail) {
1462 if ((c & CONSUME) != 0)
1463 CTL.compareAndSet(this, c, c & ~CONSUME);
1464 else if ((c & COMPLETE) != 0) {
1465 if (CTL.compareAndSet(this, c, DISABLED)) {
1466 try {
1467 if (s != null)
1468 s.onComplete();
1469 } catch (Throwable ignore) {
1470 }
1471 }
1472 }
1473 else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
1474 stat = false;
1475 }
1476 return stat;
1477 }
1478
1479 /**
1480 * Responds to apparent zero demand in consume().
1481 */
1482 private boolean checkDemand(int c) {
1483 boolean stat = true;
1484 if (demand == 0L) {
1485 if ((c & CONSUME) != 0)
1486 CTL.compareAndSet(this, c, c & ~CONSUME);
1487 else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
1488 stat = false;
1489 }
1490 return stat;
1491 }
1492
1493 /**
1494 * Processes exception in Subscriber.onNext.
1495 */
1496 private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1497 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1498 if ((h = onNextHandler) != null) {
1499 try {
1500 h.accept(s, ex);
1501 } catch (Throwable ignore) {
1502 }
1503 }
1504 onError(ex);
1505 }
1506
// VarHandle mechanics
// Handles for atomic/ordered access to the ctl, tail, head and demand
// fields, plus one for Object[] elements (the buffer slots).
private static final VarHandle CTL;
private static final VarHandle TAIL; // NOTE(review): no use of TAIL is visible in this class -- confirm
private static final VarHandle HEAD;
private static final VarHandle DEMAND;
private static final VarHandle QA;   // element access for Object[] buffers

static {
    try {
        MethodHandles.Lookup l = MethodHandles.lookup();
        CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
                              int.class);
        TAIL = l.findVarHandle(BufferedSubscription.class, "tail",
                               int.class);
        HEAD = l.findVarHandle(BufferedSubscription.class, "head",
                               int.class);
        DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
                                 long.class);
        QA = MethodHandles.arrayElementVarHandle(Object[].class);
    } catch (ReflectiveOperationException e) {
        // A lookup failure here is unrecoverable; surface it as an Error
        throw new Error(e);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}
1534 }
1535 }