ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.57
Committed: Wed Dec 16 02:29:06 2015 UTC (8 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.56: +1 -1 lines
Log Message:
handle jdk9 move: sun.misc.Contended -> jdk.internal.vm.annotation.Contended

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.List;
11 import java.util.concurrent.locks.LockSupport;
12 import java.util.function.BiConsumer;
13 import java.util.function.BiPredicate;
14 import java.util.function.Consumer;
15
16 /**
17 * A {@link Flow.Publisher} that asynchronously issues submitted
18 * (non-null) items to current subscribers until it is closed. Each
19 * current subscriber receives newly submitted items in the same order
20 * unless drops or exceptions are encountered. Using a
21 * SubmissionPublisher allows item generators to act as compliant <a
22 * href="http://www.reactive-streams.org/"> reactive-streams</a>
23 * Publishers relying on drop handling and/or blocking for flow
24 * control.
25 *
26 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
27 * constructor for delivery to subscribers. The best choice of
28 * Executor depends on expected usage. If the generator(s) of
29 * submitted items run in separate threads, and the number of
30 * subscribers can be estimated, consider using a {@link
31 * Executors#newFixedThreadPool}. Otherwise consider using the
32 * default, normally the {@link ForkJoinPool#commonPool}.
33 *
34 * <p>Buffering allows producers and consumers to transiently operate
35 * at different rates. Each subscriber uses an independent buffer.
36 * Buffers are created upon first use and expanded as needed up to the
37 * given maximum. (The enforced capacity may be rounded up to the
38 * nearest power of two and/or bounded by the largest value supported
39 * by this implementation.) Invocations of {@link
40 * Flow.Subscription#request(long) request} do not directly result in
41 * buffer expansion, but risk saturation if unfilled requests exceed
42 * the maximum capacity. The default value of {@link
43 * Flow#defaultBufferSize()} may provide a useful starting point for
44 * choosing a capacity based on expected rates, resources, and usages.
45 *
46 * <p>Publication methods support different policies about what to do
47 * when buffers are saturated. Method {@link #submit(Object) submit}
48 * blocks until resources are available. This is simplest, but least
49 * responsive. The {@code offer} methods may drop items (either
50 * immediately or with bounded timeout), but provide an opportunity to
51 * interpose a handler and then retry.
52 *
53 * <p>If any Subscriber method throws an exception, its subscription
54 * is cancelled. If a handler is supplied as a constructor argument,
55 * it is invoked before cancellation upon an exception in method
56 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
57 * {@link Flow.Subscriber#onSubscribe onSubscribe},
58 * {@link Flow.Subscriber#onError(Throwable) onError} and
59 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
60 * handled before cancellation. If the supplied Executor throws
61 * {@link RejectedExecutionException} (or any other RuntimeException
62 * or Error) when attempting to execute a task, or a drop handler
63 * throws an exception when processing a dropped item, then the
64 * exception is rethrown. In these cases, not all subscribers will
65 * have been issued the published item. It is usually good practice to
66 * {@link #closeExceptionally closeExceptionally} in these cases.
67 *
68 * <p>Method {@link #consume(Consumer)} simplifies support for a
69 * common case in which the only action of a subscriber is to request
70 * and process all items using a supplied function.
71 *
72 * <p>This class may also serve as a convenient base for subclasses
73 * that generate items, and use the methods in this class to publish
74 * them. For example here is a class that periodically publishes the
75 * items generated from a supplier. (In practice you might add methods
76 * to independently start and stop generation, to share Executors
77 * among publishers, and so on, or use a SubmissionPublisher as a
78 * component rather than a superclass.)
79 *
80 * <pre> {@code
81 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
82 * final ScheduledFuture<?> periodicTask;
83 * final ScheduledExecutorService scheduler;
84 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
85 * Supplier<? extends T> supplier,
86 * long period, TimeUnit unit) {
87 * super(executor, maxBufferCapacity);
88 * scheduler = new ScheduledThreadPoolExecutor(1);
89 * periodicTask = scheduler.scheduleAtFixedRate(
90 * () -> submit(supplier.get()), 0, period, unit);
91 * }
92 * public void close() {
93 * periodicTask.cancel(false);
94 * scheduler.shutdown();
95 * super.close();
96 * }
97 * }}</pre>
98 *
99 * <p>Here is an example of a {@link Flow.Processor} implementation.
100 * It uses single-step requests to its publisher for simplicity of
101 * illustration. A more adaptive version could monitor flow using the
102 * lag estimate returned from {@code submit}, along with other utility
103 * methods.
104 *
105 * <pre> {@code
106 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
107 * implements Flow.Processor<S,T> {
108 * final Function<? super S, ? extends T> function;
109 * Flow.Subscription subscription;
110 * TransformProcessor(Executor executor, int maxBufferCapacity,
111 * Function<? super S, ? extends T> function) {
112 * super(executor, maxBufferCapacity);
113 * this.function = function;
114 * }
115 * public void onSubscribe(Flow.Subscription subscription) {
116 * (this.subscription = subscription).request(1);
117 * }
118 * public void onNext(S item) {
119 * subscription.request(1);
120 * submit(function.apply(item));
121 * }
122 * public void onError(Throwable ex) { closeExceptionally(ex); }
123 * public void onComplete() { close(); }
124 * }}</pre>
125 *
126 * @param <T> the published item type
127 * @author Doug Lea
128 * @since 1.9
129 */
130 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
131 AutoCloseable {
132 /*
133 * Most mechanics are handled by BufferedSubscription. This class
134 * mainly tracks subscribers and ensures sequentiality, by using
135 * built-in synchronization locks across public methods. (Using
136 * built-in locks works well in the most typical case in which
137 * only one thread submits items).
138 */
139
140 /** The largest possible power of two array size. */
141 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
142
143 /** Round capacity to power of 2, at most limit. */
144 static final int roundCapacity(int cap) {
145 int n = cap - 1;
146 n |= n >>> 1;
147 n |= n >>> 2;
148 n |= n >>> 4;
149 n |= n >>> 8;
150 n |= n >>> 16;
151 return (n <= 0) ? 1 : // at least 1
152 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
153 }
154
155 // default Executor setup; nearly the same as CompletableFuture
156
157 /**
158 * Default executor -- ForkJoinPool.commonPool() unless it cannot
159 * support parallelism.
160 */
161 private static final Executor ASYNC_POOL =
162 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
163 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
164
165 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
166 private static final class ThreadPerTaskExecutor implements Executor {
167 public void execute(Runnable r) { new Thread(r).start(); }
168 }
169
170 /**
171 * Clients (BufferedSubscriptions) are maintained in a linked list
172 * (via their "next" fields). This works well for publish loops.
173 * It requires O(n) traversal to check for duplicate subscribers,
174 * but we expect that subscribing is much less common than
175 * publishing. Unsubscribing occurs only during traversal loops,
176 * when BufferedSubscription methods return negative values
177 * signifying that they have been disabled. To reduce
178 * head-of-line blocking, submit and offer methods first call
179 * BufferedSubscription.offer on each subscriber, and place
180 * saturated ones in retries list (using nextRetry field), and
181 * retry, possibly blocking or dropping.
182 */
183 BufferedSubscription<T> clients;
184
185 /** Run status, updated only within locks */
186 volatile boolean closed;
187 /** If non-null, the exception in closeExceptionally */
188 volatile Throwable closedException;
189
190 // Parameters for constructing BufferedSubscriptions
191 final Executor executor;
192 final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
193 final int maxBufferCapacity;
194
195 /**
196 * Creates a new SubmissionPublisher using the given Executor for
197 * async delivery to subscribers, with the given maximum buffer size
198 * for each subscriber, and, if non-null, the given handler invoked
199 * when any Subscriber throws an exception in method {@link
200 * Flow.Subscriber#onNext(Object) onNext}.
201 *
202 * @param executor the executor to use for async delivery,
203 * supporting creation of at least one independent thread
204 * @param maxBufferCapacity the maximum capacity for each
205 * subscriber's buffer (the enforced capacity may be rounded up to
206 * the nearest power of two and/or bounded by the largest value
207 * supported by this implementation; method {@link #getMaxBufferCapacity}
208 * returns the actual value)
209 * @param handler if non-null, procedure to invoke upon exception
210 * thrown in method {@code onNext}
211 * @throws NullPointerException if executor is null
212 * @throws IllegalArgumentException if maxBufferCapacity not
213 * positive
214 */
215 public SubmissionPublisher(Executor executor, int maxBufferCapacity,
216 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
217 if (executor == null)
218 throw new NullPointerException();
219 if (maxBufferCapacity <= 0)
220 throw new IllegalArgumentException("capacity must be positive");
221 this.executor = executor;
222 this.onNextHandler = handler;
223 this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
224 }
225
226 /**
227 * Creates a new SubmissionPublisher using the given Executor for
228 * async delivery to subscribers, with the given maximum buffer size
229 * for each subscriber, and no handler for Subscriber exceptions in
230 * method {@link Flow.Subscriber#onNext(Object) onNext}.
231 *
232 * @param executor the executor to use for async delivery,
233 * supporting creation of at least one independent thread
234 * @param maxBufferCapacity the maximum capacity for each
235 * subscriber's buffer (the enforced capacity may be rounded up to
236 * the nearest power of two and/or bounded by the largest value
237 * supported by this implementation; method {@link #getMaxBufferCapacity}
238 * returns the actual value)
239 * @throws NullPointerException if executor is null
240 * @throws IllegalArgumentException if maxBufferCapacity not
241 * positive
242 */
243 public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
244 this(executor, maxBufferCapacity, null);
245 }
246
247 /**
248 * Creates a new SubmissionPublisher using the {@link
249 * ForkJoinPool#commonPool()} for async delivery to subscribers
250 * (unless it does not support a parallelism level of at least two,
251 * in which case, a new Thread is created to run each task), with
252 * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
253 * handler for Subscriber exceptions in method {@link
254 * Flow.Subscriber#onNext(Object) onNext}.
255 */
256 public SubmissionPublisher() {
257 this(ASYNC_POOL, Flow.defaultBufferSize(), null);
258 }
259
260 /**
261 * Adds the given Subscriber unless already subscribed. If already
262 * subscribed, the Subscriber's {@link
263 * Flow.Subscriber#onError(Throwable) onError} method is invoked on
264 * the existing subscription with an {@link IllegalStateException}.
265 * Otherwise, upon success, the Subscriber's {@link
266 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
267 * asynchronously with a new {@link Flow.Subscription}. If {@link
268 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
269 * subscription is cancelled. Otherwise, if this SubmissionPublisher
270 * was closed exceptionally, then the subscriber's {@link
271 * Flow.Subscriber#onError onError} method is invoked with the
272 * corresponding exception, or if closed without exception, the
273 * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
274 * method is invoked. Subscribers may enable receiving items by
275 * invoking the {@link Flow.Subscription#request(long) request}
276 * method of the new Subscription, and may unsubscribe by invoking
277 * its {@link Flow.Subscription#cancel() cancel} method.
278 *
279 * @param subscriber the subscriber
280 * @throws NullPointerException if subscriber is null
281 */
282 public void subscribe(Flow.Subscriber<? super T> subscriber) {
283 if (subscriber == null) throw new NullPointerException();
284 BufferedSubscription<T> subscription =
285 new BufferedSubscription<T>(subscriber, executor,
286 onNextHandler, maxBufferCapacity);
287 synchronized (this) {
288 for (BufferedSubscription<T> b = clients, pred = null;;) {
289 if (b == null) {
290 Throwable ex;
291 subscription.onSubscribe();
292 if ((ex = closedException) != null)
293 subscription.onError(ex);
294 else if (closed)
295 subscription.onComplete();
296 else if (pred == null)
297 clients = subscription;
298 else
299 pred.next = subscription;
300 break;
301 }
302 BufferedSubscription<T> next = b.next;
303 if (b.isDisabled()) { // remove
304 b.next = null; // detach
305 if (pred == null)
306 clients = next;
307 else
308 pred.next = next;
309 }
310 else if (subscriber.equals(b.subscriber)) {
311 b.onError(new IllegalStateException("Duplicate subscribe"));
312 break;
313 }
314 else
315 pred = b;
316 b = next;
317 }
318 }
319 }
320
321 /**
322 * Publishes the given item to each current subscriber by
323 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
324 * onNext} method, blocking uninterruptibly while resources for any
325 * subscriber are unavailable. This method returns an estimate of
326 * the maximum lag (number of items submitted but not yet consumed)
327 * among all current subscribers. This value is at least one
328 * (accounting for this submitted item) if there are any
329 * subscribers, else zero.
330 *
331 * <p>If the Executor for this publisher throws a
332 * RejectedExecutionException (or any other RuntimeException or
333 * Error) when attempting to asynchronously notify subscribers,
334 * then this exception is rethrown, in which case not all
335 * subscribers will have been issued this item.
336 *
337 * @param item the (non-null) item to publish
338 * @return the estimated maximum lag among subscribers
339 * @throws IllegalStateException if closed
340 * @throws NullPointerException if item is null
341 * @throws RejectedExecutionException if thrown by Executor
342 */
343 public int submit(T item) {
344 if (item == null) throw new NullPointerException();
345 int lag = 0;
346 boolean complete;
347 synchronized (this) {
348 complete = closed;
349 BufferedSubscription<T> b = clients;
350 if (!complete) {
351 BufferedSubscription<T> pred = null, r = null, rtail = null;
352 while (b != null) {
353 BufferedSubscription<T> next = b.next;
354 int stat = b.offer(item);
355 if (stat < 0) { // disabled
356 b.next = null;
357 if (pred == null)
358 clients = next;
359 else
360 pred.next = next;
361 }
362 else {
363 if (stat > lag)
364 lag = stat;
365 else if (stat == 0) { // place on retry list
366 b.nextRetry = null;
367 if (rtail == null)
368 r = b;
369 else
370 rtail.nextRetry = b;
371 rtail = b;
372 }
373 pred = b;
374 }
375 b = next;
376 }
377 while (r != null) {
378 BufferedSubscription<T> nextRetry = r.nextRetry;
379 r.nextRetry = null;
380 int stat = r.submit(item);
381 if (stat > lag)
382 lag = stat;
383 else if (stat < 0 && clients == r)
384 clients = r.next; // postpone internal unsubscribes
385 r = nextRetry;
386 }
387 }
388 }
389 if (complete)
390 throw new IllegalStateException("Closed");
391 else
392 return lag;
393 }
394
395 /**
396 * Publishes the given item, if possible, to each current subscriber
397 * by asynchronously invoking its {@link
398 * Flow.Subscriber#onNext(Object) onNext} method. The item may be
399 * dropped by one or more subscribers if resource limits are
400 * exceeded, in which case the given handler (if non-null) is
401 * invoked, and if it returns true, retried once. Other calls to
402 * methods in this class by other threads are blocked while the
403 * handler is invoked. Unless recovery is assured, options are
404 * usually limited to logging the error and/or issuing an {@link
405 * Flow.Subscriber#onError(Throwable) onError} signal to the
406 * subscriber.
407 *
408 * <p>This method returns a status indicator: If negative, it
409 * represents the (negative) number of drops (failed attempts to
410 * issue the item to a subscriber). Otherwise it is an estimate of
411 * the maximum lag (number of items submitted but not yet
412 * consumed) among all current subscribers. This value is at least
413 * one (accounting for this submitted item) if there are any
414 * subscribers, else zero.
415 *
416 * <p>If the Executor for this publisher throws a
417 * RejectedExecutionException (or any other RuntimeException or
418 * Error) when attempting to asynchronously notify subscribers, or
419 * the drop handler throws an exception when processing a dropped
420 * item, then this exception is rethrown.
421 *
422 * @param item the (non-null) item to publish
423 * @param onDrop if non-null, the handler invoked upon a drop to a
424 * subscriber, with arguments of the subscriber and item; if it
425 * returns true, an offer is re-attempted (once)
426 * @return if negative, the (negative) number of drops; otherwise
427 * an estimate of maximum lag
428 * @throws IllegalStateException if closed
429 * @throws NullPointerException if item is null
430 * @throws RejectedExecutionException if thrown by Executor
431 */
432 public int offer(T item,
433 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
434 return doOffer(0L, item, onDrop);
435 }
436
437 /**
438 * Publishes the given item, if possible, to each current subscriber
439 * by asynchronously invoking its {@link
440 * Flow.Subscriber#onNext(Object) onNext} method, blocking while
441 * resources for any subscription are unavailable, up to the
442 * specified timeout or until the caller thread is interrupted, at
443 * which point the given handler (if non-null) is invoked, and if it
444 * returns true, retried once. (The drop handler may distinguish
445 * timeouts from interrupts by checking whether the current thread
446 * is interrupted.) Other calls to methods in this class by other
447 * threads are blocked while the handler is invoked. Unless
448 * recovery is assured, options are usually limited to logging the
449 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
450 * onError} signal to the subscriber.
451 *
452 * <p>This method returns a status indicator: If negative, it
453 * represents the (negative) number of drops (failed attempts to
454 * issue the item to a subscriber). Otherwise it is an estimate of
455 * the maximum lag (number of items submitted but not yet
456 * consumed) among all current subscribers. This value is at least
457 * one (accounting for this submitted item) if there are any
458 * subscribers, else zero.
459 *
460 * <p>If the Executor for this publisher throws a
461 * RejectedExecutionException (or any other RuntimeException or
462 * Error) when attempting to asynchronously notify subscribers, or
463 * the drop handler throws an exception when processing a dropped
464 * item, then this exception is rethrown.
465 *
466 * @param item the (non-null) item to publish
467 * @param timeout how long to wait for resources for any subscriber
468 * before giving up, in units of {@code unit}
469 * @param unit a {@code TimeUnit} determining how to interpret the
470 * {@code timeout} parameter
471 * @param onDrop if non-null, the handler invoked upon a drop to a
472 * subscriber, with arguments of the subscriber and item; if it
473 * returns true, an offer is re-attempted (once)
474 * @return if negative, the (negative) number of drops; otherwise
475 * an estimate of maximum lag
476 * @throws IllegalStateException if closed
477 * @throws NullPointerException if item is null
478 * @throws RejectedExecutionException if thrown by Executor
479 */
480 public int offer(T item, long timeout, TimeUnit unit,
481 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
482 return doOffer(unit.toNanos(timeout), item, onDrop);
483 }
484
485 /** Common implementation for both forms of offer */
486 final int doOffer(long nanos, T item,
487 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
488 if (item == null) throw new NullPointerException();
489 int lag = 0, drops = 0;
490 boolean complete;
491 synchronized (this) {
492 complete = closed;
493 BufferedSubscription<T> b = clients;
494 if (!complete) {
495 BufferedSubscription<T> pred = null, r = null, rtail = null;
496 while (b != null) {
497 BufferedSubscription<T> next = b.next;
498 int stat = b.offer(item);
499 if (stat < 0) {
500 b.next = null;
501 if (pred == null)
502 clients = next;
503 else
504 pred.next = next;
505 }
506 else {
507 if (stat > lag)
508 lag = stat;
509 else if (stat == 0) {
510 b.nextRetry = null;
511 if (rtail == null)
512 r = b;
513 else
514 rtail.nextRetry = b;
515 rtail = b;
516 }
517 else if (stat > lag)
518 lag = stat;
519 pred = b;
520 }
521 b = next;
522 }
523 while (r != null) {
524 BufferedSubscription<T> nextRetry = r.nextRetry;
525 r.nextRetry = null;
526 int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
527 r.offer(item);
528 if (stat == 0 && onDrop != null &&
529 onDrop.test(r.subscriber, item))
530 stat = r.offer(item);
531 if (stat == 0)
532 ++drops;
533 else if (stat > lag)
534 lag = stat;
535 else if (stat < 0 && clients == r)
536 clients = r.next;
537 r = nextRetry;
538 }
539 }
540 }
541 if (complete)
542 throw new IllegalStateException("Closed");
543 else
544 return (drops > 0) ? -drops : lag;
545 }
546
547 /**
548 * Unless already closed, issues {@link
549 * Flow.Subscriber#onComplete() onComplete} signals to current
550 * subscribers, and disallows subsequent attempts to publish.
551 * Upon return, this method does <em>NOT</em> guarantee that all
552 * subscribers have yet completed.
553 */
554 public void close() {
555 if (!closed) {
556 BufferedSubscription<T> b;
557 synchronized (this) {
558 b = clients;
559 clients = null;
560 closed = true;
561 }
562 while (b != null) {
563 BufferedSubscription<T> next = b.next;
564 b.next = null;
565 b.onComplete();
566 b = next;
567 }
568 }
569 }
570
571 /**
572 * Unless already closed, issues {@link
573 * Flow.Subscriber#onError(Throwable) onError} signals to current
574 * subscribers with the given error, and disallows subsequent
575 * attempts to publish. Future subscribers also receive the given
576 * error. Upon return, this method does <em>NOT</em> guarantee
577 * that all subscribers have yet completed.
578 *
579 * @param error the {@code onError} argument sent to subscribers
580 * @throws NullPointerException if error is null
581 */
582 public void closeExceptionally(Throwable error) {
583 if (error == null)
584 throw new NullPointerException();
585 if (!closed) {
586 BufferedSubscription<T> b;
587 synchronized (this) {
588 b = clients;
589 clients = null;
590 closed = true;
591 closedException = error;
592 }
593 while (b != null) {
594 BufferedSubscription<T> next = b.next;
595 b.next = null;
596 b.onError(error);
597 b = next;
598 }
599 }
600 }
601
602 /**
603 * Returns true if this publisher is not accepting submissions.
604 *
605 * @return true if closed
606 */
607 public boolean isClosed() {
608 return closed;
609 }
610
611 /**
612 * Returns the exception associated with {@link
613 * #closeExceptionally(Throwable) closeExceptionally}, or null if
614 * not closed or if closed normally.
615 *
616 * @return the exception, or null if none
617 */
618 public Throwable getClosedException() {
619 return closedException;
620 }
621
622 /**
623 * Returns true if this publisher has any subscribers.
624 *
625 * @return true if this publisher has any subscribers
626 */
627 public boolean hasSubscribers() {
628 boolean nonEmpty = false;
629 if (!closed) {
630 synchronized (this) {
631 for (BufferedSubscription<T> b = clients; b != null;) {
632 BufferedSubscription<T> next = b.next;
633 if (b.isDisabled()) {
634 b.next = null;
635 b = clients = next;
636 }
637 else {
638 nonEmpty = true;
639 break;
640 }
641 }
642 }
643 }
644 return nonEmpty;
645 }
646
647 /**
648 * Returns the number of current subscribers.
649 *
650 * @return the number of current subscribers
651 */
652 public int getNumberOfSubscribers() {
653 int count = 0;
654 if (!closed) {
655 synchronized (this) {
656 BufferedSubscription<T> pred = null, next;
657 for (BufferedSubscription<T> b = clients; b != null; b = next) {
658 next = b.next;
659 if (b.isDisabled()) {
660 b.next = null;
661 if (pred == null)
662 clients = next;
663 else
664 pred.next = next;
665 }
666 else {
667 pred = b;
668 ++count;
669 }
670 }
671 }
672 }
673 return count;
674 }
675
676 /**
677 * Returns the Executor used for asynchronous delivery.
678 *
679 * @return the Executor used for asynchronous delivery
680 */
681 public Executor getExecutor() {
682 return executor;
683 }
684
685 /**
686 * Returns the maximum per-subscriber buffer capacity.
687 *
688 * @return the maximum per-subscriber buffer capacity
689 */
690 public int getMaxBufferCapacity() {
691 return maxBufferCapacity;
692 }
693
694 /**
695 * Returns a list of current subscribers for monitoring and
696 * tracking purposes, not for invoking {@link Flow.Subscriber}
697 * methods on the subscribers.
698 *
699 * @return list of current subscribers
700 */
701 public List<Flow.Subscriber<? super T>> getSubscribers() {
702 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
703 synchronized (this) {
704 BufferedSubscription<T> pred = null, next;
705 for (BufferedSubscription<T> b = clients; b != null; b = next) {
706 next = b.next;
707 if (b.isDisabled()) {
708 b.next = null;
709 if (pred == null)
710 clients = next;
711 else
712 pred.next = next;
713 }
714 else
715 subs.add(b.subscriber);
716 }
717 }
718 return subs;
719 }
720
721 /**
722 * Returns true if the given Subscriber is currently subscribed.
723 *
724 * @param subscriber the subscriber
725 * @return true if currently subscribed
726 * @throws NullPointerException if subscriber is null
727 */
728 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
729 if (subscriber == null) throw new NullPointerException();
730 if (!closed) {
731 synchronized (this) {
732 BufferedSubscription<T> pred = null, next;
733 for (BufferedSubscription<T> b = clients; b != null; b = next) {
734 next = b.next;
735 if (b.isDisabled()) {
736 b.next = null;
737 if (pred == null)
738 clients = next;
739 else
740 pred.next = next;
741 }
742 else if (subscriber.equals(b.subscriber))
743 return true;
744 else
745 pred = b;
746 }
747 }
748 }
749 return false;
750 }
751
752 /**
753 * Returns an estimate of the minimum number of items requested
754 * (via {@link Flow.Subscription#request(long) request}) but not
755 * yet produced, among all current subscribers.
756 *
757 * @return the estimate, or zero if no subscribers
758 */
759 public long estimateMinimumDemand() {
760 long min = Long.MAX_VALUE;
761 boolean nonEmpty = false;
762 synchronized (this) {
763 BufferedSubscription<T> pred = null, next;
764 for (BufferedSubscription<T> b = clients; b != null; b = next) {
765 int n; long d;
766 next = b.next;
767 if ((n = b.estimateLag()) < 0) {
768 b.next = null;
769 if (pred == null)
770 clients = next;
771 else
772 pred.next = next;
773 }
774 else {
775 if ((d = b.demand - n) < min)
776 min = d;
777 nonEmpty = true;
778 pred = b;
779 }
780 }
781 }
782 return nonEmpty ? min : 0;
783 }
784
785 /**
786 * Returns an estimate of the maximum number of items produced but
787 * not yet consumed among all current subscribers.
788 *
789 * @return the estimate
790 */
791 public int estimateMaximumLag() {
792 int max = 0;
793 synchronized (this) {
794 BufferedSubscription<T> pred = null, next;
795 for (BufferedSubscription<T> b = clients; b != null; b = next) {
796 int n;
797 next = b.next;
798 if ((n = b.estimateLag()) < 0) {
799 b.next = null;
800 if (pred == null)
801 clients = next;
802 else
803 pred.next = next;
804 }
805 else {
806 if (n > max)
807 max = n;
808 pred = b;
809 }
810 }
811 }
812 return max;
813 }
814
815 /**
816 * Processes all published items using the given Consumer function.
817 * Returns a CompletableFuture that is completed normally when this
818 * publisher signals {@link Flow.Subscriber#onComplete()
819 * onComplete}, or completed exceptionally upon any error, or an
820 * exception is thrown by the Consumer, or the returned
821 * CompletableFuture is cancelled, in which case no further items
822 * are processed.
823 *
824 * @param consumer the function applied to each onNext item
825 * @return a CompletableFuture that is completed normally
826 * when the publisher signals onComplete, and exceptionally
827 * upon any error or cancellation
828 * @throws NullPointerException if consumer is null
829 */
830 public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
831 if (consumer == null)
832 throw new NullPointerException();
833 CompletableFuture<Void> status = new CompletableFuture<>();
834 subscribe(new ConsumerSubscriber<T>(status, consumer));
835 return status;
836 }
837
838 /** Subscriber for method consume */
839 private static final class ConsumerSubscriber<T>
840 implements Flow.Subscriber<T> {
841 final CompletableFuture<Void> status;
842 final Consumer<? super T> consumer;
843 Flow.Subscription subscription;
844 ConsumerSubscriber(CompletableFuture<Void> status,
845 Consumer<? super T> consumer) {
846 this.status = status; this.consumer = consumer;
847 }
848 public final void onSubscribe(Flow.Subscription subscription) {
849 this.subscription = subscription;
850 status.whenComplete((v, e) -> subscription.cancel());
851 if (!status.isDone())
852 subscription.request(Long.MAX_VALUE);
853 }
854 public final void onError(Throwable ex) {
855 status.completeExceptionally(ex);
856 }
857 public final void onComplete() {
858 status.complete(null);
859 }
860 public final void onNext(T item) {
861 try {
862 consumer.accept(item);
863 } catch (Throwable ex) {
864 subscription.cancel();
865 status.completeExceptionally(ex);
866 }
867 }
868 }
869
870 /**
871 * A task for consuming buffer items and signals, created and
872 * executed whenever they become available. A task consumes as
873 * many items/signals as possible before terminating, at which
874 * point another task is created when needed. The dual Runnable
875 * and ForkJoinTask declaration saves overhead when executed by
876 * ForkJoinPools, without impacting other kinds of Executors.
877 */
878 @SuppressWarnings("serial")
879 static final class ConsumerTask<T> extends ForkJoinTask<Void>
880 implements Runnable {
881 final BufferedSubscription<T> consumer;
882 ConsumerTask(BufferedSubscription<T> consumer) {
883 this.consumer = consumer;
884 }
885 public final Void getRawResult() { return null; }
886 public final void setRawResult(Void v) {}
887 public final boolean exec() { consumer.consume(); return false; }
888 public final void run() { consumer.consume(); }
889 }
890
891 /**
892 * A bounded (ring) buffer with integrated control to start a
893 * consumer task whenever items are available. The buffer
894 * algorithm is similar to one used inside ForkJoinPool (see its
895 * internal documentation for details) specialized for the case of
896 * at most one concurrent producer and consumer, and power of two
897 * buffer sizes. This allows methods to operate without locks even
898 * while supporting resizing, blocking, task-triggering, and
899 * garbage-free buffers (nulling out elements when consumed),
900 * although supporting these does impose a bit of overhead
901 * compared to plain fixed-size ring buffers.
902 *
903 * The publisher guarantees a single producer via its lock. We
904 * ensure in this class that there is at most one consumer. The
905 * request and cancel methods must be fully thread-safe but are
906 * coded to exploit the most common case in which they are only
907 * called by consumers (usually within onNext).
908 *
909 * Execution control is managed using the ACTIVE ctl bit. We
910 * ensure that a task is active when consumable items (and
911 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
912 * there is demand (unfilled requests). This is complicated on
913 * the creation side by the possibility of exceptions when trying
914 * to execute tasks. These eventually force DISABLED state, but
915 * sometimes not directly. On the task side, termination (clearing
916 * ACTIVE) that would otherwise race with producers or request()
917 * calls uses the CONSUME keep-alive bit to force a recheck.
918 *
919 * The ctl field also manages run state. When DISABLED, no further
920 * updates are possible. Disabling may be preceded by setting
921 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
922 * case the associated Subscriber methods are invoked, possibly
923 * synchronously if there is no active consumer task (including
924 * cases where execute() failed). The cancel() method is supported
925 * by treating as ERROR but suppressing onError signal.
926 *
927 * Support for blocking also exploits the fact that there is only
928 * one possible waiter. ManagedBlocker-compatible control fields
929 * are placed in this class itself rather than in wait-nodes.
930 * Blocking control relies on the "waiter" field. Producers set
931 * the field before trying to block, but must then recheck (via
932 * offer) before parking. Signalling then just unparks and clears
933 * waiter field. If the producer and consumer are both in the same
934 * ForkJoinPool, or consumers are running in commonPool, the
935 * producer attempts to help run consumer tasks that it forked
936 * before blocking. To avoid potential cycles, only one level of
937 * helping is currently supported.
938 *
939 * This class uses @Contended and heuristic field declaration
940 * ordering to reduce false-sharing-based memory contention among
941 * instances of BufferedSubscription, but it does not currently
942 * attempt to avoid memory contention among buffers. This field
943 * and element packing can hurt performance especially when each
944 * publisher has only one client operating at a high rate.
945 * Addressing this may require allocating substantially more space
946 * than users expect.
947 */
948 @SuppressWarnings("serial")
949 @jdk.internal.vm.annotation.Contended
950 private static final class BufferedSubscription<T>
951 implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
952 // Order-sensitive field declarations
953 long timeout; // > 0 if timed wait
954 volatile long demand; // # unfilled requests
955 int maxCapacity; // reduced on OOME
956 int putStat; // offer result for ManagedBlocker
957 int helpDepth; // nested helping depth (at most 1)
958 volatile int ctl; // atomic run state flags
959 volatile int head; // next position to take
960 int tail; // next position to put
961 Object[] array; // buffer: null if disabled
962 Flow.Subscriber<? super T> subscriber; // null if disabled
963 Executor executor; // null if disabled
964 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
965 volatile Throwable pendingError; // holds until onError issued
966 volatile Thread waiter; // blocked producer thread
967 T putItem; // for offer within ManagedBlocker
968 BufferedSubscription<T> next; // used only by publisher
969 BufferedSubscription<T> nextRetry; // used only by publisher
970
971 // ctl values
972 static final int ACTIVE = 0x01; // consumer task active
973 static final int CONSUME = 0x02; // keep-alive for consumer task
974 static final int DISABLED = 0x04; // final state
975 static final int ERROR = 0x08; // signal onError then disable
976 static final int SUBSCRIBE = 0x10; // signal onSubscribe
977 static final int COMPLETE = 0x20; // signal onComplete when done
978
979 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
980
981 /**
982 * Initial buffer capacity used when maxBufferCapacity is
983 * greater. Must be a power of two.
984 */
985 static final int DEFAULT_INITIAL_CAP = 32;
986
987 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
988 Executor executor,
989 BiConsumer<? super Flow.Subscriber<? super T>,
990 ? super Throwable> onNextHandler,
991 int maxBufferCapacity) {
992 this.subscriber = subscriber;
993 this.executor = executor;
994 this.onNextHandler = onNextHandler;
995 this.maxCapacity = maxBufferCapacity;
996 this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
997 (maxBufferCapacity < 2 ? // at least 2 slots
998 2 : maxBufferCapacity) :
999 DEFAULT_INITIAL_CAP];
1000 }
1001
1002 final boolean isDisabled() {
1003 return ctl == DISABLED;
1004 }
1005
1006 /**
1007 * Returns estimated number of buffered items, or -1 if
1008 * disabled.
1009 */
1010 final int estimateLag() {
1011 int n;
1012 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
1013 }
1014
1015 /**
1016 * Tries to add item and start consumer task if necessary.
1017 * @return -1 if disabled, 0 if dropped, else estimated lag
1018 */
1019 final int offer(T item) {
1020 int h = head, t = tail, cap, size, stat;
1021 Object[] a = array;
1022 if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
1023 a[(cap - 1) & t] = item; // relaxed writes OK
1024 tail = t + 1;
1025 stat = size;
1026 }
1027 else
1028 stat = growAndAdd(a, item);
1029 return (stat > 0 &&
1030 (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
1031 startOnOffer(stat) : stat;
1032 }
1033
1034 /**
1035 * Tries to create or expand buffer, then adds item if possible.
1036 */
1037 private int growAndAdd(Object[] a, T item) {
1038 boolean alloc;
1039 int cap, stat;
1040 if ((ctl & (ERROR | DISABLED)) != 0) {
1041 cap = 0;
1042 stat = -1;
1043 alloc = false;
1044 }
1045 else if (a == null || (cap = a.length) <= 0) {
1046 cap = 0;
1047 stat = 1;
1048 alloc = true;
1049 }
1050 else {
1051 U.fullFence(); // recheck
1052 int h = head, t = tail, size = t + 1 - h;
1053 if (cap >= size) {
1054 a[(cap - 1) & t] = item;
1055 tail = t + 1;
1056 stat = size;
1057 alloc = false;
1058 }
1059 else if (cap >= maxCapacity) {
1060 stat = 0; // cannot grow
1061 alloc = false;
1062 }
1063 else {
1064 stat = cap + 1;
1065 alloc = true;
1066 }
1067 }
1068 if (alloc) {
1069 int newCap = (cap > 0) ? cap << 1 : 1;
1070 if (newCap <= cap)
1071 stat = 0;
1072 else {
1073 Object[] newArray = null;
1074 try {
1075 newArray = new Object[newCap];
1076 } catch (Throwable ex) { // try to cope with OOME
1077 }
1078 if (newArray == null) {
1079 if (cap > 0)
1080 maxCapacity = cap; // avoid continuous failure
1081 stat = 0;
1082 }
1083 else {
1084 array = newArray;
1085 int t = tail;
1086 int newMask = newCap - 1;
1087 if (a != null && cap > 0) {
1088 int mask = cap - 1;
1089 for (int j = head; j != t; ++j) {
1090 long k = ((long)(j & mask) << ASHIFT) + ABASE;
1091 Object x = U.getObjectVolatile(a, k);
1092 if (x != null && // races with consumer
1093 U.compareAndSwapObject(a, k, x, null))
1094 newArray[j & newMask] = x;
1095 }
1096 }
1097 newArray[t & newMask] = item;
1098 tail = t + 1;
1099 }
1100 }
1101 }
1102 return stat;
1103 }
1104
1105 /**
1106 * Spins/helps/blocks while offer returns 0. Called only if
1107 * initial offer return 0.
1108 */
1109 final int submit(T item) {
1110 int stat; Executor e; ForkJoinWorkerThread w;
1111 if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1112 ((e = executor) instanceof ForkJoinPool)) {
1113 helpDepth = 1;
1114 Thread thread = Thread.currentThread();
1115 if ((thread instanceof ForkJoinWorkerThread) &&
1116 ((w = (ForkJoinWorkerThread)thread)).getPool() == e)
1117 stat = internalHelpConsume(w.workQueue, item);
1118 else if (e == ForkJoinPool.commonPool())
1119 stat = externalHelpConsume
1120 (ForkJoinPool.commonSubmitterQueue(), item);
1121 helpDepth = 0;
1122 }
1123 if (stat == 0 && (stat = offer(item)) == 0) {
1124 putItem = item;
1125 timeout = 0L;
1126 try {
1127 ForkJoinPool.managedBlock(this);
1128 } catch (InterruptedException ie) {
1129 timeout = INTERRUPTED;
1130 }
1131 stat = putStat;
1132 if (timeout < 0L)
1133 Thread.currentThread().interrupt();
1134 }
1135 return stat;
1136 }
1137
1138 /**
1139 * Tries helping for FJ submitter.
1140 */
1141 private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1142 int stat = 0;
1143 if (w != null) {
1144 ForkJoinTask<?> t;
1145 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1146 if ((stat = offer(item)) != 0 || !w.tryUnpush(t))
1147 break;
1148 ((ConsumerTask<?>)t).consumer.consume();
1149 }
1150 }
1151 return stat;
1152 }
1153
1154 /**
1155 * Tries helping for non-FJ submitter.
1156 */
1157 private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1158 int stat = 0;
1159 if (w != null) {
1160 ForkJoinTask<?> t;
1161 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1162 if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t))
1163 break;
1164 ((ConsumerTask<?>)t).consumer.consume();
1165 }
1166 }
1167 return stat;
1168 }
1169
1170 /**
1171 * Timeout version; similar to submit.
1172 */
1173 final int timedOffer(T item, long nanos) {
1174 int stat; Executor e;
1175 if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1176 ((e = executor) instanceof ForkJoinPool)) {
1177 Thread thread = Thread.currentThread();
1178 if (((thread instanceof ForkJoinWorkerThread) &&
1179 ((ForkJoinWorkerThread)thread).getPool() == e) ||
1180 e == ForkJoinPool.commonPool()) {
1181 helpDepth = 1;
1182 ForkJoinTask<?> t;
1183 long deadline = System.nanoTime() + nanos;
1184 while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
1185 (t instanceof ConsumerTask)) {
1186 if ((stat = offer(item)) != 0 ||
1187 (nanos = deadline - System.nanoTime()) <= 0L ||
1188 !t.tryUnfork())
1189 break;
1190 ((ConsumerTask<?>)t).consumer.consume();
1191 }
1192 helpDepth = 0;
1193 }
1194 }
1195 if (stat == 0 && (stat = offer(item)) == 0 &&
1196 (timeout = nanos) > 0L) {
1197 putItem = item;
1198 try {
1199 ForkJoinPool.managedBlock(this);
1200 } catch (InterruptedException ie) {
1201 timeout = INTERRUPTED;
1202 }
1203 stat = putStat;
1204 if (timeout < 0L)
1205 Thread.currentThread().interrupt();
1206 }
1207 return stat;
1208 }
1209
1210 /**
1211 * Tries to start consumer task after offer.
1212 * @return -1 if now disabled, else argument
1213 */
1214 private int startOnOffer(int stat) {
1215 for (;;) {
1216 Executor e; int c;
1217 if ((c = ctl) == DISABLED || (e = executor) == null) {
1218 stat = -1;
1219 break;
1220 }
1221 else if ((c & ACTIVE) != 0) { // ensure keep-alive
1222 if ((c & CONSUME) != 0 ||
1223 U.compareAndSwapInt(this, CTL, c,
1224 c | CONSUME))
1225 break;
1226 }
1227 else if (demand == 0L || tail == head)
1228 break;
1229 else if (U.compareAndSwapInt(this, CTL, c,
1230 c | (ACTIVE | CONSUME))) {
1231 try {
1232 e.execute(new ConsumerTask<T>(this));
1233 break;
1234 } catch (RuntimeException | Error ex) { // back out
1235 do {} while (((c = ctl) & DISABLED) == 0 &&
1236 (c & ACTIVE) != 0 &&
1237 !U.compareAndSwapInt(this, CTL, c,
1238 c & ~ACTIVE));
1239 throw ex;
1240 }
1241 }
1242 }
1243 return stat;
1244 }
1245
1246 private void signalWaiter(Thread w) {
1247 waiter = null;
1248 LockSupport.unpark(w); // release producer
1249 }
1250
1251 /**
1252 * Nulls out most fields, mainly to avoid garbage retention
1253 * until publisher unsubscribes, but also to help cleanly stop
1254 * upon error by nulling required components.
1255 */
1256 private void detach() {
1257 Thread w = waiter;
1258 executor = null;
1259 subscriber = null;
1260 pendingError = null;
1261 signalWaiter(w);
1262 }
1263
1264 /**
1265 * Issues error signal, asynchronously if a task is running,
1266 * else synchronously.
1267 */
1268 final void onError(Throwable ex) {
1269 for (int c;;) {
1270 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1271 break;
1272 else if ((c & ACTIVE) != 0) {
1273 pendingError = ex;
1274 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1275 break; // cause consumer task to exit
1276 }
1277 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1278 Flow.Subscriber<? super T> s = subscriber;
1279 if (s != null && ex != null) {
1280 try {
1281 s.onError(ex);
1282 } catch (Throwable ignore) {
1283 }
1284 }
1285 detach();
1286 break;
1287 }
1288 }
1289 }
1290
1291 /**
1292 * Tries to start consumer task upon a signal or request;
1293 * disables on failure.
1294 */
1295 private void startOrDisable() {
1296 Executor e;
1297 if ((e = executor) != null) { // skip if already disabled
1298 try {
1299 e.execute(new ConsumerTask<T>(this));
1300 } catch (Throwable ex) { // back out and force signal
1301 for (int c;;) {
1302 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1303 break;
1304 if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1305 onError(ex);
1306 break;
1307 }
1308 }
1309 }
1310 }
1311 }
1312
1313 final void onComplete() {
1314 for (int c;;) {
1315 if ((c = ctl) == DISABLED)
1316 break;
1317 if (U.compareAndSwapInt(this, CTL, c,
1318 c | (ACTIVE | CONSUME | COMPLETE))) {
1319 if ((c & ACTIVE) == 0)
1320 startOrDisable();
1321 break;
1322 }
1323 }
1324 }
1325
1326 final void onSubscribe() {
1327 for (int c;;) {
1328 if ((c = ctl) == DISABLED)
1329 break;
1330 if (U.compareAndSwapInt(this, CTL, c,
1331 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1332 if ((c & ACTIVE) == 0)
1333 startOrDisable();
1334 break;
1335 }
1336 }
1337 }
1338
1339 /**
1340 * Causes consumer task to exit if active (without reporting
1341 * onError unless there is already a pending error), and
1342 * disables.
1343 */
1344 public void cancel() {
1345 for (int c;;) {
1346 if ((c = ctl) == DISABLED)
1347 break;
1348 else if ((c & ACTIVE) != 0) {
1349 if (U.compareAndSwapInt(this, CTL, c,
1350 c | (CONSUME | ERROR)))
1351 break;
1352 }
1353 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1354 detach();
1355 break;
1356 }
1357 }
1358 }
1359
1360 /**
1361 * Adds to demand and possibly starts task.
1362 */
1363 public void request(long n) {
1364 if (n > 0L) {
1365 for (;;) {
1366 long prev = demand, d;
1367 if ((d = prev + n) < prev) // saturate
1368 d = Long.MAX_VALUE;
1369 if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1370 for (int c, h;;) {
1371 if ((c = ctl) == DISABLED)
1372 break;
1373 else if ((c & ACTIVE) != 0) {
1374 if ((c & CONSUME) != 0 ||
1375 U.compareAndSwapInt(this, CTL, c,
1376 c | CONSUME))
1377 break;
1378 }
1379 else if ((h = head) != tail) {
1380 if (U.compareAndSwapInt(this, CTL, c,
1381 c | (ACTIVE|CONSUME))) {
1382 startOrDisable();
1383 break;
1384 }
1385 }
1386 else if (head == h && tail == h)
1387 break; // else stale
1388 if (demand == 0L)
1389 break;
1390 }
1391 break;
1392 }
1393 }
1394 }
1395 else if (n < 0L)
1396 onError(new IllegalArgumentException(
1397 "negative subscription request"));
1398 }
1399
1400 public final boolean isReleasable() { // for ManagedBlocker
1401 T item = putItem;
1402 if (item != null) {
1403 if ((putStat = offer(item)) == 0)
1404 return false;
1405 putItem = null;
1406 }
1407 return true;
1408 }
1409
1410 public final boolean block() { // for ManagedBlocker
1411 T item = putItem;
1412 if (item != null) {
1413 putItem = null;
1414 long nanos = timeout;
1415 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1416 while ((putStat = offer(item)) == 0) {
1417 if (Thread.interrupted()) {
1418 timeout = INTERRUPTED;
1419 if (nanos > 0L)
1420 break;
1421 }
1422 else if (nanos > 0L &&
1423 (nanos = deadline - System.nanoTime()) <= 0L)
1424 break;
1425 else if (waiter == null)
1426 waiter = Thread.currentThread();
1427 else {
1428 if (nanos > 0L)
1429 LockSupport.parkNanos(this, nanos);
1430 else
1431 LockSupport.park(this);
1432 waiter = null;
1433 }
1434 }
1435 }
1436 waiter = null;
1437 return true;
1438 }
1439
1440 /**
1441 * Consumer loop, called from ConsumerTask, or indirectly
1442 * when helping during submit.
1443 */
1444 final void consume() {
1445 Flow.Subscriber<? super T> s;
1446 int h = head;
1447 if ((s = subscriber) != null) { // else disabled
1448 for (;;) {
1449 long d = demand;
1450 int c; Object[] a; int n; long i; Object x; Thread w;
1451 if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1452 if (!checkControl(s, c))
1453 break;
1454 }
1455 else if ((a = array) == null || h == tail ||
1456 (n = a.length) == 0 ||
1457 (x = U.getObjectVolatile
1458 (a, (i = ((long)((n - 1) & h) << ASHIFT) + ABASE)))
1459 == null) {
1460 if (!checkEmpty(s, c))
1461 break;
1462 }
1463 else if (d == 0L) {
1464 if (!checkDemand(c))
1465 break;
1466 }
1467 else if (((c & CONSUME) != 0 ||
1468 U.compareAndSwapInt(this, CTL, c, c | CONSUME)) &&
1469 U.compareAndSwapObject(a, i, x, null)) {
1470 U.putOrderedInt(this, HEAD, ++h);
1471 U.getAndAddLong(this, DEMAND, -1L);
1472 if ((w = waiter) != null)
1473 signalWaiter(w);
1474 try {
1475 @SuppressWarnings("unchecked") T y = (T) x;
1476 s.onNext(y);
1477 } catch (Throwable ex) {
1478 handleOnNext(s, ex);
1479 }
1480 }
1481 }
1482 }
1483 }
1484
1485 /**
1486 * Responds to control events in consume().
1487 */
1488 private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
1489 boolean stat = true;
1490 if ((c & ERROR) != 0) {
1491 Throwable ex = pendingError;
1492 ctl = DISABLED; // no need for CAS
1493 if (ex != null) { // null if errorless cancel
1494 try {
1495 if (s != null)
1496 s.onError(ex);
1497 } catch (Throwable ignore) {
1498 }
1499 }
1500 }
1501 else if ((c & SUBSCRIBE) != 0) {
1502 if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
1503 try {
1504 if (s != null)
1505 s.onSubscribe(this);
1506 } catch (Throwable ex) {
1507 onError(ex);
1508 }
1509 }
1510 }
1511 else {
1512 detach();
1513 stat = false;
1514 }
1515 return stat;
1516 }
1517
1518 /**
1519 * Responds to apparent emptiness in consume().
1520 */
1521 private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1522 boolean stat = true;
1523 if (head == tail) {
1524 if ((c & CONSUME) != 0)
1525 U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1526 else if ((c & COMPLETE) != 0) {
1527 if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1528 try {
1529 if (s != null)
1530 s.onComplete();
1531 } catch (Throwable ignore) {
1532 }
1533 }
1534 }
1535 else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1536 stat = false;
1537 }
1538 return stat;
1539 }
1540
1541 /**
1542 * Responds to apparent zero demand in consume().
1543 */
1544 private boolean checkDemand(int c) {
1545 boolean stat = true;
1546 if (demand == 0L) {
1547 if ((c & CONSUME) != 0)
1548 U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1549 else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1550 stat = false;
1551 }
1552 return stat;
1553 }
1554
1555 /**
1556 * Processes exception in Subscriber.onNext.
1557 */
1558 private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1559 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1560 if ((h = onNextHandler) != null) {
1561 try {
1562 h.accept(s, ex);
1563 } catch (Throwable ignore) {
1564 }
1565 }
1566 onError(ex);
1567 }
1568
1569 // Unsafe mechanics
1570 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1571 private static final long CTL;
1572 private static final long TAIL;
1573 private static final long HEAD;
1574 private static final long DEMAND;
1575 private static final int ABASE;
1576 private static final int ASHIFT;
1577
1578 static {
1579 try {
1580 CTL = U.objectFieldOffset
1581 (BufferedSubscription.class.getDeclaredField("ctl"));
1582 TAIL = U.objectFieldOffset
1583 (BufferedSubscription.class.getDeclaredField("tail"));
1584 HEAD = U.objectFieldOffset
1585 (BufferedSubscription.class.getDeclaredField("head"));
1586 DEMAND = U.objectFieldOffset
1587 (BufferedSubscription.class.getDeclaredField("demand"));
1588
1589 ABASE = U.arrayBaseOffset(Object[].class);
1590 int scale = U.arrayIndexScale(Object[].class);
1591 if ((scale & (scale - 1)) != 0)
1592 throw new Error("data type scale not a power of two");
1593 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1594 } catch (ReflectiveOperationException e) {
1595 throw new Error(e);
1596 }
1597
1598 // Reduce the risk of rare disastrous classloading in first call to
1599 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1600 Class<?> ensureLoaded = LockSupport.class;
1601 }
1602 }
1603 }