ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.14
Committed: Fri Jan 16 17:36:37 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.13: +4 -4 lines
Log Message:
whitespace

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 jsr166 1.10 import java.util.ArrayList;
10     import java.util.List;
11 dl 1.1 import java.util.concurrent.locks.LockSupport;
12     import java.util.function.BiPredicate;
13     import java.util.function.Function;
14    
15     /**
16     * A {@link Flow.Publisher} that asynchronously issues submitted items
17     * to current subscribers until it is closed. Each current subscriber
18     * receives newly submitted items in the same order unless drops or
19     * exceptions are encountered. Using a SubmissionPublisher allows
20     * item generators to act as Publishers, although without integrated
21     * flow control. Instead they rely on drop handling and/or blocking.
22     * This class may also serve as a base for subclasses that generate
23     * items, and use the methods in this class to publish them.
24     *
25     * <p>A SubmissionPublisher uses the Executor supplied in its
26     * constructor for delivery to subscribers. The best choice of
27     * Executor depends on expected usage. If the generator(s) of
28     * submitted items run in separate threads, and the number of
29     * subscribers can be estimated, consider using a {@link
30     * Executors#newFixedThreadPool}. Otherwise consider using a
31     * work-stealing pool (including {@link ForkJoinPool#commonPool}).
32     *
33     * <p>Buffering allows producers and consumers to transiently operate
34     * at different rates. Each subscriber uses an independent buffer.
35     * Buffers are created upon first use with a given initial capacity,
36     * and are resized as needed up to the maximum. (Capacity arguments
37     * may be rounded up to powers of two.) Invocations of {@code
38     * subscription.request} do not directly result in buffer expansion,
39     * but risk saturation if unfulfilled requests exceed the maximum
40     * capacity. Choices of buffer parameters rely on expected rates,
41     * resources, and usages, that usually benefit from empirical testing.
42     * As first guesses, consider initial 8 and maximum 1024.
43     *
44     * <p>Publication methods support different policies about what to do
45     * when buffers are saturated. Method {@link #submit} blocks until
46     * resources are available. This is simplest (and often appropriate
47 jsr166 1.11 * for relay stages; see {@link #newTransformProcessor newTransformProcessor}),
48     * but least responsive. The {@code offer} methods may either
49     * immediately, or with bounded timeout, drop items, but provide an
50     * opportunity to interpose a handler and then retry. While the
51     * handler is invoked, other calls to methods in this class by other
52     * threads are blocked. Unless recovery is assured, options are
53     * usually limited to logging the error and/or issuing an onError
54     * signal to the subscriber.
55 dl 1.1 *
56     * <p>If any Subscriber method throws an exception, its subscription
57     * is cancelled. If the supplied Executor throws
58     * RejectedExecutionException (or any other RuntimeException or Error)
59     * when attempting to execute a task, or a drop handler throws an
60     * exception when processing a dropped item, then the exception is
61     * rethrown. In these cases, some but not all subscribers may have
62     * received items. It is usually good practice to {@link
63 jsr166 1.11 * #closeExceptionally closeExceptionally} in these cases.
64 dl 1.1 *
65     * @param <T> the published item type
66     * @author Doug Lea
67     * @since 1.9
68     */
69     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
70     AutoCloseable {
71     /*
72     * Most mechanics are handled by BufferedSubscription. This class
73     * mainly ensures sequentiality by using built-in synchronization
74     * locks across public methods. (Using built-in locks works well
75     * in the most typical case in which only one thread submits
76     * items).
77     */
78    
79     // Ensuring that all arrays have power of two length
80    
81     static final int MAXIMUM_BUFFER_CAPACITY = 1 << 30;
82     static final int roundCapacity(int cap) { // to nearest power of 2
83     int n = cap - 1;
84     n |= n >>> 1;
85     n |= n >>> 2;
86     n |= n >>> 4;
87     n |= n >>> 8;
88     n |= n >>> 16;
89     return (n < 0) ? 1 :
90     (n >= MAXIMUM_BUFFER_CAPACITY) ? MAXIMUM_BUFFER_CAPACITY : n + 1;
91     }
92    
93     /**
94     * Clients (BufferedSubscriptions) are maintained in a linked list
95     * (via their "next" fields). This works well for publish loops.
96     * It requires O(n) traversal to check for duplicate subscribers,
97     * but we expect that subscribing is much less common than
98 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
99     * when BufferedSubscription methods or status checks return
100     * negative values signifying that they have been disabled.
101 dl 1.1 */
102     BufferedSubscription<T> clients;
103    
104     // Parameters for constructing BufferedSubscriptions
105     final Executor executor;
106     final int minBufferCapacity;
107     final int maxBufferCapacity;
108    
109     /** Run status, updated only within locks */
110     volatile boolean closed;
111    
/**
 * Creates a new SubmissionPublisher that delivers to subscribers
 * asynchronously through the given Executor, giving each
 * subscriber a buffer with the given initial and maximum sizes.
 * In the absence of other constraints, consider using {@code
 * ForkJoinPool.commonPool(), 8, 1024}.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param initialBufferCapacity the initial capacity for each
 * subscriber's buffer (stored rounded up to a power of two)
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (stored rounded up to a power of two)
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if either capacity is not
 * positive, if maxBufferCapacity exceeds {@code 1<<30} (about
 * 1 billion, the maximum bound for a power of two array size), or
 * if initialBufferCapacity exceeds maxBufferCapacity
 */
public SubmissionPublisher(Executor executor,
                           int initialBufferCapacity,
                           int maxBufferCapacity) {
    // Argument checks run in this order; the first failing one wins.
    if (executor == null)
        throw new NullPointerException();
    if (initialBufferCapacity <= 0 || maxBufferCapacity <= 0)
        throw new IllegalArgumentException("capacity must be positive");
    if (maxBufferCapacity > MAXIMUM_BUFFER_CAPACITY)
        throw new IllegalArgumentException("capacity exceeds limit");
    if (initialBufferCapacity > maxBufferCapacity)
        throw new IllegalArgumentException("initial cannot exceed max capacity");

    this.executor = executor;
    this.minBufferCapacity = roundCapacity(initialBufferCapacity);
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
}
150    
151     /**
152     * Adds the given Subscriber unless already subscribed. If
153     * already subscribed, the Subscriber's onError method is invoked
154     * with an IllegalStateException. Otherwise, upon success, the
155     * Subscriber's onSubscribe method is invoked with a new
156     * Subscription (upon exception, the exception is rethrown and the
157     * Subscriber remains unsubscribed). If this SubmissionPublisher
158     * is closed, the subscriber's onComplete method is then invoked.
159 dl 1.13 * Subscribers may enable receiving items by invoking the {@code
160     * request} method of the new Subscription, and may unsubscribe by
161 dl 1.1 * invoking its cancel method.
162     *
163     * @param subscriber the subscriber
164     * @throws NullPointerException if subscriber is null
165     */
166     public void subscribe(Flow.Subscriber<? super T> subscriber) {
167     if (subscriber == null) throw new NullPointerException();
168     BufferedSubscription<T> sub = new BufferedSubscription<T>(
169     subscriber, executor, minBufferCapacity, maxBufferCapacity);
170     boolean present = false, clsd;
171 jsr166 1.2 synchronized (this) {
172 dl 1.1 clsd = closed;
173     BufferedSubscription<T> pred = null, next;
174     for (BufferedSubscription<T> b = clients; b != null; b = next) {
175     next = b.next;
176     if (b.ctl < 0) { // disabled; remove
177     if (pred == null)
178     clients = next;
179     else
180     pred.next = next;
181 jsr166 1.2 }
182 dl 1.1 else if (subscriber == b.subscriber) {
183     present = true;
184     break;
185     }
186     pred = b;
187     }
188     if (!present) {
189     subscriber.onSubscribe(sub); // don't link on exception
190     if (!clsd) {
191     if (pred == null)
192     clients = sub;
193     else
194     pred.next = sub;
195     }
196     }
197     }
198     if (clsd)
199     subscriber.onComplete();
200     else if (present)
201     subscriber.onError(new IllegalStateException("Already subscribed"));
202     }
203    
204     /**
205     * Publishes the given item, if possible, to each current
206     * subscriber by asynchronously invoking its onNext method. The
207     * item may be dropped by one or more subscribers if resource
208     * limits are exceeded, in which case the given handler (if
209     * non-null) is invoked, and if it returns true, retried once.
210     *
211     * <p>If the Executor for this publisher throws a
212     * RejectedExecutionException (or any other RuntimeException or
213     * Error) when attempting to asynchronously notify subscribers, or
214     * the drop handler throws an exception when processing a dropped
215     * item, then this exception is rethrown.
216     *
217     * @param item the (non-null) item to publish
218     * @param onDrop if non-null, the handler invoked upon a drop to a
219     * subscriber, with arguments of the subscriber and item; if it
220     * returns true, an offer is re-attempted (once)
221 jsr166 1.4 * @return the number of drops (failed attempts to issue the item
222     * to a subscriber)
223 dl 1.1 * @throws IllegalStateException if closed
224     * @throws NullPointerException if item is null
225     * @throws RejectedExecutionException if thrown by Executor
226     */
227     public int offer(T item,
228     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
229     if (item == null) throw new NullPointerException();
230     int drops = 0;
231 jsr166 1.2 synchronized (this) {
232 dl 1.1 if (closed)
233     throw new IllegalStateException("Closed");
234     BufferedSubscription<T> pred = null, next;
235     for (BufferedSubscription<T> b = clients; b != null; b = next) {
236     int stat;
237     next = b.next;
238     if ((stat = b.offer(item)) == 0 &&
239     onDrop != null &&
240     onDrop.test(b.subscriber, item))
241     stat = b.offer(item);
242     if (stat < 0) {
243     if (pred == null)
244     clients = next;
245     else
246     pred.next = next;
247     }
248     else {
249     pred = b;
250     if (stat == 0)
251     ++drops;
252     }
253     }
254     return drops;
255     }
256     }
257    
258     /**
259     * Publishes the given item, if possible, to each current
260     * subscriber by asynchronously invoking its onNext method,
261     * blocking while resources for any subscription are unavailable,
262     * up to the specified timeout or the caller thread is
263 jsr166 1.4 * interrupted, at which point the given handler (if non-null)
264 dl 1.1 * is invoked, and if it returns true, retried once.
265     *
266     * <p>If the Executor for this publisher throws a
267     * RejectedExecutionException (or any other RuntimeException or
268     * Error) when attempting to asynchronously notify subscribers, or
269     * the drop handler throws an exception when processing a dropped
270     * item, then this exception is rethrown.
271     *
272     * @param item the (non-null) item to publish
273     * @param timeout how long to wait for resources for any subscriber
274     * before giving up, in units of {@code unit}
275     * @param unit a {@code TimeUnit} determining how to interpret the
276     * {@code timeout} parameter
277     * @param onDrop if non-null, the handler invoked upon a drop to a
278     * subscriber, with arguments of the subscriber and item; if it
279     * returns true, an offer is re-attempted (once)
280 jsr166 1.4 * @return the number of drops (failed attempts to issue the item
281     * to a subscriber)
282 dl 1.1 * @throws IllegalStateException if closed
283     * @throws NullPointerException if item is null
284     * @throws RejectedExecutionException if thrown by Executor
285     */
286     public int offer(T item, long timeout, TimeUnit unit,
287     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
288     if (item == null) throw new NullPointerException();
289     long nanos = unit.toNanos(timeout);
290     int drops = 0;
291 jsr166 1.2 synchronized (this) {
292 dl 1.1 if (closed)
293     throw new IllegalStateException("Closed");
294     BufferedSubscription<T> pred = null, next;
295     for (BufferedSubscription<T> b = clients; b != null; b = next) {
296     int stat;
297     next = b.next;
298     if ((stat = b.offer(item)) == 0 &&
299     (stat = b.timedOffer(item, nanos)) == 0 &&
300     onDrop != null &&
301     onDrop.test(b.subscriber, item))
302     stat = b.offer(item);
303     if (stat < 0) {
304     if (pred == null)
305     clients = next;
306     else
307     pred.next = next;
308     }
309     else {
310     pred = b;
311     if (stat == 0)
312     ++drops;
313     }
314     }
315     }
316     return drops;
317     }
318    
319     /**
320     * Publishes the given item to each current subscriber by
321     * asynchronously invoking its onNext method, blocking
322     * uninterruptibly while resources for any subscriber are
323     * unavailable.
324     *
325     * <p>If the Executor for this publisher throws a
326     * RejectedExecutionException (or any other RuntimeException or
327     * Error) when attempting to asynchronously notify subscribers,
328     * then this exception is rethrown.
329     *
330     * @param item the (non-null) item to publish
331     * @throws IllegalStateException if closed
332     * @throws NullPointerException if item is null
333     * @throws RejectedExecutionException if thrown by Executor
334     */
335     public void submit(T item) {
336     if (item == null) throw new NullPointerException();
337 jsr166 1.2 synchronized (this) {
338 dl 1.1 if (closed)
339     throw new IllegalStateException("Closed");
340     BufferedSubscription<T> pred = null, next;
341     for (BufferedSubscription<T> b = clients; b != null; b = next) {
342     int stat;
343     next = b.next;
344     if ((stat = b.offer(item)) == 0)
345     stat = b.submit(item);
346     if (stat < 0) {
347     if (pred == null)
348     clients = next;
349     else
350     pred.next = next;
351     }
352     else
353     pred = b;
354     }
355     }
356     }
357    
358     /**
359     * Unless already closed, issues onComplete signals to current
360     * subscribers, and disallows subsequent attempts to publish.
361     */
362     public void close() {
363     if (!closed) {
364     BufferedSubscription<T> b, next;
365 jsr166 1.2 synchronized (this) {
366 dl 1.1 b = clients;
367     clients = null;
368     closed = true;
369     }
370     while (b != null) {
371     next = b.next;
372     b.close();
373     b = next;
374     }
375     }
376     }
377    
378     /**
379     * Unless already closed, issues onError signals to current
380     * subscribers with the given error, and disallows subsequent
381     * attempts to publish.
382     *
383 jsr166 1.5 * @param error the onError argument sent to subscribers
384 dl 1.1 * @throws NullPointerException if error is null
385     */
386     public void closeExceptionally(Throwable error) {
387     if (error == null)
388     throw new NullPointerException();
389     if (!closed) {
390     BufferedSubscription<T> b, next;
391 jsr166 1.2 synchronized (this) {
392 dl 1.1 b = clients;
393     clients = null;
394     closed = true;
395     }
396     while (b != null) {
397     next = b.next;
398     b.closeExceptionally(error);
399     b = next;
400     }
401     }
402     }
403    
404     /**
405     * Returns true if this publisher is not accepting submissions.
406     *
407     * @return true if closed
408     */
409     public boolean isClosed() {
410     return closed;
411     }
412    
413     /**
414 jsr166 1.6 * Returns true if this publisher has any subscribers.
415 dl 1.1 *
416     * @return true if this publisher has any subscribers
417     */
418     public boolean hasSubscribers() {
419     boolean nonEmpty = false;
420     if (!closed) {
421 jsr166 1.2 synchronized (this) {
422 dl 1.1 BufferedSubscription<T> pred = null, next;
423     for (BufferedSubscription<T> b = clients; b != null; b = next) {
424     next = b.next;
425     if (b.ctl < 0) {
426     if (pred == null)
427     clients = next;
428     else
429     pred.next = next;
430 jsr166 1.2 }
431 dl 1.1 else {
432     nonEmpty = true;
433     break;
434     }
435     }
436     }
437     }
438     return nonEmpty;
439     }
440    
441     /**
442     * Returns the Executor used for asynchronous delivery.
443     *
444     * @return the Executor used for asynchronous delivery
445     */
446     public Executor getExecutor() {
447     return executor;
448     }
449    
450 jsr166 1.7 /**
451 jsr166 1.5 * Returns the initial per-subscriber buffer capacity.
452 dl 1.1 *
453 jsr166 1.5 * @return the initial per-subscriber buffer capacity
454 dl 1.1 */
455     public int getInitialBufferCapacity() {
456     return minBufferCapacity;
457     }
458    
459 jsr166 1.7 /**
460 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
461 dl 1.1 *
462 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
463 dl 1.1 */
464     public int getMaxBufferCapacity() {
465     return maxBufferCapacity;
466     }
467    
468     /**
469     * Returns a list of current subscribers.
470     *
471     * @return list of current subscribers
472     */
473     public List<Flow.Subscriber<? super T>> getSubscribers() {
474     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
475 jsr166 1.2 synchronized (this) {
476 dl 1.1 BufferedSubscription<T> pred = null, next;
477     for (BufferedSubscription<T> b = clients; b != null; b = next) {
478     next = b.next;
479     if (b.ctl < 0) {
480     if (pred == null)
481     clients = next;
482     else
483     pred.next = next;
484 jsr166 1.2 }
485 dl 1.1 else
486     subs.add(b.subscriber);
487     }
488     }
489     return subs;
490     }
491 jsr166 1.2
492 dl 1.1 /**
493     * Returns true if the given Subscriber is currently subscribed.
494     *
495     * @param subscriber the subscriber
496     * @return true if currently subscribed
497     */
498     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
499 jsr166 1.2 if (!closed) {
500     synchronized (this) {
501 dl 1.1 BufferedSubscription<T> pred = null, next;
502     for (BufferedSubscription<T> b = clients; b != null; b = next) {
503     next = b.next;
504     if (b.ctl < 0) {
505     if (pred == null)
506     clients = next;
507     else
508     pred.next = next;
509 jsr166 1.2 }
510 dl 1.1 else if (subscriber == b.subscriber)
511     return true;
512     }
513     }
514     }
515     return false;
516     }
517    
518     /**
519     * If the given subscription is managed by a SubmissionPublisher,
520     * returns an estimate of the number of items produced but not yet
521     * consumed; otherwise returns zero. This method is designed only
522     * for monitoring purposes, not for control.
523     *
524     * @param subscription the subscription
525     * @return the estimate, or zero if the subscription is of an
526 jsr166 1.8 * unknown type
527 dl 1.1 */
528     public static int estimateAvailable(Flow.Subscription subscription) {
529     if (subscription instanceof BufferedSubscription)
530     return ((BufferedSubscription)subscription).estimateAvailable();
531     else
532     return 0;
533     }
534    
535     /**
536     * Returns a new {@link Flow.Processor} that uses this
537     * SubmissionPublisher to relay transformed items from its source
538     * using method {@link submit}, and using the given request size
539     * for requests to its source subscription. (Typically,
540     * requestSizes should range from the initial and maximum buffer
541     * capacity of this SubmissionPublisher.)
542     *
543 jsr166 1.3 * @param <S> the subscribed item type
544 dl 1.1 * @param requestSize the request size for subscriptions
545     * with the source
546     * @param transform the transform function
547     * @return the new Processor
548     * @throws NullPointerException if transform is null
549     * @throws IllegalArgumentException if requestSize not positive
550     */
551     public <S> Flow.Processor<S,T> newTransformProcessor(
552     long requestSize,
553     Function<? super S, ? extends T> transform) {
554     if (requestSize <= 0L)
555     throw new IllegalArgumentException("requestSize must be positive");
556     if (transform == null)
557     throw new NullPointerException();
558     return new TransformProcessor<S,T>(requestSize, transform, this);
559     }
560    
561     /**
562     * A task for consuming buffer items and signals, created and
563     * executed whenever they become available. A task consumes as
564     * many items/signals as possible before terminating, at which
565     * point another task is created when needed. The dual Runnable
566     * and ForkJoinTask declaration saves overhead when executed by
567     * ForkJoinPools, without impacting other kinds of Executors.
568     */
569     @SuppressWarnings("serial")
570     static final class ConsumerTask<T> extends ForkJoinTask<Void>
571     implements Runnable {
572     final BufferedSubscription<T> s;
573     ConsumerTask(BufferedSubscription<T> s) { this.s = s; }
574     public final Void getRawResult() { return null; }
575     public final void setRawResult(Void v) {}
576 dl 1.13 public final boolean exec() { s.consume(); return false; }
577 dl 1.1 public final void run() { s.consume(); }
578     }
579    
580     /**
581     * A bounded (ring) buffer with integrated control to start
582     * consumer tasks whenever items are available. The buffer
583     * algorithm is similar to one used inside ForkJoinPool,
584     * specialized for the case of at most one concurrent producer and
585     * consumer, and power of two buffer sizes. This allows methods to
586     * operate without locks even while supporting resizing, blocking,
587     * task-triggering, and garbage-free buffers (nulling out elements
588     * when consumed), although supporting these does impose a bit of
589     * overhead compared to plain fixed-size ring buffers.
590     *
591     * The publisher guarantees a single producer via its lock. We
592     * ensure in this class that there is at most one consumer. The
593     * request and cancel methods must be fully thread-safe but are
594     * coded to exploit the most common case in which they are only
595     * called by consumers (usually within onNext).
596     *
597     * Control over creating ConsumerTasks is managed using the ACTIVE
598     * ctl bit. We ensure that a task is active when consumable items
599     * (and usually, ERROR or COMPLETE signals) are present and there
600     * is demand (unfulfilled requests). This is complicated on the
601     * creation side by the possibility of exceptions when trying to
602     * execute tasks. These eventually force DISABLED state, but
603     * sometimes not directly. On the task side, termination (clearing
604     * ACTIVE) may race with producers or request() calls, so in some
605     * cases requires a re-check, re-activating if possible.
606     *
607     * The ctl field also manages run state. When DISABLED, no further
608     * updates are possible (to simplify checks, DISABLED is defined
609     * as a negative value). Disabling may be preceded by setting
610     * ERROR or COMPLETE (or both -- ERROR has precedence), in which
611     * case the associated Subscriber methods are invoked, possibly
612     * synchronously if there is no active consumer task (including
613     * cases where execute() failed).
614     *
615     * Support for blocking also exploits the fact that there is only
616     * one possible waiter. ManagedBlocker-compatible control fields
617     * are placed in this class itself rather than in wait-nodes.
618     * Blocking control relies on the "waiter" field. Producers set
619     * the field before trying to block, but must then recheck (via
620     * offer) before parking. Signalling then just unparks and clears
621     * waiter field.
622     *
623     * This class uses @Contended and heuristic field declaration
624     * ordering to reduce memory contention on BufferedSubscription
625     * itself, but it does not currently attempt to avoid memory
626     * contention (especially including card-marks) among buffer
627     * elements, that can significantly slow down some usages.
628     * Addressing this may require allocating substantially more space
629     * than users expect.
630     */
631     @sun.misc.Contended
632     static final class BufferedSubscription<T> implements Flow.Subscription,
633     ForkJoinPool.ManagedBlocker {
634     // Order-sensitive field declarations
635     long timeout; // > 0 if timed wait
636     volatile long demand; // # unfilled requests
637     final int minCapacity; // initial buffer size
638     int maxCapacity; // reduced on OOME
639     int putStat; // offer result for ManagedBlocker
640     volatile int ctl; // atomic run state flags
641     volatile int head; // next position to take
642     volatile int tail; // next position to put
643     boolean wasInterrupted; // true if interrupted while waiting
644     volatile Object[] array; // buffer: null if disabled
645     Flow.Subscriber<? super T> subscriber; // null if disabled
646     Executor executor; // null if disabled
647     volatile Throwable pendingError; // holds until onError issued
648     volatile Thread waiter; // blocked producer thread
649     T putItem; // for offer within ManagedBlocker
650     BufferedSubscription<T> next; // used only by publisher
651    
652     // ctl values
653     static final int ACTIVE = 0x01; // consumer task active
654     static final int ERROR = 0x02; // signal pending error
655     static final int COMPLETE = 0x04; // signal completion when done
656     static final int DISABLED = 0x80000000; // must be negative
657    
/**
 * Records the subscriber, the executor and the buffer bounds.
 * No buffer array is allocated here.
 */
BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                     Executor executor, int minBufferCapacity,
                     int maxBufferCapacity) {
    this.minCapacity = minBufferCapacity;
    this.maxCapacity = maxBufferCapacity;
    this.executor = executor;
    this.subscriber = subscriber;
}
666    
667     /**
668     * @return -1 if disabled, 0 if dropped, else 1
669     */
670     final int offer(T item) {
671     Object[] a = array;
672     int t = tail, h = head, mask;
673     if (a == null || (mask = a.length - 1) < 0 || t - h >= mask)
674     return growAndOffer(item);
675     else {
676     a[t & mask] = item;
677     U.putOrderedInt(this, TAIL, t + 1);
678     return (ctl & ACTIVE) != 0 ? 1 : startOnOffer();
679     }
680     }
681    
682     /**
683 jsr166 1.9 * Creates or expands buffer if possible, then offers.
684 dl 1.1 */
685     final int growAndOffer(T item) {
        // Slow path of offer: installs the first buffer (of length
        // minCapacity) or one of twice the current length, moves the
        // elements between head and tail across, then stores item and
        // advances tail.
        // Returns -1 when there is no buffer and ctl is negative
        // (disabled), 0 when no buffer could be installed (the item is
        // dropped), otherwise 1 or whatever startOnOffer() returns.
686     int oldLen, len;
687     Object[] oldA = array, a = null;
688     if (oldA != null)
689     len = (oldLen = oldA.length) << 1;
690     else if (ctl < 0)
691     return -1; // disabled
692     else {
693     oldLen = 0;
694     len = minCapacity;
695     }
        // Refuse to grow when the current length has reached maxCapacity
        // or the new length is not positive (e.g. the doubling overflowed).
696     if (oldLen >= maxCapacity || len <= 0)
697     return 0;
698     try {
699     a = new Object[len];
700     } catch (Throwable ex) { // try to cope with OOME
        // Cap future growth at the current length so later calls return
        // 0 without attempting another allocation.
701     if (oldLen != 0) // avoid continuous failure
702     maxCapacity = oldLen;
703     return 0;
704     }
        // The new buffer is stored into the volatile array field before
        // any old element is moved into it.
705     array = a;
706     int oldMask = oldLen - 1, mask = len - 1, oldTail = tail, j = head;
707     if (oldA != null && oldMask >= 0 && mask > oldMask) {
        // An element is copied only if this thread wins the CAS that
        // nulls its old slot; null slots and lost CASes are skipped.
708     for (Object x; j != oldTail; ++j) { // races with consumer
709     int i = j & oldMask;
710     if ((x = oldA[i]) != null &&
711     U.compareAndSwapObject(oldA,
712     (((long)i) << ASHIFT) + ABASE,
713     x, null))
714     a[j & mask] = x;
715     }
716     }
        // j is oldTail if the transfer loop ran, otherwise the head value
        // read above; the new item goes there and tail becomes j + 1.
717     a[j & mask] = item;
718     U.putOrderedInt(this, TAIL, j + 1);
        // Start a consumer task unless one is already marked ACTIVE.
719 dl 1.13 return (ctl & ACTIVE) != 0 ? 1 : startOnOffer();
720 dl 1.1 }
721    
722     /**
723 jsr166 1.5 * Tries to start consumer task if items are available.
724     * Backs out and relays exception if executor fails.
725 dl 1.1 */
726     final int startOnOffer() {
        // Called after an item has been stored. Returns -1 if ctl is found
        // negative (disabled), otherwise 1. An exception thrown by
        // executor.execute is rethrown after the ACTIVE bit is cleared.
        // With zero demand nothing is started and 1 is returned.
727 dl 1.13 if (demand != 0L) {
728     ConsumerTask<T> task = new ConsumerTask<T>(this);
729     for (int c;;) {
        // A task is already marked active: nothing to start.
730     if (((c = ctl) & ACTIVE) != 0)
731 dl 1.1 break;
732 dl 1.13 else if (c < 0)
733     return -1;
        // Whoever wins the CAS that sets ACTIVE submits the task.
734     else if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) {
735     try {
736     executor.execute(task);
737     break;
738     } catch (RuntimeException | Error ex) { // back out
        // Retry clearing ACTIVE until the CAS succeeds, or ctl is seen
        // negative, or ACTIVE is already clear.
739 jsr166 1.14 do {} while ((c = ctl) >= 0 &&
740     (c & ACTIVE) != 0 &&
741     !U.compareAndSwapInt(this, CTL, c,
742     c & ~ACTIVE));
743 dl 1.13 throw ex;
744 dl 1.1 }
745     }
        // The CAS lost a race: stop if demand is now zero or the buffer
        // reads as empty, otherwise loop and re-examine ctl.
746 dl 1.13 else if (demand == 0L || tail == head)
747     break;
748 dl 1.1 }
749     }
750     return 1;
751     }
752    
753     /**
754     * Tries to start consumer task upon a signal or request;
755 jsr166 1.12 * disables on failure.
756 dl 1.1 */
757     final void startOrDisable() {
758     Executor e; // skip if already disabled
759     if ((e = executor) != null) {
760     try {
761     e.execute(new ConsumerTask<T>(this));
762 jsr166 1.2 } catch (Throwable ex) { // back out and force signal
763 dl 1.1 for (int c;;) {
764     if ((c = ctl) < 0 || (c & ACTIVE) == 0)
765     break;
766     if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
767     closeExceptionally(ex);
768     break;
769     }
770     }
771     }
772     }
773     }
774    
775     /**
776     * Nulls out most fields, mainly to avoid garbage retention
777     * until publisher unsubscribes.
778     */
779     final void detach() {
780     pendingError = null;
781     subscriber = null;
782     executor = null;
783     array = null;
784     Thread w = waiter;
785     if (w != null) {
786     waiter = null;
787     LockSupport.unpark(w); // force wakeup
788     }
789     }
790    
791     public void request(long n) {
        // Adds n to the outstanding demand and, if items are buffered and
        // no consumer task is marked active, starts one. A negative n
        // closes the subscription with an IllegalArgumentException.
        // NOTE(review): n == 0 matches neither branch and is silently
        // ignored -- confirm this is intended against the
        // Flow.Subscription.request contract.
792     if (n > 0L) {
        // Outer loop: retry until the CAS on demand succeeds.
793     for (;;) {
794     long prev = demand, d;
795     if ((d = prev + n) < prev) // saturate
796     d = Long.MAX_VALUE;
797     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
        // Inner loop: decide whether a consumer task has to be started.
798     for (int c, h;;) {
        // Nothing to do if a task is active, the subscription is
        // disabled, or demand has meanwhile dropped back to zero.
799     if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
800     demand == 0L)
801     break;
        // Items are buffered: whoever sets ACTIVE starts the task; a lost
        // CAS loops and re-reads ctl.
802     if ((h = head) != tail) {
803     if (U.compareAndSwapInt(this, CTL, c,
804     c | ACTIVE)) {
805     startOrDisable();
806     break;
807     }
808     }
        // The buffer read as empty: stop only if head and tail still both
        // equal h on a second read; otherwise loop and look again.
809     else if (head == h && tail == h)
810     break;
811     }
812     break;
813     }
814     }
815     }
816     else if (n < 0L)
817     closeExceptionally(new IllegalArgumentException(
818     "negative subscription request"));
819     }
820    
821     public void cancel() {
822     for (int c;;) {
823     if ((c = ctl) < 0)
824     break;
825     else if ((c & ACTIVE) != 0) {
826     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
827     break; // cause consumer task to exit
828     }
829     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
830     detach();
831     break;
832     }
833     }
834     }
835    
836     final void closeExceptionally(Throwable ex) {
837     pendingError = ex;
838     for (int c;;) {
839     if ((c = ctl) < 0)
840     break;
841     else if ((c & ACTIVE) != 0) {
842     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
843     break; // cause consumer task to exit
844     }
845     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
846     Flow.Subscriber<? super T> s = subscriber;
847     if (s != null) {
848     try {
849     s.onError(ex);
850 jsr166 1.2 } catch (Throwable ignore) {
851 dl 1.1 }
852     }
853     detach();
854     break;
855     }
856     }
857     }
858    
859     final void close() {
860     for (int c;;) {
861     if ((c = ctl) < 0)
862     break;
863     if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
864     if ((c & ACTIVE) == 0)
865     startOrDisable();
866     break;
867     }
868     }
869     }
870    
871     final int estimateAvailable() {
872     int n;
873     return (ctl >= 0 && (n = tail - head) > 0) ? n : 0;
874     }
875    
876     // ManagedBlocker support
877    
878     public final boolean isReleasable() {
879     T item = putItem;
880     if (item != null) {
881     if ((putStat = offer(item)) == 0)
882     return false;
883     putItem = null;
884     }
885     return true;
886     }
887    
888     public final boolean block() {
889     T item = putItem;
890     if (item != null) {
891     putItem = null;
892     long nanos = timeout;
893     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
894     while ((putStat = offer(item)) == 0) {
895     if (Thread.interrupted()) {
896     wasInterrupted = true;
897     if (nanos > 0L)
898     break;
899     }
900     else if (nanos > 0L &&
901     (nanos = deadline - System.nanoTime()) <= 0L)
902     break;
903     else if (waiter == null)
904     waiter = Thread.currentThread();
905     else {
906     if (nanos > 0L)
907     LockSupport.parkNanos(this, nanos);
908     else
909     LockSupport.park(this);
910     waiter = null;
911     }
912     }
913     }
914     waiter = null;
915     return true;
916     }
917    
918     final int submit(T item) {
919     putItem = item;
920     timeout = 0L;
921     wasInterrupted = false;
922     try {
923     ForkJoinPool.managedBlock(this);
924     } catch (InterruptedException cantHappen) {
925     }
926     if (wasInterrupted)
927     Thread.currentThread().interrupt();
928     return putStat;
929     }
930    
931     final int timedOffer(T item, long nanos) {
932     if (nanos <= 0L)
933     return 0;
934     putItem = item;
935     timeout = nanos;
936     wasInterrupted = false;
937     try {
938     ForkJoinPool.managedBlock(this);
939     } catch (InterruptedException cantHappen) {
940     }
941     if (wasInterrupted)
942     Thread.currentThread().interrupt();
943     return putStat;
944     }
945    
// consume(): delivery loop for this subscription. Each pass snapshots
// demand, ctl, head, tail and array, then takes at most one of the
// branches below: stop when ctl is negative, report a pending error,
// go idle when the buffer is empty or there is no demand, or hand one
// buffered item to subscriber.onNext. Does nothing when subscriber is
// already null.
946     /** Consume loop called only from ConsumerTask */
947     final void consume() {
948     Flow.Subscriber<? super T> s;
949     if ((s = subscriber) != null) { // else disabled
950     for (;;) {
951     long d = demand; // read volatile fields in acceptable order
952     int c = ctl;
953     int h = head;
954     int t = tail;
955     Object[] a = array;
956     int i, n; Object x; Thread w;
        // ctl negative: release references via detach() and stop.
957     if (c < 0) {
958     detach();
959     break;
960     }
        // ERROR bit set: overwrite ctl with DISABLED, then deliver
        // pendingError (if any) through onError, ignoring anything
        // onError throws. The loop is not exited here.
        // NOTE(review): presumably DISABLED is negative, so the next
        // pass leaves through the c < 0 branch above -- confirm against
        // the constant's definition.
961     else if ((c & ERROR) != 0) {
962     Throwable ex = pendingError;
963     ctl = DISABLED; // no need for CAS
964     if (ex != null) {
965     try {
966     s.onError(ex);
967 jsr166 1.2 } catch (Throwable ignore) {
968 dl 1.1 }
969     }
970     }
        // Buffer looked empty: clear ACTIVE, then recheck tail and ctl.
        // - If either changed, try to set ACTIVE again and keep looping;
        //   exit if ACTIVE/DISABLED is already set or that CAS fails.
        // - If nothing changed and COMPLETE is set, write DISABLED and
        //   signal onComplete (anything it throws is ignored).
        // - Otherwise exit with ACTIVE cleared.
        // Note that "c &= ~ACTIVE" is evaluated after the expected-value
        // argument, so the CAS compares against the old c and afterwards
        // c holds the value just written.
971     else if (h == t) { // empty
972     if (h == tail &&
973     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) {
974     if (h != tail || c != (c = ctl)) { // recheck
975     if ((c & (ACTIVE | DISABLED)) != 0 ||
976     !U.compareAndSwapInt(this, CTL, c,
977     c | ACTIVE))
978     break;
979     }
980     else if ((c & COMPLETE) != 0) {
981     ctl = DISABLED;
982     try {
983     s.onComplete();
984 jsr166 1.2 } catch (Throwable ignore) {
985 dl 1.1 }
986     }
987     else
988     break;
989     }
990     }
        // No demand: same idle protocol keyed on demand. After clearing
        // ACTIVE, exit if demand is still zero and ctl is unchanged, or
        // if ACTIVE/DISABLED is set, or if setting ACTIVE again fails;
        // otherwise keep looping.
991     else if (d == 0L) { // can't take
992     if (demand == 0L &&
993     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) &&
994     ((demand == 0L && c == (c = ctl)) || // recheck
995     (c & (ACTIVE | DISABLED)) != 0 ||
996     !U.compareAndSwapInt(this, CTL, c, c | ACTIVE)))
997     break;
998     }
        // Take one item: the slot index is h masked by (length - 1);
        // the slot is claimed by CAS-ing it from x to null. If the slot
        // is null or the CAS fails, no branch runs and the loop retries
        // with fresh snapshots.
999     else if (a != null &&
1000     (n = a.length) > 0 &&
1001     (x = a[i = h & (n - 1)]) != null &&
1002     U.compareAndSwapObject(
1003     a, (((long)i) << ASHIFT) + ABASE, x, null)) {
        // Advance head with an ordered write, then decrement demand,
        // re-reading it whenever the CAS fails.
1004     U.putOrderedInt(this, HEAD, h + 1);
1005     while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1006     d = demand; // almost never fails
        // A slot was freed: clear and unpark any thread recorded in waiter.
1007     if ((w = waiter) != null) {
1008     waiter = null;
1009     LockSupport.unpark(w); // release producer
1010     }
        // Deliver the item. A Throwable from onNext is not reported
        // anywhere; it only causes ctl to be set to DISABLED.
1011     try {
1012     @SuppressWarnings("unchecked") T y = (T) x;
1013     s.onNext(y);
1014 jsr166 1.2 } catch (Throwable ex) {
1015 dl 1.1 ctl = DISABLED;
1016     }
1017     }
1018     }
1019     }
1020     }
1021    
// Unsafe mechanics: field offsets of ctl/tail/head/demand plus the base
// offset and index shift used to address Object[] slots directly.
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long CTL;
private static final long TAIL;
private static final long HEAD;
private static final long DEMAND;
private static final int ABASE;
private static final int ASHIFT;

static {
    try {
        final Class<?> self = BufferedSubscription.class;
        CTL = U.objectFieldOffset(self.getDeclaredField("ctl"));
        TAIL = U.objectFieldOffset(self.getDeclaredField("tail"));
        HEAD = U.objectFieldOffset(self.getDeclaredField("head"));
        DEMAND = U.objectFieldOffset(self.getDeclaredField("demand"));

        final Class<?> slots = Object[].class;
        ABASE = U.arrayBaseOffset(slots);
        final int stride = U.arrayIndexScale(slots);
        // The shift-based slot addressing only works for power-of-two strides.
        if ((stride & (stride - 1)) != 0) {
            throw new Error("data type scale not a power of two");
        }
        ASHIFT = 31 - Integer.numberOfLeadingZeros(stride);
    } catch (ReflectiveOperationException reflectionFailure) {
        throw new Error(reflectionFailure);
    }
}
1051     }
1052    
1053     static final class TransformProcessor<U,R> implements Flow.Processor<U,R> {
1054     final Function<? super U, ? extends R> fn;
1055     final SubmissionPublisher<R> sink;
1056     Flow.Subscription subscription;
1057     final long requestSize;
1058     long count;
1059     TransformProcessor(long requestSize,
1060     Function<? super U, ? extends R> fn,
1061     SubmissionPublisher<R> sink) {
1062     this.fn = fn;
1063     this.sink = sink;
1064     this.requestSize = requestSize;
1065     this.count = requestSize >>> 1;
1066     }
1067     public void subscribe(Flow.Subscriber<? super R> subscriber) {
1068     sink.subscribe(subscriber);
1069     }
1070     public void onSubscribe(Flow.Subscription subscription) {
1071     (this.subscription = subscription).request(requestSize);
1072     }
1073     public void onNext(U item) {
1074     Flow.Subscription s = subscription;
1075     if (s == null)
1076     sink.closeExceptionally(
1077     new IllegalStateException("onNext without subscription"));
1078     else {
1079     try {
1080     if (--count <= 0)
1081     s.request(count = requestSize);
1082     sink.submit(fn.apply(item));
1083     } catch (Throwable ex) {
1084     try {
1085     s.cancel();
1086     } finally {
1087     sink.closeExceptionally(ex);
1088     }
1089     }
1090     }
1091     }
1092     public void onError(Throwable ex) {
1093     sink.closeExceptionally(ex);
1094     }
1095     public void onComplete() {
1096     sink.close();
1097     }
1098     }
1099     }