ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.74
Committed: Sun Nov 26 23:41:05 2017 UTC (6 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.73: +7 -5 lines
Log Message:
tidying

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.invoke.MethodHandles;
10 import java.lang.invoke.VarHandle;
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.List;
14 import java.util.concurrent.locks.LockSupport;
15 import java.util.function.BiConsumer;
16 import java.util.function.BiPredicate;
17 import java.util.function.Consumer;
18 import static java.util.concurrent.Flow.Publisher;
19 import static java.util.concurrent.Flow.Subscriber;
20 import static java.util.concurrent.Flow.Subscription;
21
22 /**
23 * A {@link Flow.Publisher} that asynchronously issues submitted
24 * (non-null) items to current subscribers until it is closed. Each
25 * current subscriber receives newly submitted items in the same order
26 * unless drops or exceptions are encountered. Using a
27 * SubmissionPublisher allows item generators to act as compliant <a
28 * href="http://www.reactive-streams.org/"> reactive-streams</a>
29 * Publishers relying on drop handling and/or blocking for flow
30 * control.
31 *
32 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
33 * constructor for delivery to subscribers. The best choice of
34 * Executor depends on expected usage. If the generator(s) of
35 * submitted items run in separate threads, and the number of
36 * subscribers can be estimated, consider using a {@link
37 * Executors#newFixedThreadPool}. Otherwise consider using the
38 * default, normally the {@link ForkJoinPool#commonPool}.
39 *
40 * <p>Buffering allows producers and consumers to transiently operate
41 * at different rates. Each subscriber uses an independent buffer.
42 * Buffers are created upon first use and expanded as needed up to the
43 * given maximum. (The enforced capacity may be rounded up to the
44 * nearest power of two and/or bounded by the largest value supported
45 * by this implementation.) Invocations of {@link
46 * Flow.Subscription#request(long) request} do not directly result in
47 * buffer expansion, but risk saturation if unfilled requests exceed
48 * the maximum capacity. The default value of {@link
49 * Flow#defaultBufferSize()} may provide a useful starting point for
50 * choosing a capacity based on expected rates, resources, and usages.
51 *
52 * <p>A single SubmissionPublisher may be shared among multiple
53 * sources. Actions in a source thread prior to publishing an item or
54 * issuing a signal <a href="package-summary.html#MemoryVisibility">
55 * <i>happen-before</i></a> actions subsequent to the corresponding
56 * access by each subscriber. But reported estimates of lag and demand
57 * are designed for use in monitoring, not for synchronization
58 * control, and may reflect stale or inaccurate views of progress.
59 *
60 * <p>Publication methods support different policies about what to do
61 * when buffers are saturated. Method {@link #submit(Object) submit}
62 * blocks until resources are available. This is simplest, but least
63 * responsive. The {@code offer} methods may drop items (either
64 * immediately or with bounded timeout), but provide an opportunity to
65 * interpose a handler and then retry.
66 *
67 * <p>If any Subscriber method throws an exception, its subscription
68 * is cancelled. If a handler is supplied as a constructor argument,
69 * it is invoked before cancellation upon an exception in method
70 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
71 * {@link Flow.Subscriber#onSubscribe onSubscribe},
72 * {@link Flow.Subscriber#onError(Throwable) onError} and
73 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
74 * handled before cancellation. If the supplied Executor throws
75 * {@link RejectedExecutionException} (or any other RuntimeException
76 * or Error) when attempting to execute a task, or a drop handler
77 * throws an exception when processing a dropped item, then the
78 * exception is rethrown. In these cases, not all subscribers will
79 * have been issued the published item. It is usually good practice to
80 * {@link #closeExceptionally closeExceptionally} in these cases.
81 *
82 * <p>Method {@link #consume(Consumer)} simplifies support for a
83 * common case in which the only action of a subscriber is to request
84 * and process all items using a supplied function.
85 *
86 * <p>This class may also serve as a convenient base for subclasses
87 * that generate items, and use the methods in this class to publish
88 * them. For example here is a class that periodically publishes the
89 * items generated from a supplier. (In practice you might add methods
90 * to independently start and stop generation, to share Executors
91 * among publishers, and so on, or use a SubmissionPublisher as a
92 * component rather than a superclass.)
93 *
94 * <pre> {@code
95 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
96 * final ScheduledFuture<?> periodicTask;
97 * final ScheduledExecutorService scheduler;
98 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
99 * Supplier<? extends T> supplier,
100 * long period, TimeUnit unit) {
101 * super(executor, maxBufferCapacity);
102 * scheduler = new ScheduledThreadPoolExecutor(1);
103 * periodicTask = scheduler.scheduleAtFixedRate(
104 * () -> submit(supplier.get()), 0, period, unit);
105 * }
106 * public void close() {
107 * periodicTask.cancel(false);
108 * scheduler.shutdown();
109 * super.close();
110 * }
111 * }}</pre>
112 *
113 * <p>Here is an example of a {@link Flow.Processor} implementation.
114 * It uses single-step requests to its publisher for simplicity of
115 * illustration. A more adaptive version could monitor flow using the
116 * lag estimate returned from {@code submit}, along with other utility
117 * methods.
118 *
119 * <pre> {@code
120 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
121 * implements Flow.Processor<S,T> {
122 * final Function<? super S, ? extends T> function;
123 * Flow.Subscription subscription;
124 * TransformProcessor(Executor executor, int maxBufferCapacity,
125 * Function<? super S, ? extends T> function) {
126 * super(executor, maxBufferCapacity);
127 * this.function = function;
128 * }
129 * public void onSubscribe(Flow.Subscription subscription) {
130 * (this.subscription = subscription).request(1);
131 * }
132 * public void onNext(S item) {
133 * subscription.request(1);
134 * submit(function.apply(item));
135 * }
136 * public void onError(Throwable ex) { closeExceptionally(ex); }
137 * public void onComplete() { close(); }
138 * }}</pre>
139 *
140 * @param <T> the published item type
141 * @author Doug Lea
142 * @since 9
143 */
144 public class SubmissionPublisher<T> implements Publisher<T>,
145 AutoCloseable {
146 /*
147 * Most mechanics are handled by BufferedSubscription. This class
148 * mainly tracks subscribers and ensures sequentiality, by using
149 * built-in synchronization locks across public methods. Using
150 * built-in locks works well in the most typical case in which
151 * only one thread submits items. We extend this idea in
152 * submission methods by detecting single-ownership to reduce
153 * producer-consumer synchronization strength.
154 */
155
156 /** The largest possible power of two array size. */
157 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
158
159 /**
160 * Initial buffer capacity used when maxBufferCapacity is
161 * greater. Must be a power of two.
162 */
163 static final int INITIAL_CAPACITY = 32;
164
165 /** Round capacity to power of 2, at most limit. */
166 static final int roundCapacity(int cap) {
167 int n = cap - 1;
168 n |= n >>> 1;
169 n |= n >>> 2;
170 n |= n >>> 4;
171 n |= n >>> 8;
172 n |= n >>> 16;
173 return (n <= 0) ? 1 : // at least 1
174 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
175 }
176
177 // default Executor setup; nearly the same as CompletableFuture
178
179 /**
180 * Default executor -- ForkJoinPool.commonPool() unless it cannot
181 * support parallelism.
182 */
183 private static final Executor ASYNC_POOL =
184 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
185 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
186
187 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
188 private static final class ThreadPerTaskExecutor implements Executor {
189 ThreadPerTaskExecutor() {} // prevent access constructor creation
190 public void execute(Runnable r) { new Thread(r).start(); }
191 }
192
193 /**
194 * Clients (BufferedSubscriptions) are maintained in a linked list
195 * (via their "next" fields). This works well for publish loops.
196 * It requires O(n) traversal to check for duplicate subscribers,
197 * but we expect that subscribing is much less common than
198 * publishing. Unsubscribing occurs only during traversal loops,
199 * when BufferedSubscription methods return negative values
200 * signifying that they have been closed. To reduce
201 * head-of-line blocking, submit and offer methods first call
202 * BufferedSubscription.offer on each subscriber, and place
203 * saturated ones in retries list (using nextRetry field), and
204 * retry, possibly blocking or dropping.
205 */
206 BufferedSubscription<T> clients;
207
208 /** Run status, updated only within locks */
209 volatile boolean closed;
210 /** Set true on first call to subscribe, to initialize possible owner */
211 boolean subscribed;
212 /** The first caller thread to subscribe, or null if thread ever changed */
213 Thread owner;
214 /** If non-null, the exception in closeExceptionally */
215 volatile Throwable closedException;
216
217 // Parameters for constructing BufferedSubscriptions
218 final Executor executor;
219 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
220 final int maxBufferCapacity;
221
222 /**
223 * Creates a new SubmissionPublisher using the given Executor for
224 * async delivery to subscribers, with the given maximum buffer size
225 * for each subscriber, and, if non-null, the given handler invoked
226 * when any Subscriber throws an exception in method {@link
227 * Flow.Subscriber#onNext(Object) onNext}.
228 *
229 * @param executor the executor to use for async delivery,
230 * supporting creation of at least one independent thread
231 * @param maxBufferCapacity the maximum capacity for each
232 * subscriber's buffer (the enforced capacity may be rounded up to
233 * the nearest power of two and/or bounded by the largest value
234 * supported by this implementation; method {@link #getMaxBufferCapacity}
235 * returns the actual value)
236 * @param handler if non-null, procedure to invoke upon exception
237 * thrown in method {@code onNext}
238 * @throws NullPointerException if executor is null
239 * @throws IllegalArgumentException if maxBufferCapacity not
240 * positive
241 */
242 public SubmissionPublisher(Executor executor, int maxBufferCapacity,
243 BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
244 if (executor == null)
245 throw new NullPointerException();
246 if (maxBufferCapacity <= 0)
247 throw new IllegalArgumentException("capacity must be positive");
248 this.executor = executor;
249 this.onNextHandler = handler;
250 this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
251 }
252
253 /**
254 * Creates a new SubmissionPublisher using the given Executor for
255 * async delivery to subscribers, with the given maximum buffer size
256 * for each subscriber, and no handler for Subscriber exceptions in
257 * method {@link Flow.Subscriber#onNext(Object) onNext}.
258 *
259 * @param executor the executor to use for async delivery,
260 * supporting creation of at least one independent thread
261 * @param maxBufferCapacity the maximum capacity for each
262 * subscriber's buffer (the enforced capacity may be rounded up to
263 * the nearest power of two and/or bounded by the largest value
264 * supported by this implementation; method {@link #getMaxBufferCapacity}
265 * returns the actual value)
266 * @throws NullPointerException if executor is null
267 * @throws IllegalArgumentException if maxBufferCapacity not
268 * positive
269 */
270 public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
271 this(executor, maxBufferCapacity, null);
272 }
273
274 /**
275 * Creates a new SubmissionPublisher using the {@link
276 * ForkJoinPool#commonPool()} for async delivery to subscribers
277 * (unless it does not support a parallelism level of at least two,
278 * in which case, a new Thread is created to run each task), with
279 * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
280 * handler for Subscriber exceptions in method {@link
281 * Flow.Subscriber#onNext(Object) onNext}.
282 */
283 public SubmissionPublisher() {
284 this(ASYNC_POOL, Flow.defaultBufferSize(), null);
285 }
286
287 /**
288 * Adds the given Subscriber unless already subscribed. If already
289 * subscribed, the Subscriber's {@link
290 * Flow.Subscriber#onError(Throwable) onError} method is invoked on
291 * the existing subscription with an {@link IllegalStateException}.
292 * Otherwise, upon success, the Subscriber's {@link
293 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
294 * asynchronously with a new {@link Flow.Subscription}. If {@link
295 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
296 * subscription is cancelled. Otherwise, if this SubmissionPublisher
297 * was closed exceptionally, then the subscriber's {@link
298 * Flow.Subscriber#onError onError} method is invoked with the
299 * corresponding exception, or if closed without exception, the
300 * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
301 * method is invoked. Subscribers may enable receiving items by
302 * invoking the {@link Flow.Subscription#request(long) request}
303 * method of the new Subscription, and may unsubscribe by invoking
304 * its {@link Flow.Subscription#cancel() cancel} method.
305 *
306 * @param subscriber the subscriber
307 * @throws NullPointerException if subscriber is null
308 */
309 public void subscribe(Subscriber<? super T> subscriber) {
310 if (subscriber == null) throw new NullPointerException();
311 int max = maxBufferCapacity; // allocate initial array
312 Object[] array = new Object[max < INITIAL_CAPACITY ?
313 max : INITIAL_CAPACITY];
314 BufferedSubscription<T> subscription =
315 new BufferedSubscription<T>(subscriber, executor, onNextHandler,
316 array, max);
317 synchronized (this) {
318 if (!subscribed) {
319 subscribed = true;
320 owner = Thread.currentThread();
321 }
322 for (BufferedSubscription<T> b = clients, pred = null;;) {
323 if (b == null) {
324 Throwable ex;
325 subscription.onSubscribe();
326 if ((ex = closedException) != null)
327 subscription.onError(ex);
328 else if (closed)
329 subscription.onComplete();
330 else if (pred == null)
331 clients = subscription;
332 else
333 pred.next = subscription;
334 break;
335 }
336 BufferedSubscription<T> next = b.next;
337 if (b.isClosed()) { // remove
338 b.next = null; // detach
339 if (pred == null)
340 clients = next;
341 else
342 pred.next = next;
343 }
344 else if (subscriber.equals(b.subscriber)) {
345 b.onError(new IllegalStateException("Duplicate subscribe"));
346 break;
347 }
348 else
349 pred = b;
350 b = next;
351 }
352 }
353 }
354
355 /**
356 * Common implementation for all three forms of submit and offer.
357 * Acts as submit if nanos == Long.MAX_VALUE, else offer.
358 */
359 private int doOffer(T item, long nanos,
360 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
361 if (item == null) throw new NullPointerException();
362 int lag = 0;
363 boolean complete, unowned;
364 synchronized (this) {
365 Thread t = Thread.currentThread(), o;
366 BufferedSubscription<T> b = clients;
367 if ((unowned = ((o = owner) != t)) && o != null)
368 owner = null; // disable bias
369 if (b == null)
370 complete = closed;
371 else {
372 complete = false;
373 boolean cleanMe = false;
374 BufferedSubscription<T> retries = null, next;
375 do {
376 next = b.next;
377 int stat = b.offer(item, unowned);
378 if (stat == 0) { // saturated
379 b.nextRetry = retries; // add to retry list
380 retries = b;
381 }
382 else if (stat < 0) // closed
383 cleanMe = true; // remove later
384 else if (stat > lag)
385 lag = stat;
386 } while ((b = next) != null);
387
388 if (retries != null || cleanMe)
389 lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
390 }
391 }
392 if (complete)
393 throw new IllegalStateException("Closed");
394 else
395 return lag;
396 }
397
398 /**
399 * Helps, (timed) waits for, and/or drops buffers on list; returns
400 * lag or negative drops (for use in offer).
401 */
402 private int retryOffer(T item, long nanos,
403 BiPredicate<Subscriber<? super T>, ? super T> onDrop,
404 BufferedSubscription<T> retries, int lag,
405 boolean cleanMe) {
406 for (BufferedSubscription<T> r = retries; r != null;) {
407 BufferedSubscription<T> nextRetry = r.nextRetry;
408 r.nextRetry = null;
409 if (nanos > 0L)
410 r.awaitSpace(nanos);
411 int stat = r.retryOffer(item);
412 if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
413 stat = r.retryOffer(item);
414 if (stat == 0)
415 lag = (lag >= 0) ? -1 : lag - 1;
416 else if (stat < 0)
417 cleanMe = true;
418 else if (lag >= 0 && stat > lag)
419 lag = stat;
420 r = nextRetry;
421 }
422 if (cleanMe)
423 cleanAndCount();
424 return lag;
425 }
426
427 /**
428 * Returns current list count after removing closed subscribers.
429 * Call only while holding lock. Used mainly by retryOffer for
430 * cleanup.
431 */
432 private int cleanAndCount() {
433 int count = 0;
434 BufferedSubscription<T> pred = null, next;
435 for (BufferedSubscription<T> b = clients; b != null; b = next) {
436 next = b.next;
437 if (b.isClosed()) {
438 b.next = null;
439 if (pred == null)
440 clients = next;
441 else
442 pred.next = next;
443 }
444 else {
445 pred = b;
446 ++count;
447 }
448 }
449 return count;
450 }
451
452 /**
453 * Publishes the given item to each current subscriber by
454 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
455 * onNext} method, blocking uninterruptibly while resources for any
456 * subscriber are unavailable. This method returns an estimate of
457 * the maximum lag (number of items submitted but not yet consumed)
458 * among all current subscribers. This value is at least one
459 * (accounting for this submitted item) if there are any
460 * subscribers, else zero.
461 *
462 * <p>If the Executor for this publisher throws a
463 * RejectedExecutionException (or any other RuntimeException or
464 * Error) when attempting to asynchronously notify subscribers,
465 * then this exception is rethrown, in which case not all
466 * subscribers will have been issued this item.
467 *
468 * @param item the (non-null) item to publish
469 * @return the estimated maximum lag among subscribers
470 * @throws IllegalStateException if closed
471 * @throws NullPointerException if item is null
472 * @throws RejectedExecutionException if thrown by Executor
473 */
474 public int submit(T item) {
475 return doOffer(item, Long.MAX_VALUE, null);
476 }
477
478 /**
479 * Publishes the given item, if possible, to each current subscriber
480 * by asynchronously invoking its {@link
481 * Flow.Subscriber#onNext(Object) onNext} method. The item may be
482 * dropped by one or more subscribers if resource limits are
483 * exceeded, in which case the given handler (if non-null) is
484 * invoked, and if it returns true, retried once. Other calls to
485 * methods in this class by other threads are blocked while the
486 * handler is invoked. Unless recovery is assured, options are
487 * usually limited to logging the error and/or issuing an {@link
488 * Flow.Subscriber#onError(Throwable) onError} signal to the
489 * subscriber.
490 *
491 * <p>This method returns a status indicator: If negative, it
492 * represents the (negative) number of drops (failed attempts to
493 * issue the item to a subscriber). Otherwise it is an estimate of
494 * the maximum lag (number of items submitted but not yet
495 * consumed) among all current subscribers. This value is at least
496 * one (accounting for this submitted item) if there are any
497 * subscribers, else zero.
498 *
499 * <p>If the Executor for this publisher throws a
500 * RejectedExecutionException (or any other RuntimeException or
501 * Error) when attempting to asynchronously notify subscribers, or
502 * the drop handler throws an exception when processing a dropped
503 * item, then this exception is rethrown.
504 *
505 * @param item the (non-null) item to publish
506 * @param onDrop if non-null, the handler invoked upon a drop to a
507 * subscriber, with arguments of the subscriber and item; if it
508 * returns true, an offer is re-attempted (once)
509 * @return if negative, the (negative) number of drops; otherwise
510 * an estimate of maximum lag
511 * @throws IllegalStateException if closed
512 * @throws NullPointerException if item is null
513 * @throws RejectedExecutionException if thrown by Executor
514 */
515 public int offer(T item,
516 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
517 return doOffer(item, 0L, onDrop);
518 }
519
520 /**
521 * Publishes the given item, if possible, to each current subscriber
522 * by asynchronously invoking its {@link
523 * Flow.Subscriber#onNext(Object) onNext} method, blocking while
524 * resources for any subscription are unavailable, up to the
525 * specified timeout or until the caller thread is interrupted, at
526 * which point the given handler (if non-null) is invoked, and if it
527 * returns true, retried once. (The drop handler may distinguish
528 * timeouts from interrupts by checking whether the current thread
529 * is interrupted.) Other calls to methods in this class by other
530 * threads are blocked while the handler is invoked. Unless
531 * recovery is assured, options are usually limited to logging the
532 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
533 * onError} signal to the subscriber.
534 *
535 * <p>This method returns a status indicator: If negative, it
536 * represents the (negative) number of drops (failed attempts to
537 * issue the item to a subscriber). Otherwise it is an estimate of
538 * the maximum lag (number of items submitted but not yet
539 * consumed) among all current subscribers. This value is at least
540 * one (accounting for this submitted item) if there are any
541 * subscribers, else zero.
542 *
543 * <p>If the Executor for this publisher throws a
544 * RejectedExecutionException (or any other RuntimeException or
545 * Error) when attempting to asynchronously notify subscribers, or
546 * the drop handler throws an exception when processing a dropped
547 * item, then this exception is rethrown.
548 *
549 * @param item the (non-null) item to publish
550 * @param timeout how long to wait for resources for any subscriber
551 * before giving up, in units of {@code unit}
552 * @param unit a {@code TimeUnit} determining how to interpret the
553 * {@code timeout} parameter
554 * @param onDrop if non-null, the handler invoked upon a drop to a
555 * subscriber, with arguments of the subscriber and item; if it
556 * returns true, an offer is re-attempted (once)
557 * @return if negative, the (negative) number of drops; otherwise
558 * an estimate of maximum lag
559 * @throws IllegalStateException if closed
560 * @throws NullPointerException if item is null
561 * @throws RejectedExecutionException if thrown by Executor
562 */
563 public int offer(T item, long timeout, TimeUnit unit,
564 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
565 long nanos = unit.toNanos(timeout);
566 // distinguishes from untimed (only wrt interrupt policy)
567 if (nanos == Long.MAX_VALUE) --nanos;
568 return doOffer(item, nanos, onDrop);
569 }
570
571 /**
572 * Unless already closed, issues {@link
573 * Flow.Subscriber#onComplete() onComplete} signals to current
574 * subscribers, and disallows subsequent attempts to publish.
575 * Upon return, this method does <em>NOT</em> guarantee that all
576 * subscribers have yet completed.
577 */
578 public void close() {
579 if (!closed) {
580 BufferedSubscription<T> b;
581 synchronized (this) {
582 // no need to re-check closed here
583 b = clients;
584 clients = null;
585 owner = null;
586 closed = true;
587 }
588 while (b != null) {
589 BufferedSubscription<T> next = b.next;
590 b.next = null;
591 b.onComplete();
592 b = next;
593 }
594 }
595 }
596
597 /**
598 * Unless already closed, issues {@link
599 * Flow.Subscriber#onError(Throwable) onError} signals to current
600 * subscribers with the given error, and disallows subsequent
601 * attempts to publish. Future subscribers also receive the given
602 * error. Upon return, this method does <em>NOT</em> guarantee
603 * that all subscribers have yet completed.
604 *
605 * @param error the {@code onError} argument sent to subscribers
606 * @throws NullPointerException if error is null
607 */
608 public void closeExceptionally(Throwable error) {
609 if (error == null)
610 throw new NullPointerException();
611 if (!closed) {
612 BufferedSubscription<T> b;
613 synchronized (this) {
614 b = clients;
615 if (!closed) { // don't clobber racing close
616 closedException = error;
617 clients = null;
618 owner = null;
619 closed = true;
620 }
621 }
622 while (b != null) {
623 BufferedSubscription<T> next = b.next;
624 b.next = null;
625 b.onError(error);
626 b = next;
627 }
628 }
629 }
630
631 /**
632 * Returns true if this publisher is not accepting submissions.
633 *
634 * @return true if closed
635 */
636 public boolean isClosed() {
637 return closed;
638 }
639
640 /**
641 * Returns the exception associated with {@link
642 * #closeExceptionally(Throwable) closeExceptionally}, or null if
643 * not closed or if closed normally.
644 *
645 * @return the exception, or null if none
646 */
647 public Throwable getClosedException() {
648 return closedException;
649 }
650
651 /**
652 * Returns true if this publisher has any subscribers.
653 *
654 * @return true if this publisher has any subscribers
655 */
656 public boolean hasSubscribers() {
657 boolean nonEmpty = false;
658 synchronized (this) {
659 for (BufferedSubscription<T> b = clients; b != null;) {
660 BufferedSubscription<T> next = b.next;
661 if (b.isClosed()) {
662 b.next = null;
663 b = clients = next;
664 }
665 else {
666 nonEmpty = true;
667 break;
668 }
669 }
670 }
671 return nonEmpty;
672 }
673
674 /**
675 * Returns the number of current subscribers.
676 *
677 * @return the number of current subscribers
678 */
679 public int getNumberOfSubscribers() {
680 int count;
681 synchronized (this) {
682 count = cleanAndCount();
683 }
684 return count;
685 }
686
687 /**
688 * Returns the Executor used for asynchronous delivery.
689 *
690 * @return the Executor used for asynchronous delivery
691 */
692 public Executor getExecutor() {
693 return executor;
694 }
695
696 /**
697 * Returns the maximum per-subscriber buffer capacity.
698 *
699 * @return the maximum per-subscriber buffer capacity
700 */
701 public int getMaxBufferCapacity() {
702 return maxBufferCapacity;
703 }
704
705 /**
706 * Returns a list of current subscribers for monitoring and
707 * tracking purposes, not for invoking {@link Flow.Subscriber}
708 * methods on the subscribers.
709 *
710 * @return list of current subscribers
711 */
712 public List<Subscriber<? super T>> getSubscribers() {
713 ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
714 synchronized (this) {
715 BufferedSubscription<T> pred = null, next;
716 for (BufferedSubscription<T> b = clients; b != null; b = next) {
717 next = b.next;
718 if (b.isClosed()) {
719 b.next = null;
720 if (pred == null)
721 clients = next;
722 else
723 pred.next = next;
724 }
725 else
726 subs.add(b.subscriber);
727 }
728 }
729 return subs;
730 }
731
732 /**
733 * Returns true if the given Subscriber is currently subscribed.
734 *
735 * @param subscriber the subscriber
736 * @return true if currently subscribed
737 * @throws NullPointerException if subscriber is null
738 */
739 public boolean isSubscribed(Subscriber<? super T> subscriber) {
740 if (subscriber == null) throw new NullPointerException();
741 if (!closed) {
742 synchronized (this) {
743 BufferedSubscription<T> pred = null, next;
744 for (BufferedSubscription<T> b = clients; b != null; b = next) {
745 next = b.next;
746 if (b.isClosed()) {
747 b.next = null;
748 if (pred == null)
749 clients = next;
750 else
751 pred.next = next;
752 }
753 else if (subscriber.equals(b.subscriber))
754 return true;
755 else
756 pred = b;
757 }
758 }
759 }
760 return false;
761 }
762
763 /**
764 * Returns an estimate of the minimum number of items requested
765 * (via {@link Flow.Subscription#request(long) request}) but not
766 * yet produced, among all current subscribers.
767 *
768 * @return the estimate, or zero if no subscribers
769 */
770 public long estimateMinimumDemand() {
771 long min = Long.MAX_VALUE;
772 boolean nonEmpty = false;
773 synchronized (this) {
774 BufferedSubscription<T> pred = null, next;
775 for (BufferedSubscription<T> b = clients; b != null; b = next) {
776 int n; long d;
777 next = b.next;
778 if ((n = b.estimateLag()) < 0) {
779 b.next = null;
780 if (pred == null)
781 clients = next;
782 else
783 pred.next = next;
784 }
785 else {
786 if ((d = b.demand - n) < min)
787 min = d;
788 nonEmpty = true;
789 pred = b;
790 }
791 }
792 }
793 return nonEmpty ? min : 0;
794 }
795
796 /**
797 * Returns an estimate of the maximum number of items produced but
798 * not yet consumed among all current subscribers.
799 *
800 * @return the estimate
801 */
802 public int estimateMaximumLag() {
803 int max = 0;
804 synchronized (this) {
805 BufferedSubscription<T> pred = null, next;
806 for (BufferedSubscription<T> b = clients; b != null; b = next) {
807 int n;
808 next = b.next;
809 if ((n = b.estimateLag()) < 0) {
810 b.next = null;
811 if (pred == null)
812 clients = next;
813 else
814 pred.next = next;
815 }
816 else {
817 if (n > max)
818 max = n;
819 pred = b;
820 }
821 }
822 }
823 return max;
824 }
825
826 /**
827 * Processes all published items using the given Consumer function.
828 * Returns a CompletableFuture that is completed normally when this
829 * publisher signals {@link Flow.Subscriber#onComplete()
830 * onComplete}, or completed exceptionally upon any error, or an
831 * exception is thrown by the Consumer, or the returned
832 * CompletableFuture is cancelled, in which case no further items
833 * are processed.
834 *
835 * @param consumer the function applied to each onNext item
836 * @return a CompletableFuture that is completed normally
837 * when the publisher signals onComplete, and exceptionally
838 * upon any error or cancellation
839 * @throws NullPointerException if consumer is null
840 */
841 public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
842 if (consumer == null)
843 throw new NullPointerException();
844 CompletableFuture<Void> status = new CompletableFuture<>();
845 subscribe(new ConsumerSubscriber<T>(status, consumer));
846 return status;
847 }
848
849 /** Subscriber for method consume */
850 static final class ConsumerSubscriber<T> implements Subscriber<T> {
851 final CompletableFuture<Void> status;
852 final Consumer<? super T> consumer;
853 Subscription subscription;
854 ConsumerSubscriber(CompletableFuture<Void> status,
855 Consumer<? super T> consumer) {
856 this.status = status; this.consumer = consumer;
857 }
858 public final void onSubscribe(Subscription subscription) {
859 this.subscription = subscription;
860 status.whenComplete((v, e) -> subscription.cancel());
861 if (!status.isDone())
862 subscription.request(Long.MAX_VALUE);
863 }
864 public final void onError(Throwable ex) {
865 status.completeExceptionally(ex);
866 }
867 public final void onComplete() {
868 status.complete(null);
869 }
870 public final void onNext(T item) {
871 try {
872 consumer.accept(item);
873 } catch (Throwable ex) {
874 subscription.cancel();
875 status.completeExceptionally(ex);
876 }
877 }
878 }
879
880 /**
881 * A task for consuming buffer items and signals, created and
882 * executed whenever they become available. A task consumes as
883 * many items/signals as possible before terminating, at which
884 * point another task is created when needed. The dual Runnable
885 * and ForkJoinTask declaration saves overhead when executed by
886 * ForkJoinPools, without impacting other kinds of Executors.
887 */
888 @SuppressWarnings("serial")
889 static final class ConsumerTask<T> extends ForkJoinTask<Void>
890 implements Runnable, CompletableFuture.AsynchronousCompletionTask {
891 final BufferedSubscription<T> consumer;
892 ConsumerTask(BufferedSubscription<T> consumer) {
893 this.consumer = consumer;
894 }
895 public final Void getRawResult() { return null; }
896 public final void setRawResult(Void v) {}
897 public final boolean exec() { consumer.consume(); return false; }
898 public final void run() { consumer.consume(); }
899 }
900
901 /**
902 * A resizable array-based ring buffer with integrated control to
903 * start a consumer task whenever items are available. The buffer
904 * algorithm is specialized for the case of at most one concurrent
905 * producer and consumer, and power of two buffer sizes. It relies
906 * primarily on atomic operations (CAS or getAndSet) at the next
907 * array slot to put or take an element, at the "tail" and "head"
908 * indices written only by the producer and consumer respectively.
909 *
910 * We ensure internally that there is at most one active consumer
911 * task at any given time. The publisher guarantees a single
912 * producer via its lock. Sync among producers and consumers
913 * relies on volatile fields "ctl", "demand", and "waiting" (along
914 * with element access). Other variables are accessed in plain
915 * mode, relying on outer ordering and exclusion, and/or enclosing
916 * them within other volatile accesses. Some atomic operations are
917 * avoided by tracking single threaded ownership by producers (in
918 * the style of biased locking).
919 *
920 * Execution control and protocol state are managed using field
921 * "ctl". Methods to subscribe, close, request, and cancel set
922 * ctl bits (mostly using atomic boolean method getAndBitwiseOr),
923 * and ensure that a task is running. (The corresponding consumer
924 * side actions are in method consume.) To avoid starting a new
925 * task on each action, ctl also includes a keep-alive bit
926 * (ACTIVE) that is refreshed if needed on producer actions.
927 * (Maintaining agreement about keep-alives requires most atomic
928 * updates to be full SC/Volatile strength, which is still much
929 * cheaper than using one task per item.) Error signals
930 * additionally null out items and/or fields to reduce termination
931 * latency. The cancel() method is supported by treating as ERROR
932 * but suppressing onError signal.
933 *
934 * Support for blocking also exploits the fact that there is only
935 * one possible waiter. ManagedBlocker-compatible control fields
936 * are placed in this class itself rather than in wait-nodes.
937 * Blocking control relies on the "waiting" and "waiter"
938 * fields. Producers set them before trying to block. Signalling
939 * unparks and clears fields. If the producer and/or consumer are
940 * using a ForkJoinPool, the producer attempts to help run
941 * consumer tasks via ForkJoinPool.helpAsyncBlocker before
942 * blocking.
943 *
944 * Usages of this class may encounter any of several forms of
945 * memory contention. We try to ameliorate across them without
946 * unduly impacting footprints in low-contention usages where it
947 * isn't needed. Buffer arrays start out small and grow only as
948 * needed. The class uses @Contended and heuristic field
949 * declaration ordering to reduce false-sharing memory contention
950 * across instances of BufferedSubscription (as in, multiple
951 * subscribers per publisher). We additionally segregate some
952 * fields that would otherwise nearly always encounter cache line
953 * contention among producers and consumers. To reduce contention
954 * across time (vs space), consumers only periodically update
955 * other fields (see method takeItems), at the expense of possibly
956 * staler reporting of lags and demand (bounded at 12.5% == 1/8
957 * capacity) and possibly more atomic operations.
958 *
959 * Other forms of imbalance and slowdowns can occur during startup
960 * when producer and consumer methods are compiled and/or memory
961 * is allocated at different rates. This is ameliorated by
962 * artificially subdividing some consumer methods, including
963 * isolation of all subscriber callbacks. This code also includes
964 * typical power-of-two array screening idioms to avoid compilers
965 * generating traps, along with the usual SSA-based inline
966 * assignment coding style. Also, all methods and fields have
967 * default visibility to simplify usage by callers.
968 */
969 @SuppressWarnings("serial")
970 @jdk.internal.vm.annotation.Contended
971 static final class BufferedSubscription<T>
972 implements Subscription, ForkJoinPool.ManagedBlocker {
973 // Order-sensitive field declarations
974 long timeout; // Long.MAX_VALUE if untimed wait
975 int head; // next position to take
976 int tail; // next position to put
977 final int maxCapacity; // max buffer size
978 volatile int ctl; // atomic run state flags
979 Object[] array; // buffer
980 final Subscriber<? super T> subscriber;
981 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
982 Executor executor; // null on error
983 Thread waiter; // blocked producer thread
984 Throwable pendingError; // holds until onError issued
985 BufferedSubscription<T> next; // used only by publisher
986 BufferedSubscription<T> nextRetry; // used only by publisher
987
988 @jdk.internal.vm.annotation.Contended("c") // segregate
989 volatile long demand; // # unfilled requests
990 @jdk.internal.vm.annotation.Contended("c")
991 volatile int waiting; // nonzero if producer blocked
992
993 // ctl bit values
994 static final int CLOSED = 0x01; // if set, other bits ignored
995 static final int ACTIVE = 0x02; // keep-alive for consumer task
996 static final int REQS = 0x04; // (possibly) nonzero demand
997 static final int ERROR = 0x08; // issues onError when noticed
998 static final int COMPLETE = 0x10; // issues onComplete when done
999 static final int RUN = 0x20; // task is or will be running
1000 static final int OPEN = 0x40; // true after subscribe
1001
1002 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
1003
1004 BufferedSubscription(Subscriber<? super T> subscriber,
1005 Executor executor,
1006 BiConsumer<? super Subscriber<? super T>,
1007 ? super Throwable> onNextHandler,
1008 Object[] array,
1009 int maxBufferCapacity) {
1010 this.subscriber = subscriber;
1011 this.executor = executor;
1012 this.onNextHandler = onNextHandler;
1013 this.array = array;
1014 this.maxCapacity = maxBufferCapacity;
1015 }
1016
1017 // Wrappers for some VarHandle methods
1018
1019 final boolean weakCasCtl(int cmp, int val) {
1020 return CTL.weakCompareAndSet(this, cmp, val);
1021 }
1022
1023 final int getAndBitwiseOrCtl(int bits) {
1024 return (int)CTL.getAndBitwiseOr(this, bits);
1025 }
1026
1027 final long subtractDemand(int k) {
1028 long n = (long)(-k);
1029 return n + (long)DEMAND.getAndAdd(this, n);
1030 }
1031
1032 final boolean casDemand(long cmp, long val) {
1033 return DEMAND.compareAndSet(this, cmp, val);
1034 }
1035
1036 // Utilities used by SubmissionPublisher
1037
1038 /**
1039 * Returns true if closed (consumer task may still be running).
1040 */
1041 final boolean isClosed() {
1042 return (ctl & CLOSED) != 0;
1043 }
1044
1045 /**
1046 * Returns estimated number of buffered items, or negative if
1047 * closed.
1048 */
1049 final int estimateLag() {
1050 int c = ctl, n = tail - head;
1051 return ((c & CLOSED) != 0) ? -1 : (n < 0) ? 0 : n;
1052 }
1053
1054 // Methods for submitting items
1055
1056 /**
1057 * Tries to add item and start consumer task if necessary.
1058 * @return negative if closed, 0 if saturated, else estimated lag
1059 */
1060 final int offer(T item, boolean unowned) {
1061 Object[] a;
1062 int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
1063 int t = tail, i = t & (cap - 1), n = t + 1 - head;
1064 if (cap > 0) {
1065 boolean added;
1066 if (n >= cap && cap < maxCapacity) // resize
1067 added = growAndoffer(item, a, t);
1068 else if (n >= cap || unowned) // need volatile CAS
1069 added = QA.compareAndSet(a, i, null, item);
1070 else { // can use release mode
1071 QA.setRelease(a, i, item);
1072 added = true;
1073 }
1074 if (added) {
1075 tail = t + 1;
1076 stat = n;
1077 }
1078 }
1079 return startOnOffer(stat);
1080 }
1081
1082 /**
1083 * Tries to expand buffer and add item, returning true on
1084 * success. Currently fails only if out of memory.
1085 */
1086 final boolean growAndoffer(T item, Object[] a, int t) {
1087 int cap = 0, newCap = 0;
1088 Object[] newArray = null;
1089 if (a != null && (cap = a.length) > 0 && (newCap = cap << 1) > 0) {
1090 try {
1091 newArray = new Object[newCap];
1092 } catch (OutOfMemoryError ex) {
1093 }
1094 }
1095 if (newArray == null)
1096 return false;
1097 else { // take and move items
1098 int newMask = newCap - 1;
1099 newArray[t-- & newMask] = item;
1100 for (int mask = cap - 1, k = mask; k >= 0; --k) {
1101 Object x = QA.getAndSet(a, t & mask, null);
1102 if (x == null)
1103 break; // already consumed
1104 else
1105 newArray[t-- & newMask] = x;
1106 }
1107 array = newArray;
1108 VarHandle.releaseFence(); // release array and slots
1109 return true;
1110 }
1111 }
1112
1113 /**
1114 * Version of offer for retries (no resize or bias)
1115 */
1116 final int retryOffer(T item) {
1117 Object[] a;
1118 int stat = 0, t = tail, h = head, cap;
1119 if ((a = array) != null && (cap = a.length) > 0 &&
1120 QA.compareAndSet(a, (cap - 1) & t, null, item))
1121 stat = (tail = t + 1) - h;
1122 return startOnOffer(stat);
1123 }
1124
1125 /**
1126 * Tries to start consumer task after offer.
1127 * @return negative if now closed, else argument
1128 */
1129 final int startOnOffer(int stat) {
1130 int c; // start or keep alive if requests exist and not active
1131 if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
1132 ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
1133 tryStart();
1134 else if ((c & CLOSED) != 0)
1135 stat = -1;
1136 return stat;
1137 }
1138
1139 /**
1140 * Tries to start consumer task. Sets error state on failure.
1141 */
1142 final void tryStart() {
1143 try {
1144 Executor e;
1145 ConsumerTask<T> task = new ConsumerTask<T>(this);
1146 if ((e = executor) != null) // skip if disabled on error
1147 e.execute(task);
1148 } catch (RuntimeException | Error ex) {
1149 getAndBitwiseOrCtl(ERROR | CLOSED);
1150 throw ex;
1151 }
1152 }
1153
1154 // Signals to consumer tasks
1155
1156 /**
1157 * Sets the given control bits, starting task if not running or closed.
1158 * @param bits state bits, assumed to include RUN but not CLOSED
1159 */
1160 final void startOnSignal(int bits) {
1161 if ((getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
1162 tryStart();
1163 }
1164
1165 final void onSubscribe() {
1166 startOnSignal(RUN | ACTIVE);
1167 }
1168
1169 final void onComplete() {
1170 startOnSignal(RUN | ACTIVE | COMPLETE);
1171 }
1172
1173 final void onError(Throwable ex) {
1174 int c; Object[] a; // to null out buffer on async error
1175 if (ex != null)
1176 pendingError = ex; // races are OK
1177 if (((c = getAndBitwiseOrCtl(ERROR | RUN | ACTIVE)) & CLOSED) == 0) {
1178 if ((c & RUN) == 0)
1179 tryStart();
1180 else if ((a = array) != null)
1181 Arrays.fill(a, null);
1182 }
1183 }
1184
1185 public final void cancel() {
1186 onError(null);
1187 }
1188
1189 public final void request(long n) {
1190 if (n > 0L) {
1191 for (;;) {
1192 long p = demand, d = p + n; // saturate
1193 if (casDemand(p, d < p ? Long.MAX_VALUE : d))
1194 break;
1195 }
1196 if ((ctl & (RUN | ACTIVE | REQS)) != (RUN | ACTIVE | REQS))
1197 startOnSignal(RUN | ACTIVE | REQS);
1198 }
1199 else
1200 onError(new IllegalArgumentException(
1201 "non-positive subscription request"));
1202 }
1203
1204 // Consumer task actions
1205
1206 /**
1207 * Consumer loop, called from ConsumerTask, or indirectly when
1208 * helping during submit.
1209 */
1210 final void consume() {
1211 Subscriber<? super T> s;
1212 if ((s = subscriber) != null) { // hoist checks
1213 subscribeOnOpen(s);
1214 long d = demand;
1215 for (int h = head, t = tail;;) {
1216 int c, taken; boolean empty;
1217 if (((c = ctl) & ERROR) != 0) {
1218 closeOnError(s, null);
1219 break;
1220 }
1221 else if ((taken = takeItems(s, d, h)) > 0) {
1222 head = h += taken;
1223 d = subtractDemand(taken);
1224 }
1225 else if ((empty = (t == h)) && (c & COMPLETE) != 0) {
1226 closeOnComplete(s); // end of stream
1227 break;
1228 }
1229 else if ((d = demand) == 0L && (c & REQS) != 0)
1230 weakCasCtl(c, c & ~REQS); // exhausted demand
1231 else if (d != 0L && (c & REQS) == 0)
1232 weakCasCtl(c, c | REQS); // new demand
1233 else if (t == (t = tail) && (empty || d == 0L)) {
1234 int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
1235 if (weakCasCtl(c, c & ~bit) && bit == RUN)
1236 break; // un-keep-alive or exit
1237 }
1238 }
1239 }
1240 }
1241
1242 /**
1243 * Consumes some items until unavailable or bound or error.
1244 *
1245 * @param s subscriber
1246 * @param d current demand
1247 * @param h current head
1248 * @return number taken
1249 */
1250 final int takeItems(Subscriber<? super T> s, long d, int h) {
1251 Object[] a;
1252 int k = 0, cap;
1253 if ((a = array) != null && (cap = a.length) > 0) {
1254 int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)
1255 int n = (d < (long)b) ? (int)d : b;
1256 for (; k < n; ++h, ++k) {
1257 Object x = QA.getAndSet(a, h & m, null);
1258 if (waiting != 0)
1259 signalWaiter();
1260 if (x == null)
1261 break;
1262 else if (!consumeNext(s, x))
1263 break;
1264 }
1265 }
1266 return k;
1267 }
1268
1269 final boolean consumeNext(Subscriber<? super T> s, Object x) {
1270 try {
1271 @SuppressWarnings("unchecked") T y = (T) x;
1272 if (s != null)
1273 s.onNext(y);
1274 return true;
1275 } catch (Throwable ex) {
1276 handleOnNext(s, ex);
1277 return false;
1278 }
1279 }
1280
1281 /**
1282 * Processes exception in Subscriber.onNext.
1283 */
1284 final void handleOnNext(Subscriber<? super T> s, Throwable ex) {
1285 BiConsumer<? super Subscriber<? super T>, ? super Throwable> h;
1286 try {
1287 if ((h = onNextHandler) != null)
1288 h.accept(s, ex);
1289 } catch (Throwable ignore) {
1290 }
1291 closeOnError(s, ex);
1292 }
1293
1294 /**
1295 * Issues subscriber.onSubscribe if this is first signal.
1296 */
1297 final void subscribeOnOpen(Subscriber<? super T> s) {
1298 if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0)
1299 consumeSubscribe(s);
1300 }
1301
1302 final void consumeSubscribe(Subscriber<? super T> s) {
1303 try {
1304 if (s != null) // ignore if disabled
1305 s.onSubscribe(this);
1306 } catch (Throwable ex) {
1307 closeOnError(s, ex);
1308 }
1309 }
1310
1311 /**
1312 * Issues subscriber.onComplete unless already closed.
1313 */
1314 final void closeOnComplete(Subscriber<? super T> s) {
1315 if ((getAndBitwiseOrCtl(CLOSED) & CLOSED) == 0)
1316 consumeComplete(s);
1317 }
1318
1319 final void consumeComplete(Subscriber<? super T> s) {
1320 try {
1321 if (s != null)
1322 s.onComplete();
1323 } catch (Throwable ignore) {
1324 }
1325 }
1326
1327 /**
1328 * Issues subscriber.onError, and unblocks producer if needed.
1329 */
1330 final void closeOnError(Subscriber<? super T> s, Throwable ex) {
1331 if ((getAndBitwiseOrCtl(ERROR | CLOSED) & CLOSED) == 0) {
1332 if (ex == null)
1333 ex = pendingError;
1334 pendingError = null; // detach
1335 executor = null; // suppress racing start calls
1336 signalWaiter();
1337 consumeError(s, ex);
1338 }
1339 }
1340
1341 final void consumeError(Subscriber<? super T> s, Throwable ex) {
1342 try {
1343 if (ex != null && s != null)
1344 s.onError(ex);
1345 } catch (Throwable ignore) {
1346 }
1347 }
1348
1349 // Blocking support
1350
1351 /**
1352 * Unblocks waiting producer.
1353 */
1354 final void signalWaiter() {
1355 Thread w;
1356 waiting = 0;
1357 if ((w = waiter) != null)
1358 LockSupport.unpark(w);
1359 }
1360
1361 /**
1362 * Returns true if closed or space available.
1363 * For ManagedBlocker.
1364 */
1365 public final boolean isReleasable() {
1366 Object[] a; int cap;
1367 return ((ctl & CLOSED) != 0 ||
1368 ((a = array) != null && (cap = a.length) > 0 &&
1369 QA.getAcquire(a, (cap - 1) & tail) == null));
1370 }
1371
1372 /**
1373 * Helps or blocks until timeout, closed, or space available.
1374 */
1375 final void awaitSpace(long nanos) {
1376 if (!isReleasable()) {
1377 ForkJoinPool.helpAsyncBlocker(executor, this);
1378 if (!isReleasable()) {
1379 timeout = nanos;
1380 try {
1381 ForkJoinPool.managedBlock(this);
1382 } catch (InterruptedException ie) {
1383 timeout = INTERRUPTED;
1384 }
1385 if (timeout == INTERRUPTED)
1386 Thread.currentThread().interrupt();
1387 }
1388 }
1389 }
1390
1391 /**
1392 * Blocks until closed, space available or timeout.
1393 * For ManagedBlocker.
1394 */
1395 public final boolean block() {
1396 long nanos = timeout;
1397 boolean timed = (nanos < Long.MAX_VALUE);
1398 long deadline = timed ? System.nanoTime() + nanos : 0L;
1399 while (!isReleasable()) {
1400 if (Thread.interrupted()) {
1401 timeout = INTERRUPTED;
1402 if (timed)
1403 break;
1404 }
1405 else if (timed && (nanos = deadline - System.nanoTime()) <= 0L)
1406 break;
1407 else if (waiter == null)
1408 waiter = Thread.currentThread();
1409 else if (waiting == 0)
1410 waiting = 1;
1411 else if (timed)
1412 LockSupport.parkNanos(this, nanos);
1413 else
1414 LockSupport.park(this);
1415 }
1416 waiter = null;
1417 waiting = 0;
1418 return true;
1419 }
1420
1421 // VarHandle mechanics
1422 static final VarHandle CTL;
1423 static final VarHandle DEMAND;
1424 static final VarHandle QA;
1425
1426 static {
1427 try {
1428 MethodHandles.Lookup l = MethodHandles.lookup();
1429 CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
1430 int.class);
1431 DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
1432 long.class);
1433 QA = MethodHandles.arrayElementVarHandle(Object[].class);
1434 } catch (ReflectiveOperationException e) {
1435 throw new Error(e);
1436 }
1437
1438 // Reduce the risk of rare disastrous classloading in first call to
1439 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1440 Class<?> ensureLoaded = LockSupport.class;
1441 }
1442 }
1443 }