ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.83
Committed: Fri Jan 17 18:43:11 2020 UTC (4 years, 4 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.82: +1 -1 lines
Log Message:
typos

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 dl 1.61 import java.lang.invoke.MethodHandles;
10     import java.lang.invoke.VarHandle;
11 jsr166 1.10 import java.util.ArrayList;
12 dl 1.72 import java.util.Arrays;
13 jsr166 1.10 import java.util.List;
14 dl 1.1 import java.util.concurrent.locks.LockSupport;
15 dl 1.82 import java.util.concurrent.locks.ReentrantLock;
16 dl 1.36 import java.util.function.BiConsumer;
17 dl 1.1 import java.util.function.BiPredicate;
18 dl 1.45 import java.util.function.Consumer;
19 jsr166 1.74 import static java.util.concurrent.Flow.Publisher;
20     import static java.util.concurrent.Flow.Subscriber;
21     import static java.util.concurrent.Flow.Subscription;
22 dl 1.1
23     /**
24 dl 1.21 * A {@link Flow.Publisher} that asynchronously issues submitted
25     * (non-null) items to current subscribers until it is closed. Each
26     * current subscriber receives newly submitted items in the same order
27     * unless drops or exceptions are encountered. Using a
28 dl 1.51 * SubmissionPublisher allows item generators to act as compliant <a
29     * href="http://www.reactive-streams.org/"> reactive-streams</a>
30     * Publishers relying on drop handling and/or blocking for flow
31     * control.
32 dl 1.1 *
33 dl 1.16 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
34 dl 1.1 * constructor for delivery to subscribers. The best choice of
35     * Executor depends on expected usage. If the generator(s) of
36     * submitted items run in separate threads, and the number of
37     * subscribers can be estimated, consider using a {@link
38 dl 1.46 * Executors#newFixedThreadPool}. Otherwise consider using the
39     * default, normally the {@link ForkJoinPool#commonPool}.
40 dl 1.1 *
41     * <p>Buffering allows producers and consumers to transiently operate
42     * at different rates. Each subscriber uses an independent buffer.
43 dl 1.21 * Buffers are created upon first use and expanded as needed up to the
44 dl 1.27 * given maximum. (The enforced capacity may be rounded up to the
45 dl 1.21 * nearest power of two and/or bounded by the largest value supported
46     * by this implementation.) Invocations of {@link
47 jsr166 1.49 * Flow.Subscription#request(long) request} do not directly result in
48     * buffer expansion, but risk saturation if unfilled requests exceed
49     * the maximum capacity. The default value of {@link
50     * Flow#defaultBufferSize()} may provide a useful starting point for
51 dl 1.25 * choosing a capacity based on expected rates, resources, and usages.
52 dl 1.18 *
53 dl 1.72 * <p>A single SubmissionPublisher may be shared among multiple
54     * sources. Actions in a source thread prior to publishing an item or
55     * issuing a signal <a href="package-summary.html#MemoryVisibility">
56     * <i>happen-before</i></a> actions subsequent to the corresponding
57     * access by each subscriber. But reported estimates of lag and demand
58     * are designed for use in monitoring, not for synchronization
59     * control, and may reflect stale or inaccurate views of progress.
60     *
61 dl 1.1 * <p>Publication methods support different policies about what to do
62 jsr166 1.49 * when buffers are saturated. Method {@link #submit(Object) submit}
63     * blocks until resources are available. This is simplest, but least
64     * responsive. The {@code offer} methods may drop items (either
65     * immediately or with bounded timeout), but provide an opportunity to
66     * interpose a handler and then retry.
67 dl 1.1 *
68     * <p>If any Subscriber method throws an exception, its subscription
69 dl 1.36 * is cancelled. If a handler is supplied as a constructor argument,
70     * it is invoked before cancellation upon an exception in method
71 jsr166 1.41 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
72     * {@link Flow.Subscriber#onSubscribe onSubscribe},
73 jsr166 1.49 * {@link Flow.Subscriber#onError(Throwable) onError} and
74     * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
75 jsr166 1.41 * handled before cancellation. If the supplied Executor throws
76     * {@link RejectedExecutionException} (or any other RuntimeException
77     * or Error) when attempting to execute a task, or a drop handler
78     * throws an exception when processing a dropped item, then the
79     * exception is rethrown. In these cases, not all subscribers will
80     * have been issued the published item. It is usually good practice to
81     * {@link #closeExceptionally closeExceptionally} in these cases.
82 dl 1.1 *
83 jsr166 1.49 * <p>Method {@link #consume(Consumer)} simplifies support for a
84     * common case in which the only action of a subscriber is to request
85     * and process all items using a supplied function.
86 dl 1.45 *
87 dl 1.15 * <p>This class may also serve as a convenient base for subclasses
88     * that generate items, and use the methods in this class to publish
89     * them. For example here is a class that periodically publishes the
90     * items generated from a supplier. (In practice you might add methods
91 dl 1.35 * to independently start and stop generation, to share Executors
92 dl 1.30 * among publishers, and so on, or use a SubmissionPublisher as a
93     * component rather than a superclass.)
94 dl 1.15 *
95     * <pre> {@code
96     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
97     * final ScheduledFuture<?> periodicTask;
98     * final ScheduledExecutorService scheduler;
99 dl 1.21 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
100     * Supplier<? extends T> supplier,
101 dl 1.15 * long period, TimeUnit unit) {
102 dl 1.21 * super(executor, maxBufferCapacity);
103 dl 1.15 * scheduler = new ScheduledThreadPoolExecutor(1);
104     * periodicTask = scheduler.scheduleAtFixedRate(
105     * () -> submit(supplier.get()), 0, period, unit);
106     * }
107     * public void close() {
108     * periodicTask.cancel(false);
109     * scheduler.shutdown();
110     * super.close();
111     * }
112     * }}</pre>
113     *
114 dl 1.21 * <p>Here is an example of a {@link Flow.Processor} implementation.
115     * It uses single-step requests to its publisher for simplicity of
116     * illustration. A more adaptive version could monitor flow using the
117 dl 1.27 * lag estimate returned from {@code submit}, along with other utility
118 dl 1.21 * methods.
119 dl 1.16 *
120     * <pre> {@code
121     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
122     * implements Flow.Processor<S,T> {
123     * final Function<? super S, ? extends T> function;
124     * Flow.Subscription subscription;
125 dl 1.21 * TransformProcessor(Executor executor, int maxBufferCapacity,
126 dl 1.16 * Function<? super S, ? extends T> function) {
127 dl 1.21 * super(executor, maxBufferCapacity);
128 dl 1.16 * this.function = function;
129     * }
130     * public void onSubscribe(Flow.Subscription subscription) {
131     * (this.subscription = subscription).request(1);
132     * }
133     * public void onNext(S item) {
134     * subscription.request(1);
135     * submit(function.apply(item));
136     * }
137     * public void onError(Throwable ex) { closeExceptionally(ex); }
138     * public void onComplete() { close(); }
139     * }}</pre>
140     *
141 dl 1.1 * @param <T> the published item type
142     * @author Doug Lea
143 jsr166 1.58 * @since 9
144 dl 1.1 */
145 dl 1.72 public class SubmissionPublisher<T> implements Publisher<T>,
146 dl 1.1 AutoCloseable {
147     /*
148     * Most mechanics are handled by BufferedSubscription. This class
149 dl 1.17 * mainly tracks subscribers and ensures sequentiality, by using
150 dl 1.82 * locks across public methods, to ensure thread-safety in the
151 jsr166 1.83 * presence of multiple sources and maintain acquire-release
152 dl 1.82 * ordering around user operations. However, we also track whether
153     * there is only a single source, and if so streamline some buffer
154     * operations by avoiding some atomics.
155 dl 1.1 */
156    
157 dl 1.25 /** The largest possible power of two array size. */
158 jsr166 1.43 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
159 dl 1.1
160 dl 1.72 /**
161     * Initial buffer capacity used when maxBufferCapacity is
162     * greater. Must be a power of two.
163     */
164     static final int INITIAL_CAPACITY = 32;
165    
166 dl 1.40 /** Round capacity to power of 2, at most limit. */
167     static final int roundCapacity(int cap) {
168 dl 1.1 int n = cap - 1;
169     n |= n >>> 1;
170     n |= n >>> 2;
171     n |= n >>> 4;
172     n |= n >>> 8;
173     n |= n >>> 16;
174 dl 1.40 return (n <= 0) ? 1 : // at least 1
175 dl 1.25 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
176 dl 1.1 }
177    
178 dl 1.48 // default Executor setup; nearly the same as CompletableFuture
179 dl 1.46
180     /**
181     * Default executor -- ForkJoinPool.commonPool() unless it cannot
182     * support parallelism.
183     */
184 jsr166 1.52 private static final Executor ASYNC_POOL =
185 dl 1.46 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
186     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
187    
188     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
189 jsr166 1.68 private static final class ThreadPerTaskExecutor implements Executor {
190     ThreadPerTaskExecutor() {} // prevent access constructor creation
191 dl 1.46 public void execute(Runnable r) { new Thread(r).start(); }
192     }
193    
194 dl 1.1 /**
195     * Clients (BufferedSubscriptions) are maintained in a linked list
196     * (via their "next" fields). This works well for publish loops.
197     * It requires O(n) traversal to check for duplicate subscribers,
198     * but we expect that subscribing is much less common than
199 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
200 dl 1.25 * when BufferedSubscription methods return negative values
201 dl 1.72 * signifying that they have been closed. To reduce
202 jsr166 1.74 * head-of-line blocking, submit and offer methods first call
203 dl 1.29 * BufferedSubscription.offer on each subscriber, and place
204     * saturated ones in retries list (using nextRetry field), and
205     * retry, possibly blocking or dropping.
206 dl 1.1 */
207     BufferedSubscription<T> clients;
208    
209 dl 1.82 /** Lock for exclusion across multiple sources */
210     final ReentrantLock lock;
211 dl 1.20 /** Run status, updated only within locks */
212     volatile boolean closed;
213 dl 1.72 /** Set true on first call to subscribe, to initialize possible owner */
214     boolean subscribed;
215     /** The first caller thread to subscribe, or null if thread ever changed */
216     Thread owner;
217 dl 1.36 /** If non-null, the exception in closeExceptionally */
218 dl 1.37 volatile Throwable closedException;
219 dl 1.20
220 dl 1.1 // Parameters for constructing BufferedSubscriptions
221     final Executor executor;
222 dl 1.72 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
223 dl 1.1 final int maxBufferCapacity;
224    
225 dl 1.20 /**
226 dl 1.1 * Creates a new SubmissionPublisher using the given Executor for
227 jsr166 1.49 * async delivery to subscribers, with the given maximum buffer size
228     * for each subscriber, and, if non-null, the given handler invoked
229     * when any Subscriber throws an exception in method {@link
230     * Flow.Subscriber#onNext(Object) onNext}.
231 dl 1.1 *
232     * @param executor the executor to use for async delivery,
233     * supporting creation of at least one independent thread
234     * @param maxBufferCapacity the maximum capacity for each
235 dl 1.21 * subscriber's buffer (the enforced capacity may be rounded up to
236     * the nearest power of two and/or bounded by the largest value
237     * supported by this implementation; method {@link #getMaxBufferCapacity}
238     * returns the actual value)
239 dl 1.36 * @param handler if non-null, procedure to invoke upon exception
240 jsr166 1.49 * thrown in method {@code onNext}
241 dl 1.1 * @throws NullPointerException if executor is null
242 dl 1.21 * @throws IllegalArgumentException if maxBufferCapacity not
243     * positive
244 dl 1.1 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                           BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
    // Argument checks: the null executor is reported before a bad capacity.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity <= 0) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    this.executor = executor;
    // Store the rounded capacity rather than the raw argument.
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler;       // stored as given; may be null
    this.lock = new ReentrantLock();
}
256    
257 dl 1.1 /**
258 dl 1.36 * Creates a new SubmissionPublisher using the given Executor for
259 jsr166 1.49 * async delivery to subscribers, with the given maximum buffer size
260     * for each subscriber, and no handler for Subscriber exceptions in
261     * method {@link Flow.Subscriber#onNext(Object) onNext}.
262 dl 1.36 *
263     * @param executor the executor to use for async delivery,
264     * supporting creation of at least one independent thread
265     * @param maxBufferCapacity the maximum capacity for each
266     * subscriber's buffer (the enforced capacity may be rounded up to
267     * the nearest power of two and/or bounded by the largest value
268     * supported by this implementation; method {@link #getMaxBufferCapacity}
269     * returns the actual value)
270     * @throws NullPointerException if executor is null
271     * @throws IllegalArgumentException if maxBufferCapacity not
272     * positive
273     */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    // Same as the three-argument form, with a null onNext exception handler.
    this(executor, maxBufferCapacity, null);
}
277    
278     /**
279 dl 1.25 * Creates a new SubmissionPublisher using the {@link
280 dl 1.46 * ForkJoinPool#commonPool()} for async delivery to subscribers
281 jsr166 1.49 * (unless it does not support a parallelism level of at least two,
282     * in which case, a new Thread is created to run each task), with
283     * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
284     * handler for Subscriber exceptions in method {@link
285     * Flow.Subscriber#onNext(Object) onNext}.
286 dl 1.25 */
public SubmissionPublisher() {
    // Default executor, default Flow buffer size, null onNext handler.
    this(ASYNC_POOL, Flow.defaultBufferSize(), null);
}
290    
291     /**
292 jsr166 1.49 * Adds the given Subscriber unless already subscribed. If already
293     * subscribed, the Subscriber's {@link
294     * Flow.Subscriber#onError(Throwable) onError} method is invoked on
295     * the existing subscription with an {@link IllegalStateException}.
296     * Otherwise, upon success, the Subscriber's {@link
297     * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
298     * asynchronously with a new {@link Flow.Subscription}. If {@link
299     * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
300     * subscription is cancelled. Otherwise, if this SubmissionPublisher
301     * was closed exceptionally, then the subscriber's {@link
302     * Flow.Subscriber#onError onError} method is invoked with the
303     * corresponding exception, or if closed without exception, the
304     * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
305     * method is invoked. Subscribers may enable receiving items by
306     * invoking the {@link Flow.Subscription#request(long) request}
307     * method of the new Subscription, and may unsubscribe by invoking
308     * its {@link Flow.Subscription#cancel() cancel} method.
309 dl 1.18 *
310     * @param subscriber the subscriber
311     * @throws NullPointerException if subscriber is null
312     */
313 dl 1.72 public void subscribe(Subscriber<? super T> subscriber) {
314 dl 1.18 if (subscriber == null) throw new NullPointerException();
315 dl 1.82 ReentrantLock lock = this.lock;
316 dl 1.72 int max = maxBufferCapacity; // allocate initial array
317     Object[] array = new Object[max < INITIAL_CAPACITY ?
318     max : INITIAL_CAPACITY];
319 dl 1.20 BufferedSubscription<T> subscription =
320 dl 1.72 new BufferedSubscription<T>(subscriber, executor, onNextHandler,
321     array, max);
322 dl 1.82 lock.lock();
323     try {
324 dl 1.72 if (!subscribed) {
325     subscribed = true;
326     owner = Thread.currentThread();
327     }
328 dl 1.29 for (BufferedSubscription<T> b = clients, pred = null;;) {
329     if (b == null) {
330 dl 1.36 Throwable ex;
331 dl 1.29 subscription.onSubscribe();
332 dl 1.37 if ((ex = closedException) != null)
333 dl 1.36 subscription.onError(ex);
334     else if (closed)
335 dl 1.29 subscription.onComplete();
336     else if (pred == null)
337     clients = subscription;
338 dl 1.27 else
339 dl 1.29 pred.next = subscription;
340     break;
341 dl 1.27 }
342 dl 1.29 BufferedSubscription<T> next = b.next;
343 dl 1.72 if (b.isClosed()) { // remove
344 dl 1.29 b.next = null; // detach
345 dl 1.19 if (pred == null)
346 dl 1.29 clients = next;
347 dl 1.19 else
348 dl 1.29 pred.next = next;
349     }
350     else if (subscriber.equals(b.subscriber)) {
351     b.onError(new IllegalStateException("Duplicate subscribe"));
352     break;
353 dl 1.19 }
354 dl 1.29 else
355     pred = b;
356     b = next;
357 dl 1.19 }
358 dl 1.82 } finally {
359     lock.unlock();
360 dl 1.19 }
361 dl 1.18 }
362    
363     /**
364 dl 1.72 * Common implementation for all three forms of submit and offer.
365 jsr166 1.74 * Acts as submit if nanos == Long.MAX_VALUE, else offer.
366 dl 1.72 */
367     private int doOffer(T item, long nanos,
368     BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
369     if (item == null) throw new NullPointerException();
370     int lag = 0;
371     boolean complete, unowned;
372 dl 1.82 ReentrantLock lock = this.lock;
373     lock.lock();
374     try {
375 dl 1.72 Thread t = Thread.currentThread(), o;
376     BufferedSubscription<T> b = clients;
377     if ((unowned = ((o = owner) != t)) && o != null)
378     owner = null; // disable bias
379     if (b == null)
380     complete = closed;
381     else {
382     complete = false;
383     boolean cleanMe = false;
384 dl 1.76 BufferedSubscription<T> retries = null, rtail = null, next;
385 dl 1.72 do {
386     next = b.next;
387     int stat = b.offer(item, unowned);
388 dl 1.76 if (stat == 0) { // saturated; add to retry list
389     b.nextRetry = null; // avoid garbage on exceptions
390     if (rtail == null)
391     retries = b;
392     else
393     rtail.nextRetry = b;
394     rtail = b;
395 dl 1.72 }
396     else if (stat < 0) // closed
397     cleanMe = true; // remove later
398     else if (stat > lag)
399     lag = stat;
400     } while ((b = next) != null);
401    
402     if (retries != null || cleanMe)
403     lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
404     }
405 dl 1.82 } finally {
406     lock.unlock();
407 dl 1.72 }
408     if (complete)
409     throw new IllegalStateException("Closed");
410     else
411     return lag;
412     }
413    
414     /**
415     * Helps, (timed) waits for, and/or drops buffers on list; returns
416 jsr166 1.74 * lag or negative drops (for use in offer).
417 dl 1.72 */
418     private int retryOffer(T item, long nanos,
419     BiPredicate<Subscriber<? super T>, ? super T> onDrop,
420     BufferedSubscription<T> retries, int lag,
421     boolean cleanMe) {
422     for (BufferedSubscription<T> r = retries; r != null;) {
423     BufferedSubscription<T> nextRetry = r.nextRetry;
424     r.nextRetry = null;
425     if (nanos > 0L)
426     r.awaitSpace(nanos);
427     int stat = r.retryOffer(item);
428     if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
429     stat = r.retryOffer(item);
430     if (stat == 0)
431     lag = (lag >= 0) ? -1 : lag - 1;
432     else if (stat < 0)
433     cleanMe = true;
434     else if (lag >= 0 && stat > lag)
435     lag = stat;
436     r = nextRetry;
437     }
438     if (cleanMe)
439     cleanAndCount();
440     return lag;
441     }
442    
443     /**
444     * Returns current list count after removing closed subscribers.
445     * Call only while holding lock. Used mainly by retryOffer for
446     * cleanup.
447     */
448     private int cleanAndCount() {
449     int count = 0;
450     BufferedSubscription<T> pred = null, next;
451     for (BufferedSubscription<T> b = clients; b != null; b = next) {
452     next = b.next;
453     if (b.isClosed()) {
454     b.next = null;
455     if (pred == null)
456     clients = next;
457     else
458     pred.next = next;
459     }
460     else {
461     pred = b;
462     ++count;
463     }
464     }
465     return count;
466     }
467    
468     /**
469 dl 1.16 * Publishes the given item to each current subscriber by
470 jsr166 1.49 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
471     * onNext} method, blocking uninterruptibly while resources for any
472 dl 1.30 * subscriber are unavailable. This method returns an estimate of
473 jsr166 1.49 * the maximum lag (number of items submitted but not yet consumed)
474     * among all current subscribers. This value is at least one
475     * (accounting for this submitted item) if there are any
476 dl 1.30 * subscribers, else zero.
477 dl 1.16 *
478     * <p>If the Executor for this publisher throws a
479     * RejectedExecutionException (or any other RuntimeException or
480     * Error) when attempting to asynchronously notify subscribers,
481 dl 1.31 * then this exception is rethrown, in which case not all
482     * subscribers will have been issued this item.
483 dl 1.16 *
484     * @param item the (non-null) item to publish
485 dl 1.21 * @return the estimated maximum lag among subscribers
486 dl 1.16 * @throws IllegalStateException if closed
487     * @throws NullPointerException if item is null
488     * @throws RejectedExecutionException if thrown by Executor
489     */
490 dl 1.21 public int submit(T item) {
491 dl 1.72 return doOffer(item, Long.MAX_VALUE, null);
492 dl 1.16 }
493    
494     /**
495 jsr166 1.49 * Publishes the given item, if possible, to each current subscriber
496     * by asynchronously invoking its {@link
497     * Flow.Subscriber#onNext(Object) onNext} method. The item may be
498     * dropped by one or more subscribers if resource limits are
499     * exceeded, in which case the given handler (if non-null) is
500     * invoked, and if it returns true, retried once. Other calls to
501     * methods in this class by other threads are blocked while the
502     * handler is invoked. Unless recovery is assured, options are
503     * usually limited to logging the error and/or issuing an {@link
504     * Flow.Subscriber#onError(Throwable) onError} signal to the
505     * subscriber.
506 dl 1.1 *
507 dl 1.21 * <p>This method returns a status indicator: If negative, it
508     * represents the (negative) number of drops (failed attempts to
509     * issue the item to a subscriber). Otherwise it is an estimate of
510     * the maximum lag (number of items submitted but not yet
511     * consumed) among all current subscribers. This value is at least
512     * one (accounting for this submitted item) if there are any
513     * subscribers, else zero.
514 jsr166 1.23 *
515 dl 1.1 * <p>If the Executor for this publisher throws a
516     * RejectedExecutionException (or any other RuntimeException or
517     * Error) when attempting to asynchronously notify subscribers, or
518     * the drop handler throws an exception when processing a dropped
519     * item, then this exception is rethrown.
520     *
521     * @param item the (non-null) item to publish
522     * @param onDrop if non-null, the handler invoked upon a drop to a
523     * subscriber, with arguments of the subscriber and item; if it
524     * returns true, an offer is re-attempted (once)
525 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
526     * an estimate of maximum lag
527 dl 1.1 * @throws IllegalStateException if closed
528     * @throws NullPointerException if item is null
529     * @throws RejectedExecutionException if thrown by Executor
530     */
531     public int offer(T item,
532 dl 1.72 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
533     return doOffer(item, 0L, onDrop);
534 dl 1.1 }
535    
536     /**
537 jsr166 1.49 * Publishes the given item, if possible, to each current subscriber
538     * by asynchronously invoking its {@link
539     * Flow.Subscriber#onNext(Object) onNext} method, blocking while
540     * resources for any subscription are unavailable, up to the
541     * specified timeout or until the caller thread is interrupted, at
542     * which point the given handler (if non-null) is invoked, and if it
543     * returns true, retried once. (The drop handler may distinguish
544     * timeouts from interrupts by checking whether the current thread
545     * is interrupted.) Other calls to methods in this class by other
546 dl 1.30 * threads are blocked while the handler is invoked. Unless
547     * recovery is assured, options are usually limited to logging the
548 jsr166 1.49 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
549     * onError} signal to the subscriber.
550 dl 1.1 *
551 dl 1.21 * <p>This method returns a status indicator: If negative, it
552     * represents the (negative) number of drops (failed attempts to
553     * issue the item to a subscriber). Otherwise it is an estimate of
554     * the maximum lag (number of items submitted but not yet
555     * consumed) among all current subscribers. This value is at least
556     * one (accounting for this submitted item) if there are any
557     * subscribers, else zero.
558     *
559 dl 1.1 * <p>If the Executor for this publisher throws a
560     * RejectedExecutionException (or any other RuntimeException or
561     * Error) when attempting to asynchronously notify subscribers, or
562     * the drop handler throws an exception when processing a dropped
563     * item, then this exception is rethrown.
564     *
565     * @param item the (non-null) item to publish
566     * @param timeout how long to wait for resources for any subscriber
567     * before giving up, in units of {@code unit}
568     * @param unit a {@code TimeUnit} determining how to interpret the
569     * {@code timeout} parameter
570     * @param onDrop if non-null, the handler invoked upon a drop to a
571     * subscriber, with arguments of the subscriber and item; if it
572     * returns true, an offer is re-attempted (once)
573 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
574     * an estimate of maximum lag
575 dl 1.1 * @throws IllegalStateException if closed
576     * @throws NullPointerException if item is null
577     * @throws RejectedExecutionException if thrown by Executor
578     */
579     public int offer(T item, long timeout, TimeUnit unit,
580 dl 1.72 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
581     long nanos = unit.toNanos(timeout);
582     // distinguishes from untimed (only wrt interrupt policy)
583     if (nanos == Long.MAX_VALUE) --nanos;
584     return doOffer(item, nanos, onDrop);
585 dl 1.1 }
586    
587     /**
588 dl 1.30 * Unless already closed, issues {@link
589 jsr166 1.49 * Flow.Subscriber#onComplete() onComplete} signals to current
590     * subscribers, and disallows subsequent attempts to publish.
591     * Upon return, this method does <em>NOT</em> guarantee that all
592     * subscribers have yet completed.
593 dl 1.1 */
594     public void close() {
595 dl 1.82 ReentrantLock lock = this.lock;
596 dl 1.1 if (!closed) {
597 dl 1.32 BufferedSubscription<T> b;
598 dl 1.82 lock.lock();
599     try {
600 jsr166 1.70 // no need to re-check closed here
601 dl 1.1 b = clients;
602     clients = null;
603 dl 1.72 owner = null;
604 dl 1.1 closed = true;
605 dl 1.82 } finally {
606     lock.unlock();
607 dl 1.1 }
608     while (b != null) {
609 dl 1.32 BufferedSubscription<T> next = b.next;
610     b.next = null;
611 dl 1.19 b.onComplete();
612 dl 1.1 b = next;
613     }
614     }
615     }
616    
617     /**
618 jsr166 1.49 * Unless already closed, issues {@link
619     * Flow.Subscriber#onError(Throwable) onError} signals to current
620     * subscribers with the given error, and disallows subsequent
621     * attempts to publish. Future subscribers also receive the given
622     * error. Upon return, this method does <em>NOT</em> guarantee
623     * that all subscribers have yet completed.
624 dl 1.1 *
625 dl 1.30 * @param error the {@code onError} argument sent to subscribers
626 dl 1.1 * @throws NullPointerException if error is null
627     */
628     public void closeExceptionally(Throwable error) {
629     if (error == null)
630     throw new NullPointerException();
631 dl 1.82 ReentrantLock lock = this.lock;
632 dl 1.1 if (!closed) {
633 dl 1.32 BufferedSubscription<T> b;
634 dl 1.82 lock.lock();
635     try {
636 dl 1.1 b = clients;
637 jsr166 1.70 if (!closed) { // don't clobber racing close
638 dl 1.72 closedException = error;
639 jsr166 1.70 clients = null;
640 dl 1.72 owner = null;
641 jsr166 1.70 closed = true;
642     }
643 dl 1.82 } finally {
644     lock.unlock();
645 dl 1.1 }
646     while (b != null) {
647 dl 1.32 BufferedSubscription<T> next = b.next;
648     b.next = null;
649 dl 1.19 b.onError(error);
650 dl 1.1 b = next;
651     }
652     }
653     }
654    
655     /**
656     * Returns true if this publisher is not accepting submissions.
657     *
658     * @return true if closed
659     */
660     public boolean isClosed() {
661     return closed;
662     }
663    
664 dl 1.37 /**
665 jsr166 1.49 * Returns the exception associated with {@link
666     * #closeExceptionally(Throwable) closeExceptionally}, or null if
667     * not closed or if closed normally.
668 dl 1.37 *
669     * @return the exception, or null if none
670     */
671     public Throwable getClosedException() {
672     return closedException;
673     }
674    
675 dl 1.1 /**
676 jsr166 1.6 * Returns true if this publisher has any subscribers.
677 dl 1.1 *
678     * @return true if this publisher has any subscribers
679     */
680     public boolean hasSubscribers() {
681     boolean nonEmpty = false;
682 dl 1.82 ReentrantLock lock = this.lock;
683     lock.lock();
684     try {
685 dl 1.72 for (BufferedSubscription<T> b = clients; b != null;) {
686     BufferedSubscription<T> next = b.next;
687     if (b.isClosed()) {
688     b.next = null;
689     b = clients = next;
690     }
691     else {
692     nonEmpty = true;
693     break;
694 dl 1.1 }
695     }
696 dl 1.82 } finally {
697     lock.unlock();
698 dl 1.1 }
699     return nonEmpty;
700     }
701    
702     /**
703 dl 1.21 * Returns the number of current subscribers.
704 dl 1.1 *
705 dl 1.21 * @return the number of current subscribers
706 dl 1.1 */
707 dl 1.21 public int getNumberOfSubscribers() {
708 dl 1.82 int n;
709     ReentrantLock lock = this.lock;
710     lock.lock();
711     try {
712     n = cleanAndCount();
713     } finally {
714     lock.unlock();
715 dl 1.21 }
716 dl 1.82 return n;
717 dl 1.1 }
718    
719 jsr166 1.7 /**
720 dl 1.21 * Returns the Executor used for asynchronous delivery.
721 dl 1.1 *
722 dl 1.21 * @return the Executor used for asynchronous delivery
723 dl 1.1 */
724 dl 1.21 public Executor getExecutor() {
725     return executor;
726 dl 1.1 }
727    
728 jsr166 1.7 /**
729 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
730 dl 1.1 *
731 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
732 dl 1.1 */
733     public int getMaxBufferCapacity() {
734     return maxBufferCapacity;
735     }
736    
737     /**
738 dl 1.29 * Returns a list of current subscribers for monitoring and
739     * tracking purposes, not for invoking {@link Flow.Subscriber}
740     * methods on the subscribers.
741 dl 1.1 *
742     * @return list of current subscribers
743     */
744 dl 1.72 public List<Subscriber<? super T>> getSubscribers() {
745     ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
746 dl 1.82 ReentrantLock lock = this.lock;
747     lock.lock();
748     try {
749 dl 1.1 BufferedSubscription<T> pred = null, next;
750     for (BufferedSubscription<T> b = clients; b != null; b = next) {
751     next = b.next;
752 dl 1.72 if (b.isClosed()) {
753 dl 1.29 b.next = null;
754 dl 1.1 if (pred == null)
755     clients = next;
756     else
757     pred.next = next;
758 jsr166 1.2 }
759 dl 1.79 else {
760 dl 1.1 subs.add(b.subscriber);
761 dl 1.79 pred = b;
762     }
763 dl 1.1 }
764 dl 1.82 } finally {
765     lock.unlock();
766 dl 1.1 }
767     return subs;
768     }
769 jsr166 1.2
770 dl 1.1 /**
771     * Returns true if the given Subscriber is currently subscribed.
772     *
773     * @param subscriber the subscriber
774     * @return true if currently subscribed
775 dl 1.21 * @throws NullPointerException if subscriber is null
776 dl 1.1 */
777 dl 1.72 public boolean isSubscribed(Subscriber<? super T> subscriber) {
778 dl 1.21 if (subscriber == null) throw new NullPointerException();
779 dl 1.82 boolean subscribed = false;
780     ReentrantLock lock = this.lock;
781 jsr166 1.2 if (!closed) {
782 dl 1.82 lock.lock();
783     try {
784 dl 1.1 BufferedSubscription<T> pred = null, next;
785     for (BufferedSubscription<T> b = clients; b != null; b = next) {
786     next = b.next;
787 dl 1.72 if (b.isClosed()) {
788 dl 1.29 b.next = null;
789 dl 1.1 if (pred == null)
790     clients = next;
791     else
792     pred.next = next;
793 jsr166 1.2 }
794 dl 1.82 else if (subscribed = subscriber.equals(b.subscriber))
795     break;
796 dl 1.21 else
797     pred = b;
798 dl 1.1 }
799 dl 1.82 } finally {
800     lock.unlock();
801 dl 1.1 }
802     }
803 dl 1.82 return subscribed;
804 dl 1.1 }
805    
806     /**
807 dl 1.21 * Returns an estimate of the minimum number of items requested
808 jsr166 1.49 * (via {@link Flow.Subscription#request(long) request}) but not
809     * yet produced, among all current subscribers.
810 dl 1.21 *
811     * @return the estimate, or zero if no subscribers
812     */
813     public long estimateMinimumDemand() {
814     long min = Long.MAX_VALUE;
815     boolean nonEmpty = false;
816 dl 1.82 ReentrantLock lock = this.lock;
817     lock.lock();
818     try {
819 dl 1.21 BufferedSubscription<T> pred = null, next;
820     for (BufferedSubscription<T> b = clients; b != null; b = next) {
821     int n; long d;
822     next = b.next;
823     if ((n = b.estimateLag()) < 0) {
824 dl 1.29 b.next = null;
825 dl 1.21 if (pred == null)
826     clients = next;
827     else
828     pred.next = next;
829     }
830     else {
831     if ((d = b.demand - n) < min)
832     min = d;
833     nonEmpty = true;
834 dl 1.32 pred = b;
835 dl 1.21 }
836     }
837 dl 1.82 } finally {
838     lock.unlock();
839 dl 1.21 }
840 jsr166 1.22 return nonEmpty ? min : 0;
841 dl 1.21 }
842    
843     /**
844     * Returns an estimate of the maximum number of items produced but
845     * not yet consumed among all current subscribers.
846 dl 1.17 *
847     * @return the estimate
848 dl 1.1 */
849 dl 1.21 public int estimateMaximumLag() {
850     int max = 0;
851 dl 1.82 ReentrantLock lock = this.lock;
852     lock.lock();
853     try {
854 dl 1.17 BufferedSubscription<T> pred = null, next;
855     for (BufferedSubscription<T> b = clients; b != null; b = next) {
856 dl 1.21 int n;
857 dl 1.17 next = b.next;
858 dl 1.21 if ((n = b.estimateLag()) < 0) {
859 dl 1.29 b.next = null;
860 dl 1.17 if (pred == null)
861     clients = next;
862     else
863     pred.next = next;
864     }
865 dl 1.32 else {
866     if (n > max)
867     max = n;
868     pred = b;
869     }
870 dl 1.17 }
871 dl 1.82 } finally {
872     lock.unlock();
873 dl 1.17 }
874 dl 1.21 return max;
875 dl 1.1 }
876    
877 dl 1.25 /**
878 jsr166 1.49 * Processes all published items using the given Consumer function.
879     * Returns a CompletableFuture that is completed normally when this
880     * publisher signals {@link Flow.Subscriber#onComplete()
881     * onComplete}, or completed exceptionally upon any error, or an
882     * exception is thrown by the Consumer, or the returned
883     * CompletableFuture is cancelled, in which case no further items
884     * are processed.
885 dl 1.45 *
886     * @param consumer the function applied to each onNext item
887     * @return a CompletableFuture that is completed normally
888     * when the publisher signals onComplete, and exceptionally
889 jsr166 1.47 * upon any error or cancellation
890 dl 1.45 * @throws NullPointerException if consumer is null
891     */
892     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
893     if (consumer == null)
894     throw new NullPointerException();
895     CompletableFuture<Void> status = new CompletableFuture<>();
896     subscribe(new ConsumerSubscriber<T>(status, consumer));
897     return status;
898     }
899    
900     /** Subscriber for method consume */
901 dl 1.72 static final class ConsumerSubscriber<T> implements Subscriber<T> {
902 dl 1.45 final CompletableFuture<Void> status;
903     final Consumer<? super T> consumer;
904 dl 1.72 Subscription subscription;
905 dl 1.45 ConsumerSubscriber(CompletableFuture<Void> status,
906     Consumer<? super T> consumer) {
907     this.status = status; this.consumer = consumer;
908     }
909 dl 1.72 public final void onSubscribe(Subscription subscription) {
910 dl 1.45 this.subscription = subscription;
911 dl 1.55 status.whenComplete((v, e) -> subscription.cancel());
912     if (!status.isDone())
913 dl 1.45 subscription.request(Long.MAX_VALUE);
914     }
915     public final void onError(Throwable ex) {
916     status.completeExceptionally(ex);
917     }
918     public final void onComplete() {
919     status.complete(null);
920     }
921     public final void onNext(T item) {
922 dl 1.55 try {
923     consumer.accept(item);
924     } catch (Throwable ex) {
925 dl 1.45 subscription.cancel();
926 dl 1.55 status.completeExceptionally(ex);
927 dl 1.45 }
928     }
929     }
930    
931     /**
932 dl 1.25 * A task for consuming buffer items and signals, created and
933     * executed whenever they become available. A task consumes as
934     * many items/signals as possible before terminating, at which
935     * point another task is created when needed. The dual Runnable
936     * and ForkJoinTask declaration saves overhead when executed by
937     * ForkJoinPools, without impacting other kinds of Executors.
938     */
939     @SuppressWarnings("serial")
940     static final class ConsumerTask<T> extends ForkJoinTask<Void>
941 dl 1.59 implements Runnable, CompletableFuture.AsynchronousCompletionTask {
942 dl 1.25 final BufferedSubscription<T> consumer;
943     ConsumerTask(BufferedSubscription<T> consumer) {
944     this.consumer = consumer;
945     }
946     public final Void getRawResult() { return null; }
947     public final void setRawResult(Void v) {}
948     public final boolean exec() { consumer.consume(); return false; }
949     public final void run() { consumer.consume(); }
950     }
951    
952 dl 1.1 /**
953 dl 1.72 * A resizable array-based ring buffer with integrated control to
954     * start a consumer task whenever items are available. The buffer
955     * algorithm is specialized for the case of at most one concurrent
956     * producer and consumer, and power of two buffer sizes. It relies
957     * primarily on atomic operations (CAS or getAndSet) at the next
958     * array slot to put or take an element, at the "tail" and "head"
959     * indices written only by the producer and consumer respectively.
960     *
961     * We ensure internally that there is at most one active consumer
962     * task at any given time. The publisher guarantees a single
963     * producer via its lock. Sync among producers and consumers
964     * relies on volatile fields "ctl", "demand", and "waiting" (along
965     * with element access). Other variables are accessed in plain
966     * mode, relying on outer ordering and exclusion, and/or enclosing
967     * them within other volatile accesses. Some atomic operations are
968     * avoided by tracking single threaded ownership by producers (in
969     * the style of biased locking).
970     *
971     * Execution control and protocol state are managed using field
972     * "ctl". Methods to subscribe, close, request, and cancel set
973     * ctl bits (mostly using atomic boolean method getAndBitwiseOr),
974     * and ensure that a task is running. (The corresponding consumer
975     * side actions are in method consume.) To avoid starting a new
976     * task on each action, ctl also includes a keep-alive bit
977     * (ACTIVE) that is refreshed if needed on producer actions.
978     * (Maintaining agreement about keep-alives requires most atomic
979     * updates to be full SC/Volatile strength, which is still much
980     * cheaper than using one task per item.) Error signals
981     * additionally null out items and/or fields to reduce termination
982     * latency. The cancel() method is supported by treating as ERROR
983     * but suppressing onError signal.
984 dl 1.1 *
985     * Support for blocking also exploits the fact that there is only
986     * one possible waiter. ManagedBlocker-compatible control fields
987     * are placed in this class itself rather than in wait-nodes.
988 dl 1.72 * Blocking control relies on the "waiting" and "waiter"
989     * fields. Producers set them before trying to block. Signalling
990     * unparks and clears fields. If the producer and/or consumer are
991     * using a ForkJoinPool, the producer attempts to help run
992     * consumer tasks via ForkJoinPool.helpAsyncBlocker before
993     * blocking.
994     *
995     * Usages of this class may encounter any of several forms of
996     * memory contention. We try to ameliorate across them without
997     * unduly impacting footprints in low-contention usages where it
998     * isn't needed. Buffer arrays start out small and grow only as
999     * needed. The class uses @Contended and heuristic field
1000     * declaration ordering to reduce false-sharing memory contention
1001     * across instances of BufferedSubscription (as in, multiple
1002     * subscribers per publisher). We additionally segregate some
1003     * fields that would otherwise nearly always encounter cache line
1004     * contention among producers and consumers. To reduce contention
1005 jsr166 1.74 * across time (vs space), consumers only periodically update
1006 dl 1.72 * other fields (see method takeItems), at the expense of possibly
1007     * staler reporting of lags and demand (bounded at 12.5% == 1/8
1008     * capacity) and possibly more atomic operations.
1009     *
1010     * Other forms of imbalance and slowdowns can occur during startup
1011     * when producer and consumer methods are compiled and/or memory
1012     * is allocated at different rates. This is ameliorated by
1013     * artificially subdividing some consumer methods, including
1014     * isolation of all subscriber callbacks. This code also includes
1015     * typical power-of-two array screening idioms to avoid compilers
1016     * generating traps, along with the usual SSA-based inline
1017     * assignment coding style. Also, all methods and fields have
1018     * default visibility to simplify usage by callers.
1019 dl 1.1 */
// Per-subscriber state: a ring buffer of pending items together with the
// "ctl" run-state bits, the demand counter, and the producer-blocking fields
// declared below.
1020 dl 1.15 @SuppressWarnings("serial")
1021 jsr166 1.57 @jdk.internal.vm.annotation.Contended
1022 dl 1.72 static final class BufferedSubscription<T>
1023     implements Subscription, ForkJoinPool.ManagedBlocker {
1024     long timeout; // Long.MAX_VALUE if untimed wait
1025     int head; // next position to take
1026     int tail; // next position to put
1027     final int maxCapacity; // max buffer size
1028 dl 1.27 volatile int ctl; // atomic run state flags
1029 dl 1.72 Object[] array; // buffer
1030     final Subscriber<? super T> subscriber;
1031     final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
1032     Executor executor; // null on error
1033     Thread waiter; // blocked producer thread
1034     Throwable pendingError; // holds until onError issued
1035 dl 1.27 BufferedSubscription<T> next; // used only by publisher
1036     BufferedSubscription<T> nextRetry; // used only by publisher
1037 dl 1.1
1038 dl 1.72 @jdk.internal.vm.annotation.Contended("c") // segregate
1039     volatile long demand; // # unfilled requests
1040     @jdk.internal.vm.annotation.Contended("c")
1041     volatile int waiting; // nonzero if producer blocked
1042    
1043     // ctl bit values
1044     static final int CLOSED = 0x01; // if set, other bits ignored
1045     static final int ACTIVE = 0x02; // keep-alive for consumer task
1046     static final int REQS = 0x04; // (possibly) nonzero demand
1047     static final int ERROR = 0x08; // issues onError when noticed
1048     static final int COMPLETE = 0x10; // issues onComplete when done
1049     static final int RUN = 0x20; // task is or will be running
1050     static final int OPEN = 0x40; // true after subscribe
1051 dl 1.1
1052 dl 1.20 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
1053 dl 1.16
1054 dl 1.72 BufferedSubscription(Subscriber<? super T> subscriber,
1055 dl 1.36 Executor executor,
1056 dl 1.72 BiConsumer<? super Subscriber<? super T>,
1057 dl 1.36 ? super Throwable> onNextHandler,
1058 dl 1.72 Object[] array,
1059 dl 1.36 int maxBufferCapacity) {
1060 dl 1.1 this.subscriber = subscriber;
1061     this.executor = executor;
1062 dl 1.36 this.onNextHandler = onNextHandler;
1063 dl 1.72 this.array = array;
1064 dl 1.1 this.maxCapacity = maxBufferCapacity;
1065     }
1066    
1067 dl 1.72 // Wrappers for some VarHandle methods
1068    
// Weak-mode CAS on ctl; callers in consume() simply go around their loop
// again when it returns false.
1069     final boolean weakCasCtl(int cmp, int val) {
1070     return CTL.weakCompareAndSet(this, cmp, val);
1071     }
1072    
// Atomically ORs the given bits into ctl and returns the value ctl held
// before the update.
1073     final int getAndBitwiseOrCtl(int bits) {
1074     return (int)CTL.getAndBitwiseOr(this, bits);
1075     }
1076    
// Atomically subtracts k from demand; n plus the previous value is the
// updated demand, which is what gets returned.
1077     final long subtractDemand(int k) {
1078     long n = (long)(-k);
1079     return n + (long)DEMAND.getAndAdd(this, n);
1080     }
1081    
// Strong compare-and-set on demand; request() calls this in a retry loop.
1082     final boolean casDemand(long cmp, long val) {
1083     return DEMAND.compareAndSet(this, cmp, val);
1084     }
1085    
1086     // Utilities used by SubmissionPublisher
1087    
1088     /**
1089     * Returns true if closed (consumer task may still be running).
1090     */
1091     final boolean isClosed() {
1092     return (ctl & CLOSED) != 0;
1093 dl 1.20 }
1094    
1095 dl 1.21 /**
1096 dl 1.72 * Returns estimated number of buffered items, or negative if
1097     * closed.
1098 dl 1.21 */
1099     final int estimateLag() {
1100 dl 1.72 int c = ctl, n = tail - head;
1101     return ((c & CLOSED) != 0) ? -1 : (n < 0) ? 0 : n;
1102 dl 1.19 }
1103    
1104 dl 1.72 // Methods for submitting items
1105    
1106 dl 1.1 /**
1107 dl 1.16 * Tries to add item and start consumer task if necessary.
1108 dl 1.72 * @return negative if closed, 0 if saturated, else estimated lag
1109 dl 1.1 */
1110 dl 1.72 final int offer(T item, boolean unowned) {
1111     Object[] a;
1112     int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
// i: slot index for this item (masking with cap - 1); n: lag counting this item
1113     int t = tail, i = t & (cap - 1), n = t + 1 - head;
1114     if (cap > 0) {
1115     boolean added;
1116     if (n >= cap && cap < maxCapacity) // resize
1117 jsr166 1.80 added = growAndOffer(item, a, t);
1118 dl 1.72 else if (n >= cap || unowned) // need volatile CAS
1119     added = QA.compareAndSet(a, i, null, item);
1120     else { // can use release mode
1121     QA.setRelease(a, i, item);
1122     added = true;
1123     }
// tail advances, and a nonzero status is reported, only if the item was stored
1124     if (added) {
1125     tail = t + 1;
1126     stat = n;
1127     }
1128 dl 1.1 }
1129 dl 1.72 return startOnOffer(stat);
1130 dl 1.27 }
1131    
1132     /**
1133 dl 1.72 * Tries to expand buffer and add item, returning true on
1134     * success. Currently fails only if out of memory.
1135 dl 1.27 */
1136 jsr166 1.80 final boolean growAndOffer(T item, Object[] a, int t) {
1137 dl 1.72 int cap = 0, newCap = 0;
1138     Object[] newArray = null;
// (newCap = cap << 1) > 0 rejects a doubled capacity that is not positive
1139     if (a != null && (cap = a.length) > 0 && (newCap = cap << 1) > 0) {
1140     try {
1141     newArray = new Object[newCap];
1142     } catch (OutOfMemoryError ex) {
1143 dl 1.34 }
1144 dl 1.27 }
1145 dl 1.72 if (newArray == null)
1146     return false;
1147     else { // take and move items
1148     int newMask = newCap - 1;
1149     newArray[t-- & newMask] = item;
// Walk backwards from the newest slot, moving each item into the new array
// and stopping at the first slot found empty.
1150     for (int mask = cap - 1, k = mask; k >= 0; --k) {
1151     Object x = QA.getAndSet(a, t & mask, null);
1152     if (x == null)
1153     break; // already consumed
1154     else
1155     newArray[t-- & newMask] = x;
1156 dl 1.16 }
1157 dl 1.72 array = newArray;
1158     VarHandle.releaseFence(); // release array and slots
1159     return true;
1160 dl 1.16 }
1161 dl 1.1 }
1162    
1163     /**
1164 dl 1.72 * Version of offer for retries (no resize or bias)
1165 dl 1.20 */
1166 dl 1.72 final int retryOffer(T item) {
1167     Object[] a;
1168     int stat = 0, t = tail, h = head, cap;
1169     if ((a = array) != null && (cap = a.length) > 0 &&
1170     QA.compareAndSet(a, (cap - 1) & t, null, item))
1171     stat = (tail = t + 1) - h;
1172     return startOnOffer(stat);
1173 dl 1.20 }
1174    
1175     /**
1176 dl 1.72 * Tries to start consumer task after offer.
1177     * @return negative if now closed, else argument
1178 dl 1.20 */
1179 dl 1.72 final int startOnOffer(int stat) {
1180     int c; // start or keep alive if requests exist and not active
1181     if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
1182     ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
1183     tryStart();
1184     else if ((c & CLOSED) != 0)
1185     stat = -1;
1186 dl 1.20 return stat;
1187     }
1188    
1189     /**
1190 dl 1.72 * Tries to start consumer task. Sets error state on failure.
1191 dl 1.27 */
1192 dl 1.72 final void tryStart() {
1193     try {
1194     Executor e;
1195     ConsumerTask<T> task = new ConsumerTask<T>(this);
1196     if ((e = executor) != null) // skip if disabled on error
1197     e.execute(task);
1198     } catch (RuntimeException | Error ex) {
// Record the failure in ctl, then propagate it to the caller.
1199     getAndBitwiseOrCtl(ERROR | CLOSED);
1200     throw ex;
1201 dl 1.27 }
1202     }
1203    
1204 dl 1.72 // Signals to consumer tasks
1205 dl 1.34
1206 dl 1.27 /**
1207 dl 1.72 * Sets the given control bits, starting task if not running or closed.
1208     * @param bits state bits, assumed to include RUN but not CLOSED
1209 dl 1.1 */
1210 dl 1.72 final void startOnSignal(int bits) {
// The plain read of ctl skips the atomic update when all bits are already set.
1211 dl 1.76 if ((ctl & bits) != bits &&
1212     (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
1213 dl 1.72 tryStart();
1214     }
1215    
1216     final void onSubscribe() {
1217     startOnSignal(RUN | ACTIVE);
1218     }
1219    
1220     final void onComplete() {
1221     startOnSignal(RUN | ACTIVE | COMPLETE);
1222 dl 1.1 }
1223    
1224 dl 1.19 final void onError(Throwable ex) {
1225 dl 1.72 int c; Object[] a; // to null out buffer on async error
1226     if (ex != null)
1227     pendingError = ex; // races are OK
// c is ctl as it was before ERROR | RUN | ACTIVE were set.
1228     if (((c = getAndBitwiseOrCtl(ERROR | RUN | ACTIVE)) & CLOSED) == 0) {
1229     if ((c & RUN) == 0)
1230     tryStart();
1231     else if ((a = array) != null)
1232     Arrays.fill(a, null);
1233     }
1234     }
1235    
// Cancellation reuses the error path with a null exception; consumeError()
// invokes subscriber.onError only for a non-null exception.
1236     public final void cancel() {
1237     onError(null);
1238     }
1239    
1240     public final void request(long n) {
1241     if (n > 0L) {
1242     for (;;) {
1243     long p = demand, d = p + n; // saturate
1244     if (casDemand(p, d < p ? Long.MAX_VALUE : d))
1245     break;
1246 dl 1.19 }
1247 dl 1.76 startOnSignal(RUN | ACTIVE | REQS);
1248 dl 1.19 }
1249 dl 1.72 else
1250     onError(new IllegalArgumentException(
1251     "non-positive subscription request"));
1252 dl 1.19 }
1253    
1254 dl 1.72 // Consumer task actions
1255    
1256 dl 1.19 /**
1257 dl 1.72 * Consumer loop, called from ConsumerTask, or indirectly when
1258     * helping during submit.
1259 dl 1.1 */
1260 dl 1.72 final void consume() {
1261     Subscriber<? super T> s;
1262     if ((s = subscriber) != null) { // hoist checks
1263     subscribeOnOpen(s);
// d, h and t are local copies of demand, head and tail; each is refreshed
// inside the loop by the branch that needs a newer value.
1264     long d = demand;
1265     for (int h = head, t = tail;;) {
1266     int c, taken; boolean empty;
1267     if (((c = ctl) & ERROR) != 0) {
1268     closeOnError(s, null);
1269     break;
1270     }
1271     else if ((taken = takeItems(s, d, h)) > 0) {
1272     head = h += taken;
1273     d = subtractDemand(taken);
1274     }
1275     else if ((d = demand) == 0L && (c & REQS) != 0)
1276     weakCasCtl(c, c & ~REQS); // exhausted demand
1277     else if (d != 0L && (c & REQS) == 0)
1278     weakCasCtl(c, c | REQS); // new demand
1279 dl 1.78 else if (t == (t = tail)) { // stability check
1280     if ((empty = (t == h)) && (c & COMPLETE) != 0) {
1281     closeOnComplete(s); // end of stream
1282     break;
1283     }
1284     else if (empty || d == 0L) {
// Clear ACTIVE first; only a later pass that finds ACTIVE already clear
// clears RUN and exits.
1285     int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
1286     if (weakCasCtl(c, c & ~bit) && bit == RUN)
1287     break; // un-keep-alive or exit
1288     }
1289 dl 1.1 }
1290     }
1291     }
1292     }
1293    
1294 dl 1.72 /**
1295     * Consumes some items until unavailable or bound or error.
1296     *
1297     * @param s subscriber
1298     * @param d current demand
1299     * @param h current head
1300     * @return number taken
1301     */
1302     final int takeItems(Subscriber<? super T> s, long d, int h) {
1303     Object[] a;
1304     int k = 0, cap;
1305     if ((a = array) != null && (cap = a.length) > 0) {
1306     int m = cap - 1, b = (m >>> 3) + 1; // max(1, cap/8)
// n: take at most b items in this call, or fewer when demand d is smaller
1307     int n = (d < (long)b) ? (int)d : b;
1308     for (; k < n; ++h, ++k) {
1309     Object x = QA.getAndSet(a, h & m, null);
1310     if (waiting != 0)
1311     signalWaiter();
1312     if (x == null)
1313     break;
1314     else if (!consumeNext(s, x))
1315     break;
1316 dl 1.1 }
1317     }
1318 dl 1.72 return k;
1319 dl 1.1 }
1320    
// Delivers one item via subscriber.onNext; returns false (after routing the
// exception through handleOnNext) if the subscriber throws.
1321 dl 1.72 final boolean consumeNext(Subscriber<? super T> s, Object x) {
1322     try {
1323     @SuppressWarnings("unchecked") T y = (T) x;
1324     if (s != null)
1325     s.onNext(y);
1326     return true;
1327     } catch (Throwable ex) {
1328     handleOnNext(s, ex);
1329     return false;
1330 dl 1.1 }
1331     }
1332    
1333 dl 1.16 /**
1334 dl 1.72 * Processes exception in Subscriber.onNext.
1335 dl 1.16 */
1336 dl 1.72 final void handleOnNext(Subscriber<? super T> s, Throwable ex) {
1337     BiConsumer<? super Subscriber<? super T>, ? super Throwable> h;
1338     try {
1339     if ((h = onNextHandler) != null)
1340     h.accept(s, ex);
1341     } catch (Throwable ignore) {
1342 dl 1.1 }
// The subscription is closed with the original exception whether or not
// the handler itself threw.
1343 dl 1.72 closeOnError(s, ex);
1344 dl 1.1 }
1345    
1346 dl 1.16 /**
1347 dl 1.72 * Issues subscriber.onSubscribe if this is first signal.
1348 dl 1.16 */
1349 dl 1.72 final void subscribeOnOpen(Subscriber<? super T> s) {
1350     if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0)
1351     consumeSubscribe(s);
1352     }
1353    
1354     final void consumeSubscribe(Subscriber<? super T> s) {
1355     try {
1356     if (s != null) // ignore if disabled
1357     s.onSubscribe(this);
1358     } catch (Throwable ex) {
1359     closeOnError(s, ex);
1360 dl 1.16 }
1361     }
1362    
1363 dl 1.72 /**
1364     * Issues subscriber.onComplete unless already closed.
1365     */
1366     final void closeOnComplete(Subscriber<? super T> s) {
1367     if ((getAndBitwiseOrCtl(CLOSED) & CLOSED) == 0)
1368     consumeComplete(s);
1369 dl 1.1 }
1370    
1371 dl 1.72 final void consumeComplete(Subscriber<? super T> s) {
1372     try {
1373     if (s != null)
1374     s.onComplete();
1375     } catch (Throwable ignore) {
1376 dl 1.1 }
1377     }
1378    
1379 dl 1.29 /**
1380 dl 1.72 * Issues subscriber.onError, and unblocks producer if needed.
1381 dl 1.29 */
1382 dl 1.72 final void closeOnError(Subscriber<? super T> s, Throwable ex) {
// Only the call that actually sets CLOSED performs the close actions.
1383     if ((getAndBitwiseOrCtl(ERROR | CLOSED) & CLOSED) == 0) {
1384     if (ex == null)
1385     ex = pendingError;
1386     pendingError = null; // detach
1387     executor = null; // suppress racing start calls
1388     signalWaiter();
1389     consumeError(s, ex);
1390     }
1391     }
1392    
1393     final void consumeError(Subscriber<? super T> s, Throwable ex) {
1394     try {
1395     if (ex != null && s != null)
1396     s.onError(ex);
1397     } catch (Throwable ignore) {
1398 dl 1.34 }
1399     }
1400    
1401 dl 1.72 // Blocking support
1402    
1403 dl 1.34 /**
1404 dl 1.72 * Unblocks waiting producer.
1405 dl 1.34 */
1406 dl 1.72 final void signalWaiter() {
1407     Thread w;
1408     waiting = 0;
1409     if ((w = waiter) != null)
1410     LockSupport.unpark(w);
1411 dl 1.34 }
1412    
1413     /**
1414 dl 1.72 * Returns true if closed or space available.
1415     * For ManagedBlocker.
1416 dl 1.34 */
1417 dl 1.72 public final boolean isReleasable() {
1418     Object[] a; int cap;
1419     return ((ctl & CLOSED) != 0 ||
1420     ((a = array) != null && (cap = a.length) > 0 &&
1421     QA.getAcquire(a, (cap - 1) & tail) == null));
1422 dl 1.34 }
1423    
1424     /**
1425 dl 1.72 * Helps or blocks until timeout, closed, or space available.
1426 dl 1.34 */
1427 dl 1.72 final void awaitSpace(long nanos) {
1428     if (!isReleasable()) {
1429     ForkJoinPool.helpAsyncBlocker(executor, this);
1430     if (!isReleasable()) {
1431     timeout = nanos;
1432     try {
1433     ForkJoinPool.managedBlock(this);
1434     } catch (InterruptedException ie) {
1435     timeout = INTERRUPTED;
1436     }
// block() also stores INTERRUPTED into timeout, so this check covers both
// that case and the catch clause above.
1437     if (timeout == INTERRUPTED)
1438     Thread.currentThread().interrupt();
1439     }
1440 dl 1.34 }
1441     }
1442    
1443     /**
1444 dl 1.72 * Blocks until closed, space available or timeout.
1445     * For ManagedBlocker.
1446 dl 1.34 */
1447 dl 1.72 public final boolean block() {
1448     long nanos = timeout;
1449     boolean timed = (nanos < Long.MAX_VALUE);
1450     long deadline = timed ? System.nanoTime() + nanos : 0L;
// Each pass performs a single step (record waiter, then set waiting, then
// park); isReleasable() is rechecked between steps.
1451     while (!isReleasable()) {
1452     if (Thread.interrupted()) {
1453     timeout = INTERRUPTED;
1454     if (timed)
1455     break;
1456 dl 1.36 }
1457 dl 1.72 else if (timed && (nanos = deadline - System.nanoTime()) <= 0L)
1458     break;
1459     else if (waiter == null)
1460     waiter = Thread.currentThread();
1461     else if (waiting == 0)
1462     waiting = 1;
1463     else if (timed)
1464     LockSupport.parkNanos(this, nanos);
1465     else
1466     LockSupport.park(this);
1467 dl 1.1 }
1468 dl 1.72 waiter = null;
1469     waiting = 0;
1470     return true;
1471 dl 1.1 }
1472    
1473 dl 1.61 // VarHandle mechanics
1474 dl 1.72 static final VarHandle CTL;
1475     static final VarHandle DEMAND;
1476     static final VarHandle QA;
1477 dl 1.1
1478     static {
1479     try {
1480 dl 1.61 MethodHandles.Lookup l = MethodHandles.lookup();
1481     CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
1482     int.class);
1483     DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
1484     long.class);
1485     QA = MethodHandles.arrayElementVarHandle(Object[].class);
1486 jsr166 1.9 } catch (ReflectiveOperationException e) {
1487 jsr166 1.81 throw new ExceptionInInitializerError(e);
1488 dl 1.1 }
1489 jsr166 1.33
1490     // Reduce the risk of rare disastrous classloading in first call to
1491     // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1492     Class<?> ensureLoaded = LockSupport.class;
1493 dl 1.1 }
1494     }
1495     }