ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.25
Committed: Fri Jan 23 13:48:41 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.24: +125 -86 lines
Log Message:
Expand spin/help/block cases; add defaults

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 jsr166 1.10 import java.util.ArrayList;
10     import java.util.List;
11 dl 1.1 import java.util.concurrent.locks.LockSupport;
12     import java.util.function.BiPredicate;
13     import java.util.function.Function;
14 dl 1.15 import java.util.function.Supplier;
15 dl 1.1
16     /**
17 dl 1.21 * A {@link Flow.Publisher} that asynchronously issues submitted
18     * (non-null) items to current subscribers until it is closed. Each
19     * current subscriber receives newly submitted items in the same order
20     * unless drops or exceptions are encountered. Using a
21     * SubmissionPublisher allows item generators to act as Publishers
22     * relying on drop handling and/or blocking for flow control.
23 dl 1.1 *
24 dl 1.16 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
25 dl 1.1 * constructor for delivery to subscribers. The best choice of
26     * Executor depends on expected usage. If the generator(s) of
27     * submitted items run in separate threads, and the number of
28     * subscribers can be estimated, consider using a {@link
29 dl 1.25 * Executors#newFixedThreadPool}. Otherwise consider using
30     * the default {@link ForkJoinPool#commonPool}.
31 dl 1.1 *
32     * <p>Buffering allows producers and consumers to transiently operate
33     * at different rates. Each subscriber uses an independent buffer.
34 dl 1.21 * Buffers are created upon first use and expanded as needed up to the
35     * given maximum. (The enforced capacity may be rounded up to the
36     * nearest power of two and/or bounded by the largest value supported
37     * by this implementation.) Invocations of {@link
38 dl 1.16 * Flow.Subscription#request} do not directly result in buffer
39 dl 1.24 * expansion, but risk saturation if unfilled requests exceed the
40 dl 1.25 * maximum capacity. The default value of {@link
41     * Flow#defaultBufferSize} may provide a useful starting point for
42     * choosing a capacity based on expected rates, resources, and usages.
43 dl 1.18 *
44 dl 1.1 * <p>Publication methods support different policies about what to do
45     * when buffers are saturated. Method {@link #submit} blocks until
46 dl 1.16 * resources are available. This is simplest, but least
47 dl 1.17 * responsive. The {@code offer} methods may drop items (either
48     * immediately or with bounded timeout), but provide an opportunity to
49 dl 1.16 * interpose a handler and then retry.
50 dl 1.1 *
51     * <p>If any Subscriber method throws an exception, its subscription
52     * is cancelled. If the supplied Executor throws
53     * RejectedExecutionException (or any other RuntimeException or Error)
54     * when attempting to execute a task, or a drop handler throws an
55     * exception when processing a dropped item, then the exception is
56     * rethrown. In these cases, some but not all subscribers may have
57     * received items. It is usually good practice to {@link
58 jsr166 1.11 * #closeExceptionally closeExceptionally} in these cases.
59 dl 1.1 *
60 dl 1.15 * <p>This class may also serve as a convenient base for subclasses
61     * that generate items, and use the methods in this class to publish
62     * them. For example here is a class that periodically publishes the
63     * items generated from a supplier. (In practice you might add methods
64     * to independently start and stop generation, to share schedulers
65 dl 1.16 * among publishers, and so on, or instead use a SubmissionPublisher
66     * as a component rather than a superclass.)
67 dl 1.15 *
68     * <pre> {@code
69     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
70     * final ScheduledFuture<?> periodicTask;
71     * final ScheduledExecutorService scheduler;
72 dl 1.21 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
73     * Supplier<? extends T> supplier,
74 dl 1.15 * long period, TimeUnit unit) {
75 dl 1.21 * super(executor, maxBufferCapacity);
76 dl 1.15 * scheduler = new ScheduledThreadPoolExecutor(1);
77     * periodicTask = scheduler.scheduleAtFixedRate(
78     * () -> submit(supplier.get()), 0, period, unit);
79     * }
80     * public void close() {
81     * periodicTask.cancel(false);
82     * scheduler.shutdown();
83     * super.close();
84     * }
85     * }}</pre>
86     *
87 dl 1.21 * <p>Here is an example of a {@link Flow.Processor} implementation.
88     * It uses single-step requests to its publisher for simplicity of
89     * illustration. A more adaptive version could monitor flow using the
90     * lag estimate returned from {@code submit} and/or other utility
91     * methods.
92 dl 1.16 *
93     * <pre> {@code
94     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
95     * implements Flow.Processor<S,T> {
96     * final Function<? super S, ? extends T> function;
97     * Flow.Subscription subscription;
98 dl 1.21 * TransformProcessor(Executor executor, int maxBufferCapacity,
99 dl 1.16 * Function<? super S, ? extends T> function) {
100 dl 1.21 * super(executor, maxBufferCapacity);
101 dl 1.16 * this.function = function;
102     * }
103     * public void onSubscribe(Flow.Subscription subscription) {
104     * (this.subscription = subscription).request(1);
105     * }
106     * public void onNext(S item) {
107     * subscription.request(1);
108     * submit(function.apply(item));
109     * }
110     * public void onError(Throwable ex) { closeExceptionally(ex); }
111     * public void onComplete() { close(); }
112     * }}</pre>
113     *
114 dl 1.1 * @param <T> the published item type
115     * @author Doug Lea
116     * @since 1.9
117     */
118     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
119     AutoCloseable {
120     /*
121     * Most mechanics are handled by BufferedSubscription. This class
122 dl 1.17 * mainly tracks subscribers and ensures sequentiality, by using
123     * built-in synchronization locks across public methods. (Using
124     * built-in locks works well in the most typical case in which
125     * only one thread submits items).
126 dl 1.1 */
127    
128 dl 1.25 /** The largest possible power of two array size. */
129     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
130 dl 1.1
131 dl 1.25 /** Round capacity to power of 2, at least 2, at most limit. */
132 dl 1.1 static final int roundCapacity(int cap) { // to nearest power of 2
133     int n = cap - 1;
134     n |= n >>> 1;
135     n |= n >>> 2;
136     n |= n >>> 4;
137     n |= n >>> 8;
138     n |= n >>> 16;
139 dl 1.21 return (n <= 0) ? 2 : // at least 2
140 dl 1.25 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
141 dl 1.1 }
142    
143     /**
144     * Clients (BufferedSubscriptions) are maintained in a linked list
145     * (via their "next" fields). This works well for publish loops.
146     * It requires O(n) traversal to check for duplicate subscribers,
147     * but we expect that subscribing is much less common than
148 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
149 dl 1.25 * when BufferedSubscription methods return negative values
150     * signifying that they have been disabled.
151 dl 1.1 */
152     BufferedSubscription<T> clients;
153    
154 dl 1.20 /** Run status, updated only within locks */
155     volatile boolean closed;
156    
157 dl 1.1 // Parameters for constructing BufferedSubscriptions
158     final Executor executor;
159     final int maxBufferCapacity;
160    
/**
 * Creates a new SubmissionPublisher that delivers items to
 * subscribers asynchronously via the given Executor, giving each
 * subscriber a buffer of at most the given capacity.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the enforced capacity may be rounded up to
 * the nearest power of two and/or bounded by the largest value
 * supported by this implementation; method {@link #getMaxBufferCapacity}
 * returns the actual value)
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if maxBufferCapacity not
 * positive
 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity <= 0) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    // Store the capacity actually enforced, not the raw request.
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.executor = executor;
}
185    
/**
 * Creates a new SubmissionPublisher with default settings: async
 * delivery runs on {@link ForkJoinPool#commonPool()}, and each
 * subscriber's maximum buffer capacity is
 * {@link Flow#defaultBufferSize}.
 */
public SubmissionPublisher() {
    this(ForkJoinPool.commonPool(),
         Flow.defaultBufferSize());
}
195    
196     /**
197 dl 1.18 * Adds the given Subscriber unless already subscribed. If
198     * already subscribed, the Subscriber's onError method is invoked
199     * with an IllegalStateException. Otherwise, upon success, the
200     * Subscriber's onSubscribe method is invoked with a new
201 dl 1.20 * Subscription. If onSubscribe throws an exception, the
202     * subscription is cancelled. Otherwise, if this
203     * SubmissionPublisher is closed, the subscriber's onComplete
204     * method is then invoked. Subscribers may enable receiving items
205     * by invoking the {@code request} method of the new Subscription,
206     * and may unsubscribe by invoking its cancel method.
207 dl 1.18 *
208     * @param subscriber the subscriber
209     * @throws NullPointerException if subscriber is null
210     */
211     public void subscribe(Flow.Subscriber<? super T> subscriber) {
212     if (subscriber == null) throw new NullPointerException();
213 dl 1.20 BufferedSubscription<T> subscription =
214 dl 1.21 new BufferedSubscription<T>(subscriber, executor, maxBufferCapacity);
215 dl 1.20 boolean present = false, complete;
216 dl 1.19 synchronized (this) {
217 dl 1.20 complete = closed;
218 dl 1.19 BufferedSubscription<T> pred = null, next;
219     for (BufferedSubscription<T> b = clients; ; b = next) {
220     if (b == null) {
221     if (pred == null)
222 dl 1.20 clients = subscription;
223 dl 1.19 else
224 dl 1.20 pred.next = subscription;
225     subscription.onSubscribe();
226 dl 1.19 break;
227     }
228     next = b.next;
229     if (b.isDisabled()) { // remove
230     if (pred == null)
231     clients = next;
232     else
233     pred.next = next;
234     }
235 dl 1.20 else if (subscriber.equals(b.subscriber)) {
236 dl 1.19 present = true;
237     break;
238     }
239     pred = b;
240     }
241     }
242     if (present)
243     subscriber.onError(new IllegalStateException("Already subscribed"));
244 dl 1.20 else if (complete)
245     subscription.onComplete();
246 dl 1.18 }
247    
248     /**
249 dl 1.16 * Publishes the given item to each current subscriber by
250     * asynchronously invoking its onNext method, blocking
251     * uninterruptibly while resources for any subscriber are
252 dl 1.21 * unavailable. This method returns an estimate of the maximum lag
253     * (number of items submitted but not yet consumed) among all
254     * current subscribers. This value is at least one (accounting for
255     * this submitted item) if there are any subscribers, else zero.
256 dl 1.16 *
257     * <p>If the Executor for this publisher throws a
258     * RejectedExecutionException (or any other RuntimeException or
259     * Error) when attempting to asynchronously notify subscribers,
260     * then this exception is rethrown.
261     *
262     * @param item the (non-null) item to publish
263 dl 1.21 * @return the estimated maximum lag among subscribers
264 dl 1.16 * @throws IllegalStateException if closed
265     * @throws NullPointerException if item is null
266     * @throws RejectedExecutionException if thrown by Executor
267     */
268 dl 1.21 public int submit(T item) {
269 dl 1.16 if (item == null) throw new NullPointerException();
270 dl 1.21 int lag = 0;
271 dl 1.16 synchronized (this) {
272     if (closed)
273     throw new IllegalStateException("Closed");
274 dl 1.24 /*
275     * To reduce head-of-line blocking, try offer() on each,
276     * place saturated ones in retries list, and later wait
277     * them out.
278     */
279     BufferedSubscription<T> b = clients, retries = null,
280     rtail = null, pred = null, next;
281     for ( ; b != null; b = next) {
282 dl 1.16 int stat;
283     next = b.next;
284 dl 1.24 if ((stat = b.offer(item)) < 0) {
285 dl 1.16 if (pred == null)
286     clients = next;
287     else
288     pred.next = next;
289     }
290 dl 1.21 else {
291 dl 1.24 if (stat == 0) {
292     if (rtail == null)
293     retries = b;
294     else
295     rtail.nextRetry = b;
296     rtail = b;
297     stat = maxBufferCapacity;
298     }
299 dl 1.21 if (stat > lag)
300     lag = stat;
301 dl 1.24 pred = b;
302 dl 1.21 }
303 dl 1.16 }
304 dl 1.24 if (retries != null)
305     retrySubmit(retries, item);
306 dl 1.16 }
307 dl 1.21 return lag;
308 dl 1.16 }
309    
310     /**
311 dl 1.24 * Calls submit on each subscription on retry list.
312     */
313     private void retrySubmit(BufferedSubscription<T> retries, T item) {
314     for (BufferedSubscription<T> r = retries; r != null;) {
315     BufferedSubscription<T> nextRetry = r.nextRetry;
316     r.nextRetry = null;
317     r.submit(item);
318     r = nextRetry;
319     }
320     }
321    
322     /**
323 dl 1.1 * Publishes the given item, if possible, to each current
324     * subscriber by asynchronously invoking its onNext method. The
325     * item may be dropped by one or more subscribers if resource
326     * limits are exceeded, in which case the given handler (if
327     * non-null) is invoked, and if it returns true, retried once.
328 dl 1.16 * Other calls to methods in this class by other threads are
329     * blocked while the handler is invoked. Unless recovery is
330     * assured, options are usually limited to logging the error
331     * and/or issuing an onError signal to the subscriber.
332 dl 1.1 *
333 dl 1.21 * <p>This method returns a status indicator: If negative, it
334     * represents the (negative) number of drops (failed attempts to
335     * issue the item to a subscriber). Otherwise it is an estimate of
336     * the maximum lag (number of items submitted but not yet
337     * consumed) among all current subscribers. This value is at least
338     * one (accounting for this submitted item) if there are any
339     * subscribers, else zero.
340 jsr166 1.23 *
341 dl 1.1 * <p>If the Executor for this publisher throws a
342     * RejectedExecutionException (or any other RuntimeException or
343     * Error) when attempting to asynchronously notify subscribers, or
344     * the drop handler throws an exception when processing a dropped
345     * item, then this exception is rethrown.
346     *
347     * @param item the (non-null) item to publish
348     * @param onDrop if non-null, the handler invoked upon a drop to a
349     * subscriber, with arguments of the subscriber and item; if it
350     * returns true, an offer is re-attempted (once)
351 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
352     * an estimate of maximum lag
353 dl 1.1 * @throws IllegalStateException if closed
354     * @throws NullPointerException if item is null
355     * @throws RejectedExecutionException if thrown by Executor
356     */
357     public int offer(T item,
358     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
359     if (item == null) throw new NullPointerException();
360 dl 1.21 int ret = 0;
361 jsr166 1.2 synchronized (this) {
362 dl 1.1 if (closed)
363     throw new IllegalStateException("Closed");
364     BufferedSubscription<T> pred = null, next;
365     for (BufferedSubscription<T> b = clients; b != null; b = next) {
366     int stat;
367     next = b.next;
368     if ((stat = b.offer(item)) == 0 &&
369     onDrop != null &&
370     onDrop.test(b.subscriber, item))
371     stat = b.offer(item);
372     if (stat < 0) {
373     if (pred == null)
374     clients = next;
375     else
376     pred.next = next;
377     }
378     else {
379     pred = b;
380     if (stat == 0)
381 dl 1.21 ret = (ret >= 0) ? -1 : ret - 1;
382     else if (ret >= 0 && stat > ret)
383     ret = stat;
384 dl 1.1 }
385     }
386 dl 1.21 return ret;
387 dl 1.1 }
388     }
389    
390     /**
391     * Publishes the given item, if possible, to each current
392     * subscriber by asynchronously invoking its onNext method,
393     * blocking while resources for any subscription are unavailable,
394     * up to the specified timeout or the caller thread is
395 dl 1.16 * interrupted, at which point the given handler (if non-null) is
396     * invoked, and if it returns true, retried once. (The drop
397     * handler may distinguish timeouts from interrupts by checking
398     * whether the current thread is interrupted.) Other calls to
399     * methods in this class by other threads are blocked while the
400     * handler is invoked. Unless recovery is assured, options are
401     * usually limited to logging the error and/or issuing an onError
402     * signal to the subscriber.
403 dl 1.1 *
404 dl 1.21 * <p>This method returns a status indicator: If negative, it
405     * represents the (negative) number of drops (failed attempts to
406     * issue the item to a subscriber). Otherwise it is an estimate of
407     * the maximum lag (number of items submitted but not yet
408     * consumed) among all current subscribers. This value is at least
409     * one (accounting for this submitted item) if there are any
410     * subscribers, else zero.
411     *
412 dl 1.1 * <p>If the Executor for this publisher throws a
413     * RejectedExecutionException (or any other RuntimeException or
414     * Error) when attempting to asynchronously notify subscribers, or
415     * the drop handler throws an exception when processing a dropped
416     * item, then this exception is rethrown.
417     *
418     * @param item the (non-null) item to publish
419     * @param timeout how long to wait for resources for any subscriber
420     * before giving up, in units of {@code unit}
421     * @param unit a {@code TimeUnit} determining how to interpret the
422     * {@code timeout} parameter
423     * @param onDrop if non-null, the handler invoked upon a drop to a
424     * subscriber, with arguments of the subscriber and item; if it
425     * returns true, an offer is re-attempted (once)
426 dl 1.21 * @return if negative, the (negative) number of drops; otherwise
427     * an estimate of maximum lag
428 dl 1.1 * @throws IllegalStateException if closed
429     * @throws NullPointerException if item is null
430     * @throws RejectedExecutionException if thrown by Executor
431     */
432     public int offer(T item, long timeout, TimeUnit unit,
433     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
434     if (item == null) throw new NullPointerException();
435     long nanos = unit.toNanos(timeout);
436 dl 1.21 int ret = 0;
437 jsr166 1.2 synchronized (this) {
438 dl 1.1 if (closed)
439     throw new IllegalStateException("Closed");
440     BufferedSubscription<T> pred = null, next;
441     for (BufferedSubscription<T> b = clients; b != null; b = next) {
442     int stat;
443     next = b.next;
444 dl 1.19 if ((stat = b.offerNanos(item, nanos)) == 0 &&
445 dl 1.16 onDrop != null && onDrop.test(b.subscriber, item))
446 dl 1.1 stat = b.offer(item);
447     if (stat < 0) {
448     if (pred == null)
449     clients = next;
450     else
451     pred.next = next;
452     }
453     else {
454     pred = b;
455     if (stat == 0)
456 dl 1.21 ret = (ret >= 0) ? -1 : ret - 1;
457     else if (ret >= 0 && stat > ret)
458     ret = stat;
459 dl 1.1 }
460     }
461     }
462 dl 1.21 return ret;
463 dl 1.1 }
464    
465     /**
466     * Unless already closed, issues onComplete signals to current
467     * subscribers, and disallows subsequent attempts to publish.
468     */
469     public void close() {
470     if (!closed) {
471     BufferedSubscription<T> b, next;
472 jsr166 1.2 synchronized (this) {
473 dl 1.1 b = clients;
474     clients = null;
475     closed = true;
476     }
477     while (b != null) {
478     next = b.next;
479 dl 1.19 b.onComplete();
480 dl 1.1 b = next;
481     }
482     }
483     }
484    
485     /**
486     * Unless already closed, issues onError signals to current
487     * subscribers with the given error, and disallows subsequent
488     * attempts to publish.
489     *
490 jsr166 1.5 * @param error the onError argument sent to subscribers
491 dl 1.1 * @throws NullPointerException if error is null
492     */
493     public void closeExceptionally(Throwable error) {
494     if (error == null)
495     throw new NullPointerException();
496     if (!closed) {
497     BufferedSubscription<T> b, next;
498 jsr166 1.2 synchronized (this) {
499 dl 1.1 b = clients;
500     clients = null;
501     closed = true;
502     }
503     while (b != null) {
504     next = b.next;
505 dl 1.19 b.onError(error);
506 dl 1.1 b = next;
507     }
508     }
509     }
510    
511     /**
512     * Returns true if this publisher is not accepting submissions.
513     *
514     * @return true if closed
515     */
516     public boolean isClosed() {
517     return closed;
518     }
519    
520     /**
521 jsr166 1.6 * Returns true if this publisher has any subscribers.
522 dl 1.1 *
523     * @return true if this publisher has any subscribers
524     */
525     public boolean hasSubscribers() {
526     boolean nonEmpty = false;
527     if (!closed) {
528 jsr166 1.2 synchronized (this) {
529 dl 1.1 BufferedSubscription<T> pred = null, next;
530     for (BufferedSubscription<T> b = clients; b != null; b = next) {
531     next = b.next;
532 dl 1.19 if (b.isDisabled()) {
533 dl 1.1 if (pred == null)
534     clients = next;
535     else
536     pred.next = next;
537 jsr166 1.2 }
538 dl 1.1 else {
539     nonEmpty = true;
540     break;
541     }
542     }
543     }
544     }
545     return nonEmpty;
546     }
547    
548     /**
549 dl 1.21 * Returns the number of current subscribers.
550 dl 1.1 *
551 dl 1.21 * @return the number of current subscribers
552 dl 1.1 */
553 dl 1.21 public int getNumberOfSubscribers() {
554     int count = 0;
555     if (!closed) {
556     synchronized (this) {
557     BufferedSubscription<T> pred = null, next;
558     for (BufferedSubscription<T> b = clients; b != null; b = next) {
559     next = b.next;
560     if (b.isDisabled()) {
561     if (pred == null)
562     clients = next;
563     else
564     pred.next = next;
565     }
566     else {
567     pred = b;
568     ++count;
569     }
570     }
571     }
572     }
573     return count;
574 dl 1.1 }
575    
576 jsr166 1.7 /**
577 dl 1.21 * Returns the Executor used for asynchronous delivery.
578 dl 1.1 *
579 dl 1.21 * @return the Executor used for asynchronous delivery
580 dl 1.1 */
581 dl 1.21 public Executor getExecutor() {
582     return executor;
583 dl 1.1 }
584    
585 jsr166 1.7 /**
586 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
587 dl 1.1 *
588 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
589 dl 1.1 */
590     public int getMaxBufferCapacity() {
591     return maxBufferCapacity;
592     }
593    
594     /**
595     * Returns a list of current subscribers.
596     *
597     * @return list of current subscribers
598     */
599     public List<Flow.Subscriber<? super T>> getSubscribers() {
600     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
601 jsr166 1.2 synchronized (this) {
602 dl 1.1 BufferedSubscription<T> pred = null, next;
603     for (BufferedSubscription<T> b = clients; b != null; b = next) {
604     next = b.next;
605 dl 1.19 if (b.isDisabled()) {
606 dl 1.1 if (pred == null)
607     clients = next;
608     else
609     pred.next = next;
610 jsr166 1.2 }
611 dl 1.1 else
612     subs.add(b.subscriber);
613     }
614     }
615     return subs;
616     }
617 jsr166 1.2
618 dl 1.1 /**
619     * Returns true if the given Subscriber is currently subscribed.
620     *
621     * @param subscriber the subscriber
622     * @return true if currently subscribed
623 dl 1.21 * @throws NullPointerException if subscriber is null
624 dl 1.1 */
625     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
626 dl 1.21 if (subscriber == null) throw new NullPointerException();
627 jsr166 1.2 if (!closed) {
628     synchronized (this) {
629 dl 1.1 BufferedSubscription<T> pred = null, next;
630     for (BufferedSubscription<T> b = clients; b != null; b = next) {
631     next = b.next;
632 dl 1.19 if (b.isDisabled()) {
633 dl 1.1 if (pred == null)
634     clients = next;
635     else
636     pred.next = next;
637 jsr166 1.2 }
638 dl 1.21 else if (subscriber.equals(b.subscriber))
639 dl 1.1 return true;
640 dl 1.21 else
641     pred = b;
642 dl 1.1 }
643     }
644     }
645     return false;
646     }
647    
648     /**
649 dl 1.21 * Returns an estimate of the minimum number of items requested
650     * (via {@link Flow.Subscription#request}) but not yet produced,
651     * among all current subscribers.
652     *
653     * @return the estimate, or zero if no subscribers
654     */
655     public long estimateMinimumDemand() {
656     long min = Long.MAX_VALUE;
657     boolean nonEmpty = false;
658     synchronized (this) {
659     BufferedSubscription<T> pred = null, next;
660     for (BufferedSubscription<T> b = clients; b != null; b = next) {
661     int n; long d;
662     next = b.next;
663     if ((n = b.estimateLag()) < 0) {
664     if (pred == null)
665     clients = next;
666     else
667     pred.next = next;
668     }
669     else {
670     if ((d = b.demand - n) < min)
671     min = d;
672     nonEmpty = true;
673     }
674     }
675     }
676 jsr166 1.22 return nonEmpty ? min : 0;
677 dl 1.21 }
678    
679     /**
680     * Returns an estimate of the maximum number of items produced but
681     * not yet consumed among all current subscribers.
682 dl 1.17 *
683     * @return the estimate
684 dl 1.1 */
685 dl 1.21 public int estimateMaximumLag() {
686     int max = 0;
687 dl 1.17 synchronized (this) {
688     BufferedSubscription<T> pred = null, next;
689     for (BufferedSubscription<T> b = clients; b != null; b = next) {
690 dl 1.21 int n;
691 dl 1.17 next = b.next;
692 dl 1.21 if ((n = b.estimateLag()) < 0) {
693 dl 1.17 if (pred == null)
694     clients = next;
695     else
696     pred.next = next;
697     }
698 dl 1.21 else if (n > max)
699     max = n;
700 dl 1.17 }
701     }
702 dl 1.21 return max;
703 dl 1.1 }
704    
705 dl 1.25
706     /**
707     * A task for consuming buffer items and signals, created and
708     * executed whenever they become available. A task consumes as
709     * many items/signals as possible before terminating, at which
710     * point another task is created when needed. The dual Runnable
711     * and ForkJoinTask declaration saves overhead when executed by
712     * ForkJoinPools, without impacting other kinds of Executors.
713     */
714     @SuppressWarnings("serial")
715     static final class ConsumerTask<T> extends ForkJoinTask<Void>
716     implements Runnable {
717     final BufferedSubscription<T> consumer;
718     ConsumerTask(BufferedSubscription<T> consumer) {
719     this.consumer = consumer;
720     }
721     public final Void getRawResult() { return null; }
722     public final void setRawResult(Void v) {}
723     public final boolean exec() { consumer.consume(); return false; }
724     public final void run() { consumer.consume(); }
725     }
726    
727 dl 1.1 /**
728 dl 1.15 * A bounded (ring) buffer with integrated control to start a
729     * consumer task whenever items are available. The buffer
730 dl 1.1 * algorithm is similar to one used inside ForkJoinPool,
731     * specialized for the case of at most one concurrent producer and
732     * consumer, and power of two buffer sizes. This allows methods to
733     * operate without locks even while supporting resizing, blocking,
734     * task-triggering, and garbage-free buffers (nulling out elements
735     * when consumed), although supporting these does impose a bit of
736     * overhead compared to plain fixed-size ring buffers.
737     *
738     * The publisher guarantees a single producer via its lock. We
739     * ensure in this class that there is at most one consumer. The
740     * request and cancel methods must be fully thread-safe but are
741     * coded to exploit the most common case in which they are only
742     * called by consumers (usually within onNext).
743     *
744 dl 1.15 * Execution control is managed using the ACTIVE ctl bit. We
745     * ensure that a task is active when consumable items (and
746 dl 1.19 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
747 dl 1.24 * there is demand (unfilled requests). This is complicated on
748 dl 1.19 * the creation side by the possibility of exceptions when trying
749     * to execute tasks. These eventually force DISABLED state, but
750 dl 1.1 * sometimes not directly. On the task side, termination (clearing
751     * ACTIVE) may race with producers or request() calls, so in some
752     * cases requires a re-check, re-activating if possible.
753     *
754     * The ctl field also manages run state. When DISABLED, no further
755 dl 1.20 * updates are possible. Disabling may be preceded by setting
756 dl 1.1 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
757     * case the associated Subscriber methods are invoked, possibly
758     * synchronously if there is no active consumer task (including
759 dl 1.20 * cases where execute() failed). The cancel() method is supported
760     * by treating as ERROR but suppressing onError signal.
761 dl 1.1 *
762     * Support for blocking also exploits the fact that there is only
763     * one possible waiter. ManagedBlocker-compatible control fields
764     * are placed in this class itself rather than in wait-nodes.
765     * Blocking control relies on the "waiter" field. Producers set
766     * the field before trying to block, but must then recheck (via
767     * offer) before parking. Signalling then just unparks and clears
768 dl 1.20 * waiter field. If the producer and consumer are both in the same
769     * ForkJoinPool, the producer attempts to help run consumer tasks
770     * that it forked before blocking.
771 dl 1.1 *
772     * This class uses @Contended and heuristic field declaration
773     * ordering to reduce memory contention on BufferedSubscription
774     * itself, but it does not currently attempt to avoid memory
775     * contention (especially including card-marks) among buffer
776     * elements, that can significantly slow down some usages.
777     * Addressing this may require allocating substantially more space
778     * than users expect.
779     */
780 dl 1.15 @SuppressWarnings("serial")
781 dl 1.1 @sun.misc.Contended
782 dl 1.25 static final class BufferedSubscription<T>
783     implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
784 dl 1.1 // Order-sensitive field declarations
785 dl 1.20 long timeout; // > 0 if timed wait
786 dl 1.17 volatile long demand; // # unfilled requests
787 dl 1.1 int maxCapacity; // reduced on OOME
788     int putStat; // offer result for ManagedBlocker
789     volatile int ctl; // atomic run state flags
790     volatile int head; // next position to take
791     volatile int tail; // next position to put
792     volatile Object[] array; // buffer: null if disabled
793     Flow.Subscriber<? super T> subscriber; // null if disabled
794     Executor executor; // null if disabled
795     volatile Throwable pendingError; // holds until onError issued
796     volatile Thread waiter; // blocked producer thread
797     T putItem; // for offer within ManagedBlocker
798     BufferedSubscription<T> next; // used only by publisher
799 dl 1.24 BufferedSubscription<T> nextRetry;// used only by publisher
800 dl 1.1
801     // ctl values
802 dl 1.20 static final int ACTIVE = 0x01; // consumer task active
803     static final int DISABLED = 0x02; // final state
804     static final int ERROR = 0x04; // signal onError then disable
805     static final int SUBSCRIBE = 0x08; // signal onSubscribe
806     static final int COMPLETE = 0x10; // signal onComplete when done
807 dl 1.1
808 dl 1.20 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
809 dl 1.16
810 jsr166 1.22 /**
811 dl 1.21 * Initial/Minimum buffer capacity. Must be a power of two, at least 2.
812     */
813     static final int MINCAP = 8;
814    
/**
 * Creates a subscription with no buffer allocated yet; the array
 * field stays null until growAndAdd creates it.
 *
 * @param subscriber the subscriber that will receive signals
 * @param executor the executor used to run consumer tasks
 * @param maxBufferCapacity upper bound stored in maxCapacity
 */
BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                     Executor executor,
                     int maxBufferCapacity) {
    this.maxCapacity = maxBufferCapacity;
    this.executor = executor;
    this.subscriber = subscriber;
}
821    
822 dl 1.19 final boolean isDisabled() {
823 dl 1.20 return ctl == DISABLED;
824     }
825    
826 dl 1.21 /**
827     * Returns estimated number of buffered items, or -1 if
828     * disabled
829     */
830     final int estimateLag() {
831 dl 1.20 int n;
832 jsr166 1.22 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
833 dl 1.19 }
834    
835 dl 1.1 /**
836 dl 1.16 * Tries to add item and start consumer task if necessary.
837 dl 1.21 * @return -1 if disabled, 0 if dropped, else estimated lag
838 dl 1.1 */
839     final int offer(T item) {
840     Object[] a = array;
841 dl 1.21 int t = tail, stat = t + 1 - head, n, i;
842     if (a != null && (n = a.length) >= stat && (i = t & (n - 1)) >= 0) {
843 dl 1.16 a[i] = item;
844 dl 1.1 U.putOrderedInt(this, TAIL, t + 1);
845     }
846 dl 1.16 else if ((stat = growAndAdd(a, item)) <= 0)
847     return stat;
848     for (int c;;) { // possibly start task
849     Executor e;
850     if (((c = ctl) & ACTIVE) != 0)
851     break;
852 dl 1.20 else if (c == DISABLED || (e = executor) == null)
853 dl 1.16 return -1;
854     else if (demand == 0L || tail == head)
855     break;
856     else if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) {
857     try {
858 dl 1.25 e.execute(new ConsumerTask<T>(this));
859 dl 1.16 break;
860     } catch (RuntimeException | Error ex) { // back out
861     do {} while ((c = ctl) >= 0 &&
862     (c & ACTIVE) != 0 &&
863     !U.compareAndSwapInt(this, CTL, c,
864     c & ~ACTIVE));
865     throw ex;
866     }
867     }
868     }
869 dl 1.21 return stat;
870 dl 1.1 }
871    
872     /**
873 dl 1.16 * Tries to create or expand buffer, then adds item if possible
874 dl 1.1 */
875 dl 1.16 final int growAndAdd(Object[] oldArray, T item) {
876     int oldLen, newLen;
877 dl 1.24 if (oldArray == null) {
878 dl 1.1 oldLen = 0;
879 dl 1.24 newLen = (maxCapacity >= MINCAP ? MINCAP :
880     maxCapacity >= 2 ? maxCapacity : 2);
881 dl 1.1 }
882 dl 1.24 else if ((oldLen = oldArray.length) >= maxCapacity ||
883     (newLen = oldLen << 1) <= 0)
884 dl 1.16 return 0; // cannot grow
885 dl 1.24 if (ctl == DISABLED)
886     return -1;
887 dl 1.16 Object[] newArray;
888 dl 1.1 try {
889 dl 1.16 newArray = new Object[newLen];
890     } catch (Throwable ex) { // try to cope with OOME
891 dl 1.24 if (oldLen > 0)
892 dl 1.25 maxCapacity = oldLen; // avoid continuous failure
893 dl 1.1 return 0;
894     }
895 dl 1.16 array = newArray;
896     int t = tail, oldMask = oldLen - 1, newMask = newLen - 1;
897     if (oldArray != null && oldMask >= 0 && newMask >= oldMask) {
898     for (int j = head; j != t; ++j) { // races with consumer
899     Object x;
900 dl 1.1 int i = j & oldMask;
901 dl 1.16 if ((x = oldArray[i]) != null &&
902     U.compareAndSwapObject(oldArray,
903 dl 1.1 (((long)i) << ASHIFT) + ABASE,
904     x, null))
905 dl 1.16 newArray[j & newMask] = x;
906 dl 1.1 }
907     }
908 dl 1.16 newArray[t & newMask] = item;
909     tail = t + 1;
910 dl 1.21 return oldLen + 1;
911 dl 1.1 }
912    
913     /**
914 dl 1.25 * Spins/helps/blocks while offer returns 0. Tries helping
915     * if either the producer and consumers are in same
916     * ForkJoinPool, or consumers are in commonPool.
917 dl 1.20 */
918     final int submit(T item) {
919 dl 1.25 Executor e;
920     int stat = 0;
921     if ((e = executor) instanceof ForkJoinPool) {
922 dl 1.20 Thread thread = Thread.currentThread();
923     if ((thread instanceof ForkJoinWorkerThread) &&
924 dl 1.25 ((ForkJoinWorkerThread)thread).getPool() == e) {
925     ForkJoinTask<?> t;
926     while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
927     (t instanceof ConsumerTask)) {
928     if ((stat = offer(item)) != 0 || !t.tryUnfork())
929 dl 1.20 break;
930 dl 1.25 ((ConsumerTask<?>)t).run();
931 dl 1.20 }
932     }
933 dl 1.25 else if (e == ForkJoinPool.commonPool()) {
934     ForkJoinTask<?> t;
935     while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
936     (t instanceof ConsumerTask)) {
937     if ((stat = offer(item)) != 0 || !t.tryUnfork())
938     break;
939     ((ConsumerTask<?>)t).run();
940 dl 1.20 }
941     }
942     }
943 dl 1.25 if (stat == 0 && (stat = offer(item)) == 0) {
944     putItem = item;
945     timeout = 0L;
946     try {
947     ForkJoinPool.managedBlock(this);
948     } catch (InterruptedException ie) {
949     timeout = INTERRUPTED;
950     }
951     stat = putStat;
952     if (timeout < 0L)
953     Thread.currentThread().interrupt();
954     }
955 dl 1.20 return stat;
956     }
957    
958     /**
959     * Timeout version of offer
960     */
961     final int offerNanos(T item, long nanos) {
962 dl 1.25 Executor e; Thread thread;
963 dl 1.20 int stat;
964 dl 1.25 if ((stat = offer(item)) == 0 && nanos > 0L &&
965     ((e = executor) instanceof ForkJoinPool) &&
966     ((((thread = Thread.currentThread()) instanceof
967     ForkJoinWorkerThread) &&
968     ((ForkJoinWorkerThread)thread).getPool() == e) ||
969     e == ForkJoinPool.commonPool())) {
970     long deadline = System.nanoTime() + nanos;
971     ForkJoinTask<?> t;
972     while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
973     (t instanceof ConsumerTask)) {
974     if ((stat = offer(item)) != 0 ||
975     (nanos = deadline - System.nanoTime()) <= 0L ||
976     !t.tryUnfork())
977     break;
978     ((ConsumerTask<?>)t).run();
979 dl 1.20 }
980 dl 1.25 }
981     if (stat == 0 && (stat = offer(item)) == 0 &&
982     (timeout = nanos) > 0L) {
983     putItem = item;
984     try {
985     ForkJoinPool.managedBlock(this);
986     } catch (InterruptedException ie) {
987     timeout = INTERRUPTED;
988     }
989     stat = putStat;
990     if (timeout < 0L)
991     Thread.currentThread().interrupt();
992 dl 1.20 }
993     return stat;
994     }
995    
996     /**
997 dl 1.16 * Nulls out most fields, mainly to avoid garbage retention
998 dl 1.20 * until publisher unsubscribes, but also to help cleanly stop
999     * upon error by nulling required components.
1000 dl 1.1 */
1001 dl 1.16 final void detach() {
1002     pendingError = null;
1003     subscriber = null;
1004     executor = null;
1005     array = null;
1006     Thread w = waiter;
1007     if (w != null) {
1008     waiter = null;
1009     LockSupport.unpark(w); // force wakeup
1010 dl 1.1 }
1011     }
1012    
1013     /**
1014 dl 1.19 * Issues error signal, asynchronously if a task is running,
1015     * else synchronously
1016     */
1017     final void onError(Throwable ex) {
1018     for (int c;;) {
1019 dl 1.20 if ((c = ctl) == DISABLED)
1020 dl 1.19 break;
1021     else if ((c & ACTIVE) != 0) {
1022     pendingError = ex;
1023     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1024     break; // cause consumer task to exit
1025     }
1026     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1027     Flow.Subscriber<? super T> s = subscriber;
1028     if (s != null && ex != null) {
1029     try {
1030     s.onError(ex);
1031     } catch (Throwable ignore) {
1032     }
1033     }
1034     detach();
1035     break;
1036     }
1037     }
1038     }
1039    
1040     /**
1041 dl 1.1 * Tries to start consumer task upon a signal or request;
1042 jsr166 1.12 * disables on failure.
1043 dl 1.1 */
1044     final void startOrDisable() {
1045     Executor e; // skip if already disabled
1046     if ((e = executor) != null) {
1047     try {
1048 dl 1.25 e.execute(new ConsumerTask<T>(this));
1049 jsr166 1.2 } catch (Throwable ex) { // back out and force signal
1050 dl 1.1 for (int c;;) {
1051 dl 1.20 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1052 dl 1.1 break;
1053     if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1054 dl 1.19 onError(ex);
1055 dl 1.1 break;
1056     }
1057     }
1058     }
1059     }
1060     }
1061    
1062 dl 1.19 final void onComplete() {
1063 dl 1.1 for (int c;;) {
1064 dl 1.20 if ((c = ctl) == DISABLED)
1065 dl 1.1 break;
1066 dl 1.16 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
1067     if ((c & ACTIVE) == 0)
1068     startOrDisable();
1069 dl 1.1 break;
1070     }
1071     }
1072     }
1073    
1074 dl 1.19 final void onSubscribe() {
1075 dl 1.1 for (int c;;) {
1076 dl 1.20 if ((c = ctl) == DISABLED)
1077 dl 1.1 break;
1078 dl 1.19 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | SUBSCRIBE))) {
1079     if ((c & ACTIVE) == 0)
1080     startOrDisable();
1081 dl 1.1 break;
1082     }
1083     }
1084     }
1085    
1086 dl 1.16 /**
1087     * Causes consumer task to exit if active (without reporting
1088     * onError unless there is already a pending error), and
1089     * disables.
1090     */
1091     public void cancel() {
1092 dl 1.1 for (int c;;) {
1093 dl 1.20 if ((c = ctl) == DISABLED)
1094 dl 1.1 break;
1095 dl 1.16 else if ((c & ACTIVE) != 0) {
1096     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1097     break;
1098     }
1099     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1100     detach();
1101 dl 1.1 break;
1102     }
1103     }
1104     }
1105    
1106 dl 1.16 /**
1107     * Adds to demand and possibly starts task.
1108     */
1109     public void request(long n) {
1110     if (n > 0L) {
1111     for (;;) {
1112     long prev = demand, d;
1113     if ((d = prev + n) < prev) // saturate
1114     d = Long.MAX_VALUE;
1115     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1116     for (int c, h;;) {
1117     if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
1118     demand == 0L)
1119     break;
1120     if ((h = head) != tail) {
1121     if (U.compareAndSwapInt(this, CTL, c,
1122     c | ACTIVE)) {
1123     startOrDisable();
1124     break;
1125     }
1126     }
1127     else if (head == h && tail == h)
1128     break;
1129     }
1130     break;
1131     }
1132     }
1133     }
1134     else if (n < 0L)
1135 dl 1.19 onError(new IllegalArgumentException(
1136     "negative subscription request"));
1137 dl 1.16 }
1138    
1139 dl 1.20 public final boolean isReleasable() { // for ManagedBlocker
1140 dl 1.1 T item = putItem;
1141 dl 1.25 if (item != null) { // A few randomized spins
1142     for (int spins = MINCAP, r = 0;;) {
1143     if ((putStat = offer(item)) != 0) {
1144     putItem = null;
1145     break;
1146     }
1147     else if (r == 0)
1148     r = ThreadLocalRandom.nextSecondarySeed();
1149     else {
1150     r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
1151     if (r >= 0 && --spins <= 0)
1152     return false;
1153     }
1154     }
1155 dl 1.1 }
1156     return true;
1157     }
1158    
1159 dl 1.20 public final boolean block() { // for ManagedBlocker
1160 dl 1.1 T item = putItem;
1161     if (item != null) {
1162     putItem = null;
1163     long nanos = timeout;
1164     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1165     while ((putStat = offer(item)) == 0) {
1166     if (Thread.interrupted()) {
1167 dl 1.16 timeout = INTERRUPTED;
1168 dl 1.1 if (nanos > 0L)
1169     break;
1170     }
1171     else if (nanos > 0L &&
1172     (nanos = deadline - System.nanoTime()) <= 0L)
1173     break;
1174     else if (waiter == null)
1175     waiter = Thread.currentThread();
1176     else {
1177     if (nanos > 0L)
1178     LockSupport.parkNanos(this, nanos);
1179     else
1180     LockSupport.park(this);
1181     waiter = null;
1182     }
1183     }
1184     }
1185     waiter = null;
1186     return true;
1187     }
1188    
1189 dl 1.25 /** Consumer loop called only from ConsumerTask */
1190     final void consume() {
1191 dl 1.1 Flow.Subscriber<? super T> s;
1192 dl 1.25 if ((s = subscriber) != null) {
1193 dl 1.1 for (;;) {
1194     long d = demand; // read volatile fields in acceptable order
1195     int c = ctl;
1196     int h = head;
1197     int t = tail;
1198     Object[] a = array;
1199     int i, n; Object x; Thread w;
1200 dl 1.20 if ((c & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1201     if ((c & ERROR) != 0) {
1202     Throwable ex = pendingError;
1203     ctl = DISABLED; // no need for CAS
1204     if (ex != null) {
1205     try {
1206     s.onError(ex);
1207     } catch (Throwable ignore) {
1208     }
1209 dl 1.1 }
1210     }
1211 dl 1.20 else if ((c & SUBSCRIBE) != 0) {
1212     if (U.compareAndSwapInt(this, CTL, c,
1213     c & ~SUBSCRIBE)) {
1214     try {
1215     s.onSubscribe(this);
1216     } catch (Throwable ex) {
1217 dl 1.21 ctl = DISABLED; // disable on throw
1218 dl 1.20 }
1219 dl 1.19 }
1220     }
1221 dl 1.20 else {
1222     detach();
1223     break;
1224     }
1225 dl 1.19 }
1226 dl 1.1 else if (h == t) { // empty
1227     if (h == tail &&
1228     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) {
1229     if (h != tail || c != (c = ctl)) { // recheck
1230     if ((c & (ACTIVE | DISABLED)) != 0 ||
1231     !U.compareAndSwapInt(this, CTL, c,
1232     c | ACTIVE))
1233     break;
1234     }
1235     else if ((c & COMPLETE) != 0) {
1236     ctl = DISABLED;
1237     try {
1238     s.onComplete();
1239 jsr166 1.2 } catch (Throwable ignore) {
1240 dl 1.1 }
1241     }
1242     else
1243     break;
1244     }
1245     }
1246 dl 1.16 else if (a == null || (n = a.length) == 0 ||
1247     (x = a[i = h & (n - 1)]) == null)
1248     ; // stale; retry
1249 dl 1.1 else if (d == 0L) { // can't take
1250     if (demand == 0L &&
1251     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) &&
1252     ((demand == 0L && c == (c = ctl)) || // recheck
1253     (c & (ACTIVE | DISABLED)) != 0 ||
1254     !U.compareAndSwapInt(this, CTL, c, c | ACTIVE)))
1255     break;
1256     }
1257 dl 1.16 else if (U.compareAndSwapObject(
1258 dl 1.1 a, (((long)i) << ASHIFT) + ABASE, x, null)) {
1259     U.putOrderedInt(this, HEAD, h + 1);
1260     while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1261     d = demand; // almost never fails
1262     if ((w = waiter) != null) {
1263     waiter = null;
1264     LockSupport.unpark(w); // release producer
1265     }
1266     try {
1267     @SuppressWarnings("unchecked") T y = (T) x;
1268     s.onNext(y);
1269 dl 1.16 } catch (Throwable ex) { // disable on throw
1270 dl 1.1 ctl = DISABLED;
1271     }
1272     }
1273     }
1274     }
1275     }
1276    
// Unsafe mechanics: field offsets and array addressing constants
// used by the CAS and ordered-write calls in this class.
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long CTL;
private static final long TAIL;
private static final long HEAD;
private static final long DEMAND;
private static final int ABASE;
private static final int ASHIFT;

static {
    try {
        final Class<?> k = BufferedSubscription.class;
        CTL = U.objectFieldOffset(k.getDeclaredField("ctl"));
        TAIL = U.objectFieldOffset(k.getDeclaredField("tail"));
        HEAD = U.objectFieldOffset(k.getDeclaredField("head"));
        DEMAND = U.objectFieldOffset(k.getDeclaredField("demand"));

        final Class<?> ak = Object[].class;
        ABASE = U.arrayBaseOffset(ak);
        final int scale = U.arrayIndexScale(ak);
        // element offsets are computed with a shift, so the scale
        // must be a power of two
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}
1306     }
1307     }