ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.37
Committed: Mon Aug 3 15:06:44 2015 UTC (8 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.36: +14 -3 lines
Log Message:
Add exception accessor

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 jsr166 1.10 import java.util.ArrayList;
10     import java.util.List;
11 dl 1.1 import java.util.concurrent.locks.LockSupport;
12 dl 1.36 import java.util.function.BiConsumer;
13 dl 1.1 import java.util.function.BiPredicate;
14     import java.util.function.Function;
15 dl 1.15 import java.util.function.Supplier;
16 dl 1.1
17     /**
18 dl 1.21 * A {@link Flow.Publisher} that asynchronously issues submitted
19     * (non-null) items to current subscribers until it is closed. Each
20     * current subscriber receives newly submitted items in the same order
21     * unless drops or exceptions are encountered. Using a
22     * SubmissionPublisher allows item generators to act as Publishers
23     * relying on drop handling and/or blocking for flow control.
24 dl 1.1 *
25 dl 1.16 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
26 dl 1.1 * constructor for delivery to subscribers. The best choice of
27     * Executor depends on expected usage. If the generator(s) of
28     * submitted items run in separate threads, and the number of
29     * subscribers can be estimated, consider using a {@link
30 dl 1.25 * Executors#newFixedThreadPool}. Otherwise consider using
31     * the default {@link ForkJoinPool#commonPool}.
32 dl 1.1 *
33     * <p>Buffering allows producers and consumers to transiently operate
34     * at different rates. Each subscriber uses an independent buffer.
35 dl 1.21 * Buffers are created upon first use and expanded as needed up to the
36 dl 1.27 * given maximum. (The enforced capacity may be rounded up to the
37 dl 1.21 * nearest power of two and/or bounded by the largest value supported
38     * by this implementation.) Invocations of {@link
39 dl 1.16 * Flow.Subscription#request} do not directly result in buffer
40 dl 1.24 * expansion, but risk saturation if unfilled requests exceed the
41 dl 1.25 * maximum capacity. The default value of {@link
42     * Flow#defaultBufferSize} may provide a useful starting point for
43     * choosing a capacity based on expected rates, resources, and usages.
44 dl 1.18 *
45 dl 1.1 * <p>Publication methods support different policies about what to do
46     * when buffers are saturated. Method {@link #submit} blocks until
47 jsr166 1.26 * resources are available. This is simplest, but least responsive.
48     * The {@code offer} methods may drop items (either immediately or
49     * with bounded timeout), but provide an opportunity to interpose a
50     * handler and then retry.
51 dl 1.1 *
52     * <p>If any Subscriber method throws an exception, its subscription
53 dl 1.36 * is cancelled. If a handler is supplied as a constructor argument,
54     * it is invoked before cancellation upon an exception in method
55     * {@code onNext}, but exceptions in methods {@code onSubscribe},
56     * {@code onError} and {@code onComplete} are not recorded or handled
57     * before cancellation. If the supplied Executor throws {@link
58 dl 1.27 * RejectedExecutionException} (or any other RuntimeException or
59     * Error) when attempting to execute a task, or a drop handler throws
60     * an exception when processing a dropped item, then the exception is
61 dl 1.31 * rethrown. In these cases, not all subscribers will have been issued
62     * the published item. It is usually good practice to {@link
63 jsr166 1.11 * #closeExceptionally closeExceptionally} in these cases.
64 dl 1.1 *
65 dl 1.15 * <p>This class may also serve as a convenient base for subclasses
66     * that generate items, and use the methods in this class to publish
67     * them. For example here is a class that periodically publishes the
68     * items generated from a supplier. (In practice you might add methods
69 dl 1.35 * to independently start and stop generation, to share Executors
70 dl 1.30 * among publishers, and so on, or use a SubmissionPublisher as a
71     * component rather than a superclass.)
72 dl 1.15 *
73     * <pre> {@code
74     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
75     * final ScheduledFuture<?> periodicTask;
76     * final ScheduledExecutorService scheduler;
77 dl 1.21 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
78     * Supplier<? extends T> supplier,
79 dl 1.15 * long period, TimeUnit unit) {
80 dl 1.21 * super(executor, maxBufferCapacity);
81 dl 1.15 * scheduler = new ScheduledThreadPoolExecutor(1);
82     * periodicTask = scheduler.scheduleAtFixedRate(
83     * () -> submit(supplier.get()), 0, period, unit);
84     * }
85     * public void close() {
86     * periodicTask.cancel(false);
87     * scheduler.shutdown();
88     * super.close();
89     * }
90     * }}</pre>
91     *
92 dl 1.21 * <p>Here is an example of a {@link Flow.Processor} implementation.
93     * It uses single-step requests to its publisher for simplicity of
94     * illustration. A more adaptive version could monitor flow using the
95 dl 1.27 * lag estimate returned from {@code submit}, along with other utility
96 dl 1.21 * methods.
97 dl 1.16 *
98     * <pre> {@code
99     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
100     * implements Flow.Processor<S,T> {
101     * final Function<? super S, ? extends T> function;
102     * Flow.Subscription subscription;
103 dl 1.21 * TransformProcessor(Executor executor, int maxBufferCapacity,
104 dl 1.16 * Function<? super S, ? extends T> function) {
105 dl 1.21 * super(executor, maxBufferCapacity);
106 dl 1.16 * this.function = function;
107     * }
108     * public void onSubscribe(Flow.Subscription subscription) {
109     * (this.subscription = subscription).request(1);
110     * }
111     * public void onNext(S item) {
112     * subscription.request(1);
113     * submit(function.apply(item));
114     * }
115     * public void onError(Throwable ex) { closeExceptionally(ex); }
116     * public void onComplete() { close(); }
117     * }}</pre>
118     *
119 dl 1.1 * @param <T> the published item type
120     * @author Doug Lea
121     * @since 1.9
122     */
123     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
124     AutoCloseable {
125     /*
126     * Most mechanics are handled by BufferedSubscription. This class
127 dl 1.17 * mainly tracks subscribers and ensures sequentiality, by using
128     * built-in synchronization locks across public methods. (Using
129     * built-in locks works well in the most typical case in which
130     * only one thread submits items).
131 dl 1.1 */
132    
133 dl 1.25 /** The largest possible power of two array size. */
134     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
135 dl 1.1
136 dl 1.25 /** Round capacity to power of 2, at least 2, at most limit. */
137 dl 1.1 static final int roundCapacity(int cap) { // to nearest power of 2
138     int n = cap - 1;
139     n |= n >>> 1;
140     n |= n >>> 2;
141     n |= n >>> 4;
142     n |= n >>> 8;
143     n |= n >>> 16;
144 dl 1.21 return (n <= 0) ? 2 : // at least 2
145 dl 1.25 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
146 dl 1.1 }
147    
148     /**
149     * Clients (BufferedSubscriptions) are maintained in a linked list
150     * (via their "next" fields). This works well for publish loops.
151     * It requires O(n) traversal to check for duplicate subscribers,
152     * but we expect that subscribing is much less common than
153 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
154 dl 1.25 * when BufferedSubscription methods return negative values
155 dl 1.29 * signifying that they have been disabled. To reduce
156     * head-of-line blocking, submit and offer methods first call
157     * BufferedSubscription.offer on each subscriber, and place
158     * saturated ones in retries list (using nextRetry field), and
159     * retry, possibly blocking or dropping.
160 dl 1.1 */
161     BufferedSubscription<T> clients;
162    
163 dl 1.20 /** Run status, updated only within locks */
164     volatile boolean closed;
165 dl 1.36 /** If non-null, the exception in closeExceptionally */
166 dl 1.37 volatile Throwable closedException;
167 dl 1.20
168 dl 1.1 // Parameters for constructing BufferedSubscriptions
169     final Executor executor;
170 dl 1.36 final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
171 dl 1.1 final int maxBufferCapacity;
172    
173 dl 1.20 /**
174 dl 1.1 * Creates a new SubmissionPublisher using the given Executor for
175 dl 1.36 * async delivery to subscribers, with the given maximum
176     * buffer size for each subscriber, and, if non-null, the given handler
177     * invoked when any Subscriber throws an exception in method {@code onNext}.
178 dl 1.1 *
179     * @param executor the executor to use for async delivery,
180     * supporting creation of at least one independent thread
181     * @param maxBufferCapacity the maximum capacity for each
182 dl 1.21 * subscriber's buffer (the enforced capacity may be rounded up to
183     * the nearest power of two and/or bounded by the largest value
184     * supported by this implementation; method {@link #getMaxBufferCapacity}
185     * returns the actual value)
186 dl 1.36 * @param handler if non-null, procedure to invoke upon exception
187     * thrown in method {@code onNext}.
188 dl 1.1 * @throws NullPointerException if executor is null
189 dl 1.21 * @throws IllegalArgumentException if maxBufferCapacity not
190     * positive
191 dl 1.1 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                           BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
    // Argument checks come first and in this order: a null executor is
    // reported before a non-positive capacity.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity <= 0) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    // The stored capacity is the rounded value, not the raw argument.
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler;          // may be null: no onNext handler
    this.executor = executor;
}
202    
203 dl 1.1 /**
204 dl 1.36 * Creates a new SubmissionPublisher using the given Executor for
205     * async delivery to subscribers, with the given maximum
206     * buffer size for each subscriber, and no handler for Subscriber
207     * exceptions in method {@code onNext}.
208     *
209     * @param executor the executor to use for async delivery,
210     * supporting creation of at least one independent thread
211     * @param maxBufferCapacity the maximum capacity for each
212     * subscriber's buffer (the enforced capacity may be rounded up to
213     * the nearest power of two and/or bounded by the largest value
214     * supported by this implementation; method {@link #getMaxBufferCapacity}
215     * returns the actual value)
216     * @throws NullPointerException if executor is null
217     * @throws IllegalArgumentException if maxBufferCapacity not
218     * positive
219     */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    // Delegates to the three-argument form with no onNext exception handler.
    this(executor, maxBufferCapacity, null);
}
223    
224     /**
225 dl 1.25 * Creates a new SubmissionPublisher using the {@link
226     * ForkJoinPool#commonPool()} for async delivery to subscribers,
227 dl 1.36 * with maximum buffer capacity of {@link Flow#defaultBufferSize},
228     * and no handler for Subscriber exceptions in method {@code
229     * onNext}.
230 dl 1.25 */
public SubmissionPublisher() {
    // Defaults: common pool, default Flow buffer size, no onNext handler.
    this(ForkJoinPool.commonPool(),
         Flow.defaultBufferSize(),
         null);
}
234    
235     /**
236 dl 1.18 * Adds the given Subscriber unless already subscribed. If
237 dl 1.30 * already subscribed, the Subscriber's {@code onError} method is
238     * invoked on the existing subscription with an {@link
239     * IllegalStateException}. Otherwise, upon success, the
240     * Subscriber's {@code onSubscribe} method is invoked
241     * asynchronously with a new {@link Flow.Subscription}. If {@code
242     * onSubscribe} throws an exception, the subscription is
243 dl 1.36 * cancelled. Otherwise, if this SubmissionPublisher was closed
244     * exceptionally, then the subscriber's {@code onError} method is
245     * invoked with the corresponding exception, or if closed without
246     * exception, the subscriber's {@code onComplete} method is
247     * invoked. Subscribers may enable receiving items by invoking
248     * the {@code request} method of the new Subscription, and may
249     * unsubscribe by invoking its {@code cancel} method.
250 dl 1.18 *
251     * @param subscriber the subscriber
252     * @throws NullPointerException if subscriber is null
253     */
254     public void subscribe(Flow.Subscriber<? super T> subscriber) {
255     if (subscriber == null) throw new NullPointerException();
256 dl 1.20 BufferedSubscription<T> subscription =
257 dl 1.36 new BufferedSubscription<T>(subscriber, executor,
258     onNextHandler, maxBufferCapacity);
259 dl 1.19 synchronized (this) {
260 dl 1.29 for (BufferedSubscription<T> b = clients, pred = null;;) {
261     if (b == null) {
262 dl 1.36 Throwable ex;
263 dl 1.29 subscription.onSubscribe();
264 dl 1.37 if ((ex = closedException) != null)
265 dl 1.36 subscription.onError(ex);
266     else if (closed)
267 dl 1.29 subscription.onComplete();
268     else if (pred == null)
269     clients = subscription;
270 dl 1.27 else
271 dl 1.29 pred.next = subscription;
272     break;
273 dl 1.27 }
274 dl 1.29 BufferedSubscription<T> next = b.next;
275     if (b.isDisabled()) { // remove
276     b.next = null; // detach
277 dl 1.19 if (pred == null)
278 dl 1.29 clients = next;
279 dl 1.19 else
280 dl 1.29 pred.next = next;
281     }
282     else if (subscriber.equals(b.subscriber)) {
283     b.onError(new IllegalStateException("Duplicate subscribe"));
284     break;
285 dl 1.19 }
286 dl 1.29 else
287     pred = b;
288     b = next;
289 dl 1.19 }
290     }
291 dl 1.18 }
292    
293     /**
294 dl 1.16 * Publishes the given item to each current subscriber by
295 dl 1.30 * asynchronously invoking its {@link Flow.Subscriber#onNext}
296     * method, blocking uninterruptibly while resources for any
297     * subscriber are unavailable. This method returns an estimate of
298     * the maximum lag (number of items submitted but not yet
299     * consumed) among all current subscribers. This value is at least
300     * one (accounting for this submitted item) if there are any
301     * subscribers, else zero.
302 dl 1.16 *
303     * <p>If the Executor for this publisher throws a
304     * RejectedExecutionException (or any other RuntimeException or
305     * Error) when attempting to asynchronously notify subscribers,
306 dl 1.31 * then this exception is rethrown, in which case not all
307     * subscribers will have been issued this item.
308 dl 1.16 *
309     * @param item the (non-null) item to publish
310 dl 1.21 * @return the estimated maximum lag among subscribers
311 dl 1.16 * @throws IllegalStateException if closed
312     * @throws NullPointerException if item is null
313     * @throws RejectedExecutionException if thrown by Executor
314     */
315 dl 1.21 public int submit(T item) {
316 dl 1.16 if (item == null) throw new NullPointerException();
317 dl 1.21 int lag = 0;
318 dl 1.27 boolean complete;
319 dl 1.16 synchronized (this) {
320 dl 1.27 complete = closed;
321 dl 1.29 BufferedSubscription<T> b = clients;
322 dl 1.27 if (!complete) {
323 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
324 dl 1.27 while (b != null) {
325     BufferedSubscription<T> next = b.next;
326 dl 1.29 int stat = b.offer(item);
327     if (stat < 0) { // disabled
328     b.next = null;
329 dl 1.27 if (pred == null)
330     clients = next;
331 dl 1.24 else
332 dl 1.27 pred.next = next;
333 dl 1.24 }
334 dl 1.27 else {
335 dl 1.29 if (stat > lag)
336     lag = stat;
337     else if (stat == 0) { // place on retry list
338     b.nextRetry = null;
339 dl 1.27 if (rtail == null)
340 dl 1.29 r = b;
341 dl 1.27 else
342     rtail.nextRetry = b;
343     rtail = b;
344     }
345     pred = b;
346     }
347     b = next;
348 dl 1.21 }
349 dl 1.29 while (r != null) {
350     BufferedSubscription<T> nextRetry = r.nextRetry;
351     r.nextRetry = null;
352     int stat = r.submit(item);
353     if (stat > lag)
354     lag = stat;
355     else if (stat < 0 && clients == r)
356     clients = r.next; // postpone internal unsubscribes
357     r = nextRetry;
358     }
359 dl 1.16 }
360     }
361 dl 1.27 if (complete)
362     throw new IllegalStateException("Closed");
363     else
364     return lag;
365 dl 1.16 }
366    
367     /**
368 dl 1.1 * Publishes the given item, if possible, to each current
369 dl 1.30 * subscriber by asynchronously invoking its {@link
370     * Flow.Subscriber#onNext} method. The item may be dropped by one
371     * or more subscribers if resource limits are exceeded, in which
372     * case the given handler (if non-null) is invoked, and if it
373     * returns true, retried once. Other calls to methods in this
374     * class by other threads are blocked while the handler is
375     * invoked. Unless recovery is assured, options are usually
376     * limited to logging the error and/or issuing an {@code onError}
377     * signal to the subscriber.
378 dl 1.1 *
379 dl 1.21 * <p>This method returns a status indicator: If negative, it
380     * represents the (negative) number of drops (failed attempts to
381     * issue the item to a subscriber). Otherwise it is an estimate of
382     * the maximum lag (number of items submitted but not yet
383     * consumed) among all current subscribers. This value is at least
384     * one (accounting for this submitted item) if there are any
385     * subscribers, else zero.
386 jsr166 1.23 *
387 dl 1.1 * <p>If the Executor for this publisher throws a
388     * RejectedExecutionException (or any other RuntimeException or
389     * Error) when attempting to asynchronously notify subscribers, or
390     * the drop handler throws an exception when processing a dropped
391     * item, then this exception is rethrown.
392     *
393     * @param item the (non-null) item to publish
394     * @param onDrop if non-null, the handler invoked upon a drop to a
395     * subscriber, with arguments of the subscriber and item; if it
396     * returns true, an offer is re-attempted (once)
397 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
398     * an estimate of maximum lag
399 dl 1.1 * @throws IllegalStateException if closed
400     * @throws NullPointerException if item is null
401     * @throws RejectedExecutionException if thrown by Executor
402     */
403     public int offer(T item,
404     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
405 dl 1.29 return doOffer(0L, item, onDrop);
406 dl 1.1 }
407    
408     /**
409     * Publishes the given item, if possible, to each current
410 dl 1.30 * subscriber by asynchronously invoking its {@link
411     * Flow.Subscriber#onNext} method, blocking while resources for
412     * any subscription are unavailable, up to the specified timeout
413     * or the caller thread is interrupted, at which point the given
414     * handler (if non-null) is invoked, and if it returns true,
415     * retried once. (The drop handler may distinguish timeouts from
416     * interrupts by checking whether the current thread is
417     * interrupted.) Other calls to methods in this class by other
418     * threads are blocked while the handler is invoked. Unless
419     * recovery is assured, options are usually limited to logging the
420     * error and/or issuing an {@code onError} signal to the
421     * subscriber.
422 dl 1.1 *
423 dl 1.21 * <p>This method returns a status indicator: If negative, it
424     * represents the (negative) number of drops (failed attempts to
425     * issue the item to a subscriber). Otherwise it is an estimate of
426     * the maximum lag (number of items submitted but not yet
427     * consumed) among all current subscribers. This value is at least
428     * one (accounting for this submitted item) if there are any
429     * subscribers, else zero.
430     *
431 dl 1.1 * <p>If the Executor for this publisher throws a
432     * RejectedExecutionException (or any other RuntimeException or
433     * Error) when attempting to asynchronously notify subscribers, or
434     * the drop handler throws an exception when processing a dropped
435     * item, then this exception is rethrown.
436     *
437     * @param item the (non-null) item to publish
438     * @param timeout how long to wait for resources for any subscriber
439     * before giving up, in units of {@code unit}
440     * @param unit a {@code TimeUnit} determining how to interpret the
441     * {@code timeout} parameter
442     * @param onDrop if non-null, the handler invoked upon a drop to a
443     * subscriber, with arguments of the subscriber and item; if it
444     * returns true, an offer is re-attempted (once)
445 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
446     * an estimate of maximum lag
447 dl 1.1 * @throws IllegalStateException if closed
448     * @throws NullPointerException if item is null
449     * @throws RejectedExecutionException if thrown by Executor
450     */
451     public int offer(T item, long timeout, TimeUnit unit,
452     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
453 dl 1.29 return doOffer(unit.toNanos(timeout), item, onDrop);
454     }
455    
456     /** Common implementation for both forms of offer */
457     final int doOffer(long nanos, T item,
458     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
459 dl 1.1 if (item == null) throw new NullPointerException();
460 dl 1.27 int lag = 0, drops = 0;
461     boolean complete;
462 jsr166 1.2 synchronized (this) {
463 dl 1.27 complete = closed;
464 dl 1.29 BufferedSubscription<T> b = clients;
465 dl 1.27 if (!complete) {
466 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
467 dl 1.27 while (b != null) {
468     BufferedSubscription<T> next = b.next;
469 dl 1.29 int stat = b.offer(item);
470     if (stat < 0) {
471     b.next = null;
472 dl 1.27 if (pred == null)
473     clients = next;
474     else
475     pred.next = next;
476     }
477     else {
478 dl 1.29 if (stat > lag)
479     lag = stat;
480     else if (stat == 0) {
481     b.nextRetry = null;
482 dl 1.27 if (rtail == null)
483 dl 1.29 r = b;
484 dl 1.27 else
485     rtail.nextRetry = b;
486     rtail = b;
487     }
488 dl 1.29 else if (stat > lag)
489 dl 1.27 lag = stat;
490     pred = b;
491     }
492     b = next;
493 dl 1.1 }
494 dl 1.29 while (r != null) {
495     BufferedSubscription<T> nextRetry = r.nextRetry;
496     r.nextRetry = null;
497     int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
498     r.offer(item);
499     if (stat == 0 && onDrop != null &&
500     onDrop.test(r.subscriber, item))
501     stat = r.offer(item);
502     if (stat == 0)
503     ++drops;
504     else if (stat > lag)
505     lag = stat;
506     else if (stat < 0 && clients == r)
507     clients = r.next;
508     r = nextRetry;
509     }
510 dl 1.1 }
511     }
512 dl 1.27 if (complete)
513     throw new IllegalStateException("Closed");
514     else
515     return (drops > 0) ? -drops : lag;
516 dl 1.1 }
517    
518     /**
519 dl 1.30 * Unless already closed, issues {@link
520     * Flow.Subscriber#onComplete} signals to current subscribers, and
521     * disallows subsequent attempts to publish. Upon return, this
522     * method does <em>NOT</em> guarantee that all subscribers have
523     * yet completed.
524 dl 1.1 */
525     public void close() {
526     if (!closed) {
527 dl 1.32 BufferedSubscription<T> b;
528 jsr166 1.2 synchronized (this) {
529 dl 1.1 b = clients;
530     clients = null;
531     closed = true;
532     }
533     while (b != null) {
534 dl 1.32 BufferedSubscription<T> next = b.next;
535     b.next = null;
536 dl 1.19 b.onComplete();
537 dl 1.1 b = next;
538     }
539     }
540     }
541    
542     /**
543 dl 1.30 * Unless already closed, issues {@link Flow.Subscriber#onError}
544     * signals to current subscribers with the given error, and
545 dl 1.36 * disallows subsequent attempts to publish. Future subscribers
546     * also receive the given error. Upon return, this method does
547     * <em>NOT</em> guarantee that all subscribers have yet completed.
548 dl 1.1 *
549 dl 1.30 * @param error the {@code onError} argument sent to subscribers
550 dl 1.1 * @throws NullPointerException if error is null
551     */
552     public void closeExceptionally(Throwable error) {
553     if (error == null)
554     throw new NullPointerException();
555     if (!closed) {
556 dl 1.32 BufferedSubscription<T> b;
557 jsr166 1.2 synchronized (this) {
558 dl 1.1 b = clients;
559     clients = null;
560     closed = true;
561 dl 1.37 closedException = error;
562 dl 1.1 }
563     while (b != null) {
564 dl 1.32 BufferedSubscription<T> next = b.next;
565     b.next = null;
566 dl 1.19 b.onError(error);
567 dl 1.1 b = next;
568     }
569     }
570     }
571    
572     /**
573     * Returns true if this publisher is not accepting submissions.
574     *
575     * @return true if closed
576     */
577     public boolean isClosed() {
578     return closed;
579     }
580    
581 dl 1.37
582     /**
583     * Returns the exception associated with {@link #closeExceptionally},
584     * or null if not closed or if closed normally.
585     *
586     * @return the exception, or null if none
587     */
588     public Throwable getClosedException() {
589     return closedException;
590     }
591    
592 dl 1.1 /**
593 jsr166 1.6 * Returns true if this publisher has any subscribers.
594 dl 1.1 *
595     * @return true if this publisher has any subscribers
596     */
597     public boolean hasSubscribers() {
598     boolean nonEmpty = false;
599     if (!closed) {
600 jsr166 1.2 synchronized (this) {
601 dl 1.31 for (BufferedSubscription<T> b = clients; b != null;) {
602     BufferedSubscription<T> next = b.next;
603 dl 1.19 if (b.isDisabled()) {
604 dl 1.29 b.next = null;
605 dl 1.31 b = clients = next;
606 jsr166 1.2 }
607 dl 1.1 else {
608     nonEmpty = true;
609     break;
610     }
611     }
612     }
613     }
614     return nonEmpty;
615     }
616    
617     /**
618 dl 1.21 * Returns the number of current subscribers.
619 dl 1.1 *
620 dl 1.21 * @return the number of current subscribers
621 dl 1.1 */
622 dl 1.21 public int getNumberOfSubscribers() {
623     int count = 0;
624     if (!closed) {
625     synchronized (this) {
626     BufferedSubscription<T> pred = null, next;
627     for (BufferedSubscription<T> b = clients; b != null; b = next) {
628     next = b.next;
629     if (b.isDisabled()) {
630 dl 1.29 b.next = null;
631 dl 1.21 if (pred == null)
632     clients = next;
633     else
634     pred.next = next;
635     }
636     else {
637     pred = b;
638     ++count;
639     }
640     }
641     }
642     }
643     return count;
644 dl 1.1 }
645    
646 jsr166 1.7 /**
647 dl 1.21 * Returns the Executor used for asynchronous delivery.
648 dl 1.1 *
649 dl 1.21 * @return the Executor used for asynchronous delivery
650 dl 1.1 */
651 dl 1.21 public Executor getExecutor() {
652     return executor;
653 dl 1.1 }
654    
655 jsr166 1.7 /**
656 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
657 dl 1.1 *
658 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
659 dl 1.1 */
660     public int getMaxBufferCapacity() {
661     return maxBufferCapacity;
662     }
663    
664     /**
665 dl 1.29 * Returns a list of current subscribers for monitoring and
666     * tracking purposes, not for invoking {@link Flow.Subscriber}
667     * methods on the subscribers.
668 dl 1.1 *
669     * @return list of current subscribers
670     */
671     public List<Flow.Subscriber<? super T>> getSubscribers() {
672     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
673 jsr166 1.2 synchronized (this) {
674 dl 1.1 BufferedSubscription<T> pred = null, next;
675     for (BufferedSubscription<T> b = clients; b != null; b = next) {
676     next = b.next;
677 dl 1.19 if (b.isDisabled()) {
678 dl 1.29 b.next = null;
679 dl 1.1 if (pred == null)
680     clients = next;
681     else
682     pred.next = next;
683 jsr166 1.2 }
684 dl 1.1 else
685     subs.add(b.subscriber);
686     }
687     }
688     return subs;
689     }
690 jsr166 1.2
691 dl 1.1 /**
692     * Returns true if the given Subscriber is currently subscribed.
693     *
694     * @param subscriber the subscriber
695     * @return true if currently subscribed
696 dl 1.21 * @throws NullPointerException if subscriber is null
697 dl 1.1 */
698     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
699 dl 1.21 if (subscriber == null) throw new NullPointerException();
700 jsr166 1.2 if (!closed) {
701     synchronized (this) {
702 dl 1.1 BufferedSubscription<T> pred = null, next;
703     for (BufferedSubscription<T> b = clients; b != null; b = next) {
704     next = b.next;
705 dl 1.19 if (b.isDisabled()) {
706 dl 1.29 b.next = null;
707 dl 1.1 if (pred == null)
708     clients = next;
709     else
710     pred.next = next;
711 jsr166 1.2 }
712 dl 1.21 else if (subscriber.equals(b.subscriber))
713 dl 1.1 return true;
714 dl 1.21 else
715     pred = b;
716 dl 1.1 }
717     }
718     }
719     return false;
720     }
721    
722     /**
723 dl 1.21 * Returns an estimate of the minimum number of items requested
724     * (via {@link Flow.Subscription#request}) but not yet produced,
725     * among all current subscribers.
726     *
727     * @return the estimate, or zero if no subscribers
728     */
729     public long estimateMinimumDemand() {
730     long min = Long.MAX_VALUE;
731     boolean nonEmpty = false;
732     synchronized (this) {
733     BufferedSubscription<T> pred = null, next;
734     for (BufferedSubscription<T> b = clients; b != null; b = next) {
735     int n; long d;
736     next = b.next;
737     if ((n = b.estimateLag()) < 0) {
738 dl 1.29 b.next = null;
739 dl 1.21 if (pred == null)
740     clients = next;
741     else
742     pred.next = next;
743     }
744     else {
745     if ((d = b.demand - n) < min)
746     min = d;
747     nonEmpty = true;
748 dl 1.32 pred = b;
749 dl 1.21 }
750     }
751     }
752 jsr166 1.22 return nonEmpty ? min : 0;
753 dl 1.21 }
754    
755     /**
756     * Returns an estimate of the maximum number of items produced but
757     * not yet consumed among all current subscribers.
758 dl 1.17 *
759     * @return the estimate
760 dl 1.1 */
761 dl 1.21 public int estimateMaximumLag() {
762     int max = 0;
763 dl 1.17 synchronized (this) {
764     BufferedSubscription<T> pred = null, next;
765     for (BufferedSubscription<T> b = clients; b != null; b = next) {
766 dl 1.21 int n;
767 dl 1.17 next = b.next;
768 dl 1.21 if ((n = b.estimateLag()) < 0) {
769 dl 1.29 b.next = null;
770 dl 1.17 if (pred == null)
771     clients = next;
772     else
773     pred.next = next;
774     }
775 dl 1.32 else {
776     if (n > max)
777     max = n;
778     pred = b;
779     }
780 dl 1.17 }
781     }
782 dl 1.21 return max;
783 dl 1.1 }
784    
785 dl 1.25 /**
786     * A task for consuming buffer items and signals, created and
787     * executed whenever they become available. A task consumes as
788     * many items/signals as possible before terminating, at which
789     * point another task is created when needed. The dual Runnable
790     * and ForkJoinTask declaration saves overhead when executed by
791     * ForkJoinPools, without impacting other kinds of Executors.
792     */
793     @SuppressWarnings("serial")
794     static final class ConsumerTask<T> extends ForkJoinTask<Void>
795     implements Runnable {
796     final BufferedSubscription<T> consumer;
797     ConsumerTask(BufferedSubscription<T> consumer) {
798     this.consumer = consumer;
799     }
800     public final Void getRawResult() { return null; }
801     public final void setRawResult(Void v) {}
802     public final boolean exec() { consumer.consume(); return false; }
803     public final void run() { consumer.consume(); }
804     }
805    
806 dl 1.1 /**
807 dl 1.15 * A bounded (ring) buffer with integrated control to start a
808     * consumer task whenever items are available. The buffer
809 dl 1.34 * algorithm is similar to one used inside ForkJoinPool (see its
810     * internal documentation for details) specialized for the case of
811     * at most one concurrent producer and consumer, and power of two
812     * buffer sizes. This allows methods to operate without locks even
813     * while supporting resizing, blocking, task-triggering, and
814     * garbage-free buffers (nulling out elements when consumed),
815     * although supporting these does impose a bit of overhead
816     * compared to plain fixed-size ring buffers.
817 dl 1.1 *
818     * The publisher guarantees a single producer via its lock. We
819     * ensure in this class that there is at most one consumer. The
820     * request and cancel methods must be fully thread-safe but are
821     * coded to exploit the most common case in which they are only
822     * called by consumers (usually within onNext).
823     *
824 dl 1.15 * Execution control is managed using the ACTIVE ctl bit. We
825     * ensure that a task is active when consumable items (and
826 dl 1.19 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
827 dl 1.24 * there is demand (unfilled requests). This is complicated on
828 dl 1.19 * the creation side by the possibility of exceptions when trying
829     * to execute tasks. These eventually force DISABLED state, but
830 dl 1.1 * sometimes not directly. On the task side, termination (clearing
831 dl 1.27 * ACTIVE) that would otherwise race with producers or request()
832 dl 1.29 * calls uses the CONSUME keep-alive bit to force a recheck.
833 dl 1.1 *
834     * The ctl field also manages run state. When DISABLED, no further
835 dl 1.20 * updates are possible. Disabling may be preceded by setting
836 dl 1.1 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
837     * case the associated Subscriber methods are invoked, possibly
838     * synchronously if there is no active consumer task (including
839 dl 1.20 * cases where execute() failed). The cancel() method is supported
840     * by treating as ERROR but suppressing onError signal.
841 dl 1.1 *
842     * Support for blocking also exploits the fact that there is only
843     * one possible waiter. ManagedBlocker-compatible control fields
844     * are placed in this class itself rather than in wait-nodes.
845     * Blocking control relies on the "waiter" field. Producers set
846     * the field before trying to block, but must then recheck (via
847     * offer) before parking. Signalling then just unparks and clears
848 dl 1.20 * waiter field. If the producer and consumer are both in the same
849 dl 1.29 * ForkJoinPool, or consumers are running in commonPool, the
850     * producer attempts to help run consumer tasks that it forked
851     * before blocking. To avoid potential cycles, only one level of
852     * helping is currently supported.
853 dl 1.1 *
854     * This class uses @Contended and heuristic field declaration
855 dl 1.31 * ordering to reduce false-sharing-based memory contention among
856     * instances of BufferedSubscription, but it does not currently
857     * attempt to avoid memory contention among buffers. This field
858     * and element packing can hurt performance especially when each
859     * publisher has only one client operating at a high rate.
860 dl 1.1 * Addressing this may require allocating substantially more space
861     * than users expect.
862     */
863 dl 1.15 @SuppressWarnings("serial")
864 dl 1.1 @sun.misc.Contended
865 dl 1.25 static final class BufferedSubscription<T>
866     implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
867 dl 1.1 // Order-sensitive field declarations
868 dl 1.27 long timeout; // > 0 if timed wait; set to INTERRUPTED on interrupt
869     volatile long demand; // # unfilled requests
870     int maxCapacity; // reduced on OOME
871     int putStat; // offer result for ManagedBlocker
872 dl 1.29 int helpDepth; // nested helping depth (at most 1)
873 dl 1.27 volatile int ctl; // atomic run state flags
874     volatile int head; // next position to take
875 dl 1.34 int tail; // next position to put
876 dl 1.27 Object[] array; // buffer: null if disabled
877 dl 1.1 Flow.Subscriber<? super T> subscriber; // null if disabled
878 dl 1.27 Executor executor; // null if disabled
879 dl 1.36 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler; // may be null; invoked by handleOnNext when onNext throws
880 dl 1.27 volatile Throwable pendingError; // holds until onError issued
881     volatile Thread waiter; // blocked producer thread
882     T putItem; // for offer within ManagedBlocker
883     BufferedSubscription<T> next; // used only by publisher
884     BufferedSubscription<T> nextRetry; // used only by publisher
885 dl 1.1
886     // ctl values (bit flags; combined with | and tested with & throughout)
887 dl 1.20 static final int ACTIVE = 0x01; // consumer task active
888 dl 1.29 static final int CONSUME = 0x02; // keep-alive for consumer task
889 dl 1.27 static final int DISABLED = 0x04; // final state
890     static final int ERROR = 0x08; // signal onError then disable
891     static final int SUBSCRIBE = 0x10; // signal onSubscribe
892     static final int COMPLETE = 0x20; // signal onComplete when done
893 dl 1.1
894 dl 1.20 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
895 dl 1.16
896 jsr166 1.22 /**
897 dl 1.21 * Initial/Minimum buffer capacity. Must be a power of two, at least 2.
898     */
899 dl 1.34 static final int MINCAP = 16;
900 dl 1.21
/**
 * Records the subscriber, the executor used to run consumer tasks,
 * the (possibly null) handler for exceptions thrown by onNext, and
 * the maximum buffer capacity. No buffer is allocated here.
 */
BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                     Executor executor,
                     BiConsumer<? super Flow.Subscriber<? super T>,
                     ? super Throwable> onNextHandler,
                     int maxBufferCapacity) {
    this.maxCapacity = maxBufferCapacity;
    this.onNextHandler = onNextHandler;
    this.executor = executor;
    this.subscriber = subscriber;
}
911    
912 dl 1.19 final boolean isDisabled() {
913 dl 1.20 return ctl == DISABLED;
914     }
915    
916 dl 1.21 /**
917     * Returns estimated number of buffered items, or -1 if
918     * disabled
919     */
920     final int estimateLag() {
921 dl 1.20 int n;
922 jsr166 1.22 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
923 dl 1.19 }
924    
925 dl 1.1 /**
926 dl 1.16 * Tries to add item and start consumer task if necessary.
927 dl 1.21 * @return -1 if disabled, 0 if dropped, else estimated lag
928 dl 1.1 */
929     final int offer(T item) {
930 dl 1.34 int h = head, t = tail, cap, size, stat;
931 dl 1.1 Object[] a = array;
// Fast path: a buffer exists and still has room once this item is
// counted (size = elements after the insert).
932 dl 1.34 if (a != null && (cap = a.length) > 0 && cap > (size = t + 1 - h)) {
933     a[(cap - 1) & t] = item; // relaxed writes OK
934     tail = t + 1;
935     U.storeFence(); // ensure fields written
936 dl 1.27 stat = size;
937 dl 1.1 }
938 dl 1.34 else
// Slow path: no buffer yet, buffer looks full, or subscription is ending.
939     stat = growAndAdd(a, item);
// If the item was added and ctl does not already have both ACTIVE and
// CONSUME set, go through startOnOffer; otherwise return stat as is.
940 dl 1.29 return (stat > 0 &&
941     (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
942 dl 1.27 startOnOffer(stat) : stat;
943     }
944    
945     /**
946 jsr166 1.28 * Tries to create or expand buffer, then adds item if possible.
947 dl 1.27 */
948     private int growAndAdd(Object[] a, T item) {
949 dl 1.34 boolean alloc;
950 dl 1.27 int cap, stat;
// Case 1: ERROR or DISABLED already set -- report -1, allocate nothing.
951     if ((ctl & (ERROR | DISABLED)) != 0) {
952     cap = 0;
953     stat = -1;
954 dl 1.34 alloc = false;
955 dl 1.27 }
// Case 2: no usable buffer yet -- allocate the initial one.
956 dl 1.34 else if (a == null || (cap = a.length) <= 0) {
957 dl 1.27 cap = 0;
958     stat = 1;
959 dl 1.34 alloc = true;
960     }
// Case 3: buffer appeared full to the caller -- re-read head/tail after a
// full fence, then either insert, drop (at maxCapacity), or grow.
961     else {
962     U.fullFence(); // recheck
963     int h = head, t = tail, size = t + 1 - h;
964     if (cap > size) {
965     a[(cap - 1) & t] = item;
966     tail = t + 1;
967     U.storeFence();
968     stat = size;
969     alloc = false;
970     }
971     else if (cap >= maxCapacity) {
972     stat = 0; // cannot grow
973     alloc = false;
974     }
975     else {
976     stat = cap + 1;
977     alloc = true;
978     }
979 dl 1.27 }
980 dl 1.34 if (alloc) {
// New capacity: double an existing buffer; otherwise start at MINCAP,
// or at maxCapacity if that is smaller, but never below 2.
981 dl 1.27 int newCap = (cap > 0 ? cap << 1 :
982     maxCapacity >= MINCAP ? MINCAP :
983     maxCapacity >= 2 ? maxCapacity : 2);
// newCap <= cap means the doubling did not yield a larger size
// (presumably int overflow of the shift); treat as a drop.
984     if (newCap <= cap)
985     stat = 0;
986     else {
987     Object[] newArray = null;
988 dl 1.16 try {
989 dl 1.27 newArray = new Object[newCap];
990     } catch (Throwable ex) { // try to cope with OOME
991     }
992     if (newArray == null) {
993     if (cap > 0)
994     maxCapacity = cap; // avoid continuous failure
995     stat = 0;
996     }
997     else {
998     array = newArray;
999     int t = tail;
1000     int newMask = newCap - 1;
// Move surviving elements from the old array: each slot is claimed with
// a CAS to null so an element is transferred only if the CAS wins.
1001     if (a != null && cap > 0) {
1002     int mask = cap - 1;
1003     for (int j = head; j != t; ++j) {
1004 dl 1.34 long k = ((long)(j & mask) << ASHIFT) + ABASE;
1005     Object x = U.getObjectVolatile(a, k);
1006     if (x != null && // races with consumer
1007     U.compareAndSwapObject(a, k, x, null))
1008 dl 1.27 newArray[j & newMask] = x;
1009     }
1010     }
// Finally append the new item at the current tail and publish.
1011     newArray[t & newMask] = item;
1012     tail = t + 1;
1013 dl 1.34 U.storeFence();
1014 dl 1.16 }
1015     }
1016     }
1017 dl 1.21 return stat;
1018 dl 1.1 }
1019    
1020 dl 1.34
1021 dl 1.1 /**
1022 dl 1.27 * Spins/helps/blocks while offer returns 0. Called only if
1023 dl 1.29 * initial offer returns 0.
1024 dl 1.20 */
1025     final int submit(T item) {
1026 dl 1.34 int stat; Executor e; ForkJoinWorkerThread w;
// Helping is attempted only when the offer was dropped, we are not
// already inside a helping call, and the executor is a ForkJoinPool.
1027     if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1028     ((e = executor) instanceof ForkJoinPool)) {
1029 dl 1.29 Thread thread = Thread.currentThread();
// Caller is a worker of that same pool: help via its own work queue.
1030 dl 1.20 if ((thread instanceof ForkJoinWorkerThread) &&
1031 dl 1.34 ((w = (ForkJoinWorkerThread)thread)).getPool() == e)
1032     stat = internalHelpConsume(w.workQueue, item);
// Otherwise, if the executor is the common pool, help via the
// common submitter queue.
1033     else if (e == ForkJoinPool.commonPool())
1034     stat = externalHelpConsume
1035     (ForkJoinPool.commonSubmitterQueue(), item);
1036 dl 1.20 }
// Still dropped after one more offer: block through managedBlock, with
// this object acting as the ManagedBlocker (see block/isReleasable).
1037 dl 1.25 if (stat == 0 && (stat = offer(item)) == 0) {
1038     putItem = item;
1039     timeout = 0L; // 0 selects the untimed wait in block()
1040     try {
1041     ForkJoinPool.managedBlock(this);
1042     } catch (InterruptedException ie) {
1043     timeout = INTERRUPTED;
1044     }
1045     stat = putStat;
// A negative timeout is the INTERRUPTED sentinel: restore interrupt status.
1046     if (timeout < 0L)
1047     Thread.currentThread().interrupt();
1048     }
1049 dl 1.20 return stat;
1050     }
1051    
1052     /**
1053 dl 1.34 * Tries helping for FJ submitter
1054     */
1055     private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1056     ForkJoinTask<?> t;
1057     int stat = 0;
1058     if (w != null) {
1059     while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1060     if ((stat = offer(item)) != 0 || !w.tryUnpush(t))
1061     break;
1062     ((ConsumerTask<?>)t).consumer.helpConsume();
1063     }
1064     }
1065     return stat;
1066     }
1067    
1068     /**
1069     * Tries helping for non-FJ submitter
1070     */
1071     private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1072     ForkJoinTask<?> t;
1073     int stat = 0;
1074     if (w != null) {
1075     while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1076     if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t))
1077     break;
1078     ((ConsumerTask<?>)t).consumer.helpConsume();
1079     }
1080     }
1081     return stat;
1082     }
1083    
1084     /**
1085 dl 1.27 * Timeout version; similar to submit
1086 dl 1.20 */
1087 dl 1.27 final int timedOffer(T item, long nanos) {
1088 dl 1.34 int stat; Executor e;
// As in submit: help only if dropped, not nested, and on a ForkJoinPool.
1089     if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1090     ((e = executor) instanceof ForkJoinPool)) {
1091 dl 1.29 Thread thread = Thread.currentThread();
1092     if (((thread instanceof ForkJoinWorkerThread) &&
1093     ((ForkJoinWorkerThread)thread).getPool() == e) ||
1094     e == ForkJoinPool.commonPool()) {
1095     ForkJoinTask<?> t;
1096     long deadline = System.nanoTime() + nanos;
// Help while the next local task is a ConsumerTask; stop once the item
// is accepted, the time budget is used up, or the task cannot be unforked.
// nanos is updated to the remaining time on each pass.
1097     while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
1098     (t instanceof ConsumerTask)) {
1099     if ((stat = offer(item)) != 0 ||
1100     (nanos = deadline - System.nanoTime()) <= 0L ||
1101     !t.tryUnfork())
1102     break;
1103     ((ConsumerTask<?>)t).consumer.helpConsume();
1104     }
1105 dl 1.20 }
1106 dl 1.25 }
// Block only if still dropped and there is time left; the remaining
// nanos become the timeout used by block().
1107     if (stat == 0 && (stat = offer(item)) == 0 &&
1108     (timeout = nanos) > 0L) {
1109     putItem = item;
1110     try {
1111     ForkJoinPool.managedBlock(this);
1112     } catch (InterruptedException ie) {
1113     timeout = INTERRUPTED;
1114     }
1115     stat = putStat;
// A negative timeout is the INTERRUPTED sentinel: restore interrupt status.
1116     if (timeout < 0L)
1117     Thread.currentThread().interrupt();
1118 dl 1.20 }
1119     return stat;
1120     }
1121    
1122 dl 1.29 /** Version of consume called when helping in submit or timedOffer */
1123     private void helpConsume() {
1124     helpDepth = 1; // only one level allowed
1125     consume();
1126     helpDepth = 0;
1127     }
1128    
1129 dl 1.20 /**
1130 dl 1.27 * Tries to start consumer task after offer.
1131     * @return -1 if now disabled, else argument
1132     */
1133     private int startOnOffer(int stat) {
1134     for (;;) {
1135     Executor e; int c;
// Disabled (or executor already cleared): report -1.
1136     if ((c = ctl) == DISABLED || (e = executor) == null) {
1137     stat = -1;
1138     break;
1139     }
// A task is already active: just make sure CONSUME is set, retrying
// the loop if the CAS loses a race.
1140     else if ((c & ACTIVE) != 0) { // ensure keep-alive
1141 dl 1.29 if ((c & CONSUME) != 0 ||
1142 dl 1.27 U.compareAndSwapInt(this, CTL, c,
1143 dl 1.29 c | CONSUME))
1144 dl 1.27 break;
1145     }
// No active task, but nothing to do without demand or buffered items.
1146     else if (demand == 0L || tail == head)
1147     break;
// Claim ACTIVE|CONSUME first, then hand a new task to the executor.
1148     else if (U.compareAndSwapInt(this, CTL, c,
1149 dl 1.29 c | (ACTIVE | CONSUME))) {
1150 dl 1.27 try {
1151     e.execute(new ConsumerTask<T>(this));
1152     break;
1153     } catch (RuntimeException | Error ex) { // back out
// execute() failed: clear ACTIVE again (unless DISABLED was set or
// ACTIVE is already clear), then propagate the failure to the caller.
1154 dl 1.30 do {} while (((c = ctl) & DISABLED) == 0 &&
1155 dl 1.27 (c & ACTIVE) != 0 &&
1156     !U.compareAndSwapInt(this, CTL, c,
1157     c & ~ACTIVE));
1158     throw ex;
1159     }
1160     }
1161     }
1162     return stat;
1163     }
1164    
1165 dl 1.34 private void signalWaiter(Thread w) {
1166     waiter = null;
1167     LockSupport.unpark(w); // release producer
1168     }
1169    
1170 dl 1.27 /**
1171 dl 1.16 * Nulls out most fields, mainly to avoid garbage retention
1172 dl 1.20 * until publisher unsubscribes, but also to help cleanly stop
1173     * upon error by nulling required components.
1174 dl 1.1 */
1175 dl 1.27 private void detach() {
1176     Thread w = waiter;
1177     executor = null;
1178     subscriber = null;
1179 dl 1.16 pendingError = null;
1180 dl 1.34 signalWaiter(w);
1181 dl 1.1 }
1182    
1183     /**
1184 dl 1.19 * Issues error signal, asynchronously if a task is running,
1185 jsr166 1.26 * else synchronously.
1186 dl 1.19 */
1187     final void onError(Throwable ex) {
1188     for (int c;;) {
// Already errored or disabled: nothing more to signal.
1189 dl 1.29 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1190 dl 1.19 break;
// Consumer task active: stash the exception and set ERROR so the task
// reports it (see checkControl); retry if the CAS fails.
1191     else if ((c & ACTIVE) != 0) {
1192     pendingError = ex;
1193     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1194     break; // cause consumer task to exit
1195     }
// No active task: move straight to DISABLED and signal in this thread.
1196     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1197     Flow.Subscriber<? super T> s = subscriber;
1198     if (s != null && ex != null) {
1199     try {
1200     s.onError(ex);
1201     } catch (Throwable ignore) {
// exceptions thrown by the subscriber's onError are deliberately dropped
1202     }
1203     }
1204     detach();
1205     break;
1206     }
1207     }
1208     }
1209    
1210     /**
1211 dl 1.1 * Tries to start consumer task upon a signal or request;
1212 jsr166 1.12 * disables on failure.
1213 dl 1.1 */
// Callers in this class set ACTIVE in ctl before invoking this method.
1214 dl 1.27 private void startOrDisable() {
1215     Executor e;
1216     if ((e = executor) != null) { // skip if already disabled
1217 dl 1.1 try {
1218 dl 1.25 e.execute(new ConsumerTask<T>(this));
1219 dl 1.27 } catch (Throwable ex) { // back out and force signal
// Clear ACTIVE; once that CAS succeeds, route the failure through
// onError, which then runs its no-active-task path.
1220 dl 1.1 for (int c;;) {
1221 dl 1.20 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1222 dl 1.1 break;
1223     if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1224 dl 1.19 onError(ex);
1225 dl 1.1 break;
1226     }
1227     }
1228     }
1229     }
1230     }
1231    
1232 dl 1.19 final void onComplete() {
1233 dl 1.1 for (int c;;) {
1234 dl 1.20 if ((c = ctl) == DISABLED)
1235 dl 1.1 break;
1236 dl 1.27 if (U.compareAndSwapInt(this, CTL, c,
1237 dl 1.29 c | (ACTIVE | CONSUME | COMPLETE))) {
1238 dl 1.16 if ((c & ACTIVE) == 0)
1239     startOrDisable();
1240 dl 1.1 break;
1241     }
1242     }
1243     }
1244    
1245 dl 1.19 final void onSubscribe() {
1246 dl 1.1 for (int c;;) {
1247 dl 1.20 if ((c = ctl) == DISABLED)
1248 dl 1.1 break;
1249 dl 1.27 if (U.compareAndSwapInt(this, CTL, c,
1250 dl 1.29 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1251 dl 1.19 if ((c & ACTIVE) == 0)
1252     startOrDisable();
1253 dl 1.1 break;
1254     }
1255     }
1256     }
1257    
1258 dl 1.16 /**
1259     * Causes consumer task to exit if active (without reporting
1260     * onError unless there is already a pending error), and
1261     * disables.
1262     */
1263     public void cancel() {
1264 dl 1.1 for (int c;;) {
1265 dl 1.20 if ((c = ctl) == DISABLED)
1266 dl 1.1 break;
1267 dl 1.16 else if ((c & ACTIVE) != 0) {
1268 dl 1.29 if (U.compareAndSwapInt(this, CTL, c,
1269     c | (CONSUME | ERROR)))
1270 dl 1.16 break;
1271     }
1272     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1273     detach();
1274 dl 1.1 break;
1275     }
1276     }
1277     }
1278    
1279 dl 1.16 /**
1280     * Adds to demand and possibly starts task.
1281     */
1282     public void request(long n) {
1283     if (n > 0L) {
1284     for (;;) {
1285     long prev = demand, d;
1286     if ((d = prev + n) < prev) // saturate
1287     d = Long.MAX_VALUE;
1288     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1289 dl 1.31 for (int c, h;;) {
1290 dl 1.27 if ((c = ctl) == DISABLED)
1291 dl 1.16 break;
1292 dl 1.27 else if ((c & ACTIVE) != 0) {
1293 dl 1.29 if ((c & CONSUME) != 0 ||
1294 dl 1.27 U.compareAndSwapInt(this, CTL, c,
1295 dl 1.29 c | CONSUME))
1296 dl 1.27 break;
1297     }
1298     else if ((h = head) != tail) {
1299 dl 1.16 if (U.compareAndSwapInt(this, CTL, c,
1300 dl 1.29 c | (ACTIVE|CONSUME))) {
1301 dl 1.16 startOrDisable();
1302     break;
1303     }
1304     }
1305     else if (head == h && tail == h)
1306 dl 1.27 break; // else stale
1307 dl 1.31 if (demand == 0L)
1308     break;
1309 dl 1.16 }
1310     break;
1311     }
1312     }
1313     }
1314     else if (n < 0L)
1315 dl 1.19 onError(new IllegalArgumentException(
1316     "negative subscription request"));
1317 dl 1.16 }
1318    
1319 dl 1.20 public final boolean isReleasable() { // for ManagedBlocker
1320 dl 1.1 T item = putItem;
1321 dl 1.34 if (item != null) {
1322     if ((putStat = offer(item)) == 0)
1323     return false;
1324     putItem = null;
1325 dl 1.1 }
1326     return true;
1327     }
1328    
// ManagedBlocker hook: repeatedly offers the pending item, parking between
// attempts, until the offer result is nonzero, a timed wait expires, or a
// timed wait is interrupted. Always returns true.
1329 dl 1.20 public final boolean block() { // for ManagedBlocker
1330 dl 1.1 T item = putItem;
1331     if (item != null) {
1332     putItem = null;
1333     long nanos = timeout;
// deadline is only meaningful for a timed wait (nanos > 0)
1334     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1335     while ((putStat = offer(item)) == 0) {
// Interrupt: record the sentinel; only a timed wait gives up here,
// an untimed wait keeps looping.
1336     if (Thread.interrupted()) {
1337 dl 1.16 timeout = INTERRUPTED;
1338 dl 1.1 if (nanos > 0L)
1339     break;
1340     }
// Timed wait whose remaining time has run out.
1341     else if (nanos > 0L &&
1342     (nanos = deadline - System.nanoTime()) <= 0L)
1343     break;
// First register as waiter, then loop to re-offer before parking.
1344     else if (waiter == null)
1345     waiter = Thread.currentThread();
1346     else {
1347     if (nanos > 0L)
1348     LockSupport.parkNanos(this, nanos);
1349     else
1350     LockSupport.park(this);
1351     waiter = null;
1352     }
1353     }
1354     }
1355     waiter = null;
1356     return true;
1357     }
1358    
1359 dl 1.34
1360 dl 1.29 /**
1361     * Consumer loop, called from ConsumerTask, or indirectly
1362     * via helpConsume when helping during submit.
1363     */
1364 dl 1.25 final void consume() {
1365 dl 1.1 Flow.Subscriber<? super T> s;
1366 dl 1.34 int h = head;
1367 dl 1.27 if ((s = subscriber) != null) { // else disabled
1368 dl 1.34 for (;;) {
1369 dl 1.27 long d = demand;
1370 dl 1.34 int c; Object[] a; int n; long i; Object x; Thread w;
// 1. Control flags pending (ERROR, SUBSCRIBE or DISABLED): delegate;
//    a false result ends the loop.
1371 dl 1.27 if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1372 dl 1.34 if (!checkControl(s, c))
1373 dl 1.20 break;
1374 dl 1.19 }
// 2. Nothing to take: no array, head == tail, or the slot at head
//    reads as null.
1375 dl 1.34 else if ((a = array) == null || h == tail ||
1376     (n = a.length) == 0 ||
1377     (x = U.getObjectVolatile
1378     (a, (i = ((long)((n - 1) & h) << ASHIFT) + ABASE)))
1379     == null) {
1380     if (!checkEmpty(s, c))
1381 dl 1.27 break;
1382     }
// 3. An item is available but there is no demand for it.
1383 dl 1.34 else if (d == 0L) {
1384     if (!checkDemand(c))
1385     break;
1386 dl 1.1 }
// 4. Take the item: ensure CONSUME is set, then claim the slot by
//    CAS-ing it to null. If either CAS fails the loop simply retries.
1387 dl 1.34 else if (((c & CONSUME) != 0 ||
1388     U.compareAndSwapInt(this, CTL, c, c | CONSUME)) &&
1389     U.compareAndSwapObject(a, i, x, null)) {
1390 dl 1.27 U.putOrderedInt(this, HEAD, ++h);
1391 dl 1.34 U.getAndAddLong(this, DEMAND, -1L);
// A slot was freed: wake a producer recorded in waiter, if any.
1392     if ((w = waiter) != null)
1393     signalWaiter(w);
1394 dl 1.36 try {
1395     @SuppressWarnings("unchecked") T y = (T) x;
1396     s.onNext(y);
1397     } catch (Throwable ex) {
// anything thrown by onNext goes to handleOnNext
1398     handleOnNext(s, ex);
1399     }
1400 dl 1.34 }
1401     }
1402     }
1403     }
1404    
1405     /**
1406     * Respond to control events in consume()
1407     */
// Returns false when the consume loop should stop, true to continue.
1408     private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
1409     boolean stat = true;
// ERROR: move to DISABLED and report pendingError if there is one.
// stat stays true here; a later call that sees only DISABLED reaches
// the final branch, which detaches and returns false.
1410     if ((c & ERROR) != 0) {
1411     Throwable ex = pendingError;
1412     ctl = DISABLED; // no need for CAS
1413     if (ex != null) { // null if errorless cancel
1414     try {
1415     if (s != null)
1416     s.onError(ex);
1417     } catch (Throwable ignore) {
1418     }
1419     }
1420     }
// SUBSCRIBE: clear the flag first, then deliver onSubscribe; anything
// it throws is routed through onError.
1421     else if ((c & SUBSCRIBE) != 0) {
1422     if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
1423     try {
1424     if (s != null)
1425     s.onSubscribe(this);
1426     } catch (Throwable ex) {
1427     onError(ex);
1428     }
1429     }
1430     }
// Neither ERROR nor SUBSCRIBE (consume() calls this only when one of
// ERROR, SUBSCRIBE or DISABLED is set): release resources and stop.
1431     else {
1432     detach();
1433     stat = false;
1434     }
1435     return stat;
1436     }
1437    
1438     /**
1439     * Respond to apparent emptiness in consume()
1440     */
// Returns false when the consume loop should stop, true to continue.
1441     private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1442     boolean stat = true;
// Act only if the buffer is really empty on re-read; otherwise just retry.
1443     if (head == tail) {
// First pass: drop the CONSUME keep-alive bit; the caller loops and
// rechecks before the task may terminate.
1444     if ((c & CONSUME) != 0)
1445     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
// Empty with COMPLETE requested: disable, then signal onComplete.
1446     else if ((c & COMPLETE) != 0) {
1447     if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1448 dl 1.1 try {
1449 dl 1.34 if (s != null)
1450     s.onComplete();
1451     } catch (Throwable ignore) {
1452 dl 1.1 }
1453     }
1454     }
// Otherwise try to clear ACTIVE; success ends the consumer task.
1455 dl 1.34 else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1456     stat = false;
1457     }
1458     return stat;
1459     }
1460    
1461     /**
1462     * Respond to apparent zero demand in consume()
1463     */
1464     private boolean checkDemand(int c) {
1465     boolean stat = true;
1466     if (demand == 0L) {
1467     if ((c & CONSUME) != 0)
1468     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1469     else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1470     stat = false;
1471     }
1472     return stat;
1473     }
1474    
1475     /**
1476 dl 1.36 * Process exception in Subscriber.onNext
1477 dl 1.34 */
1478 dl 1.36 private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1479     BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1480     if ((h = onNextHandler) != null) {
1481     try {
1482     h.accept(s, ex);
1483     } catch (Throwable ignore) {
1484     }
1485 dl 1.1 }
1486 dl 1.36 onError(ex);
1487 dl 1.1 }
1488    
1489 jsr166 1.9 // Unsafe mechanics
1490     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
// Field offsets used with U for CAS / ordered access to this class's fields
1491 dl 1.1 private static final long CTL;
1492     private static final long TAIL;
1493     private static final long HEAD;
1494     private static final long DEMAND;
// Base offset and log2(element scale) for addressing Object[] slots as
// ((long)index << ASHIFT) + ABASE
1495     private static final int ABASE;
1496     private static final int ASHIFT;
1497    
1498     static {
1499     try {
1500     CTL = U.objectFieldOffset
1501 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("ctl"));
1502 dl 1.1 TAIL = U.objectFieldOffset
1503 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("tail"));
1504 dl 1.1 HEAD = U.objectFieldOffset
1505 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("head"));
1506 dl 1.1 DEMAND = U.objectFieldOffset
1507 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("demand"));
1508    
1509     ABASE = U.arrayBaseOffset(Object[].class);
1510     int scale = U.arrayIndexScale(Object[].class);
// The shift-based slot addressing only works for a power-of-two scale.
1511 dl 1.1 if ((scale & (scale - 1)) != 0)
1512     throw new Error("data type scale not a power of two");
1513     ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1514 jsr166 1.9 } catch (ReflectiveOperationException e) {
1515 dl 1.1 throw new Error(e);
1516     }
1517 jsr166 1.33
1518     // Reduce the risk of rare disastrous classloading in first call to
1519     // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1520     Class<?> ensureLoaded = LockSupport.class;
1521 dl 1.1 }
1522     }
1523     }