ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.65
Committed: Sat Sep 10 04:06:51 2016 UTC (7 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.64: +1 -1 lines
Log Message:
incorporate upstream VarHandle changes

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 dl 1.61 import java.lang.invoke.MethodHandles;
10     import java.lang.invoke.VarHandle;
11 jsr166 1.10 import java.util.ArrayList;
12     import java.util.List;
13 dl 1.1 import java.util.concurrent.locks.LockSupport;
14 dl 1.36 import java.util.function.BiConsumer;
15 dl 1.1 import java.util.function.BiPredicate;
16 dl 1.45 import java.util.function.Consumer;
17 dl 1.1
18     /**
19 dl 1.21 * A {@link Flow.Publisher} that asynchronously issues submitted
20     * (non-null) items to current subscribers until it is closed. Each
21     * current subscriber receives newly submitted items in the same order
22     * unless drops or exceptions are encountered. Using a
23 dl 1.51 * SubmissionPublisher allows item generators to act as compliant <a
24     * href="http://www.reactive-streams.org/"> reactive-streams</a>
25     * Publishers relying on drop handling and/or blocking for flow
26     * control.
27 dl 1.1 *
28 dl 1.16 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
29 dl 1.1 * constructor for delivery to subscribers. The best choice of
30     * Executor depends on expected usage. If the generator(s) of
31     * submitted items run in separate threads, and the number of
32     * subscribers can be estimated, consider using a {@link
33 dl 1.46 * Executors#newFixedThreadPool}. Otherwise consider using the
34     * default, normally the {@link ForkJoinPool#commonPool}.
35 dl 1.1 *
36     * <p>Buffering allows producers and consumers to transiently operate
37     * at different rates. Each subscriber uses an independent buffer.
38 dl 1.21 * Buffers are created upon first use and expanded as needed up to the
39 dl 1.27 * given maximum. (The enforced capacity may be rounded up to the
40 dl 1.21 * nearest power of two and/or bounded by the largest value supported
41     * by this implementation.) Invocations of {@link
42 jsr166 1.49 * Flow.Subscription#request(long) request} do not directly result in
43     * buffer expansion, but risk saturation if unfilled requests exceed
44     * the maximum capacity. The default value of {@link
45     * Flow#defaultBufferSize()} may provide a useful starting point for
46 dl 1.25 * choosing a capacity based on expected rates, resources, and usages.
47 dl 1.18 *
48 dl 1.1 * <p>Publication methods support different policies about what to do
49 jsr166 1.49 * when buffers are saturated. Method {@link #submit(Object) submit}
50     * blocks until resources are available. This is simplest, but least
51     * responsive. The {@code offer} methods may drop items (either
52     * immediately or with bounded timeout), but provide an opportunity to
53     * interpose a handler and then retry.
54 dl 1.1 *
55     * <p>If any Subscriber method throws an exception, its subscription
56 dl 1.36 * is cancelled. If a handler is supplied as a constructor argument,
57     * it is invoked before cancellation upon an exception in method
58 jsr166 1.41 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
59     * {@link Flow.Subscriber#onSubscribe onSubscribe},
60 jsr166 1.49 * {@link Flow.Subscriber#onError(Throwable) onError} and
61     * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
62 jsr166 1.41 * handled before cancellation. If the supplied Executor throws
63     * {@link RejectedExecutionException} (or any other RuntimeException
64     * or Error) when attempting to execute a task, or a drop handler
65     * throws an exception when processing a dropped item, then the
66     * exception is rethrown. In these cases, not all subscribers will
67     * have been issued the published item. It is usually good practice to
68     * {@link #closeExceptionally closeExceptionally} in these cases.
69 dl 1.1 *
70 jsr166 1.49 * <p>Method {@link #consume(Consumer)} simplifies support for a
71     * common case in which the only action of a subscriber is to request
72     * and process all items using a supplied function.
73 dl 1.45 *
74 dl 1.15 * <p>This class may also serve as a convenient base for subclasses
75     * that generate items, and use the methods in this class to publish
76     * them. For example here is a class that periodically publishes the
77     * items generated from a supplier. (In practice you might add methods
78 dl 1.35 * to independently start and stop generation, to share Executors
79 dl 1.30 * among publishers, and so on, or use a SubmissionPublisher as a
80     * component rather than a superclass.)
81 dl 1.15 *
82     * <pre> {@code
83     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
84     * final ScheduledFuture<?> periodicTask;
85     * final ScheduledExecutorService scheduler;
86 dl 1.21 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
87     * Supplier<? extends T> supplier,
88 dl 1.15 * long period, TimeUnit unit) {
89 dl 1.21 * super(executor, maxBufferCapacity);
90 dl 1.15 * scheduler = new ScheduledThreadPoolExecutor(1);
91     * periodicTask = scheduler.scheduleAtFixedRate(
92     * () -> submit(supplier.get()), 0, period, unit);
93     * }
94     * public void close() {
95     * periodicTask.cancel(false);
96     * scheduler.shutdown();
97     * super.close();
98     * }
99     * }}</pre>
100     *
101 dl 1.21 * <p>Here is an example of a {@link Flow.Processor} implementation.
102     * It uses single-step requests to its publisher for simplicity of
103     * illustration. A more adaptive version could monitor flow using the
104 dl 1.27 * lag estimate returned from {@code submit}, along with other utility
105 dl 1.21 * methods.
106 dl 1.16 *
107     * <pre> {@code
108     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
109     * implements Flow.Processor<S,T> {
110     * final Function<? super S, ? extends T> function;
111     * Flow.Subscription subscription;
112 dl 1.21 * TransformProcessor(Executor executor, int maxBufferCapacity,
113 dl 1.16 * Function<? super S, ? extends T> function) {
114 dl 1.21 * super(executor, maxBufferCapacity);
115 dl 1.16 * this.function = function;
116     * }
117     * public void onSubscribe(Flow.Subscription subscription) {
118     * (this.subscription = subscription).request(1);
119     * }
120     * public void onNext(S item) {
121     * subscription.request(1);
122     * submit(function.apply(item));
123     * }
124     * public void onError(Throwable ex) { closeExceptionally(ex); }
125     * public void onComplete() { close(); }
126     * }}</pre>
127     *
128 dl 1.1 * @param <T> the published item type
129     * @author Doug Lea
130 jsr166 1.58 * @since 9
131 dl 1.1 */
132     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
133     AutoCloseable {
134     /*
135     * Most mechanics are handled by BufferedSubscription. This class
136 dl 1.17 * mainly tracks subscribers and ensures sequentiality, by using
137     * built-in synchronization locks across public methods. (Using
138     * built-in locks works well in the most typical case in which
139     * only one thread submits items).
140 dl 1.1 */
141    
142 dl 1.25 /** The largest possible power of two array size. */
143 jsr166 1.43 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
144 dl 1.1
145 dl 1.40 /** Round capacity to power of 2, at most limit. */
146     static final int roundCapacity(int cap) {
147 dl 1.1 int n = cap - 1;
148     n |= n >>> 1;
149     n |= n >>> 2;
150     n |= n >>> 4;
151     n |= n >>> 8;
152     n |= n >>> 16;
153 dl 1.40 return (n <= 0) ? 1 : // at least 1
154 dl 1.25 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
155 dl 1.1 }
156    
157 dl 1.48 // default Executor setup; nearly the same as CompletableFuture
158 dl 1.46
159     /**
160     * Default executor -- ForkJoinPool.commonPool() unless it cannot
161     * support parallelism.
162     */
163 jsr166 1.52 private static final Executor ASYNC_POOL =
164 dl 1.46 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
165     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
166    
167     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
168 jsr166 1.54 private static final class ThreadPerTaskExecutor implements Executor {
169 dl 1.46 public void execute(Runnable r) { new Thread(r).start(); }
170     }
171    
172 dl 1.1 /**
173     * Clients (BufferedSubscriptions) are maintained in a linked list
174     * (via their "next" fields). This works well for publish loops.
175     * It requires O(n) traversal to check for duplicate subscribers,
176     * but we expect that subscribing is much less common than
177 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
178 dl 1.25 * when BufferedSubscription methods return negative values
179 dl 1.29 * signifying that they have been disabled. To reduce
180     * head-of-line blocking, submit and offer methods first call
181     * BufferedSubscription.offer on each subscriber, and place
182     * saturated ones in retries list (using nextRetry field), and
183     * retry, possibly blocking or dropping.
184 dl 1.1 */
185     BufferedSubscription<T> clients;
186    
187 dl 1.20 /** Run status, updated only within locks */
188     volatile boolean closed;
189 dl 1.36 /** If non-null, the exception in closeExceptionally */
190 dl 1.37 volatile Throwable closedException;
191 dl 1.20
192 dl 1.1 // Parameters for constructing BufferedSubscriptions
193     final Executor executor;
194 dl 1.36 final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
195 dl 1.1 final int maxBufferCapacity;
196    
197 dl 1.20 /**
198 dl 1.1 * Creates a new SubmissionPublisher using the given Executor for
199 jsr166 1.49 * async delivery to subscribers, with the given maximum buffer size
200     * for each subscriber, and, if non-null, the given handler invoked
201     * when any Subscriber throws an exception in method {@link
202     * Flow.Subscriber#onNext(Object) onNext}.
203 dl 1.1 *
204     * @param executor the executor to use for async delivery,
205     * supporting creation of at least one independent thread
206     * @param maxBufferCapacity the maximum capacity for each
207 dl 1.21 * subscriber's buffer (the enforced capacity may be rounded up to
208     * the nearest power of two and/or bounded by the largest value
209     * supported by this implementation; method {@link #getMaxBufferCapacity}
210     * returns the actual value)
211 dl 1.36 * @param handler if non-null, procedure to invoke upon exception
212 jsr166 1.49 * thrown in method {@code onNext}
213 dl 1.1 * @throws NullPointerException if executor is null
214 dl 1.21 * @throws IllegalArgumentException if maxBufferCapacity not
215     * positive
216 dl 1.1 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                           BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
    // Argument checks keep their order: a null executor is reported
    // before a non-positive capacity.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity <= 0) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    // The stored capacity is the rounded value, not the raw argument.
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler;   // may be null: no handler
    this.executor = executor;
}
227    
228 dl 1.1 /**
229 dl 1.36 * Creates a new SubmissionPublisher using the given Executor for
230 jsr166 1.49 * async delivery to subscribers, with the given maximum buffer size
231     * for each subscriber, and no handler for Subscriber exceptions in
232     * method {@link Flow.Subscriber#onNext(Object) onNext}.
233 dl 1.36 *
234     * @param executor the executor to use for async delivery,
235     * supporting creation of at least one independent thread
236     * @param maxBufferCapacity the maximum capacity for each
237     * subscriber's buffer (the enforced capacity may be rounded up to
238     * the nearest power of two and/or bounded by the largest value
239     * supported by this implementation; method {@link #getMaxBufferCapacity}
240     * returns the actual value)
241     * @throws NullPointerException if executor is null
242     * @throws IllegalArgumentException if maxBufferCapacity not
243     * positive
244     */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    // Same as the three-argument form with no onNext exception handler.
    this(executor, maxBufferCapacity, null);
}
248    
249     /**
250 dl 1.25 * Creates a new SubmissionPublisher using the {@link
251 dl 1.46 * ForkJoinPool#commonPool()} for async delivery to subscribers
252 jsr166 1.49 * (unless it does not support a parallelism level of at least two,
253     * in which case, a new Thread is created to run each task), with
254     * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
255     * handler for Subscriber exceptions in method {@link
256     * Flow.Subscriber#onNext(Object) onNext}.
257 dl 1.25 */
public SubmissionPublisher() {
    // Default executor, default Flow buffer size, no onNext exception handler.
    this(ASYNC_POOL, Flow.defaultBufferSize(), null);
}
261    
262     /**
263 jsr166 1.49 * Adds the given Subscriber unless already subscribed. If already
264     * subscribed, the Subscriber's {@link
265     * Flow.Subscriber#onError(Throwable) onError} method is invoked on
266     * the existing subscription with an {@link IllegalStateException}.
267     * Otherwise, upon success, the Subscriber's {@link
268     * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
269     * asynchronously with a new {@link Flow.Subscription}. If {@link
270     * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
271     * subscription is cancelled. Otherwise, if this SubmissionPublisher
272     * was closed exceptionally, then the subscriber's {@link
273     * Flow.Subscriber#onError onError} method is invoked with the
274     * corresponding exception, or if closed without exception, the
275     * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
276     * method is invoked. Subscribers may enable receiving items by
277     * invoking the {@link Flow.Subscription#request(long) request}
278     * method of the new Subscription, and may unsubscribe by invoking
279     * its {@link Flow.Subscription#cancel() cancel} method.
280 dl 1.18 *
281     * @param subscriber the subscriber
282     * @throws NullPointerException if subscriber is null
283     */
284     public void subscribe(Flow.Subscriber<? super T> subscriber) {
285     if (subscriber == null) throw new NullPointerException();
286 dl 1.20 BufferedSubscription<T> subscription =
287 dl 1.36 new BufferedSubscription<T>(subscriber, executor,
288     onNextHandler, maxBufferCapacity);
289 dl 1.19 synchronized (this) {
290 dl 1.29 for (BufferedSubscription<T> b = clients, pred = null;;) {
291     if (b == null) {
292 dl 1.36 Throwable ex;
293 dl 1.29 subscription.onSubscribe();
294 dl 1.37 if ((ex = closedException) != null)
295 dl 1.36 subscription.onError(ex);
296     else if (closed)
297 dl 1.29 subscription.onComplete();
298     else if (pred == null)
299     clients = subscription;
300 dl 1.27 else
301 dl 1.29 pred.next = subscription;
302     break;
303 dl 1.27 }
304 dl 1.29 BufferedSubscription<T> next = b.next;
305     if (b.isDisabled()) { // remove
306     b.next = null; // detach
307 dl 1.19 if (pred == null)
308 dl 1.29 clients = next;
309 dl 1.19 else
310 dl 1.29 pred.next = next;
311     }
312     else if (subscriber.equals(b.subscriber)) {
313     b.onError(new IllegalStateException("Duplicate subscribe"));
314     break;
315 dl 1.19 }
316 dl 1.29 else
317     pred = b;
318     b = next;
319 dl 1.19 }
320     }
321 dl 1.18 }
322    
323     /**
324 dl 1.16 * Publishes the given item to each current subscriber by
325 jsr166 1.49 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
326     * onNext} method, blocking uninterruptibly while resources for any
327 dl 1.30 * subscriber are unavailable. This method returns an estimate of
328 jsr166 1.49 * the maximum lag (number of items submitted but not yet consumed)
329     * among all current subscribers. This value is at least one
330     * (accounting for this submitted item) if there are any
331 dl 1.30 * subscribers, else zero.
332 dl 1.16 *
333     * <p>If the Executor for this publisher throws a
334     * RejectedExecutionException (or any other RuntimeException or
335     * Error) when attempting to asynchronously notify subscribers,
336 dl 1.31 * then this exception is rethrown, in which case not all
337     * subscribers will have been issued this item.
338 dl 1.16 *
339     * @param item the (non-null) item to publish
340 dl 1.21 * @return the estimated maximum lag among subscribers
341 dl 1.16 * @throws IllegalStateException if closed
342     * @throws NullPointerException if item is null
343     * @throws RejectedExecutionException if thrown by Executor
344     */
345 dl 1.21 public int submit(T item) {
346 dl 1.16 if (item == null) throw new NullPointerException();
347 dl 1.21 int lag = 0;
348 dl 1.27 boolean complete;
349 dl 1.16 synchronized (this) {
350 dl 1.27 complete = closed;
351 dl 1.29 BufferedSubscription<T> b = clients;
352 dl 1.27 if (!complete) {
353 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
354 dl 1.27 while (b != null) {
355     BufferedSubscription<T> next = b.next;
356 dl 1.29 int stat = b.offer(item);
357     if (stat < 0) { // disabled
358     b.next = null;
359 dl 1.27 if (pred == null)
360     clients = next;
361 dl 1.24 else
362 dl 1.27 pred.next = next;
363 dl 1.24 }
364 dl 1.27 else {
365 dl 1.29 if (stat > lag)
366     lag = stat;
367     else if (stat == 0) { // place on retry list
368     b.nextRetry = null;
369 dl 1.27 if (rtail == null)
370 dl 1.29 r = b;
371 dl 1.27 else
372     rtail.nextRetry = b;
373     rtail = b;
374     }
375     pred = b;
376     }
377     b = next;
378 dl 1.21 }
379 dl 1.29 while (r != null) {
380     BufferedSubscription<T> nextRetry = r.nextRetry;
381     r.nextRetry = null;
382     int stat = r.submit(item);
383     if (stat > lag)
384     lag = stat;
385     else if (stat < 0 && clients == r)
386     clients = r.next; // postpone internal unsubscribes
387     r = nextRetry;
388     }
389 dl 1.16 }
390     }
391 dl 1.27 if (complete)
392     throw new IllegalStateException("Closed");
393     else
394     return lag;
395 dl 1.16 }
396    
397     /**
398 jsr166 1.49 * Publishes the given item, if possible, to each current subscriber
399     * by asynchronously invoking its {@link
400     * Flow.Subscriber#onNext(Object) onNext} method. The item may be
401     * dropped by one or more subscribers if resource limits are
402     * exceeded, in which case the given handler (if non-null) is
403     * invoked, and if it returns true, retried once. Other calls to
404     * methods in this class by other threads are blocked while the
405     * handler is invoked. Unless recovery is assured, options are
406     * usually limited to logging the error and/or issuing an {@link
407     * Flow.Subscriber#onError(Throwable) onError} signal to the
408     * subscriber.
409 dl 1.1 *
410 dl 1.21 * <p>This method returns a status indicator: If negative, it
411     * represents the (negative) number of drops (failed attempts to
412     * issue the item to a subscriber). Otherwise it is an estimate of
413     * the maximum lag (number of items submitted but not yet
414     * consumed) among all current subscribers. This value is at least
415     * one (accounting for this submitted item) if there are any
416     * subscribers, else zero.
417 jsr166 1.23 *
418 dl 1.1 * <p>If the Executor for this publisher throws a
419     * RejectedExecutionException (or any other RuntimeException or
420     * Error) when attempting to asynchronously notify subscribers, or
421     * the drop handler throws an exception when processing a dropped
422     * item, then this exception is rethrown.
423     *
424     * @param item the (non-null) item to publish
425     * @param onDrop if non-null, the handler invoked upon a drop to a
426     * subscriber, with arguments of the subscriber and item; if it
427     * returns true, an offer is re-attempted (once)
428 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
429     * an estimate of maximum lag
430 dl 1.1 * @throws IllegalStateException if closed
431     * @throws NullPointerException if item is null
432     * @throws RejectedExecutionException if thrown by Executor
433     */
434     public int offer(T item,
435     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
436 dl 1.29 return doOffer(0L, item, onDrop);
437 dl 1.1 }
438    
439     /**
440 jsr166 1.49 * Publishes the given item, if possible, to each current subscriber
441     * by asynchronously invoking its {@link
442     * Flow.Subscriber#onNext(Object) onNext} method, blocking while
443     * resources for any subscription are unavailable, up to the
444     * specified timeout or until the caller thread is interrupted, at
445     * which point the given handler (if non-null) is invoked, and if it
446     * returns true, retried once. (The drop handler may distinguish
447     * timeouts from interrupts by checking whether the current thread
448     * is interrupted.) Other calls to methods in this class by other
449 dl 1.30 * threads are blocked while the handler is invoked. Unless
450     * recovery is assured, options are usually limited to logging the
451 jsr166 1.49 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
452     * onError} signal to the subscriber.
453 dl 1.1 *
454 dl 1.21 * <p>This method returns a status indicator: If negative, it
455     * represents the (negative) number of drops (failed attempts to
456     * issue the item to a subscriber). Otherwise it is an estimate of
457     * the maximum lag (number of items submitted but not yet
458     * consumed) among all current subscribers. This value is at least
459     * one (accounting for this submitted item) if there are any
460     * subscribers, else zero.
461     *
462 dl 1.1 * <p>If the Executor for this publisher throws a
463     * RejectedExecutionException (or any other RuntimeException or
464     * Error) when attempting to asynchronously notify subscribers, or
465     * the drop handler throws an exception when processing a dropped
466     * item, then this exception is rethrown.
467     *
468     * @param item the (non-null) item to publish
469     * @param timeout how long to wait for resources for any subscriber
470     * before giving up, in units of {@code unit}
471     * @param unit a {@code TimeUnit} determining how to interpret the
472     * {@code timeout} parameter
473     * @param onDrop if non-null, the handler invoked upon a drop to a
474     * subscriber, with arguments of the subscriber and item; if it
475     * returns true, an offer is re-attempted (once)
476 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
477     * an estimate of maximum lag
478 dl 1.1 * @throws IllegalStateException if closed
479     * @throws NullPointerException if item is null
480     * @throws RejectedExecutionException if thrown by Executor
481     */
482     public int offer(T item, long timeout, TimeUnit unit,
483     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
484 dl 1.29 return doOffer(unit.toNanos(timeout), item, onDrop);
485     }
486    
487     /** Common implementation for both forms of offer */
488     final int doOffer(long nanos, T item,
489     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
490 dl 1.1 if (item == null) throw new NullPointerException();
491 dl 1.27 int lag = 0, drops = 0;
492     boolean complete;
493 jsr166 1.2 synchronized (this) {
494 dl 1.27 complete = closed;
495 dl 1.29 BufferedSubscription<T> b = clients;
496 dl 1.27 if (!complete) {
497 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
498 dl 1.27 while (b != null) {
499     BufferedSubscription<T> next = b.next;
500 dl 1.29 int stat = b.offer(item);
501     if (stat < 0) {
502     b.next = null;
503 dl 1.27 if (pred == null)
504     clients = next;
505     else
506     pred.next = next;
507     }
508     else {
509 dl 1.29 if (stat > lag)
510     lag = stat;
511     else if (stat == 0) {
512     b.nextRetry = null;
513 dl 1.27 if (rtail == null)
514 dl 1.29 r = b;
515 dl 1.27 else
516     rtail.nextRetry = b;
517     rtail = b;
518     }
519 dl 1.29 else if (stat > lag)
520 dl 1.27 lag = stat;
521     pred = b;
522     }
523     b = next;
524 dl 1.1 }
525 dl 1.29 while (r != null) {
526     BufferedSubscription<T> nextRetry = r.nextRetry;
527     r.nextRetry = null;
528 jsr166 1.64 int stat = (nanos > 0L)
529     ? r.timedOffer(item, nanos)
530     : r.offer(item);
531 dl 1.29 if (stat == 0 && onDrop != null &&
532     onDrop.test(r.subscriber, item))
533     stat = r.offer(item);
534     if (stat == 0)
535     ++drops;
536     else if (stat > lag)
537     lag = stat;
538     else if (stat < 0 && clients == r)
539     clients = r.next;
540     r = nextRetry;
541     }
542 dl 1.1 }
543     }
544 dl 1.27 if (complete)
545     throw new IllegalStateException("Closed");
546     else
547     return (drops > 0) ? -drops : lag;
548 dl 1.1 }
549    
550     /**
551 dl 1.30 * Unless already closed, issues {@link
552 jsr166 1.49 * Flow.Subscriber#onComplete() onComplete} signals to current
553     * subscribers, and disallows subsequent attempts to publish.
554     * Upon return, this method does <em>NOT</em> guarantee that all
555     * subscribers have yet completed.
556 dl 1.1 */
557     public void close() {
558     if (!closed) {
559 dl 1.32 BufferedSubscription<T> b;
560 jsr166 1.2 synchronized (this) {
561 dl 1.1 b = clients;
562     clients = null;
563     closed = true;
564     }
565     while (b != null) {
566 dl 1.32 BufferedSubscription<T> next = b.next;
567     b.next = null;
568 dl 1.19 b.onComplete();
569 dl 1.1 b = next;
570     }
571     }
572     }
573    
574     /**
575 jsr166 1.49 * Unless already closed, issues {@link
576     * Flow.Subscriber#onError(Throwable) onError} signals to current
577     * subscribers with the given error, and disallows subsequent
578     * attempts to publish. Future subscribers also receive the given
579     * error. Upon return, this method does <em>NOT</em> guarantee
580     * that all subscribers have yet completed.
581 dl 1.1 *
582 dl 1.30 * @param error the {@code onError} argument sent to subscribers
583 dl 1.1 * @throws NullPointerException if error is null
584     */
585     public void closeExceptionally(Throwable error) {
586     if (error == null)
587     throw new NullPointerException();
588     if (!closed) {
589 dl 1.32 BufferedSubscription<T> b;
590 jsr166 1.2 synchronized (this) {
591 dl 1.1 b = clients;
592     clients = null;
593     closed = true;
594 dl 1.37 closedException = error;
595 dl 1.1 }
596     while (b != null) {
597 dl 1.32 BufferedSubscription<T> next = b.next;
598     b.next = null;
599 dl 1.19 b.onError(error);
600 dl 1.1 b = next;
601     }
602     }
603     }
604    
605     /**
606     * Returns true if this publisher is not accepting submissions.
607     *
608     * @return true if closed
609     */
610     public boolean isClosed() {
611     return closed;
612     }
613    
614 dl 1.37 /**
615 jsr166 1.49 * Returns the exception associated with {@link
616     * #closeExceptionally(Throwable) closeExceptionally}, or null if
617     * not closed or if closed normally.
618 dl 1.37 *
619     * @return the exception, or null if none
620     */
621     public Throwable getClosedException() {
622     return closedException;
623     }
624    
625 dl 1.1 /**
626 jsr166 1.6 * Returns true if this publisher has any subscribers.
627 dl 1.1 *
628     * @return true if this publisher has any subscribers
629     */
630     public boolean hasSubscribers() {
631     boolean nonEmpty = false;
632     if (!closed) {
633 jsr166 1.2 synchronized (this) {
634 dl 1.31 for (BufferedSubscription<T> b = clients; b != null;) {
635     BufferedSubscription<T> next = b.next;
636 dl 1.19 if (b.isDisabled()) {
637 dl 1.29 b.next = null;
638 dl 1.31 b = clients = next;
639 jsr166 1.2 }
640 dl 1.1 else {
641     nonEmpty = true;
642     break;
643     }
644     }
645     }
646     }
647     return nonEmpty;
648     }
649    
650     /**
651 dl 1.21 * Returns the number of current subscribers.
652 dl 1.1 *
653 dl 1.21 * @return the number of current subscribers
654 dl 1.1 */
655 dl 1.21 public int getNumberOfSubscribers() {
656     int count = 0;
657     if (!closed) {
658     synchronized (this) {
659     BufferedSubscription<T> pred = null, next;
660     for (BufferedSubscription<T> b = clients; b != null; b = next) {
661     next = b.next;
662     if (b.isDisabled()) {
663 dl 1.29 b.next = null;
664 dl 1.21 if (pred == null)
665     clients = next;
666     else
667     pred.next = next;
668     }
669     else {
670     pred = b;
671     ++count;
672     }
673     }
674     }
675     }
676     return count;
677 dl 1.1 }
678    
679 jsr166 1.7 /**
680 dl 1.21 * Returns the Executor used for asynchronous delivery.
681 dl 1.1 *
682 dl 1.21 * @return the Executor used for asynchronous delivery
683 dl 1.1 */
684 dl 1.21 public Executor getExecutor() {
685     return executor;
686 dl 1.1 }
687    
688 jsr166 1.7 /**
689 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
690 dl 1.1 *
691 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
692 dl 1.1 */
693     public int getMaxBufferCapacity() {
694     return maxBufferCapacity;
695     }
696    
697     /**
698 dl 1.29 * Returns a list of current subscribers for monitoring and
699     * tracking purposes, not for invoking {@link Flow.Subscriber}
700     * methods on the subscribers.
701 dl 1.1 *
702     * @return list of current subscribers
703     */
704     public List<Flow.Subscriber<? super T>> getSubscribers() {
705     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
706 jsr166 1.2 synchronized (this) {
707 dl 1.1 BufferedSubscription<T> pred = null, next;
708     for (BufferedSubscription<T> b = clients; b != null; b = next) {
709     next = b.next;
710 dl 1.19 if (b.isDisabled()) {
711 dl 1.29 b.next = null;
712 dl 1.1 if (pred == null)
713     clients = next;
714     else
715     pred.next = next;
716 jsr166 1.2 }
717 dl 1.1 else
718     subs.add(b.subscriber);
719     }
720     }
721     return subs;
722     }
723 jsr166 1.2
724 dl 1.1 /**
725     * Returns true if the given Subscriber is currently subscribed.
726     *
727     * @param subscriber the subscriber
728     * @return true if currently subscribed
729 dl 1.21 * @throws NullPointerException if subscriber is null
730 dl 1.1 */
731     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
732 dl 1.21 if (subscriber == null) throw new NullPointerException();
733 jsr166 1.2 if (!closed) {
734     synchronized (this) {
735 dl 1.1 BufferedSubscription<T> pred = null, next;
736     for (BufferedSubscription<T> b = clients; b != null; b = next) {
737     next = b.next;
738 dl 1.19 if (b.isDisabled()) {
739 dl 1.29 b.next = null;
740 dl 1.1 if (pred == null)
741     clients = next;
742     else
743     pred.next = next;
744 jsr166 1.2 }
745 dl 1.21 else if (subscriber.equals(b.subscriber))
746 dl 1.1 return true;
747 dl 1.21 else
748     pred = b;
749 dl 1.1 }
750     }
751     }
752     return false;
753     }
754    
755     /**
756 dl 1.21 * Returns an estimate of the minimum number of items requested
757 jsr166 1.49 * (via {@link Flow.Subscription#request(long) request}) but not
758     * yet produced, among all current subscribers.
759 dl 1.21 *
760     * @return the estimate, or zero if no subscribers
761     */
762     public long estimateMinimumDemand() {
763     long min = Long.MAX_VALUE;
764     boolean nonEmpty = false;
765     synchronized (this) {
766     BufferedSubscription<T> pred = null, next;
767     for (BufferedSubscription<T> b = clients; b != null; b = next) {
768     int n; long d;
769     next = b.next;
770     if ((n = b.estimateLag()) < 0) {
771 dl 1.29 b.next = null;
772 dl 1.21 if (pred == null)
773     clients = next;
774     else
775     pred.next = next;
776     }
777     else {
778     if ((d = b.demand - n) < min)
779     min = d;
780     nonEmpty = true;
781 dl 1.32 pred = b;
782 dl 1.21 }
783     }
784     }
785 jsr166 1.22 return nonEmpty ? min : 0;
786 dl 1.21 }
787    
788     /**
789     * Returns an estimate of the maximum number of items produced but
790     * not yet consumed among all current subscribers.
791 dl 1.17 *
792     * @return the estimate
793 dl 1.1 */
794 dl 1.21 public int estimateMaximumLag() {
795     int max = 0;
796 dl 1.17 synchronized (this) {
797     BufferedSubscription<T> pred = null, next;
798     for (BufferedSubscription<T> b = clients; b != null; b = next) {
799 dl 1.21 int n;
800 dl 1.17 next = b.next;
801 dl 1.21 if ((n = b.estimateLag()) < 0) {
802 dl 1.29 b.next = null;
803 dl 1.17 if (pred == null)
804     clients = next;
805     else
806     pred.next = next;
807     }
808 dl 1.32 else {
809     if (n > max)
810     max = n;
811     pred = b;
812     }
813 dl 1.17 }
814     }
815 dl 1.21 return max;
816 dl 1.1 }
817    
818 dl 1.25 /**
819 jsr166 1.49 * Processes all published items using the given Consumer function.
820     * Returns a CompletableFuture that is completed normally when this
821     * publisher signals {@link Flow.Subscriber#onComplete()
822     * onComplete}, or completed exceptionally upon any error, or an
823     * exception is thrown by the Consumer, or the returned
824     * CompletableFuture is cancelled, in which case no further items
825     * are processed.
826 dl 1.45 *
827     * @param consumer the function applied to each onNext item
828     * @return a CompletableFuture that is completed normally
829     * when the publisher signals onComplete, and exceptionally
830 jsr166 1.47 * upon any error or cancellation
831 dl 1.45 * @throws NullPointerException if consumer is null
832     */
833     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
834     if (consumer == null)
835     throw new NullPointerException();
836     CompletableFuture<Void> status = new CompletableFuture<>();
837     subscribe(new ConsumerSubscriber<T>(status, consumer));
838     return status;
839     }
840    
841     /** Subscriber for method consume */
842 jsr166 1.54 private static final class ConsumerSubscriber<T>
843 dl 1.63 implements Flow.Subscriber<T> {
844 dl 1.45 final CompletableFuture<Void> status;
845     final Consumer<? super T> consumer;
846     Flow.Subscription subscription;
847     ConsumerSubscriber(CompletableFuture<Void> status,
848     Consumer<? super T> consumer) {
849     this.status = status; this.consumer = consumer;
850     }
851     public final void onSubscribe(Flow.Subscription subscription) {
852     this.subscription = subscription;
853 dl 1.55 status.whenComplete((v, e) -> subscription.cancel());
854     if (!status.isDone())
855 dl 1.45 subscription.request(Long.MAX_VALUE);
856     }
857     public final void onError(Throwable ex) {
858     status.completeExceptionally(ex);
859     }
860     public final void onComplete() {
861     status.complete(null);
862     }
863     public final void onNext(T item) {
864 dl 1.55 try {
865     consumer.accept(item);
866     } catch (Throwable ex) {
867 dl 1.45 subscription.cancel();
868 dl 1.55 status.completeExceptionally(ex);
869 dl 1.45 }
870     }
871     }
872    
873     /**
874 dl 1.25 * A task for consuming buffer items and signals, created and
875     * executed whenever they become available. A task consumes as
876     * many items/signals as possible before terminating, at which
877     * point another task is created when needed. The dual Runnable
878     * and ForkJoinTask declaration saves overhead when executed by
879     * ForkJoinPools, without impacting other kinds of Executors.
880     */
881     @SuppressWarnings("serial")
882     static final class ConsumerTask<T> extends ForkJoinTask<Void>
883 dl 1.59 implements Runnable, CompletableFuture.AsynchronousCompletionTask {
884 dl 1.25 final BufferedSubscription<T> consumer;
885     ConsumerTask(BufferedSubscription<T> consumer) {
886     this.consumer = consumer;
887     }
888     public final Void getRawResult() { return null; }
889     public final void setRawResult(Void v) {}
890     public final boolean exec() { consumer.consume(); return false; }
891     public final void run() { consumer.consume(); }
892     }
893    
894 dl 1.1 /**
895 dl 1.15 * A bounded (ring) buffer with integrated control to start a
896     * consumer task whenever items are available. The buffer
897 dl 1.34 * algorithm is similar to one used inside ForkJoinPool (see its
898     * internal documentation for details) specialized for the case of
899     * at most one concurrent producer and consumer, and power of two
900     * buffer sizes. This allows methods to operate without locks even
901     * while supporting resizing, blocking, task-triggering, and
902     * garbage-free buffers (nulling out elements when consumed),
903     * although supporting these does impose a bit of overhead
904     * compared to plain fixed-size ring buffers.
905 dl 1.1 *
906     * The publisher guarantees a single producer via its lock. We
907     * ensure in this class that there is at most one consumer. The
908     * request and cancel methods must be fully thread-safe but are
909     * coded to exploit the most common case in which they are only
910     * called by consumers (usually within onNext).
911     *
912 dl 1.15 * Execution control is managed using the ACTIVE ctl bit. We
913     * ensure that a task is active when consumable items (and
914 dl 1.19 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
915 dl 1.24 * there is demand (unfilled requests). This is complicated on
916 dl 1.19 * the creation side by the possibility of exceptions when trying
917     * to execute tasks. These eventually force DISABLED state, but
918 dl 1.1 * sometimes not directly. On the task side, termination (clearing
919 dl 1.27 * ACTIVE) that would otherwise race with producers or request()
920 dl 1.29 * calls uses the CONSUME keep-alive bit to force a recheck.
921 dl 1.1 *
922     * The ctl field also manages run state. When DISABLED, no further
923 dl 1.20 * updates are possible. Disabling may be preceded by setting
924 dl 1.1 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
925     * case the associated Subscriber methods are invoked, possibly
926     * synchronously if there is no active consumer task (including
927 dl 1.20 * cases where execute() failed). The cancel() method is supported
928     * by treating as ERROR but suppressing onError signal.
929 dl 1.1 *
930     * Support for blocking also exploits the fact that there is only
931     * one possible waiter. ManagedBlocker-compatible control fields
932     * are placed in this class itself rather than in wait-nodes.
933     * Blocking control relies on the "waiter" field. Producers set
934     * the field before trying to block, but must then recheck (via
935     * offer) before parking. Signalling then just unparks and clears
936 dl 1.59 * waiter field. If the producer and/or consumer are using a
937     * ForkJoinPool, the producer attempts to help run consumer tasks
938     * via ForkJoinPool.helpAsyncBlocker before blocking.
939 dl 1.1 *
940     * This class uses @Contended and heuristic field declaration
941 dl 1.31 * ordering to reduce false-sharing-based memory contention among
942     * instances of BufferedSubscription, but it does not currently
943     * attempt to avoid memory contention among buffers. This field
944     * and element packing can hurt performance especially when each
945     * publisher has only one client operating at a high rate.
946 dl 1.1 * Addressing this may require allocating substantially more space
947     * than users expect.
948     */
949 dl 1.15 @SuppressWarnings("serial")
950 jsr166 1.57 @jdk.internal.vm.annotation.Contended
951 jsr166 1.54 private static final class BufferedSubscription<T>
952 dl 1.25 implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
953 dl 1.1 // Order-sensitive field declarations
954 dl 1.27 long timeout; // > 0 if timed wait
955     volatile long demand; // # unfilled requests
956     int maxCapacity; // reduced on OOME
957     int putStat; // offer result for ManagedBlocker
958     volatile int ctl; // atomic run state flags
959     volatile int head; // next position to take
960 dl 1.34 int tail; // next position to put
961 dl 1.27 Object[] array; // buffer: null if disabled
962 dl 1.1 Flow.Subscriber<? super T> subscriber; // null if disabled
963 dl 1.27 Executor executor; // null if disabled
964 dl 1.36 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
965 dl 1.27 volatile Throwable pendingError; // holds until onError issued
966     volatile Thread waiter; // blocked producer thread
967     T putItem; // for offer within ManagedBlocker
968     BufferedSubscription<T> next; // used only by publisher
969     BufferedSubscription<T> nextRetry; // used only by publisher
970 dl 1.1
971     // ctl values
972 dl 1.20 static final int ACTIVE = 0x01; // consumer task active
973 dl 1.29 static final int CONSUME = 0x02; // keep-alive for consumer task
974 dl 1.27 static final int DISABLED = 0x04; // final state
975     static final int ERROR = 0x08; // signal onError then disable
976     static final int SUBSCRIBE = 0x10; // signal onSubscribe
977     static final int COMPLETE = 0x20; // signal onComplete when done
978 dl 1.1
979 dl 1.20 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
980 dl 1.16
981 jsr166 1.22 /**
982 dl 1.40 * Initial buffer capacity used when maxBufferCapacity is
983     * greater. Must be a power of two.
984 dl 1.21 */
985 dl 1.40 static final int DEFAULT_INITIAL_CAP = 32;
986 dl 1.21
987 dl 1.1 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
988 dl 1.36 Executor executor,
989     BiConsumer<? super Flow.Subscriber<? super T>,
990     ? super Throwable> onNextHandler,
991     int maxBufferCapacity) {
992 dl 1.1 this.subscriber = subscriber;
993     this.executor = executor;
994 dl 1.36 this.onNextHandler = onNextHandler;
995 dl 1.1 this.maxCapacity = maxBufferCapacity;
996 dl 1.40 this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
997 dl 1.55 (maxBufferCapacity < 2 ? // at least 2 slots
998     2 : maxBufferCapacity) :
999     DEFAULT_INITIAL_CAP];
1000 dl 1.1 }
1001    
1002 dl 1.19 final boolean isDisabled() {
1003 dl 1.20 return ctl == DISABLED;
1004     }
1005    
1006 dl 1.21 /**
1007     * Returns estimated number of buffered items, or -1 if
1008 jsr166 1.42 * disabled.
1009 dl 1.21 */
1010     final int estimateLag() {
1011 dl 1.20 int n;
1012 jsr166 1.22 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
1013 dl 1.19 }
1014    
1015 dl 1.1 /**
1016 dl 1.16 * Tries to add item and start consumer task if necessary.
     *
     * @param item the item to store at the current tail position
1017 dl 1.21 * @return -1 if disabled, 0 if dropped, else estimated lag
1018 dl 1.1 */
1019     final int offer(T item) {
1020 dl 1.34 int h = head, t = tail, cap, size, stat;
1021 dl 1.1 Object[] a = array;
    // Fast path: a buffer exists and the occupancy after this add
    // (t + 1 - h) still fits within its current length.
1022 dl 1.40 if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
1023 dl 1.34 a[(cap - 1) & t] = item; // relaxed writes OK
1024     tail = t + 1;
1025 dl 1.27 stat = size;
1026 dl 1.1 }
    // Otherwise growAndAdd rechecks, expands the buffer, or reports failure.
1027 dl 1.34 else
1028     stat = growAndAdd(a, item);
    // Only when something was added (stat > 0) and ctl does not already
    // carry both ACTIVE and CONSUME is startOnOffer consulted.
1029 dl 1.29 return (stat > 0 &&
1030     (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
1031 dl 1.27 startOnOffer(stat) : stat;
1032     }
1033    
1034     /**
1035 jsr166 1.28 * Tries to create or expand buffer, then adds item if possible.
     *
     * @param a the buffer array as read by the caller (may be null)
     * @param item the item to add
     * @return -1 if ctl has ERROR or DISABLED set; 0 if the item could
     * not be added (buffer at maxCapacity, capacity overflow, or
     * allocation failure); otherwise a positive value: the occupancy
     * when the recheck found room, 1 for a freshly created buffer,
     * or the old capacity + 1 after a successful expansion
1036 dl 1.27 */
1037     private int growAndAdd(Object[] a, T item) {
1038 dl 1.34 boolean alloc;
1039 dl 1.27 int cap, stat;
    // Case 1: erroring or disabled -- refuse without allocating.
1040     if ((ctl & (ERROR | DISABLED)) != 0) {
1041     cap = 0;
1042     stat = -1;
1043 dl 1.34 alloc = false;
1044 dl 1.27 }
    // Case 2: no usable array -- allocate one from scratch.
1045 dl 1.34 else if (a == null || (cap = a.length) <= 0) {
1046 dl 1.27 cap = 0;
1047     stat = 1;
1048 dl 1.34 alloc = true;
1049     }
    // Case 3: array looked full to the caller -- re-read head/tail
    // after a full fence before deciding to grow.
1050     else {
1051 dl 1.61 VarHandle.fullFence(); // recheck
1052 dl 1.34 int h = head, t = tail, size = t + 1 - h;
1053 dl 1.40 if (cap >= size) {
1054 dl 1.34 a[(cap - 1) & t] = item;
1055     tail = t + 1;
1056     stat = size;
1057     alloc = false;
1058     }
1059     else if (cap >= maxCapacity) {
1060     stat = 0; // cannot grow
1061     alloc = false;
1062     }
1063     else {
1064     stat = cap + 1;
1065     alloc = true;
1066     }
1067 dl 1.27 }
1068 dl 1.34 if (alloc) {
    // Double the capacity (or start at 1 when there was no array);
    // newCap <= cap catches overflow of the shift.
1069 dl 1.40 int newCap = (cap > 0) ? cap << 1 : 1;
1070 dl 1.27 if (newCap <= cap)
1071     stat = 0;
1072     else {
1073     Object[] newArray = null;
1074 dl 1.16 try {
1075 dl 1.27 newArray = new Object[newCap];
1076     } catch (Throwable ex) { // try to cope with OOME
1077     }
1078     if (newArray == null) {
    // Allocation failed: cap future growth at the current size.
1079     if (cap > 0)
1080     maxCapacity = cap; // avoid continuous failure
1081     stat = 0;
1082     }
1083     else {
1084     array = newArray;
1085     int t = tail;
1086     int newMask = newCap - 1;
1087     if (a != null && cap > 0) {
1088     int mask = cap - 1;
    // Move each still-present element; the CAS to null claims the
    // slot so an element is transferred only if this thread, not
    // the consumer, removed it from the old array.
1089     for (int j = head; j != t; ++j) {
1090 dl 1.61 int k = j & mask;
1091 dl 1.62 Object x = QA.getAcquire(a, k);
1092 dl 1.34 if (x != null && // races with consumer
1093 dl 1.61 QA.compareAndSet(a, k, x, null))
1094 dl 1.27 newArray[j & newMask] = x;
1095     }
1096     }
    // Finally place the new item at the tail of the new array.
1097     newArray[t & newMask] = item;
1098     tail = t + 1;
1099 dl 1.16 }
1100     }
1101     }
1102 dl 1.21 return stat;
1103 dl 1.1 }
1104    
1105     /**
1106 dl 1.27 * Offers the item and, if that initial offer returns 0,
1107 dl 1.29 * spins/helps/blocks while offer keeps returning 0.
     *
     * @param item the item to add
     * @return the last offer status observed (see offer)
1108 dl 1.20 */
1109     final int submit(T item) {
1110 dl 1.59 int stat;
1111     if ((stat = offer(item)) == 0) {
    // Stash the item and an untimed wait (timeout 0) in the fields
    // that isReleasable()/block() read.
1112 dl 1.25 putItem = item;
1113     timeout = 0L;
1114 dl 1.59 putStat = 0;
    // Try helping before blocking; putStat is re-read afterwards
    // because isReleasable() stores offer results into it.
1115     ForkJoinPool.helpAsyncBlocker(executor, this);
1116     if ((stat = putStat) == 0) {
1117     try {
1118     ForkJoinPool.managedBlock(this);
1119     } catch (InterruptedException ie) {
1120     timeout = INTERRUPTED;
1121     }
1122     stat = putStat;
1123 dl 1.25 }
    // A negative timeout is the INTERRUPTED sentinel (set above or in
    // block()); restore the caller's interrupt status.
1124     if (timeout < 0L)
1125     Thread.currentThread().interrupt();
1126     }
1127 dl 1.20 return stat;
1128     }
1129    
1130     /**
1131 jsr166 1.53 * Timeout version; similar to submit.
     *
     * @param item the item to add
     * @param nanos maximum wait in nanoseconds; when not positive the
     * result of the initial offer is returned without waiting
     * @return the last offer status observed (see offer)
1132 dl 1.20 */
1133 dl 1.27 final int timedOffer(T item, long nanos) {
1134 dl 1.59 int stat;
    // Note: timeout is assigned only when the initial offer returned 0.
1135     if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
1136     putItem = item;
1137     putStat = 0;
    // Try helping before blocking; isReleasable() records offer
    // results in putStat.
1138     ForkJoinPool.helpAsyncBlocker(executor, this);
1139     if ((stat = putStat) == 0) {
1140     try {
1141     ForkJoinPool.managedBlock(this);
1142     } catch (InterruptedException ie) {
1143     timeout = INTERRUPTED;
1144 dl 1.29 }
1145 dl 1.59 stat = putStat;
1146 dl 1.25 }
    // Negative timeout == INTERRUPTED sentinel: re-assert the interrupt.
1147     if (timeout < 0L)
1148     Thread.currentThread().interrupt();
1149 dl 1.20 }
1150     return stat;
1151     }
1152    
1153     /**
1154 dl 1.27 * Tries to start consumer task after offer.
     * If executing the task throws a RuntimeException or Error, the
     * ACTIVE bit is cleared again and the throwable is rethrown.
     *
     * @param stat the status computed by offer
1155     * @return -1 if now disabled, else argument
1156     */
1157     private int startOnOffer(int stat) {
1158     for (;;) {
1159     Executor e; int c;
1160     if ((c = ctl) == DISABLED || (e = executor) == null) {
1161     stat = -1;
1162     break;
1163     }
1164     else if ((c & ACTIVE) != 0) { // ensure keep-alive
1165 dl 1.29 if ((c & CONSUME) != 0 ||
1166 dl 1.61 CTL.compareAndSet(this, c, c | CONSUME))
1167 dl 1.27 break;
1168     }
    // Not active: nothing to start when there is no demand or the
    // buffer looks empty.
1169     else if (demand == 0L || tail == head)
1170     break;
    // Claim ACTIVE|CONSUME first, then hand a task to the executor.
1171 dl 1.61 else if (CTL.compareAndSet(this, c, c | (ACTIVE | CONSUME))) {
1172 dl 1.27 try {
1173     e.execute(new ConsumerTask<T>(this));
1174     break;
1175     } catch (RuntimeException | Error ex) { // back out
    // Retry clearing ACTIVE until it succeeds, or until ctl shows
    // DISABLED or ACTIVE already cleared.
1176 dl 1.30 do {} while (((c = ctl) & DISABLED) == 0 &&
1177 dl 1.27 (c & ACTIVE) != 0 &&
1178 jsr166 1.65 !CTL.weakCompareAndSet
1179 dl 1.63 (this, c, c & ~ACTIVE));
1180 dl 1.27 throw ex;
1181     }
1182     }
1183     }
1184     return stat;
1185     }
1186    
/**
 * Clears the waiter field and unparks the given thread.
 *
 * @param w the thread to unpark; callers pass the value they read
 * from the waiter field
 */
1187 dl 1.34 private void signalWaiter(Thread w) {
1188     waiter = null;
1189     LockSupport.unpark(w); // release producer
1190     }
1191    
1192 dl 1.27 /**
1193 dl 1.16 * Nulls out most fields, mainly to avoid garbage retention
1194 dl 1.20 * until publisher unsubscribes, but also to help cleanly stop
1195     * upon error by nulling required components.
     * Specifically clears executor, subscriber and pendingError, then
     * signals whatever thread was recorded in waiter.
1196 dl 1.1 */
1197 dl 1.27 private void detach() {
    // Read waiter before clearing the other fields; signalWaiter
    // nulls the waiter field itself.
1198     Thread w = waiter;
1199     executor = null;
1200     subscriber = null;
1201 dl 1.16 pendingError = null;
1202 dl 1.34 signalWaiter(w);
1203 dl 1.1 }
1204    
1205     /**
1206 dl 1.19 * Issues error signal, asynchronously if a task is running,
1207 jsr166 1.26 * else synchronously.
     *
     * @param ex the error to report; when null on the synchronous
     * path, the subscriber's onError is not invoked
1208 dl 1.19 */
1209     final void onError(Throwable ex) {
1210     for (int c;;) {
    // Already erroring or disabled: nothing more to do.
1211 dl 1.29 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1212 dl 1.19 break;
    // A consumer task is active: publish the error and set ERROR so
    // that the task delivers it (see checkControl).
1213     else if ((c & ACTIVE) != 0) {
1214     pendingError = ex;
1215 dl 1.61 if (CTL.compareAndSet(this, c, c | ERROR))
1216 dl 1.19 break; // cause consumer task to exit
1217     }
    // No active task: disable directly and signal from this thread.
1218 dl 1.61 else if (CTL.compareAndSet(this, c, DISABLED)) {
1219 dl 1.19 Flow.Subscriber<? super T> s = subscriber;
1220     if (s != null && ex != null) {
1221     try {
1222     s.onError(ex);
1223     } catch (Throwable ignore) {
    // exceptions thrown by the subscriber's onError are dropped
1224     }
1225     }
1226     detach();
1227     break;
1228     }
1229     }
1230     }
1231    
1232     /**
1233 dl 1.1 * Tries to start consumer task upon a signal or request;
1234 jsr166 1.12 * disables on failure.
     * Callers in this class set ACTIVE in ctl before invoking this.
1235 dl 1.1 */
1236 dl 1.27 private void startOrDisable() {
1237     Executor e;
1238     if ((e = executor) != null) { // skip if already disabled
1239 dl 1.1 try {
1240 dl 1.25 e.execute(new ConsumerTask<T>(this));
1241 dl 1.27 } catch (Throwable ex) { // back out and force signal
    // Clear ACTIVE (unless already disabled or inactive) and then
    // route the failure through onError, which, with ACTIVE now
    // cleared, takes its synchronous branch.
1242 dl 1.1 for (int c;;) {
1243 dl 1.20 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1244 dl 1.1 break;
1245 dl 1.61 if (CTL.compareAndSet(this, c, c & ~ACTIVE)) {
1246 dl 1.19 onError(ex);
1247 dl 1.1 break;
1248     }
1249     }
1250     }
1251     }
1252     }
1253    
1254 dl 1.19 final void onComplete() {
1255 dl 1.1 for (int c;;) {
1256 dl 1.20 if ((c = ctl) == DISABLED)
1257 dl 1.1 break;
1258 dl 1.61 if (CTL.compareAndSet(this, c,
1259     c | (ACTIVE | CONSUME | COMPLETE))) {
1260 dl 1.16 if ((c & ACTIVE) == 0)
1261     startOrDisable();
1262 dl 1.1 break;
1263     }
1264     }
1265     }
1266    
1267 dl 1.19 final void onSubscribe() {
1268 dl 1.1 for (int c;;) {
1269 dl 1.20 if ((c = ctl) == DISABLED)
1270 dl 1.1 break;
1271 dl 1.61 if (CTL.compareAndSet(this, c,
1272 dl 1.63 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1273 dl 1.19 if ((c & ACTIVE) == 0)
1274     startOrDisable();
1275 dl 1.1 break;
1276     }
1277     }
1278     }
1279    
1280 dl 1.16 /**
1281     * Causes consumer task to exit if active (without reporting
1282     * onError unless there is already a pending error), and
1283     * disables.
1284     */
1285     public void cancel() {
1286 dl 1.1 for (int c;;) {
1287 dl 1.20 if ((c = ctl) == DISABLED)
1288 dl 1.1 break;
1289 dl 1.16 else if ((c & ACTIVE) != 0) {
1290 dl 1.61 if (CTL.compareAndSet(this, c,
1291 dl 1.63 c | (CONSUME | ERROR)))
1292 dl 1.16 break;
1293     }
1294 dl 1.61 else if (CTL.compareAndSet(this, c, DISABLED)) {
1295 dl 1.16 detach();
1296 dl 1.1 break;
1297     }
1298     }
1299     }
1300    
1301 dl 1.16 /**
1302     * Adds to demand and possibly starts task.
1303     */
1304     public void request(long n) {
1305     if (n > 0L) {
1306     for (;;) {
1307     long prev = demand, d;
1308     if ((d = prev + n) < prev) // saturate
1309     d = Long.MAX_VALUE;
1310 dl 1.61 if (DEMAND.compareAndSet(this, prev, d)) {
1311 dl 1.31 for (int c, h;;) {
1312 dl 1.27 if ((c = ctl) == DISABLED)
1313 dl 1.16 break;
1314 dl 1.27 else if ((c & ACTIVE) != 0) {
1315 dl 1.29 if ((c & CONSUME) != 0 ||
1316 dl 1.61 CTL.compareAndSet(this, c, c | CONSUME))
1317 dl 1.27 break;
1318     }
1319     else if ((h = head) != tail) {
1320 dl 1.61 if (CTL.compareAndSet(this, c,
1321     c | (ACTIVE|CONSUME))) {
1322 dl 1.16 startOrDisable();
1323     break;
1324     }
1325     }
1326     else if (head == h && tail == h)
1327 dl 1.27 break; // else stale
1328 dl 1.31 if (demand == 0L)
1329     break;
1330 dl 1.16 }
1331     break;
1332     }
1333     }
1334     }
1335     else if (n < 0L)
1336 dl 1.19 onError(new IllegalArgumentException(
1337     "negative subscription request"));
1338 dl 1.16 }
1339    
1340 dl 1.20 public final boolean isReleasable() { // for ManagedBlocker
1341 dl 1.1 T item = putItem;
1342 dl 1.34 if (item != null) {
1343     if ((putStat = offer(item)) == 0)
1344     return false;
1345     putItem = null;
1346 dl 1.1 }
1347     return true;
1348     }
1349    
/**
 * ManagedBlocker hook: repeatedly offers the pending item (if any),
 * parking between attempts, until an offer returns non-zero, the
 * timed wait runs out, or a timed wait is interrupted.
 * The final offer result is left in putStat.
 *
 * @return always true
 */
1350 dl 1.20 public final boolean block() { // for ManagedBlocker
1351 dl 1.1 T item = putItem;
1352     if (item != null) {
1353     putItem = null;
    // timeout > 0 means a timed wait; compute its deadline once.
1354     long nanos = timeout;
1355     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1356     while ((putStat = offer(item)) == 0) {
    // Interrupt: record the sentinel; only a timed wait gives up,
    // an untimed one keeps retrying.
1357     if (Thread.interrupted()) {
1358 dl 1.16 timeout = INTERRUPTED;
1359 dl 1.1 if (nanos > 0L)
1360     break;
1361     }
1362     else if (nanos > 0L &&
1363     (nanos = deadline - System.nanoTime()) <= 0L)
1364     break;
    // First publish this thread as the waiter and loop to re-offer;
    // park only on a later pass once waiter is already set.
1365     else if (waiter == null)
1366     waiter = Thread.currentThread();
1367     else {
1368     if (nanos > 0L)
1369     LockSupport.parkNanos(this, nanos);
1370     else
1371     LockSupport.park(this);
1372     waiter = null;
1373     }
1374     }
1375     }
1376     waiter = null;
1377     return true;
1378     }
1379    
1380 dl 1.29 /**
1381     * Consumer loop, called from ConsumerTask, or indirectly
1382 dl 1.39 * when helping during submit.
     * Each iteration handles, in priority order: control flags,
     * an apparently empty buffer, zero demand, and finally delivery
     * of one item to the subscriber's onNext.
1383 dl 1.29 */
1384 dl 1.25 final void consume() {
1385 dl 1.1 Flow.Subscriber<? super T> s;
1386 dl 1.34 int h = head;
1387 dl 1.27 if ((s = subscriber) != null) { // else disabled
1388 dl 1.34 for (;;) {
1389 dl 1.27 long d = demand;
1390 dl 1.61 int c; Object[] a; int n, i; Object x; Thread w;
    // 1. ERROR / SUBSCRIBE / DISABLED pending: checkControl decides
    //    whether the loop continues.
1391 dl 1.27 if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1392 dl 1.34 if (!checkControl(s, c))
1393 dl 1.20 break;
1394 dl 1.19 }
    // 2. No array, head caught up with tail, or the slot at head is
    //    still null: treat as (apparently) empty.
1395 dl 1.34 else if ((a = array) == null || h == tail ||
1396     (n = a.length) == 0 ||
1397 dl 1.62 (x = QA.getAcquire(a, i = (n - 1) & h)) == null) {
1398 dl 1.34 if (!checkEmpty(s, c))
1399 dl 1.27 break;
1400     }
    // 3. An item is available but there is no demand for it.
1401 dl 1.34 else if (d == 0L) {
1402     if (!checkDemand(c))
1403     break;
1404 dl 1.1 }
    // 4. Ensure CONSUME is set, then claim the slot by CASing it to
    //    null; on success advance head, decrement demand, wake any
    //    waiting producer, and deliver the item.
1405 dl 1.34 else if (((c & CONSUME) != 0 ||
1406 dl 1.61 CTL.compareAndSet(this, c, c | CONSUME)) &&
1407     QA.compareAndSet(a, i, x, null)) {
1408     HEAD.setRelease(this, ++h);
1409     DEMAND.getAndAdd(this, -1L);
1410 dl 1.34 if ((w = waiter) != null)
1411     signalWaiter(w);
1412 dl 1.36 try {
1413     @SuppressWarnings("unchecked") T y = (T) x;
1414     s.onNext(y);
1415     } catch (Throwable ex) {
1416     handleOnNext(s, ex);
1417     }
1418 dl 1.34 }
1419     }
1420     }
1421     }
1422    
1423     /**
1424 jsr166 1.42 * Responds to control events in consume().
     *
     * @param s the subscriber to signal (null-checked before use)
     * @param c the ctl value that consume() observed
     * @return false if the consumer loop should exit (taken when
     * neither ERROR nor SUBSCRIBE is set in c), else true
1425 dl 1.34 */
1426     private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
1427     boolean stat = true;
    // ERROR: move to DISABLED and, if an error is pending, deliver it.
    // The loop continues; the next pass sees DISABLED and detaches.
1428     if ((c & ERROR) != 0) {
1429     Throwable ex = pendingError;
1430     ctl = DISABLED; // no need for CAS
1431     if (ex != null) { // null if errorless cancel
1432     try {
1433     if (s != null)
1434     s.onError(ex);
1435     } catch (Throwable ignore) {
1436     }
1437     }
1438     }
    // SUBSCRIBE: clear the flag, then invoke onSubscribe; a throwing
    // onSubscribe is reported through onError.
1439     else if ((c & SUBSCRIBE) != 0) {
1440 dl 1.61 if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) {
1441 dl 1.34 try {
1442     if (s != null)
1443     s.onSubscribe(this);
1444     } catch (Throwable ex) {
1445     onError(ex);
1446     }
1447     }
1448     }
    // Otherwise (only DISABLED remained): release references and stop.
1449     else {
1450     detach();
1451     stat = false;
1452     }
1453     return stat;
1454     }
1455    
1456     /**
1457 jsr166 1.42 * Responds to apparent emptiness in consume().
1458 dl 1.34 */
1459     private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1460     boolean stat = true;
1461     if (head == tail) {
1462     if ((c & CONSUME) != 0)
1463 dl 1.61 CTL.compareAndSet(this, c, c & ~CONSUME);
1464 dl 1.34 else if ((c & COMPLETE) != 0) {
1465 dl 1.61 if (CTL.compareAndSet(this, c, DISABLED)) {
1466 dl 1.1 try {
1467 dl 1.34 if (s != null)
1468     s.onComplete();
1469     } catch (Throwable ignore) {
1470 dl 1.1 }
1471     }
1472     }
1473 dl 1.61 else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
1474 dl 1.34 stat = false;
1475     }
1476     return stat;
1477     }
1478    
1479     /**
1480 jsr166 1.42 * Responds to apparent zero demand in consume().
1481 dl 1.34 */
1482     private boolean checkDemand(int c) {
1483     boolean stat = true;
1484     if (demand == 0L) {
1485     if ((c & CONSUME) != 0)
1486 dl 1.61 CTL.compareAndSet(this, c, c & ~CONSUME);
1487     else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
1488 dl 1.34 stat = false;
1489     }
1490     return stat;
1491     }
1492    
1493     /**
1494 jsr166 1.42 * Processes exception in Subscriber.onNext.
1495 dl 1.34 */
1496 dl 1.36 private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1497     BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1498     if ((h = onNextHandler) != null) {
1499     try {
1500     h.accept(s, ex);
1501     } catch (Throwable ignore) {
1502     }
1503 dl 1.1 }
1504 dl 1.36 onError(ex);
1505 dl 1.1 }
1506    
1507 dl 1.61 // VarHandle mechanics
    // Handles for atomic/ordered access to the ctl, tail, head and
    // demand fields, plus QA for elements of Object[] buffers.
    // NOTE(review): TAIL is initialized but not referenced within
    // this class body -- confirm whether it is used elsewhere.
1508     private static final VarHandle CTL;
1509     private static final VarHandle TAIL;
1510     private static final VarHandle HEAD;
1511     private static final VarHandle DEMAND;
1512     private static final VarHandle QA;
1513 dl 1.1
1514     static {
1515     try {
1516 dl 1.61 MethodHandles.Lookup l = MethodHandles.lookup();
1517     CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
1518     int.class);
1519     TAIL = l.findVarHandle(BufferedSubscription.class, "tail",
1520     int.class);
1521     HEAD = l.findVarHandle(BufferedSubscription.class, "head",
1522     int.class);
1523     DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
1524     long.class);
1525     QA = MethodHandles.arrayElementVarHandle(Object[].class);
1526 jsr166 1.9 } catch (ReflectiveOperationException e) {
    // A failed lookup is unrecoverable; surface it as an Error
    // during class initialization.
1527 dl 1.1 throw new Error(e);
1528     }
1529 jsr166 1.33
1530     // Reduce the risk of rare disastrous classloading in first call to
1531     // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1532     Class<?> ensureLoaded = LockSupport.class;
1533 dl 1.1 }
1534     }
1535     }