ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.73
Committed: Sun Nov 26 20:35:30 2017 UTC (6 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.72: +1 -1 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.invoke.MethodHandles;
10 import java.lang.invoke.VarHandle;
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.List;
14 import java.util.concurrent.locks.LockSupport;
15 import java.util.function.BiConsumer;
16 import java.util.function.BiPredicate;
17 import java.util.function.Consumer;
18 import static java.util.concurrent.Flow.*; // improve code readability
19
20 /**
21 * A {@link Flow.Publisher} that asynchronously issues submitted
22 * (non-null) items to current subscribers until it is closed. Each
23 * current subscriber receives newly submitted items in the same order
24 * unless drops or exceptions are encountered. Using a
25 * SubmissionPublisher allows item generators to act as compliant <a
26 * href="http://www.reactive-streams.org/"> reactive-streams</a>
27 * Publishers relying on drop handling and/or blocking for flow
28 * control.
29 *
30 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
31 * constructor for delivery to subscribers. The best choice of
32 * Executor depends on expected usage. If the generator(s) of
33 * submitted items run in separate threads, and the number of
34 * subscribers can be estimated, consider using a {@link
35 * Executors#newFixedThreadPool}. Otherwise consider using the
36 * default, normally the {@link ForkJoinPool#commonPool}.
37 *
38 * <p>Buffering allows producers and consumers to transiently operate
39 * at different rates. Each subscriber uses an independent buffer.
40 * Buffers are created upon first use and expanded as needed up to the
41 * given maximum. (The enforced capacity may be rounded up to the
42 * nearest power of two and/or bounded by the largest value supported
43 * by this implementation.) Invocations of {@link
44 * Flow.Subscription#request(long) request} do not directly result in
45 * buffer expansion, but risk saturation if unfilled requests exceed
46 * the maximum capacity. The default value of {@link
47 * Flow#defaultBufferSize()} may provide a useful starting point for
48 * choosing a capacity based on expected rates, resources, and usages.
49 *
50 * <p>A single SubmissionPublisher may be shared among multiple
51 * sources. Actions in a source thread prior to publishing an item or
52 * issuing a signal <a href="package-summary.html#MemoryVisibility">
53 * <i>happen-before</i></a> actions subsequent to the corresponding
54 * access by each subscriber. But reported estimates of lag and demand
55 * are designed for use in monitoring, not for synchronization
56 * control, and may reflect stale or inaccurate views of progress.
57 *
58 * <p>Publication methods support different policies about what to do
59 * when buffers are saturated. Method {@link #submit(Object) submit}
60 * blocks until resources are available. This is simplest, but least
61 * responsive. The {@code offer} methods may drop items (either
62 * immediately or with bounded timeout), but provide an opportunity to
63 * interpose a handler and then retry.
64 *
65 * <p>If any Subscriber method throws an exception, its subscription
66 * is cancelled. If a handler is supplied as a constructor argument,
67 * it is invoked before cancellation upon an exception in method
68 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
69 * {@link Flow.Subscriber#onSubscribe onSubscribe},
70 * {@link Flow.Subscriber#onError(Throwable) onError} and
71 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
72 * handled before cancellation. If the supplied Executor throws
73 * {@link RejectedExecutionException} (or any other RuntimeException
74 * or Error) when attempting to execute a task, or a drop handler
75 * throws an exception when processing a dropped item, then the
76 * exception is rethrown. In these cases, not all subscribers will
77 * have been issued the published item. It is usually good practice to
78 * {@link #closeExceptionally closeExceptionally} in these cases.
79 *
80 * <p>Method {@link #consume(Consumer)} simplifies support for a
81 * common case in which the only action of a subscriber is to request
82 * and process all items using a supplied function.
83 *
84 * <p>This class may also serve as a convenient base for subclasses
85 * that generate items, and use the methods in this class to publish
86 * them. For example here is a class that periodically publishes the
87 * items generated from a supplier. (In practice you might add methods
88 * to independently start and stop generation, to share Executors
89 * among publishers, and so on, or use a SubmissionPublisher as a
90 * component rather than a superclass.)
91 *
92 * <pre> {@code
93 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
94 * final ScheduledFuture<?> periodicTask;
95 * final ScheduledExecutorService scheduler;
96 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
97 * Supplier<? extends T> supplier,
98 * long period, TimeUnit unit) {
99 * super(executor, maxBufferCapacity);
100 * scheduler = new ScheduledThreadPoolExecutor(1);
101 * periodicTask = scheduler.scheduleAtFixedRate(
102 * () -> submit(supplier.get()), 0, period, unit);
103 * }
104 * public void close() {
105 * periodicTask.cancel(false);
106 * scheduler.shutdown();
107 * super.close();
108 * }
109 * }}</pre>
110 *
111 * <p>Here is an example of a {@link Flow.Processor} implementation.
112 * It uses single-step requests to its publisher for simplicity of
113 * illustration. A more adaptive version could monitor flow using the
114 * lag estimate returned from {@code submit}, along with other utility
115 * methods.
116 *
117 * <pre> {@code
118 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
119 * implements Flow.Processor<S,T> {
120 * final Function<? super S, ? extends T> function;
121 * Flow.Subscription subscription;
122 * TransformProcessor(Executor executor, int maxBufferCapacity,
123 * Function<? super S, ? extends T> function) {
124 * super(executor, maxBufferCapacity);
125 * this.function = function;
126 * }
127 * public void onSubscribe(Flow.Subscription subscription) {
128 * (this.subscription = subscription).request(1);
129 * }
130 * public void onNext(S item) {
131 * subscription.request(1);
132 * submit(function.apply(item));
133 * }
134 * public void onError(Throwable ex) { closeExceptionally(ex); }
135 * public void onComplete() { close(); }
136 * }}</pre>
137 *
138 * @param <T> the published item type
139 * @author Doug Lea
140 * @since 9
141 */
142 public class SubmissionPublisher<T> implements Publisher<T>,
143 AutoCloseable {
144 /*
145 * Most mechanics are handled by BufferedSubscription. This class
146 * mainly tracks subscribers and ensures sequentiality, by using
147 * built-in synchronization locks across public methods. Using
148 * built-in locks works well in the most typical case in which
149 * only one thread submits items. We extend this idea in
150 * submission methods by detecting single-ownership to reduce
151 * producer-consumer synchronization strength.
152 */
153
154 /** The largest possible power of two array size (1 << 30); roundCapacity never returns more than this. */
155 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
156
157 /**
158 * Initial buffer capacity used when maxBufferCapacity is greater;
159 * subscribe allocates the smaller of the two. Must be a power of two.
160 */
161 static final int INITIAL_CAPACITY = 32;
162
163 /** Round capacity to power of 2, at most limit. */
164 static final int roundCapacity(int cap) {
165 int n = cap - 1;
166 n |= n >>> 1;
167 n |= n >>> 2;
168 n |= n >>> 4;
169 n |= n >>> 8;
170 n |= n >>> 16;
171 return (n <= 0) ? 1 : // at least 1
172 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
173 }
174
175 // default Executor setup; nearly the same as CompletableFuture
176
177 /**
178 * Default executor -- ForkJoinPool.commonPool() when its parallelism
179 * is greater than one, otherwise a new ThreadPerTaskExecutor.
180 */
181 private static final Executor ASYNC_POOL =
182 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
183 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
184
185 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
186 private static final class ThreadPerTaskExecutor implements Executor {
187 ThreadPerTaskExecutor() {} // prevent access constructor creation
188 public void execute(Runnable r) { new Thread(r).start(); }
189 }
190
191 /**
192 * Clients (BufferedSubscriptions) are maintained in a linked list
193 * (via their "next" fields). This works well for publish loops.
194 * It requires O(n) traversal to check for duplicate subscribers,
195 * but we expect that subscribing is much less common than
196 * publishing. Unsubscribing occurs only during traversal loops,
197 * when BufferedSubscription methods return negative values
198 * signifying that they have been closed. To reduce
199 * head-of-line blocking, submit and offer methods first call
200 * BufferedSubscription.offer on each subscriber, place the
201 * saturated ones on a retry list (linked via the nextRetry field),
202 * and then retry those, possibly blocking or dropping.
203 */
204 BufferedSubscription<T> clients;
205
206 /** Run status; written only while holding the lock, but volatile so it can be read without locking (e.g. by isClosed) */
207 volatile boolean closed;
208 /** Set true on first call to subscribe, to initialize possible owner */
209 boolean subscribed;
210 /** The first caller thread to subscribe, or null if thread ever changed (doOffer clears it when called from another thread; close and closeExceptionally clear it too) */
211 Thread owner;
212 /** If non-null, the exception in closeExceptionally; also delivered via onError to later subscribers */
213 volatile Throwable closedException;
214
215 // Parameters for constructing BufferedSubscriptions (passed along by subscribe)
216 final Executor executor; // never null; checked by the constructor
217 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler; // may be null
218 final int maxBufferCapacity; // already rounded by roundCapacity
219
/**
 * Creates a new SubmissionPublisher that delivers to subscribers
 * asynchronously via the given Executor, limits each subscriber's
 * buffer to the given maximum size, and, when the handler is
 * non-null, invokes it whenever a Subscriber throws an exception
 * from {@link Flow.Subscriber#onNext(Object) onNext}.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the enforced capacity may be rounded up to
 * the nearest power of two and/or bounded by the largest value
 * supported by this implementation; method {@link #getMaxBufferCapacity}
 * returns the actual value)
 * @param handler if non-null, procedure to invoke upon exception
 * thrown in method {@code onNext}
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if maxBufferCapacity not
 * positive
 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                           BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
    // The null check comes first so a null executor always yields NPE.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity < 1) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler;
    this.executor = executor;
}
250
/**
 * Creates a new SubmissionPublisher that delivers to subscribers
 * asynchronously via the given Executor and limits each subscriber's
 * buffer to the given maximum size. No handler is installed for
 * exceptions thrown by Subscribers in method
 * {@link Flow.Subscriber#onNext(Object) onNext}.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the enforced capacity may be rounded up to
 * the nearest power of two and/or bounded by the largest value
 * supported by this implementation; method {@link #getMaxBufferCapacity}
 * returns the actual value)
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if maxBufferCapacity not
 * positive
 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    // Delegate with a null onNext handler.
    this(executor, maxBufferCapacity, null);
}
271
/**
 * Creates a new SubmissionPublisher with default settings: async
 * delivery through the {@link ForkJoinPool#commonPool()} (or, if
 * that pool does not support a parallelism level of at least two,
 * through a new Thread per task), a maximum buffer capacity of
 * {@link Flow#defaultBufferSize}, and no handler for Subscriber
 * exceptions in method {@link Flow.Subscriber#onNext(Object) onNext}.
 */
public SubmissionPublisher() {
    // The two-argument constructor supplies the null handler.
    this(ASYNC_POOL, Flow.defaultBufferSize());
}
284
285 /**
286 * Adds the given Subscriber unless already subscribed. If already
287 * subscribed, the Subscriber's {@link
288 * Flow.Subscriber#onError(Throwable) onError} method is invoked on
289 * the existing subscription with an {@link IllegalStateException}.
290 * Otherwise, upon success, the Subscriber's {@link
291 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
292 * asynchronously with a new {@link Flow.Subscription}. If {@link
293 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
294 * subscription is cancelled. Otherwise, if this SubmissionPublisher
295 * was closed exceptionally, then the subscriber's {@link
296 * Flow.Subscriber#onError onError} method is invoked with the
297 * corresponding exception, or if closed without exception, the
298 * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
299 * method is invoked. Subscribers may enable receiving items by
300 * invoking the {@link Flow.Subscription#request(long) request}
301 * method of the new Subscription, and may unsubscribe by invoking
302 * its {@link Flow.Subscription#cancel() cancel} method.
303 *
304 * @param subscriber the subscriber
305 * @throws NullPointerException if subscriber is null
306 */
307 public void subscribe(Subscriber<? super T> subscriber) {
308 if (subscriber == null) throw new NullPointerException();
309 int max = maxBufferCapacity; // allocate initial array
310 Object[] array = new Object[max < INITIAL_CAPACITY ?
311 max : INITIAL_CAPACITY];
312 BufferedSubscription<T> subscription =
313 new BufferedSubscription<T>(subscriber, executor, onNextHandler,
314 array, max);
315 synchronized (this) {
316 if (!subscribed) {
317 subscribed = true;
318 owner = Thread.currentThread();
319 }
320 for (BufferedSubscription<T> b = clients, pred = null;;) {
321 if (b == null) {
322 Throwable ex;
323 subscription.onSubscribe();
324 if ((ex = closedException) != null)
325 subscription.onError(ex);
326 else if (closed)
327 subscription.onComplete();
328 else if (pred == null)
329 clients = subscription;
330 else
331 pred.next = subscription;
332 break;
333 }
334 BufferedSubscription<T> next = b.next;
335 if (b.isClosed()) { // remove
336 b.next = null; // detach
337 if (pred == null)
338 clients = next;
339 else
340 pred.next = next;
341 }
342 else if (subscriber.equals(b.subscriber)) {
343 b.onError(new IllegalStateException("Duplicate subscribe"));
344 break;
345 }
346 else
347 pred = b;
348 b = next;
349 }
350 }
351 }
352
353 /**
354 * Common implementation for all three forms of submit and offer.
355 * Acts as submit if nanos == Long.MAX_VALUE, else offer
356 */
357 private int doOffer(T item, long nanos,
358 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
359 if (item == null) throw new NullPointerException();
360 int lag = 0;
361 boolean complete, unowned;
362 synchronized (this) {
363 Thread t = Thread.currentThread(), o;
364 BufferedSubscription<T> b = clients;
365 if ((unowned = ((o = owner) != t)) && o != null)
366 owner = null; // disable bias
367 if (b == null)
368 complete = closed;
369 else {
370 complete = false;
371 boolean cleanMe = false;
372 BufferedSubscription<T> retries = null, next;
373 do {
374 next = b.next;
375 int stat = b.offer(item, unowned);
376 if (stat == 0) { // saturated
377 b.nextRetry = retries; // add to retry list
378 retries = b;
379 }
380 else if (stat < 0) // closed
381 cleanMe = true; // remove later
382 else if (stat > lag)
383 lag = stat;
384 } while ((b = next) != null);
385
386 if (retries != null || cleanMe)
387 lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
388 }
389 }
390 if (complete)
391 throw new IllegalStateException("Closed");
392 else
393 return lag;
394 }
395
396 /**
397 * Helps, (timed) waits for, and/or drops buffers on list; returns
398 * lag or negative drops (for use in offer)
399 */
400 private int retryOffer(T item, long nanos,
401 BiPredicate<Subscriber<? super T>, ? super T> onDrop,
402 BufferedSubscription<T> retries, int lag,
403 boolean cleanMe) {
404 for (BufferedSubscription<T> r = retries; r != null;) {
405 BufferedSubscription<T> nextRetry = r.nextRetry;
406 r.nextRetry = null;
407 if (nanos > 0L)
408 r.awaitSpace(nanos);
409 int stat = r.retryOffer(item);
410 if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
411 stat = r.retryOffer(item);
412 if (stat == 0)
413 lag = (lag >= 0) ? -1 : lag - 1;
414 else if (stat < 0)
415 cleanMe = true;
416 else if (lag >= 0 && stat > lag)
417 lag = stat;
418 r = nextRetry;
419 }
420 if (cleanMe)
421 cleanAndCount();
422 return lag;
423 }
424
425 /**
426 * Returns current list count after removing closed subscribers.
427 * Call only while holding lock. Used mainly by retryOffer for
428 * cleanup.
429 */
430 private int cleanAndCount() {
431 int count = 0;
432 BufferedSubscription<T> pred = null, next;
433 for (BufferedSubscription<T> b = clients; b != null; b = next) {
434 next = b.next;
435 if (b.isClosed()) {
436 b.next = null;
437 if (pred == null)
438 clients = next;
439 else
440 pred.next = next;
441 }
442 else {
443 pred = b;
444 ++count;
445 }
446 }
447 return count;
448 }
449
450 /**
451 * Publishes the given item to each current subscriber by
452 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
453 * onNext} method, blocking uninterruptibly while resources for any
454 * subscriber are unavailable. This method returns an estimate of
455 * the maximum lag (number of items submitted but not yet consumed)
456 * among all current subscribers. This value is at least one
457 * (accounting for this submitted item) if there are any
458 * subscribers, else zero.
459 *
460 * <p>If the Executor for this publisher throws a
461 * RejectedExecutionException (or any other RuntimeException or
462 * Error) when attempting to asynchronously notify subscribers,
463 * then this exception is rethrown, in which case not all
464 * subscribers will have been issued this item.
465 *
466 * @param item the (non-null) item to publish
467 * @return the estimated maximum lag among subscribers
468 * @throws IllegalStateException if closed
469 * @throws NullPointerException if item is null
470 * @throws RejectedExecutionException if thrown by Executor
471 */
472 public int submit(T item) {
473 return doOffer(item, Long.MAX_VALUE, null);
474 }
475
476 /**
477 * Publishes the given item, if possible, to each current subscriber
478 * by asynchronously invoking its {@link
479 * Flow.Subscriber#onNext(Object) onNext} method. The item may be
480 * dropped by one or more subscribers if resource limits are
481 * exceeded, in which case the given handler (if non-null) is
482 * invoked, and if it returns true, retried once. Other calls to
483 * methods in this class by other threads are blocked while the
484 * handler is invoked. Unless recovery is assured, options are
485 * usually limited to logging the error and/or issuing an {@link
486 * Flow.Subscriber#onError(Throwable) onError} signal to the
487 * subscriber.
488 *
489 * <p>This method returns a status indicator: If negative, it
490 * represents the (negative) number of drops (failed attempts to
491 * issue the item to a subscriber). Otherwise it is an estimate of
492 * the maximum lag (number of items submitted but not yet
493 * consumed) among all current subscribers. This value is at least
494 * one (accounting for this submitted item) if there are any
495 * subscribers, else zero.
496 *
497 * <p>If the Executor for this publisher throws a
498 * RejectedExecutionException (or any other RuntimeException or
499 * Error) when attempting to asynchronously notify subscribers, or
500 * the drop handler throws an exception when processing a dropped
501 * item, then this exception is rethrown.
502 *
503 * @param item the (non-null) item to publish
504 * @param onDrop if non-null, the handler invoked upon a drop to a
505 * subscriber, with arguments of the subscriber and item; if it
506 * returns true, an offer is re-attempted (once)
507 * @return if negative, the (negative) number of drops; otherwise
508 * an estimate of maximum lag
509 * @throws IllegalStateException if closed
510 * @throws NullPointerException if item is null
511 * @throws RejectedExecutionException if thrown by Executor
512 */
513 public int offer(T item,
514 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
515 return doOffer(item, 0L, onDrop);
516 }
517
518 /**
519 * Publishes the given item, if possible, to each current subscriber
520 * by asynchronously invoking its {@link
521 * Flow.Subscriber#onNext(Object) onNext} method, blocking while
522 * resources for any subscription are unavailable, up to the
523 * specified timeout or until the caller thread is interrupted, at
524 * which point the given handler (if non-null) is invoked, and if it
525 * returns true, retried once. (The drop handler may distinguish
526 * timeouts from interrupts by checking whether the current thread
527 * is interrupted.) Other calls to methods in this class by other
528 * threads are blocked while the handler is invoked. Unless
529 * recovery is assured, options are usually limited to logging the
530 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
531 * onError} signal to the subscriber.
532 *
533 * <p>This method returns a status indicator: If negative, it
534 * represents the (negative) number of drops (failed attempts to
535 * issue the item to a subscriber). Otherwise it is an estimate of
536 * the maximum lag (number of items submitted but not yet
537 * consumed) among all current subscribers. This value is at least
538 * one (accounting for this submitted item) if there are any
539 * subscribers, else zero.
540 *
541 * <p>If the Executor for this publisher throws a
542 * RejectedExecutionException (or any other RuntimeException or
543 * Error) when attempting to asynchronously notify subscribers, or
544 * the drop handler throws an exception when processing a dropped
545 * item, then this exception is rethrown.
546 *
547 * @param item the (non-null) item to publish
548 * @param timeout how long to wait for resources for any subscriber
549 * before giving up, in units of {@code unit}
550 * @param unit a {@code TimeUnit} determining how to interpret the
551 * {@code timeout} parameter
552 * @param onDrop if non-null, the handler invoked upon a drop to a
553 * subscriber, with arguments of the subscriber and item; if it
554 * returns true, an offer is re-attempted (once)
555 * @return if negative, the (negative) number of drops; otherwise
556 * an estimate of maximum lag
557 * @throws IllegalStateException if closed
558 * @throws NullPointerException if item is null
559 * @throws RejectedExecutionException if thrown by Executor
560 */
561 public int offer(T item, long timeout, TimeUnit unit,
562 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
563 long nanos = unit.toNanos(timeout);
564 // distinguishes from untimed (only wrt interrupt policy)
565 if (nanos == Long.MAX_VALUE) --nanos;
566 return doOffer(item, nanos, onDrop);
567 }
568
569 /**
570 * Unless already closed, issues {@link
571 * Flow.Subscriber#onComplete() onComplete} signals to current
572 * subscribers, and disallows subsequent attempts to publish.
573 * Upon return, this method does <em>NOT</em> guarantee that all
574 * subscribers have yet completed.
575 */
576 public void close() {
577 if (!closed) {
578 BufferedSubscription<T> b;
579 synchronized (this) {
580 // no need to re-check closed here
581 b = clients;
582 clients = null;
583 owner = null;
584 closed = true;
585 }
586 while (b != null) {
587 BufferedSubscription<T> next = b.next;
588 b.next = null;
589 b.onComplete();
590 b = next;
591 }
592 }
593 }
594
595 /**
596 * Unless already closed, issues {@link
597 * Flow.Subscriber#onError(Throwable) onError} signals to current
598 * subscribers with the given error, and disallows subsequent
599 * attempts to publish. Future subscribers also receive the given
600 * error. Upon return, this method does <em>NOT</em> guarantee
601 * that all subscribers have yet completed.
602 *
603 * @param error the {@code onError} argument sent to subscribers
604 * @throws NullPointerException if error is null
605 */
606 public void closeExceptionally(Throwable error) {
607 if (error == null)
608 throw new NullPointerException();
609 if (!closed) {
610 BufferedSubscription<T> b;
611 synchronized (this) {
612 b = clients;
613 if (!closed) { // don't clobber racing close
614 closedException = error;
615 clients = null;
616 owner = null;
617 closed = true;
618 }
619 }
620 while (b != null) {
621 BufferedSubscription<T> next = b.next;
622 b.next = null;
623 b.onError(error);
624 b = next;
625 }
626 }
627 }
628
629 /**
630 * Returns true if this publisher is not accepting submissions.
631 *
632 * @return true if closed
633 */
634 public boolean isClosed() {
635 return closed;
636 }
637
638 /**
639 * Returns the exception associated with {@link
640 * #closeExceptionally(Throwable) closeExceptionally}, or null if
641 * not closed or if closed normally.
642 *
643 * @return the exception, or null if none
644 */
645 public Throwable getClosedException() {
646 return closedException;
647 }
648
649 /**
650 * Returns true if this publisher has any subscribers.
651 *
652 * @return true if this publisher has any subscribers
653 */
654 public boolean hasSubscribers() {
655 boolean nonEmpty = false;
656 synchronized (this) {
657 for (BufferedSubscription<T> b = clients; b != null;) {
658 BufferedSubscription<T> next = b.next;
659 if (b.isClosed()) {
660 b.next = null;
661 b = clients = next;
662 }
663 else {
664 nonEmpty = true;
665 break;
666 }
667 }
668 }
669 return nonEmpty;
670 }
671
672 /**
673 * Returns the number of current subscribers.
674 *
675 * @return the number of current subscribers
676 */
677 public int getNumberOfSubscribers() {
678 int count;
679 synchronized (this) {
680 count = cleanAndCount();
681 }
682 return count;
683 }
684
685 /**
686 * Returns the Executor used for asynchronous delivery.
687 *
688 * @return the Executor used for asynchronous delivery
689 */
690 public Executor getExecutor() {
691 return executor;
692 }
693
694 /**
695 * Returns the maximum per-subscriber buffer capacity.
696 *
697 * @return the maximum per-subscriber buffer capacity
698 */
699 public int getMaxBufferCapacity() {
700 return maxBufferCapacity;
701 }
702
703 /**
704 * Returns a list of current subscribers for monitoring and
705 * tracking purposes, not for invoking {@link Flow.Subscriber}
706 * methods on the subscribers.
707 *
708 * @return list of current subscribers
709 */
710 public List<Subscriber<? super T>> getSubscribers() {
711 ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
712 synchronized (this) {
713 BufferedSubscription<T> pred = null, next;
714 for (BufferedSubscription<T> b = clients; b != null; b = next) {
715 next = b.next;
716 if (b.isClosed()) {
717 b.next = null;
718 if (pred == null)
719 clients = next;
720 else
721 pred.next = next;
722 }
723 else
724 subs.add(b.subscriber);
725 }
726 }
727 return subs;
728 }
729
730 /**
731 * Returns true if the given Subscriber is currently subscribed.
732 *
733 * @param subscriber the subscriber
734 * @return true if currently subscribed
735 * @throws NullPointerException if subscriber is null
736 */
737 public boolean isSubscribed(Subscriber<? super T> subscriber) {
738 if (subscriber == null) throw new NullPointerException();
739 if (!closed) {
740 synchronized (this) {
741 BufferedSubscription<T> pred = null, next;
742 for (BufferedSubscription<T> b = clients; b != null; b = next) {
743 next = b.next;
744 if (b.isClosed()) {
745 b.next = null;
746 if (pred == null)
747 clients = next;
748 else
749 pred.next = next;
750 }
751 else if (subscriber.equals(b.subscriber))
752 return true;
753 else
754 pred = b;
755 }
756 }
757 }
758 return false;
759 }
760
761 /**
762 * Returns an estimate of the minimum number of items requested
763 * (via {@link Flow.Subscription#request(long) request}) but not
764 * yet produced, among all current subscribers.
765 *
766 * @return the estimate, or zero if no subscribers
767 */
768 public long estimateMinimumDemand() {
769 long min = Long.MAX_VALUE;
770 boolean nonEmpty = false;
771 synchronized (this) {
772 BufferedSubscription<T> pred = null, next;
773 for (BufferedSubscription<T> b = clients; b != null; b = next) {
774 int n; long d;
775 next = b.next;
776 if ((n = b.estimateLag()) < 0) {
777 b.next = null;
778 if (pred == null)
779 clients = next;
780 else
781 pred.next = next;
782 }
783 else {
784 if ((d = b.demand - n) < min)
785 min = d;
786 nonEmpty = true;
787 pred = b;
788 }
789 }
790 }
791 return nonEmpty ? min : 0;
792 }
793
794 /**
795 * Returns an estimate of the maximum number of items produced but
796 * not yet consumed among all current subscribers.
797 *
798 * @return the estimate
799 */
800 public int estimateMaximumLag() {
801 int max = 0;
802 synchronized (this) {
803 BufferedSubscription<T> pred = null, next;
804 for (BufferedSubscription<T> b = clients; b != null; b = next) {
805 int n;
806 next = b.next;
807 if ((n = b.estimateLag()) < 0) {
808 b.next = null;
809 if (pred == null)
810 clients = next;
811 else
812 pred.next = next;
813 }
814 else {
815 if (n > max)
816 max = n;
817 pred = b;
818 }
819 }
820 }
821 return max;
822 }
823
824 /**
825 * Processes all published items using the given Consumer function.
826 * Returns a CompletableFuture that is completed normally when this
827 * publisher signals {@link Flow.Subscriber#onComplete()
828 * onComplete}, or completed exceptionally upon any error, or an
829 * exception is thrown by the Consumer, or the returned
830 * CompletableFuture is cancelled, in which case no further items
831 * are processed.
832 *
833 * @param consumer the function applied to each onNext item
834 * @return a CompletableFuture that is completed normally
835 * when the publisher signals onComplete, and exceptionally
836 * upon any error or cancellation
837 * @throws NullPointerException if consumer is null
838 */
839 public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
840 if (consumer == null)
841 throw new NullPointerException();
842 CompletableFuture<Void> status = new CompletableFuture<>();
843 subscribe(new ConsumerSubscriber<T>(status, consumer));
844 return status;
845 }
846
847 /** Subscriber for method consume */
848 static final class ConsumerSubscriber<T> implements Subscriber<T> {
849 final CompletableFuture<Void> status;
850 final Consumer<? super T> consumer;
851 Subscription subscription;
852 ConsumerSubscriber(CompletableFuture<Void> status,
853 Consumer<? super T> consumer) {
854 this.status = status; this.consumer = consumer;
855 }
856 public final void onSubscribe(Subscription subscription) {
857 this.subscription = subscription;
858 status.whenComplete((v, e) -> subscription.cancel());
859 if (!status.isDone())
860 subscription.request(Long.MAX_VALUE);
861 }
862 public final void onError(Throwable ex) {
863 status.completeExceptionally(ex);
864 }
865 public final void onComplete() {
866 status.complete(null);
867 }
868 public final void onNext(T item) {
869 try {
870 consumer.accept(item);
871 } catch (Throwable ex) {
872 subscription.cancel();
873 status.completeExceptionally(ex);
874 }
875 }
876 }
877
878 /**
879 * A task for consuming buffer items and signals, created and
880 * executed whenever they become available. A task consumes as
881 * many items/signals as possible before terminating, at which
882 * point another task is created when needed. The dual Runnable
883 * and ForkJoinTask declaration saves overhead when executed by
884 * ForkJoinPools, without impacting other kinds of Executors.
885 */
886 @SuppressWarnings("serial")
887 static final class ConsumerTask<T> extends ForkJoinTask<Void>
888 implements Runnable, CompletableFuture.AsynchronousCompletionTask {
889 final BufferedSubscription<T> consumer;
890 ConsumerTask(BufferedSubscription<T> consumer) {
891 this.consumer = consumer;
892 }
893 public final Void getRawResult() { return null; }
894 public final void setRawResult(Void v) {}
895 public final boolean exec() { consumer.consume(); return false; }
896 public final void run() { consumer.consume(); }
897 }
898
899 /**
900 * A resizable array-based ring buffer with integrated control to
901 * start a consumer task whenever items are available. The buffer
902 * algorithm is specialized for the case of at most one concurrent
903 * producer and consumer, and power of two buffer sizes. It relies
904 * primarily on atomic operations (CAS or getAndSet) at the next
905 * array slot to put or take an element, at the "tail" and "head"
906 * indices written only by the producer and consumer respectively.
907 *
908 * We ensure internally that there is at most one active consumer
909 * task at any given time. The publisher guarantees a single
910 * producer via its lock. Sync among producers and consumers
911 * relies on volatile fields "ctl", "demand", and "waiting" (along
912 * with element access). Other variables are accessed in plain
913 * mode, relying on outer ordering and exclusion, and/or enclosing
914 * them within other volatile accesses. Some atomic operations are
915 * avoided by tracking single threaded ownership by producers (in
916 * the style of biased locking).
917 *
918 * Execution control and protocol state are managed using field
919 * "ctl". Methods to subscribe, close, request, and cancel set
920 * ctl bits (mostly using atomic boolean method getAndBitwiseOr),
921 * and ensure that a task is running. (The corresponding consumer
922 * side actions are in method consume.) To avoid starting a new
923 * task on each action, ctl also includes a keep-alive bit
924 * (ACTIVE) that is refreshed if needed on producer actions.
925 * (Maintaining agreement about keep-alives requires most atomic
926 * updates to be full SC/Volatile strength, which is still much
927 * cheaper than using one task per item.) Error signals
928 * additionally null out items and/or fields to reduce termination
929 * latency. The cancel() method is supported by treating as ERROR
930 * but suppressing onError signal.
931 *
932 * Support for blocking also exploits the fact that there is only
933 * one possible waiter. ManagedBlocker-compatible control fields
934 * are placed in this class itself rather than in wait-nodes.
935 * Blocking control relies on the "waiting" and "waiter"
936 * fields. Producers set them before trying to block. Signalling
937 * unparks and clears fields. If the producer and/or consumer are
938 * using a ForkJoinPool, the producer attempts to help run
939 * consumer tasks via ForkJoinPool.helpAsyncBlocker before
940 * blocking.
941 *
942 * Usages of this class may encounter any of several forms of
943 * memory contention. We try to ameliorate across them without
944 * unduly impacting footprints in low-contention usages where it
945 * isn't needed. Buffer arrays start out small and grow only as
946 * needed. The class uses @Contended and heuristic field
947 * declaration ordering to reduce false-sharing memory contention
948 * across instances of BufferedSubscription (as in, multiple
949 * subscribers per publisher). We additionally segregate some
950 * fields that would otherwise nearly always encounter cache line
951 * contention among producers and consumers. To reduce contention
952 * across time (vs space), consumers only periodically update
953 * other fields (see method takeItems), at the expense of possibly
954 * staler reporting of lags and demand (bounded at 12.5% == 1/8
955 * capacity) and possibly more atomic operations.
956 *
957 * Other forms of imbalance and slowdowns can occur during startup
958 * when producer and consumer methods are compiled and/or memory
959 * is allocated at different rates. This is ameliorated by
960 * artificially subdividing some consumer methods, including
961 * isolation of all subscriber callbacks. This code also includes
962 * typical power-of-two array screening idioms to avoid compilers
963 * generating traps, along with the usual SSA-based inline
964 * assignment coding style. Also, all methods and fields have
965 * default visibility to simplify usage by callers.
966 */
967 @SuppressWarnings("serial")
968 @jdk.internal.vm.annotation.Contended
969 static final class BufferedSubscription<T>
970 implements Subscription, ForkJoinPool.ManagedBlocker {
971 // Order-sensitive field declarations
972 long timeout; // Long.MAX_VALUE if untimed wait; set to INTERRUPTED when a wait is interrupted
973 int head; // next position to take (advanced in consume())
974 int tail; // next position to put (advanced in offer/retryOffer)
975 final int maxCapacity; // max buffer size
976 volatile int ctl; // atomic run state flags (see "ctl bit values" below)
977 Object[] array; // buffer; replaced by a larger array in growAndoffer
978 final Subscriber<? super T> subscriber; // receives onSubscribe/onNext/onError/onComplete
// Called (when non-null) by handleOnNext if subscriber.onNext throws
979 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
980 Executor executor; // null on error (cleared by closeOnError)
981 Thread waiter; // blocked producer thread
982 Throwable pendingError; // holds until onError issued
983 BufferedSubscription<T> next; // used only by publisher
984 BufferedSubscription<T> nextRetry; // used only by publisher
985
986 @jdk.internal.vm.annotation.Contended("c") // segregate
987 volatile long demand; // # unfilled requests
988 @jdk.internal.vm.annotation.Contended("c")
989 volatile int waiting; // nonzero if producer blocked
990
991 // ctl bit values
992 static final int CLOSED = 0x01; // if set, other bits ignored
993 static final int ACTIVE = 0x02; // keep-alive for consumer task
994 static final int REQS = 0x04; // (possibly) nonzero demand
995 static final int ERROR = 0x08; // issues onError when noticed
996 static final int COMPLETE = 0x10; // issues onComplete when done
997 static final int RUN = 0x20; // task is or will be running
998 static final int OPEN = 0x40; // true after subscribe
999
1000 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
1001
/**
 * Creates a subscription that stores the given arguments as-is;
 * no validation is performed here.
 *
 * @param subscriber the subscriber to signal
 * @param executor the executor used to run consumer tasks
 * @param onNextHandler handler invoked when onNext throws, or null
 * @param array the initial buffer
 * @param maxBufferCapacity the maximum buffer size
 */
BufferedSubscription(
    Subscriber<? super T> subscriber,
    Executor executor,
    BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler,
    Object[] array,
    int maxBufferCapacity) {
    // immutable configuration
    this.subscriber = subscriber;
    this.onNextHandler = onNextHandler;
    this.maxCapacity = maxBufferCapacity;
    // mutable state
    this.executor = executor;
    this.array = array;
}
1014
1015 // Wrappers for some VarHandle methods
1016
1017 final boolean weakCasCtl(int cmp, int val) {
1018 return CTL.weakCompareAndSet(this, cmp, val);
1019 }
1020
1021 final int getAndBitwiseOrCtl(int bits) {
1022 return (int)CTL.getAndBitwiseOr(this, bits);
1023 }
1024
1025 final long subtractDemand(int k) {
1026 long n = (long)(-k);
1027 return n + (long)DEMAND.getAndAdd(this, n);
1028 }
1029
1030 final boolean casDemand(long cmp, long val) {
1031 return DEMAND.compareAndSet(this, cmp, val);
1032 }
1033
1034 // Utilities used by SubmissionPublisher
1035
1036 /**
1037 * Returns true if closed (consumer task may still be running).
1038 */
1039 final boolean isClosed() {
1040 return (ctl & CLOSED) != 0;
1041 }
1042
1043 /**
1044 * Returns estimated number of buffered items, or negative if
1045 * closed.
1046 */
1047 final int estimateLag() {
1048 int c = ctl, n = tail - head;
1049 return ((c & CLOSED) != 0) ? -1 : (n < 0) ? 0 : n;
1050 }
1051
1052 // Methods for submitting items
1053
1054 /**
1055 * Tries to add item and start consumer task if necessary.
1056 * @return negative if closed, 0 if saturated, else estimated lag
1057 */
1058 final int offer(T item, boolean unowned) {
1059 Object[] a;
1060 int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
1061 int t = tail, i = t & (cap - 1), n = t + 1 - head;
1062 if (cap > 0) {
1063 boolean added;
1064 if (n >= cap && cap < maxCapacity) // resize
1065 added = growAndoffer(item, a, t);
1066 else if (n >= cap || unowned) // need volatile CAS
1067 added = QA.compareAndSet(a, i, null, item);
1068 else { // can use release mode
1069 QA.setRelease(a, i, item);
1070 added = true;
1071 }
1072 if (added) {
1073 tail = t + 1;
1074 stat = n;
1075 }
1076 }
1077 return startOnOffer(stat);
1078 }
1079
1080 /**
1081 * Tries to expand buffer and add item, returning true on
1082 * success. Currently fails only if out of memory.
1083 */
1084 final boolean growAndoffer(T item, Object[] a, int t) {
1085 int cap = 0, newCap = 0;
1086 Object[] newArray = null;
1087 if (a != null && (cap = a.length) > 0 && (newCap = cap << 1) > 0) {
1088 try {
1089 newArray = new Object[newCap];
1090 } catch (OutOfMemoryError ex) {
1091 }
1092 }
1093 if (newArray == null)
1094 return false;
1095 else { // take and move items
1096 int newMask = newCap - 1;
1097 newArray[t-- & newMask] = item;
1098 for (int mask = cap - 1, k = mask; k >= 0; --k) {
1099 Object x = QA.getAndSet(a, t & mask, null);
1100 if (x == null)
1101 break; // already consumed
1102 else
1103 newArray[t-- & newMask] = x;
1104 }
1105 array = newArray;
1106 VarHandle.releaseFence(); // release array and slots
1107 return true;
1108 }
1109 }
1110
1111 /**
1112 * Version of offer for retries (no resize or bias)
1113 */
1114 final int retryOffer(T item) {
1115 Object[] a;
1116 int stat = 0, t = tail, h = head, cap;
1117 if ((a = array) != null && (cap = a.length) > 0 &&
1118 QA.compareAndSet(a, (cap - 1) & t, null, item))
1119 stat = (tail = t + 1) - h;
1120 return startOnOffer(stat);
1121 }
1122
1123 /**
1124 * Tries to start consumer task after offer.
1125 * @return negative if now closed, else argument
1126 */
1127 final int startOnOffer(int stat) {
1128 int c; // start or keep alive if requests exist and not active
1129 if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
1130 ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
1131 tryStart();
1132 else if ((c & CLOSED) != 0)
1133 stat = -1;
1134 return stat;
1135 }
1136
1137 /**
1138 * Tries to start consumer task. Sets error state on failure.
1139 */
1140 final void tryStart() {
1141 try {
1142 Executor e;
1143 ConsumerTask<T> task = new ConsumerTask<T>(this);
1144 if ((e = executor) != null) // skip if disabled on error
1145 e.execute(task);
1146 } catch (RuntimeException | Error ex) {
1147 getAndBitwiseOrCtl(ERROR | CLOSED);
1148 throw ex;
1149 }
1150 }
1151
1152 // Signals to consumer tasks
1153
1154 /**
1155 * Sets the given control bits, starting task if not running or closed.
1156 * @param bits state bits, assumed to include RUN but not CLOSED
1157 */
1158 final void startOnSignal(int bits) {
1159 if ((getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
1160 tryStart();
1161 }
1162
1163 final void onSubscribe() {
1164 startOnSignal(RUN | ACTIVE);
1165 }
1166
1167 final void onComplete() {
1168 startOnSignal(RUN | ACTIVE | COMPLETE);
1169 }
1170
1171 final void onError(Throwable ex) {
1172 int c; Object[] a; // to null out buffer on async error
1173 if (ex != null)
1174 pendingError = ex; // races are OK
1175 if (((c = getAndBitwiseOrCtl(ERROR | RUN | ACTIVE)) & CLOSED) == 0) {
1176 if ((c & RUN) == 0)
1177 tryStart();
1178 else if ((a = array) != null)
1179 Arrays.fill(a, null);
1180 }
1181 }
1182
1183 public final void cancel() {
1184 onError(null);
1185 }
1186
1187 public final void request(long n) {
1188 if (n > 0L) {
1189 for (;;) {
1190 long p = demand, d = p + n; // saturate
1191 if (casDemand(p, d < p ? Long.MAX_VALUE : d))
1192 break;
1193 }
1194 if ((ctl & (RUN | ACTIVE | REQS)) != (RUN | ACTIVE | REQS))
1195 startOnSignal(RUN | ACTIVE | REQS);
1196 }
1197 else
1198 onError(new IllegalArgumentException(
1199 "non-positive subscription request"));
1200 }
1201
1202 // Consumer task actions
1203
1204 /**
1205 * Consumer loop, called from ConsumerTask, or indirectly when
1206 * helping during submit.
 *
 * Each pass re-reads ctl and performs the first applicable step:
 * close on ERROR; deliver a batch via takeItems and debit demand;
 * close on COMPLETE once the cached tail equals head; bring the
 * REQS bit in line with demand; otherwise, when tail has not moved
 * and the buffer is empty or demand is zero, clear ACTIVE, or if
 * ACTIVE is already clear, clear RUN and exit.
1207 */
1208 final void consume() {
1209 Subscriber<? super T> s;
1210 if ((s = subscriber) != null) { // hoist checks
1211 subscribeOnOpen(s); // issues onSubscribe only if OPEN was not yet set
1212 long d = demand;
1213 for (int h = head, t = tail;;) { // h and t are local copies of head and tail
1214 int c, taken; boolean empty;
1215 if (((c = ctl) & ERROR) != 0) { // error or cancel pending
1216 closeOnError(s, null); // null: use pendingError, if any
1217 break;
1218 }
1219 else if ((taken = takeItems(s, d, h)) > 0) { // delivered a batch
1220 head = h += taken; // write back advanced head
1221 d = subtractDemand(taken); // d becomes the remaining demand
1222 }
1223 else if ((empty = (t == h)) && (c & COMPLETE) != 0) {
1224 closeOnComplete(s); // end of stream
1225 break;
1226 }
1227 else if ((d = demand) == 0L && (c & REQS) != 0)
1228 weakCasCtl(c, c & ~REQS); // exhausted demand
1229 else if (d != 0L && (c & REQS) == 0)
1230 weakCasCtl(c, c | REQS); // new demand
1231 else if (t == (t = tail) && (empty || d == 0L)) { // tail unchanged since last read; t is refreshed here
1232 int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN; // clear ACTIVE first, RUN on a later pass
1233 if (weakCasCtl(c, c & ~bit) && bit == RUN) // a failed CAS just loops again
1234 break; // un-keep-alive or exit
1235 }
1236 }
1237 }
1238 }
1239
1240 /**
1241 * Consumes some items until unavailable or bound or error.
1242 *
1243 * @param s subscriber
1244 * @param d current demand
1245 * @param h current head
1246 * @return number taken
1247 */
1248 final int takeItems(Subscriber<? super T> s, long d, int h) {
1249 Object[] a;
1250 int k = 0, cap;
1251 if ((a = array) != null && (cap = a.length) > 0) {
1252 int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)
1253 int n = (d < (long)b) ? (int)d : b;
1254 for (; k < n; ++h, ++k) {
1255 Object x = QA.getAndSet(a, h & m, null);
1256 if (waiting != 0)
1257 signalWaiter();
1258 if (x == null)
1259 break;
1260 else if (!consumeNext(s, x))
1261 break;
1262 }
1263 }
1264 return k;
1265 }
1266
1267 final boolean consumeNext(Subscriber<? super T> s, Object x) {
1268 try {
1269 @SuppressWarnings("unchecked") T y = (T) x;
1270 if (s != null)
1271 s.onNext(y);
1272 return true;
1273 } catch (Throwable ex) {
1274 handleOnNext(s, ex);
1275 return false;
1276 }
1277 }
1278
1279 /**
1280 * Processes exception in Subscriber.onNext.
1281 */
1282 final void handleOnNext(Subscriber<? super T> s, Throwable ex) {
1283 BiConsumer<? super Subscriber<? super T>, ? super Throwable> h;
1284 try {
1285 if ((h = onNextHandler) != null)
1286 h.accept(s, ex);
1287 } catch (Throwable ignore) {
1288 }
1289 closeOnError(s, ex);
1290 }
1291
1292 /**
1293 * Issues subscriber.onSubscribe if this is first signal.
1294 */
1295 final void subscribeOnOpen(Subscriber<? super T> s) {
1296 if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0)
1297 consumeSubscribe(s);
1298 }
1299
1300 final void consumeSubscribe(Subscriber<? super T> s) {
1301 try {
1302 if (s != null) // ignore if disabled
1303 s.onSubscribe(this);
1304 } catch (Throwable ex) {
1305 closeOnError(s, ex);
1306 }
1307 }
1308
1309 /**
1310 * Issues subscriber.onComplete unless already closed.
1311 */
1312 final void closeOnComplete(Subscriber<? super T> s) {
1313 if ((getAndBitwiseOrCtl(CLOSED) & CLOSED) == 0)
1314 consumeComplete(s);
1315 }
1316
1317 final void consumeComplete(Subscriber<? super T> s) {
1318 try {
1319 if (s != null)
1320 s.onComplete();
1321 } catch (Throwable ignore) {
1322 }
1323 }
1324
1325 /**
1326 * Issues subscriber.onError, and unblocks producer if needed.
1327 */
1328 final void closeOnError(Subscriber<? super T> s, Throwable ex) {
1329 if ((getAndBitwiseOrCtl(ERROR | CLOSED) & CLOSED) == 0) {
1330 if (ex == null)
1331 ex = pendingError;
1332 pendingError = null; // detach
1333 executor = null; // suppress racing start calls
1334 signalWaiter();
1335 consumeError(s, ex);
1336 }
1337 }
1338
1339 final void consumeError(Subscriber<? super T> s, Throwable ex) {
1340 try {
1341 if (ex != null && s != null)
1342 s.onError(ex);
1343 } catch (Throwable ignore) {
1344 }
1345 }
1346
1347 // Blocking support
1348
1349 /**
1350 * Unblocks waiting producer.
1351 */
1352 final void signalWaiter() {
1353 Thread w;
1354 waiting = 0;
1355 if ((w = waiter) != null)
1356 LockSupport.unpark(w);
1357 }
1358
1359 /**
1360 * Returns true if closed or space available.
1361 * For ManagedBlocker.
1362 */
1363 public final boolean isReleasable() {
1364 Object[] a; int cap;
1365 return ((ctl & CLOSED) != 0 ||
1366 ((a = array) != null && (cap = a.length) > 0 &&
1367 QA.getAcquire(a, (cap - 1) & tail) == null));
1368 }
1369
1370 /**
1371 * Helps or blocks until timeout, closed, or space available.
1372 */
1373 final void awaitSpace(long nanos) {
1374 if (!isReleasable()) {
1375 ForkJoinPool.helpAsyncBlocker(executor, this);
1376 if (!isReleasable()) {
1377 timeout = nanos;
1378 try {
1379 ForkJoinPool.managedBlock(this);
1380 } catch (InterruptedException ie) {
1381 timeout = INTERRUPTED;
1382 }
1383 if (timeout == INTERRUPTED)
1384 Thread.currentThread().interrupt();
1385 }
1386 }
1387 }
1388
1389 /**
1390 * Blocks until closed, space available or timeout.
1391 * For ManagedBlocker.
 *
 * Each loop pass performs a single step (record an interrupt,
 * check the deadline, publish the waiter thread, set the waiting
 * flag, or park) and then rechecks isReleasable. An interrupt
 * ends a timed wait but not an untimed one; in both cases timeout
 * is set to INTERRUPTED, which awaitSpace checks after the wait.
 *
 * @return always true
1392 */
1393 public final boolean block() {
1394 long nanos = timeout;
1395 boolean timed = (nanos < Long.MAX_VALUE); // Long.MAX_VALUE means untimed
1396 long deadline = timed ? System.nanoTime() + nanos : 0L;
1397 while (!isReleasable()) {
1398 if (Thread.interrupted()) { // also clears the interrupt status
1399 timeout = INTERRUPTED;
1400 if (timed)
1401 break;
1402 }
1403 else if (timed && (nanos = deadline - System.nanoTime()) <= 0L) // nanos becomes the time remaining
1404 break;
1405 else if (waiter == null)
1406 waiter = Thread.currentThread(); // first publish the thread to unpark
1407 else if (waiting == 0)
1408 waiting = 1; // then raise the flag that takeItems checks
1409 else if (timed)
1410 LockSupport.parkNanos(this, nanos);
1411 else
1412 LockSupport.park(this);
1413 }
1414 waiter = null; // clear blocking state on exit
1415 waiting = 0;
1416 return true;
1417 }
1418
1419 // VarHandle mechanics
/** VarHandle for atomic access to field ctl. */
static final VarHandle CTL;
/** VarHandle for atomic access to field demand. */
static final VarHandle DEMAND;
/** VarHandle for element access to Object[] buffers. */
static final VarHandle QA;

static {
    try {
        final MethodHandles.Lookup lookup = MethodHandles.lookup();
        final Class<?> self = BufferedSubscription.class;
        CTL = lookup.findVarHandle(self, "ctl", int.class);
        DEMAND = lookup.findVarHandle(self, "demand", long.class);
        QA = MethodHandles.arrayElementVarHandle(Object[].class);
    } catch (ReflectiveOperationException failure) {
        throw new Error(failure);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}
1440 }
1441 }