ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.52
Committed: Sat Sep 19 20:44:42 2015 UTC (8 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.51: +2 -2 lines
Log Message:
ALL_CAPS for static finals

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 jsr166 1.10 import java.util.ArrayList;
10     import java.util.List;
11 dl 1.1 import java.util.concurrent.locks.LockSupport;
12 dl 1.36 import java.util.function.BiConsumer;
13 dl 1.1 import java.util.function.BiPredicate;
14 dl 1.45 import java.util.function.Consumer;
15 dl 1.1
16     /**
17 dl 1.21 * A {@link Flow.Publisher} that asynchronously issues submitted
18     * (non-null) items to current subscribers until it is closed. Each
19     * current subscriber receives newly submitted items in the same order
20     * unless drops or exceptions are encountered. Using a
21 dl 1.51 * SubmissionPublisher allows item generators to act as compliant <a
22     * href="http://www.reactive-streams.org/"> reactive-streams</a>
23     * Publishers relying on drop handling and/or blocking for flow
24     * control.
25 dl 1.1 *
26 dl 1.16 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
27 dl 1.1 * constructor for delivery to subscribers. The best choice of
28     * Executor depends on expected usage. If the generator(s) of
29     * submitted items run in separate threads, and the number of
30     * subscribers can be estimated, consider using a {@link
31 dl 1.46 * Executors#newFixedThreadPool}. Otherwise consider using the
32     * default, normally the {@link ForkJoinPool#commonPool}.
33 dl 1.1 *
34     * <p>Buffering allows producers and consumers to transiently operate
35     * at different rates. Each subscriber uses an independent buffer.
36 dl 1.21 * Buffers are created upon first use and expanded as needed up to the
37 dl 1.27 * given maximum. (The enforced capacity may be rounded up to the
38 dl 1.21 * nearest power of two and/or bounded by the largest value supported
39     * by this implementation.) Invocations of {@link
40 jsr166 1.49 * Flow.Subscription#request(long) request} do not directly result in
41     * buffer expansion, but risk saturation if unfilled requests exceed
42     * the maximum capacity. The default value of {@link
43     * Flow#defaultBufferSize()} may provide a useful starting point for
44 dl 1.25 * choosing a capacity based on expected rates, resources, and usages.
45 dl 1.18 *
46 dl 1.1 * <p>Publication methods support different policies about what to do
47 jsr166 1.49 * when buffers are saturated. Method {@link #submit(Object) submit}
48     * blocks until resources are available. This is simplest, but least
49     * responsive. The {@code offer} methods may drop items (either
50     * immediately or with bounded timeout), but provide an opportunity to
51     * interpose a handler and then retry.
52 dl 1.1 *
53     * <p>If any Subscriber method throws an exception, its subscription
54 dl 1.36 * is cancelled. If a handler is supplied as a constructor argument,
55     * it is invoked before cancellation upon an exception in method
56 jsr166 1.41 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
57     * {@link Flow.Subscriber#onSubscribe onSubscribe},
58 jsr166 1.49 * {@link Flow.Subscriber#onError(Throwable) onError} and
59     * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
60 jsr166 1.41 * handled before cancellation. If the supplied Executor throws
61     * {@link RejectedExecutionException} (or any other RuntimeException
62     * or Error) when attempting to execute a task, or a drop handler
63     * throws an exception when processing a dropped item, then the
64     * exception is rethrown. In these cases, not all subscribers will
65     * have been issued the published item. It is usually good practice to
66     * {@link #closeExceptionally closeExceptionally} in these cases.
67 dl 1.1 *
68 jsr166 1.49 * <p>Method {@link #consume(Consumer)} simplifies support for a
69     * common case in which the only action of a subscriber is to request
70     * and process all items using a supplied function.
71 dl 1.45 *
72 dl 1.15 * <p>This class may also serve as a convenient base for subclasses
73     * that generate items, and use the methods in this class to publish
74     * them. For example here is a class that periodically publishes the
75     * items generated from a supplier. (In practice you might add methods
76 dl 1.35 * to independently start and stop generation, to share Executors
77 dl 1.30 * among publishers, and so on, or use a SubmissionPublisher as a
78     * component rather than a superclass.)
79 dl 1.15 *
80     * <pre> {@code
81     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
82     * final ScheduledFuture<?> periodicTask;
83     * final ScheduledExecutorService scheduler;
84 dl 1.21 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
85     * Supplier<? extends T> supplier,
86 dl 1.15 * long period, TimeUnit unit) {
87 dl 1.21 * super(executor, maxBufferCapacity);
88 dl 1.15 * scheduler = new ScheduledThreadPoolExecutor(1);
89     * periodicTask = scheduler.scheduleAtFixedRate(
90     * () -> submit(supplier.get()), 0, period, unit);
91     * }
92     * public void close() {
93     * periodicTask.cancel(false);
94     * scheduler.shutdown();
95     * super.close();
96     * }
97     * }}</pre>
98     *
99 dl 1.21 * <p>Here is an example of a {@link Flow.Processor} implementation.
100     * It uses single-step requests to its publisher for simplicity of
101     * illustration. A more adaptive version could monitor flow using the
102 dl 1.27 * lag estimate returned from {@code submit}, along with other utility
103 dl 1.21 * methods.
104 dl 1.16 *
105     * <pre> {@code
106     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
107     * implements Flow.Processor<S,T> {
108     * final Function<? super S, ? extends T> function;
109     * Flow.Subscription subscription;
110 dl 1.21 * TransformProcessor(Executor executor, int maxBufferCapacity,
111 dl 1.16 * Function<? super S, ? extends T> function) {
112 dl 1.21 * super(executor, maxBufferCapacity);
113 dl 1.16 * this.function = function;
114     * }
115     * public void onSubscribe(Flow.Subscription subscription) {
116     * (this.subscription = subscription).request(1);
117     * }
118     * public void onNext(S item) {
119     * subscription.request(1);
120     * submit(function.apply(item));
121     * }
122     * public void onError(Throwable ex) { closeExceptionally(ex); }
123     * public void onComplete() { close(); }
124     * }}</pre>
125     *
126 dl 1.1 * @param <T> the published item type
127     * @author Doug Lea
128     * @since 1.9
129     */
130     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
131     AutoCloseable {
132     /*
133     * Most mechanics are handled by BufferedSubscription. This class
134 dl 1.17 * mainly tracks subscribers and ensures sequentiality, by using
135     * built-in synchronization locks across public methods. (Using
136     * built-in locks works well in the most typical case in which
137     * only one thread submits items).
138 dl 1.1 */
139    
140 dl 1.25 /** The largest possible power of two array size. */
141 jsr166 1.43 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
142 dl 1.1
143 dl 1.40 /** Round capacity to power of 2, at most limit. */
144     static final int roundCapacity(int cap) {
145 dl 1.1 int n = cap - 1;
146     n |= n >>> 1;
147     n |= n >>> 2;
148     n |= n >>> 4;
149     n |= n >>> 8;
150     n |= n >>> 16;
151 dl 1.40 return (n <= 0) ? 1 : // at least 1
152 dl 1.25 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
153 dl 1.1 }
154    
155 dl 1.48 // default Executor setup; nearly the same as CompletableFuture
156 dl 1.46
157     /**
158     * Default executor -- ForkJoinPool.commonPool() unless it cannot
159     * support parallelism.
160     */
161 jsr166 1.52 private static final Executor ASYNC_POOL =
162 dl 1.46 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
163     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
164    
165     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
166     static final class ThreadPerTaskExecutor implements Executor {
167     public void execute(Runnable r) { new Thread(r).start(); }
168     }
169    
170 dl 1.1 /**
171     * Clients (BufferedSubscriptions) are maintained in a linked list
172     * (via their "next" fields). This works well for publish loops.
173     * It requires O(n) traversal to check for duplicate subscribers,
174     * but we expect that subscribing is much less common than
175 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
176 dl 1.25 * when BufferedSubscription methods return negative values
177 dl 1.29 * signifying that they have been disabled. To reduce
178     * head-of-line blocking, submit and offer methods first call
179     * BufferedSubscription.offer on each subscriber, and place
180     * saturated ones in retries list (using nextRetry field), and
181     * retry, possibly blocking or dropping.
182 dl 1.1 */
183     BufferedSubscription<T> clients;
184    
185 dl 1.20 /** Run status, updated only within locks */
186     volatile boolean closed;
187 dl 1.36 /** If non-null, the exception in closeExceptionally */
188 dl 1.37 volatile Throwable closedException;
189 dl 1.20
190 dl 1.1 // Parameters for constructing BufferedSubscriptions
191     final Executor executor;
192 dl 1.36 final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
193 dl 1.1 final int maxBufferCapacity;
194    
195 dl 1.20 /**
196 dl 1.1 * Creates a new SubmissionPublisher using the given Executor for
197 jsr166 1.49 * async delivery to subscribers, with the given maximum buffer size
198     * for each subscriber, and, if non-null, the given handler invoked
199     * when any Subscriber throws an exception in method {@link
200     * Flow.Subscriber#onNext(Object) onNext}.
201 dl 1.1 *
202     * @param executor the executor to use for async delivery,
203     * supporting creation of at least one independent thread
204     * @param maxBufferCapacity the maximum capacity for each
205 dl 1.21 * subscriber's buffer (the enforced capacity may be rounded up to
206     * the nearest power of two and/or bounded by the largest value
207     * supported by this implementation; method {@link #getMaxBufferCapacity}
208     * returns the actual value)
209 dl 1.36 * @param handler if non-null, procedure to invoke upon exception
210 jsr166 1.49 * thrown in method {@code onNext}
211 dl 1.1 * @throws NullPointerException if executor is null
212 dl 1.21 * @throws IllegalArgumentException if maxBufferCapacity not
213     * positive
214 dl 1.1 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                           BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
    // Reject a null executor before looking at the capacity.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity <= 0) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    // The stored capacity is the rounded value, not the raw argument.
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler; // may be null: no onNext handler
    this.executor = executor;
}
225    
226 dl 1.1 /**
227 dl 1.36 * Creates a new SubmissionPublisher using the given Executor for
228 jsr166 1.49 * async delivery to subscribers, with the given maximum buffer size
229     * for each subscriber, and no handler for Subscriber exceptions in
230     * method {@link Flow.Subscriber#onNext(Object) onNext}.
231 dl 1.36 *
232     * @param executor the executor to use for async delivery,
233     * supporting creation of at least one independent thread
234     * @param maxBufferCapacity the maximum capacity for each
235     * subscriber's buffer (the enforced capacity may be rounded up to
236     * the nearest power of two and/or bounded by the largest value
237     * supported by this implementation; method {@link #getMaxBufferCapacity}
238     * returns the actual value)
239     * @throws NullPointerException if executor is null
240     * @throws IllegalArgumentException if maxBufferCapacity not
241     * positive
242     */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    // Delegate to the three-argument form with no onNext handler.
    this(executor,
         maxBufferCapacity,
         null);
}
246    
247     /**
248 dl 1.25 * Creates a new SubmissionPublisher using the {@link
249 dl 1.46 * ForkJoinPool#commonPool()} for async delivery to subscribers
250 jsr166 1.49 * (unless it does not support a parallelism level of at least two,
251     * in which case, a new Thread is created to run each task), with
252     * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
253     * handler for Subscriber exceptions in method {@link
254     * Flow.Subscriber#onNext(Object) onNext}.
255 dl 1.25 */
public SubmissionPublisher() {
    // Default executor, default buffer size, no onNext handler.
    this(ASYNC_POOL,
         Flow.defaultBufferSize(),
         null);
}
259    
260     /**
261 jsr166 1.49 * Adds the given Subscriber unless already subscribed. If already
262     * subscribed, the Subscriber's {@link
263     * Flow.Subscriber#onError(Throwable) onError} method is invoked on
264     * the existing subscription with an {@link IllegalStateException}.
265     * Otherwise, upon success, the Subscriber's {@link
266     * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
267     * asynchronously with a new {@link Flow.Subscription}. If {@link
268     * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
269     * subscription is cancelled. Otherwise, if this SubmissionPublisher
270     * was closed exceptionally, then the subscriber's {@link
271     * Flow.Subscriber#onError onError} method is invoked with the
272     * corresponding exception, or if closed without exception, the
273     * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
274     * method is invoked. Subscribers may enable receiving items by
275     * invoking the {@link Flow.Subscription#request(long) request}
276     * method of the new Subscription, and may unsubscribe by invoking
277     * its {@link Flow.Subscription#cancel() cancel} method.
278 dl 1.18 *
279     * @param subscriber the subscriber
280     * @throws NullPointerException if subscriber is null
281     */
282     public void subscribe(Flow.Subscriber<? super T> subscriber) {
283     if (subscriber == null) throw new NullPointerException();
284 dl 1.20 BufferedSubscription<T> subscription =
285 dl 1.36 new BufferedSubscription<T>(subscriber, executor,
286     onNextHandler, maxBufferCapacity);
287 dl 1.19 synchronized (this) {
288 dl 1.29 for (BufferedSubscription<T> b = clients, pred = null;;) {
289     if (b == null) {
290 dl 1.36 Throwable ex;
291 dl 1.29 subscription.onSubscribe();
292 dl 1.37 if ((ex = closedException) != null)
293 dl 1.36 subscription.onError(ex);
294     else if (closed)
295 dl 1.29 subscription.onComplete();
296     else if (pred == null)
297     clients = subscription;
298 dl 1.27 else
299 dl 1.29 pred.next = subscription;
300     break;
301 dl 1.27 }
302 dl 1.29 BufferedSubscription<T> next = b.next;
303     if (b.isDisabled()) { // remove
304     b.next = null; // detach
305 dl 1.19 if (pred == null)
306 dl 1.29 clients = next;
307 dl 1.19 else
308 dl 1.29 pred.next = next;
309     }
310     else if (subscriber.equals(b.subscriber)) {
311     b.onError(new IllegalStateException("Duplicate subscribe"));
312     break;
313 dl 1.19 }
314 dl 1.29 else
315     pred = b;
316     b = next;
317 dl 1.19 }
318     }
319 dl 1.18 }
320    
321     /**
322 dl 1.16 * Publishes the given item to each current subscriber by
323 jsr166 1.49 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
324     * onNext} method, blocking uninterruptibly while resources for any
325 dl 1.30 * subscriber are unavailable. This method returns an estimate of
326 jsr166 1.49 * the maximum lag (number of items submitted but not yet consumed)
327     * among all current subscribers. This value is at least one
328     * (accounting for this submitted item) if there are any
329 dl 1.30 * subscribers, else zero.
330 dl 1.16 *
331     * <p>If the Executor for this publisher throws a
332     * RejectedExecutionException (or any other RuntimeException or
333     * Error) when attempting to asynchronously notify subscribers,
334 dl 1.31 * then this exception is rethrown, in which case not all
335     * subscribers will have been issued this item.
336 dl 1.16 *
337     * @param item the (non-null) item to publish
338 dl 1.21 * @return the estimated maximum lag among subscribers
339 dl 1.16 * @throws IllegalStateException if closed
340     * @throws NullPointerException if item is null
341     * @throws RejectedExecutionException if thrown by Executor
342     */
343 dl 1.21 public int submit(T item) {
344 dl 1.16 if (item == null) throw new NullPointerException();
345 dl 1.21 int lag = 0;
346 dl 1.27 boolean complete;
347 dl 1.16 synchronized (this) {
348 dl 1.27 complete = closed;
349 dl 1.29 BufferedSubscription<T> b = clients;
350 dl 1.27 if (!complete) {
351 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
352 dl 1.27 while (b != null) {
353     BufferedSubscription<T> next = b.next;
354 dl 1.29 int stat = b.offer(item);
355     if (stat < 0) { // disabled
356     b.next = null;
357 dl 1.27 if (pred == null)
358     clients = next;
359 dl 1.24 else
360 dl 1.27 pred.next = next;
361 dl 1.24 }
362 dl 1.27 else {
363 dl 1.29 if (stat > lag)
364     lag = stat;
365     else if (stat == 0) { // place on retry list
366     b.nextRetry = null;
367 dl 1.27 if (rtail == null)
368 dl 1.29 r = b;
369 dl 1.27 else
370     rtail.nextRetry = b;
371     rtail = b;
372     }
373     pred = b;
374     }
375     b = next;
376 dl 1.21 }
377 dl 1.29 while (r != null) {
378     BufferedSubscription<T> nextRetry = r.nextRetry;
379     r.nextRetry = null;
380     int stat = r.submit(item);
381     if (stat > lag)
382     lag = stat;
383     else if (stat < 0 && clients == r)
384     clients = r.next; // postpone internal unsubscribes
385     r = nextRetry;
386     }
387 dl 1.16 }
388     }
389 dl 1.27 if (complete)
390     throw new IllegalStateException("Closed");
391     else
392     return lag;
393 dl 1.16 }
394    
395     /**
396 jsr166 1.49 * Publishes the given item, if possible, to each current subscriber
397     * by asynchronously invoking its {@link
398     * Flow.Subscriber#onNext(Object) onNext} method. The item may be
399     * dropped by one or more subscribers if resource limits are
400     * exceeded, in which case the given handler (if non-null) is
401     * invoked, and if it returns true, retried once. Other calls to
402     * methods in this class by other threads are blocked while the
403     * handler is invoked. Unless recovery is assured, options are
404     * usually limited to logging the error and/or issuing an {@link
405     * Flow.Subscriber#onError(Throwable) onError} signal to the
406     * subscriber.
407 dl 1.1 *
408 dl 1.21 * <p>This method returns a status indicator: If negative, it
409     * represents the (negative) number of drops (failed attempts to
410     * issue the item to a subscriber). Otherwise it is an estimate of
411     * the maximum lag (number of items submitted but not yet
412     * consumed) among all current subscribers. This value is at least
413     * one (accounting for this submitted item) if there are any
414     * subscribers, else zero.
415 jsr166 1.23 *
416 dl 1.1 * <p>If the Executor for this publisher throws a
417     * RejectedExecutionException (or any other RuntimeException or
418     * Error) when attempting to asynchronously notify subscribers, or
419     * the drop handler throws an exception when processing a dropped
420     * item, then this exception is rethrown.
421     *
422     * @param item the (non-null) item to publish
423     * @param onDrop if non-null, the handler invoked upon a drop to a
424     * subscriber, with arguments of the subscriber and item; if it
425     * returns true, an offer is re-attempted (once)
426 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
427     * an estimate of maximum lag
428 dl 1.1 * @throws IllegalStateException if closed
429     * @throws NullPointerException if item is null
430     * @throws RejectedExecutionException if thrown by Executor
431     */
432     public int offer(T item,
433     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
434 dl 1.29 return doOffer(0L, item, onDrop);
435 dl 1.1 }
436    
437     /**
438 jsr166 1.49 * Publishes the given item, if possible, to each current subscriber
439     * by asynchronously invoking its {@link
440     * Flow.Subscriber#onNext(Object) onNext} method, blocking while
441     * resources for any subscription are unavailable, up to the
442     * specified timeout or until the caller thread is interrupted, at
443     * which point the given handler (if non-null) is invoked, and if it
444     * returns true, retried once. (The drop handler may distinguish
445     * timeouts from interrupts by checking whether the current thread
446     * is interrupted.) Other calls to methods in this class by other
447 dl 1.30 * threads are blocked while the handler is invoked. Unless
448     * recovery is assured, options are usually limited to logging the
449 jsr166 1.49 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
450     * onError} signal to the subscriber.
451 dl 1.1 *
452 dl 1.21 * <p>This method returns a status indicator: If negative, it
453     * represents the (negative) number of drops (failed attempts to
454     * issue the item to a subscriber). Otherwise it is an estimate of
455     * the maximum lag (number of items submitted but not yet
456     * consumed) among all current subscribers. This value is at least
457     * one (accounting for this submitted item) if there are any
458     * subscribers, else zero.
459     *
460 dl 1.1 * <p>If the Executor for this publisher throws a
461     * RejectedExecutionException (or any other RuntimeException or
462     * Error) when attempting to asynchronously notify subscribers, or
463     * the drop handler throws an exception when processing a dropped
464     * item, then this exception is rethrown.
465     *
466     * @param item the (non-null) item to publish
467     * @param timeout how long to wait for resources for any subscriber
468     * before giving up, in units of {@code unit}
469     * @param unit a {@code TimeUnit} determining how to interpret the
470     * {@code timeout} parameter
471     * @param onDrop if non-null, the handler invoked upon a drop to a
472     * subscriber, with arguments of the subscriber and item; if it
473     * returns true, an offer is re-attempted (once)
474 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
475     * an estimate of maximum lag
476 dl 1.1 * @throws IllegalStateException if closed
477     * @throws NullPointerException if item is null
478     * @throws RejectedExecutionException if thrown by Executor
479     */
480     public int offer(T item, long timeout, TimeUnit unit,
481     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
482 dl 1.29 return doOffer(unit.toNanos(timeout), item, onDrop);
483     }
484    
485     /** Common implementation for both forms of offer */
486     final int doOffer(long nanos, T item,
487     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
488 dl 1.1 if (item == null) throw new NullPointerException();
489 dl 1.27 int lag = 0, drops = 0;
490     boolean complete;
491 jsr166 1.2 synchronized (this) {
492 dl 1.27 complete = closed;
493 dl 1.29 BufferedSubscription<T> b = clients;
494 dl 1.27 if (!complete) {
495 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
496 dl 1.27 while (b != null) {
497     BufferedSubscription<T> next = b.next;
498 dl 1.29 int stat = b.offer(item);
499     if (stat < 0) {
500     b.next = null;
501 dl 1.27 if (pred == null)
502     clients = next;
503     else
504     pred.next = next;
505     }
506     else {
507 dl 1.29 if (stat > lag)
508     lag = stat;
509     else if (stat == 0) {
510     b.nextRetry = null;
511 dl 1.27 if (rtail == null)
512 dl 1.29 r = b;
513 dl 1.27 else
514     rtail.nextRetry = b;
515     rtail = b;
516     }
517 dl 1.29 else if (stat > lag)
518 dl 1.27 lag = stat;
519     pred = b;
520     }
521     b = next;
522 dl 1.1 }
523 dl 1.29 while (r != null) {
524     BufferedSubscription<T> nextRetry = r.nextRetry;
525     r.nextRetry = null;
526     int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
527     r.offer(item);
528     if (stat == 0 && onDrop != null &&
529     onDrop.test(r.subscriber, item))
530     stat = r.offer(item);
531     if (stat == 0)
532     ++drops;
533     else if (stat > lag)
534     lag = stat;
535     else if (stat < 0 && clients == r)
536     clients = r.next;
537     r = nextRetry;
538     }
539 dl 1.1 }
540     }
541 dl 1.27 if (complete)
542     throw new IllegalStateException("Closed");
543     else
544     return (drops > 0) ? -drops : lag;
545 dl 1.1 }
546    
547     /**
548 dl 1.30 * Unless already closed, issues {@link
549 jsr166 1.49 * Flow.Subscriber#onComplete() onComplete} signals to current
550     * subscribers, and disallows subsequent attempts to publish.
551     * Upon return, this method does <em>NOT</em> guarantee that all
552     * subscribers have yet completed.
553 dl 1.1 */
554     public void close() {
555     if (!closed) {
556 dl 1.32 BufferedSubscription<T> b;
557 jsr166 1.2 synchronized (this) {
558 dl 1.1 b = clients;
559     clients = null;
560     closed = true;
561     }
562     while (b != null) {
563 dl 1.32 BufferedSubscription<T> next = b.next;
564     b.next = null;
565 dl 1.19 b.onComplete();
566 dl 1.1 b = next;
567     }
568     }
569     }
570    
571     /**
572 jsr166 1.49 * Unless already closed, issues {@link
573     * Flow.Subscriber#onError(Throwable) onError} signals to current
574     * subscribers with the given error, and disallows subsequent
575     * attempts to publish. Future subscribers also receive the given
576     * error. Upon return, this method does <em>NOT</em> guarantee
577     * that all subscribers have yet completed.
578 dl 1.1 *
579 dl 1.30 * @param error the {@code onError} argument sent to subscribers
580 dl 1.1 * @throws NullPointerException if error is null
581     */
582     public void closeExceptionally(Throwable error) {
583     if (error == null)
584     throw new NullPointerException();
585     if (!closed) {
586 dl 1.32 BufferedSubscription<T> b;
587 jsr166 1.2 synchronized (this) {
588 dl 1.1 b = clients;
589     clients = null;
590     closed = true;
591 dl 1.37 closedException = error;
592 dl 1.1 }
593     while (b != null) {
594 dl 1.32 BufferedSubscription<T> next = b.next;
595     b.next = null;
596 dl 1.19 b.onError(error);
597 dl 1.1 b = next;
598     }
599     }
600     }
601    
602     /**
603     * Returns true if this publisher is not accepting submissions.
604     *
605     * @return true if closed
606     */
607     public boolean isClosed() {
608     return closed;
609     }
610    
611 dl 1.37 /**
612 jsr166 1.49 * Returns the exception associated with {@link
613     * #closeExceptionally(Throwable) closeExceptionally}, or null if
614     * not closed or if closed normally.
615 dl 1.37 *
616     * @return the exception, or null if none
617     */
618     public Throwable getClosedException() {
619     return closedException;
620     }
621    
622 dl 1.1 /**
623 jsr166 1.6 * Returns true if this publisher has any subscribers.
624 dl 1.1 *
625     * @return true if this publisher has any subscribers
626     */
627     public boolean hasSubscribers() {
628     boolean nonEmpty = false;
629     if (!closed) {
630 jsr166 1.2 synchronized (this) {
631 dl 1.31 for (BufferedSubscription<T> b = clients; b != null;) {
632     BufferedSubscription<T> next = b.next;
633 dl 1.19 if (b.isDisabled()) {
634 dl 1.29 b.next = null;
635 dl 1.31 b = clients = next;
636 jsr166 1.2 }
637 dl 1.1 else {
638     nonEmpty = true;
639     break;
640     }
641     }
642     }
643     }
644     return nonEmpty;
645     }
646    
647     /**
648 dl 1.21 * Returns the number of current subscribers.
649 dl 1.1 *
650 dl 1.21 * @return the number of current subscribers
651 dl 1.1 */
652 dl 1.21 public int getNumberOfSubscribers() {
653     int count = 0;
654     if (!closed) {
655     synchronized (this) {
656     BufferedSubscription<T> pred = null, next;
657     for (BufferedSubscription<T> b = clients; b != null; b = next) {
658     next = b.next;
659     if (b.isDisabled()) {
660 dl 1.29 b.next = null;
661 dl 1.21 if (pred == null)
662     clients = next;
663     else
664     pred.next = next;
665     }
666     else {
667     pred = b;
668     ++count;
669     }
670     }
671     }
672     }
673     return count;
674 dl 1.1 }
675    
676 jsr166 1.7 /**
677 dl 1.21 * Returns the Executor used for asynchronous delivery.
678 dl 1.1 *
679 dl 1.21 * @return the Executor used for asynchronous delivery
680 dl 1.1 */
681 dl 1.21 public Executor getExecutor() {
682     return executor;
683 dl 1.1 }
684    
685 jsr166 1.7 /**
686 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
687 dl 1.1 *
688 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
689 dl 1.1 */
690     public int getMaxBufferCapacity() {
691     return maxBufferCapacity;
692     }
693    
694     /**
695 dl 1.29 * Returns a list of current subscribers for monitoring and
696     * tracking purposes, not for invoking {@link Flow.Subscriber}
697     * methods on the subscribers.
698 dl 1.1 *
699     * @return list of current subscribers
700     */
701     public List<Flow.Subscriber<? super T>> getSubscribers() {
702     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
703 jsr166 1.2 synchronized (this) {
704 dl 1.1 BufferedSubscription<T> pred = null, next;
705     for (BufferedSubscription<T> b = clients; b != null; b = next) {
706     next = b.next;
707 dl 1.19 if (b.isDisabled()) {
708 dl 1.29 b.next = null;
709 dl 1.1 if (pred == null)
710     clients = next;
711     else
712     pred.next = next;
713 jsr166 1.2 }
714 dl 1.1 else
715     subs.add(b.subscriber);
716     }
717     }
718     return subs;
719     }
720 jsr166 1.2
721 dl 1.1 /**
722     * Returns true if the given Subscriber is currently subscribed.
723     *
724     * @param subscriber the subscriber
725     * @return true if currently subscribed
726 dl 1.21 * @throws NullPointerException if subscriber is null
727 dl 1.1 */
728     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
729 dl 1.21 if (subscriber == null) throw new NullPointerException();
730 jsr166 1.2 if (!closed) {
731     synchronized (this) {
732 dl 1.1 BufferedSubscription<T> pred = null, next;
733     for (BufferedSubscription<T> b = clients; b != null; b = next) {
734     next = b.next;
735 dl 1.19 if (b.isDisabled()) {
736 dl 1.29 b.next = null;
737 dl 1.1 if (pred == null)
738     clients = next;
739     else
740     pred.next = next;
741 jsr166 1.2 }
742 dl 1.21 else if (subscriber.equals(b.subscriber))
743 dl 1.1 return true;
744 dl 1.21 else
745     pred = b;
746 dl 1.1 }
747     }
748     }
749     return false;
750     }
751    
752     /**
753 dl 1.21 * Returns an estimate of the minimum number of items requested
754 jsr166 1.49 * (via {@link Flow.Subscription#request(long) request}) but not
755     * yet produced, among all current subscribers.
756 dl 1.21 *
757     * @return the estimate, or zero if no subscribers
758     */
759     public long estimateMinimumDemand() {
760     long min = Long.MAX_VALUE;
761     boolean nonEmpty = false;
762     synchronized (this) {
763     BufferedSubscription<T> pred = null, next;
764     for (BufferedSubscription<T> b = clients; b != null; b = next) {
765     int n; long d;
766     next = b.next;
767     if ((n = b.estimateLag()) < 0) {
768 dl 1.29 b.next = null;
769 dl 1.21 if (pred == null)
770     clients = next;
771     else
772     pred.next = next;
773     }
774     else {
775     if ((d = b.demand - n) < min)
776     min = d;
777     nonEmpty = true;
778 dl 1.32 pred = b;
779 dl 1.21 }
780     }
781     }
782 jsr166 1.22 return nonEmpty ? min : 0;
783 dl 1.21 }
784    
785     /**
786     * Returns an estimate of the maximum number of items produced but
787     * not yet consumed among all current subscribers.
788 dl 1.17 *
789     * @return the estimate
790 dl 1.1 */
791 dl 1.21 public int estimateMaximumLag() {
792     int max = 0;
793 dl 1.17 synchronized (this) {
794     BufferedSubscription<T> pred = null, next;
795     for (BufferedSubscription<T> b = clients; b != null; b = next) {
796 dl 1.21 int n;
797 dl 1.17 next = b.next;
798 dl 1.21 if ((n = b.estimateLag()) < 0) {
799 dl 1.29 b.next = null;
800 dl 1.17 if (pred == null)
801     clients = next;
802     else
803     pred.next = next;
804     }
805 dl 1.32 else {
806     if (n > max)
807     max = n;
808     pred = b;
809     }
810 dl 1.17 }
811     }
812 dl 1.21 return max;
813 dl 1.1 }
814    
815 dl 1.25 /**
816 jsr166 1.49 * Processes all published items using the given Consumer function.
817     * Returns a CompletableFuture that is completed normally when this
818     * publisher signals {@link Flow.Subscriber#onComplete()
819     * onComplete}, or completed exceptionally upon any error, or an
820     * exception is thrown by the Consumer, or the returned
821     * CompletableFuture is cancelled, in which case no further items
822     * are processed.
823 dl 1.45 *
824     * @param consumer the function applied to each onNext item
825     * @return a CompletableFuture that is completed normally
826     * when the publisher signals onComplete, and exceptionally
827 jsr166 1.47 * upon any error or cancellation
828 dl 1.45 * @throws NullPointerException if consumer is null
829     */
830     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
831     if (consumer == null)
832     throw new NullPointerException();
833     CompletableFuture<Void> status = new CompletableFuture<>();
834     subscribe(new ConsumerSubscriber<T>(status, consumer));
835     return status;
836     }
837    
838     /** Subscriber for method consume */
839     static final class ConsumerSubscriber<T> implements Flow.Subscriber<T> {
840     final CompletableFuture<Void> status;
841     final Consumer<? super T> consumer;
842     Flow.Subscription subscription;
843     ConsumerSubscriber(CompletableFuture<Void> status,
844     Consumer<? super T> consumer) {
845     this.status = status; this.consumer = consumer;
846     }
847     public final void onSubscribe(Flow.Subscription subscription) {
848     this.subscription = subscription;
849     if (status.isDone())
850     subscription.cancel();
851     else
852     subscription.request(Long.MAX_VALUE);
853     }
854     public final void onError(Throwable ex) {
855     status.completeExceptionally(ex);
856     }
857     public final void onComplete() {
858     status.complete(null);
859     }
860     public final void onNext(T item) {
861     if (status.isDone())
862     subscription.cancel();
863     else {
864     try {
865     consumer.accept(item);
866     } catch (Throwable ex) {
867     subscription.cancel();
868     status.completeExceptionally(ex);
869     }
870     }
871     }
872     }
873    
874     /**
875 dl 1.25 * A task for consuming buffer items and signals, created and
876     * executed whenever they become available. A task consumes as
877     * many items/signals as possible before terminating, at which
878     * point another task is created when needed. The dual Runnable
879     * and ForkJoinTask declaration saves overhead when executed by
880     * ForkJoinPools, without impacting other kinds of Executors.
881     */
882     @SuppressWarnings("serial")
883     static final class ConsumerTask<T> extends ForkJoinTask<Void>
884     implements Runnable {
885     final BufferedSubscription<T> consumer;
886     ConsumerTask(BufferedSubscription<T> consumer) {
887     this.consumer = consumer;
888     }
889     public final Void getRawResult() { return null; }
890     public final void setRawResult(Void v) {}
891     public final boolean exec() { consumer.consume(); return false; }
892     public final void run() { consumer.consume(); }
893     }
894    
895 dl 1.1 /**
896 dl 1.15 * A bounded (ring) buffer with integrated control to start a
897     * consumer task whenever items are available. The buffer
898 dl 1.34 * algorithm is similar to one used inside ForkJoinPool (see its
899     * internal documentation for details) specialized for the case of
900     * at most one concurrent producer and consumer, and power of two
901     * buffer sizes. This allows methods to operate without locks even
902     * while supporting resizing, blocking, task-triggering, and
903     * garbage-free buffers (nulling out elements when consumed),
904     * although supporting these does impose a bit of overhead
905     * compared to plain fixed-size ring buffers.
906 dl 1.1 *
907     * The publisher guarantees a single producer via its lock. We
908     * ensure in this class that there is at most one consumer. The
909     * request and cancel methods must be fully thread-safe but are
910     * coded to exploit the most common case in which they are only
911     * called by consumers (usually within onNext).
912     *
913 dl 1.15 * Execution control is managed using the ACTIVE ctl bit. We
914     * ensure that a task is active when consumable items (and
915 dl 1.19 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
916 dl 1.24 * there is demand (unfilled requests). This is complicated on
917 dl 1.19 * the creation side by the possibility of exceptions when trying
918     * to execute tasks. These eventually force DISABLED state, but
919 dl 1.1 * sometimes not directly. On the task side, termination (clearing
920 dl 1.27 * ACTIVE) that would otherwise race with producers or request()
921 dl 1.29 * calls uses the CONSUME keep-alive bit to force a recheck.
922 dl 1.1 *
923     * The ctl field also manages run state. When DISABLED, no further
924 dl 1.20 * updates are possible. Disabling may be preceded by setting
925 dl 1.1 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
926     * case the associated Subscriber methods are invoked, possibly
927     * synchronously if there is no active consumer task (including
928 dl 1.20 * cases where execute() failed). The cancel() method is supported
929     * by treating as ERROR but suppressing onError signal.
930 dl 1.1 *
931     * Support for blocking also exploits the fact that there is only
932     * one possible waiter. ManagedBlocker-compatible control fields
933     * are placed in this class itself rather than in wait-nodes.
934     * Blocking control relies on the "waiter" field. Producers set
935     * the field before trying to block, but must then recheck (via
936     * offer) before parking. Signalling then just unparks and clears
937 dl 1.20 * waiter field. If the producer and consumer are both in the same
938 dl 1.29 * ForkJoinPool, or consumers are running in commonPool, the
939     * producer attempts to help run consumer tasks that it forked
940     * before blocking. To avoid potential cycles, only one level of
941     * helping is currently supported.
942 dl 1.1 *
943     * This class uses @Contended and heuristic field declaration
944 dl 1.31 * ordering to reduce false-sharing-based memory contention among
945     * instances of BufferedSubscription, but it does not currently
946     * attempt to avoid memory contention among buffers. This field
947     * and element packing can hurt performance especially when each
948     * publisher has only one client operating at a high rate.
949 dl 1.1 * Addressing this may require allocating substantially more space
950     * than users expect.
951     */
952 dl 1.15 @SuppressWarnings("serial")
953 dl 1.1 @sun.misc.Contended
954 dl 1.25 static final class BufferedSubscription<T>
955     implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
// Order-sensitive field declarations
long timeout;                     // > 0 if timed wait; set to INTERRUPTED
                                  // when a blocked producer is interrupted
volatile long demand;             // # unfilled requests
int maxCapacity;                  // reduced on OOME
int putStat;                      // offer result for ManagedBlocker
int helpDepth;                    // nested helping depth (at most 1)
volatile int ctl;                 // atomic run state flags
volatile int head;                // next position to take
int tail;                         // next position to put
Object[] array;                   // buffer: null if disabled
Flow.Subscriber<? super T> subscriber; // null if disabled
Executor executor;                // null if disabled
// invoked by handleOnNext when Subscriber.onNext throws; may be null
BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
volatile Throwable pendingError;  // holds until onError issued
volatile Thread waiter;           // blocked producer thread
T putItem;                        // for offer within ManagedBlocker
BufferedSubscription<T> next;     // used only by publisher
BufferedSubscription<T> nextRetry; // used only by publisher

// ctl values
static final int ACTIVE    = 0x01; // consumer task active
static final int CONSUME   = 0x02; // keep-alive for consumer task
static final int DISABLED  = 0x04; // final state
static final int ERROR     = 0x08; // signal onError then disable
static final int SUBSCRIBE = 0x10; // signal onSubscribe
static final int COMPLETE  = 0x20; // signal onComplete when done

static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel

/**
 * Initial buffer capacity used when maxBufferCapacity is
 * greater. Must be a power of two.
 */
static final int DEFAULT_INITIAL_CAP = 32;
990 dl 1.21
/**
 * Creates a subscription whose buffer starts at the smaller of
 * maxBufferCapacity and DEFAULT_INITIAL_CAP.
 */
BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                     Executor executor,
                     BiConsumer<? super Flow.Subscriber<? super T>,
                     ? super Throwable> onNextHandler,
                     int maxBufferCapacity) {
    int initialCap = Math.min(maxBufferCapacity, DEFAULT_INITIAL_CAP);
    this.subscriber = subscriber;
    this.executor = executor;
    this.onNextHandler = onNextHandler;
    this.maxCapacity = maxBufferCapacity;
    this.array = new Object[initialCap];
}
1003    
1004 dl 1.19 final boolean isDisabled() {
1005 dl 1.20 return ctl == DISABLED;
1006     }
1007    
1008 dl 1.21 /**
1009     * Returns estimated number of buffered items, or -1 if
1010 jsr166 1.42 * disabled.
1011 dl 1.21 */
1012     final int estimateLag() {
1013 dl 1.20 int n;
1014 jsr166 1.22 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
1015 dl 1.19 }
1016    
/**
 * Tries to add item and start consumer task if necessary.
 * The fast path stores directly into the current array when it
 * has room; otherwise the work is delegated to growAndAdd.
 *
 * @param item the item to add
 * @return -1 if disabled, 0 if dropped, else estimated lag
 */
final int offer(T item) {
    int h = head, t = tail, cap, size, stat;
    Object[] a = array;
    // size is the element count the buffer would hold after this put
    if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
        a[(cap - 1) & t] = item;    // relaxed writes OK
        tail = t + 1;
        stat = size;
    }
    else
        stat = growAndAdd(a, item);
    // After a successful put, make sure a consumer task is running
    // with its keep-alive bit set; skip if both bits are already on.
    return (stat > 0 &&
            (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
        startOnOffer(stat) : stat;
}
1035    
/**
 * Tries to create or expand buffer, then adds item if possible.
 * Called from offer when its fast path could not store the item.
 *
 * @param a the array snapshot read by the caller (may be null)
 * @param item the item to add
 * @return -1 if ctl has ERROR or DISABLED set; 0 if the item was
 * not added (buffer full at maxCapacity, capacity overflow, or
 * allocation failure); otherwise a positive value
 */
private int growAndAdd(Object[] a, T item) {
    boolean alloc;
    int cap, stat;
    if ((ctl & (ERROR | DISABLED)) != 0) {
        cap = 0;
        stat = -1;
        alloc = false;
    }
    else if (a == null || (cap = a.length) <= 0) {
        // no usable array: allocate a fresh one below
        cap = 0;
        stat = 1;
        alloc = true;
    }
    else {
        U.fullFence();                   // recheck
        int h = head, t = tail, size = t + 1 - h;
        if (cap >= size) {
            // room appeared since the caller looked; store in place
            a[(cap - 1) & t] = item;
            tail = t + 1;
            stat = size;
            alloc = false;
        }
        else if (cap >= maxCapacity) {
            stat = 0;                    // cannot grow
            alloc = false;
        }
        else {
            stat = cap + 1;
            alloc = true;
        }
    }
    if (alloc) {
        // double the capacity, or start at 1 when there was no array
        int newCap = (cap > 0) ? cap << 1 : 1;
        if (newCap <= cap)               // shift overflowed
            stat = 0;
        else {
            Object[] newArray = null;
            try {
                newArray = new Object[newCap];
            } catch (Throwable ex) {     // try to cope with OOME
            }
            if (newArray == null) {
                if (cap > 0)
                    maxCapacity = cap;   // avoid continuous failure
                stat = 0;
            }
            else {
                array = newArray;
                int t = tail;
                int newMask = newCap - 1;
                if (a != null && cap > 0) {
                    int mask = cap - 1;
                    // Transfer remaining elements. Each slot is
                    // claimed by CAS to null so an element is moved
                    // only if the consumer has not already taken it.
                    for (int j = head; j != t; ++j) {
                        long k = ((long)(j & mask) << ASHIFT) + ABASE;
                        Object x = U.getObjectVolatile(a, k);
                        if (x != null && // races with consumer
                            U.compareAndSwapObject(a, k, x, null))
                            newArray[j & newMask] = x;
                    }
                }
                newArray[t & newMask] = item;
                tail = t + 1;
            }
        }
    }
    return stat;
}
1106    
/**
 * Spins/helps/blocks while offer returns 0. Called only if
 * initial offer returns 0.
 *
 * If the executor is a ForkJoinPool and no helping is already in
 * progress (helpDepth == 0), first tries to run queued
 * ConsumerTasks: via the caller's own work queue when the caller
 * is a worker of that pool, else via the common pool's submitter
 * queue when the executor is the common pool. If the item still
 * cannot be added, blocks through ForkJoinPool.managedBlock.
 *
 * @param item the item to add
 * @return the final offer status, as stored in putStat when
 * blocking was needed
 */
final int submit(T item) {
    int stat; Executor e; ForkJoinWorkerThread w;
    if ((stat = offer(item)) == 0 && helpDepth == 0 &&
        ((e = executor) instanceof ForkJoinPool)) {
        helpDepth = 1;                   // bar nested helping
        Thread thread = Thread.currentThread();
        if ((thread instanceof ForkJoinWorkerThread) &&
            ((w = (ForkJoinWorkerThread)thread)).getPool() == e)
            stat = internalHelpConsume(w.workQueue, item);
        else if (e == ForkJoinPool.commonPool())
            stat = externalHelpConsume
                (ForkJoinPool.commonSubmitterQueue(), item);
        helpDepth = 0;
    }
    if (stat == 0 && (stat = offer(item)) == 0) {
        putItem = item;                  // handed to isReleasable/block
        timeout = 0L;                    // untimed wait
        try {
            ForkJoinPool.managedBlock(this);
        } catch (InterruptedException ie) {
            timeout = INTERRUPTED;
        }
        stat = putStat;
        if (timeout < 0L)                // INTERRUPTED: restore status
            Thread.currentThread().interrupt();
    }
    return stat;
}
1139    
1140     /**
1141 dl 1.34 * Tries helping for FJ submitter
1142     */
1143     private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1144     int stat = 0;
1145     if (w != null) {
1146 dl 1.39 ForkJoinTask<?> t;
1147 dl 1.34 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1148     if ((stat = offer(item)) != 0 || !w.tryUnpush(t))
1149     break;
1150 dl 1.39 ((ConsumerTask<?>)t).consumer.consume();
1151 dl 1.34 }
1152     }
1153     return stat;
1154     }
1155    
1156     /**
1157     * Tries helping for non-FJ submitter
1158     */
1159     private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1160     int stat = 0;
1161     if (w != null) {
1162 dl 1.39 ForkJoinTask<?> t;
1163 dl 1.34 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1164     if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t))
1165     break;
1166 dl 1.39 ((ConsumerTask<?>)t).consumer.consume();
1167 dl 1.34 }
1168     }
1169     return stat;
1170     }
1171    
/**
 * Timeout version; similar to submit.
 *
 * Helping uses ForkJoinTask.peekNextLocalTask/tryUnfork and stops
 * once the deadline computed from nanos has passed. If the item
 * still cannot be added and time remains, blocks through
 * ForkJoinPool.managedBlock with the remaining time.
 *
 * @param item the item to add
 * @param nanos maximum time to wait, in nanoseconds
 * @return the final offer status
 */
final int timedOffer(T item, long nanos) {
    int stat; Executor e;
    if ((stat = offer(item)) == 0 && helpDepth == 0 &&
        ((e = executor) instanceof ForkJoinPool)) {
        Thread thread = Thread.currentThread();
        if (((thread instanceof ForkJoinWorkerThread) &&
             ((ForkJoinWorkerThread)thread).getPool() == e) ||
            e == ForkJoinPool.commonPool()) {
            helpDepth = 1;               // bar nested helping
            ForkJoinTask<?> t;
            long deadline = System.nanoTime() + nanos;
            while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
                   (t instanceof ConsumerTask)) {
                // nanos is refreshed here so that the blocking step
                // below waits only for the time still remaining
                if ((stat = offer(item)) != 0 ||
                    (nanos = deadline - System.nanoTime()) <= 0L ||
                    !t.tryUnfork())
                    break;
                ((ConsumerTask<?>)t).consumer.consume();
            }
            helpDepth = 0;
        }
    }
    if (stat == 0 && (stat = offer(item)) == 0 &&
        (timeout = nanos) > 0L) {
        putItem = item;                  // handed to isReleasable/block
        try {
            ForkJoinPool.managedBlock(this);
        } catch (InterruptedException ie) {
            timeout = INTERRUPTED;
        }
        stat = putStat;
        if (timeout < 0L)                // INTERRUPTED: restore status
            Thread.currentThread().interrupt();
    }
    return stat;
}
1211    
/**
 * Tries to start consumer task after offer.
 *
 * @param stat the status computed by offer
 * @return -1 if now disabled, else argument
 * @throws RuntimeException or Error if the executor rejects the
 * task; the ACTIVE bit is cleared before rethrowing
 */
private int startOnOffer(int stat) {
    for (;;) {
        Executor e; int c;
        if ((c = ctl) == DISABLED || (e = executor) == null) {
            stat = -1;
            break;
        }
        else if ((c & ACTIVE) != 0) { // ensure keep-alive
            if ((c & CONSUME) != 0 ||
                U.compareAndSwapInt(this, CTL, c,
                                    c | CONSUME))
                break;
        }
        else if (demand == 0L || tail == head)
            break;                    // nothing a task could consume
        else if (U.compareAndSwapInt(this, CTL, c,
                                     c | (ACTIVE | CONSUME))) {
            try {
                e.execute(new ConsumerTask<T>(this));
                break;
            } catch (RuntimeException | Error ex) { // back out
                // retry clearing ACTIVE until it succeeds, or until
                // DISABLED is set or ACTIVE is already clear
                do {} while (((c = ctl) & DISABLED) == 0 &&
                             (c & ACTIVE) != 0 &&
                             !U.compareAndSwapInt(this, CTL, c,
                                                  c & ~ACTIVE));
                throw ex;
            }
        }
    }
    return stat;
}
1247    
/**
 * Clears the waiter field and unparks the given thread.
 *
 * @param w the thread to unpark
 */
private void signalWaiter(Thread w) {
    waiter = null;
    LockSupport.unpark(w);            // release producer
}
1252    
/**
 * Nulls out most fields, mainly to avoid garbage retention
 * until publisher unsubscribes, but also to help cleanly stop
 * upon error by nulling required components. Also releases any
 * recorded waiter thread.
 */
private void detach() {
    Thread w = waiter;                // read before signalWaiter clears it
    executor = null;
    subscriber = null;
    pendingError = null;
    signalWaiter(w);
}
1265    
/**
 * Issues error signal, asynchronously if a task is running,
 * else synchronously.
 *
 * Does nothing if ERROR or DISABLED is already set. If a consumer
 * task is ACTIVE, records the exception in pendingError and sets
 * ERROR. Otherwise moves directly to DISABLED, invokes the
 * subscriber's onError in the calling thread (exceptions thrown
 * by it are ignored), and detaches.
 *
 * @param ex the exception to report; if null, no onError call is
 * made on the synchronous path
 */
final void onError(Throwable ex) {
    for (int c;;) {
        if (((c = ctl) & (ERROR | DISABLED)) != 0)
            break;
        else if ((c & ACTIVE) != 0) {
            pendingError = ex;        // set before ERROR is visible
            if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
                break; // cause consumer task to exit
        }
        else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
            Flow.Subscriber<? super T> s = subscriber;
            if (s != null && ex != null) {
                try {
                    s.onError(ex);
                } catch (Throwable ignore) {
                }
            }
            detach();
            break;
        }
    }
}
1292    
/**
 * Tries to start consumer task upon a signal or request;
 * disables on failure.
 *
 * Callers in this class set ACTIVE in ctl before calling. If
 * execute throws, ACTIVE is cleared and the failure is routed
 * through onError.
 */
private void startOrDisable() {
    Executor e;
    if ((e = executor) != null) { // skip if already disabled
        try {
            e.execute(new ConsumerTask<T>(this));
        } catch (Throwable ex) {  // back out and force signal
            for (int c;;) {
                if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
                    break;
                // with ACTIVE cleared, onError can take its
                // synchronous path
                if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
                    onError(ex);
                    break;
                }
            }
        }
    }
}
1314    
1315 dl 1.19 final void onComplete() {
1316 dl 1.1 for (int c;;) {
1317 dl 1.20 if ((c = ctl) == DISABLED)
1318 dl 1.1 break;
1319 dl 1.27 if (U.compareAndSwapInt(this, CTL, c,
1320 dl 1.29 c | (ACTIVE | CONSUME | COMPLETE))) {
1321 dl 1.16 if ((c & ACTIVE) == 0)
1322     startOrDisable();
1323 dl 1.1 break;
1324     }
1325     }
1326     }
1327    
1328 dl 1.19 final void onSubscribe() {
1329 dl 1.1 for (int c;;) {
1330 dl 1.20 if ((c = ctl) == DISABLED)
1331 dl 1.1 break;
1332 dl 1.27 if (U.compareAndSwapInt(this, CTL, c,
1333 dl 1.29 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1334 dl 1.19 if ((c & ACTIVE) == 0)
1335     startOrDisable();
1336 dl 1.1 break;
1337     }
1338     }
1339     }
1340    
1341 dl 1.16 /**
1342     * Causes consumer task to exit if active (without reporting
1343     * onError unless there is already a pending error), and
1344     * disables.
1345     */
1346     public void cancel() {
1347 dl 1.1 for (int c;;) {
1348 dl 1.20 if ((c = ctl) == DISABLED)
1349 dl 1.1 break;
1350 dl 1.16 else if ((c & ACTIVE) != 0) {
1351 dl 1.29 if (U.compareAndSwapInt(this, CTL, c,
1352     c | (CONSUME | ERROR)))
1353 dl 1.16 break;
1354     }
1355     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1356     detach();
1357 dl 1.1 break;
1358     }
1359     }
1360     }
1361    
/**
 * Adds to demand and possibly starts task.
 *
 * A positive n is added to demand, saturating at Long.MAX_VALUE.
 * A negative n is reported through onError with an
 * IllegalArgumentException. Zero is ignored.
 *
 * @param n the number of additional items requested
 */
public void request(long n) {
    if (n > 0L) {
        for (;;) {
            long prev = demand, d;
            if ((d = prev + n) < prev) // saturate
                d = Long.MAX_VALUE;
            if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
                // demand updated; now ensure a task will see it
                for (int c, h;;) {
                    if ((c = ctl) == DISABLED)
                        break;
                    else if ((c & ACTIVE) != 0) {
                        // running task: set keep-alive to force recheck
                        if ((c & CONSUME) != 0 ||
                            U.compareAndSwapInt(this, CTL, c,
                                                c | CONSUME))
                            break;
                    }
                    else if ((h = head) != tail) {
                        // items buffered but no task: start one
                        if (U.compareAndSwapInt(this, CTL, c,
                                                c | (ACTIVE|CONSUME))) {
                            startOrDisable();
                            break;
                        }
                    }
                    else if (head == h && tail == h)
                        break;          // else stale
                    if (demand == 0L)
                        break;
                }
                break;
            }
        }
    }
    else if (n < 0L)
        onError(new IllegalArgumentException(
                    "negative subscription request"));
}
1401    
1402 dl 1.20 public final boolean isReleasable() { // for ManagedBlocker
1403 dl 1.1 T item = putItem;
1404 dl 1.34 if (item != null) {
1405     if ((putStat = offer(item)) == 0)
1406     return false;
1407     putItem = null;
1408 dl 1.1 }
1409     return true;
1410     }
1411    
/**
 * ManagedBlocker hook: repeatedly retries the pending offer,
 * parking between attempts, until the offer result is nonzero,
 * a timed wait expires, or a timed wait is interrupted. An
 * interrupt during an untimed wait is recorded in timeout but
 * does not end the wait.
 *
 * @return true always
 */
public final boolean block() { // for ManagedBlocker
    T item = putItem;
    if (item != null) {
        putItem = null;
        long nanos = timeout;
        long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
        while ((putStat = offer(item)) == 0) {
            if (Thread.interrupted()) {
                timeout = INTERRUPTED;
                if (nanos > 0L)
                    break;
            }
            else if (nanos > 0L &&
                     (nanos = deadline - System.nanoTime()) <= 0L)
                break;
            else if (waiter == null)
                // publish waiter, then loop to re-offer before parking
                waiter = Thread.currentThread();
            else {
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    LockSupport.park(this);
                waiter = null;
            }
        }
    }
    waiter = null;
    return true;
}
1441    
/**
 * Consumer loop, called from ConsumerTask, or indirectly
 * when helping during submit.
 *
 * Each iteration handles one of, in priority order: pending
 * control bits (checkControl), an apparently empty buffer
 * (checkEmpty), zero demand (checkDemand), or delivery of one
 * item to the subscriber. The loop ends when one of the check
 * methods returns false.
 */
final void consume() {
    Flow.Subscriber<? super T> s;
    int h = head;
    if ((s = subscriber) != null) {           // else disabled
        for (;;) {
            long d = demand;
            int c; Object[] a; int n; long i; Object x; Thread w;
            if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
                if (!checkControl(s, c))
                    break;
            }
            // i is the raw offset of slot h; x is its current element
            else if ((a = array) == null || h == tail ||
                     (n = a.length) == 0 ||
                     (x = U.getObjectVolatile
                      (a, (i = ((long)((n - 1) & h) << ASHIFT) + ABASE)))
                     == null) {
                if (!checkEmpty(s, c))
                    break;
            }
            else if (d == 0L) {
                if (!checkDemand(c))
                    break;
            }
            // ensure keep-alive bit, then claim the slot by CAS to null
            else if (((c & CONSUME) != 0 ||
                      U.compareAndSwapInt(this, CTL, c, c | CONSUME)) &&
                     U.compareAndSwapObject(a, i, x, null)) {
                U.putOrderedInt(this, HEAD, ++h);
                U.getAndAddLong(this, DEMAND, -1L);
                if ((w = waiter) != null)
                    signalWaiter(w);          // space freed for producer
                try {
                    @SuppressWarnings("unchecked") T y = (T) x;
                    s.onNext(y);
                } catch (Throwable ex) {
                    handleOnNext(s, ex);
                }
            }
        }
    }
}
1486    
/**
 * Responds to control events in consume().
 *
 * On ERROR: sets ctl to DISABLED and, if an error is pending,
 * reports it via the subscriber's onError (exceptions thrown by
 * it are ignored). On SUBSCRIBE: clears the bit and invokes
 * onSubscribe, routing any exception to onError. Otherwise
 * (DISABLED already set): detaches.
 *
 * @param s the subscriber
 * @param c the ctl value read by the caller
 * @return false if the consumer loop should exit, else true
 */
private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
    boolean stat = true;
    if ((c & ERROR) != 0) {
        Throwable ex = pendingError;
        ctl = DISABLED;           // no need for CAS
        if (ex != null) {         // null if errorless cancel
            try {
                if (s != null)
                    s.onError(ex);
            } catch (Throwable ignore) {
            }
        }
        // stat stays true: the next loop pass sees DISABLED and
        // reaches the detach branch below
    }
    else if ((c & SUBSCRIBE) != 0) {
        if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
            try {
                if (s != null)
                    s.onSubscribe(this);
            } catch (Throwable ex) {
                onError(ex);
            }
        }
    }
    else {
        detach();
        stat = false;
    }
    return stat;
}
1519    
1520     /**
1521 jsr166 1.42 * Responds to apparent emptiness in consume().
1522 dl 1.34 */
1523     private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1524     boolean stat = true;
1525     if (head == tail) {
1526     if ((c & CONSUME) != 0)
1527     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1528     else if ((c & COMPLETE) != 0) {
1529     if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1530 dl 1.1 try {
1531 dl 1.34 if (s != null)
1532     s.onComplete();
1533     } catch (Throwable ignore) {
1534 dl 1.1 }
1535     }
1536     }
1537 dl 1.34 else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1538     stat = false;
1539     }
1540     return stat;
1541     }
1542    
1543     /**
1544 jsr166 1.42 * Responds to apparent zero demand in consume().
1545 dl 1.34 */
1546     private boolean checkDemand(int c) {
1547     boolean stat = true;
1548     if (demand == 0L) {
1549     if ((c & CONSUME) != 0)
1550     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1551     else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1552     stat = false;
1553     }
1554     return stat;
1555     }
1556    
1557     /**
1558 jsr166 1.42 * Processes exception in Subscriber.onNext.
1559 dl 1.34 */
1560 dl 1.36 private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1561     BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1562     if ((h = onNextHandler) != null) {
1563     try {
1564     h.accept(s, ex);
1565     } catch (Throwable ignore) {
1566     }
1567 dl 1.1 }
1568 dl 1.36 onError(ex);
1569 dl 1.1 }
1570    
// Unsafe mechanics
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
// raw field offsets used with U for CAS and ordered access
private static final long CTL;
private static final long TAIL;
private static final long HEAD;
private static final long DEMAND;
// base offset and log2 of element scale for Object[] slot addressing
private static final int ABASE;
private static final int ASHIFT;

static {
    try {
        CTL = U.objectFieldOffset
            (BufferedSubscription.class.getDeclaredField("ctl"));
        TAIL = U.objectFieldOffset
            (BufferedSubscription.class.getDeclaredField("tail"));
        HEAD = U.objectFieldOffset
            (BufferedSubscription.class.getDeclaredField("head"));
        DEMAND = U.objectFieldOffset
            (BufferedSubscription.class.getDeclaredField("demand"));

        ABASE = U.arrayBaseOffset(Object[].class);
        int scale = U.arrayIndexScale(Object[].class);
        // slot offsets are computed by shifting, so the scale must
        // be a power of two
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}
1604     }
1605     }