ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.48
Committed: Sun Sep 13 13:39:41 2015 UTC (8 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.47: +3 -3 lines
Log Message:
incorporate suggested doc improvements

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 jsr166 1.10 import java.util.ArrayList;
10     import java.util.List;
11 dl 1.1 import java.util.concurrent.locks.LockSupport;
12 dl 1.36 import java.util.function.BiConsumer;
13 dl 1.1 import java.util.function.BiPredicate;
14 dl 1.45 import java.util.function.Consumer;
15 dl 1.1 import java.util.function.Function;
16 dl 1.15 import java.util.function.Supplier;
17 dl 1.1
18     /**
19 dl 1.21 * A {@link Flow.Publisher} that asynchronously issues submitted
20     * (non-null) items to current subscribers until it is closed. Each
21     * current subscriber receives newly submitted items in the same order
22     * unless drops or exceptions are encountered. Using a
23     * SubmissionPublisher allows item generators to act as Publishers
24     * relying on drop handling and/or blocking for flow control.
25 dl 1.1 *
26 dl 1.16 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
27 dl 1.1 * constructor for delivery to subscribers. The best choice of
28     * Executor depends on expected usage. If the generator(s) of
29     * submitted items run in separate threads, and the number of
30     * subscribers can be estimated, consider using a {@link
31 dl 1.46 * Executors#newFixedThreadPool}. Otherwise consider using the
32     * default, normally the {@link ForkJoinPool#commonPool}.
33 dl 1.1 *
34     * <p>Buffering allows producers and consumers to transiently operate
35     * at different rates. Each subscriber uses an independent buffer.
36 dl 1.21 * Buffers are created upon first use and expanded as needed up to the
37 dl 1.27 * given maximum. (The enforced capacity may be rounded up to the
38 dl 1.21 * nearest power of two and/or bounded by the largest value supported
39     * by this implementation.) Invocations of {@link
40 dl 1.16 * Flow.Subscription#request} do not directly result in buffer
41 dl 1.24 * expansion, but risk saturation if unfilled requests exceed the
42 dl 1.25 * maximum capacity. The default value of {@link
43     * Flow#defaultBufferSize} may provide a useful starting point for
44     * choosing a capacity based on expected rates, resources, and usages.
45 dl 1.18 *
46 dl 1.1 * <p>Publication methods support different policies about what to do
47     * when buffers are saturated. Method {@link #submit} blocks until
48 jsr166 1.26 * resources are available. This is simplest, but least responsive.
49     * The {@code offer} methods may drop items (either immediately or
50     * with bounded timeout), but provide an opportunity to interpose a
51     * handler and then retry.
52 dl 1.1 *
53     * <p>If any Subscriber method throws an exception, its subscription
54 dl 1.36 * is cancelled. If a handler is supplied as a constructor argument,
55     * it is invoked before cancellation upon an exception in method
56 jsr166 1.41 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
57     * {@link Flow.Subscriber#onSubscribe onSubscribe},
58 jsr166 1.44 * {@link Flow.Subscriber#onError onError} and
59     * {@link Flow.Subscriber#onComplete onComplete} are not recorded or
60 jsr166 1.41 * handled before cancellation. If the supplied Executor throws
61     * {@link RejectedExecutionException} (or any other RuntimeException
62     * or Error) when attempting to execute a task, or a drop handler
63     * throws an exception when processing a dropped item, then the
64     * exception is rethrown. In these cases, not all subscribers will
65     * have been issued the published item. It is usually good practice to
66     * {@link #closeExceptionally closeExceptionally} in these cases.
67 dl 1.1 *
68 dl 1.45 * <p>Method {@link #consume} simplifies support for a common case in
69     * which the only action of a subscriber is to request and process all
70     * items using a supplied function.
71     *
72 dl 1.15 * <p>This class may also serve as a convenient base for subclasses
73     * that generate items, and use the methods in this class to publish
74     * them. For example here is a class that periodically publishes the
75     * items generated from a supplier. (In practice you might add methods
76 dl 1.35 * to independently start and stop generation, to share Executors
77 dl 1.30 * among publishers, and so on, or use a SubmissionPublisher as a
78     * component rather than a superclass.)
79 dl 1.15 *
80     * <pre> {@code
81     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
82     * final ScheduledFuture<?> periodicTask;
83     * final ScheduledExecutorService scheduler;
84 dl 1.21 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
85     * Supplier<? extends T> supplier,
86 dl 1.15 * long period, TimeUnit unit) {
87 dl 1.21 * super(executor, maxBufferCapacity);
88 dl 1.15 * scheduler = new ScheduledThreadPoolExecutor(1);
89     * periodicTask = scheduler.scheduleAtFixedRate(
90     * () -> submit(supplier.get()), 0, period, unit);
91     * }
92     * public void close() {
93     * periodicTask.cancel(false);
94     * scheduler.shutdown();
95     * super.close();
96     * }
97     * }}</pre>
98     *
99 dl 1.21 * <p>Here is an example of a {@link Flow.Processor} implementation.
100     * It uses single-step requests to its publisher for simplicity of
101     * illustration. A more adaptive version could monitor flow using the
102 dl 1.27 * lag estimate returned from {@code submit}, along with other utility
103 dl 1.21 * methods.
104 dl 1.16 *
105     * <pre> {@code
106     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
107     * implements Flow.Processor<S,T> {
108     * final Function<? super S, ? extends T> function;
109     * Flow.Subscription subscription;
110 dl 1.21 * TransformProcessor(Executor executor, int maxBufferCapacity,
111 dl 1.16 * Function<? super S, ? extends T> function) {
112 dl 1.21 * super(executor, maxBufferCapacity);
113 dl 1.16 * this.function = function;
114     * }
115     * public void onSubscribe(Flow.Subscription subscription) {
116     * (this.subscription = subscription).request(1);
117     * }
118     * public void onNext(S item) {
119     * subscription.request(1);
120     * submit(function.apply(item));
121     * }
122     * public void onError(Throwable ex) { closeExceptionally(ex); }
123     * public void onComplete() { close(); }
124     * }}</pre>
125     *
126 dl 1.1 * @param <T> the published item type
127     * @author Doug Lea
128     * @since 1.9
129     */
130     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
131     AutoCloseable {
132     /*
133     * Most mechanics are handled by BufferedSubscription. This class
134 dl 1.17 * mainly tracks subscribers and ensures sequentiality, by using
135     * built-in synchronization locks across public methods. (Using
136     * built-in locks works well in the most typical case in which
137     * only one thread submits items).
138 dl 1.1 */
139    
140 dl 1.25 /** The largest possible power of two array size. */
141 jsr166 1.43 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
142 dl 1.1
143 dl 1.40 /** Round capacity to power of 2, at most limit. */
144     static final int roundCapacity(int cap) {
145 dl 1.1 int n = cap - 1;
146     n |= n >>> 1;
147     n |= n >>> 2;
148     n |= n >>> 4;
149     n |= n >>> 8;
150     n |= n >>> 16;
151 dl 1.40 return (n <= 0) ? 1 : // at least 1
152 dl 1.25 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
153 dl 1.1 }
154    
155 dl 1.48 // default Executor setup; nearly the same as CompletableFuture
156 dl 1.46
157     /**
158     * Default executor -- ForkJoinPool.commonPool() unless it cannot
159     * support parallelism.
160     */
161     private static final Executor asyncPool =
162     (ForkJoinPool.getCommonPoolParallelism() > 1) ?
163     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
164    
165     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
166     static final class ThreadPerTaskExecutor implements Executor {
167     public void execute(Runnable r) { new Thread(r).start(); }
168     }
169    
170 dl 1.1 /**
171     * Clients (BufferedSubscriptions) are maintained in a linked list
172     * (via their "next" fields). This works well for publish loops.
173     * It requires O(n) traversal to check for duplicate subscribers,
174     * but we expect that subscribing is much less common than
175 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
176 dl 1.25 * when BufferedSubscription methods return negative values
177 dl 1.29 * signifying that they have been disabled. To reduce
178     * head-of-line blocking, submit and offer methods first call
179     * BufferedSubscription.offer on each subscriber, and place
180     * saturated ones in retries list (using nextRetry field), and
181     * retry, possibly blocking or dropping.
182 dl 1.1 */
183     BufferedSubscription<T> clients;
184    
185 dl 1.20 /** Run status, updated only within locks */
186     volatile boolean closed;
187 dl 1.36 /** If non-null, the exception in closeExceptionally */
188 dl 1.37 volatile Throwable closedException;
189 dl 1.20
190 dl 1.1 // Parameters for constructing BufferedSubscriptions
191     final Executor executor;
192 dl 1.36 final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
193 dl 1.1 final int maxBufferCapacity;
194    
195 dl 1.20 /**
196 dl 1.1 * Creates a new SubmissionPublisher using the given Executor for
197 dl 1.36 * async delivery to subscribers, with the given maximum
198     * buffer size for each subscriber, and, if non-null, the given handler
199     * invoked when any Subscriber throws an exception in method {@code onNext}.
200 dl 1.1 *
201     * @param executor the executor to use for async delivery,
202     * supporting creation of at least one independent thread
203     * @param maxBufferCapacity the maximum capacity for each
204 dl 1.21 * subscriber's buffer (the enforced capacity may be rounded up to
205     * the nearest power of two and/or bounded by the largest value
206     * supported by this implementation; method {@link #getMaxBufferCapacity}
207     * returns the actual value)
208 dl 1.36 * @param handler if non-null, procedure to invoke upon exception
209     * thrown in method {@code onNext}.
210 dl 1.1 * @throws NullPointerException if executor is null
211 dl 1.21 * @throws IllegalArgumentException if maxBufferCapacity not
212     * positive
213 dl 1.1 */
214 dl 1.36 public SubmissionPublisher(Executor executor, int maxBufferCapacity,
215     BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
216 dl 1.21 if (executor == null)
217     throw new NullPointerException();
218     if (maxBufferCapacity <= 0)
219     throw new IllegalArgumentException("capacity must be positive");
220 dl 1.18 this.executor = executor;
221 dl 1.36 this.onNextHandler = handler;
222 dl 1.21 this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
223 dl 1.18 }
224    
225 dl 1.1 /**
226 dl 1.36 * Creates a new SubmissionPublisher using the given Executor for
227     * async delivery to subscribers, with the given maximum
228     * buffer size for each subscriber, and no handler for Subscriber
229     * exceptions in method {@code onNext}.
230     *
231     * @param executor the executor to use for async delivery,
232     * supporting creation of at least one independent thread
233     * @param maxBufferCapacity the maximum capacity for each
234     * subscriber's buffer (the enforced capacity may be rounded up to
235     * the nearest power of two and/or bounded by the largest value
236     * supported by this implementation; method {@link #getMaxBufferCapacity}
237     * returns the actual value)
238     * @throws NullPointerException if executor is null
239     * @throws IllegalArgumentException if maxBufferCapacity not
240     * positive
241     */
242     public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
243     this(executor, maxBufferCapacity, null);
244     }
245    
246     /**
247 dl 1.25 * Creates a new SubmissionPublisher using the {@link
248 dl 1.46 * ForkJoinPool#commonPool()} for async delivery to subscribers
249     * (unless it does not support a parallelism level of at least
250     * two, in which case, a new Thread is created to run each task),
251 dl 1.36 * with maximum buffer capacity of {@link Flow#defaultBufferSize},
252     * and no handler for Subscriber exceptions in method {@code
253     * onNext}.
254 dl 1.25 */
255     public SubmissionPublisher() {
256 dl 1.46 this(asyncPool, Flow.defaultBufferSize(), null);
257 dl 1.25 }
258    
259     /**
260 dl 1.18 * Adds the given Subscriber unless already subscribed. If
261 dl 1.30 * already subscribed, the Subscriber's {@code onError} method is
262     * invoked on the existing subscription with an {@link
263     * IllegalStateException}. Otherwise, upon success, the
264     * Subscriber's {@code onSubscribe} method is invoked
265     * asynchronously with a new {@link Flow.Subscription}. If {@code
266     * onSubscribe} throws an exception, the subscription is
267 dl 1.36 * cancelled. Otherwise, if this SubmissionPublisher was closed
268     * exceptionally, then the subscriber's {@code onError} method is
269     * invoked with the corresponding exception, or if closed without
270     * exception, the subscriber's {@code onComplete} method is
271     * invoked. Subscribers may enable receiving items by invoking
272     * the {@code request} method of the new Subscription, and may
273     * unsubscribe by invoking its {@code cancel} method.
274 dl 1.18 *
275     * @param subscriber the subscriber
276     * @throws NullPointerException if subscriber is null
277     */
278     public void subscribe(Flow.Subscriber<? super T> subscriber) {
279     if (subscriber == null) throw new NullPointerException();
280 dl 1.20 BufferedSubscription<T> subscription =
281 dl 1.36 new BufferedSubscription<T>(subscriber, executor,
282     onNextHandler, maxBufferCapacity);
283 dl 1.19 synchronized (this) {
284 dl 1.29 for (BufferedSubscription<T> b = clients, pred = null;;) {
285     if (b == null) {
286 dl 1.36 Throwable ex;
287 dl 1.29 subscription.onSubscribe();
288 dl 1.37 if ((ex = closedException) != null)
289 dl 1.36 subscription.onError(ex);
290     else if (closed)
291 dl 1.29 subscription.onComplete();
292     else if (pred == null)
293     clients = subscription;
294 dl 1.27 else
295 dl 1.29 pred.next = subscription;
296     break;
297 dl 1.27 }
298 dl 1.29 BufferedSubscription<T> next = b.next;
299     if (b.isDisabled()) { // remove
300     b.next = null; // detach
301 dl 1.19 if (pred == null)
302 dl 1.29 clients = next;
303 dl 1.19 else
304 dl 1.29 pred.next = next;
305     }
306     else if (subscriber.equals(b.subscriber)) {
307     b.onError(new IllegalStateException("Duplicate subscribe"));
308     break;
309 dl 1.19 }
310 dl 1.29 else
311     pred = b;
312     b = next;
313 dl 1.19 }
314     }
315 dl 1.18 }
316    
317     /**
318 dl 1.16 * Publishes the given item to each current subscriber by
319 dl 1.30 * asynchronously invoking its {@link Flow.Subscriber#onNext}
320     * method, blocking uninterruptibly while resources for any
321     * subscriber are unavailable. This method returns an estimate of
322     * the maximum lag (number of items submitted but not yet
323     * consumed) among all current subscribers. This value is at least
324     * one (accounting for this submitted item) if there are any
325     * subscribers, else zero.
326 dl 1.16 *
327     * <p>If the Executor for this publisher throws a
328     * RejectedExecutionException (or any other RuntimeException or
329     * Error) when attempting to asynchronously notify subscribers,
330 dl 1.31 * then this exception is rethrown, in which case not all
331     * subscribers will have been issued this item.
332 dl 1.16 *
333     * @param item the (non-null) item to publish
334 dl 1.21 * @return the estimated maximum lag among subscribers
335 dl 1.16 * @throws IllegalStateException if closed
336     * @throws NullPointerException if item is null
337     * @throws RejectedExecutionException if thrown by Executor
338     */
339 dl 1.21 public int submit(T item) {
340 dl 1.16 if (item == null) throw new NullPointerException();
341 dl 1.21 int lag = 0;
342 dl 1.27 boolean complete;
343 dl 1.16 synchronized (this) {
344 dl 1.27 complete = closed;
345 dl 1.29 BufferedSubscription<T> b = clients;
346 dl 1.27 if (!complete) {
347 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
348 dl 1.27 while (b != null) {
349     BufferedSubscription<T> next = b.next;
350 dl 1.29 int stat = b.offer(item);
351     if (stat < 0) { // disabled
352     b.next = null;
353 dl 1.27 if (pred == null)
354     clients = next;
355 dl 1.24 else
356 dl 1.27 pred.next = next;
357 dl 1.24 }
358 dl 1.27 else {
359 dl 1.29 if (stat > lag)
360     lag = stat;
361     else if (stat == 0) { // place on retry list
362     b.nextRetry = null;
363 dl 1.27 if (rtail == null)
364 dl 1.29 r = b;
365 dl 1.27 else
366     rtail.nextRetry = b;
367     rtail = b;
368     }
369     pred = b;
370     }
371     b = next;
372 dl 1.21 }
373 dl 1.29 while (r != null) {
374     BufferedSubscription<T> nextRetry = r.nextRetry;
375     r.nextRetry = null;
376     int stat = r.submit(item);
377     if (stat > lag)
378     lag = stat;
379     else if (stat < 0 && clients == r)
380     clients = r.next; // postpone internal unsubscribes
381     r = nextRetry;
382     }
383 dl 1.16 }
384     }
385 dl 1.27 if (complete)
386     throw new IllegalStateException("Closed");
387     else
388     return lag;
389 dl 1.16 }
390    
391     /**
392 dl 1.1 * Publishes the given item, if possible, to each current
393 dl 1.30 * subscriber by asynchronously invoking its {@link
394     * Flow.Subscriber#onNext} method. The item may be dropped by one
395     * or more subscribers if resource limits are exceeded, in which
396     * case the given handler (if non-null) is invoked, and if it
397     * returns true, retried once. Other calls to methods in this
398     * class by other threads are blocked while the handler is
399     * invoked. Unless recovery is assured, options are usually
400     * limited to logging the error and/or issuing an {@code onError}
401     * signal to the subscriber.
402 dl 1.1 *
403 dl 1.21 * <p>This method returns a status indicator: If negative, it
404     * represents the (negative) number of drops (failed attempts to
405     * issue the item to a subscriber). Otherwise it is an estimate of
406     * the maximum lag (number of items submitted but not yet
407     * consumed) among all current subscribers. This value is at least
408     * one (accounting for this submitted item) if there are any
409     * subscribers, else zero.
410 jsr166 1.23 *
411 dl 1.1 * <p>If the Executor for this publisher throws a
412     * RejectedExecutionException (or any other RuntimeException or
413     * Error) when attempting to asynchronously notify subscribers, or
414     * the drop handler throws an exception when processing a dropped
415     * item, then this exception is rethrown.
416     *
417     * @param item the (non-null) item to publish
418     * @param onDrop if non-null, the handler invoked upon a drop to a
419     * subscriber, with arguments of the subscriber and item; if it
420     * returns true, an offer is re-attempted (once)
421 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
422     * an estimate of maximum lag
423 dl 1.1 * @throws IllegalStateException if closed
424     * @throws NullPointerException if item is null
425     * @throws RejectedExecutionException if thrown by Executor
426     */
427     public int offer(T item,
428     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
429 dl 1.29 return doOffer(0L, item, onDrop);
430 dl 1.1 }
431    
432     /**
433     * Publishes the given item, if possible, to each current
434 dl 1.30 * subscriber by asynchronously invoking its {@link
435     * Flow.Subscriber#onNext} method, blocking while resources for
436     * any subscription are unavailable, up to the specified timeout
437 dl 1.48 * or until the caller thread is interrupted, at which point the
438     * given handler (if non-null) is invoked, and if it returns true,
439 dl 1.30 * retried once. (The drop handler may distinguish timeouts from
440     * interrupts by checking whether the current thread is
441     * interrupted.) Other calls to methods in this class by other
442     * threads are blocked while the handler is invoked. Unless
443     * recovery is assured, options are usually limited to logging the
444     * error and/or issuing an {@code onError} signal to the
445     * subscriber.
446 dl 1.1 *
447 dl 1.21 * <p>This method returns a status indicator: If negative, it
448     * represents the (negative) number of drops (failed attempts to
449     * issue the item to a subscriber). Otherwise it is an estimate of
450     * the maximum lag (number of items submitted but not yet
451     * consumed) among all current subscribers. This value is at least
452     * one (accounting for this submitted item) if there are any
453     * subscribers, else zero.
454     *
455 dl 1.1 * <p>If the Executor for this publisher throws a
456     * RejectedExecutionException (or any other RuntimeException or
457     * Error) when attempting to asynchronously notify subscribers, or
458     * the drop handler throws an exception when processing a dropped
459     * item, then this exception is rethrown.
460     *
461     * @param item the (non-null) item to publish
462     * @param timeout how long to wait for resources for any subscriber
463     * before giving up, in units of {@code unit}
464     * @param unit a {@code TimeUnit} determining how to interpret the
465     * {@code timeout} parameter
466     * @param onDrop if non-null, the handler invoked upon a drop to a
467     * subscriber, with arguments of the subscriber and item; if it
468     * returns true, an offer is re-attempted (once)
469 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
470     * an estimate of maximum lag
471 dl 1.1 * @throws IllegalStateException if closed
472     * @throws NullPointerException if item is null
473     * @throws RejectedExecutionException if thrown by Executor
474     */
475     public int offer(T item, long timeout, TimeUnit unit,
476     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
477 dl 1.29 return doOffer(unit.toNanos(timeout), item, onDrop);
478     }
479    
480     /** Common implementation for both forms of offer */
481     final int doOffer(long nanos, T item,
482     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
483 dl 1.1 if (item == null) throw new NullPointerException();
484 dl 1.27 int lag = 0, drops = 0;
485     boolean complete;
486 jsr166 1.2 synchronized (this) {
487 dl 1.27 complete = closed;
488 dl 1.29 BufferedSubscription<T> b = clients;
489 dl 1.27 if (!complete) {
490 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
491 dl 1.27 while (b != null) {
492     BufferedSubscription<T> next = b.next;
493 dl 1.29 int stat = b.offer(item);
494     if (stat < 0) {
495     b.next = null;
496 dl 1.27 if (pred == null)
497     clients = next;
498     else
499     pred.next = next;
500     }
501     else {
502 dl 1.29 if (stat > lag)
503     lag = stat;
504     else if (stat == 0) {
505     b.nextRetry = null;
506 dl 1.27 if (rtail == null)
507 dl 1.29 r = b;
508 dl 1.27 else
509     rtail.nextRetry = b;
510     rtail = b;
511     }
512 dl 1.29 else if (stat > lag)
513 dl 1.27 lag = stat;
514     pred = b;
515     }
516     b = next;
517 dl 1.1 }
518 dl 1.29 while (r != null) {
519     BufferedSubscription<T> nextRetry = r.nextRetry;
520     r.nextRetry = null;
521     int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
522     r.offer(item);
523     if (stat == 0 && onDrop != null &&
524     onDrop.test(r.subscriber, item))
525     stat = r.offer(item);
526     if (stat == 0)
527     ++drops;
528     else if (stat > lag)
529     lag = stat;
530     else if (stat < 0 && clients == r)
531     clients = r.next;
532     r = nextRetry;
533     }
534 dl 1.1 }
535     }
536 dl 1.27 if (complete)
537     throw new IllegalStateException("Closed");
538     else
539     return (drops > 0) ? -drops : lag;
540 dl 1.1 }
541    
542     /**
543 dl 1.30 * Unless already closed, issues {@link
544     * Flow.Subscriber#onComplete} signals to current subscribers, and
545     * disallows subsequent attempts to publish. Upon return, this
546     * method does <em>NOT</em> guarantee that all subscribers have
547     * yet completed.
548 dl 1.1 */
549     public void close() {
550     if (!closed) {
551 dl 1.32 BufferedSubscription<T> b;
552 jsr166 1.2 synchronized (this) {
553 dl 1.1 b = clients;
554     clients = null;
555     closed = true;
556     }
557     while (b != null) {
558 dl 1.32 BufferedSubscription<T> next = b.next;
559     b.next = null;
560 dl 1.19 b.onComplete();
561 dl 1.1 b = next;
562     }
563     }
564     }
565    
566     /**
567 dl 1.30 * Unless already closed, issues {@link Flow.Subscriber#onError}
568     * signals to current subscribers with the given error, and
569 dl 1.36 * disallows subsequent attempts to publish. Future subscribers
570     * also receive the given error. Upon return, this method does
571     * <em>NOT</em> guarantee that all subscribers have yet completed.
572 dl 1.1 *
573 dl 1.30 * @param error the {@code onError} argument sent to subscribers
574 dl 1.1 * @throws NullPointerException if error is null
575     */
576     public void closeExceptionally(Throwable error) {
577     if (error == null)
578     throw new NullPointerException();
579     if (!closed) {
580 dl 1.32 BufferedSubscription<T> b;
581 jsr166 1.2 synchronized (this) {
582 dl 1.1 b = clients;
583     clients = null;
584     closed = true;
585 dl 1.37 closedException = error;
586 dl 1.1 }
587     while (b != null) {
588 dl 1.32 BufferedSubscription<T> next = b.next;
589     b.next = null;
590 dl 1.19 b.onError(error);
591 dl 1.1 b = next;
592     }
593     }
594     }
595    
596     /**
597     * Returns true if this publisher is not accepting submissions.
598     *
599     * @return true if closed
600     */
601     public boolean isClosed() {
602     return closed;
603     }
604    
605 dl 1.37 /**
606     * Returns the exception associated with {@link #closeExceptionally},
607     * or null if not closed or if closed normally.
608     *
609     * @return the exception, or null if none
610     */
611     public Throwable getClosedException() {
612     return closedException;
613     }
614    
615 dl 1.1 /**
616 jsr166 1.6 * Returns true if this publisher has any subscribers.
617 dl 1.1 *
618     * @return true if this publisher has any subscribers
619     */
620     public boolean hasSubscribers() {
621     boolean nonEmpty = false;
622     if (!closed) {
623 jsr166 1.2 synchronized (this) {
624 dl 1.31 for (BufferedSubscription<T> b = clients; b != null;) {
625     BufferedSubscription<T> next = b.next;
626 dl 1.19 if (b.isDisabled()) {
627 dl 1.29 b.next = null;
628 dl 1.31 b = clients = next;
629 jsr166 1.2 }
630 dl 1.1 else {
631     nonEmpty = true;
632     break;
633     }
634     }
635     }
636     }
637     return nonEmpty;
638     }
639    
640     /**
641 dl 1.21 * Returns the number of current subscribers.
642 dl 1.1 *
643 dl 1.21 * @return the number of current subscribers
644 dl 1.1 */
645 dl 1.21 public int getNumberOfSubscribers() {
646     int count = 0;
647     if (!closed) {
648     synchronized (this) {
649     BufferedSubscription<T> pred = null, next;
650     for (BufferedSubscription<T> b = clients; b != null; b = next) {
651     next = b.next;
652     if (b.isDisabled()) {
653 dl 1.29 b.next = null;
654 dl 1.21 if (pred == null)
655     clients = next;
656     else
657     pred.next = next;
658     }
659     else {
660     pred = b;
661     ++count;
662     }
663     }
664     }
665     }
666     return count;
667 dl 1.1 }
668    
669 jsr166 1.7 /**
670 dl 1.21 * Returns the Executor used for asynchronous delivery.
671 dl 1.1 *
672 dl 1.21 * @return the Executor used for asynchronous delivery
673 dl 1.1 */
674 dl 1.21 public Executor getExecutor() {
675     return executor;
676 dl 1.1 }
677    
678 jsr166 1.7 /**
679 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
680 dl 1.1 *
681 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
682 dl 1.1 */
683     public int getMaxBufferCapacity() {
684     return maxBufferCapacity;
685     }
686    
687     /**
688 dl 1.29 * Returns a list of current subscribers for monitoring and
689     * tracking purposes, not for invoking {@link Flow.Subscriber}
690     * methods on the subscribers.
691 dl 1.1 *
692     * @return list of current subscribers
693     */
694     public List<Flow.Subscriber<? super T>> getSubscribers() {
695     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
696 jsr166 1.2 synchronized (this) {
697 dl 1.1 BufferedSubscription<T> pred = null, next;
698     for (BufferedSubscription<T> b = clients; b != null; b = next) {
699     next = b.next;
700 dl 1.19 if (b.isDisabled()) {
701 dl 1.29 b.next = null;
702 dl 1.1 if (pred == null)
703     clients = next;
704     else
705     pred.next = next;
706 jsr166 1.2 }
707 dl 1.1 else
708     subs.add(b.subscriber);
709     }
710     }
711     return subs;
712     }
713 jsr166 1.2
714 dl 1.1 /**
715     * Returns true if the given Subscriber is currently subscribed.
716     *
717     * @param subscriber the subscriber
718     * @return true if currently subscribed
719 dl 1.21 * @throws NullPointerException if subscriber is null
720 dl 1.1 */
721     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
722 dl 1.21 if (subscriber == null) throw new NullPointerException();
723 jsr166 1.2 if (!closed) {
724     synchronized (this) {
725 dl 1.1 BufferedSubscription<T> pred = null, next;
726     for (BufferedSubscription<T> b = clients; b != null; b = next) {
727     next = b.next;
728 dl 1.19 if (b.isDisabled()) {
729 dl 1.29 b.next = null;
730 dl 1.1 if (pred == null)
731     clients = next;
732     else
733     pred.next = next;
734 jsr166 1.2 }
735 dl 1.21 else if (subscriber.equals(b.subscriber))
736 dl 1.1 return true;
737 dl 1.21 else
738     pred = b;
739 dl 1.1 }
740     }
741     }
742     return false;
743     }
744    
745     /**
746 dl 1.21 * Returns an estimate of the minimum number of items requested
747     * (via {@link Flow.Subscription#request}) but not yet produced,
748     * among all current subscribers.
749     *
750     * @return the estimate, or zero if no subscribers
751     */
752     public long estimateMinimumDemand() {
753     long min = Long.MAX_VALUE;
754     boolean nonEmpty = false;
755     synchronized (this) {
756     BufferedSubscription<T> pred = null, next;
757     for (BufferedSubscription<T> b = clients; b != null; b = next) {
758     int n; long d;
759     next = b.next;
760     if ((n = b.estimateLag()) < 0) {
761 dl 1.29 b.next = null;
762 dl 1.21 if (pred == null)
763     clients = next;
764     else
765     pred.next = next;
766     }
767     else {
768     if ((d = b.demand - n) < min)
769     min = d;
770     nonEmpty = true;
771 dl 1.32 pred = b;
772 dl 1.21 }
773     }
774     }
775 jsr166 1.22 return nonEmpty ? min : 0;
776 dl 1.21 }
777    
778     /**
779     * Returns an estimate of the maximum number of items produced but
780     * not yet consumed among all current subscribers.
781 dl 1.17 *
782     * @return the estimate
783 dl 1.1 */
784 dl 1.21 public int estimateMaximumLag() {
785     int max = 0;
786 dl 1.17 synchronized (this) {
787     BufferedSubscription<T> pred = null, next;
788     for (BufferedSubscription<T> b = clients; b != null; b = next) {
789 dl 1.21 int n;
790 dl 1.17 next = b.next;
791 dl 1.21 if ((n = b.estimateLag()) < 0) {
792 dl 1.29 b.next = null;
793 dl 1.17 if (pred == null)
794     clients = next;
795     else
796     pred.next = next;
797     }
798 dl 1.32 else {
799     if (n > max)
800     max = n;
801     pred = b;
802     }
803 dl 1.17 }
804     }
805 dl 1.21 return max;
806 dl 1.1 }
807    
808 dl 1.25 /**
809 dl 1.45 * Processes all published items using the given Consumer
810     * function. Returns a CompletableFuture that is completed
811     * normally when this publisher signals {@code onComplete}, or
812     * completed exceptionally upon any error, or an exception is
813     * thrown by the Consumer, or the returned CompletableFuture is
814     * cancelled, in which case no further items are processed.
815     *
816     * @param consumer the function applied to each onNext item
817     * @return a CompletableFuture that is completed normally
818     * when the publisher signals onComplete, and exceptionally
819 jsr166 1.47 * upon any error or cancellation
820 dl 1.45 * @throws NullPointerException if consumer is null
821     */
822     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
823     if (consumer == null)
824     throw new NullPointerException();
825     CompletableFuture<Void> status = new CompletableFuture<>();
826     subscribe(new ConsumerSubscriber<T>(status, consumer));
827     return status;
828     }
829    
830     /** Subscriber for method consume */
831     static final class ConsumerSubscriber<T> implements Flow.Subscriber<T> {
832     final CompletableFuture<Void> status;
833     final Consumer<? super T> consumer;
834     Flow.Subscription subscription;
835     ConsumerSubscriber(CompletableFuture<Void> status,
836     Consumer<? super T> consumer) {
837     this.status = status; this.consumer = consumer;
838     }
839     public final void onSubscribe(Flow.Subscription subscription) {
840     this.subscription = subscription;
841     if (status.isDone())
842     subscription.cancel();
843     else
844     subscription.request(Long.MAX_VALUE);
845     }
846     public final void onError(Throwable ex) {
847     status.completeExceptionally(ex);
848     }
849     public final void onComplete() {
850     status.complete(null);
851     }
852     public final void onNext(T item) {
853     if (status.isDone())
854     subscription.cancel();
855     else {
856     try {
857     consumer.accept(item);
858     } catch (Throwable ex) {
859     subscription.cancel();
860     status.completeExceptionally(ex);
861     }
862     }
863     }
864     }
865    
866     /**
867 dl 1.25 * A task for consuming buffer items and signals, created and
868     * executed whenever they become available. A task consumes as
869     * many items/signals as possible before terminating, at which
870     * point another task is created when needed. The dual Runnable
871     * and ForkJoinTask declaration saves overhead when executed by
872     * ForkJoinPools, without impacting other kinds of Executors.
873     */
874     @SuppressWarnings("serial")
875     static final class ConsumerTask<T> extends ForkJoinTask<Void>
876     implements Runnable {
877     final BufferedSubscription<T> consumer;
878     ConsumerTask(BufferedSubscription<T> consumer) {
879     this.consumer = consumer;
880     }
881     public final Void getRawResult() { return null; }
882     public final void setRawResult(Void v) {}
883     public final boolean exec() { consumer.consume(); return false; }
884     public final void run() { consumer.consume(); }
885     }
886    
887 dl 1.1 /**
888 dl 1.15 * A bounded (ring) buffer with integrated control to start a
889     * consumer task whenever items are available. The buffer
890 dl 1.34 * algorithm is similar to one used inside ForkJoinPool (see its
891     * internal documentation for details) specialized for the case of
892     * at most one concurrent producer and consumer, and power of two
893     * buffer sizes. This allows methods to operate without locks even
894     * while supporting resizing, blocking, task-triggering, and
895     * garbage-free buffers (nulling out elements when consumed),
896     * although supporting these does impose a bit of overhead
897     * compared to plain fixed-size ring buffers.
898 dl 1.1 *
899     * The publisher guarantees a single producer via its lock. We
900     * ensure in this class that there is at most one consumer. The
901     * request and cancel methods must be fully thread-safe but are
902     * coded to exploit the most common case in which they are only
903     * called by consumers (usually within onNext).
904     *
905 dl 1.15 * Execution control is managed using the ACTIVE ctl bit. We
906     * ensure that a task is active when consumable items (and
907 dl 1.19 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
908 dl 1.24 * there is demand (unfilled requests). This is complicated on
909 dl 1.19 * the creation side by the possibility of exceptions when trying
910     * to execute tasks. These eventually force DISABLED state, but
911 dl 1.1 * sometimes not directly. On the task side, termination (clearing
912 dl 1.27 * ACTIVE) that would otherwise race with producers or request()
913 dl 1.29 * calls uses the CONSUME keep-alive bit to force a recheck.
914 dl 1.1 *
915     * The ctl field also manages run state. When DISABLED, no further
916 dl 1.20 * updates are possible. Disabling may be preceded by setting
917 dl 1.1 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
918     * case the associated Subscriber methods are invoked, possibly
919     * synchronously if there is no active consumer task (including
920 dl 1.20 * cases where execute() failed). The cancel() method is supported
921     * by treating as ERROR but suppressing onError signal.
922 dl 1.1 *
923     * Support for blocking also exploits the fact that there is only
924     * one possible waiter. ManagedBlocker-compatible control fields
925     * are placed in this class itself rather than in wait-nodes.
926     * Blocking control relies on the "waiter" field. Producers set
927     * the field before trying to block, but must then recheck (via
928     * offer) before parking. Signalling then just unparks and clears
929 dl 1.20 * waiter field. If the producer and consumer are both in the same
930 dl 1.29 * ForkJoinPool, or consumers are running in commonPool, the
931     * producer attempts to help run consumer tasks that it forked
932     * before blocking. To avoid potential cycles, only one level of
933     * helping is currently supported.
934 dl 1.1 *
935     * This class uses @Contended and heuristic field declaration
936 dl 1.31 * ordering to reduce false-sharing-based memory contention among
937     * instances of BufferedSubscription, but it does not currently
938     * attempt to avoid memory contention among buffers. This field
939     * and element packing can hurt performance especially when each
940     * publisher has only one client operating at a high rate.
941 dl 1.1 * Addressing this may require allocating substantially more space
942     * than users expect.
943     */
944 dl 1.15 @SuppressWarnings("serial")
945 dl 1.1 @sun.misc.Contended
946 dl 1.25 static final class BufferedSubscription<T>
947     implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
948 dl 1.1 // Order-sensitive field declarations
949 dl 1.27 long timeout; // > 0 if timed wait
950     volatile long demand; // # unfilled requests
951     int maxCapacity; // reduced on OOME
952     int putStat; // offer result for ManagedBlocker
953 dl 1.29 int helpDepth; // nested helping depth (at most 1)
954 dl 1.27 volatile int ctl; // atomic run state flags
955     volatile int head; // next position to take
956 dl 1.34 int tail; // next position to put
957 dl 1.27 Object[] array; // buffer: null if disabled
958 dl 1.1 Flow.Subscriber<? super T> subscriber; // null if disabled
959 dl 1.27 Executor executor; // null if disabled
960 dl 1.36 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
961 dl 1.27 volatile Throwable pendingError; // holds until onError issued
962     volatile Thread waiter; // blocked producer thread
963     T putItem; // for offer within ManagedBlocker
964     BufferedSubscription<T> next; // used only by publisher
965     BufferedSubscription<T> nextRetry; // used only by publisher
966 dl 1.1
967     // ctl values
968 dl 1.20 static final int ACTIVE = 0x01; // consumer task active
969 dl 1.29 static final int CONSUME = 0x02; // keep-alive for consumer task
970 dl 1.27 static final int DISABLED = 0x04; // final state
971     static final int ERROR = 0x08; // signal onError then disable
972     static final int SUBSCRIBE = 0x10; // signal onSubscribe
973     static final int COMPLETE = 0x20; // signal onComplete when done
974 dl 1.1
975 dl 1.20 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
976 dl 1.16
977 jsr166 1.22 /**
978 dl 1.40 * Initial buffer capacity used when maxBufferCapacity is
979     * greater. Must be a power of two.
980 dl 1.21 */
981 dl 1.40 static final int DEFAULT_INITIAL_CAP = 32;
982 dl 1.21
983 dl 1.1 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
984 dl 1.36 Executor executor,
985     BiConsumer<? super Flow.Subscriber<? super T>,
986     ? super Throwable> onNextHandler,
987     int maxBufferCapacity) {
988 dl 1.1 this.subscriber = subscriber;
989     this.executor = executor;
990 dl 1.36 this.onNextHandler = onNextHandler;
991 dl 1.1 this.maxCapacity = maxBufferCapacity;
992 dl 1.40 this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
993     maxBufferCapacity : DEFAULT_INITIAL_CAP];
994 dl 1.1 }
995    
996 dl 1.19 final boolean isDisabled() {
997 dl 1.20 return ctl == DISABLED;
998     }
999    
1000 dl 1.21 /**
1001     * Returns estimated number of buffered items, or -1 if
1002 jsr166 1.42 * disabled.
1003 dl 1.21 */
1004     final int estimateLag() {
1005 dl 1.20 int n;
1006 jsr166 1.22 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
1007 dl 1.19 }
1008    
1009 dl 1.1 /**
1010 dl 1.16 * Tries to add item and start consumer task if necessary.
1011 dl 1.21 * @return -1 if disabled, 0 if dropped, else estimated lag
1012 dl 1.1 */
1013     final int offer(T item) {
1014 dl 1.34 int h = head, t = tail, cap, size, stat;
1015 dl 1.1 Object[] a = array;
1016 dl 1.40 if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
1017 dl 1.34 a[(cap - 1) & t] = item; // relaxed writes OK
1018     tail = t + 1;
1019 dl 1.27 stat = size;
1020 dl 1.1 }
1021 dl 1.34 else
1022     stat = growAndAdd(a, item);
1023 dl 1.29 return (stat > 0 &&
1024     (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
1025 dl 1.27 startOnOffer(stat) : stat;
1026     }
1027    
1028     /**
1029 jsr166 1.28 * Tries to create or expand buffer, then adds item if possible.
1030 dl 1.27 */
1031     private int growAndAdd(Object[] a, T item) {
1032 dl 1.34 boolean alloc;
1033 dl 1.27 int cap, stat;
1034     if ((ctl & (ERROR | DISABLED)) != 0) {
1035     cap = 0;
1036     stat = -1;
1037 dl 1.34 alloc = false;
1038 dl 1.27 }
1039 dl 1.34 else if (a == null || (cap = a.length) <= 0) {
1040 dl 1.27 cap = 0;
1041     stat = 1;
1042 dl 1.34 alloc = true;
1043     }
1044     else {
1045     U.fullFence(); // recheck
1046     int h = head, t = tail, size = t + 1 - h;
1047 dl 1.40 if (cap >= size) {
1048 dl 1.34 a[(cap - 1) & t] = item;
1049     tail = t + 1;
1050     stat = size;
1051     alloc = false;
1052     }
1053     else if (cap >= maxCapacity) {
1054     stat = 0; // cannot grow
1055     alloc = false;
1056     }
1057     else {
1058     stat = cap + 1;
1059     alloc = true;
1060     }
1061 dl 1.27 }
1062 dl 1.34 if (alloc) {
1063 dl 1.40 int newCap = (cap > 0) ? cap << 1 : 1;
1064 dl 1.27 if (newCap <= cap)
1065     stat = 0;
1066     else {
1067     Object[] newArray = null;
1068 dl 1.16 try {
1069 dl 1.27 newArray = new Object[newCap];
1070     } catch (Throwable ex) { // try to cope with OOME
1071     }
1072     if (newArray == null) {
1073     if (cap > 0)
1074     maxCapacity = cap; // avoid continuous failure
1075     stat = 0;
1076     }
1077     else {
1078     array = newArray;
1079     int t = tail;
1080     int newMask = newCap - 1;
1081     if (a != null && cap > 0) {
1082     int mask = cap - 1;
1083     for (int j = head; j != t; ++j) {
1084 dl 1.34 long k = ((long)(j & mask) << ASHIFT) + ABASE;
1085     Object x = U.getObjectVolatile(a, k);
1086     if (x != null && // races with consumer
1087     U.compareAndSwapObject(a, k, x, null))
1088 dl 1.27 newArray[j & newMask] = x;
1089     }
1090     }
1091     newArray[t & newMask] = item;
1092     tail = t + 1;
1093 dl 1.16 }
1094     }
1095     }
1096 dl 1.21 return stat;
1097 dl 1.1 }
1098    
1099     /**
1100 dl 1.27 * Spins/helps/blocks while offer returns 0. Called only if
1101 dl 1.29 * initial offer return 0.
1102 dl 1.20 */
1103     final int submit(T item) {
1104 dl 1.34 int stat; Executor e; ForkJoinWorkerThread w;
1105     if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1106     ((e = executor) instanceof ForkJoinPool)) {
1107 dl 1.39 helpDepth = 1;
1108 dl 1.29 Thread thread = Thread.currentThread();
1109 dl 1.20 if ((thread instanceof ForkJoinWorkerThread) &&
1110 dl 1.34 ((w = (ForkJoinWorkerThread)thread)).getPool() == e)
1111     stat = internalHelpConsume(w.workQueue, item);
1112     else if (e == ForkJoinPool.commonPool())
1113     stat = externalHelpConsume
1114     (ForkJoinPool.commonSubmitterQueue(), item);
1115 dl 1.39 helpDepth = 0;
1116 dl 1.20 }
1117 dl 1.25 if (stat == 0 && (stat = offer(item)) == 0) {
1118     putItem = item;
1119     timeout = 0L;
1120     try {
1121     ForkJoinPool.managedBlock(this);
1122     } catch (InterruptedException ie) {
1123     timeout = INTERRUPTED;
1124     }
1125     stat = putStat;
1126     if (timeout < 0L)
1127     Thread.currentThread().interrupt();
1128     }
1129 dl 1.20 return stat;
1130     }
1131    
1132     /**
1133 dl 1.34 * Tries helping for FJ submitter
1134     */
1135     private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1136     int stat = 0;
1137     if (w != null) {
1138 dl 1.39 ForkJoinTask<?> t;
1139 dl 1.34 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1140     if ((stat = offer(item)) != 0 || !w.tryUnpush(t))
1141     break;
1142 dl 1.39 ((ConsumerTask<?>)t).consumer.consume();
1143 dl 1.34 }
1144     }
1145     return stat;
1146     }
1147    
1148     /**
1149     * Tries helping for non-FJ submitter
1150     */
1151     private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1152     int stat = 0;
1153     if (w != null) {
1154 dl 1.39 ForkJoinTask<?> t;
1155 dl 1.34 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1156     if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t))
1157     break;
1158 dl 1.39 ((ConsumerTask<?>)t).consumer.consume();
1159 dl 1.34 }
1160     }
1161     return stat;
1162     }
1163    
1164     /**
1165 dl 1.27 * Timeout version; similar to submit
1166 dl 1.20 */
1167 dl 1.27 final int timedOffer(T item, long nanos) {
1168 dl 1.34 int stat; Executor e;
1169     if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1170     ((e = executor) instanceof ForkJoinPool)) {
1171 dl 1.29 Thread thread = Thread.currentThread();
1172     if (((thread instanceof ForkJoinWorkerThread) &&
1173     ((ForkJoinWorkerThread)thread).getPool() == e) ||
1174     e == ForkJoinPool.commonPool()) {
1175 dl 1.39 helpDepth = 1;
1176 dl 1.29 ForkJoinTask<?> t;
1177     long deadline = System.nanoTime() + nanos;
1178     while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
1179     (t instanceof ConsumerTask)) {
1180     if ((stat = offer(item)) != 0 ||
1181     (nanos = deadline - System.nanoTime()) <= 0L ||
1182     !t.tryUnfork())
1183     break;
1184 dl 1.39 ((ConsumerTask<?>)t).consumer.consume();
1185 dl 1.29 }
1186 dl 1.39 helpDepth = 0;
1187 dl 1.20 }
1188 dl 1.25 }
1189     if (stat == 0 && (stat = offer(item)) == 0 &&
1190     (timeout = nanos) > 0L) {
1191     putItem = item;
1192     try {
1193     ForkJoinPool.managedBlock(this);
1194     } catch (InterruptedException ie) {
1195     timeout = INTERRUPTED;
1196     }
1197     stat = putStat;
1198     if (timeout < 0L)
1199     Thread.currentThread().interrupt();
1200 dl 1.20 }
1201     return stat;
1202     }
1203    
1204     /**
1205 dl 1.27 * Tries to start consumer task after offer.
1206     * @return -1 if now disabled, else argument
1207     */
1208     private int startOnOffer(int stat) {
1209     for (;;) {
1210     Executor e; int c;
1211     if ((c = ctl) == DISABLED || (e = executor) == null) {
1212     stat = -1;
1213     break;
1214     }
1215     else if ((c & ACTIVE) != 0) { // ensure keep-alive
1216 dl 1.29 if ((c & CONSUME) != 0 ||
1217 dl 1.27 U.compareAndSwapInt(this, CTL, c,
1218 dl 1.29 c | CONSUME))
1219 dl 1.27 break;
1220     }
1221     else if (demand == 0L || tail == head)
1222     break;
1223     else if (U.compareAndSwapInt(this, CTL, c,
1224 dl 1.29 c | (ACTIVE | CONSUME))) {
1225 dl 1.27 try {
1226     e.execute(new ConsumerTask<T>(this));
1227     break;
1228     } catch (RuntimeException | Error ex) { // back out
1229 dl 1.30 do {} while (((c = ctl) & DISABLED) == 0 &&
1230 dl 1.27 (c & ACTIVE) != 0 &&
1231     !U.compareAndSwapInt(this, CTL, c,
1232     c & ~ACTIVE));
1233     throw ex;
1234     }
1235     }
1236     }
1237     return stat;
1238     }
1239    
1240 dl 1.34 private void signalWaiter(Thread w) {
1241     waiter = null;
1242     LockSupport.unpark(w); // release producer
1243     }
1244    
1245 dl 1.27 /**
1246 dl 1.16 * Nulls out most fields, mainly to avoid garbage retention
1247 dl 1.20 * until publisher unsubscribes, but also to help cleanly stop
1248     * upon error by nulling required components.
1249 dl 1.1 */
1250 dl 1.27 private void detach() {
1251     Thread w = waiter;
1252     executor = null;
1253     subscriber = null;
1254 dl 1.16 pendingError = null;
1255 dl 1.34 signalWaiter(w);
1256 dl 1.1 }
1257    
1258     /**
1259 dl 1.19 * Issues error signal, asynchronously if a task is running,
1260 jsr166 1.26 * else synchronously.
1261 dl 1.19 */
1262     final void onError(Throwable ex) {
1263     for (int c;;) {
1264 dl 1.29 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1265 dl 1.19 break;
1266     else if ((c & ACTIVE) != 0) {
1267     pendingError = ex;
1268     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1269     break; // cause consumer task to exit
1270     }
1271     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1272     Flow.Subscriber<? super T> s = subscriber;
1273     if (s != null && ex != null) {
1274     try {
1275     s.onError(ex);
1276     } catch (Throwable ignore) {
1277     }
1278     }
1279     detach();
1280     break;
1281     }
1282     }
1283     }
1284    
1285     /**
1286 dl 1.1 * Tries to start consumer task upon a signal or request;
1287 jsr166 1.12 * disables on failure.
1288 dl 1.1 */
1289 dl 1.27 private void startOrDisable() {
1290     Executor e;
1291     if ((e = executor) != null) { // skip if already disabled
1292 dl 1.1 try {
1293 dl 1.25 e.execute(new ConsumerTask<T>(this));
1294 dl 1.27 } catch (Throwable ex) { // back out and force signal
1295 dl 1.1 for (int c;;) {
1296 dl 1.20 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1297 dl 1.1 break;
1298     if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1299 dl 1.19 onError(ex);
1300 dl 1.1 break;
1301     }
1302     }
1303     }
1304     }
1305     }
1306    
1307 dl 1.19 final void onComplete() {
1308 dl 1.1 for (int c;;) {
1309 dl 1.20 if ((c = ctl) == DISABLED)
1310 dl 1.1 break;
1311 dl 1.27 if (U.compareAndSwapInt(this, CTL, c,
1312 dl 1.29 c | (ACTIVE | CONSUME | COMPLETE))) {
1313 dl 1.16 if ((c & ACTIVE) == 0)
1314     startOrDisable();
1315 dl 1.1 break;
1316     }
1317     }
1318     }
1319    
1320 dl 1.19 final void onSubscribe() {
1321 dl 1.1 for (int c;;) {
1322 dl 1.20 if ((c = ctl) == DISABLED)
1323 dl 1.1 break;
1324 dl 1.27 if (U.compareAndSwapInt(this, CTL, c,
1325 dl 1.29 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1326 dl 1.19 if ((c & ACTIVE) == 0)
1327     startOrDisable();
1328 dl 1.1 break;
1329     }
1330     }
1331     }
1332    
1333 dl 1.16 /**
1334     * Causes consumer task to exit if active (without reporting
1335     * onError unless there is already a pending error), and
1336     * disables.
1337     */
1338     public void cancel() {
1339 dl 1.1 for (int c;;) {
1340 dl 1.20 if ((c = ctl) == DISABLED)
1341 dl 1.1 break;
1342 dl 1.16 else if ((c & ACTIVE) != 0) {
1343 dl 1.29 if (U.compareAndSwapInt(this, CTL, c,
1344     c | (CONSUME | ERROR)))
1345 dl 1.16 break;
1346     }
1347     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1348     detach();
1349 dl 1.1 break;
1350     }
1351     }
1352     }
1353    
1354 dl 1.16 /**
1355     * Adds to demand and possibly starts task.
1356     */
1357     public void request(long n) {
1358     if (n > 0L) {
1359     for (;;) {
1360     long prev = demand, d;
1361     if ((d = prev + n) < prev) // saturate
1362     d = Long.MAX_VALUE;
1363     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1364 dl 1.31 for (int c, h;;) {
1365 dl 1.27 if ((c = ctl) == DISABLED)
1366 dl 1.16 break;
1367 dl 1.27 else if ((c & ACTIVE) != 0) {
1368 dl 1.29 if ((c & CONSUME) != 0 ||
1369 dl 1.27 U.compareAndSwapInt(this, CTL, c,
1370 dl 1.29 c | CONSUME))
1371 dl 1.27 break;
1372     }
1373     else if ((h = head) != tail) {
1374 dl 1.16 if (U.compareAndSwapInt(this, CTL, c,
1375 dl 1.29 c | (ACTIVE|CONSUME))) {
1376 dl 1.16 startOrDisable();
1377     break;
1378     }
1379     }
1380     else if (head == h && tail == h)
1381 dl 1.27 break; // else stale
1382 dl 1.31 if (demand == 0L)
1383     break;
1384 dl 1.16 }
1385     break;
1386     }
1387     }
1388     }
1389     else if (n < 0L)
1390 dl 1.19 onError(new IllegalArgumentException(
1391     "negative subscription request"));
1392 dl 1.16 }
1393    
1394 dl 1.20 public final boolean isReleasable() { // for ManagedBlocker
1395 dl 1.1 T item = putItem;
1396 dl 1.34 if (item != null) {
1397     if ((putStat = offer(item)) == 0)
1398     return false;
1399     putItem = null;
1400 dl 1.1 }
1401     return true;
1402     }
1403    
1404 dl 1.20 public final boolean block() { // for ManagedBlocker
1405 dl 1.1 T item = putItem;
1406     if (item != null) {
1407     putItem = null;
1408     long nanos = timeout;
1409     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1410     while ((putStat = offer(item)) == 0) {
1411     if (Thread.interrupted()) {
1412 dl 1.16 timeout = INTERRUPTED;
1413 dl 1.1 if (nanos > 0L)
1414     break;
1415     }
1416     else if (nanos > 0L &&
1417     (nanos = deadline - System.nanoTime()) <= 0L)
1418     break;
1419     else if (waiter == null)
1420     waiter = Thread.currentThread();
1421     else {
1422     if (nanos > 0L)
1423     LockSupport.parkNanos(this, nanos);
1424     else
1425     LockSupport.park(this);
1426     waiter = null;
1427     }
1428     }
1429     }
1430     waiter = null;
1431     return true;
1432     }
1433    
1434 dl 1.29 /**
1435     * Consumer loop, called from ConsumerTask, or indirectly
1436 dl 1.39 * when helping during submit.
1437 dl 1.29 */
1438 dl 1.25 final void consume() {
1439 dl 1.1 Flow.Subscriber<? super T> s;
1440 dl 1.34 int h = head;
1441 dl 1.27 if ((s = subscriber) != null) { // else disabled
1442 dl 1.34 for (;;) {
1443 dl 1.27 long d = demand;
1444 dl 1.34 int c; Object[] a; int n; long i; Object x; Thread w;
1445 dl 1.27 if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1446 dl 1.34 if (!checkControl(s, c))
1447 dl 1.20 break;
1448 dl 1.19 }
1449 dl 1.34 else if ((a = array) == null || h == tail ||
1450     (n = a.length) == 0 ||
1451     (x = U.getObjectVolatile
1452     (a, (i = ((long)((n - 1) & h) << ASHIFT) + ABASE)))
1453     == null) {
1454     if (!checkEmpty(s, c))
1455 dl 1.27 break;
1456     }
1457 dl 1.34 else if (d == 0L) {
1458     if (!checkDemand(c))
1459     break;
1460 dl 1.1 }
1461 dl 1.34 else if (((c & CONSUME) != 0 ||
1462     U.compareAndSwapInt(this, CTL, c, c | CONSUME)) &&
1463     U.compareAndSwapObject(a, i, x, null)) {
1464 dl 1.27 U.putOrderedInt(this, HEAD, ++h);
1465 dl 1.34 U.getAndAddLong(this, DEMAND, -1L);
1466     if ((w = waiter) != null)
1467     signalWaiter(w);
1468 dl 1.36 try {
1469     @SuppressWarnings("unchecked") T y = (T) x;
1470     s.onNext(y);
1471     } catch (Throwable ex) {
1472     handleOnNext(s, ex);
1473     }
1474 dl 1.34 }
1475     }
1476     }
1477     }
1478    
1479     /**
1480 jsr166 1.42 * Responds to control events in consume().
1481 dl 1.34 */
1482     private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
1483     boolean stat = true;
1484     if ((c & ERROR) != 0) {
1485     Throwable ex = pendingError;
1486     ctl = DISABLED; // no need for CAS
1487     if (ex != null) { // null if errorless cancel
1488     try {
1489     if (s != null)
1490     s.onError(ex);
1491     } catch (Throwable ignore) {
1492     }
1493     }
1494     }
1495     else if ((c & SUBSCRIBE) != 0) {
1496     if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
1497     try {
1498     if (s != null)
1499     s.onSubscribe(this);
1500     } catch (Throwable ex) {
1501     onError(ex);
1502     }
1503     }
1504     }
1505     else {
1506     detach();
1507     stat = false;
1508     }
1509     return stat;
1510     }
1511    
1512     /**
1513 jsr166 1.42 * Responds to apparent emptiness in consume().
1514 dl 1.34 */
1515     private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1516     boolean stat = true;
1517     if (head == tail) {
1518     if ((c & CONSUME) != 0)
1519     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1520     else if ((c & COMPLETE) != 0) {
1521     if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1522 dl 1.1 try {
1523 dl 1.34 if (s != null)
1524     s.onComplete();
1525     } catch (Throwable ignore) {
1526 dl 1.1 }
1527     }
1528     }
1529 dl 1.34 else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1530     stat = false;
1531     }
1532     return stat;
1533     }
1534    
1535     /**
1536 jsr166 1.42 * Responds to apparent zero demand in consume().
1537 dl 1.34 */
1538     private boolean checkDemand(int c) {
1539     boolean stat = true;
1540     if (demand == 0L) {
1541     if ((c & CONSUME) != 0)
1542     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1543     else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1544     stat = false;
1545     }
1546     return stat;
1547     }
1548    
1549     /**
1550 jsr166 1.42 * Processes exception in Subscriber.onNext.
1551 dl 1.34 */
1552 dl 1.36 private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1553     BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1554     if ((h = onNextHandler) != null) {
1555     try {
1556     h.accept(s, ex);
1557     } catch (Throwable ignore) {
1558     }
1559 dl 1.1 }
1560 dl 1.36 onError(ex);
1561 dl 1.1 }
1562    
1563 jsr166 1.9 // Unsafe mechanics
1564     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1565 dl 1.1 private static final long CTL;
1566     private static final long TAIL;
1567     private static final long HEAD;
1568     private static final long DEMAND;
1569     private static final int ABASE;
1570     private static final int ASHIFT;
1571    
1572     static {
1573     try {
1574     CTL = U.objectFieldOffset
1575 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("ctl"));
1576 dl 1.1 TAIL = U.objectFieldOffset
1577 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("tail"));
1578 dl 1.1 HEAD = U.objectFieldOffset
1579 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("head"));
1580 dl 1.1 DEMAND = U.objectFieldOffset
1581 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("demand"));
1582    
1583     ABASE = U.arrayBaseOffset(Object[].class);
1584     int scale = U.arrayIndexScale(Object[].class);
1585 dl 1.1 if ((scale & (scale - 1)) != 0)
1586     throw new Error("data type scale not a power of two");
1587     ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1588 jsr166 1.9 } catch (ReflectiveOperationException e) {
1589 dl 1.1 throw new Error(e);
1590     }
1591 jsr166 1.33
1592     // Reduce the risk of rare disastrous classloading in first call to
1593     // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1594     Class<?> ensureLoaded = LockSupport.class;
1595 dl 1.1 }
1596     }
1597     }