ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.29
Committed: Sun Jan 25 15:19:40 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.28: +166 -186 lines
Log Message:
Minor improvements

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 jsr166 1.10 import java.util.ArrayList;
10     import java.util.List;
11 dl 1.1 import java.util.concurrent.locks.LockSupport;
12     import java.util.function.BiPredicate;
13     import java.util.function.Function;
14 dl 1.15 import java.util.function.Supplier;
15 dl 1.1
16     /**
17 dl 1.21 * A {@link Flow.Publisher} that asynchronously issues submitted
18     * (non-null) items to current subscribers until it is closed. Each
19     * current subscriber receives newly submitted items in the same order
20     * unless drops or exceptions are encountered. Using a
21     * SubmissionPublisher allows item generators to act as Publishers
22     * relying on drop handling and/or blocking for flow control.
23 dl 1.1 *
24 dl 1.16 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
25 dl 1.1 * constructor for delivery to subscribers. The best choice of
26     * Executor depends on expected usage. If the generator(s) of
27     * submitted items run in separate threads, and the number of
28     * subscribers can be estimated, consider using a {@link
29 dl 1.25 * Executors#newFixedThreadPool}. Otherwise consider using
30     * the default {@link ForkJoinPool#commonPool}.
31 dl 1.1 *
32     * <p>Buffering allows producers and consumers to transiently operate
33     * at different rates. Each subscriber uses an independent buffer.
34 dl 1.21 * Buffers are created upon first use and expanded as needed up to the
35 dl 1.27 * given maximum. (The enforced capacity may be rounded up to the
36 dl 1.21 * nearest power of two and/or bounded by the largest value supported
37     * by this implementation.) Invocations of {@link
38 dl 1.16 * Flow.Subscription#request} do not directly result in buffer
39 dl 1.24 * expansion, but risk saturation if unfilled requests exceed the
40 dl 1.25 * maximum capacity. The default value of {@link
41     * Flow#defaultBufferSize} may provide a useful starting point for
42     * choosing a capacity based on expected rates, resources, and usages.
43 dl 1.18 *
44 dl 1.1 * <p>Publication methods support different policies about what to do
45     * when buffers are saturated. Method {@link #submit} blocks until
46 jsr166 1.26 * resources are available. This is simplest, but least responsive.
47     * The {@code offer} methods may drop items (either immediately or
48     * with bounded timeout), but provide an opportunity to interpose a
49     * handler and then retry.
50 dl 1.1 *
51     * <p>If any Subscriber method throws an exception, its subscription
52 dl 1.27 * is cancelled. If the supplied Executor throws {@link
53     * RejectedExecutionException} (or any other RuntimeException or
54     * Error) when attempting to execute a task, or a drop handler throws
55     * an exception when processing a dropped item, then the exception is
56 dl 1.1 * rethrown. In these cases, some but not all subscribers may have
57 dl 1.27 * received the published item. It is usually good practice to {@link
58 jsr166 1.11 * #closeExceptionally closeExceptionally} in these cases.
59 dl 1.1 *
60 dl 1.15 * <p>This class may also serve as a convenient base for subclasses
61     * that generate items, and use the methods in this class to publish
62     * them. For example here is a class that periodically publishes the
63     * items generated from a supplier. (In practice you might add methods
64     * to independently start and stop generation, to share schedulers
65 dl 1.16 * among publishers, and so on, or instead use a SubmissionPublisher
66     * as a component rather than a superclass.)
67 dl 1.15 *
68     * <pre> {@code
69     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
70     * final ScheduledFuture<?> periodicTask;
71     * final ScheduledExecutorService scheduler;
72 dl 1.21 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
73     * Supplier<? extends T> supplier,
74 dl 1.15 * long period, TimeUnit unit) {
75 dl 1.21 * super(executor, maxBufferCapacity);
76 dl 1.15 * scheduler = new ScheduledThreadPoolExecutor(1);
77     * periodicTask = scheduler.scheduleAtFixedRate(
78     * () -> submit(supplier.get()), 0, period, unit);
79     * }
80     * public void close() {
81     * periodicTask.cancel(false);
82     * scheduler.shutdown();
83     * super.close();
84     * }
85     * }}</pre>
86     *
87 dl 1.21 * <p>Here is an example of a {@link Flow.Processor} implementation.
88     * It uses single-step requests to its publisher for simplicity of
89     * illustration. A more adaptive version could monitor flow using the
90 dl 1.27 * lag estimate returned from {@code submit}, along with other utility
91 dl 1.21 * methods.
92 dl 1.16 *
93     * <pre> {@code
94     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
95     * implements Flow.Processor<S,T> {
96     * final Function<? super S, ? extends T> function;
97     * Flow.Subscription subscription;
98 dl 1.21 * TransformProcessor(Executor executor, int maxBufferCapacity,
99 dl 1.16 * Function<? super S, ? extends T> function) {
100 dl 1.21 * super(executor, maxBufferCapacity);
101 dl 1.16 * this.function = function;
102     * }
103     * public void onSubscribe(Flow.Subscription subscription) {
104     * (this.subscription = subscription).request(1);
105     * }
106     * public void onNext(S item) {
107     * subscription.request(1);
108     * submit(function.apply(item));
109     * }
110     * public void onError(Throwable ex) { closeExceptionally(ex); }
111     * public void onComplete() { close(); }
112     * }}</pre>
113     *
114 dl 1.1 * @param <T> the published item type
115     * @author Doug Lea
116     * @since 1.9
117     */
118     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
119     AutoCloseable {
120     /*
121     * Most mechanics are handled by BufferedSubscription. This class
122 dl 1.17 * mainly tracks subscribers and ensures sequentiality, by using
123     * built-in synchronization locks across public methods. (Using
124     * built-in locks works well in the most typical case in which
125     * only one thread submits items).
126 dl 1.1 */
127    
128 dl 1.25 /** The largest possible power of two array size. */
129     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
130 dl 1.1
131 dl 1.25 /** Round capacity to power of 2, at least 2, at most limit. */
132 dl 1.1 static final int roundCapacity(int cap) { // to nearest power of 2
133     int n = cap - 1;
134     n |= n >>> 1;
135     n |= n >>> 2;
136     n |= n >>> 4;
137     n |= n >>> 8;
138     n |= n >>> 16;
139 dl 1.21 return (n <= 0) ? 2 : // at least 2
140 dl 1.25 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
141 dl 1.1 }
142    
143     /**
144     * Clients (BufferedSubscriptions) are maintained in a linked list
145     * (via their "next" fields). This works well for publish loops.
146     * It requires O(n) traversal to check for duplicate subscribers,
147     * but we expect that subscribing is much less common than
148 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
149 dl 1.25 * when BufferedSubscription methods return negative values
150 dl 1.29 * signifying that they have been disabled. To reduce
151     * head-of-line blocking, submit and offer methods first call
152     * BufferedSubscription.offer on each subscriber, and place
153     * saturated ones in retries list (using nextRetry field), and
154     * retry, possibly blocking or dropping.
155 dl 1.1 */
156     BufferedSubscription<T> clients;
157    
158 dl 1.20 /** Run status, updated only within locks */
159     volatile boolean closed;
160    
161 dl 1.1 // Parameters for constructing BufferedSubscriptions
162     final Executor executor;
163     final int maxBufferCapacity;
164    
165 dl 1.20 /**
166 dl 1.1 * Creates a new SubmissionPublisher using the given Executor for
167 dl 1.21 * async delivery to subscribers, and with the given maximum
168 dl 1.25 * buffer size for each subscriber.
169 dl 1.1 *
170     * @param executor the executor to use for async delivery,
171     * supporting creation of at least one independent thread
172     * @param maxBufferCapacity the maximum capacity for each
173 dl 1.21 * subscriber's buffer (the enforced capacity may be rounded up to
174     * the nearest power of two and/or bounded by the largest value
175     * supported by this implementation; method {@link #getMaxBufferCapacity}
176     * returns the actual value)
177 dl 1.1 * @throws NullPointerException if executor is null
178 dl 1.21 * @throws IllegalArgumentException if maxBufferCapacity not
179     * positive
180 dl 1.1 */
181 dl 1.21 public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
182     if (executor == null)
183     throw new NullPointerException();
184     if (maxBufferCapacity <= 0)
185     throw new IllegalArgumentException("capacity must be positive");
186 dl 1.18 this.executor = executor;
187 dl 1.21 this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
188 dl 1.18 }
189    
190 dl 1.1 /**
191 dl 1.25 * Creates a new SubmissionPublisher using the {@link
192     * ForkJoinPool#commonPool()} for async delivery to subscribers,
193 dl 1.27 * and with maximum buffer capacity of {@link
194     * Flow#defaultBufferSize}.
195 dl 1.25 */
196     public SubmissionPublisher() {
197     this(ForkJoinPool.commonPool(), Flow.defaultBufferSize());
198     }
199    
200     /**
201 dl 1.18 * Adds the given Subscriber unless already subscribed. If
202     * already subscribed, the Subscriber's onError method is invoked
203 dl 1.29 * on the existing subscription with an IllegalStateException.
204     * Otherwise, upon success, the Subscriber's onSubscribe method is
205     * invoked asynchronously with a new Subscription. If onSubscribe
206     * throws an exception, the subscription is cancelled. Otherwise,
207     * if this SubmissionPublisher is closed, the subscriber's
208     * onComplete method is then invoked. Subscribers may enable
209     * receiving items by invoking the {@code request} method of the
210     * new Subscription, and may unsubscribe by invoking its cancel
211     * method.
212 dl 1.18 *
213     * @param subscriber the subscriber
214     * @throws NullPointerException if subscriber is null
215     */
216     public void subscribe(Flow.Subscriber<? super T> subscriber) {
217     if (subscriber == null) throw new NullPointerException();
218 dl 1.20 BufferedSubscription<T> subscription =
219 dl 1.21 new BufferedSubscription<T>(subscriber, executor, maxBufferCapacity);
220 dl 1.19 synchronized (this) {
221 dl 1.29 for (BufferedSubscription<T> b = clients, pred = null;;) {
222     if (b == null) {
223     subscription.onSubscribe();
224     if (closed)
225     subscription.onComplete();
226     else if (pred == null)
227     clients = subscription;
228 dl 1.27 else
229 dl 1.29 pred.next = subscription;
230     break;
231 dl 1.27 }
232 dl 1.29 BufferedSubscription<T> next = b.next;
233     if (b.isDisabled()) { // remove
234     b.next = null; // detach
235 dl 1.19 if (pred == null)
236 dl 1.29 clients = next;
237 dl 1.19 else
238 dl 1.29 pred.next = next;
239     }
240     else if (subscriber.equals(b.subscriber)) {
241     b.onError(new IllegalStateException("Duplicate subscribe"));
242     break;
243 dl 1.19 }
244 dl 1.29 else
245     pred = b;
246     b = next;
247 dl 1.19 }
248     }
249 dl 1.18 }
250    
251     /**
252 dl 1.16 * Publishes the given item to each current subscriber by
253     * asynchronously invoking its onNext method, blocking
254     * uninterruptibly while resources for any subscriber are
255 dl 1.21 * unavailable. This method returns an estimate of the maximum lag
256     * (number of items submitted but not yet consumed) among all
257     * current subscribers. This value is at least one (accounting for
258     * this submitted item) if there are any subscribers, else zero.
259 dl 1.16 *
260     * <p>If the Executor for this publisher throws a
261     * RejectedExecutionException (or any other RuntimeException or
262     * Error) when attempting to asynchronously notify subscribers,
263     * then this exception is rethrown.
264     *
265     * @param item the (non-null) item to publish
266 dl 1.21 * @return the estimated maximum lag among subscribers
267 dl 1.16 * @throws IllegalStateException if closed
268     * @throws NullPointerException if item is null
269     * @throws RejectedExecutionException if thrown by Executor
270     */
271 dl 1.21 public int submit(T item) {
272 dl 1.16 if (item == null) throw new NullPointerException();
273 dl 1.21 int lag = 0;
274 dl 1.27 boolean complete;
275 dl 1.16 synchronized (this) {
276 dl 1.27 complete = closed;
277 dl 1.29 BufferedSubscription<T> b = clients;
278 dl 1.27 if (!complete) {
279 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
280 dl 1.27 while (b != null) {
281     BufferedSubscription<T> next = b.next;
282 dl 1.29 int stat = b.offer(item);
283     if (stat < 0) { // disabled
284     b.next = null;
285 dl 1.27 if (pred == null)
286     clients = next;
287 dl 1.24 else
288 dl 1.27 pred.next = next;
289 dl 1.24 }
290 dl 1.27 else {
291 dl 1.29 if (stat > lag)
292     lag = stat;
293     else if (stat == 0) { // place on retry list
294     b.nextRetry = null;
295 dl 1.27 if (rtail == null)
296 dl 1.29 r = b;
297 dl 1.27 else
298     rtail.nextRetry = b;
299     rtail = b;
300     }
301     pred = b;
302     }
303     b = next;
304 dl 1.21 }
305 dl 1.29 while (r != null) {
306     BufferedSubscription<T> nextRetry = r.nextRetry;
307     r.nextRetry = null;
308     int stat = r.submit(item);
309     if (stat > lag)
310     lag = stat;
311     else if (stat < 0 && clients == r)
312     clients = r.next; // postpone internal unsubscribes
313     r = nextRetry;
314     }
315 dl 1.16 }
316     }
317 dl 1.27 if (complete)
318     throw new IllegalStateException("Closed");
319     else
320     return lag;
321 dl 1.16 }
322    
323     /**
324 dl 1.1 * Publishes the given item, if possible, to each current
325     * subscriber by asynchronously invoking its onNext method. The
326     * item may be dropped by one or more subscribers if resource
327     * limits are exceeded, in which case the given handler (if
328     * non-null) is invoked, and if it returns true, retried once.
329 dl 1.16 * Other calls to methods in this class by other threads are
330     * blocked while the handler is invoked. Unless recovery is
331     * assured, options are usually limited to logging the error
332     * and/or issuing an onError signal to the subscriber.
333 dl 1.1 *
334 dl 1.21 * <p>This method returns a status indicator: If negative, it
335     * represents the (negative) number of drops (failed attempts to
336     * issue the item to a subscriber). Otherwise it is an estimate of
337     * the maximum lag (number of items submitted but not yet
338     * consumed) among all current subscribers. This value is at least
339     * one (accounting for this submitted item) if there are any
340     * subscribers, else zero.
341 jsr166 1.23 *
342 dl 1.1 * <p>If the Executor for this publisher throws a
343     * RejectedExecutionException (or any other RuntimeException or
344     * Error) when attempting to asynchronously notify subscribers, or
345     * the drop handler throws an exception when processing a dropped
346     * item, then this exception is rethrown.
347     *
348     * @param item the (non-null) item to publish
349     * @param onDrop if non-null, the handler invoked upon a drop to a
350     * subscriber, with arguments of the subscriber and item; if it
351     * returns true, an offer is re-attempted (once)
352 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
353     * an estimate of maximum lag
354 dl 1.1 * @throws IllegalStateException if closed
355     * @throws NullPointerException if item is null
356     * @throws RejectedExecutionException if thrown by Executor
357     */
358     public int offer(T item,
359     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
360 dl 1.29 return doOffer(0L, item, onDrop);
361 dl 1.1 }
362    
363     /**
364     * Publishes the given item, if possible, to each current
365     * subscriber by asynchronously invoking its onNext method,
366     * blocking while resources for any subscription are unavailable,
367     * up to the specified timeout or the caller thread is
368 dl 1.16 * interrupted, at which point the given handler (if non-null) is
369     * invoked, and if it returns true, retried once. (The drop
370     * handler may distinguish timeouts from interrupts by checking
371     * whether the current thread is interrupted.) Other calls to
372     * methods in this class by other threads are blocked while the
373     * handler is invoked. Unless recovery is assured, options are
374     * usually limited to logging the error and/or issuing an onError
375     * signal to the subscriber.
376 dl 1.1 *
377 dl 1.21 * <p>This method returns a status indicator: If negative, it
378     * represents the (negative) number of drops (failed attempts to
379     * issue the item to a subscriber). Otherwise it is an estimate of
380     * the maximum lag (number of items submitted but not yet
381     * consumed) among all current subscribers. This value is at least
382     * one (accounting for this submitted item) if there are any
383     * subscribers, else zero.
384     *
385 dl 1.1 * <p>If the Executor for this publisher throws a
386     * RejectedExecutionException (or any other RuntimeException or
387     * Error) when attempting to asynchronously notify subscribers, or
388     * the drop handler throws an exception when processing a dropped
389     * item, then this exception is rethrown.
390     *
391     * @param item the (non-null) item to publish
392     * @param timeout how long to wait for resources for any subscriber
393     * before giving up, in units of {@code unit}
394     * @param unit a {@code TimeUnit} determining how to interpret the
395     * {@code timeout} parameter
396     * @param onDrop if non-null, the handler invoked upon a drop to a
397     * subscriber, with arguments of the subscriber and item; if it
398     * returns true, an offer is re-attempted (once)
399 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
400     * an estimate of maximum lag
401 dl 1.1 * @throws IllegalStateException if closed
402     * @throws NullPointerException if item is null
403     * @throws RejectedExecutionException if thrown by Executor
404     */
405     public int offer(T item, long timeout, TimeUnit unit,
406     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
407 dl 1.29 return doOffer(unit.toNanos(timeout), item, onDrop);
408     }
409    
410     /** Common implementation for both forms of offer */
411     final int doOffer(long nanos, T item,
412     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
413 dl 1.1 if (item == null) throw new NullPointerException();
414 dl 1.27 int lag = 0, drops = 0;
415     boolean complete;
416 jsr166 1.2 synchronized (this) {
417 dl 1.27 complete = closed;
418 dl 1.29 BufferedSubscription<T> b = clients;
419 dl 1.27 if (!complete) {
420 dl 1.29 BufferedSubscription<T> pred = null, r = null, rtail = null;
421 dl 1.27 while (b != null) {
422     BufferedSubscription<T> next = b.next;
423 dl 1.29 int stat = b.offer(item);
424     if (stat < 0) {
425     b.next = null;
426 dl 1.27 if (pred == null)
427     clients = next;
428     else
429     pred.next = next;
430     }
431     else {
432 dl 1.29 if (stat > lag)
433     lag = stat;
434     else if (stat == 0) {
435     b.nextRetry = null;
436 dl 1.27 if (rtail == null)
437 dl 1.29 r = b;
438 dl 1.27 else
439     rtail.nextRetry = b;
440     rtail = b;
441     }
442 dl 1.29 else if (stat > lag)
443 dl 1.27 lag = stat;
444     pred = b;
445     }
446     b = next;
447 dl 1.1 }
448 dl 1.29 while (r != null) {
449     BufferedSubscription<T> nextRetry = r.nextRetry;
450     r.nextRetry = null;
451     int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
452     r.offer(item);
453     if (stat == 0 && onDrop != null &&
454     onDrop.test(r.subscriber, item))
455     stat = r.offer(item);
456     if (stat == 0)
457     ++drops;
458     else if (stat > lag)
459     lag = stat;
460     else if (stat < 0 && clients == r)
461     clients = r.next;
462     r = nextRetry;
463     }
464 dl 1.1 }
465     }
466 dl 1.27 if (complete)
467     throw new IllegalStateException("Closed");
468     else
469     return (drops > 0) ? -drops : lag;
470 dl 1.1 }
471    
472     /**
473     * Unless already closed, issues onComplete signals to current
474     * subscribers, and disallows subsequent attempts to publish.
475     */
476     public void close() {
477     if (!closed) {
478     BufferedSubscription<T> b, next;
479 jsr166 1.2 synchronized (this) {
480 dl 1.1 b = clients;
481     clients = null;
482     closed = true;
483     }
484     while (b != null) {
485     next = b.next;
486 dl 1.19 b.onComplete();
487 dl 1.1 b = next;
488     }
489     }
490     }
491    
492     /**
493     * Unless already closed, issues onError signals to current
494     * subscribers with the given error, and disallows subsequent
495     * attempts to publish.
496     *
497 jsr166 1.5 * @param error the onError argument sent to subscribers
498 dl 1.1 * @throws NullPointerException if error is null
499     */
500     public void closeExceptionally(Throwable error) {
501     if (error == null)
502     throw new NullPointerException();
503     if (!closed) {
504     BufferedSubscription<T> b, next;
505 jsr166 1.2 synchronized (this) {
506 dl 1.1 b = clients;
507     clients = null;
508     closed = true;
509     }
510     while (b != null) {
511     next = b.next;
512 dl 1.19 b.onError(error);
513 dl 1.1 b = next;
514     }
515     }
516     }
517    
518     /**
519     * Returns true if this publisher is not accepting submissions.
520     *
521     * @return true if closed
522     */
523     public boolean isClosed() {
524     return closed;
525     }
526    
527     /**
528 jsr166 1.6 * Returns true if this publisher has any subscribers.
529 dl 1.1 *
530     * @return true if this publisher has any subscribers
531     */
532     public boolean hasSubscribers() {
533     boolean nonEmpty = false;
534     if (!closed) {
535 jsr166 1.2 synchronized (this) {
536 dl 1.1 BufferedSubscription<T> pred = null, next;
537     for (BufferedSubscription<T> b = clients; b != null; b = next) {
538     next = b.next;
539 dl 1.19 if (b.isDisabled()) {
540 dl 1.29 b.next = null;
541 dl 1.1 if (pred == null)
542     clients = next;
543     else
544     pred.next = next;
545 jsr166 1.2 }
546 dl 1.1 else {
547     nonEmpty = true;
548     break;
549     }
550     }
551     }
552     }
553     return nonEmpty;
554     }
555    
556     /**
557 dl 1.21 * Returns the number of current subscribers.
558 dl 1.1 *
559 dl 1.21 * @return the number of current subscribers
560 dl 1.1 */
561 dl 1.21 public int getNumberOfSubscribers() {
562     int count = 0;
563     if (!closed) {
564     synchronized (this) {
565     BufferedSubscription<T> pred = null, next;
566     for (BufferedSubscription<T> b = clients; b != null; b = next) {
567     next = b.next;
568     if (b.isDisabled()) {
569 dl 1.29 b.next = null;
570 dl 1.21 if (pred == null)
571     clients = next;
572     else
573     pred.next = next;
574     }
575     else {
576     pred = b;
577     ++count;
578     }
579     }
580     }
581     }
582     return count;
583 dl 1.1 }
584    
585 jsr166 1.7 /**
586 dl 1.21 * Returns the Executor used for asynchronous delivery.
587 dl 1.1 *
588 dl 1.21 * @return the Executor used for asynchronous delivery
589 dl 1.1 */
590 dl 1.21 public Executor getExecutor() {
591     return executor;
592 dl 1.1 }
593    
594 jsr166 1.7 /**
595 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
596 dl 1.1 *
597 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
598 dl 1.1 */
599     public int getMaxBufferCapacity() {
600     return maxBufferCapacity;
601     }
602    
603     /**
604 dl 1.29 * Returns a list of current subscribers for monitoring and
605     * tracking purposes, not for invoking {@link Flow.Subscriber}
606     * methods on the subscribers.
607 dl 1.1 *
608     * @return list of current subscribers
609     */
610     public List<Flow.Subscriber<? super T>> getSubscribers() {
611     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
612 jsr166 1.2 synchronized (this) {
613 dl 1.1 BufferedSubscription<T> pred = null, next;
614     for (BufferedSubscription<T> b = clients; b != null; b = next) {
615     next = b.next;
616 dl 1.19 if (b.isDisabled()) {
617 dl 1.29 b.next = null;
618 dl 1.1 if (pred == null)
619     clients = next;
620     else
621     pred.next = next;
622 jsr166 1.2 }
623 dl 1.1 else
624     subs.add(b.subscriber);
625     }
626     }
627     return subs;
628     }
629 jsr166 1.2
630 dl 1.1 /**
631     * Returns true if the given Subscriber is currently subscribed.
632     *
633     * @param subscriber the subscriber
634     * @return true if currently subscribed
635 dl 1.21 * @throws NullPointerException if subscriber is null
636 dl 1.1 */
637     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
638 dl 1.21 if (subscriber == null) throw new NullPointerException();
639 jsr166 1.2 if (!closed) {
640     synchronized (this) {
641 dl 1.1 BufferedSubscription<T> pred = null, next;
642     for (BufferedSubscription<T> b = clients; b != null; b = next) {
643     next = b.next;
644 dl 1.19 if (b.isDisabled()) {
645 dl 1.29 b.next = null;
646 dl 1.1 if (pred == null)
647     clients = next;
648     else
649     pred.next = next;
650 jsr166 1.2 }
651 dl 1.21 else if (subscriber.equals(b.subscriber))
652 dl 1.1 return true;
653 dl 1.21 else
654     pred = b;
655 dl 1.1 }
656     }
657     }
658     return false;
659     }
660    
661     /**
662 dl 1.21 * Returns an estimate of the minimum number of items requested
663     * (via {@link Flow.Subscription#request}) but not yet produced,
664     * among all current subscribers.
665     *
666     * @return the estimate, or zero if no subscribers
667     */
668     public long estimateMinimumDemand() {
669     long min = Long.MAX_VALUE;
670     boolean nonEmpty = false;
671     synchronized (this) {
672     BufferedSubscription<T> pred = null, next;
673     for (BufferedSubscription<T> b = clients; b != null; b = next) {
674     int n; long d;
675     next = b.next;
676     if ((n = b.estimateLag()) < 0) {
677 dl 1.29 b.next = null;
678 dl 1.21 if (pred == null)
679     clients = next;
680     else
681     pred.next = next;
682     }
683     else {
684     if ((d = b.demand - n) < min)
685     min = d;
686     nonEmpty = true;
687     }
688     }
689     }
690 jsr166 1.22 return nonEmpty ? min : 0;
691 dl 1.21 }
692    
693     /**
694     * Returns an estimate of the maximum number of items produced but
695     * not yet consumed among all current subscribers.
696 dl 1.17 *
697     * @return the estimate
698 dl 1.1 */
699 dl 1.21 public int estimateMaximumLag() {
700     int max = 0;
701 dl 1.17 synchronized (this) {
702     BufferedSubscription<T> pred = null, next;
703     for (BufferedSubscription<T> b = clients; b != null; b = next) {
704 dl 1.21 int n;
705 dl 1.17 next = b.next;
706 dl 1.21 if ((n = b.estimateLag()) < 0) {
707 dl 1.29 b.next = null;
708 dl 1.17 if (pred == null)
709     clients = next;
710     else
711     pred.next = next;
712     }
713 dl 1.21 else if (n > max)
714     max = n;
715 dl 1.17 }
716     }
717 dl 1.21 return max;
718 dl 1.1 }
719    
720 dl 1.25 /**
721     * A task for consuming buffer items and signals, created and
722     * executed whenever they become available. A task consumes as
723     * many items/signals as possible before terminating, at which
724     * point another task is created when needed. The dual Runnable
725     * and ForkJoinTask declaration saves overhead when executed by
726     * ForkJoinPools, without impacting other kinds of Executors.
727     */
728     @SuppressWarnings("serial")
729     static final class ConsumerTask<T> extends ForkJoinTask<Void>
730     implements Runnable {
731     final BufferedSubscription<T> consumer;
732     ConsumerTask(BufferedSubscription<T> consumer) {
733     this.consumer = consumer;
734     }
735     public final Void getRawResult() { return null; }
736     public final void setRawResult(Void v) {}
737     public final boolean exec() { consumer.consume(); return false; }
738     public final void run() { consumer.consume(); }
739     }
740    
741 dl 1.1 /**
742 dl 1.15 * A bounded (ring) buffer with integrated control to start a
743     * consumer task whenever items are available. The buffer
744 dl 1.1 * algorithm is similar to one used inside ForkJoinPool,
745     * specialized for the case of at most one concurrent producer and
746     * consumer, and power of two buffer sizes. This allows methods to
747     * operate without locks even while supporting resizing, blocking,
748     * task-triggering, and garbage-free buffers (nulling out elements
749     * when consumed), although supporting these does impose a bit of
750     * overhead compared to plain fixed-size ring buffers.
751     *
752     * The publisher guarantees a single producer via its lock. We
753     * ensure in this class that there is at most one consumer. The
754     * request and cancel methods must be fully thread-safe but are
755     * coded to exploit the most common case in which they are only
756     * called by consumers (usually within onNext).
757     *
758 dl 1.15 * Execution control is managed using the ACTIVE ctl bit. We
759     * ensure that a task is active when consumable items (and
760 dl 1.19 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
761 dl 1.24 * there is demand (unfilled requests). This is complicated on
762 dl 1.19 * the creation side by the possibility of exceptions when trying
763     * to execute tasks. These eventually force DISABLED state, but
764 dl 1.1 * sometimes not directly. On the task side, termination (clearing
765 dl 1.27 * ACTIVE) that would otherwise race with producers or request()
766 dl 1.29 * calls uses the CONSUME keep-alive bit to force a recheck.
767 dl 1.1 *
768     * The ctl field also manages run state. When DISABLED, no further
769 dl 1.20 * updates are possible. Disabling may be preceded by setting
770 dl 1.1 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
771     * case the associated Subscriber methods are invoked, possibly
772     * synchronously if there is no active consumer task (including
773 dl 1.20 * cases where execute() failed). The cancel() method is supported
774     * by treating as ERROR but suppressing onError signal.
775 dl 1.1 *
776     * Support for blocking also exploits the fact that there is only
777     * one possible waiter. ManagedBlocker-compatible control fields
778     * are placed in this class itself rather than in wait-nodes.
779     * Blocking control relies on the "waiter" field. Producers set
780     * the field before trying to block, but must then recheck (via
781     * offer) before parking. Signalling then just unparks and clears
782 dl 1.20 * waiter field. If the producer and consumer are both in the same
783 dl 1.29 * ForkJoinPool, or consumers are running in commonPool, the
784     * producer attempts to help run consumer tasks that it forked
785     * before blocking. To avoid potential cycles, only one level of
786     * helping is currently supported.
787 dl 1.1 *
788     * This class uses @Contended and heuristic field declaration
789     * ordering to reduce memory contention on BufferedSubscription
790     * itself, but it does not currently attempt to avoid memory
791     * contention (especially including card-marks) among buffer
792     * elements, that can significantly slow down some usages.
793     * Addressing this may require allocating substantially more space
794     * than users expect.
795     */
796 dl 1.15 @SuppressWarnings("serial")
797 dl 1.1 @sun.misc.Contended
798 dl 1.25 static final class BufferedSubscription<T>
799     implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
800 dl 1.1 // Order-sensitive field declarations
801 dl 1.27 long timeout; // > 0 if timed wait
802     volatile long demand; // # unfilled requests
803     int maxCapacity; // reduced on OOME
804     int putStat; // offer result for ManagedBlocker
805 dl 1.29 int helpDepth; // nested helping depth (at most 1)
806 dl 1.27 volatile int ctl; // atomic run state flags
807     volatile int head; // next position to take
808     volatile int tail; // next position to put
809     Object[] array; // buffer: null if disabled
810 dl 1.1 Flow.Subscriber<? super T> subscriber; // null if disabled
811 dl 1.27 Executor executor; // null if disabled
812     volatile Throwable pendingError; // holds until onError issued
813     volatile Thread waiter; // blocked producer thread
814     T putItem; // for offer within ManagedBlocker
815     BufferedSubscription<T> next; // used only by publisher
816     BufferedSubscription<T> nextRetry; // used only by publisher
817 dl 1.1
818     // ctl values
819 dl 1.20 static final int ACTIVE = 0x01; // consumer task active
820 dl 1.29 static final int CONSUME = 0x02; // keep-alive for consumer task
821 dl 1.27 static final int DISABLED = 0x04; // final state
822     static final int ERROR = 0x08; // signal onError then disable
823     static final int SUBSCRIBE = 0x10; // signal onSubscribe
824     static final int COMPLETE = 0x20; // signal onComplete when done
825 dl 1.1
826 dl 1.20 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
827 dl 1.16
828 jsr166 1.22 /**
829 dl 1.21 * Initial/Minimum buffer capacity. Must be a power of two, at least 2.
830     */
831     static final int MINCAP = 8;
832    
833 dl 1.1 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
834 dl 1.21 Executor executor, int maxBufferCapacity) {
835 dl 1.1 this.subscriber = subscriber;
836     this.executor = executor;
837     this.maxCapacity = maxBufferCapacity;
838     }
839    
840 dl 1.19 final boolean isDisabled() {
841 dl 1.20 return ctl == DISABLED;
842     }
843    
844 dl 1.21 /**
845     * Returns estimated number of buffered items, or -1 if
846     * disabled
847     */
848     final int estimateLag() {
849 dl 1.20 int n;
850 jsr166 1.22 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
851 dl 1.19 }
852    
853 dl 1.1 /**
854 dl 1.16 * Tries to add item and start consumer task if necessary.
855 dl 1.21 * @return -1 if disabled, 0 if dropped, else estimated lag
856 dl 1.1 */
857     final int offer(T item) {
858     Object[] a = array;
859 dl 1.27 int t = tail, size = t + 1 - head, stat, cap, i;
860     if (a == null || (cap = a.length) < size || (i = t & (cap - 1)) < 0)
861     stat = growAndAdd(a, item);
862     else {
863 dl 1.16 a[i] = item;
864 dl 1.1 U.putOrderedInt(this, TAIL, t + 1);
865 dl 1.27 stat = size;
866 dl 1.1 }
867 dl 1.29 return (stat > 0 &&
868     (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
869 dl 1.27 startOnOffer(stat) : stat;
870     }
871    
872     /**
873 jsr166 1.28 * Tries to create or expand buffer, then adds item if possible.
874 dl 1.27 */
875     private int growAndAdd(Object[] a, T item) {
876     int cap, stat;
877     if ((ctl & (ERROR | DISABLED)) != 0) {
878     cap = 0;
879     stat = -1;
880     }
881     else if (a == null) {
882     cap = 0;
883     stat = 1;
884     }
885     else if ((cap = a.length) >= maxCapacity)
886     stat = 0; // cannot grow
887     else
888     stat = cap + 1;
889     if (stat > 0) {
890     int newCap = (cap > 0 ? cap << 1 :
891     maxCapacity >= MINCAP ? MINCAP :
892     maxCapacity >= 2 ? maxCapacity : 2);
893     if (newCap <= cap)
894     stat = 0;
895     else {
896     Object[] newArray = null;
897 dl 1.16 try {
898 dl 1.27 newArray = new Object[newCap];
899     } catch (Throwable ex) { // try to cope with OOME
900     }
901     if (newArray == null) {
902     if (cap > 0)
903     maxCapacity = cap; // avoid continuous failure
904     stat = 0;
905     }
906     else {
907     array = newArray;
908     int t = tail;
909     int newMask = newCap - 1;
910     if (a != null && cap > 0) {
911     int mask = cap - 1;
912     for (int j = head; j != t; ++j) {
913     Object x; // races with consumer
914     int k = j & mask;
915     if ((x = a[k]) != null &&
916     U.compareAndSwapObject(
917     a, (((long)k) << ASHIFT) + ABASE,
918     x, null))
919     newArray[j & newMask] = x;
920     }
921     }
922     newArray[t & newMask] = item;
923     tail = t + 1;
924 dl 1.16 }
925     }
926     }
927 dl 1.21 return stat;
928 dl 1.1 }
929    
930     /**
931 dl 1.27 * Spins/helps/blocks while offer returns 0. Called only if
932 dl 1.29 * initial offer return 0.
933 dl 1.20 */
934     final int submit(T item) {
935 dl 1.29 int stat = 0;
936 dl 1.27 Executor e = executor;
937 dl 1.29 if (helpDepth == 0 && (e instanceof ForkJoinPool)) {
938     Thread thread = Thread.currentThread();
939     // split cases to help compiler specialization
940 dl 1.20 if ((thread instanceof ForkJoinWorkerThread) &&
941 dl 1.25 ((ForkJoinWorkerThread)thread).getPool() == e) {
942     ForkJoinTask<?> t;
943     while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
944     (t instanceof ConsumerTask)) {
945     if ((stat = offer(item)) != 0 || !t.tryUnfork())
946 dl 1.20 break;
947 dl 1.29 ((ConsumerTask<?>)t).consumer.helpConsume();
948 dl 1.20 }
949     }
950 dl 1.25 else if (e == ForkJoinPool.commonPool()) {
951     ForkJoinTask<?> t;
952     while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
953     (t instanceof ConsumerTask)) {
954     if ((stat = offer(item)) != 0 || !t.tryUnfork())
955     break;
956 dl 1.29 ((ConsumerTask<?>)t).consumer.helpConsume();
957 dl 1.20 }
958     }
959     }
960 dl 1.25 if (stat == 0 && (stat = offer(item)) == 0) {
961     putItem = item;
962     timeout = 0L;
963     try {
964     ForkJoinPool.managedBlock(this);
965     } catch (InterruptedException ie) {
966     timeout = INTERRUPTED;
967     }
968     stat = putStat;
969     if (timeout < 0L)
970     Thread.currentThread().interrupt();
971     }
972 dl 1.20 return stat;
973     }
974    
975     /**
976 dl 1.27 * Timeout version; similar to submit
977 dl 1.20 */
978 dl 1.27 final int timedOffer(T item, long nanos) {
979 dl 1.29 int stat = 0;
980 dl 1.27 Executor e = executor;
981 dl 1.29 if (helpDepth == 0 && (e instanceof ForkJoinPool)) {
982     Thread thread = Thread.currentThread();
983     if (((thread instanceof ForkJoinWorkerThread) &&
984     ((ForkJoinWorkerThread)thread).getPool() == e) ||
985     e == ForkJoinPool.commonPool()) {
986     ForkJoinTask<?> t;
987     long deadline = System.nanoTime() + nanos;
988     while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
989     (t instanceof ConsumerTask)) {
990     if ((stat = offer(item)) != 0 ||
991     (nanos = deadline - System.nanoTime()) <= 0L ||
992     !t.tryUnfork())
993     break;
994     ((ConsumerTask<?>)t).consumer.helpConsume();
995     }
996 dl 1.20 }
997 dl 1.25 }
998     if (stat == 0 && (stat = offer(item)) == 0 &&
999     (timeout = nanos) > 0L) {
1000     putItem = item;
1001     try {
1002     ForkJoinPool.managedBlock(this);
1003     } catch (InterruptedException ie) {
1004     timeout = INTERRUPTED;
1005     }
1006     stat = putStat;
1007     if (timeout < 0L)
1008     Thread.currentThread().interrupt();
1009 dl 1.20 }
1010     return stat;
1011     }
1012    
1013 dl 1.29 /** Version of consume called when helping in submit or timedOffer */
1014     private void helpConsume() {
1015     helpDepth = 1; // only one level allowed
1016     consume();
1017     helpDepth = 0;
1018     }
1019    
1020 dl 1.20 /**
1021 dl 1.27 * Tries to start consumer task after offer.
1022     * @return -1 if now disabled, else argument
1023     */
1024     private int startOnOffer(int stat) {
1025     for (;;) {
1026     Executor e; int c;
1027     if ((c = ctl) == DISABLED || (e = executor) == null) {
1028     stat = -1;
1029     break;
1030     }
1031     else if ((c & ACTIVE) != 0) { // ensure keep-alive
1032 dl 1.29 if ((c & CONSUME) != 0 ||
1033 dl 1.27 U.compareAndSwapInt(this, CTL, c,
1034 dl 1.29 c | CONSUME))
1035 dl 1.27 break;
1036     }
1037     else if (demand == 0L || tail == head)
1038     break;
1039     else if (U.compareAndSwapInt(this, CTL, c,
1040 dl 1.29 c | (ACTIVE | CONSUME))) {
1041 dl 1.27 try {
1042     e.execute(new ConsumerTask<T>(this));
1043     break;
1044     } catch (RuntimeException | Error ex) { // back out
1045     do {} while ((c = ctl) >= 0 &&
1046     (c & ACTIVE) != 0 &&
1047     !U.compareAndSwapInt(this, CTL, c,
1048     c & ~ACTIVE));
1049     throw ex;
1050     }
1051     }
1052     }
1053     return stat;
1054     }
1055    
1056     /**
1057 dl 1.16 * Nulls out most fields, mainly to avoid garbage retention
1058 dl 1.20 * until publisher unsubscribes, but also to help cleanly stop
1059     * upon error by nulling required components.
1060 dl 1.1 */
1061 dl 1.27 private void detach() {
1062     Thread w = waiter;
1063     array = null;
1064     executor = null;
1065     subscriber = null;
1066 dl 1.16 pendingError = null;
1067 dl 1.27 waiter = null;
1068     if (w != null)
1069 dl 1.16 LockSupport.unpark(w); // force wakeup
1070 dl 1.1 }
1071    
1072     /**
1073 dl 1.19 * Issues error signal, asynchronously if a task is running,
1074 jsr166 1.26 * else synchronously.
1075 dl 1.19 */
1076     final void onError(Throwable ex) {
1077     for (int c;;) {
1078 dl 1.29 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1079 dl 1.19 break;
1080     else if ((c & ACTIVE) != 0) {
1081     pendingError = ex;
1082     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1083     break; // cause consumer task to exit
1084     }
1085     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1086     Flow.Subscriber<? super T> s = subscriber;
1087     if (s != null && ex != null) {
1088     try {
1089     s.onError(ex);
1090     } catch (Throwable ignore) {
1091     }
1092     }
1093     detach();
1094     break;
1095     }
1096     }
1097     }
1098    
1099     /**
1100 dl 1.1 * Tries to start consumer task upon a signal or request;
1101 jsr166 1.12 * disables on failure.
1102 dl 1.1 */
1103 dl 1.27 private void startOrDisable() {
1104     Executor e;
1105     if ((e = executor) != null) { // skip if already disabled
1106 dl 1.1 try {
1107 dl 1.25 e.execute(new ConsumerTask<T>(this));
1108 dl 1.27 } catch (Throwable ex) { // back out and force signal
1109 dl 1.1 for (int c;;) {
1110 dl 1.20 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1111 dl 1.1 break;
1112     if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1113 dl 1.19 onError(ex);
1114 dl 1.1 break;
1115     }
1116     }
1117     }
1118     }
1119     }
1120    
1121 dl 1.19 final void onComplete() {
1122 dl 1.1 for (int c;;) {
1123 dl 1.20 if ((c = ctl) == DISABLED)
1124 dl 1.1 break;
1125 dl 1.27 if (U.compareAndSwapInt(this, CTL, c,
1126 dl 1.29 c | (ACTIVE | CONSUME | COMPLETE))) {
1127 dl 1.16 if ((c & ACTIVE) == 0)
1128     startOrDisable();
1129 dl 1.1 break;
1130     }
1131     }
1132     }
1133    
1134 dl 1.19 final void onSubscribe() {
1135 dl 1.1 for (int c;;) {
1136 dl 1.20 if ((c = ctl) == DISABLED)
1137 dl 1.1 break;
1138 dl 1.27 if (U.compareAndSwapInt(this, CTL, c,
1139 dl 1.29 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1140 dl 1.19 if ((c & ACTIVE) == 0)
1141     startOrDisable();
1142 dl 1.1 break;
1143     }
1144     }
1145     }
1146    
1147 dl 1.16 /**
1148     * Causes consumer task to exit if active (without reporting
1149     * onError unless there is already a pending error), and
1150     * disables.
1151     */
1152     public void cancel() {
1153 dl 1.1 for (int c;;) {
1154 dl 1.20 if ((c = ctl) == DISABLED)
1155 dl 1.1 break;
1156 dl 1.16 else if ((c & ACTIVE) != 0) {
1157 dl 1.29 if (U.compareAndSwapInt(this, CTL, c,
1158     c | (CONSUME | ERROR)))
1159 dl 1.16 break;
1160     }
1161     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1162     detach();
1163 dl 1.1 break;
1164     }
1165     }
1166     }
1167    
1168 dl 1.16 /**
1169     * Adds to demand and possibly starts task.
1170     */
1171     public void request(long n) {
1172     if (n > 0L) {
1173     for (;;) {
1174     long prev = demand, d;
1175     if ((d = prev + n) < prev) // saturate
1176     d = Long.MAX_VALUE;
1177     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1178 dl 1.27 while (d != 0L) {
1179     int c, h;
1180     if ((c = ctl) == DISABLED)
1181 dl 1.16 break;
1182 dl 1.27 else if ((c & ACTIVE) != 0) {
1183 dl 1.29 if ((c & CONSUME) != 0 ||
1184 dl 1.27 U.compareAndSwapInt(this, CTL, c,
1185 dl 1.29 c | CONSUME))
1186 dl 1.27 break;
1187     }
1188     else if ((h = head) != tail) {
1189 dl 1.16 if (U.compareAndSwapInt(this, CTL, c,
1190 dl 1.29 c | (ACTIVE|CONSUME))) {
1191 dl 1.16 startOrDisable();
1192     break;
1193     }
1194     }
1195     else if (head == h && tail == h)
1196 dl 1.27 break; // else stale
1197     d = demand;
1198 dl 1.16 }
1199     break;
1200     }
1201     }
1202     }
1203     else if (n < 0L)
1204 dl 1.19 onError(new IllegalArgumentException(
1205     "negative subscription request"));
1206 dl 1.16 }
1207    
1208 dl 1.20 public final boolean isReleasable() { // for ManagedBlocker
1209 dl 1.1 T item = putItem;
1210 dl 1.25 if (item != null) { // A few randomized spins
1211     for (int spins = MINCAP, r = 0;;) {
1212     if ((putStat = offer(item)) != 0) {
1213     putItem = null;
1214     break;
1215     }
1216     else if (r == 0)
1217     r = ThreadLocalRandom.nextSecondarySeed();
1218     else {
1219     r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
1220     if (r >= 0 && --spins <= 0)
1221     return false;
1222     }
1223     }
1224 dl 1.1 }
1225     return true;
1226     }
1227    
1228 dl 1.20 public final boolean block() { // for ManagedBlocker
1229 dl 1.1 T item = putItem;
1230     if (item != null) {
1231     putItem = null;
1232     long nanos = timeout;
1233     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1234     while ((putStat = offer(item)) == 0) {
1235     if (Thread.interrupted()) {
1236 dl 1.16 timeout = INTERRUPTED;
1237 dl 1.1 if (nanos > 0L)
1238     break;
1239     }
1240     else if (nanos > 0L &&
1241     (nanos = deadline - System.nanoTime()) <= 0L)
1242     break;
1243     else if (waiter == null)
1244     waiter = Thread.currentThread();
1245     else {
1246     if (nanos > 0L)
1247     LockSupport.parkNanos(this, nanos);
1248     else
1249     LockSupport.park(this);
1250     waiter = null;
1251     }
1252     }
1253     }
1254     waiter = null;
1255     return true;
1256     }
1257    
1258 dl 1.29 /**
1259     * Consumer loop, called from ConsumerTask, or indirectly
1260     * via helpConsume when helping during submit.
1261     */
1262 dl 1.25 final void consume() {
1263 dl 1.1 Flow.Subscriber<? super T> s;
1264 dl 1.27 if ((s = subscriber) != null) { // else disabled
1265     for (int h = head;;) {
1266     long d = demand;
1267     int c, n, i; Object[] a; Object x; Thread w;
1268     if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1269 dl 1.20 if ((c & ERROR) != 0) {
1270     Throwable ex = pendingError;
1271 dl 1.27 ctl = DISABLED; // no need for CAS
1272     if (ex != null) { // null if errorless cancel
1273 dl 1.20 try {
1274     s.onError(ex);
1275     } catch (Throwable ignore) {
1276     }
1277 dl 1.1 }
1278     }
1279 dl 1.20 else if ((c & SUBSCRIBE) != 0) {
1280     if (U.compareAndSwapInt(this, CTL, c,
1281     c & ~SUBSCRIBE)) {
1282     try {
1283     s.onSubscribe(this);
1284     } catch (Throwable ex) {
1285 dl 1.29 onError(ex);
1286 dl 1.20 }
1287 dl 1.19 }
1288     }
1289 dl 1.20 else {
1290     detach();
1291     break;
1292     }
1293 dl 1.19 }
1294 dl 1.27 else if (h == tail) { // apparently empty
1295 dl 1.29 if ((c & CONSUME) != 0) // recheck
1296     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1297 dl 1.27 else if ((c & COMPLETE) != 0) {
1298 dl 1.29 if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1299     try {
1300     s.onComplete();
1301     } catch (Throwable ignore) {
1302     }
1303 dl 1.1 }
1304 dl 1.27 }
1305     else if (U.compareAndSwapInt(this, CTL, c,
1306     c & ~ACTIVE))
1307     break;
1308     }
1309     else if (d == 0L) { // can't consume
1310     if (demand == 0L) { // recheck
1311 dl 1.29 if ((c & CONSUME) != 0)
1312 dl 1.27 U.compareAndSwapInt(this, CTL, c,
1313 dl 1.29 c & ~CONSUME);
1314 dl 1.27 else if (U.compareAndSwapInt(this, CTL, c,
1315     c & ~ACTIVE))
1316 dl 1.1 break;
1317     }
1318     }
1319 dl 1.27 else if ((a = array) == null || (n = a.length) == 0 ||
1320 dl 1.16 (x = a[i = h & (n - 1)]) == null)
1321 dl 1.27 ; // stale; retry
1322 dl 1.29 else if ((c & CONSUME) == 0)
1323     U.compareAndSwapInt(this, CTL, c, c | CONSUME);
1324 dl 1.16 else if (U.compareAndSwapObject(
1325 dl 1.1 a, (((long)i) << ASHIFT) + ABASE, x, null)) {
1326 dl 1.27 U.putOrderedInt(this, HEAD, ++h);
1327 dl 1.1 while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1328 dl 1.27 d = demand; // almost never fails
1329 dl 1.1 if ((w = waiter) != null) {
1330     waiter = null;
1331 dl 1.27 LockSupport.unpark(w); // release producer
1332 dl 1.1 }
1333     try {
1334     @SuppressWarnings("unchecked") T y = (T) x;
1335     s.onNext(y);
1336 dl 1.29 } catch (Throwable ex) {
1337     onError(ex);
1338 dl 1.1 }
1339     }
1340     }
1341     }
1342     }
1343    
1344 jsr166 1.9 // Unsafe mechanics
1345     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1346 dl 1.1 private static final long CTL;
1347     private static final long TAIL;
1348     private static final long HEAD;
1349     private static final long DEMAND;
1350     private static final int ABASE;
1351     private static final int ASHIFT;
1352    
1353     static {
1354     try {
1355     CTL = U.objectFieldOffset
1356 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("ctl"));
1357 dl 1.1 TAIL = U.objectFieldOffset
1358 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("tail"));
1359 dl 1.1 HEAD = U.objectFieldOffset
1360 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("head"));
1361 dl 1.1 DEMAND = U.objectFieldOffset
1362 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("demand"));
1363    
1364     ABASE = U.arrayBaseOffset(Object[].class);
1365     int scale = U.arrayIndexScale(Object[].class);
1366 dl 1.1 if ((scale & (scale - 1)) != 0)
1367     throw new Error("data type scale not a power of two");
1368     ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1369 jsr166 1.9 } catch (ReflectiveOperationException e) {
1370 dl 1.1 throw new Error(e);
1371     }
1372     }
1373     }
1374     }