ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/SubmissionPublisher.java
Revision: 1.6
Committed: Sat Mar 11 00:17:04 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.5: +8 -5 lines
Log Message:
sync with src/main

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     import java.util.ArrayList;
10     import java.util.List;
11     import java.util.concurrent.locks.LockSupport;
12     import java.util.function.BiConsumer;
13     import java.util.function.BiPredicate;
14     import java.util.function.Consumer;
15    
16     /**
17     * A {@link Flow.Publisher} that asynchronously issues submitted
18     * (non-null) items to current subscribers until it is closed. Each
19     * current subscriber receives newly submitted items in the same order
20     * unless drops or exceptions are encountered. Using a
21     * SubmissionPublisher allows item generators to act as compliant <a
22     * href="http://www.reactive-streams.org/"> reactive-streams</a>
23     * Publishers relying on drop handling and/or blocking for flow
24     * control.
25     *
26     * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
27     * constructor for delivery to subscribers. The best choice of
28     * Executor depends on expected usage. If the generator(s) of
29     * submitted items run in separate threads, and the number of
30     * subscribers can be estimated, consider using a {@link
31     * Executors#newFixedThreadPool}. Otherwise consider using the
32     * default, normally the {@link ForkJoinPool#commonPool}.
33     *
34     * <p>Buffering allows producers and consumers to transiently operate
35     * at different rates. Each subscriber uses an independent buffer.
36     * Buffers are created upon first use and expanded as needed up to the
37     * given maximum. (The enforced capacity may be rounded up to the
38     * nearest power of two and/or bounded by the largest value supported
39     * by this implementation.) Invocations of {@link
40     * Flow.Subscription#request(long) request} do not directly result in
41     * buffer expansion, but risk saturation if unfilled requests exceed
42     * the maximum capacity. The default value of {@link
43     * Flow#defaultBufferSize()} may provide a useful starting point for
44     * choosing a capacity based on expected rates, resources, and usages.
45     *
46     * <p>Publication methods support different policies about what to do
47     * when buffers are saturated. Method {@link #submit(Object) submit}
48     * blocks until resources are available. This is simplest, but least
49     * responsive. The {@code offer} methods may drop items (either
50     * immediately or with bounded timeout), but provide an opportunity to
51     * interpose a handler and then retry.
52     *
53     * <p>If any Subscriber method throws an exception, its subscription
54     * is cancelled. If a handler is supplied as a constructor argument,
55     * it is invoked before cancellation upon an exception in method
56     * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
57     * {@link Flow.Subscriber#onSubscribe onSubscribe},
58     * {@link Flow.Subscriber#onError(Throwable) onError} and
59     * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
60     * handled before cancellation. If the supplied Executor throws
61     * {@link RejectedExecutionException} (or any other RuntimeException
62     * or Error) when attempting to execute a task, or a drop handler
63     * throws an exception when processing a dropped item, then the
64     * exception is rethrown. In these cases, not all subscribers will
65     * have been issued the published item. It is usually good practice to
66     * {@link #closeExceptionally closeExceptionally} in these cases.
67     *
68     * <p>Method {@link #consume(Consumer)} simplifies support for a
69     * common case in which the only action of a subscriber is to request
70     * and process all items using a supplied function.
71     *
72     * <p>This class may also serve as a convenient base for subclasses
73     * that generate items, and use the methods in this class to publish
74     * them. For example here is a class that periodically publishes the
75     * items generated from a supplier. (In practice you might add methods
76     * to independently start and stop generation, to share Executors
77     * among publishers, and so on, or use a SubmissionPublisher as a
78     * component rather than a superclass.)
79     *
80     * <pre> {@code
81     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
82     * final ScheduledFuture<?> periodicTask;
83     * final ScheduledExecutorService scheduler;
84     * PeriodicPublisher(Executor executor, int maxBufferCapacity,
85     * Supplier<? extends T> supplier,
86     * long period, TimeUnit unit) {
87     * super(executor, maxBufferCapacity);
88     * scheduler = new ScheduledThreadPoolExecutor(1);
89     * periodicTask = scheduler.scheduleAtFixedRate(
90     * () -> submit(supplier.get()), 0, period, unit);
91     * }
92     * public void close() {
93     * periodicTask.cancel(false);
94     * scheduler.shutdown();
95     * super.close();
96     * }
97     * }}</pre>
98     *
99     * <p>Here is an example of a {@link Flow.Processor} implementation.
100     * It uses single-step requests to its publisher for simplicity of
101     * illustration. A more adaptive version could monitor flow using the
102     * lag estimate returned from {@code submit}, along with other utility
103     * methods.
104     *
105     * <pre> {@code
106     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
107     * implements Flow.Processor<S,T> {
108     * final Function<? super S, ? extends T> function;
109     * Flow.Subscription subscription;
110     * TransformProcessor(Executor executor, int maxBufferCapacity,
111     * Function<? super S, ? extends T> function) {
112     * super(executor, maxBufferCapacity);
113     * this.function = function;
114     * }
115     * public void onSubscribe(Flow.Subscription subscription) {
116     * (this.subscription = subscription).request(1);
117     * }
118     * public void onNext(S item) {
119     * subscription.request(1);
120     * submit(function.apply(item));
121     * }
122     * public void onError(Throwable ex) { closeExceptionally(ex); }
123     * public void onComplete() { close(); }
124     * }}</pre>
125     *
126     * @param <T> the published item type
127     * @author Doug Lea
128     * @since 9
129     */
130     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
131     AutoCloseable {
132     /*
133     * Most mechanics are handled by BufferedSubscription. This class
134     * mainly tracks subscribers and ensures sequentiality, by using
135     * built-in synchronization locks across public methods. (Using
136     * built-in locks works well in the most typical case in which
137     * only one thread submits items).
138     */
139    
140     /** The largest possible power of two array size. */
141     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
142    
143     /** Round capacity to power of 2, at most limit. */
144     static final int roundCapacity(int cap) {
145     int n = cap - 1;
146     n |= n >>> 1;
147     n |= n >>> 2;
148     n |= n >>> 4;
149     n |= n >>> 8;
150     n |= n >>> 16;
151     return (n <= 0) ? 1 : // at least 1
152     (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
153     }
154    
// Default Executor setup; nearly the same as CompletableFuture.

/**
 * Executor used by the no-argument constructor: the common
 * ForkJoinPool when its parallelism is greater than one, otherwise
 * an executor that starts a new thread for every task.
 */
private static final Executor ASYNC_POOL;
static {
    if (ForkJoinPool.getCommonPoolParallelism() > 1)
        ASYNC_POOL = ForkJoinPool.commonPool();
    else
        ASYNC_POOL = new ThreadPerTaskExecutor();
}

/** Fallback executor that runs each task in a freshly created thread. */
private static final class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        Thread worker = new Thread(r);
        worker.start();
    }
}
169    
170     /**
171     * Clients (BufferedSubscriptions) are maintained in a linked list
172     * (via their "next" fields). This works well for publish loops.
173     * It requires O(n) traversal to check for duplicate subscribers,
174     * but we expect that subscribing is much less common than
175     * publishing. Unsubscribing occurs only during traversal loops,
176     * when BufferedSubscription methods return negative values
177     * signifying that they have been disabled. To reduce
178     * head-of-line blocking, submit and offer methods first call
179     * BufferedSubscription.offer on each subscriber, and place
180     * saturated ones in retries list (using nextRetry field), and
181     * retry, possibly blocking or dropping.
182     */
183     BufferedSubscription<T> clients;
184    
185     /** Run status, updated only within locks */
186     volatile boolean closed;
187     /** If non-null, the exception in closeExceptionally */
188     volatile Throwable closedException;
189    
190     // Parameters for constructing BufferedSubscriptions
191     final Executor executor;
192     final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
193     final int maxBufferCapacity;
194    
/**
 * Creates a new SubmissionPublisher that delivers to subscribers
 * asynchronously through the given Executor, bounds each
 * subscriber's buffer by the given maximum size, and, when the
 * handler is non-null, invokes it whenever a Subscriber throws an
 * exception in method {@link Flow.Subscriber#onNext(Object) onNext}.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the enforced capacity may be rounded up to
 * the nearest power of two and/or bounded by the largest value
 * supported by this implementation; method {@link #getMaxBufferCapacity}
 * returns the actual value)
 * @param handler if non-null, procedure to invoke upon exception
 * thrown in method {@code onNext}
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if maxBufferCapacity not
 * positive
 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                           BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
    // The null check comes first so that it wins when both arguments are bad.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity < 1) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler;
    this.executor = executor;
}
225    
/**
 * Creates a new SubmissionPublisher that delivers to subscribers
 * asynchronously through the given Executor and bounds each
 * subscriber's buffer by the given maximum size, with no handler
 * for Subscriber exceptions in method
 * {@link Flow.Subscriber#onNext(Object) onNext}. Equivalent to the
 * three-argument constructor with a null handler.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the enforced capacity may be rounded up to
 * the nearest power of two and/or bounded by the largest value
 * supported by this implementation; method {@link #getMaxBufferCapacity}
 * returns the actual value)
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if maxBufferCapacity not
 * positive
 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    this(executor, maxBufferCapacity, null);
}
246    
/**
 * Creates a new SubmissionPublisher with default settings: delivery
 * through the {@link ForkJoinPool#commonPool()} (or, if that pool
 * does not support a parallelism level of at least two, a new
 * Thread per task), a maximum buffer capacity of
 * {@link Flow#defaultBufferSize}, and no handler for Subscriber
 * exceptions in method {@link Flow.Subscriber#onNext(Object) onNext}.
 */
public SubmissionPublisher() {
    this(ASYNC_POOL, Flow.defaultBufferSize(), null);
}
259    
260     /**
261     * Adds the given Subscriber unless already subscribed. If already
262     * subscribed, the Subscriber's {@link
263     * Flow.Subscriber#onError(Throwable) onError} method is invoked on
264     * the existing subscription with an {@link IllegalStateException}.
265     * Otherwise, upon success, the Subscriber's {@link
266     * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
267     * asynchronously with a new {@link Flow.Subscription}. If {@link
268     * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
269     * subscription is cancelled. Otherwise, if this SubmissionPublisher
270     * was closed exceptionally, then the subscriber's {@link
271     * Flow.Subscriber#onError onError} method is invoked with the
272     * corresponding exception, or if closed without exception, the
273     * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
274     * method is invoked. Subscribers may enable receiving items by
275     * invoking the {@link Flow.Subscription#request(long) request}
276     * method of the new Subscription, and may unsubscribe by invoking
277     * its {@link Flow.Subscription#cancel() cancel} method.
278     *
279     * @param subscriber the subscriber
280     * @throws NullPointerException if subscriber is null
281     */
282     public void subscribe(Flow.Subscriber<? super T> subscriber) {
283     if (subscriber == null) throw new NullPointerException();
284     BufferedSubscription<T> subscription =
285     new BufferedSubscription<T>(subscriber, executor,
286     onNextHandler, maxBufferCapacity);
287     synchronized (this) {
288     for (BufferedSubscription<T> b = clients, pred = null;;) {
289     if (b == null) {
290     Throwable ex;
291     subscription.onSubscribe();
292     if ((ex = closedException) != null)
293     subscription.onError(ex);
294     else if (closed)
295     subscription.onComplete();
296     else if (pred == null)
297     clients = subscription;
298     else
299     pred.next = subscription;
300     break;
301     }
302     BufferedSubscription<T> next = b.next;
303     if (b.isDisabled()) { // remove
304     b.next = null; // detach
305     if (pred == null)
306     clients = next;
307     else
308     pred.next = next;
309     }
310     else if (subscriber.equals(b.subscriber)) {
311     b.onError(new IllegalStateException("Duplicate subscribe"));
312     break;
313     }
314     else
315     pred = b;
316     b = next;
317     }
318     }
319     }
320    
321     /**
322     * Publishes the given item to each current subscriber by
323     * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
324     * onNext} method, blocking uninterruptibly while resources for any
325     * subscriber are unavailable. This method returns an estimate of
326     * the maximum lag (number of items submitted but not yet consumed)
327     * among all current subscribers. This value is at least one
328     * (accounting for this submitted item) if there are any
329     * subscribers, else zero.
330     *
331     * <p>If the Executor for this publisher throws a
332     * RejectedExecutionException (or any other RuntimeException or
333     * Error) when attempting to asynchronously notify subscribers,
334     * then this exception is rethrown, in which case not all
335     * subscribers will have been issued this item.
336     *
337     * @param item the (non-null) item to publish
338     * @return the estimated maximum lag among subscribers
339     * @throws IllegalStateException if closed
340     * @throws NullPointerException if item is null
341     * @throws RejectedExecutionException if thrown by Executor
342     */
343     public int submit(T item) {
344     if (item == null) throw new NullPointerException();
345     int lag = 0;
346     boolean complete;
347     synchronized (this) {
348     complete = closed;
349     BufferedSubscription<T> b = clients;
350     if (!complete) {
351     BufferedSubscription<T> pred = null, r = null, rtail = null;
352     while (b != null) {
353     BufferedSubscription<T> next = b.next;
354     int stat = b.offer(item);
355     if (stat < 0) { // disabled
356     b.next = null;
357     if (pred == null)
358     clients = next;
359     else
360     pred.next = next;
361     }
362     else {
363     if (stat > lag)
364     lag = stat;
365     else if (stat == 0) { // place on retry list
366     b.nextRetry = null;
367     if (rtail == null)
368     r = b;
369     else
370     rtail.nextRetry = b;
371     rtail = b;
372     }
373     pred = b;
374     }
375     b = next;
376     }
377     while (r != null) {
378     BufferedSubscription<T> nextRetry = r.nextRetry;
379     r.nextRetry = null;
380     int stat = r.submit(item);
381     if (stat > lag)
382     lag = stat;
383     else if (stat < 0 && clients == r)
384     clients = r.next; // postpone internal unsubscribes
385     r = nextRetry;
386     }
387     }
388     }
389     if (complete)
390     throw new IllegalStateException("Closed");
391     else
392     return lag;
393     }
394    
395     /**
396     * Publishes the given item, if possible, to each current subscriber
397     * by asynchronously invoking its {@link
398     * Flow.Subscriber#onNext(Object) onNext} method. The item may be
399     * dropped by one or more subscribers if resource limits are
400     * exceeded, in which case the given handler (if non-null) is
401     * invoked, and if it returns true, retried once. Other calls to
402     * methods in this class by other threads are blocked while the
403     * handler is invoked. Unless recovery is assured, options are
404     * usually limited to logging the error and/or issuing an {@link
405     * Flow.Subscriber#onError(Throwable) onError} signal to the
406     * subscriber.
407     *
408     * <p>This method returns a status indicator: If negative, it
409     * represents the (negative) number of drops (failed attempts to
410     * issue the item to a subscriber). Otherwise it is an estimate of
411     * the maximum lag (number of items submitted but not yet
412     * consumed) among all current subscribers. This value is at least
413     * one (accounting for this submitted item) if there are any
414     * subscribers, else zero.
415     *
416     * <p>If the Executor for this publisher throws a
417     * RejectedExecutionException (or any other RuntimeException or
418     * Error) when attempting to asynchronously notify subscribers, or
419     * the drop handler throws an exception when processing a dropped
420     * item, then this exception is rethrown.
421     *
422     * @param item the (non-null) item to publish
423     * @param onDrop if non-null, the handler invoked upon a drop to a
424     * subscriber, with arguments of the subscriber and item; if it
425     * returns true, an offer is re-attempted (once)
426     * @return if negative, the (negative) number of drops; otherwise
427     * an estimate of maximum lag
428     * @throws IllegalStateException if closed
429     * @throws NullPointerException if item is null
430     * @throws RejectedExecutionException if thrown by Executor
431     */
432     public int offer(T item,
433     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
434     return doOffer(0L, item, onDrop);
435     }
436    
437     /**
438     * Publishes the given item, if possible, to each current subscriber
439     * by asynchronously invoking its {@link
440     * Flow.Subscriber#onNext(Object) onNext} method, blocking while
441     * resources for any subscription are unavailable, up to the
442     * specified timeout or until the caller thread is interrupted, at
443     * which point the given handler (if non-null) is invoked, and if it
444     * returns true, retried once. (The drop handler may distinguish
445     * timeouts from interrupts by checking whether the current thread
446     * is interrupted.) Other calls to methods in this class by other
447     * threads are blocked while the handler is invoked. Unless
448     * recovery is assured, options are usually limited to logging the
449     * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
450     * onError} signal to the subscriber.
451     *
452     * <p>This method returns a status indicator: If negative, it
453     * represents the (negative) number of drops (failed attempts to
454     * issue the item to a subscriber). Otherwise it is an estimate of
455     * the maximum lag (number of items submitted but not yet
456     * consumed) among all current subscribers. This value is at least
457     * one (accounting for this submitted item) if there are any
458     * subscribers, else zero.
459     *
460     * <p>If the Executor for this publisher throws a
461     * RejectedExecutionException (or any other RuntimeException or
462     * Error) when attempting to asynchronously notify subscribers, or
463     * the drop handler throws an exception when processing a dropped
464     * item, then this exception is rethrown.
465     *
466     * @param item the (non-null) item to publish
467     * @param timeout how long to wait for resources for any subscriber
468     * before giving up, in units of {@code unit}
469     * @param unit a {@code TimeUnit} determining how to interpret the
470     * {@code timeout} parameter
471     * @param onDrop if non-null, the handler invoked upon a drop to a
472     * subscriber, with arguments of the subscriber and item; if it
473     * returns true, an offer is re-attempted (once)
474     * @return if negative, the (negative) number of drops; otherwise
475     * an estimate of maximum lag
476     * @throws IllegalStateException if closed
477     * @throws NullPointerException if item is null
478     * @throws RejectedExecutionException if thrown by Executor
479     */
480     public int offer(T item, long timeout, TimeUnit unit,
481     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
482     return doOffer(unit.toNanos(timeout), item, onDrop);
483     }
484    
485     /** Common implementation for both forms of offer */
486     final int doOffer(long nanos, T item,
487     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
488     if (item == null) throw new NullPointerException();
489     int lag = 0, drops = 0;
490     boolean complete;
491     synchronized (this) {
492     complete = closed;
493     BufferedSubscription<T> b = clients;
494     if (!complete) {
495     BufferedSubscription<T> pred = null, r = null, rtail = null;
496     while (b != null) {
497     BufferedSubscription<T> next = b.next;
498     int stat = b.offer(item);
499     if (stat < 0) {
500     b.next = null;
501     if (pred == null)
502     clients = next;
503     else
504     pred.next = next;
505     }
506     else {
507     if (stat > lag)
508     lag = stat;
509     else if (stat == 0) {
510     b.nextRetry = null;
511     if (rtail == null)
512     r = b;
513     else
514     rtail.nextRetry = b;
515     rtail = b;
516     }
517     else if (stat > lag)
518     lag = stat;
519     pred = b;
520     }
521     b = next;
522     }
523     while (r != null) {
524     BufferedSubscription<T> nextRetry = r.nextRetry;
525     r.nextRetry = null;
526 jsr166 1.5 int stat = (nanos > 0L)
527     ? r.timedOffer(item, nanos)
528     : r.offer(item);
529 jsr166 1.1 if (stat == 0 && onDrop != null &&
530     onDrop.test(r.subscriber, item))
531     stat = r.offer(item);
532     if (stat == 0)
533     ++drops;
534     else if (stat > lag)
535     lag = stat;
536     else if (stat < 0 && clients == r)
537     clients = r.next;
538     r = nextRetry;
539     }
540     }
541     }
542     if (complete)
543     throw new IllegalStateException("Closed");
544     else
545     return (drops > 0) ? -drops : lag;
546     }
547    
548     /**
549     * Unless already closed, issues {@link
550     * Flow.Subscriber#onComplete() onComplete} signals to current
551     * subscribers, and disallows subsequent attempts to publish.
552     * Upon return, this method does <em>NOT</em> guarantee that all
553     * subscribers have yet completed.
554     */
555     public void close() {
556     if (!closed) {
557     BufferedSubscription<T> b;
558     synchronized (this) {
559 jsr166 1.6 // no need to re-check closed here
560 jsr166 1.1 b = clients;
561     clients = null;
562     closed = true;
563     }
564     while (b != null) {
565     BufferedSubscription<T> next = b.next;
566     b.next = null;
567     b.onComplete();
568     b = next;
569     }
570     }
571     }
572    
573     /**
574     * Unless already closed, issues {@link
575     * Flow.Subscriber#onError(Throwable) onError} signals to current
576     * subscribers with the given error, and disallows subsequent
577     * attempts to publish. Future subscribers also receive the given
578     * error. Upon return, this method does <em>NOT</em> guarantee
579     * that all subscribers have yet completed.
580     *
581     * @param error the {@code onError} argument sent to subscribers
582     * @throws NullPointerException if error is null
583     */
584     public void closeExceptionally(Throwable error) {
585     if (error == null)
586     throw new NullPointerException();
587     if (!closed) {
588     BufferedSubscription<T> b;
589     synchronized (this) {
590     b = clients;
591 jsr166 1.6 if (!closed) { // don't clobber racing close
592     clients = null;
593     closedException = error;
594     closed = true;
595     }
596 jsr166 1.1 }
597     while (b != null) {
598     BufferedSubscription<T> next = b.next;
599     b.next = null;
600     b.onError(error);
601     b = next;
602     }
603     }
604     }
605    
606     /**
607     * Returns true if this publisher is not accepting submissions.
608     *
609     * @return true if closed
610     */
611     public boolean isClosed() {
612     return closed;
613     }
614    
615     /**
616     * Returns the exception associated with {@link
617     * #closeExceptionally(Throwable) closeExceptionally}, or null if
618     * not closed or if closed normally.
619     *
620     * @return the exception, or null if none
621     */
622     public Throwable getClosedException() {
623     return closedException;
624     }
625    
626     /**
627     * Returns true if this publisher has any subscribers.
628     *
629     * @return true if this publisher has any subscribers
630     */
631     public boolean hasSubscribers() {
632     boolean nonEmpty = false;
633     if (!closed) {
634     synchronized (this) {
635     for (BufferedSubscription<T> b = clients; b != null;) {
636     BufferedSubscription<T> next = b.next;
637     if (b.isDisabled()) {
638     b.next = null;
639     b = clients = next;
640     }
641     else {
642     nonEmpty = true;
643     break;
644     }
645     }
646     }
647     }
648     return nonEmpty;
649     }
650    
651     /**
652     * Returns the number of current subscribers.
653     *
654     * @return the number of current subscribers
655     */
656     public int getNumberOfSubscribers() {
657     int count = 0;
658     if (!closed) {
659     synchronized (this) {
660     BufferedSubscription<T> pred = null, next;
661     for (BufferedSubscription<T> b = clients; b != null; b = next) {
662     next = b.next;
663     if (b.isDisabled()) {
664     b.next = null;
665     if (pred == null)
666     clients = next;
667     else
668     pred.next = next;
669     }
670     else {
671     pred = b;
672     ++count;
673     }
674     }
675     }
676     }
677     return count;
678     }
679    
680     /**
681     * Returns the Executor used for asynchronous delivery.
682     *
683     * @return the Executor used for asynchronous delivery
684     */
685     public Executor getExecutor() {
686     return executor;
687     }
688    
689     /**
690     * Returns the maximum per-subscriber buffer capacity.
691     *
692     * @return the maximum per-subscriber buffer capacity
693     */
694     public int getMaxBufferCapacity() {
695     return maxBufferCapacity;
696     }
697    
698     /**
699     * Returns a list of current subscribers for monitoring and
700     * tracking purposes, not for invoking {@link Flow.Subscriber}
701     * methods on the subscribers.
702     *
703     * @return list of current subscribers
704     */
705     public List<Flow.Subscriber<? super T>> getSubscribers() {
706     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
707     synchronized (this) {
708     BufferedSubscription<T> pred = null, next;
709     for (BufferedSubscription<T> b = clients; b != null; b = next) {
710     next = b.next;
711     if (b.isDisabled()) {
712     b.next = null;
713     if (pred == null)
714     clients = next;
715     else
716     pred.next = next;
717     }
718     else
719     subs.add(b.subscriber);
720     }
721     }
722     return subs;
723     }
724    
725     /**
726     * Returns true if the given Subscriber is currently subscribed.
727     *
728     * @param subscriber the subscriber
729     * @return true if currently subscribed
730     * @throws NullPointerException if subscriber is null
731     */
732     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
733     if (subscriber == null) throw new NullPointerException();
734     if (!closed) {
735     synchronized (this) {
736     BufferedSubscription<T> pred = null, next;
737     for (BufferedSubscription<T> b = clients; b != null; b = next) {
738     next = b.next;
739     if (b.isDisabled()) {
740     b.next = null;
741     if (pred == null)
742     clients = next;
743     else
744     pred.next = next;
745     }
746     else if (subscriber.equals(b.subscriber))
747     return true;
748     else
749     pred = b;
750     }
751     }
752     }
753     return false;
754     }
755    
756     /**
757     * Returns an estimate of the minimum number of items requested
758     * (via {@link Flow.Subscription#request(long) request}) but not
759     * yet produced, among all current subscribers.
760     *
761     * @return the estimate, or zero if no subscribers
762     */
763     public long estimateMinimumDemand() {
764     long min = Long.MAX_VALUE;
765     boolean nonEmpty = false;
766     synchronized (this) {
767     BufferedSubscription<T> pred = null, next;
768     for (BufferedSubscription<T> b = clients; b != null; b = next) {
769     int n; long d;
770     next = b.next;
771     if ((n = b.estimateLag()) < 0) {
772     b.next = null;
773     if (pred == null)
774     clients = next;
775     else
776     pred.next = next;
777     }
778     else {
779     if ((d = b.demand - n) < min)
780     min = d;
781     nonEmpty = true;
782     pred = b;
783     }
784     }
785     }
786     return nonEmpty ? min : 0;
787     }
788    
789     /**
790     * Returns an estimate of the maximum number of items produced but
791     * not yet consumed among all current subscribers.
792     *
793     * @return the estimate
794     */
795     public int estimateMaximumLag() {
796     int max = 0;
797     synchronized (this) {
798     BufferedSubscription<T> pred = null, next;
799     for (BufferedSubscription<T> b = clients; b != null; b = next) {
800     int n;
801     next = b.next;
802     if ((n = b.estimateLag()) < 0) {
803     b.next = null;
804     if (pred == null)
805     clients = next;
806     else
807     pred.next = next;
808     }
809     else {
810     if (n > max)
811     max = n;
812     pred = b;
813     }
814     }
815     }
816     return max;
817     }
818    
819     /**
820     * Processes all published items using the given Consumer function.
821     * Returns a CompletableFuture that is completed normally when this
822     * publisher signals {@link Flow.Subscriber#onComplete()
823     * onComplete}, or completed exceptionally upon any error, or an
824     * exception is thrown by the Consumer, or the returned
825     * CompletableFuture is cancelled, in which case no further items
826     * are processed.
827     *
828     * @param consumer the function applied to each onNext item
829     * @return a CompletableFuture that is completed normally
830     * when the publisher signals onComplete, and exceptionally
831     * upon any error or cancellation
832     * @throws NullPointerException if consumer is null
833     */
834     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
835     if (consumer == null)
836     throw new NullPointerException();
837     CompletableFuture<Void> status = new CompletableFuture<>();
838     subscribe(new ConsumerSubscriber<T>(status, consumer));
839     return status;
840     }
841    
842     /** Subscriber for method consume */
843     private static final class ConsumerSubscriber<T>
844     implements Flow.Subscriber<T> {
845     final CompletableFuture<Void> status;
846     final Consumer<? super T> consumer;
847     Flow.Subscription subscription;
848     ConsumerSubscriber(CompletableFuture<Void> status,
849     Consumer<? super T> consumer) {
850     this.status = status; this.consumer = consumer;
851     }
852     public final void onSubscribe(Flow.Subscription subscription) {
853     this.subscription = subscription;
854     status.whenComplete((v, e) -> subscription.cancel());
855     if (!status.isDone())
856     subscription.request(Long.MAX_VALUE);
857     }
858     public final void onError(Throwable ex) {
859     status.completeExceptionally(ex);
860     }
861     public final void onComplete() {
862     status.complete(null);
863     }
864     public final void onNext(T item) {
865     try {
866     consumer.accept(item);
867     } catch (Throwable ex) {
868     subscription.cancel();
869     status.completeExceptionally(ex);
870     }
871     }
872     }
873    
874     /**
875     * A task for consuming buffer items and signals, created and
876     * executed whenever they become available. A task consumes as
877     * many items/signals as possible before terminating, at which
878     * point another task is created when needed. The dual Runnable
879     * and ForkJoinTask declaration saves overhead when executed by
880     * ForkJoinPools, without impacting other kinds of Executors.
881     */
882     @SuppressWarnings("serial")
883     static final class ConsumerTask<T> extends ForkJoinTask<Void>
884 dl 1.4 implements Runnable, CompletableFuture.AsynchronousCompletionTask {
885 jsr166 1.1 final BufferedSubscription<T> consumer;
886     ConsumerTask(BufferedSubscription<T> consumer) {
887     this.consumer = consumer;
888     }
889     public final Void getRawResult() { return null; }
890     public final void setRawResult(Void v) {}
891     public final boolean exec() { consumer.consume(); return false; }
892     public final void run() { consumer.consume(); }
893     }
894    
895     /**
896     * A bounded (ring) buffer with integrated control to start a
897     * consumer task whenever items are available. The buffer
898     * algorithm is similar to one used inside ForkJoinPool (see its
899     * internal documentation for details) specialized for the case of
900     * at most one concurrent producer and consumer, and power of two
901     * buffer sizes. This allows methods to operate without locks even
902     * while supporting resizing, blocking, task-triggering, and
903     * garbage-free buffers (nulling out elements when consumed),
904     * although supporting these does impose a bit of overhead
905     * compared to plain fixed-size ring buffers.
906     *
907     * The publisher guarantees a single producer via its lock. We
908     * ensure in this class that there is at most one consumer. The
909     * request and cancel methods must be fully thread-safe but are
910     * coded to exploit the most common case in which they are only
911     * called by consumers (usually within onNext).
912     *
913     * Execution control is managed using the ACTIVE ctl bit. We
914     * ensure that a task is active when consumable items (and
915     * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
916     * there is demand (unfilled requests). This is complicated on
917     * the creation side by the possibility of exceptions when trying
918     * to execute tasks. These eventually force DISABLED state, but
919     * sometimes not directly. On the task side, termination (clearing
920     * ACTIVE) that would otherwise race with producers or request()
921     * calls uses the CONSUME keep-alive bit to force a recheck.
922     *
923     * The ctl field also manages run state. When DISABLED, no further
924     * updates are possible. Disabling may be preceded by setting
925     * ERROR or COMPLETE (or both -- ERROR has precedence), in which
926     * case the associated Subscriber methods are invoked, possibly
927     * synchronously if there is no active consumer task (including
928     * cases where execute() failed). The cancel() method is supported
929     * by treating as ERROR but suppressing onError signal.
930     *
931     * Support for blocking also exploits the fact that there is only
932     * one possible waiter. ManagedBlocker-compatible control fields
933     * are placed in this class itself rather than in wait-nodes.
934     * Blocking control relies on the "waiter" field. Producers set
935     * the field before trying to block, but must then recheck (via
936     * offer) before parking. Signalling then just unparks and clears
937 dl 1.4 * waiter field. If the producer and/or consumer are using a
938     * ForkJoinPool, the producer attempts to help run consumer tasks
939     * via ForkJoinPool.helpAsyncBlocker before blocking.
940 jsr166 1.1 *
941     * This class uses @Contended and heuristic field declaration
942     * ordering to reduce false-sharing-based memory contention among
943     * instances of BufferedSubscription, but it does not currently
944     * attempt to avoid memory contention among buffers. This field
945     * and element packing can hurt performance especially when each
946     * publisher has only one client operating at a high rate.
947     * Addressing this may require allocating substantially more space
948     * than users expect.
949     */
950     @SuppressWarnings("serial")
951 jsr166 1.3 @sun.misc.Contended
952 jsr166 1.1 private static final class BufferedSubscription<T>
953     implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
954     // Order-sensitive field declarations (do not reorder; see class comment on false sharing)
955     long timeout; // > 0 if timed wait; set to INTERRUPTED when the producer is interrupted
956     volatile long demand; // # unfilled requests; saturates at Long.MAX_VALUE in request()
957     int maxCapacity; // upper bound for buffer growth; reduced on OOME
958     int putStat; // offer result for ManagedBlocker
959     volatile int ctl; // atomic run state flags (bitwise OR of the "ctl values" below)
960     volatile int head; // next position to take
961     int tail; // next position to put
962     Object[] array; // buffer: null if disabled; length is used as a mask, so a power of two
963     Flow.Subscriber<? super T> subscriber; // null if disabled
964     Executor executor; // null if disabled
965     BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler; // may be null; called when onNext throws
966     volatile Throwable pendingError; // holds until onError issued
967     volatile Thread waiter; // blocked producer thread
968     T putItem; // for offer within ManagedBlocker
969     BufferedSubscription<T> next; // used only by publisher
970     BufferedSubscription<T> nextRetry; // used only by publisher
971    
972     // ctl values
973     static final int ACTIVE = 0x01; // consumer task active
974     static final int CONSUME = 0x02; // keep-alive for consumer task
975     static final int DISABLED = 0x04; // final state
976     static final int ERROR = 0x08; // signal onError then disable
977     static final int SUBSCRIBE = 0x10; // signal onSubscribe
978     static final int COMPLETE = 0x20; // signal onComplete when done
979    
980     static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel (stored in the timeout field)
981    
982     /**
983     * Initial buffer capacity used when maxBufferCapacity is
984     * greater. Must be a power of two.
985     */
986     static final int DEFAULT_INITIAL_CAP = 32;
987    
988     BufferedSubscription(Flow.Subscriber<? super T> subscriber,
989     Executor executor,
990     BiConsumer<? super Flow.Subscriber<? super T>,
991     ? super Throwable> onNextHandler,
992     int maxBufferCapacity) {
993     this.subscriber = subscriber;
994     this.executor = executor;
995     this.onNextHandler = onNextHandler;
996     this.maxCapacity = maxBufferCapacity;
997     this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
998     (maxBufferCapacity < 2 ? // at least 2 slots
999     2 : maxBufferCapacity) :
1000     DEFAULT_INITIAL_CAP];
1001     }
1002    
1003     final boolean isDisabled() {
1004     return ctl == DISABLED;
1005     }
1006    
1007     /**
1008     * Returns estimated number of buffered items, or -1 if
1009     * disabled.
1010     */
1011     final int estimateLag() {
1012     int n;
1013     return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
1014     }
1015    
1016     /**
1017     * Tries to add item and start consumer task if necessary.
1018     * @return -1 if disabled, 0 if dropped, else estimated lag
1019     */
1020     final int offer(T item) {
1021     int h = head, t = tail, cap, size, stat;
1022     Object[] a = array;
1023     if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
1024     a[(cap - 1) & t] = item; // relaxed writes OK
1025     tail = t + 1;
1026     stat = size;
1027     }
1028     else
1029     stat = growAndAdd(a, item);
1030     return (stat > 0 &&
1031     (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
1032     startOnOffer(stat) : stat;
1033     }
1034    
1035     /**
1036     * Tries to create or expand buffer, then adds item if possible.
         * Called from offer when the fast-path insertion did not apply.
         *
         * @param a the array observed by the caller (may be null)
         * @param item the item to add
         * @return -1 if ctl has ERROR or DISABLED set; 0 if the item
         * could not be added (buffer full at maxCapacity, capacity
         * overflow, or allocation failure); otherwise a positive value
1037     */
1038     private int growAndAdd(Object[] a, T item) {
1039     boolean alloc;
1040     int cap, stat;
1041     if ((ctl & (ERROR | DISABLED)) != 0) { // terminating or terminated: refuse
1042     cap = 0;
1043     stat = -1;
1044     alloc = false;
1045     }
1046     else if (a == null || (cap = a.length) <= 0) { // no usable buffer: allocate a fresh one
1047     cap = 0;
1048     stat = 1;
1049     alloc = true;
1050     }
1051     else {
1052     U.fullFence(); // recheck
1053     int h = head, t = tail, size = t + 1 - h;
1054     if (cap >= size) { // room appeared since the caller's check
1055     a[(cap - 1) & t] = item;
1056     tail = t + 1;
1057     stat = size;
1058     alloc = false;
1059     }
1060     else if (cap >= maxCapacity) {
1061     stat = 0; // cannot grow
1062     alloc = false;
1063     }
1064     else { // full but allowed to grow
1065     stat = cap + 1;
1066     alloc = true;
1067     }
1068     }
1069     if (alloc) {
1070     int newCap = (cap > 0) ? cap << 1 : 1; // double, or start at 1 slot
1071     if (newCap <= cap) // int overflow on doubling
1072     stat = 0;
1073     else {
1074     Object[] newArray = null;
1075     try {
1076     newArray = new Object[newCap];
1077     } catch (Throwable ex) { // try to cope with OOME
1078     }
1079     if (newArray == null) {
1080     if (cap > 0)
1081     maxCapacity = cap; // avoid continuous failure
1082     stat = 0;
1083     }
1084     else {
1085     array = newArray; // install the new array before transferring elements
1086     int t = tail;
1087     int newMask = newCap - 1;
1088     if (a != null && cap > 0) {
1089     int mask = cap - 1;
1090     for (int j = head; j != t; ++j) { // transfer slots head..tail-1
1091     long k = ((long)(j & mask) << ASHIFT) + ABASE;
1092     Object x = U.getObjectVolatile(a, k);
1093     if (x != null && // races with consumer
1094     U.compareAndSwapObject(a, k, x, null)) // claim the old slot; copy only if the CAS wins
1095     newArray[j & newMask] = x;
1096     }
1097     }
1098     newArray[t & newMask] = item;
1099     tail = t + 1;
1100     }
1101     }
1102     }
1103     return stat;
1104     }
1105    
1106     /**
1107     * Spins/helps/blocks while offer returns 0. Called only if
1108     * initial offer return 0.
1109     */
1110     final int submit(T item) {
1111 dl 1.4 int stat;
1112     if ((stat = offer(item)) == 0) {
1113 jsr166 1.1 putItem = item;
1114     timeout = 0L;
1115 dl 1.4 putStat = 0;
1116     ForkJoinPool.helpAsyncBlocker(executor, this);
1117     if ((stat = putStat) == 0) {
1118     try {
1119     ForkJoinPool.managedBlock(this);
1120     } catch (InterruptedException ie) {
1121     timeout = INTERRUPTED;
1122     }
1123     stat = putStat;
1124 jsr166 1.1 }
1125     if (timeout < 0L)
1126     Thread.currentThread().interrupt();
1127     }
1128     return stat;
1129     }
1130    
1131     /**
1132     * Timeout version; similar to submit.
1133     */
1134     final int timedOffer(T item, long nanos) {
1135 dl 1.4 int stat;
1136     if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
1137     putItem = item;
1138     putStat = 0;
1139     ForkJoinPool.helpAsyncBlocker(executor, this);
1140     if ((stat = putStat) == 0) {
1141     try {
1142     ForkJoinPool.managedBlock(this);
1143     } catch (InterruptedException ie) {
1144     timeout = INTERRUPTED;
1145 jsr166 1.1 }
1146 dl 1.4 stat = putStat;
1147 jsr166 1.1 }
1148     if (timeout < 0L)
1149     Thread.currentThread().interrupt();
1150     }
1151     return stat;
1152     }
1153    
1154     /**
1155     * Tries to start consumer task after offer.
         * Loops until one of the following holds: the subscription is
         * disabled, an active task has its CONSUME keep-alive bit set,
         * there is nothing to do (no demand or empty buffer), or a new
         * ConsumerTask has been handed to the executor.
         *
         * @param stat the offer status to return when not disabled
1156     * @return -1 if now disabled, else argument
         * @throws RuntimeException or Error rethrown from
         * Executor.execute, after clearing ACTIVE if still set
1157     */
1158     private int startOnOffer(int stat) {
1159     for (;;) {
1160     Executor e; int c;
1161     if ((c = ctl) == DISABLED || (e = executor) == null) {
1162     stat = -1;
1163     break;
1164     }
1165     else if ((c & ACTIVE) != 0) { // ensure keep-alive
1166     if ((c & CONSUME) != 0 ||
1167     U.compareAndSwapInt(this, CTL, c,
1168     c | CONSUME))
1169     break; // on CAS failure, loop and re-read ctl
1170     }
1171     else if (demand == 0L || tail == head) // no task needed: no demand or nothing buffered
1172     break;
1173     else if (U.compareAndSwapInt(this, CTL, c,
1174     c | (ACTIVE | CONSUME))) { // claim ACTIVE before submitting the task
1175     try {
1176     e.execute(new ConsumerTask<T>(this));
1177     break;
1178     } catch (RuntimeException | Error ex) { // back out
1179     do {} while (((c = ctl) & DISABLED) == 0 &&
1180     (c & ACTIVE) != 0 &&
1181     !U.compareAndSwapInt(this, CTL, c,
1182     c & ~ACTIVE)); // retry until ACTIVE is cleared or found already clear/disabled
1183     throw ex;
1184     }
1185     }
1186     }
1187     return stat;
1188     }
1189    
1190     private void signalWaiter(Thread w) {
1191     waiter = null;
1192     LockSupport.unpark(w); // release producer
1193     }
1194    
1195     /**
1196     * Nulls out most fields, mainly to avoid garbage retention
1197     * until publisher unsubscribes, but also to help cleanly stop
1198     * upon error by nulling required components.
1199     */
1200     private void detach() {
1201     Thread w = waiter;
1202     executor = null;
1203     subscriber = null;
1204     pendingError = null;
1205     signalWaiter(w);
1206     }
1207    
1208     /**
1209     * Issues error signal, asynchronously if a task is running,
1210     * else synchronously.
1211     */
1212     final void onError(Throwable ex) {
1213     for (int c;;) {
1214     if (((c = ctl) & (ERROR | DISABLED)) != 0)
1215     break;
1216     else if ((c & ACTIVE) != 0) {
1217     pendingError = ex;
1218     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1219     break; // cause consumer task to exit
1220     }
1221     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1222     Flow.Subscriber<? super T> s = subscriber;
1223     if (s != null && ex != null) {
1224     try {
1225     s.onError(ex);
1226     } catch (Throwable ignore) {
1227     }
1228     }
1229     detach();
1230     break;
1231     }
1232     }
1233     }
1234    
1235     /**
1236     * Tries to start consumer task upon a signal or request;
1237     * disables on failure.
1238     */
1239     private void startOrDisable() {
1240     Executor e;
1241     if ((e = executor) != null) { // skip if already disabled
1242     try {
1243     e.execute(new ConsumerTask<T>(this));
1244     } catch (Throwable ex) { // back out and force signal
1245     for (int c;;) {
1246     if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1247     break;
1248     if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1249     onError(ex);
1250     break;
1251     }
1252     }
1253     }
1254     }
1255     }
1256    
1257     final void onComplete() {
1258     for (int c;;) {
1259     if ((c = ctl) == DISABLED)
1260     break;
1261     if (U.compareAndSwapInt(this, CTL, c,
1262     c | (ACTIVE | CONSUME | COMPLETE))) {
1263     if ((c & ACTIVE) == 0)
1264     startOrDisable();
1265     break;
1266     }
1267     }
1268     }
1269    
1270     final void onSubscribe() {
1271     for (int c;;) {
1272     if ((c = ctl) == DISABLED)
1273     break;
1274     if (U.compareAndSwapInt(this, CTL, c,
1275     c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1276     if ((c & ACTIVE) == 0)
1277     startOrDisable();
1278     break;
1279     }
1280     }
1281     }
1282    
1283     /**
1284     * Causes consumer task to exit if active (without reporting
1285     * onError unless there is already a pending error), and
1286     * disables.
1287     */
1288     public void cancel() {
1289     for (int c;;) {
1290     if ((c = ctl) == DISABLED)
1291     break;
1292     else if ((c & ACTIVE) != 0) {
1293     if (U.compareAndSwapInt(this, CTL, c,
1294     c | (CONSUME | ERROR)))
1295     break;
1296     }
1297     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1298     detach();
1299     break;
1300     }
1301     }
1302     }
1303    
1304     /**
1305     * Adds to demand and possibly starts task.
         * A non-positive argument is reported through onError with an
         * IllegalArgumentException instead of being thrown to the caller.
         *
         * @param n the number of additional items requested
1306     */
1307     public void request(long n) {
1308     if (n > 0L) {
1309     for (;;) { // retry until the demand CAS succeeds
1310     long prev = demand, d;
1311     if ((d = prev + n) < prev) // saturate
1312     d = Long.MAX_VALUE;
1313     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1314     for (int c, h;;) { // demand recorded; now make sure a consumer will see it
1315     if ((c = ctl) == DISABLED)
1316     break;
1317     else if ((c & ACTIVE) != 0) { // task running: just ensure keep-alive
1318     if ((c & CONSUME) != 0 ||
1319     U.compareAndSwapInt(this, CTL, c,
1320     c | CONSUME))
1321     break;
1322     }
1323     else if ((h = head) != tail) { // items buffered and no task: start one
1324     if (U.compareAndSwapInt(this, CTL, c,
1325     c | (ACTIVE|CONSUME))) {
1326     startOrDisable();
1327     break;
1328     }
1329     }
1330     else if (head == h && tail == h) // confirmed empty on re-read
1331     break; // else stale
1332     if (demand == 0L) // demand already used up: nothing left to trigger
1333     break;
1334     }
1335     break;
1336     }
1337     }
1338     }
1339 jsr166 1.6 else
1340 jsr166 1.1 onError(new IllegalArgumentException(
1341 jsr166 1.6 "non-positive subscription request"));
1342 jsr166 1.1 }
1343    
1344     public final boolean isReleasable() { // for ManagedBlocker
1345     T item = putItem;
1346     if (item != null) {
1347     if ((putStat = offer(item)) == 0)
1348     return false;
1349     putItem = null;
1350     }
1351     return true;
1352     }
1353    
/**
 * ManagedBlocker method: repeatedly offers the pending item,
 * parking between attempts, until an offer returns nonzero, a
 * timed wait expires, or a timed wait is interrupted. An untimed
 * wait records an interrupt in the timeout field and keeps trying.
 * The result of the last offer is left in putStat.
 *
 * @return true always
 */
1354     public final boolean block() { // for ManagedBlocker
1355     T item = putItem;
1356     if (item != null) {
1357     putItem = null;
1358     long nanos = timeout;
1359     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L; // 0 means untimed
1360     while ((putStat = offer(item)) == 0) {
1361     if (Thread.interrupted()) { // clears the interrupt flag; callers re-interrupt when timeout < 0
1362     timeout = INTERRUPTED;
1363     if (nanos > 0L) // only timed waits give up on interrupt
1364     break;
1365     }
1366     else if (nanos > 0L &&
1367     (nanos = deadline - System.nanoTime()) <= 0L) // timed out
1368     break;
1369     else if (waiter == null)
1370     waiter = Thread.currentThread(); // register, then loop to re-offer before parking
1371     else {
1372     if (nanos > 0L)
1373     LockSupport.parkNanos(this, nanos);
1374     else
1375     LockSupport.park(this);
1376     waiter = null;
1377     }
1378     }
1379     }
1380     waiter = null;
1381     return true;
1382     }
1383    
1384     /**
1385     * Consumer loop, called from ConsumerTask, or indirectly
1386     * when helping during submit.
         *
         * Each iteration re-reads demand and ctl and then takes exactly
         * one of four paths: handle a pending control state
         * (checkControl), handle an apparently empty buffer
         * (checkEmpty), handle zero demand (checkDemand), or take one
         * item and pass it to the subscriber's onNext. The loop ends
         * when one of the check methods returns false.
1387     */
1388     final void consume() {
1389     Flow.Subscriber<? super T> s;
1390     int h = head; // local take index; written back to head after each successful take
1391     if ((s = subscriber) != null) { // else disabled
1392     for (;;) {
1393     long d = demand;
1394     int c; Object[] a; int n; long i; Object x; Thread w;
1395     if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) { // control state takes priority over items
1396     if (!checkControl(s, c))
1397     break;
1398     }
1399     else if ((a = array) == null || h == tail ||
1400     (n = a.length) == 0 ||
1401     (x = U.getObjectVolatile
1402     (a, (i = ((long)((n - 1) & h) << ASHIFT) + ABASE)))
1403     == null) { // no array, nothing queued, or slot h not (yet) filled
1404     if (!checkEmpty(s, c))
1405     break;
1406     }
1407     else if (d == 0L) { // item available but no outstanding demand
1408     if (!checkDemand(c))
1409     break;
1410     }
1411     else if (((c & CONSUME) != 0 ||
1412     U.compareAndSwapInt(this, CTL, c, c | CONSUME)) &&
1413     U.compareAndSwapObject(a, i, x, null)) { // keep-alive set and slot claimed
1414     U.putOrderedInt(this, HEAD, ++h);
1415     U.getAndAddLong(this, DEMAND, -1L); // one unit of demand consumed
1416     if ((w = waiter) != null)
1417     signalWaiter(w); // space freed: wake the blocked producer
1418     try {
1419     @SuppressWarnings("unchecked") T y = (T) x;
1420     s.onNext(y);
1421     } catch (Throwable ex) {
1422     handleOnNext(s, ex);
1423     }
1424     }
1425     }
1426     }
1427     }
1428    
1429     /**
1430     * Responds to control events in consume().
1431     */
1432     private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
1433     boolean stat = true;
1434 jsr166 1.5 if ((c & SUBSCRIBE) != 0) {
1435     if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
1436     try {
1437     if (s != null)
1438     s.onSubscribe(this);
1439     } catch (Throwable ex) {
1440     onError(ex);
1441     }
1442     }
1443     }
1444     else if ((c & ERROR) != 0) {
1445 jsr166 1.1 Throwable ex = pendingError;
1446     ctl = DISABLED; // no need for CAS
1447     if (ex != null) { // null if errorless cancel
1448     try {
1449     if (s != null)
1450     s.onError(ex);
1451     } catch (Throwable ignore) {
1452     }
1453     }
1454     }
1455     else {
1456     detach();
1457     stat = false;
1458     }
1459     return stat;
1460     }
1461    
1462     /**
1463     * Responds to apparent emptiness in consume().
1464     */
1465     private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1466     boolean stat = true;
1467     if (head == tail) {
1468     if ((c & CONSUME) != 0)
1469     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1470     else if ((c & COMPLETE) != 0) {
1471     if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1472     try {
1473     if (s != null)
1474     s.onComplete();
1475     } catch (Throwable ignore) {
1476     }
1477     }
1478     }
1479     else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1480     stat = false;
1481     }
1482     return stat;
1483     }
1484    
1485     /**
1486     * Responds to apparent zero demand in consume().
1487     */
1488     private boolean checkDemand(int c) {
1489     boolean stat = true;
1490     if (demand == 0L) {
1491     if ((c & CONSUME) != 0)
1492     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1493     else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1494     stat = false;
1495     }
1496     return stat;
1497     }
1498    
1499     /**
1500     * Processes exception in Subscriber.onNext.
1501     */
1502     private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1503     BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1504     if ((h = onNextHandler) != null) {
1505     try {
1506     h.accept(s, ex);
1507     } catch (Throwable ignore) {
1508     }
1509     }
1510     onError(ex);
1511     }
1512    
1513     // Unsafe mechanics: raw field offsets and array addressing used by the CAS/volatile accesses above
1514     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1515     private static final long CTL; // offset of field "ctl"
1516     private static final long TAIL; // offset of field "tail"
1517     private static final long HEAD; // offset of field "head"
1518     private static final long DEMAND; // offset of field "demand"
1519     private static final int ABASE; // base offset of element 0 in an Object[]
1520     private static final int ASHIFT; // log2 of the Object[] element size
1521    
1522     static {
1523     try {
1524     CTL = U.objectFieldOffset
1525     (BufferedSubscription.class.getDeclaredField("ctl"));
1526     TAIL = U.objectFieldOffset
1527     (BufferedSubscription.class.getDeclaredField("tail"));
1528     HEAD = U.objectFieldOffset
1529     (BufferedSubscription.class.getDeclaredField("head"));
1530     DEMAND = U.objectFieldOffset
1531     (BufferedSubscription.class.getDeclaredField("demand"));
1532    
1533     ABASE = U.arrayBaseOffset(Object[].class);
1534     int scale = U.arrayIndexScale(Object[].class);
1535     if ((scale & (scale - 1)) != 0) // element offsets are computed by shifting, so scale must be 2^k
1536     throw new Error("data type scale not a power of two");
1537     ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1538     } catch (ReflectiveOperationException e) {
1539     throw new Error(e);
1540     }
1541    
1542     // Reduce the risk of rare disastrous classloading in first call to
1543     // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1544     Class<?> ensureLoaded = LockSupport.class;
1545     }
1546     }
1547     }