ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.69
Committed: Fri Mar 3 00:44:56 2017 UTC (7 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.68: +2 -1 lines
Log Message:
Ensure subscriber consistency about close vs closeExceptionally

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 dl 1.61 import java.lang.invoke.MethodHandles;
10     import java.lang.invoke.VarHandle;
11 jsr166 1.10 import java.util.ArrayList;
12     import java.util.List;
13 dl 1.1 import java.util.concurrent.locks.LockSupport;
14 dl 1.36 import java.util.function.BiConsumer;
15 dl 1.1 import java.util.function.BiPredicate;
16 dl 1.45 import java.util.function.Consumer;
17 dl 1.1
18     /**
19 dl 1.21 * A {@link Flow.Publisher} that asynchronously issues submitted
20     * (non-null) items to current subscribers until it is closed. Each
21     * current subscriber receives newly submitted items in the same order
22     * unless drops or exceptions are encountered. Using a
23 dl 1.51 * SubmissionPublisher allows item generators to act as compliant <a
24     * href="http://www.reactive-streams.org/"> reactive-streams</a>
25     * Publishers relying on drop handling and/or blocking for flow
26     * control.
27 dl 1.1 *
28 dl 1.16 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
29 dl 1.1 * constructor for delivery to subscribers. The best choice of
30     * Executor depends on expected usage. If the generator(s) of
31     * submitted items run in separate threads, and the number of
32     * subscribers can be estimated, consider using a {@link
33 dl 1.46 * Executors#newFixedThreadPool}. Otherwise consider using the
34     * default, normally the {@link ForkJoinPool#commonPool}.
35 dl 1.1 *
36     * <p>Buffering allows producers and consumers to transiently operate
37     * at different rates. Each subscriber uses an independent buffer.
38 dl 1.21 * Buffers are created upon first use and expanded as needed up to the
39 dl 1.27 * given maximum. (The enforced capacity may be rounded up to the
40 dl 1.21 * nearest power of two and/or bounded by the largest value supported
41     * by this implementation.) Invocations of {@link
42 jsr166 1.49 * Flow.Subscription#request(long) request} do not directly result in
43     * buffer expansion, but risk saturation if unfilled requests exceed
44     * the maximum capacity. The default value of {@link
45     * Flow#defaultBufferSize()} may provide a useful starting point for
46 dl 1.25 * choosing a capacity based on expected rates, resources, and usages.
47 dl 1.18 *
48 dl 1.1 * <p>Publication methods support different policies about what to do
49 jsr166 1.49 * when buffers are saturated. Method {@link #submit(Object) submit}
50     * blocks until resources are available. This is simplest, but least
51     * responsive. The {@code offer} methods may drop items (either
52     * immediately or with bounded timeout), but provide an opportunity to
53     * interpose a handler and then retry.
54 dl 1.1 *
55     * <p>If any Subscriber method throws an exception, its subscription
56 dl 1.36 * is cancelled. If a handler is supplied as a constructor argument,
57     * it is invoked before cancellation upon an exception in method
58 jsr166 1.41 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
59     * {@link Flow.Subscriber#onSubscribe onSubscribe},
60 jsr166 1.49 * {@link Flow.Subscriber#onError(Throwable) onError} and
61     * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
62 jsr166 1.41 * handled before cancellation. If the supplied Executor throws
63     * {@link RejectedExecutionException} (or any other RuntimeException
64     * or Error) when attempting to execute a task, or a drop handler
65     * throws an exception when processing a dropped item, then the
66     * exception is rethrown. In these cases, not all subscribers will
67     * have been issued the published item. It is usually good practice to
68     * {@link #closeExceptionally closeExceptionally} in these cases.
69 dl 1.1 *
70 jsr166 1.49 * <p>Method {@link #consume(Consumer)} simplifies support for a
71     * common case in which the only action of a subscriber is to request
72     * and process all items using a supplied function.
73 dl 1.45 *
74 dl 1.15 * <p>This class may also serve as a convenient base for subclasses
75     * that generate items, and use the methods in this class to publish
76     * them. For example here is a class that periodically publishes the
77     * items generated from a supplier. (In practice you might add methods
78 dl 1.35 * to independently start and stop generation, to share Executors
79 dl 1.30 * among publishers, and so on, or use a SubmissionPublisher as a
80     * component rather than a superclass.)
81 dl 1.15 *
82     * <pre> {@code
83     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
84     * final ScheduledFuture<?> periodicTask;
85     * final ScheduledExecutorService scheduler;
86 dl 1.21 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
87     * Supplier<? extends T> supplier,
88 dl 1.15 * long period, TimeUnit unit) {
89 dl 1.21 * super(executor, maxBufferCapacity);
90 dl 1.15 * scheduler = new ScheduledThreadPoolExecutor(1);
91     * periodicTask = scheduler.scheduleAtFixedRate(
92     * () -> submit(supplier.get()), 0, period, unit);
93     * }
94     * public void close() {
95     * periodicTask.cancel(false);
96     * scheduler.shutdown();
97     * super.close();
98     * }
99     * }}</pre>
100     *
101 dl 1.21 * <p>Here is an example of a {@link Flow.Processor} implementation.
102     * It uses single-step requests to its publisher for simplicity of
103     * illustration. A more adaptive version could monitor flow using the
104 dl 1.27 * lag estimate returned from {@code submit}, along with other utility
105 dl 1.21 * methods.
106 dl 1.16 *
107     * <pre> {@code
108     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
109     * implements Flow.Processor<S,T> {
110     * final Function<? super S, ? extends T> function;
111     * Flow.Subscription subscription;
112 dl 1.21 * TransformProcessor(Executor executor, int maxBufferCapacity,
113 dl 1.16 * Function<? super S, ? extends T> function) {
114 dl 1.21 * super(executor, maxBufferCapacity);
115 dl 1.16 * this.function = function;
116     * }
117     * public void onSubscribe(Flow.Subscription subscription) {
118     * (this.subscription = subscription).request(1);
119     * }
120     * public void onNext(S item) {
121     * subscription.request(1);
122     * submit(function.apply(item));
123     * }
124     * public void onError(Throwable ex) { closeExceptionally(ex); }
125     * public void onComplete() { close(); }
126     * }}</pre>
127     *
128 dl 1.1 * @param <T> the published item type
129     * @author Doug Lea
130 jsr166 1.58 * @since 9
131 dl 1.1 */
132     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
133     AutoCloseable {
134     /*
135     * Most mechanics are handled by BufferedSubscription. This class
136 dl 1.17 * mainly tracks subscribers and ensures sequentiality, by using
137     * built-in synchronization locks across public methods. (Using
138     * built-in locks works well in the most typical case in which
139     * only one thread submits items).
140 dl 1.1 */
141    
142 dl 1.25 /** The largest possible power of two array size. */
143 jsr166 1.43 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
144 dl 1.1
145 dl 1.40 /** Round capacity to power of 2, at most limit. */
146     static final int roundCapacity(int cap) {
147 dl 1.1 int n = cap - 1;
148     n |= n >>> 1;
149     n |= n >>> 2;
150     n |= n >>> 4;
151     n |= n >>> 8;
152     n |= n >>> 16;
153 dl 1.40 return (n <= 0) ? 1 : // at least 1
154 dl 1.25 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
155 dl 1.1 }
156    
157 dl 1.48 // default Executor setup; nearly the same as CompletableFuture
158 dl 1.46
159     /**
160     * Default executor -- ForkJoinPool.commonPool() unless it cannot
161     * support parallelism.
162     */
163 jsr166 1.52 private static final Executor ASYNC_POOL =
164 dl 1.46 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
165     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
166    
167     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
168 jsr166 1.68 private static final class ThreadPerTaskExecutor implements Executor {
169     ThreadPerTaskExecutor() {} // prevent access constructor creation
170 dl 1.46 public void execute(Runnable r) { new Thread(r).start(); }
171     }
172    
173 dl 1.1 /**
174     * Clients (BufferedSubscriptions) are maintained in a linked list
175     * (via their "next" fields). This works well for publish loops.
176     * It requires O(n) traversal to check for duplicate subscribers,
177     * but we expect that subscribing is much less common than
178 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
179 dl 1.25 * when BufferedSubscription methods return negative values
180 dl 1.29 * signifying that they have been disabled. To reduce
181     * head-of-line blocking, submit and offer methods first call
182     * BufferedSubscription.offer on each subscriber, and place
183     * saturated ones in retries list (using nextRetry field), and
184     * retry, possibly blocking or dropping.
185 dl 1.1 */
186     BufferedSubscription<T> clients;
187    
188 dl 1.20 /** Run status, updated only within locks */
189     volatile boolean closed;
190 dl 1.36 /** If non-null, the exception in closeExceptionally */
191 dl 1.37 volatile Throwable closedException;
192 dl 1.20
193 dl 1.1 // Parameters for constructing BufferedSubscriptions
194     final Executor executor;
195 dl 1.36 final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
196 dl 1.1 final int maxBufferCapacity;
197    
198 dl 1.20 /**
199 dl 1.1 * Creates a new SubmissionPublisher using the given Executor for
200 jsr166 1.49 * async delivery to subscribers, with the given maximum buffer size
201     * for each subscriber, and, if non-null, the given handler invoked
202     * when any Subscriber throws an exception in method {@link
203     * Flow.Subscriber#onNext(Object) onNext}.
204 dl 1.1 *
205     * @param executor the executor to use for async delivery,
206     * supporting creation of at least one independent thread
207     * @param maxBufferCapacity the maximum capacity for each
208 dl 1.21 * subscriber's buffer (the enforced capacity may be rounded up to
209     * the nearest power of two and/or bounded by the largest value
210     * supported by this implementation; method {@link #getMaxBufferCapacity}
211     * returns the actual value)
212 dl 1.36 * @param handler if non-null, procedure to invoke upon exception
213 jsr166 1.49 * thrown in method {@code onNext}
214 dl 1.1 * @throws NullPointerException if executor is null
215 dl 1.21 * @throws IllegalArgumentException if maxBufferCapacity not
216     * positive
217 dl 1.1 */
218 dl 1.36 public SubmissionPublisher(Executor executor, int maxBufferCapacity,
219     BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
220 dl 1.21 if (executor == null)
221     throw new NullPointerException();
222     if (maxBufferCapacity <= 0)
223     throw new IllegalArgumentException("capacity must be positive");
224 dl 1.18 this.executor = executor;
225 dl 1.36 this.onNextHandler = handler;
226 dl 1.21 this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
227 dl 1.18 }
228    
229 dl 1.1 /**
230 dl 1.36 * Creates a new SubmissionPublisher using the given Executor for
231 jsr166 1.49 * async delivery to subscribers, with the given maximum buffer size
232     * for each subscriber, and no handler for Subscriber exceptions in
233     * method {@link Flow.Subscriber#onNext(Object) onNext}.
234 dl 1.36 *
235     * @param executor the executor to use for async delivery,
236     * supporting creation of at least one independent thread
237     * @param maxBufferCapacity the maximum capacity for each
238     * subscriber's buffer (the enforced capacity may be rounded up to
239     * the nearest power of two and/or bounded by the largest value
240     * supported by this implementation; method {@link #getMaxBufferCapacity}
241     * returns the actual value)
242     * @throws NullPointerException if executor is null
243     * @throws IllegalArgumentException if maxBufferCapacity not
244     * positive
245     */
246     public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
247     this(executor, maxBufferCapacity, null);
248     }
249    
250     /**
251 dl 1.25 * Creates a new SubmissionPublisher using the {@link
252 dl 1.46 * ForkJoinPool#commonPool()} for async delivery to subscribers
253 jsr166 1.49 * (unless it does not support a parallelism level of at least two,
254     * in which case, a new Thread is created to run each task), with
255     * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
256     * handler for Subscriber exceptions in method {@link
257     * Flow.Subscriber#onNext(Object) onNext}.
258 dl 1.25 */
259     public SubmissionPublisher() {
260 jsr166 1.52 this(ASYNC_POOL, Flow.defaultBufferSize(), null);
261 dl 1.25 }
262    
263     /**
264 jsr166 1.49 * Adds the given Subscriber unless already subscribed. If already
265     * subscribed, the Subscriber's {@link
266     * Flow.Subscriber#onError(Throwable) onError} method is invoked on
267     * the existing subscription with an {@link IllegalStateException}.
268     * Otherwise, upon success, the Subscriber's {@link
269     * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
270     * asynchronously with a new {@link Flow.Subscription}. If {@link
271     * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
272     * subscription is cancelled. Otherwise, if this SubmissionPublisher
273     * was closed exceptionally, then the subscriber's {@link
274     * Flow.Subscriber#onError onError} method is invoked with the
275     * corresponding exception, or if closed without exception, the
276     * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
277     * method is invoked. Subscribers may enable receiving items by
278     * invoking the {@link Flow.Subscription#request(long) request}
279     * method of the new Subscription, and may unsubscribe by invoking
280     * its {@link Flow.Subscription#cancel() cancel} method.
281 dl 1.18 *
282     * @param subscriber the subscriber
283     * @throws NullPointerException if subscriber is null
284     */
285     public void subscribe(Flow.Subscriber<? super T> subscriber) {
286     if (subscriber == null) throw new NullPointerException();
287 dl 1.20 BufferedSubscription<T> subscription =
288 dl 1.36 new BufferedSubscription<T>(subscriber, executor,
289     onNextHandler, maxBufferCapacity);
290 dl 1.19 synchronized (this) {
291 dl 1.29 for (BufferedSubscription<T> b = clients, pred = null;;) {
292     if (b == null) {
293 dl 1.36 Throwable ex;
294 dl 1.29 subscription.onSubscribe();
295 dl 1.37 if ((ex = closedException) != null)
296 dl 1.36 subscription.onError(ex);
297     else if (closed)
298 dl 1.29 subscription.onComplete();
299     else if (pred == null)
300     clients = subscription;
301 dl 1.27 else
302 dl 1.29 pred.next = subscription;
303     break;
304 dl 1.27 }
305 dl 1.29 BufferedSubscription<T> next = b.next;
306     if (b.isDisabled()) { // remove
307     b.next = null; // detach
308 dl 1.19 if (pred == null)
309 dl 1.29 clients = next;
310 dl 1.19 else
311 dl 1.29 pred.next = next;
312     }
313     else if (subscriber.equals(b.subscriber)) {
314     b.onError(new IllegalStateException("Duplicate subscribe"));
315     break;
316 dl 1.19 }
317 dl 1.29 else
318     pred = b;
319     b = next;
320 dl 1.19 }
321     }
322 dl 1.18 }
323    
324     /**
325 dl 1.16 * Publishes the given item to each current subscriber by
326 jsr166 1.49 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
327     * onNext} method, blocking uninterruptibly while resources for any
328 dl 1.30 * subscriber are unavailable. This method returns an estimate of
329 jsr166 1.49 * the maximum lag (number of items submitted but not yet consumed)
330     * among all current subscribers. This value is at least one
331     * (accounting for this submitted item) if there are any
332 dl 1.30 * subscribers, else zero.
333 dl 1.16 *
334     * <p>If the Executor for this publisher throws a
335     * RejectedExecutionException (or any other RuntimeException or
336     * Error) when attempting to asynchronously notify subscribers,
337 dl 1.31 * then this exception is rethrown, in which case not all
338     * subscribers will have been issued this item.
339 dl 1.16 *
340     * @param item the (non-null) item to publish
341 dl 1.21 * @return the estimated maximum lag among subscribers
342 dl 1.16 * @throws IllegalStateException if closed
343     * @throws NullPointerException if item is null
344     * @throws RejectedExecutionException if thrown by Executor
345     */
346 dl 1.21 public int submit(T item) {
347 dl 1.16 if (item == null) throw new NullPointerException();
348 dl 1.21 int lag = 0;
349 dl 1.27 boolean complete;
350 dl 1.16 synchronized (this) {
351 dl 1.27 complete = closed;
352 dl 1.29 BufferedSubscription<T> b = clients;
353 dl 1.27 if (!complete) {
354 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
355 dl 1.27 while (b != null) {
356     BufferedSubscription<T> next = b.next;
357 dl 1.29 int stat = b.offer(item);
358     if (stat < 0) { // disabled
359     b.next = null;
360 dl 1.27 if (pred == null)
361     clients = next;
362 dl 1.24 else
363 dl 1.27 pred.next = next;
364 dl 1.24 }
365 dl 1.27 else {
366 dl 1.29 if (stat > lag)
367     lag = stat;
368     else if (stat == 0) { // place on retry list
369     b.nextRetry = null;
370 dl 1.27 if (rtail == null)
371 dl 1.29 r = b;
372 dl 1.27 else
373     rtail.nextRetry = b;
374     rtail = b;
375     }
376     pred = b;
377     }
378     b = next;
379 dl 1.21 }
380 dl 1.29 while (r != null) {
381     BufferedSubscription<T> nextRetry = r.nextRetry;
382     r.nextRetry = null;
383     int stat = r.submit(item);
384     if (stat > lag)
385     lag = stat;
386     else if (stat < 0 && clients == r)
387     clients = r.next; // postpone internal unsubscribes
388     r = nextRetry;
389     }
390 dl 1.16 }
391     }
392 dl 1.27 if (complete)
393     throw new IllegalStateException("Closed");
394     else
395     return lag;
396 dl 1.16 }
397    
398     /**
399 jsr166 1.49 * Publishes the given item, if possible, to each current subscriber
400     * by asynchronously invoking its {@link
401     * Flow.Subscriber#onNext(Object) onNext} method. The item may be
402     * dropped by one or more subscribers if resource limits are
403     * exceeded, in which case the given handler (if non-null) is
404     * invoked, and if it returns true, retried once. Other calls to
405     * methods in this class by other threads are blocked while the
406     * handler is invoked. Unless recovery is assured, options are
407     * usually limited to logging the error and/or issuing an {@link
408     * Flow.Subscriber#onError(Throwable) onError} signal to the
409     * subscriber.
410 dl 1.1 *
411 dl 1.21 * <p>This method returns a status indicator: If negative, it
412     * represents the (negative) number of drops (failed attempts to
413     * issue the item to a subscriber). Otherwise it is an estimate of
414     * the maximum lag (number of items submitted but not yet
415     * consumed) among all current subscribers. This value is at least
416     * one (accounting for this submitted item) if there are any
417     * subscribers, else zero.
418 jsr166 1.23 *
419 dl 1.1 * <p>If the Executor for this publisher throws a
420     * RejectedExecutionException (or any other RuntimeException or
421     * Error) when attempting to asynchronously notify subscribers, or
422     * the drop handler throws an exception when processing a dropped
423     * item, then this exception is rethrown.
424     *
425     * @param item the (non-null) item to publish
426     * @param onDrop if non-null, the handler invoked upon a drop to a
427     * subscriber, with arguments of the subscriber and item; if it
428     * returns true, an offer is re-attempted (once)
429 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
430     * an estimate of maximum lag
431 dl 1.1 * @throws IllegalStateException if closed
432     * @throws NullPointerException if item is null
433     * @throws RejectedExecutionException if thrown by Executor
434     */
435     public int offer(T item,
436     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
437 dl 1.29 return doOffer(0L, item, onDrop);
438 dl 1.1 }
439    
440     /**
441 jsr166 1.49 * Publishes the given item, if possible, to each current subscriber
442     * by asynchronously invoking its {@link
443     * Flow.Subscriber#onNext(Object) onNext} method, blocking while
444     * resources for any subscription are unavailable, up to the
445     * specified timeout or until the caller thread is interrupted, at
446     * which point the given handler (if non-null) is invoked, and if it
447     * returns true, retried once. (The drop handler may distinguish
448     * timeouts from interrupts by checking whether the current thread
449     * is interrupted.) Other calls to methods in this class by other
450 dl 1.30 * threads are blocked while the handler is invoked. Unless
451     * recovery is assured, options are usually limited to logging the
452 jsr166 1.49 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
453     * onError} signal to the subscriber.
454 dl 1.1 *
455 dl 1.21 * <p>This method returns a status indicator: If negative, it
456     * represents the (negative) number of drops (failed attempts to
457     * issue the item to a subscriber). Otherwise it is an estimate of
458     * the maximum lag (number of items submitted but not yet
459     * consumed) among all current subscribers. This value is at least
460     * one (accounting for this submitted item) if there are any
461     * subscribers, else zero.
462     *
463 dl 1.1 * <p>If the Executor for this publisher throws a
464     * RejectedExecutionException (or any other RuntimeException or
465     * Error) when attempting to asynchronously notify subscribers, or
466     * the drop handler throws an exception when processing a dropped
467     * item, then this exception is rethrown.
468     *
469     * @param item the (non-null) item to publish
470     * @param timeout how long to wait for resources for any subscriber
471     * before giving up, in units of {@code unit}
472     * @param unit a {@code TimeUnit} determining how to interpret the
473     * {@code timeout} parameter
474     * @param onDrop if non-null, the handler invoked upon a drop to a
475     * subscriber, with arguments of the subscriber and item; if it
476     * returns true, an offer is re-attempted (once)
477 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
478     * an estimate of maximum lag
479 dl 1.1 * @throws IllegalStateException if closed
480     * @throws NullPointerException if item is null
481     * @throws RejectedExecutionException if thrown by Executor
482     */
483     public int offer(T item, long timeout, TimeUnit unit,
484     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
485 dl 1.29 return doOffer(unit.toNanos(timeout), item, onDrop);
486     }
487    
488     /** Common implementation for both forms of offer */
489     final int doOffer(long nanos, T item,
490     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
491 dl 1.1 if (item == null) throw new NullPointerException();
492 dl 1.27 int lag = 0, drops = 0;
493     boolean complete;
494 jsr166 1.2 synchronized (this) {
495 dl 1.27 complete = closed;
496 dl 1.29 BufferedSubscription<T> b = clients;
497 dl 1.27 if (!complete) {
498 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
499 dl 1.27 while (b != null) {
500     BufferedSubscription<T> next = b.next;
501 dl 1.29 int stat = b.offer(item);
502     if (stat < 0) {
503     b.next = null;
504 dl 1.27 if (pred == null)
505     clients = next;
506     else
507     pred.next = next;
508     }
509     else {
510 dl 1.29 if (stat > lag)
511     lag = stat;
512     else if (stat == 0) {
513     b.nextRetry = null;
514 dl 1.27 if (rtail == null)
515 dl 1.29 r = b;
516 dl 1.27 else
517     rtail.nextRetry = b;
518     rtail = b;
519     }
520 dl 1.29 else if (stat > lag)
521 dl 1.27 lag = stat;
522     pred = b;
523     }
524     b = next;
525 dl 1.1 }
526 dl 1.29 while (r != null) {
527     BufferedSubscription<T> nextRetry = r.nextRetry;
528     r.nextRetry = null;
529 jsr166 1.64 int stat = (nanos > 0L)
530     ? r.timedOffer(item, nanos)
531     : r.offer(item);
532 dl 1.29 if (stat == 0 && onDrop != null &&
533     onDrop.test(r.subscriber, item))
534     stat = r.offer(item);
535     if (stat == 0)
536     ++drops;
537     else if (stat > lag)
538     lag = stat;
539     else if (stat < 0 && clients == r)
540     clients = r.next;
541     r = nextRetry;
542     }
543 dl 1.1 }
544     }
545 dl 1.27 if (complete)
546     throw new IllegalStateException("Closed");
547     else
548     return (drops > 0) ? -drops : lag;
549 dl 1.1 }
550    
551     /**
552 dl 1.30 * Unless already closed, issues {@link
553 jsr166 1.49 * Flow.Subscriber#onComplete() onComplete} signals to current
554     * subscribers, and disallows subsequent attempts to publish.
555     * Upon return, this method does <em>NOT</em> guarantee that all
556     * subscribers have yet completed.
557 dl 1.1 */
558     public void close() {
559     if (!closed) {
560 dl 1.32 BufferedSubscription<T> b;
561 jsr166 1.2 synchronized (this) {
562 dl 1.1 b = clients;
563     clients = null;
564     closed = true;
565     }
566     while (b != null) {
567 dl 1.32 BufferedSubscription<T> next = b.next;
568     b.next = null;
569 dl 1.19 b.onComplete();
570 dl 1.1 b = next;
571     }
572     }
573     }
574    
575     /**
576 jsr166 1.49 * Unless already closed, issues {@link
577     * Flow.Subscriber#onError(Throwable) onError} signals to current
578     * subscribers with the given error, and disallows subsequent
579     * attempts to publish. Future subscribers also receive the given
580     * error. Upon return, this method does <em>NOT</em> guarantee
581     * that all subscribers have yet completed.
582 dl 1.1 *
583 dl 1.30 * @param error the {@code onError} argument sent to subscribers
584 dl 1.1 * @throws NullPointerException if error is null
585     */
586     public void closeExceptionally(Throwable error) {
587     if (error == null)
588     throw new NullPointerException();
589     if (!closed) {
590 dl 1.32 BufferedSubscription<T> b;
591 jsr166 1.2 synchronized (this) {
592 dl 1.1 b = clients;
593     clients = null;
594 dl 1.69 if (!closed) // don't override plain close
595     closedException = error;
596 dl 1.1 closed = true;
597     }
598     while (b != null) {
599 dl 1.32 BufferedSubscription<T> next = b.next;
600     b.next = null;
601 dl 1.19 b.onError(error);
602 dl 1.1 b = next;
603     }
604     }
605     }
606    
607     /**
608     * Returns true if this publisher is not accepting submissions.
609     *
610     * @return true if closed
611     */
612     public boolean isClosed() {
613     return closed;
614     }
615    
616 dl 1.37 /**
617 jsr166 1.49 * Returns the exception associated with {@link
618     * #closeExceptionally(Throwable) closeExceptionally}, or null if
619     * not closed or if closed normally.
620 dl 1.37 *
621     * @return the exception, or null if none
622     */
623     public Throwable getClosedException() {
624     return closedException;
625     }
626    
627 dl 1.1 /**
628 jsr166 1.6 * Returns true if this publisher has any subscribers.
629 dl 1.1 *
630     * @return true if this publisher has any subscribers
631     */
632     public boolean hasSubscribers() {
633     boolean nonEmpty = false;
634     if (!closed) {
635 jsr166 1.2 synchronized (this) {
636 dl 1.31 for (BufferedSubscription<T> b = clients; b != null;) {
637     BufferedSubscription<T> next = b.next;
638 dl 1.19 if (b.isDisabled()) {
639 dl 1.29 b.next = null;
640 dl 1.31 b = clients = next;
641 jsr166 1.2 }
642 dl 1.1 else {
643     nonEmpty = true;
644     break;
645     }
646     }
647     }
648     }
649     return nonEmpty;
650     }
651    
652     /**
653 dl 1.21 * Returns the number of current subscribers.
654 dl 1.1 *
655 dl 1.21 * @return the number of current subscribers
656 dl 1.1 */
657 dl 1.21 public int getNumberOfSubscribers() {
658     int count = 0;
659     if (!closed) {
660     synchronized (this) {
661     BufferedSubscription<T> pred = null, next;
662     for (BufferedSubscription<T> b = clients; b != null; b = next) {
663     next = b.next;
664     if (b.isDisabled()) {
665 dl 1.29 b.next = null;
666 dl 1.21 if (pred == null)
667     clients = next;
668     else
669     pred.next = next;
670     }
671     else {
672     pred = b;
673     ++count;
674     }
675     }
676     }
677     }
678     return count;
679 dl 1.1 }
680    
681 jsr166 1.7 /**
682 dl 1.21 * Returns the Executor used for asynchronous delivery.
683 dl 1.1 *
684 dl 1.21 * @return the Executor used for asynchronous delivery
685 dl 1.1 */
686 dl 1.21 public Executor getExecutor() {
687     return executor;
688 dl 1.1 }
689    
690 jsr166 1.7 /**
691 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
692 dl 1.1 *
693 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
694 dl 1.1 */
695     public int getMaxBufferCapacity() {
696     return maxBufferCapacity;
697     }
698    
699     /**
700 dl 1.29 * Returns a list of current subscribers for monitoring and
701     * tracking purposes, not for invoking {@link Flow.Subscriber}
702     * methods on the subscribers.
703 dl 1.1 *
704     * @return list of current subscribers
705     */
706     public List<Flow.Subscriber<? super T>> getSubscribers() {
707     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
708 jsr166 1.2 synchronized (this) {
709 dl 1.1 BufferedSubscription<T> pred = null, next;
710     for (BufferedSubscription<T> b = clients; b != null; b = next) {
711     next = b.next;
712 dl 1.19 if (b.isDisabled()) {
713 dl 1.29 b.next = null;
714 dl 1.1 if (pred == null)
715     clients = next;
716     else
717     pred.next = next;
718 jsr166 1.2 }
719 dl 1.1 else
720     subs.add(b.subscriber);
721     }
722     }
723     return subs;
724     }
725 jsr166 1.2
726 dl 1.1 /**
727     * Returns true if the given Subscriber is currently subscribed.
728     *
729     * @param subscriber the subscriber
730     * @return true if currently subscribed
731 dl 1.21 * @throws NullPointerException if subscriber is null
732 dl 1.1 */
733     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
734 dl 1.21 if (subscriber == null) throw new NullPointerException();
735 jsr166 1.2 if (!closed) {
736     synchronized (this) {
737 dl 1.1 BufferedSubscription<T> pred = null, next;
738     for (BufferedSubscription<T> b = clients; b != null; b = next) {
739     next = b.next;
740 dl 1.19 if (b.isDisabled()) {
741 dl 1.29 b.next = null;
742 dl 1.1 if (pred == null)
743     clients = next;
744     else
745     pred.next = next;
746 jsr166 1.2 }
747 dl 1.21 else if (subscriber.equals(b.subscriber))
748 dl 1.1 return true;
749 dl 1.21 else
750     pred = b;
751 dl 1.1 }
752     }
753     }
754     return false;
755     }
756    
757     /**
758 dl 1.21 * Returns an estimate of the minimum number of items requested
759 jsr166 1.49 * (via {@link Flow.Subscription#request(long) request}) but not
760     * yet produced, among all current subscribers.
761 dl 1.21 *
762     * @return the estimate, or zero if no subscribers
763     */
764     public long estimateMinimumDemand() {
765     long min = Long.MAX_VALUE;
766     boolean nonEmpty = false;
767     synchronized (this) {
768     BufferedSubscription<T> pred = null, next;
769     for (BufferedSubscription<T> b = clients; b != null; b = next) {
770     int n; long d;
771     next = b.next;
772     if ((n = b.estimateLag()) < 0) {
773 dl 1.29 b.next = null;
774 dl 1.21 if (pred == null)
775     clients = next;
776     else
777     pred.next = next;
778     }
779     else {
780     if ((d = b.demand - n) < min)
781     min = d;
782     nonEmpty = true;
783 dl 1.32 pred = b;
784 dl 1.21 }
785     }
786     }
787 jsr166 1.22 return nonEmpty ? min : 0;
788 dl 1.21 }
789    
790     /**
791     * Returns an estimate of the maximum number of items produced but
792     * not yet consumed among all current subscribers.
793 dl 1.17 *
794     * @return the estimate
795 dl 1.1 */
796 dl 1.21 public int estimateMaximumLag() {
797     int max = 0;
798 dl 1.17 synchronized (this) {
799     BufferedSubscription<T> pred = null, next;
800     for (BufferedSubscription<T> b = clients; b != null; b = next) {
801 dl 1.21 int n;
802 dl 1.17 next = b.next;
803 dl 1.21 if ((n = b.estimateLag()) < 0) {
804 dl 1.29 b.next = null;
805 dl 1.17 if (pred == null)
806     clients = next;
807     else
808     pred.next = next;
809     }
810 dl 1.32 else {
811     if (n > max)
812     max = n;
813     pred = b;
814     }
815 dl 1.17 }
816     }
817 dl 1.21 return max;
818 dl 1.1 }
819    
820 dl 1.25 /**
821 jsr166 1.49 * Processes all published items using the given Consumer function.
822     * Returns a CompletableFuture that is completed normally when this
823     * publisher signals {@link Flow.Subscriber#onComplete()
824     * onComplete}, or completed exceptionally upon any error, or an
825     * exception is thrown by the Consumer, or the returned
826     * CompletableFuture is cancelled, in which case no further items
827     * are processed.
828 dl 1.45 *
829     * @param consumer the function applied to each onNext item
830     * @return a CompletableFuture that is completed normally
831     * when the publisher signals onComplete, and exceptionally
832 jsr166 1.47 * upon any error or cancellation
833 dl 1.45 * @throws NullPointerException if consumer is null
834     */
835     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
836     if (consumer == null)
837     throw new NullPointerException();
838     CompletableFuture<Void> status = new CompletableFuture<>();
839     subscribe(new ConsumerSubscriber<T>(status, consumer));
840     return status;
841     }
842    
843     /** Subscriber for method consume */
844 jsr166 1.54 private static final class ConsumerSubscriber<T>
845 dl 1.63 implements Flow.Subscriber<T> {
846 dl 1.45 final CompletableFuture<Void> status;
847     final Consumer<? super T> consumer;
848     Flow.Subscription subscription;
849     ConsumerSubscriber(CompletableFuture<Void> status,
850     Consumer<? super T> consumer) {
851     this.status = status; this.consumer = consumer;
852     }
853     public final void onSubscribe(Flow.Subscription subscription) {
854     this.subscription = subscription;
855 dl 1.55 status.whenComplete((v, e) -> subscription.cancel());
856     if (!status.isDone())
857 dl 1.45 subscription.request(Long.MAX_VALUE);
858     }
859     public final void onError(Throwable ex) {
860     status.completeExceptionally(ex);
861     }
862     public final void onComplete() {
863     status.complete(null);
864     }
865     public final void onNext(T item) {
866 dl 1.55 try {
867     consumer.accept(item);
868     } catch (Throwable ex) {
869 dl 1.45 subscription.cancel();
870 dl 1.55 status.completeExceptionally(ex);
871 dl 1.45 }
872     }
873     }
874    
875     /**
876 dl 1.25 * A task for consuming buffer items and signals, created and
877     * executed whenever they become available. A task consumes as
878     * many items/signals as possible before terminating, at which
879     * point another task is created when needed. The dual Runnable
880     * and ForkJoinTask declaration saves overhead when executed by
881     * ForkJoinPools, without impacting other kinds of Executors.
882     */
883     @SuppressWarnings("serial")
884     static final class ConsumerTask<T> extends ForkJoinTask<Void>
885 dl 1.59 implements Runnable, CompletableFuture.AsynchronousCompletionTask {
886 dl 1.25 final BufferedSubscription<T> consumer;
887     ConsumerTask(BufferedSubscription<T> consumer) {
888     this.consumer = consumer;
889     }
890     public final Void getRawResult() { return null; }
891     public final void setRawResult(Void v) {}
892     public final boolean exec() { consumer.consume(); return false; }
893     public final void run() { consumer.consume(); }
894     }
895    
896 dl 1.1 /**
897 dl 1.15 * A bounded (ring) buffer with integrated control to start a
898     * consumer task whenever items are available. The buffer
899 dl 1.34 * algorithm is similar to one used inside ForkJoinPool (see its
900     * internal documentation for details) specialized for the case of
901     * at most one concurrent producer and consumer, and power of two
902     * buffer sizes. This allows methods to operate without locks even
903     * while supporting resizing, blocking, task-triggering, and
904     * garbage-free buffers (nulling out elements when consumed),
905     * although supporting these does impose a bit of overhead
906     * compared to plain fixed-size ring buffers.
907 dl 1.1 *
908     * The publisher guarantees a single producer via its lock. We
909     * ensure in this class that there is at most one consumer. The
910     * request and cancel methods must be fully thread-safe but are
911     * coded to exploit the most common case in which they are only
912     * called by consumers (usually within onNext).
913     *
914 dl 1.15 * Execution control is managed using the ACTIVE ctl bit. We
915     * ensure that a task is active when consumable items (and
916 dl 1.19 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
917 dl 1.24 * there is demand (unfilled requests). This is complicated on
918 dl 1.19 * the creation side by the possibility of exceptions when trying
919     * to execute tasks. These eventually force DISABLED state, but
920 dl 1.1 * sometimes not directly. On the task side, termination (clearing
921 dl 1.27 * ACTIVE) that would otherwise race with producers or request()
922 dl 1.29 * calls uses the CONSUME keep-alive bit to force a recheck.
923 dl 1.1 *
924     * The ctl field also manages run state. When DISABLED, no further
925 dl 1.20 * updates are possible. Disabling may be preceded by setting
926 dl 1.1 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
927     * case the associated Subscriber methods are invoked, possibly
928     * synchronously if there is no active consumer task (including
929 dl 1.20 * cases where execute() failed). The cancel() method is supported
930     * by treating as ERROR but suppressing onError signal.
931 dl 1.1 *
932     * Support for blocking also exploits the fact that there is only
933     * one possible waiter. ManagedBlocker-compatible control fields
934     * are placed in this class itself rather than in wait-nodes.
935     * Blocking control relies on the "waiter" field. Producers set
936     * the field before trying to block, but must then recheck (via
937     * offer) before parking. Signalling then just unparks and clears
938 dl 1.59 * waiter field. If the producer and/or consumer are using a
939     * ForkJoinPool, the producer attempts to help run consumer tasks
940     * via ForkJoinPool.helpAsyncBlocker before blocking.
941 dl 1.1 *
942     * This class uses @Contended and heuristic field declaration
943 dl 1.31 * ordering to reduce false-sharing-based memory contention among
944     * instances of BufferedSubscription, but it does not currently
945     * attempt to avoid memory contention among buffers. This field
946     * and element packing can hurt performance especially when each
947     * publisher has only one client operating at a high rate.
948 dl 1.1 * Addressing this may require allocating substantially more space
949     * than users expect.
950     */
951 dl 1.15 @SuppressWarnings("serial")
952 jsr166 1.57 @jdk.internal.vm.annotation.Contended
953 jsr166 1.54 private static final class BufferedSubscription<T>
954 dl 1.25 implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
955 dl 1.1 // Order-sensitive field declarations
956 dl 1.27 long timeout; // > 0 if timed wait
957     volatile long demand; // # unfilled requests
958     int maxCapacity; // reduced on OOME
959     int putStat; // offer result for ManagedBlocker
960     volatile int ctl; // atomic run state flags
961     volatile int head; // next position to take
962 dl 1.34 int tail; // next position to put
963 dl 1.27 Object[] array; // buffer: null if disabled
964 dl 1.1 Flow.Subscriber<? super T> subscriber; // null if disabled
965 dl 1.27 Executor executor; // null if disabled
966 dl 1.36 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
967 dl 1.27 volatile Throwable pendingError; // holds until onError issued
968     volatile Thread waiter; // blocked producer thread
969     T putItem; // for offer within ManagedBlocker
970     BufferedSubscription<T> next; // used only by publisher
971     BufferedSubscription<T> nextRetry; // used only by publisher
972 dl 1.1
973     // ctl values
974 dl 1.20 static final int ACTIVE = 0x01; // consumer task active
975 dl 1.29 static final int CONSUME = 0x02; // keep-alive for consumer task
976 dl 1.27 static final int DISABLED = 0x04; // final state
977     static final int ERROR = 0x08; // signal onError then disable
978     static final int SUBSCRIBE = 0x10; // signal onSubscribe
979     static final int COMPLETE = 0x20; // signal onComplete when done
980 dl 1.1
981 dl 1.20 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
982 dl 1.16
983 jsr166 1.22 /**
984 dl 1.40 * Initial buffer capacity used when maxBufferCapacity is
985     * greater. Must be a power of two.
986 dl 1.21 */
987 dl 1.40 static final int DEFAULT_INITIAL_CAP = 32;
988 dl 1.21
989 dl 1.1 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
990 dl 1.36 Executor executor,
991     BiConsumer<? super Flow.Subscriber<? super T>,
992     ? super Throwable> onNextHandler,
993     int maxBufferCapacity) {
994 dl 1.1 this.subscriber = subscriber;
995     this.executor = executor;
996 dl 1.36 this.onNextHandler = onNextHandler;
997 dl 1.1 this.maxCapacity = maxBufferCapacity;
998 dl 1.40 this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
999 dl 1.55 (maxBufferCapacity < 2 ? // at least 2 slots
1000     2 : maxBufferCapacity) :
1001     DEFAULT_INITIAL_CAP];
1002 dl 1.1 }
1003    
1004 dl 1.19 final boolean isDisabled() {
1005 dl 1.20 return ctl == DISABLED;
1006     }
1007    
1008 dl 1.21 /**
1009     * Returns estimated number of buffered items, or -1 if
1010 jsr166 1.42 * disabled.
1011 dl 1.21 */
1012     final int estimateLag() {
1013 dl 1.20 int n;
1014 jsr166 1.22 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
1015 dl 1.19 }
1016    
1017 dl 1.1 /**
1018 dl 1.16 * Tries to add item and start consumer task if necessary.
1019 dl 1.21 * @return -1 if disabled, 0 if dropped, else estimated lag
1020 dl 1.1 */
1021     final int offer(T item) {
1022 dl 1.34 int h = head, t = tail, cap, size, stat;
1023 dl 1.1 Object[] a = array;
1024 dl 1.40 if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
1025 dl 1.34 a[(cap - 1) & t] = item; // relaxed writes OK
1026     tail = t + 1;
1027 dl 1.27 stat = size;
1028 dl 1.1 }
1029 dl 1.34 else
1030     stat = growAndAdd(a, item);
1031 dl 1.29 return (stat > 0 &&
1032     (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
1033 dl 1.27 startOnOffer(stat) : stat;
1034     }
1035    
1036     /**
1037 jsr166 1.28 * Tries to create or expand buffer, then adds item if possible.
1038 dl 1.27 */
1039     private int growAndAdd(Object[] a, T item) {
1040 dl 1.34 boolean alloc;
1041 dl 1.27 int cap, stat;
1042     if ((ctl & (ERROR | DISABLED)) != 0) {
1043     cap = 0;
1044     stat = -1;
1045 dl 1.34 alloc = false;
1046 dl 1.27 }
1047 dl 1.34 else if (a == null || (cap = a.length) <= 0) {
1048 dl 1.27 cap = 0;
1049     stat = 1;
1050 dl 1.34 alloc = true;
1051     }
1052     else {
1053 dl 1.61 VarHandle.fullFence(); // recheck
1054 dl 1.34 int h = head, t = tail, size = t + 1 - h;
1055 dl 1.40 if (cap >= size) {
1056 dl 1.34 a[(cap - 1) & t] = item;
1057     tail = t + 1;
1058     stat = size;
1059     alloc = false;
1060     }
1061     else if (cap >= maxCapacity) {
1062     stat = 0; // cannot grow
1063     alloc = false;
1064     }
1065     else {
1066     stat = cap + 1;
1067     alloc = true;
1068     }
1069 dl 1.27 }
1070 dl 1.34 if (alloc) {
1071 dl 1.40 int newCap = (cap > 0) ? cap << 1 : 1;
1072 dl 1.27 if (newCap <= cap)
1073     stat = 0;
1074     else {
1075     Object[] newArray = null;
1076 dl 1.16 try {
1077 dl 1.27 newArray = new Object[newCap];
1078     } catch (Throwable ex) { // try to cope with OOME
1079     }
1080     if (newArray == null) {
1081     if (cap > 0)
1082     maxCapacity = cap; // avoid continuous failure
1083     stat = 0;
1084     }
1085     else {
1086     array = newArray;
1087     int t = tail;
1088     int newMask = newCap - 1;
1089     if (a != null && cap > 0) {
1090     int mask = cap - 1;
1091     for (int j = head; j != t; ++j) {
1092 dl 1.61 int k = j & mask;
1093 dl 1.62 Object x = QA.getAcquire(a, k);
1094 dl 1.34 if (x != null && // races with consumer
1095 dl 1.61 QA.compareAndSet(a, k, x, null))
1096 dl 1.27 newArray[j & newMask] = x;
1097     }
1098     }
1099     newArray[t & newMask] = item;
1100     tail = t + 1;
1101 dl 1.16 }
1102     }
1103     }
1104 dl 1.21 return stat;
1105 dl 1.1 }
1106    
1107     /**
1108 dl 1.27 * Spins/helps/blocks while offer returns 0. Called only if
1109 dl 1.29 * initial offer return 0.
1110 dl 1.20 */
1111     final int submit(T item) {
1112 dl 1.59 int stat;
1113     if ((stat = offer(item)) == 0) {
1114 dl 1.25 putItem = item;
1115     timeout = 0L;
1116 dl 1.59 putStat = 0;
1117     ForkJoinPool.helpAsyncBlocker(executor, this);
1118     if ((stat = putStat) == 0) {
1119     try {
1120     ForkJoinPool.managedBlock(this);
1121     } catch (InterruptedException ie) {
1122     timeout = INTERRUPTED;
1123     }
1124     stat = putStat;
1125 dl 1.25 }
1126     if (timeout < 0L)
1127     Thread.currentThread().interrupt();
1128     }
1129 dl 1.20 return stat;
1130     }
1131    
1132     /**
1133 jsr166 1.53 * Timeout version; similar to submit.
1134 dl 1.20 */
1135 dl 1.27 final int timedOffer(T item, long nanos) {
1136 dl 1.59 int stat;
1137     if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
1138     putItem = item;
1139     putStat = 0;
1140     ForkJoinPool.helpAsyncBlocker(executor, this);
1141     if ((stat = putStat) == 0) {
1142     try {
1143     ForkJoinPool.managedBlock(this);
1144     } catch (InterruptedException ie) {
1145     timeout = INTERRUPTED;
1146 dl 1.29 }
1147 dl 1.59 stat = putStat;
1148 dl 1.25 }
1149     if (timeout < 0L)
1150     Thread.currentThread().interrupt();
1151 dl 1.20 }
1152     return stat;
1153     }
1154    
1155     /**
1156 dl 1.27 * Tries to start consumer task after offer.
1157     * @return -1 if now disabled, else argument
1158     */
1159     private int startOnOffer(int stat) {
1160     for (;;) {
1161     Executor e; int c;
1162     if ((c = ctl) == DISABLED || (e = executor) == null) {
1163     stat = -1;
1164     break;
1165     }
1166     else if ((c & ACTIVE) != 0) { // ensure keep-alive
1167 dl 1.29 if ((c & CONSUME) != 0 ||
1168 dl 1.61 CTL.compareAndSet(this, c, c | CONSUME))
1169 dl 1.27 break;
1170     }
1171     else if (demand == 0L || tail == head)
1172     break;
1173 dl 1.61 else if (CTL.compareAndSet(this, c, c | (ACTIVE | CONSUME))) {
1174 dl 1.27 try {
1175     e.execute(new ConsumerTask<T>(this));
1176     break;
1177     } catch (RuntimeException | Error ex) { // back out
1178 dl 1.30 do {} while (((c = ctl) & DISABLED) == 0 &&
1179 dl 1.27 (c & ACTIVE) != 0 &&
1180 jsr166 1.65 !CTL.weakCompareAndSet
1181 dl 1.63 (this, c, c & ~ACTIVE));
1182 dl 1.27 throw ex;
1183     }
1184     }
1185     }
1186     return stat;
1187     }
1188    
1189 dl 1.34 private void signalWaiter(Thread w) {
1190     waiter = null;
1191     LockSupport.unpark(w); // release producer
1192     }
1193    
1194 dl 1.27 /**
1195 dl 1.16 * Nulls out most fields, mainly to avoid garbage retention
1196 dl 1.20 * until publisher unsubscribes, but also to help cleanly stop
1197     * upon error by nulling required components.
1198 dl 1.1 */
1199 dl 1.27 private void detach() {
1200     Thread w = waiter;
1201     executor = null;
1202     subscriber = null;
1203 dl 1.16 pendingError = null;
1204 dl 1.34 signalWaiter(w);
1205 dl 1.1 }
1206    
1207     /**
1208 dl 1.19 * Issues error signal, asynchronously if a task is running,
1209 jsr166 1.26 * else synchronously.
1210 dl 1.19 */
1211     final void onError(Throwable ex) {
1212     for (int c;;) {
1213 dl 1.29 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1214 dl 1.19 break;
1215     else if ((c & ACTIVE) != 0) {
1216     pendingError = ex;
1217 dl 1.61 if (CTL.compareAndSet(this, c, c | ERROR))
1218 dl 1.19 break; // cause consumer task to exit
1219     }
1220 dl 1.61 else if (CTL.compareAndSet(this, c, DISABLED)) {
1221 dl 1.19 Flow.Subscriber<? super T> s = subscriber;
1222     if (s != null && ex != null) {
1223     try {
1224     s.onError(ex);
1225     } catch (Throwable ignore) {
1226     }
1227     }
1228     detach();
1229     break;
1230     }
1231     }
1232     }
1233    
1234     /**
1235 dl 1.1 * Tries to start consumer task upon a signal or request;
1236 jsr166 1.12 * disables on failure.
1237 dl 1.1 */
1238 dl 1.27 private void startOrDisable() {
1239     Executor e;
1240     if ((e = executor) != null) { // skip if already disabled
1241 dl 1.1 try {
1242 dl 1.25 e.execute(new ConsumerTask<T>(this));
1243 dl 1.27 } catch (Throwable ex) { // back out and force signal
1244 dl 1.1 for (int c;;) {
1245 dl 1.20 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1246 dl 1.1 break;
1247 dl 1.61 if (CTL.compareAndSet(this, c, c & ~ACTIVE)) {
1248 dl 1.19 onError(ex);
1249 dl 1.1 break;
1250     }
1251     }
1252     }
1253     }
1254     }
1255    
1256 dl 1.19 final void onComplete() {
1257 dl 1.1 for (int c;;) {
1258 dl 1.20 if ((c = ctl) == DISABLED)
1259 dl 1.1 break;
1260 dl 1.61 if (CTL.compareAndSet(this, c,
1261     c | (ACTIVE | CONSUME | COMPLETE))) {
1262 dl 1.16 if ((c & ACTIVE) == 0)
1263     startOrDisable();
1264 dl 1.1 break;
1265     }
1266     }
1267     }
1268    
1269 dl 1.19 final void onSubscribe() {
1270 dl 1.1 for (int c;;) {
1271 dl 1.20 if ((c = ctl) == DISABLED)
1272 dl 1.1 break;
1273 dl 1.61 if (CTL.compareAndSet(this, c,
1274 dl 1.63 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1275 dl 1.19 if ((c & ACTIVE) == 0)
1276     startOrDisable();
1277 dl 1.1 break;
1278     }
1279     }
1280     }
1281    
1282 dl 1.16 /**
1283     * Causes consumer task to exit if active (without reporting
1284     * onError unless there is already a pending error), and
1285     * disables.
1286     */
1287     public void cancel() {
1288 dl 1.1 for (int c;;) {
1289 dl 1.20 if ((c = ctl) == DISABLED)
1290 dl 1.1 break;
1291 dl 1.16 else if ((c & ACTIVE) != 0) {
1292 dl 1.61 if (CTL.compareAndSet(this, c,
1293 dl 1.63 c | (CONSUME | ERROR)))
1294 dl 1.16 break;
1295     }
1296 dl 1.61 else if (CTL.compareAndSet(this, c, DISABLED)) {
1297 dl 1.16 detach();
1298 dl 1.1 break;
1299     }
1300     }
1301     }
1302    
1303 dl 1.16 /**
1304     * Adds to demand and possibly starts task.
1305     */
1306     public void request(long n) {
1307     if (n > 0L) {
1308     for (;;) {
1309     long prev = demand, d;
1310     if ((d = prev + n) < prev) // saturate
1311     d = Long.MAX_VALUE;
1312 dl 1.61 if (DEMAND.compareAndSet(this, prev, d)) {
1313 dl 1.31 for (int c, h;;) {
1314 dl 1.27 if ((c = ctl) == DISABLED)
1315 dl 1.16 break;
1316 dl 1.27 else if ((c & ACTIVE) != 0) {
1317 dl 1.29 if ((c & CONSUME) != 0 ||
1318 dl 1.61 CTL.compareAndSet(this, c, c | CONSUME))
1319 dl 1.27 break;
1320     }
1321     else if ((h = head) != tail) {
1322 dl 1.61 if (CTL.compareAndSet(this, c,
1323     c | (ACTIVE|CONSUME))) {
1324 dl 1.16 startOrDisable();
1325     break;
1326     }
1327     }
1328     else if (head == h && tail == h)
1329 dl 1.27 break; // else stale
1330 dl 1.31 if (demand == 0L)
1331     break;
1332 dl 1.16 }
1333     break;
1334     }
1335     }
1336     }
1337     else if (n < 0L)
1338 dl 1.19 onError(new IllegalArgumentException(
1339     "negative subscription request"));
1340 dl 1.16 }
1341    
1342 dl 1.20 public final boolean isReleasable() { // for ManagedBlocker
1343 dl 1.1 T item = putItem;
1344 dl 1.34 if (item != null) {
1345     if ((putStat = offer(item)) == 0)
1346     return false;
1347     putItem = null;
1348 dl 1.1 }
1349     return true;
1350     }
1351    
1352 dl 1.20 public final boolean block() { // for ManagedBlocker
1353 dl 1.1 T item = putItem;
1354     if (item != null) {
1355     putItem = null;
1356     long nanos = timeout;
1357     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1358     while ((putStat = offer(item)) == 0) {
1359     if (Thread.interrupted()) {
1360 dl 1.16 timeout = INTERRUPTED;
1361 dl 1.1 if (nanos > 0L)
1362     break;
1363     }
1364     else if (nanos > 0L &&
1365     (nanos = deadline - System.nanoTime()) <= 0L)
1366     break;
1367     else if (waiter == null)
1368     waiter = Thread.currentThread();
1369     else {
1370     if (nanos > 0L)
1371     LockSupport.parkNanos(this, nanos);
1372     else
1373     LockSupport.park(this);
1374     waiter = null;
1375     }
1376     }
1377     }
1378     waiter = null;
1379     return true;
1380     }
1381    
1382 dl 1.29 /**
1383     * Consumer loop, called from ConsumerTask, or indirectly
1384 dl 1.39 * when helping during submit.
1385 dl 1.29 */
1386 dl 1.25 final void consume() {
1387 dl 1.1 Flow.Subscriber<? super T> s;
1388 dl 1.34 int h = head;
1389 dl 1.27 if ((s = subscriber) != null) { // else disabled
1390 dl 1.34 for (;;) {
1391 dl 1.27 long d = demand;
1392 dl 1.61 int c; Object[] a; int n, i; Object x; Thread w;
1393 dl 1.27 if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1394 dl 1.34 if (!checkControl(s, c))
1395 dl 1.20 break;
1396 dl 1.19 }
1397 dl 1.34 else if ((a = array) == null || h == tail ||
1398     (n = a.length) == 0 ||
1399 dl 1.62 (x = QA.getAcquire(a, i = (n - 1) & h)) == null) {
1400 dl 1.34 if (!checkEmpty(s, c))
1401 dl 1.27 break;
1402     }
1403 dl 1.34 else if (d == 0L) {
1404     if (!checkDemand(c))
1405     break;
1406 dl 1.1 }
1407 dl 1.34 else if (((c & CONSUME) != 0 ||
1408 dl 1.61 CTL.compareAndSet(this, c, c | CONSUME)) &&
1409     QA.compareAndSet(a, i, x, null)) {
1410     HEAD.setRelease(this, ++h);
1411     DEMAND.getAndAdd(this, -1L);
1412 dl 1.34 if ((w = waiter) != null)
1413     signalWaiter(w);
1414 dl 1.36 try {
1415     @SuppressWarnings("unchecked") T y = (T) x;
1416     s.onNext(y);
1417     } catch (Throwable ex) {
1418     handleOnNext(s, ex);
1419     }
1420 dl 1.34 }
1421     }
1422     }
1423     }
1424    
1425     /**
1426 jsr166 1.42 * Responds to control events in consume().
1427 dl 1.34 */
1428     private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
1429     boolean stat = true;
1430 dl 1.66 if ((c & SUBSCRIBE) != 0) {
1431     if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) {
1432     try {
1433     if (s != null)
1434     s.onSubscribe(this);
1435     } catch (Throwable ex) {
1436     onError(ex);
1437     }
1438     }
1439     }
1440     else if ((c & ERROR) != 0) {
1441 dl 1.34 Throwable ex = pendingError;
1442     ctl = DISABLED; // no need for CAS
1443     if (ex != null) { // null if errorless cancel
1444     try {
1445     if (s != null)
1446     s.onError(ex);
1447     } catch (Throwable ignore) {
1448     }
1449     }
1450     }
1451     else {
1452     detach();
1453     stat = false;
1454     }
1455     return stat;
1456     }
1457    
1458     /**
1459 jsr166 1.42 * Responds to apparent emptiness in consume().
1460 dl 1.34 */
1461     private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1462     boolean stat = true;
1463     if (head == tail) {
1464     if ((c & CONSUME) != 0)
1465 dl 1.61 CTL.compareAndSet(this, c, c & ~CONSUME);
1466 dl 1.34 else if ((c & COMPLETE) != 0) {
1467 dl 1.61 if (CTL.compareAndSet(this, c, DISABLED)) {
1468 dl 1.1 try {
1469 dl 1.34 if (s != null)
1470     s.onComplete();
1471     } catch (Throwable ignore) {
1472 dl 1.1 }
1473     }
1474     }
1475 dl 1.61 else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
1476 dl 1.34 stat = false;
1477     }
1478     return stat;
1479     }
1480    
1481     /**
1482 jsr166 1.42 * Responds to apparent zero demand in consume().
1483 dl 1.34 */
1484     private boolean checkDemand(int c) {
1485     boolean stat = true;
1486     if (demand == 0L) {
1487     if ((c & CONSUME) != 0)
1488 dl 1.61 CTL.compareAndSet(this, c, c & ~CONSUME);
1489     else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
1490 dl 1.34 stat = false;
1491     }
1492     return stat;
1493     }
1494    
1495     /**
1496 jsr166 1.42 * Processes exception in Subscriber.onNext.
1497 dl 1.34 */
1498 dl 1.36 private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1499     BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1500     if ((h = onNextHandler) != null) {
1501     try {
1502     h.accept(s, ex);
1503     } catch (Throwable ignore) {
1504     }
1505 dl 1.1 }
1506 dl 1.36 onError(ex);
1507 dl 1.1 }
1508    
1509 dl 1.61 // VarHandle mechanics
1510     private static final VarHandle CTL;
1511     private static final VarHandle TAIL;
1512     private static final VarHandle HEAD;
1513     private static final VarHandle DEMAND;
1514     private static final VarHandle QA;
1515 dl 1.1
1516     static {
1517     try {
1518 dl 1.61 MethodHandles.Lookup l = MethodHandles.lookup();
1519     CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
1520     int.class);
1521     TAIL = l.findVarHandle(BufferedSubscription.class, "tail",
1522     int.class);
1523     HEAD = l.findVarHandle(BufferedSubscription.class, "head",
1524     int.class);
1525     DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
1526     long.class);
1527     QA = MethodHandles.arrayElementVarHandle(Object[].class);
1528 jsr166 1.9 } catch (ReflectiveOperationException e) {
1529 dl 1.1 throw new Error(e);
1530     }
1531 jsr166 1.33
1532     // Reduce the risk of rare disastrous classloading in first call to
1533     // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1534     Class<?> ensureLoaded = LockSupport.class;
1535 dl 1.1 }
1536     }
1537     }