ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.43
Committed: Thu Sep 10 15:59:21 2015 UTC (8 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.42: +1 -1 lines
Log Message:
whitespace

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 jsr166 1.10 import java.util.ArrayList;
10     import java.util.List;
11 dl 1.1 import java.util.concurrent.locks.LockSupport;
12 dl 1.36 import java.util.function.BiConsumer;
13 dl 1.1 import java.util.function.BiPredicate;
14     import java.util.function.Function;
15 dl 1.15 import java.util.function.Supplier;
16 dl 1.1
17     /**
18 dl 1.21 * A {@link Flow.Publisher} that asynchronously issues submitted
19     * (non-null) items to current subscribers until it is closed. Each
20     * current subscriber receives newly submitted items in the same order
21     * unless drops or exceptions are encountered. Using a
22     * SubmissionPublisher allows item generators to act as Publishers
23     * relying on drop handling and/or blocking for flow control.
24 dl 1.1 *
25 dl 1.16 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
26 dl 1.1 * constructor for delivery to subscribers. The best choice of
27     * Executor depends on expected usage. If the generator(s) of
28     * submitted items run in separate threads, and the number of
29     * subscribers can be estimated, consider using a {@link
30 dl 1.25 * Executors#newFixedThreadPool}. Otherwise consider using
31     * the default {@link ForkJoinPool#commonPool}.
32 dl 1.1 *
33     * <p>Buffering allows producers and consumers to transiently operate
34     * at different rates. Each subscriber uses an independent buffer.
35 dl 1.21 * Buffers are created upon first use and expanded as needed up to the
36 dl 1.27 * given maximum. (The enforced capacity may be rounded up to the
37 dl 1.21 * nearest power of two and/or bounded by the largest value supported
38     * by this implementation.) Invocations of {@link
39 dl 1.16 * Flow.Subscription#request} do not directly result in buffer
40 dl 1.24 * expansion, but risk saturation if unfilled requests exceed the
41 dl 1.25 * maximum capacity. The default value of {@link
42     * Flow#defaultBufferSize} may provide a useful starting point for
43     * choosing a capacity based on expected rates, resources, and usages.
44 dl 1.18 *
45 dl 1.1 * <p>Publication methods support different policies about what to do
46     * when buffers are saturated. Method {@link #submit} blocks until
47 jsr166 1.26 * resources are available. This is simplest, but least responsive.
48     * The {@code offer} methods may drop items (either immediately or
49     * with bounded timeout), but provide an opportunity to interpose a
50     * handler and then retry.
51 dl 1.1 *
52     * <p>If any Subscriber method throws an exception, its subscription
53 dl 1.36 * is cancelled. If a handler is supplied as a constructor argument,
54     * it is invoked before cancellation upon an exception in method
55 jsr166 1.41 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
56     * {@link Flow.Subscriber#onSubscribe onSubscribe},
57     * {@link Flow.Subscriber#onError #onError} and
58     * {@link Flow.Subscriber#onComplete #onComplete} are not recorded or
59     * handled before cancellation. If the supplied Executor throws
60     * {@link RejectedExecutionException} (or any other RuntimeException
61     * or Error) when attempting to execute a task, or a drop handler
62     * throws an exception when processing a dropped item, then the
63     * exception is rethrown. In these cases, not all subscribers will
64     * have been issued the published item. It is usually good practice to
65     * {@link #closeExceptionally closeExceptionally} in these cases.
66 dl 1.1 *
67 dl 1.15 * <p>This class may also serve as a convenient base for subclasses
68     * that generate items, and use the methods in this class to publish
69     * them. For example here is a class that periodically publishes the
70     * items generated from a supplier. (In practice you might add methods
71 dl 1.35 * to independently start and stop generation, to share Executors
72 dl 1.30 * among publishers, and so on, or use a SubmissionPublisher as a
73     * component rather than a superclass.)
74 dl 1.15 *
75     * <pre> {@code
76     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
77     * final ScheduledFuture<?> periodicTask;
78     * final ScheduledExecutorService scheduler;
79 dl 1.21 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
80     * Supplier<? extends T> supplier,
81 dl 1.15 * long period, TimeUnit unit) {
82 dl 1.21 * super(executor, maxBufferCapacity);
83 dl 1.15 * scheduler = new ScheduledThreadPoolExecutor(1);
84     * periodicTask = scheduler.scheduleAtFixedRate(
85     * () -> submit(supplier.get()), 0, period, unit);
86     * }
87     * public void close() {
88     * periodicTask.cancel(false);
89     * scheduler.shutdown();
90     * super.close();
91     * }
92     * }}</pre>
93     *
94 dl 1.21 * <p>Here is an example of a {@link Flow.Processor} implementation.
95     * It uses single-step requests to its publisher for simplicity of
96     * illustration. A more adaptive version could monitor flow using the
97 dl 1.27 * lag estimate returned from {@code submit}, along with other utility
98 dl 1.21 * methods.
99 dl 1.16 *
100     * <pre> {@code
101     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
102     * implements Flow.Processor<S,T> {
103     * final Function<? super S, ? extends T> function;
104     * Flow.Subscription subscription;
105 dl 1.21 * TransformProcessor(Executor executor, int maxBufferCapacity,
106 dl 1.16 * Function<? super S, ? extends T> function) {
107 dl 1.21 * super(executor, maxBufferCapacity);
108 dl 1.16 * this.function = function;
109     * }
110     * public void onSubscribe(Flow.Subscription subscription) {
111     * (this.subscription = subscription).request(1);
112     * }
113     * public void onNext(S item) {
114     * subscription.request(1);
115     * submit(function.apply(item));
116     * }
117     * public void onError(Throwable ex) { closeExceptionally(ex); }
118     * public void onComplete() { close(); }
119     * }}</pre>
120     *
121 dl 1.1 * @param <T> the published item type
122     * @author Doug Lea
123     * @since 1.9
124     */
125     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
126     AutoCloseable {
127     /*
128     * Most mechanics are handled by BufferedSubscription. This class
129 dl 1.17 * mainly tracks subscribers and ensures sequentiality, by using
130     * built-in synchronization locks across public methods. (Using
131     * built-in locks works well in the most typical case in which
132     * only one thread submits items).
133 dl 1.1 */
134    
135 dl 1.25 /** The largest possible power of two array size. */
136 jsr166 1.43 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
137 dl 1.1
138 dl 1.40 /** Round capacity to power of 2, at most limit. */
139     static final int roundCapacity(int cap) {
140 dl 1.1 int n = cap - 1;
141     n |= n >>> 1;
142     n |= n >>> 2;
143     n |= n >>> 4;
144     n |= n >>> 8;
145     n |= n >>> 16;
146 dl 1.40 return (n <= 0) ? 1 : // at least 1
147 dl 1.25 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
148 dl 1.1 }
149    
150     /**
151     * Clients (BufferedSubscriptions) are maintained in a linked list
152     * (via their "next" fields). This works well for publish loops.
153     * It requires O(n) traversal to check for duplicate subscribers,
154     * but we expect that subscribing is much less common than
155 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
156 dl 1.25 * when BufferedSubscription methods return negative values
157 dl 1.29 * signifying that they have been disabled. To reduce
158     * head-of-line blocking, submit and offer methods first call
159     * BufferedSubscription.offer on each subscriber, and place
160     * saturated ones in retries list (using nextRetry field), and
161     * retry, possibly blocking or dropping.
162 dl 1.1 */
163     BufferedSubscription<T> clients;
164    
165 dl 1.20 /** Run status, updated only within locks */
166     volatile boolean closed;
167 dl 1.36 /** If non-null, the exception in closeExceptionally */
168 dl 1.37 volatile Throwable closedException;
169 dl 1.20
170 dl 1.1 // Parameters for constructing BufferedSubscriptions
171     final Executor executor;
172 dl 1.36 final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
173 dl 1.1 final int maxBufferCapacity;
174    
175 dl 1.20 /**
176 dl 1.1 * Creates a new SubmissionPublisher using the given Executor for
177 dl 1.36 * async delivery to subscribers, with the given maximum
178     * buffer size for each subscriber, and, if non-null, the given handler
179     * invoked when any Subscriber throws an exception in method {@code onNext}.
180 dl 1.1 *
181     * @param executor the executor to use for async delivery,
182     * supporting creation of at least one independent thread
183     * @param maxBufferCapacity the maximum capacity for each
184 dl 1.21 * subscriber's buffer (the enforced capacity may be rounded up to
185     * the nearest power of two and/or bounded by the largest value
186     * supported by this implementation; method {@link #getMaxBufferCapacity}
187     * returns the actual value)
188 dl 1.36 * @param handler if non-null, procedure to invoke upon exception
189     * thrown in method {@code onNext}.
190 dl 1.1 * @throws NullPointerException if executor is null
191 dl 1.21 * @throws IllegalArgumentException if maxBufferCapacity not
192     * positive
193 dl 1.1 */
194 dl 1.36 public SubmissionPublisher(Executor executor, int maxBufferCapacity,
195     BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
196 dl 1.21 if (executor == null)
197     throw new NullPointerException();
198     if (maxBufferCapacity <= 0)
199     throw new IllegalArgumentException("capacity must be positive");
200 dl 1.18 this.executor = executor;
201 dl 1.36 this.onNextHandler = handler;
202 dl 1.21 this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
203 dl 1.18 }
204    
205 dl 1.1 /**
206 dl 1.36 * Creates a new SubmissionPublisher using the given Executor for
207     * async delivery to subscribers, with the given maximum
208     * buffer size for each subscriber, and no handler for Subscriber
209     * exceptions in method {@code onNext}.
210     *
211     * @param executor the executor to use for async delivery,
212     * supporting creation of at least one independent thread
213     * @param maxBufferCapacity the maximum capacity for each
214     * subscriber's buffer (the enforced capacity may be rounded up to
215     * the nearest power of two and/or bounded by the largest value
216     * supported by this implementation; method {@link #getMaxBufferCapacity}
217     * returns the actual value)
218     * @throws NullPointerException if executor is null
219     * @throws IllegalArgumentException if maxBufferCapacity not
220     * positive
221     */
222     public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
223     this(executor, maxBufferCapacity, null);
224     }
225    
226     /**
227 dl 1.25 * Creates a new SubmissionPublisher using the {@link
228     * ForkJoinPool#commonPool()} for async delivery to subscribers,
229 dl 1.36 * with maximum buffer capacity of {@link Flow#defaultBufferSize},
230     * and no handler for Subscriber exceptions in method {@code
231     * onNext}.
232 dl 1.25 */
233     public SubmissionPublisher() {
234 dl 1.36 this(ForkJoinPool.commonPool(), Flow.defaultBufferSize(), null);
235 dl 1.25 }
236    
237     /**
238 dl 1.18 * Adds the given Subscriber unless already subscribed. If
239 dl 1.30 * already subscribed, the Subscriber's {@code onError} method is
240     * invoked on the existing subscription with an {@link
241     * IllegalStateException}. Otherwise, upon success, the
242     * Subscriber's {@code onSubscribe} method is invoked
243     * asynchronously with a new {@link Flow.Subscription}. If {@code
244     * onSubscribe} throws an exception, the subscription is
245 dl 1.36 * cancelled. Otherwise, if this SubmissionPublisher was closed
246     * exceptionally, then the subscriber's {@code onError} method is
247     * invoked with the corresponding exception, or if closed without
248     * exception, the subscriber's {@code onComplete} method is
249     * invoked. Subscribers may enable receiving items by invoking
250     * the {@code request} method of the new Subscription, and may
251     * unsubscribe by invoking its {@code cancel} method.
252 dl 1.18 *
253     * @param subscriber the subscriber
254     * @throws NullPointerException if subscriber is null
255     */
256     public void subscribe(Flow.Subscriber<? super T> subscriber) {
257     if (subscriber == null) throw new NullPointerException();
258 dl 1.20 BufferedSubscription<T> subscription =
259 dl 1.36 new BufferedSubscription<T>(subscriber, executor,
260     onNextHandler, maxBufferCapacity);
261 dl 1.19 synchronized (this) {
262 dl 1.29 for (BufferedSubscription<T> b = clients, pred = null;;) {
263     if (b == null) {
264 dl 1.36 Throwable ex;
265 dl 1.29 subscription.onSubscribe();
266 dl 1.37 if ((ex = closedException) != null)
267 dl 1.36 subscription.onError(ex);
268     else if (closed)
269 dl 1.29 subscription.onComplete();
270     else if (pred == null)
271     clients = subscription;
272 dl 1.27 else
273 dl 1.29 pred.next = subscription;
274     break;
275 dl 1.27 }
276 dl 1.29 BufferedSubscription<T> next = b.next;
277     if (b.isDisabled()) { // remove
278     b.next = null; // detach
279 dl 1.19 if (pred == null)
280 dl 1.29 clients = next;
281 dl 1.19 else
282 dl 1.29 pred.next = next;
283     }
284     else if (subscriber.equals(b.subscriber)) {
285     b.onError(new IllegalStateException("Duplicate subscribe"));
286     break;
287 dl 1.19 }
288 dl 1.29 else
289     pred = b;
290     b = next;
291 dl 1.19 }
292     }
293 dl 1.18 }
294    
295     /**
296 dl 1.16 * Publishes the given item to each current subscriber by
297 dl 1.30 * asynchronously invoking its {@link Flow.Subscriber#onNext}
298     * method, blocking uninterruptibly while resources for any
299     * subscriber are unavailable. This method returns an estimate of
300     * the maximum lag (number of items submitted but not yet
301     * consumed) among all current subscribers. This value is at least
302     * one (accounting for this submitted item) if there are any
303     * subscribers, else zero.
304 dl 1.16 *
305     * <p>If the Executor for this publisher throws a
306     * RejectedExecutionException (or any other RuntimeException or
307     * Error) when attempting to asynchronously notify subscribers,
308 dl 1.31 * then this exception is rethrown, in which case not all
309     * subscribers will have been issued this item.
310 dl 1.16 *
311     * @param item the (non-null) item to publish
312 dl 1.21 * @return the estimated maximum lag among subscribers
313 dl 1.16 * @throws IllegalStateException if closed
314     * @throws NullPointerException if item is null
315     * @throws RejectedExecutionException if thrown by Executor
316     */
317 dl 1.21 public int submit(T item) {
318 dl 1.16 if (item == null) throw new NullPointerException();
319 dl 1.21 int lag = 0;
320 dl 1.27 boolean complete;
321 dl 1.16 synchronized (this) {
322 dl 1.27 complete = closed;
323 dl 1.29 BufferedSubscription<T> b = clients;
324 dl 1.27 if (!complete) {
325 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
326 dl 1.27 while (b != null) {
327     BufferedSubscription<T> next = b.next;
328 dl 1.29 int stat = b.offer(item);
329     if (stat < 0) { // disabled
330     b.next = null;
331 dl 1.27 if (pred == null)
332     clients = next;
333 dl 1.24 else
334 dl 1.27 pred.next = next;
335 dl 1.24 }
336 dl 1.27 else {
337 dl 1.29 if (stat > lag)
338     lag = stat;
339     else if (stat == 0) { // place on retry list
340     b.nextRetry = null;
341 dl 1.27 if (rtail == null)
342 dl 1.29 r = b;
343 dl 1.27 else
344     rtail.nextRetry = b;
345     rtail = b;
346     }
347     pred = b;
348     }
349     b = next;
350 dl 1.21 }
351 dl 1.29 while (r != null) {
352     BufferedSubscription<T> nextRetry = r.nextRetry;
353     r.nextRetry = null;
354     int stat = r.submit(item);
355     if (stat > lag)
356     lag = stat;
357     else if (stat < 0 && clients == r)
358     clients = r.next; // postpone internal unsubscribes
359     r = nextRetry;
360     }
361 dl 1.16 }
362     }
363 dl 1.27 if (complete)
364     throw new IllegalStateException("Closed");
365     else
366     return lag;
367 dl 1.16 }
368    
369     /**
370 dl 1.1 * Publishes the given item, if possible, to each current
371 dl 1.30 * subscriber by asynchronously invoking its {@link
372     * Flow.Subscriber#onNext} method. The item may be dropped by one
373     * or more subscribers if resource limits are exceeded, in which
374     * case the given handler (if non-null) is invoked, and if it
375     * returns true, retried once. Other calls to methods in this
376     * class by other threads are blocked while the handler is
377     * invoked. Unless recovery is assured, options are usually
378     * limited to logging the error and/or issuing an {@code onError}
379     * signal to the subscriber.
380 dl 1.1 *
381 dl 1.21 * <p>This method returns a status indicator: If negative, it
382     * represents the (negative) number of drops (failed attempts to
383     * issue the item to a subscriber). Otherwise it is an estimate of
384     * the maximum lag (number of items submitted but not yet
385     * consumed) among all current subscribers. This value is at least
386     * one (accounting for this submitted item) if there are any
387     * subscribers, else zero.
388 jsr166 1.23 *
389 dl 1.1 * <p>If the Executor for this publisher throws a
390     * RejectedExecutionException (or any other RuntimeException or
391     * Error) when attempting to asynchronously notify subscribers, or
392     * the drop handler throws an exception when processing a dropped
393     * item, then this exception is rethrown.
394     *
395     * @param item the (non-null) item to publish
396     * @param onDrop if non-null, the handler invoked upon a drop to a
397     * subscriber, with arguments of the subscriber and item; if it
398     * returns true, an offer is re-attempted (once)
399 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
400     * an estimate of maximum lag
401 dl 1.1 * @throws IllegalStateException if closed
402     * @throws NullPointerException if item is null
403     * @throws RejectedExecutionException if thrown by Executor
404     */
405     public int offer(T item,
406     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
407 dl 1.29 return doOffer(0L, item, onDrop);
408 dl 1.1 }
409    
410     /**
411     * Publishes the given item, if possible, to each current
412 dl 1.30 * subscriber by asynchronously invoking its {@link
413     * Flow.Subscriber#onNext} method, blocking while resources for
414     * any subscription are unavailable, up to the specified timeout
415     * or the caller thread is interrupted, at which point the given
416     * handler (if non-null) is invoked, and if it returns true,
417     * retried once. (The drop handler may distinguish timeouts from
418     * interrupts by checking whether the current thread is
419     * interrupted.) Other calls to methods in this class by other
420     * threads are blocked while the handler is invoked. Unless
421     * recovery is assured, options are usually limited to logging the
422     * error and/or issuing an {@code onError} signal to the
423     * subscriber.
424 dl 1.1 *
425 dl 1.21 * <p>This method returns a status indicator: If negative, it
426     * represents the (negative) number of drops (failed attempts to
427     * issue the item to a subscriber). Otherwise it is an estimate of
428     * the maximum lag (number of items submitted but not yet
429     * consumed) among all current subscribers. This value is at least
430     * one (accounting for this submitted item) if there are any
431     * subscribers, else zero.
432     *
433 dl 1.1 * <p>If the Executor for this publisher throws a
434     * RejectedExecutionException (or any other RuntimeException or
435     * Error) when attempting to asynchronously notify subscribers, or
436     * the drop handler throws an exception when processing a dropped
437     * item, then this exception is rethrown.
438     *
439     * @param item the (non-null) item to publish
440     * @param timeout how long to wait for resources for any subscriber
441     * before giving up, in units of {@code unit}
442     * @param unit a {@code TimeUnit} determining how to interpret the
443     * {@code timeout} parameter
444     * @param onDrop if non-null, the handler invoked upon a drop to a
445     * subscriber, with arguments of the subscriber and item; if it
446     * returns true, an offer is re-attempted (once)
447 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
448     * an estimate of maximum lag
449 dl 1.1 * @throws IllegalStateException if closed
450     * @throws NullPointerException if item is null
451     * @throws RejectedExecutionException if thrown by Executor
452     */
453     public int offer(T item, long timeout, TimeUnit unit,
454     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
455 dl 1.29 return doOffer(unit.toNanos(timeout), item, onDrop);
456     }
457    
458     /** Common implementation for both forms of offer */
459     final int doOffer(long nanos, T item,
460     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
461 dl 1.1 if (item == null) throw new NullPointerException();
462 dl 1.27 int lag = 0, drops = 0;
463     boolean complete;
464 jsr166 1.2 synchronized (this) {
465 dl 1.27 complete = closed;
466 dl 1.29 BufferedSubscription<T> b = clients;
467 dl 1.27 if (!complete) {
468 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
469 dl 1.27 while (b != null) {
470     BufferedSubscription<T> next = b.next;
471 dl 1.29 int stat = b.offer(item);
472     if (stat < 0) {
473     b.next = null;
474 dl 1.27 if (pred == null)
475     clients = next;
476     else
477     pred.next = next;
478     }
479     else {
480 dl 1.29 if (stat > lag)
481     lag = stat;
482     else if (stat == 0) {
483     b.nextRetry = null;
484 dl 1.27 if (rtail == null)
485 dl 1.29 r = b;
486 dl 1.27 else
487     rtail.nextRetry = b;
488     rtail = b;
489     }
490 dl 1.29 else if (stat > lag)
491 dl 1.27 lag = stat;
492     pred = b;
493     }
494     b = next;
495 dl 1.1 }
496 dl 1.29 while (r != null) {
497     BufferedSubscription<T> nextRetry = r.nextRetry;
498     r.nextRetry = null;
499     int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
500     r.offer(item);
501     if (stat == 0 && onDrop != null &&
502     onDrop.test(r.subscriber, item))
503     stat = r.offer(item);
504     if (stat == 0)
505     ++drops;
506     else if (stat > lag)
507     lag = stat;
508     else if (stat < 0 && clients == r)
509     clients = r.next;
510     r = nextRetry;
511     }
512 dl 1.1 }
513     }
514 dl 1.27 if (complete)
515     throw new IllegalStateException("Closed");
516     else
517     return (drops > 0) ? -drops : lag;
518 dl 1.1 }
519    
520     /**
521 dl 1.30 * Unless already closed, issues {@link
522     * Flow.Subscriber#onComplete} signals to current subscribers, and
523     * disallows subsequent attempts to publish. Upon return, this
524     * method does <em>NOT</em> guarantee that all subscribers have
525     * yet completed.
526 dl 1.1 */
527     public void close() {
528     if (!closed) {
529 dl 1.32 BufferedSubscription<T> b;
530 jsr166 1.2 synchronized (this) {
531 dl 1.1 b = clients;
532     clients = null;
533     closed = true;
534     }
535     while (b != null) {
536 dl 1.32 BufferedSubscription<T> next = b.next;
537     b.next = null;
538 dl 1.19 b.onComplete();
539 dl 1.1 b = next;
540     }
541     }
542     }
543    
544     /**
545 dl 1.30 * Unless already closed, issues {@link Flow.Subscriber#onError}
546     * signals to current subscribers with the given error, and
547 dl 1.36 * disallows subsequent attempts to publish. Future subscribers
548     * also receive the given error. Upon return, this method does
549     * <em>NOT</em> guarantee that all subscribers have yet completed.
550 dl 1.1 *
551 dl 1.30 * @param error the {@code onError} argument sent to subscribers
552 dl 1.1 * @throws NullPointerException if error is null
553     */
554     public void closeExceptionally(Throwable error) {
555     if (error == null)
556     throw new NullPointerException();
557     if (!closed) {
558 dl 1.32 BufferedSubscription<T> b;
559 jsr166 1.2 synchronized (this) {
560 dl 1.1 b = clients;
561     clients = null;
562     closed = true;
563 dl 1.37 closedException = error;
564 dl 1.1 }
565     while (b != null) {
566 dl 1.32 BufferedSubscription<T> next = b.next;
567     b.next = null;
568 dl 1.19 b.onError(error);
569 dl 1.1 b = next;
570     }
571     }
572     }
573    
574     /**
575     * Returns true if this publisher is not accepting submissions.
576     *
577     * @return true if closed
578     */
579     public boolean isClosed() {
580     return closed;
581     }
582    
583 dl 1.37 /**
584     * Returns the exception associated with {@link #closeExceptionally},
585     * or null if not closed or if closed normally.
586     *
587     * @return the exception, or null if none
588     */
589     public Throwable getClosedException() {
590     return closedException;
591     }
592    
593 dl 1.1 /**
594 jsr166 1.6 * Returns true if this publisher has any subscribers.
595 dl 1.1 *
596     * @return true if this publisher has any subscribers
597     */
598     public boolean hasSubscribers() {
599     boolean nonEmpty = false;
600     if (!closed) {
601 jsr166 1.2 synchronized (this) {
602 dl 1.31 for (BufferedSubscription<T> b = clients; b != null;) {
603     BufferedSubscription<T> next = b.next;
604 dl 1.19 if (b.isDisabled()) {
605 dl 1.29 b.next = null;
606 dl 1.31 b = clients = next;
607 jsr166 1.2 }
608 dl 1.1 else {
609     nonEmpty = true;
610     break;
611     }
612     }
613     }
614     }
615     return nonEmpty;
616     }
617    
618     /**
619 dl 1.21 * Returns the number of current subscribers.
620 dl 1.1 *
621 dl 1.21 * @return the number of current subscribers
622 dl 1.1 */
623 dl 1.21 public int getNumberOfSubscribers() {
624     int count = 0;
625     if (!closed) {
626     synchronized (this) {
627     BufferedSubscription<T> pred = null, next;
628     for (BufferedSubscription<T> b = clients; b != null; b = next) {
629     next = b.next;
630     if (b.isDisabled()) {
631 dl 1.29 b.next = null;
632 dl 1.21 if (pred == null)
633     clients = next;
634     else
635     pred.next = next;
636     }
637     else {
638     pred = b;
639     ++count;
640     }
641     }
642     }
643     }
644     return count;
645 dl 1.1 }
646    
647 jsr166 1.7 /**
648 dl 1.21 * Returns the Executor used for asynchronous delivery.
649 dl 1.1 *
650 dl 1.21 * @return the Executor used for asynchronous delivery
651 dl 1.1 */
652 dl 1.21 public Executor getExecutor() {
653     return executor;
654 dl 1.1 }
655    
656 jsr166 1.7 /**
657 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
658 dl 1.1 *
659 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
660 dl 1.1 */
661     public int getMaxBufferCapacity() {
662     return maxBufferCapacity;
663     }
664    
665     /**
666 dl 1.29 * Returns a list of current subscribers for monitoring and
667     * tracking purposes, not for invoking {@link Flow.Subscriber}
668     * methods on the subscribers.
669 dl 1.1 *
670     * @return list of current subscribers
671     */
672     public List<Flow.Subscriber<? super T>> getSubscribers() {
673     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
674 jsr166 1.2 synchronized (this) {
675 dl 1.1 BufferedSubscription<T> pred = null, next;
676     for (BufferedSubscription<T> b = clients; b != null; b = next) {
677     next = b.next;
678 dl 1.19 if (b.isDisabled()) {
679 dl 1.29 b.next = null;
680 dl 1.1 if (pred == null)
681     clients = next;
682     else
683     pred.next = next;
684 jsr166 1.2 }
685 dl 1.1 else
686     subs.add(b.subscriber);
687     }
688     }
689     return subs;
690     }
691 jsr166 1.2
692 dl 1.1 /**
693     * Returns true if the given Subscriber is currently subscribed.
694     *
695     * @param subscriber the subscriber
696     * @return true if currently subscribed
697 dl 1.21 * @throws NullPointerException if subscriber is null
698 dl 1.1 */
699     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
700 dl 1.21 if (subscriber == null) throw new NullPointerException();
701 jsr166 1.2 if (!closed) {
702     synchronized (this) {
703 dl 1.1 BufferedSubscription<T> pred = null, next;
704     for (BufferedSubscription<T> b = clients; b != null; b = next) {
705     next = b.next;
706 dl 1.19 if (b.isDisabled()) {
707 dl 1.29 b.next = null;
708 dl 1.1 if (pred == null)
709     clients = next;
710     else
711     pred.next = next;
712 jsr166 1.2 }
713 dl 1.21 else if (subscriber.equals(b.subscriber))
714 dl 1.1 return true;
715 dl 1.21 else
716     pred = b;
717 dl 1.1 }
718     }
719     }
720     return false;
721     }
722    
723     /**
724 dl 1.21 * Returns an estimate of the minimum number of items requested
725     * (via {@link Flow.Subscription#request}) but not yet produced,
726     * among all current subscribers.
727     *
728     * @return the estimate, or zero if no subscribers
729     */
730     public long estimateMinimumDemand() {
731     long min = Long.MAX_VALUE;
732     boolean nonEmpty = false;
733     synchronized (this) {
734     BufferedSubscription<T> pred = null, next;
735     for (BufferedSubscription<T> b = clients; b != null; b = next) {
736     int n; long d;
737     next = b.next;
738     if ((n = b.estimateLag()) < 0) {
739 dl 1.29 b.next = null;
740 dl 1.21 if (pred == null)
741     clients = next;
742     else
743     pred.next = next;
744     }
745     else {
746     if ((d = b.demand - n) < min)
747     min = d;
748     nonEmpty = true;
749 dl 1.32 pred = b;
750 dl 1.21 }
751     }
752     }
753 jsr166 1.22 return nonEmpty ? min : 0;
754 dl 1.21 }
755    
756     /**
757     * Returns an estimate of the maximum number of items produced but
758     * not yet consumed among all current subscribers.
759 dl 1.17 *
760     * @return the estimate
761 dl 1.1 */
762 dl 1.21 public int estimateMaximumLag() {
763     int max = 0;
764 dl 1.17 synchronized (this) {
765     BufferedSubscription<T> pred = null, next;
766     for (BufferedSubscription<T> b = clients; b != null; b = next) {
767 dl 1.21 int n;
768 dl 1.17 next = b.next;
769 dl 1.21 if ((n = b.estimateLag()) < 0) {
770 dl 1.29 b.next = null;
771 dl 1.17 if (pred == null)
772     clients = next;
773     else
774     pred.next = next;
775     }
776 dl 1.32 else {
777     if (n > max)
778     max = n;
779     pred = b;
780     }
781 dl 1.17 }
782     }
783 dl 1.21 return max;
784 dl 1.1 }
785    
786 dl 1.25 /**
787     * A task for consuming buffer items and signals, created and
788     * executed whenever they become available. A task consumes as
789     * many items/signals as possible before terminating, at which
790     * point another task is created when needed. The dual Runnable
791     * and ForkJoinTask declaration saves overhead when executed by
792     * ForkJoinPools, without impacting other kinds of Executors.
793     */
794     @SuppressWarnings("serial")
795     static final class ConsumerTask<T> extends ForkJoinTask<Void>
796     implements Runnable {
797     final BufferedSubscription<T> consumer;
798     ConsumerTask(BufferedSubscription<T> consumer) {
799     this.consumer = consumer;
800     }
801     public final Void getRawResult() { return null; }
802     public final void setRawResult(Void v) {}
803     public final boolean exec() { consumer.consume(); return false; }
804     public final void run() { consumer.consume(); }
805     }
806    
807 dl 1.1 /**
808 dl 1.15 * A bounded (ring) buffer with integrated control to start a
809     * consumer task whenever items are available. The buffer
810 dl 1.34 * algorithm is similar to one used inside ForkJoinPool (see its
811     * internal documentation for details) specialized for the case of
812     * at most one concurrent producer and consumer, and power of two
813     * buffer sizes. This allows methods to operate without locks even
814     * while supporting resizing, blocking, task-triggering, and
815     * garbage-free buffers (nulling out elements when consumed),
816     * although supporting these does impose a bit of overhead
817     * compared to plain fixed-size ring buffers.
818 dl 1.1 *
819     * The publisher guarantees a single producer via its lock. We
820     * ensure in this class that there is at most one consumer. The
821     * request and cancel methods must be fully thread-safe but are
822     * coded to exploit the most common case in which they are only
823     * called by consumers (usually within onNext).
824     *
825 dl 1.15 * Execution control is managed using the ACTIVE ctl bit. We
826     * ensure that a task is active when consumable items (and
827 dl 1.19 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
828 dl 1.24 * there is demand (unfilled requests). This is complicated on
829 dl 1.19 * the creation side by the possibility of exceptions when trying
830     * to execute tasks. These eventually force DISABLED state, but
831 dl 1.1 * sometimes not directly. On the task side, termination (clearing
832 dl 1.27 * ACTIVE) that would otherwise race with producers or request()
833 dl 1.29 * calls uses the CONSUME keep-alive bit to force a recheck.
834 dl 1.1 *
835     * The ctl field also manages run state. When DISABLED, no further
836 dl 1.20 * updates are possible. Disabling may be preceded by setting
837 dl 1.1 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
838     * case the associated Subscriber methods are invoked, possibly
839     * synchronously if there is no active consumer task (including
840 dl 1.20 * cases where execute() failed). The cancel() method is supported
841     * by treating as ERROR but suppressing onError signal.
842 dl 1.1 *
843     * Support for blocking also exploits the fact that there is only
844     * one possible waiter. ManagedBlocker-compatible control fields
845     * are placed in this class itself rather than in wait-nodes.
846     * Blocking control relies on the "waiter" field. Producers set
847     * the field before trying to block, but must then recheck (via
848     * offer) before parking. Signalling then just unparks and clears
849 dl 1.20 * waiter field. If the producer and consumer are both in the same
850 dl 1.29 * ForkJoinPool, or consumers are running in commonPool, the
851     * producer attempts to help run consumer tasks that it forked
852     * before blocking. To avoid potential cycles, only one level of
853     * helping is currently supported.
854 dl 1.1 *
855     * This class uses @Contended and heuristic field declaration
856 dl 1.31 * ordering to reduce false-sharing-based memory contention among
857     * instances of BufferedSubscription, but it does not currently
858     * attempt to avoid memory contention among buffers. This field
859     * and element packing can hurt performance especially when each
860     * publisher has only one client operating at a high rate.
861 dl 1.1 * Addressing this may require allocating substantially more space
862     * than users expect.
863     */
864 dl 1.15 @SuppressWarnings("serial")
865 dl 1.1 @sun.misc.Contended
866 dl 1.25 static final class BufferedSubscription<T>
867     implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
868 dl 1.1 // Order-sensitive field declarations
869 dl 1.27 long timeout; // > 0 if timed wait
870     volatile long demand; // # unfilled requests
871     int maxCapacity; // reduced on OOME
872     int putStat; // offer result for ManagedBlocker
873 dl 1.29 int helpDepth; // nested helping depth (at most 1)
874 dl 1.27 volatile int ctl; // atomic run state flags
875     volatile int head; // next position to take
876 dl 1.34 int tail; // next position to put
877 dl 1.27 Object[] array; // buffer: null if disabled
878 dl 1.1 Flow.Subscriber<? super T> subscriber; // null if disabled
879 dl 1.27 Executor executor; // null if disabled
880 dl 1.36 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
881 dl 1.27 volatile Throwable pendingError; // holds until onError issued
882     volatile Thread waiter; // blocked producer thread
883     T putItem; // for offer within ManagedBlocker
884     BufferedSubscription<T> next; // used only by publisher
885     BufferedSubscription<T> nextRetry; // used only by publisher
886 dl 1.1
887     // ctl values
888 dl 1.20 static final int ACTIVE = 0x01; // consumer task active
889 dl 1.29 static final int CONSUME = 0x02; // keep-alive for consumer task
890 dl 1.27 static final int DISABLED = 0x04; // final state
891     static final int ERROR = 0x08; // signal onError then disable
892     static final int SUBSCRIBE = 0x10; // signal onSubscribe
893     static final int COMPLETE = 0x20; // signal onComplete when done
894 dl 1.1
895 dl 1.20 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
896 dl 1.16
897 jsr166 1.22 /**
898 dl 1.40 * Initial buffer capacity used when maxBufferCapacity is
899     * greater. Must be a power of two.
900 dl 1.21 */
901 dl 1.40 static final int DEFAULT_INITIAL_CAP = 32;
902 dl 1.21
903 dl 1.1 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
904 dl 1.36 Executor executor,
905     BiConsumer<? super Flow.Subscriber<? super T>,
906     ? super Throwable> onNextHandler,
907     int maxBufferCapacity) {
908 dl 1.1 this.subscriber = subscriber;
909     this.executor = executor;
910 dl 1.36 this.onNextHandler = onNextHandler;
911 dl 1.1 this.maxCapacity = maxBufferCapacity;
912 dl 1.40 this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
913     maxBufferCapacity : DEFAULT_INITIAL_CAP];
914 dl 1.1 }
915    
916 dl 1.19 final boolean isDisabled() {
917 dl 1.20 return ctl == DISABLED;
918     }
919    
920 dl 1.21 /**
921     * Returns estimated number of buffered items, or -1 if
922 jsr166 1.42 * disabled.
923 dl 1.21 */
924     final int estimateLag() {
925 dl 1.20 int n;
926 jsr166 1.22 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
927 dl 1.19 }
928    
929 dl 1.1 /**
930 dl 1.16 * Tries to add item and start consumer task if necessary.
931 dl 1.21 * @return -1 if disabled, 0 if dropped, else estimated lag
932 dl 1.1 */
933     final int offer(T item) {
934 dl 1.34 int h = head, t = tail, cap, size, stat;
935 dl 1.1 Object[] a = array;
936 dl 1.40 if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
937 dl 1.34 a[(cap - 1) & t] = item; // relaxed writes OK
938     tail = t + 1;
939 dl 1.27 stat = size;
940 dl 1.1 }
941 dl 1.34 else
942     stat = growAndAdd(a, item);
943 dl 1.29 return (stat > 0 &&
944     (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
945 dl 1.27 startOnOffer(stat) : stat;
946     }
947    
948     /**
949 jsr166 1.28 * Tries to create or expand buffer, then adds item if possible.
950 dl 1.27 */
951     private int growAndAdd(Object[] a, T item) {
952 dl 1.34 boolean alloc;
953 dl 1.27 int cap, stat;
954     if ((ctl & (ERROR | DISABLED)) != 0) {
955     cap = 0;
956     stat = -1;
957 dl 1.34 alloc = false;
958 dl 1.27 }
959 dl 1.34 else if (a == null || (cap = a.length) <= 0) {
960 dl 1.27 cap = 0;
961     stat = 1;
962 dl 1.34 alloc = true;
963     }
964     else {
965     U.fullFence(); // recheck
966     int h = head, t = tail, size = t + 1 - h;
967 dl 1.40 if (cap >= size) {
968 dl 1.34 a[(cap - 1) & t] = item;
969     tail = t + 1;
970     stat = size;
971     alloc = false;
972     }
973     else if (cap >= maxCapacity) {
974     stat = 0; // cannot grow
975     alloc = false;
976     }
977     else {
978     stat = cap + 1;
979     alloc = true;
980     }
981 dl 1.27 }
982 dl 1.34 if (alloc) {
983 dl 1.40 int newCap = (cap > 0) ? cap << 1 : 1;
984 dl 1.27 if (newCap <= cap)
985     stat = 0;
986     else {
987     Object[] newArray = null;
988 dl 1.16 try {
989 dl 1.27 newArray = new Object[newCap];
990     } catch (Throwable ex) { // try to cope with OOME
991     }
992     if (newArray == null) {
993     if (cap > 0)
994     maxCapacity = cap; // avoid continuous failure
995     stat = 0;
996     }
997     else {
998     array = newArray;
999     int t = tail;
1000     int newMask = newCap - 1;
1001     if (a != null && cap > 0) {
1002     int mask = cap - 1;
1003     for (int j = head; j != t; ++j) {
1004 dl 1.34 long k = ((long)(j & mask) << ASHIFT) + ABASE;
1005     Object x = U.getObjectVolatile(a, k);
1006     if (x != null && // races with consumer
1007     U.compareAndSwapObject(a, k, x, null))
1008 dl 1.27 newArray[j & newMask] = x;
1009     }
1010     }
1011     newArray[t & newMask] = item;
1012     tail = t + 1;
1013 dl 1.16 }
1014     }
1015     }
1016 dl 1.21 return stat;
1017 dl 1.1 }
1018    
1019     /**
1020 dl 1.27 * Spins/helps/blocks while offer returns 0. Called only if
1021 dl 1.29 * initial offer return 0.
1022 dl 1.20 */
1023     final int submit(T item) {
1024 dl 1.34 int stat; Executor e; ForkJoinWorkerThread w;
1025     if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1026     ((e = executor) instanceof ForkJoinPool)) {
1027 dl 1.39 helpDepth = 1;
1028 dl 1.29 Thread thread = Thread.currentThread();
1029 dl 1.20 if ((thread instanceof ForkJoinWorkerThread) &&
1030 dl 1.34 ((w = (ForkJoinWorkerThread)thread)).getPool() == e)
1031     stat = internalHelpConsume(w.workQueue, item);
1032     else if (e == ForkJoinPool.commonPool())
1033     stat = externalHelpConsume
1034     (ForkJoinPool.commonSubmitterQueue(), item);
1035 dl 1.39 helpDepth = 0;
1036 dl 1.20 }
1037 dl 1.25 if (stat == 0 && (stat = offer(item)) == 0) {
1038     putItem = item;
1039     timeout = 0L;
1040     try {
1041     ForkJoinPool.managedBlock(this);
1042     } catch (InterruptedException ie) {
1043     timeout = INTERRUPTED;
1044     }
1045     stat = putStat;
1046     if (timeout < 0L)
1047     Thread.currentThread().interrupt();
1048     }
1049 dl 1.20 return stat;
1050     }
1051    
1052     /**
1053 dl 1.34 * Tries helping for FJ submitter
1054     */
1055     private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1056     int stat = 0;
1057     if (w != null) {
1058 dl 1.39 ForkJoinTask<?> t;
1059 dl 1.34 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1060     if ((stat = offer(item)) != 0 || !w.tryUnpush(t))
1061     break;
1062 dl 1.39 ((ConsumerTask<?>)t).consumer.consume();
1063 dl 1.34 }
1064     }
1065     return stat;
1066     }
1067    
1068     /**
1069     * Tries helping for non-FJ submitter
1070     */
1071     private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1072     int stat = 0;
1073     if (w != null) {
1074 dl 1.39 ForkJoinTask<?> t;
1075 dl 1.34 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1076     if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t))
1077     break;
1078 dl 1.39 ((ConsumerTask<?>)t).consumer.consume();
1079 dl 1.34 }
1080     }
1081     return stat;
1082     }
1083    
1084     /**
1085 dl 1.27 * Timeout version; similar to submit
1086 dl 1.20 */
1087 dl 1.27 final int timedOffer(T item, long nanos) {
1088 dl 1.34 int stat; Executor e;
1089     if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1090     ((e = executor) instanceof ForkJoinPool)) {
1091 dl 1.29 Thread thread = Thread.currentThread();
1092     if (((thread instanceof ForkJoinWorkerThread) &&
1093     ((ForkJoinWorkerThread)thread).getPool() == e) ||
1094     e == ForkJoinPool.commonPool()) {
1095 dl 1.39 helpDepth = 1;
1096 dl 1.29 ForkJoinTask<?> t;
1097     long deadline = System.nanoTime() + nanos;
1098     while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
1099     (t instanceof ConsumerTask)) {
1100     if ((stat = offer(item)) != 0 ||
1101     (nanos = deadline - System.nanoTime()) <= 0L ||
1102     !t.tryUnfork())
1103     break;
1104 dl 1.39 ((ConsumerTask<?>)t).consumer.consume();
1105 dl 1.29 }
1106 dl 1.39 helpDepth = 0;
1107 dl 1.20 }
1108 dl 1.25 }
1109     if (stat == 0 && (stat = offer(item)) == 0 &&
1110     (timeout = nanos) > 0L) {
1111     putItem = item;
1112     try {
1113     ForkJoinPool.managedBlock(this);
1114     } catch (InterruptedException ie) {
1115     timeout = INTERRUPTED;
1116     }
1117     stat = putStat;
1118     if (timeout < 0L)
1119     Thread.currentThread().interrupt();
1120 dl 1.20 }
1121     return stat;
1122     }
1123    
1124     /**
1125 dl 1.27 * Tries to start consumer task after offer.
1126     * @return -1 if now disabled, else argument
1127     */
1128     private int startOnOffer(int stat) {
1129     for (;;) {
1130     Executor e; int c;
1131     if ((c = ctl) == DISABLED || (e = executor) == null) {
1132     stat = -1;
1133     break;
1134     }
1135     else if ((c & ACTIVE) != 0) { // ensure keep-alive
1136 dl 1.29 if ((c & CONSUME) != 0 ||
1137 dl 1.27 U.compareAndSwapInt(this, CTL, c,
1138 dl 1.29 c | CONSUME))
1139 dl 1.27 break;
1140     }
1141     else if (demand == 0L || tail == head)
1142     break;
1143     else if (U.compareAndSwapInt(this, CTL, c,
1144 dl 1.29 c | (ACTIVE | CONSUME))) {
1145 dl 1.27 try {
1146     e.execute(new ConsumerTask<T>(this));
1147     break;
1148     } catch (RuntimeException | Error ex) { // back out
1149 dl 1.30 do {} while (((c = ctl) & DISABLED) == 0 &&
1150 dl 1.27 (c & ACTIVE) != 0 &&
1151     !U.compareAndSwapInt(this, CTL, c,
1152     c & ~ACTIVE));
1153     throw ex;
1154     }
1155     }
1156     }
1157     return stat;
1158     }
1159    
1160 dl 1.34 private void signalWaiter(Thread w) {
1161     waiter = null;
1162     LockSupport.unpark(w); // release producer
1163     }
1164    
1165 dl 1.27 /**
1166 dl 1.16 * Nulls out most fields, mainly to avoid garbage retention
1167 dl 1.20 * until publisher unsubscribes, but also to help cleanly stop
1168     * upon error by nulling required components.
1169 dl 1.1 */
1170 dl 1.27 private void detach() {
1171     Thread w = waiter;
1172     executor = null;
1173     subscriber = null;
1174 dl 1.16 pendingError = null;
1175 dl 1.34 signalWaiter(w);
1176 dl 1.1 }
1177    
1178     /**
1179 dl 1.19 * Issues error signal, asynchronously if a task is running,
1180 jsr166 1.26 * else synchronously.
1181 dl 1.19 */
1182     final void onError(Throwable ex) {
1183     for (int c;;) {
1184 dl 1.29 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1185 dl 1.19 break;
1186     else if ((c & ACTIVE) != 0) {
1187     pendingError = ex;
1188     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1189     break; // cause consumer task to exit
1190     }
1191     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1192     Flow.Subscriber<? super T> s = subscriber;
1193     if (s != null && ex != null) {
1194     try {
1195     s.onError(ex);
1196     } catch (Throwable ignore) {
1197     }
1198     }
1199     detach();
1200     break;
1201     }
1202     }
1203     }
1204    
1205     /**
1206 dl 1.1 * Tries to start consumer task upon a signal or request;
1207 jsr166 1.12 * disables on failure.
1208 dl 1.1 */
1209 dl 1.27 private void startOrDisable() {
1210     Executor e;
1211     if ((e = executor) != null) { // skip if already disabled
1212 dl 1.1 try {
1213 dl 1.25 e.execute(new ConsumerTask<T>(this));
1214 dl 1.27 } catch (Throwable ex) { // back out and force signal
1215 dl 1.1 for (int c;;) {
1216 dl 1.20 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1217 dl 1.1 break;
1218     if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1219 dl 1.19 onError(ex);
1220 dl 1.1 break;
1221     }
1222     }
1223     }
1224     }
1225     }
1226    
1227 dl 1.19 final void onComplete() {
1228 dl 1.1 for (int c;;) {
1229 dl 1.20 if ((c = ctl) == DISABLED)
1230 dl 1.1 break;
1231 dl 1.27 if (U.compareAndSwapInt(this, CTL, c,
1232 dl 1.29 c | (ACTIVE | CONSUME | COMPLETE))) {
1233 dl 1.16 if ((c & ACTIVE) == 0)
1234     startOrDisable();
1235 dl 1.1 break;
1236     }
1237     }
1238     }
1239    
1240 dl 1.19 final void onSubscribe() {
1241 dl 1.1 for (int c;;) {
1242 dl 1.20 if ((c = ctl) == DISABLED)
1243 dl 1.1 break;
1244 dl 1.27 if (U.compareAndSwapInt(this, CTL, c,
1245 dl 1.29 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1246 dl 1.19 if ((c & ACTIVE) == 0)
1247     startOrDisable();
1248 dl 1.1 break;
1249     }
1250     }
1251     }
1252    
1253 dl 1.16 /**
1254     * Causes consumer task to exit if active (without reporting
1255     * onError unless there is already a pending error), and
1256     * disables.
1257     */
1258     public void cancel() {
1259 dl 1.1 for (int c;;) {
1260 dl 1.20 if ((c = ctl) == DISABLED)
1261 dl 1.1 break;
1262 dl 1.16 else if ((c & ACTIVE) != 0) {
1263 dl 1.29 if (U.compareAndSwapInt(this, CTL, c,
1264     c | (CONSUME | ERROR)))
1265 dl 1.16 break;
1266     }
1267     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1268     detach();
1269 dl 1.1 break;
1270     }
1271     }
1272     }
1273    
1274 dl 1.16 /**
1275     * Adds to demand and possibly starts task.
1276     */
1277     public void request(long n) {
1278     if (n > 0L) {
1279     for (;;) {
1280     long prev = demand, d;
1281     if ((d = prev + n) < prev) // saturate
1282     d = Long.MAX_VALUE;
1283     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1284 dl 1.31 for (int c, h;;) {
1285 dl 1.27 if ((c = ctl) == DISABLED)
1286 dl 1.16 break;
1287 dl 1.27 else if ((c & ACTIVE) != 0) {
1288 dl 1.29 if ((c & CONSUME) != 0 ||
1289 dl 1.27 U.compareAndSwapInt(this, CTL, c,
1290 dl 1.29 c | CONSUME))
1291 dl 1.27 break;
1292     }
1293     else if ((h = head) != tail) {
1294 dl 1.16 if (U.compareAndSwapInt(this, CTL, c,
1295 dl 1.29 c | (ACTIVE|CONSUME))) {
1296 dl 1.16 startOrDisable();
1297     break;
1298     }
1299     }
1300     else if (head == h && tail == h)
1301 dl 1.27 break; // else stale
1302 dl 1.31 if (demand == 0L)
1303     break;
1304 dl 1.16 }
1305     break;
1306     }
1307     }
1308     }
1309     else if (n < 0L)
1310 dl 1.19 onError(new IllegalArgumentException(
1311     "negative subscription request"));
1312 dl 1.16 }
1313    
1314 dl 1.20 public final boolean isReleasable() { // for ManagedBlocker
1315 dl 1.1 T item = putItem;
1316 dl 1.34 if (item != null) {
1317     if ((putStat = offer(item)) == 0)
1318     return false;
1319     putItem = null;
1320 dl 1.1 }
1321     return true;
1322     }
1323    
1324 dl 1.20 public final boolean block() { // for ManagedBlocker
1325 dl 1.1 T item = putItem;
1326     if (item != null) {
1327     putItem = null;
1328     long nanos = timeout;
1329     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1330     while ((putStat = offer(item)) == 0) {
1331     if (Thread.interrupted()) {
1332 dl 1.16 timeout = INTERRUPTED;
1333 dl 1.1 if (nanos > 0L)
1334     break;
1335     }
1336     else if (nanos > 0L &&
1337     (nanos = deadline - System.nanoTime()) <= 0L)
1338     break;
1339     else if (waiter == null)
1340     waiter = Thread.currentThread();
1341     else {
1342     if (nanos > 0L)
1343     LockSupport.parkNanos(this, nanos);
1344     else
1345     LockSupport.park(this);
1346     waiter = null;
1347     }
1348     }
1349     }
1350     waiter = null;
1351     return true;
1352     }
1353    
1354 dl 1.29 /**
1355     * Consumer loop, called from ConsumerTask, or indirectly
1356 dl 1.39 * when helping during submit.
1357 dl 1.29 */
1358 dl 1.25 final void consume() {
1359 dl 1.1 Flow.Subscriber<? super T> s;
1360 dl 1.34 int h = head;
1361 dl 1.27 if ((s = subscriber) != null) { // else disabled
1362 dl 1.34 for (;;) {
1363 dl 1.27 long d = demand;
1364 dl 1.34 int c; Object[] a; int n; long i; Object x; Thread w;
1365 dl 1.27 if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1366 dl 1.34 if (!checkControl(s, c))
1367 dl 1.20 break;
1368 dl 1.19 }
1369 dl 1.34 else if ((a = array) == null || h == tail ||
1370     (n = a.length) == 0 ||
1371     (x = U.getObjectVolatile
1372     (a, (i = ((long)((n - 1) & h) << ASHIFT) + ABASE)))
1373     == null) {
1374     if (!checkEmpty(s, c))
1375 dl 1.27 break;
1376     }
1377 dl 1.34 else if (d == 0L) {
1378     if (!checkDemand(c))
1379     break;
1380 dl 1.1 }
1381 dl 1.34 else if (((c & CONSUME) != 0 ||
1382     U.compareAndSwapInt(this, CTL, c, c | CONSUME)) &&
1383     U.compareAndSwapObject(a, i, x, null)) {
1384 dl 1.27 U.putOrderedInt(this, HEAD, ++h);
1385 dl 1.34 U.getAndAddLong(this, DEMAND, -1L);
1386     if ((w = waiter) != null)
1387     signalWaiter(w);
1388 dl 1.36 try {
1389     @SuppressWarnings("unchecked") T y = (T) x;
1390     s.onNext(y);
1391     } catch (Throwable ex) {
1392     handleOnNext(s, ex);
1393     }
1394 dl 1.34 }
1395     }
1396     }
1397     }
1398    
1399     /**
1400 jsr166 1.42 * Responds to control events in consume().
1401 dl 1.34 */
1402     private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
1403     boolean stat = true;
1404     if ((c & ERROR) != 0) {
1405     Throwable ex = pendingError;
1406     ctl = DISABLED; // no need for CAS
1407     if (ex != null) { // null if errorless cancel
1408     try {
1409     if (s != null)
1410     s.onError(ex);
1411     } catch (Throwable ignore) {
1412     }
1413     }
1414     }
1415     else if ((c & SUBSCRIBE) != 0) {
1416     if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
1417     try {
1418     if (s != null)
1419     s.onSubscribe(this);
1420     } catch (Throwable ex) {
1421     onError(ex);
1422     }
1423     }
1424     }
1425     else {
1426     detach();
1427     stat = false;
1428     }
1429     return stat;
1430     }
1431    
1432     /**
1433 jsr166 1.42 * Responds to apparent emptiness in consume().
1434 dl 1.34 */
1435     private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1436     boolean stat = true;
1437     if (head == tail) {
1438     if ((c & CONSUME) != 0)
1439     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1440     else if ((c & COMPLETE) != 0) {
1441     if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1442 dl 1.1 try {
1443 dl 1.34 if (s != null)
1444     s.onComplete();
1445     } catch (Throwable ignore) {
1446 dl 1.1 }
1447     }
1448     }
1449 dl 1.34 else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1450     stat = false;
1451     }
1452     return stat;
1453     }
1454    
1455     /**
1456 jsr166 1.42 * Responds to apparent zero demand in consume().
1457 dl 1.34 */
1458     private boolean checkDemand(int c) {
1459     boolean stat = true;
1460     if (demand == 0L) {
1461     if ((c & CONSUME) != 0)
1462     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1463     else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1464     stat = false;
1465     }
1466     return stat;
1467     }
1468    
1469     /**
1470 jsr166 1.42 * Processes exception in Subscriber.onNext.
1471 dl 1.34 */
1472 dl 1.36 private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1473     BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1474     if ((h = onNextHandler) != null) {
1475     try {
1476     h.accept(s, ex);
1477     } catch (Throwable ignore) {
1478     }
1479 dl 1.1 }
1480 dl 1.36 onError(ex);
1481 dl 1.1 }
1482    
1483 jsr166 1.9 // Unsafe mechanics
1484     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1485 dl 1.1 private static final long CTL;
1486     private static final long TAIL;
1487     private static final long HEAD;
1488     private static final long DEMAND;
1489     private static final int ABASE;
1490     private static final int ASHIFT;
1491    
1492     static {
1493     try {
1494     CTL = U.objectFieldOffset
1495 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("ctl"));
1496 dl 1.1 TAIL = U.objectFieldOffset
1497 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("tail"));
1498 dl 1.1 HEAD = U.objectFieldOffset
1499 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("head"));
1500 dl 1.1 DEMAND = U.objectFieldOffset
1501 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("demand"));
1502    
1503     ABASE = U.arrayBaseOffset(Object[].class);
1504     int scale = U.arrayIndexScale(Object[].class);
1505 dl 1.1 if ((scale & (scale - 1)) != 0)
1506     throw new Error("data type scale not a power of two");
1507     ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1508 jsr166 1.9 } catch (ReflectiveOperationException e) {
1509 dl 1.1 throw new Error(e);
1510     }
1511 jsr166 1.33
1512     // Reduce the risk of rare disastrous classloading in first call to
1513     // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1514     Class<?> ensureLoaded = LockSupport.class;
1515 dl 1.1 }
1516     }
1517     }