ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.11
Committed: Thu Jan 15 19:29:22 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.10: +9 -8 lines
Log Message:
generate more readable html for @links

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 jsr166 1.10 import java.util.ArrayList;
10     import java.util.List;
11 dl 1.1 import java.util.concurrent.locks.LockSupport;
12     import java.util.function.BiPredicate;
13     import java.util.function.Function;
14    
15     /**
16     * A {@link Flow.Publisher} that asynchronously issues submitted items
17     * to current subscribers until it is closed. Each current subscriber
18     * receives newly submitted items in the same order unless drops or
19     * exceptions are encountered. Using a SubmissionPublisher allows
20     * item generators to act as Publishers, although without integrated
21     * flow control. Instead they rely on drop handling and/or blocking.
22     * This class may also serve as a base for subclasses that generate
23     * items, and use the methods in this class to publish them.
24     *
25     * <p>A SubmissionPublisher uses the Executor supplied in its
26     * constructor for delivery to subscribers. The best choice of
27     * Executor depends on expected usage. If the generator(s) of
28     * submitted items run in separate threads, and the number of
29     * subscribers can be estimated, consider using a {@link
30     * Executors#newFixedThreadPool}. Otherwise consider using a
31     * work-stealing pool (including {@link ForkJoinPool#commonPool}).
32     *
33     * <p>Buffering allows producers and consumers to transiently operate
34     * at different rates. Each subscriber uses an independent buffer.
35     * Buffers are created upon first use with a given initial capacity,
36     * and are resized as needed up to the maximum. (Capacity arguments
37     * may be rounded up to powers of two.) Invocations of {@code
38     * subscription.request} do not directly result in buffer expansion,
39     * but risk saturation if unfulfilled requests exceed the maximum
40     * capacity. Choices of buffer parameters rely on expected rates,
41     * resources, and usages, that usually benefit from empirical testing.
42     * As first guesses, consider initial 8 and maximum 1024.
43     *
44     * <p>Publication methods support different policies about what to do
45     * when buffers are saturated. Method {@link #submit} blocks until
46     * resources are available. This is simplest (and often appropriate
47 jsr166 1.11 * for relay stages; see {@link #newTransformProcessor newTransformProcessor}),
48     * but least responsive. The {@code offer} methods may either
49     * immediately, or with bounded timeout, drop items, but provide an
50     * opportunity to interpose a handler and then retry. While the
51     * handler is invoked, other calls to methods in this class by other
52     * threads are blocked. Unless recovery is assured, options are
53     * usually limited to logging the error and/or issuing an onError
54     * signal to the subscriber.
55 dl 1.1 *
56     * <p>If any Subscriber method throws an exception, its subscription
57     * is cancelled. If the supplied Executor throws
58     * RejectedExecutionException (or any other RuntimeException or Error)
59     * when attempting to execute a task, or a drop handler throws an
60     * exception when processing a dropped item, then the exception is
61     * rethrown. In these cases, some but not all subscribers may have
62     * received items. It is usually good practice to {@link
63 jsr166 1.11 * #closeExceptionally closeExceptionally} in these cases.
64 dl 1.1 *
65     * @param <T> the published item type
66     * @author Doug Lea
67     * @since 1.9
68     */
69     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
70     AutoCloseable {
71     /*
72     * Most mechanics are handled by BufferedSubscription. This class
73     * mainly ensures sequentiality by using built-in synchronization
74     * locks across public methods. (Using built-in locks works well
75     * in the most typical case in which only one thread submits
76     * items).
77     */
78    
79     // Ensuring that all arrays have power of two length
80    
81     static final int MAXIMUM_BUFFER_CAPACITY = 1 << 30;
82     static final int roundCapacity(int cap) { // to nearest power of 2
83     int n = cap - 1;
84     n |= n >>> 1;
85     n |= n >>> 2;
86     n |= n >>> 4;
87     n |= n >>> 8;
88     n |= n >>> 16;
89     return (n < 0) ? 1 :
90     (n >= MAXIMUM_BUFFER_CAPACITY) ? MAXIMUM_BUFFER_CAPACITY : n + 1;
91     }
92    
93     /**
94     * Clients (BufferedSubscriptions) are maintained in a linked list
95     * (via their "next" fields). This works well for publish loops.
96     * It requires O(n) traversal to check for duplicate subscribers,
97     * but we expect that subscribing is much less common than
98     * publishing. Unsubscribing occurs only during publish loops,
99     * when BufferedSubscription methods return negative values
100     * signifying that they have been disabled (cancelled or closed).
101     */
102     BufferedSubscription<T> clients;
103    
104     // Parameters for constructing BufferedSubscriptions
105     final Executor executor;
106     final int minBufferCapacity;
107     final int maxBufferCapacity;
108    
109     /** Run status, updated only within locks */
110     volatile boolean closed;
111    
112     /**
113     * Creates a new SubmissionPublisher using the given Executor for
114     * async delivery to subscribers, and with the given initial and
115     * maximum buffer sizes for each subscriber. In the absence of
116     * other constraints, consider using {@code
117 jsr166 1.5 * ForkJoinPool.commonPool(), 8, 1024}.
118 dl 1.1 *
119     * @param executor the executor to use for async delivery,
120     * supporting creation of at least one independent thread
121     * @param initialBufferCapacity the initial capacity for each
122     * subscriber's buffer (the actual capacity may be rounded up to
123     * the nearest power of two)
124     * @param maxBufferCapacity the maximum capacity for each
125     * subscriber's buffer (the actual capacity may be rounded up to
126     * the nearest power of two)
127     * @throws NullPointerException if executor is null
128     * @throws IllegalArgumentException if initialBufferCapacity is
129     * not positive or exceeds maxBufferCapacity, or maxBufferCapacity
130     * exceeds {@code 1<<30} (about 1 billion), the maximum bound for
131     * a power of two array size
132     */
133     public SubmissionPublisher(Executor executor,
134     int initialBufferCapacity,
135     int maxBufferCapacity) {
136     if (executor == null)
137     throw new NullPointerException();
138     if (initialBufferCapacity <= 0 || maxBufferCapacity <= 0)
139     throw new IllegalArgumentException("capacity must be positive");
140     if (maxBufferCapacity > MAXIMUM_BUFFER_CAPACITY)
141     throw new IllegalArgumentException("capacity exceeds limit");
142     if (initialBufferCapacity > maxBufferCapacity)
143     throw new IllegalArgumentException("initial cannot exceed max capacity");
144     int minc = roundCapacity(initialBufferCapacity);
145     int maxc = roundCapacity(maxBufferCapacity);
146     this.executor = executor;
147     this.minBufferCapacity = minc;
148     this.maxBufferCapacity = maxc;
149     }
150    
151     /**
152     * Adds the given Subscriber unless already subscribed. If
153     * already subscribed, the Subscriber's onError method is invoked
154     * with an IllegalStateException. Otherwise, upon success, the
155     * Subscriber's onSubscribe method is invoked with a new
156     * Subscription (upon exception, the exception is rethrown and the
157     * Subscriber remains unsubscribed). If this SubmissionPublisher
158     * is closed, the subscriber's onComplete method is then invoked.
159     * Subscribers may enable receiving items by invoking the request
160     * method of the returned Subscription, and may unsubscribe by
161     * invoking its cancel method.
162     *
163     * @param subscriber the subscriber
164     * @throws NullPointerException if subscriber is null
165     */
166     public void subscribe(Flow.Subscriber<? super T> subscriber) {
167     if (subscriber == null) throw new NullPointerException();
168     BufferedSubscription<T> sub = new BufferedSubscription<T>(
169     subscriber, executor, minBufferCapacity, maxBufferCapacity);
170     boolean present = false, clsd;
171 jsr166 1.2 synchronized (this) {
172 dl 1.1 clsd = closed;
173     BufferedSubscription<T> pred = null, next;
174     for (BufferedSubscription<T> b = clients; b != null; b = next) {
175     next = b.next;
176     if (b.ctl < 0) { // disabled; remove
177     if (pred == null)
178     clients = next;
179     else
180     pred.next = next;
181 jsr166 1.2 }
182 dl 1.1 else if (subscriber == b.subscriber) {
183     present = true;
184     break;
185     }
186     pred = b;
187     }
188     if (!present) {
189     subscriber.onSubscribe(sub); // don't link on exception
190     if (!clsd) {
191     if (pred == null)
192     clients = sub;
193     else
194     pred.next = sub;
195     }
196     }
197     }
198     if (clsd)
199     subscriber.onComplete();
200     else if (present)
201     subscriber.onError(new IllegalStateException("Already subscribed"));
202     }
203    
204     /**
205     * Publishes the given item, if possible, to each current
206     * subscriber by asynchronously invoking its onNext method. The
207     * item may be dropped by one or more subscribers if resource
208     * limits are exceeded, in which case the given handler (if
209     * non-null) is invoked, and if it returns true, retried once.
210     *
211     * <p>If the Executor for this publisher throws a
212     * RejectedExecutionException (or any other RuntimeException or
213     * Error) when attempting to asynchronously notify subscribers, or
214     * the drop handler throws an exception when processing a dropped
215     * item, then this exception is rethrown.
216     *
217     * @param item the (non-null) item to publish
218     * @param onDrop if non-null, the handler invoked upon a drop to a
219     * subscriber, with arguments of the subscriber and item; if it
220     * returns true, an offer is re-attempted (once)
221 jsr166 1.4 * @return the number of drops (failed attempts to issue the item
222     * to a subscriber)
223 dl 1.1 * @throws IllegalStateException if closed
224     * @throws NullPointerException if item is null
225     * @throws RejectedExecutionException if thrown by Executor
226     */
227     public int offer(T item,
228     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
229     if (item == null) throw new NullPointerException();
230     int drops = 0;
231 jsr166 1.2 synchronized (this) {
232 dl 1.1 if (closed)
233     throw new IllegalStateException("Closed");
234     BufferedSubscription<T> pred = null, next;
235     for (BufferedSubscription<T> b = clients; b != null; b = next) {
236     int stat;
237     next = b.next;
238     if ((stat = b.offer(item)) == 0 &&
239     onDrop != null &&
240     onDrop.test(b.subscriber, item))
241     stat = b.offer(item);
242     if (stat < 0) {
243     if (pred == null)
244     clients = next;
245     else
246     pred.next = next;
247     }
248     else {
249     pred = b;
250     if (stat == 0)
251     ++drops;
252     }
253     }
254     return drops;
255     }
256     }
257    
258     /**
259     * Publishes the given item, if possible, to each current
260     * subscriber by asynchronously invoking its onNext method,
261     * blocking while resources for any subscription are unavailable,
262     * up to the specified timeout or the caller thread is
263 jsr166 1.4 * interrupted, at which point the given handler (if non-null)
264 dl 1.1 * is invoked, and if it returns true, retried once.
265     *
266     * <p>If the Executor for this publisher throws a
267     * RejectedExecutionException (or any other RuntimeException or
268     * Error) when attempting to asynchronously notify subscribers, or
269     * the drop handler throws an exception when processing a dropped
270     * item, then this exception is rethrown.
271     *
272     * @param item the (non-null) item to publish
273     * @param timeout how long to wait for resources for any subscriber
274     * before giving up, in units of {@code unit}
275     * @param unit a {@code TimeUnit} determining how to interpret the
276     * {@code timeout} parameter
277     * @param onDrop if non-null, the handler invoked upon a drop to a
278     * subscriber, with arguments of the subscriber and item; if it
279     * returns true, an offer is re-attempted (once)
280 jsr166 1.4 * @return the number of drops (failed attempts to issue the item
281     * to a subscriber)
282 dl 1.1 * @throws IllegalStateException if closed
283     * @throws NullPointerException if item is null
284     * @throws RejectedExecutionException if thrown by Executor
285     */
286     public int offer(T item, long timeout, TimeUnit unit,
287     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
288     if (item == null) throw new NullPointerException();
289     long nanos = unit.toNanos(timeout);
290     int drops = 0;
291 jsr166 1.2 synchronized (this) {
292 dl 1.1 if (closed)
293     throw new IllegalStateException("Closed");
294     BufferedSubscription<T> pred = null, next;
295     for (BufferedSubscription<T> b = clients; b != null; b = next) {
296     int stat;
297     next = b.next;
298     if ((stat = b.offer(item)) == 0 &&
299     (stat = b.timedOffer(item, nanos)) == 0 &&
300     onDrop != null &&
301     onDrop.test(b.subscriber, item))
302     stat = b.offer(item);
303     if (stat < 0) {
304     if (pred == null)
305     clients = next;
306     else
307     pred.next = next;
308     }
309     else {
310     pred = b;
311     if (stat == 0)
312     ++drops;
313     }
314     }
315     }
316     return drops;
317     }
318    
319     /**
320     * Publishes the given item to each current subscriber by
321     * asynchronously invoking its onNext method, blocking
322     * uninterruptibly while resources for any subscriber are
323     * unavailable.
324     *
325     * <p>If the Executor for this publisher throws a
326     * RejectedExecutionException (or any other RuntimeException or
327     * Error) when attempting to asynchronously notify subscribers,
328     * then this exception is rethrown.
329     *
330     * @param item the (non-null) item to publish
331     * @throws IllegalStateException if closed
332     * @throws NullPointerException if item is null
333     * @throws RejectedExecutionException if thrown by Executor
334     */
335     public void submit(T item) {
336     if (item == null) throw new NullPointerException();
337 jsr166 1.2 synchronized (this) {
338 dl 1.1 if (closed)
339     throw new IllegalStateException("Closed");
340     BufferedSubscription<T> pred = null, next;
341     for (BufferedSubscription<T> b = clients; b != null; b = next) {
342     int stat;
343     next = b.next;
344     if ((stat = b.offer(item)) == 0)
345     stat = b.submit(item);
346     if (stat < 0) {
347     if (pred == null)
348     clients = next;
349     else
350     pred.next = next;
351     }
352     else
353     pred = b;
354     }
355     }
356     }
357    
358     /**
359     * Unless already closed, issues onComplete signals to current
360     * subscribers, and disallows subsequent attempts to publish.
361     */
362     public void close() {
363     if (!closed) {
364     BufferedSubscription<T> b, next;
365 jsr166 1.2 synchronized (this) {
366 dl 1.1 b = clients;
367     clients = null;
368     closed = true;
369     }
370     while (b != null) {
371     next = b.next;
372     b.close();
373     b = next;
374     }
375     }
376     }
377    
378     /**
379     * Unless already closed, issues onError signals to current
380     * subscribers with the given error, and disallows subsequent
381     * attempts to publish.
382     *
383 jsr166 1.5 * @param error the onError argument sent to subscribers
384 dl 1.1 * @throws NullPointerException if error is null
385     */
386     public void closeExceptionally(Throwable error) {
387     if (error == null)
388     throw new NullPointerException();
389     if (!closed) {
390     BufferedSubscription<T> b, next;
391 jsr166 1.2 synchronized (this) {
392 dl 1.1 b = clients;
393     clients = null;
394     closed = true;
395     }
396     while (b != null) {
397     next = b.next;
398     b.closeExceptionally(error);
399     b = next;
400     }
401     }
402     }
403    
404     /**
405     * Returns true if this publisher is not accepting submissions.
406     *
407     * @return true if closed
408     */
409     public boolean isClosed() {
410     return closed;
411     }
412    
413     /**
414 jsr166 1.6 * Returns true if this publisher has any subscribers.
415 dl 1.1 *
416     * @return true if this publisher has any subscribers
417     */
418     public boolean hasSubscribers() {
419     boolean nonEmpty = false;
420     if (!closed) {
421 jsr166 1.2 synchronized (this) {
422 dl 1.1 BufferedSubscription<T> pred = null, next;
423     for (BufferedSubscription<T> b = clients; b != null; b = next) {
424     next = b.next;
425     if (b.ctl < 0) {
426     if (pred == null)
427     clients = next;
428     else
429     pred.next = next;
430 jsr166 1.2 }
431 dl 1.1 else {
432     nonEmpty = true;
433     break;
434     }
435     }
436     }
437     }
438     return nonEmpty;
439     }
440    
441     /**
442     * Returns the Executor used for asynchronous delivery.
443     *
444     * @return the Executor used for asynchronous delivery
445     */
446     public Executor getExecutor() {
447     return executor;
448     }
449    
450 jsr166 1.7 /**
451 jsr166 1.5 * Returns the initial per-subscriber buffer capacity.
452 dl 1.1 *
453 jsr166 1.5 * @return the initial per-subscriber buffer capacity
454 dl 1.1 */
455     public int getInitialBufferCapacity() {
456     return minBufferCapacity;
457     }
458    
459 jsr166 1.7 /**
460 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
461 dl 1.1 *
462 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
463 dl 1.1 */
464     public int getMaxBufferCapacity() {
465     return maxBufferCapacity;
466     }
467    
468     /**
469     * Returns a list of current subscribers.
470     *
471     * @return list of current subscribers
472     */
473     public List<Flow.Subscriber<? super T>> getSubscribers() {
474     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
475 jsr166 1.2 synchronized (this) {
476 dl 1.1 BufferedSubscription<T> pred = null, next;
477     for (BufferedSubscription<T> b = clients; b != null; b = next) {
478     next = b.next;
479     if (b.ctl < 0) {
480     if (pred == null)
481     clients = next;
482     else
483     pred.next = next;
484 jsr166 1.2 }
485 dl 1.1 else
486     subs.add(b.subscriber);
487     }
488     }
489     return subs;
490     }
491 jsr166 1.2
492 dl 1.1 /**
493     * Returns true if the given Subscriber is currently subscribed.
494     *
495     * @param subscriber the subscriber
496     * @return true if currently subscribed
497     */
498     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
499 jsr166 1.2 if (!closed) {
500     synchronized (this) {
501 dl 1.1 BufferedSubscription<T> pred = null, next;
502     for (BufferedSubscription<T> b = clients; b != null; b = next) {
503     next = b.next;
504     if (b.ctl < 0) {
505     if (pred == null)
506     clients = next;
507     else
508     pred.next = next;
509 jsr166 1.2 }
510 dl 1.1 else if (subscriber == b.subscriber)
511     return true;
512     }
513     }
514     }
515     return false;
516     }
517    
518     /**
519     * If the given subscription is managed by a SubmissionPublisher,
520     * returns an estimate of the number of items produced but not yet
521     * consumed; otherwise returns zero. This method is designed only
522     * for monitoring purposes, not for control.
523     *
524     * @param subscription the subscription
525     * @return the estimate, or zero if the subscription is of an
526 jsr166 1.8 * unknown type
527 dl 1.1 */
528     public static int estimateAvailable(Flow.Subscription subscription) {
529     if (subscription instanceof BufferedSubscription)
530     return ((BufferedSubscription)subscription).estimateAvailable();
531     else
532     return 0;
533     }
534    
535     /**
536     * Returns a new {@link Flow.Processor} that uses this
537     * SubmissionPublisher to relay transformed items from its source
538     * using method {@link submit}, and using the given request size
539     * for requests to its source subscription. (Typically,
540     * requestSizes should range from the initial and maximum buffer
541     * capacity of this SubmissionPublisher.)
542     *
543 jsr166 1.3 * @param <S> the subscribed item type
544 dl 1.1 * @param requestSize the request size for subscriptions
545     * with the source
546     * @param transform the transform function
547     * @return the new Processor
548     * @throws NullPointerException if transform is null
549     * @throws IllegalArgumentException if requestSize not positive
550     */
551     public <S> Flow.Processor<S,T> newTransformProcessor(
552     long requestSize,
553     Function<? super S, ? extends T> transform) {
554     if (requestSize <= 0L)
555     throw new IllegalArgumentException("requestSize must be positive");
556     if (transform == null)
557     throw new NullPointerException();
558     return new TransformProcessor<S,T>(requestSize, transform, this);
559     }
560    
561     /**
562     * A task for consuming buffer items and signals, created and
563     * executed whenever they become available. A task consumes as
564     * many items/signals as possible before terminating, at which
565     * point another task is created when needed. The dual Runnable
566     * and ForkJoinTask declaration saves overhead when executed by
567     * ForkJoinPools, without impacting other kinds of Executors.
568     */
569     @SuppressWarnings("serial")
570     static final class ConsumerTask<T> extends ForkJoinTask<Void>
571     implements Runnable {
572     final BufferedSubscription<T> s;
573     ConsumerTask(BufferedSubscription<T> s) { this.s = s; }
574     public final Void getRawResult() { return null; }
575     public final void setRawResult(Void v) {}
576     public final boolean exec() { s.consume(); return true; }
577     public final void run() { s.consume(); }
578     }
579    
580     /**
581     * A bounded (ring) buffer with integrated control to start
582     * consumer tasks whenever items are available. The buffer
583     * algorithm is similar to one used inside ForkJoinPool,
584     * specialized for the case of at most one concurrent producer and
585     * consumer, and power of two buffer sizes. This allows methods to
586     * operate without locks even while supporting resizing, blocking,
587     * task-triggering, and garbage-free buffers (nulling out elements
588     * when consumed), although supporting these does impose a bit of
589     * overhead compared to plain fixed-size ring buffers.
590     *
591     * The publisher guarantees a single producer via its lock. We
592     * ensure in this class that there is at most one consumer. The
593     * request and cancel methods must be fully thread-safe but are
594     * coded to exploit the most common case in which they are only
595     * called by consumers (usually within onNext).
596     *
597     * Control over creating ConsumerTasks is managed using the ACTIVE
598     * ctl bit. We ensure that a task is active when consumable items
599     * (and usually, ERROR or COMPLETE signals) are present and there
600     * is demand (unfulfilled requests). This is complicated on the
601     * creation side by the possibility of exceptions when trying to
602     * execute tasks. These eventually force DISABLED state, but
603     * sometimes not directly. On the task side, termination (clearing
604     * ACTIVE) may race with producers or request() calls, so in some
605     * cases requires a re-check, re-activating if possible.
606     *
607     * The ctl field also manages run state. When DISABLED, no further
608     * updates are possible (to simplify checks, DISABLED is defined
609     * as a negative value). Disabling may be preceded by setting
610     * ERROR or COMPLETE (or both -- ERROR has precedence), in which
611     * case the associated Subscriber methods are invoked, possibly
612     * synchronously if there is no active consumer task (including
613     * cases where execute() failed).
614     *
615     * Support for blocking also exploits the fact that there is only
616     * one possible waiter. ManagedBlocker-compatible control fields
617     * are placed in this class itself rather than in wait-nodes.
618     * Blocking control relies on the "waiter" field. Producers set
619     * the field before trying to block, but must then recheck (via
620     * offer) before parking. Signalling then just unparks and clears
621     * waiter field.
622     *
623     * This class uses @Contended and heuristic field declaration
624     * ordering to reduce memory contention on BufferedSubscription
625     * itself, but it does not currently attempt to avoid memory
626     * contention (especially including card-marks) among buffer
627     * elements, that can significantly slow down some usages.
628     * Addressing this may require allocating substantially more space
629     * than users expect.
630     */
631     @sun.misc.Contended
632     static final class BufferedSubscription<T> implements Flow.Subscription,
633     ForkJoinPool.ManagedBlocker {
634     // Order-sensitive field declarations
635     long timeout; // > 0 if timed wait
636     volatile long demand; // # unfilled requests
637     final int minCapacity; // initial buffer size
638     int maxCapacity; // reduced on OOME
639     int putStat; // offer result for ManagedBlocker
640     volatile int ctl; // atomic run state flags
641     volatile int head; // next position to take
642     volatile int tail; // next position to put
643     boolean wasInterrupted; // true if interrupted while waiting
644     volatile Object[] array; // buffer: null if disabled
645     Flow.Subscriber<? super T> subscriber; // null if disabled
646     Executor executor; // null if disabled
647     volatile Throwable pendingError; // holds until onError issued
648     volatile Thread waiter; // blocked producer thread
649     T putItem; // for offer within ManagedBlocker
650     BufferedSubscription<T> next; // used only by publisher
651    
652     // ctl values
653     static final int ACTIVE = 0x01; // consumer task active
654     static final int ERROR = 0x02; // signal pending error
655     static final int COMPLETE = 0x04; // signal completion when done
656     static final int DISABLED = 0x80000000; // must be negative
657    
658     BufferedSubscription(Flow.Subscriber<? super T> subscriber,
659     Executor executor, int minBufferCapacity,
660     int maxBufferCapacity) {
661     this.subscriber = subscriber;
662     this.executor = executor;
663     this.minCapacity = minBufferCapacity;
664     this.maxCapacity = maxBufferCapacity;
665     }
666    
667     /**
        * Tries to append item to the ring buffer. Delegates to
        * growAndOffer when there is no array yet or the current array
        * has no free slot, and to startOnOffer when the ACTIVE bit is
        * not set after the item has been stored.
        *
668     * @return -1 if disabled, 0 if dropped, else 1
669     */
670     final int offer(T item) {
671     Object[] a = array;
672     int t = tail, h = head, mask;
        // no buffer, or occupancy (t - h) has reached mask: take the slow path
673     if (a == null || (mask = a.length - 1) < 0 || t - h >= mask)
674     return growAndOffer(item);
675     else {
        // store the element first, then publish the advanced tail index
676     a[t & mask] = item;
677     U.putOrderedInt(this, TAIL, t + 1);
        // ACTIVE already set: report success; otherwise try to start a consumer task
678     return (ctl & ACTIVE) != 0 ? 1 : startOnOffer();
679     }
680     }
681    
682     /**
683 jsr166 1.9 * Creates or expands buffer if possible, then offers.
        *
        * @return -1 if disabled (no array and ctl negative); 0 if the
        * item is dropped because the buffer is already at maxCapacity,
        * the new length is not positive, or allocation failed;
        * otherwise the result of startOnOffer
684 dl 1.1 */
685     final int growAndOffer(T item) {
686     int oldLen, len;
687     Object[] oldA = array, a = null;
        // existing buffer: double it; first use: start from minCapacity
688     if (oldA != null)
689     len = (oldLen = oldA.length) << 1;
690     else if (ctl < 0)
691     return -1; // disabled
692     else {
693     oldLen = 0;
694     len = minCapacity;
695     }
        // len <= 0 presumably guards against int overflow of the doubled length
696     if (oldLen >= maxCapacity || len <= 0)
697     return 0;
698     try {
699     a = new Object[len];
700     } catch (Throwable ex) { // try to cope with OOME
701     if (oldLen != 0) // avoid continuous failure
702     maxCapacity = oldLen;
703     return 0;
704     }
705     array = a;
706     int oldMask = oldLen - 1, mask = len - 1, oldTail = tail, j = head;
707     if (oldA != null && oldMask >= 0 && mask > oldMask) {
        // Move elements in [head, oldTail) to the new array. The CAS to null
        // claims each old slot, so an element is copied only if it was still
        // present in the old array at that moment.
708     for (Object x; j != oldTail; ++j) { // races with consumer
709     int i = j & oldMask;
710     if ((x = oldA[i]) != null &&
711     U.compareAndSwapObject(oldA,
712     (((long)i) << ASHIFT) + ABASE,
713     x, null))
714     a[j & mask] = x;
715     }
716     }
        // j is oldTail after the copy loop, or the head index when nothing was copied
717     a[j & mask] = item;
718     U.putOrderedInt(this, TAIL, j + 1);
719     return startOnOffer();
720     }
721    
722     /**
723 jsr166 1.5 * Tries to start consumer task if items are available.
724     * Backs out and relays exception if executor fails.
        *
        * @return -1 if a task would be needed but ctl is negative or
        * the executor field is null; otherwise 1
725 dl 1.1 */
726     final int startOnOffer() {
727     for (;;) {
728     int c; Executor e;
        // nothing to start: already ACTIVE, no demand, or buffer empty
729     if (((c = ctl) & ACTIVE) != 0 || demand == 0L || tail == head)
730     break;
731     if (c < 0 || (e = executor) == null)
732     return -1;
        // claim the ACTIVE bit before submitting; on CAS failure re-read and retry
733     if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) {
734     try {
735     e.execute(new ConsumerTask<T>(this));
736     break;
737     } catch (RuntimeException | Error ex) { // back out
        // clear ACTIVE (unless disabled or already cleared), then rethrow to the caller
738     for (;;) {
739     if ((c = ctl) < 0 || (c & ACTIVE) == 0 ||
740     U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
741     throw ex;
742     }
743     }
744     }
745     }
746     return 1;
747     }
748    
749     /**
750     * Tries to start consumer task upon a signal or request;
751     * disables on failure
        *
        * The callers visible in this file (request and close) set the
        * ACTIVE bit in ctl before invoking this method.
752     */
753     final void startOrDisable() {
754     Executor e; // skip if already disabled
755     if ((e = executor) != null) {
756     try {
757     e.execute(new ConsumerTask<T>(this));
758 jsr166 1.2 } catch (Throwable ex) { // back out and force signal
        // Clear ACTIVE so that closeExceptionally can take its non-active
        // branch and report ex through onError; stop if already disabled
        // or if ACTIVE has been cleared by someone else.
759 dl 1.1 for (int c;;) {
760     if ((c = ctl) < 0 || (c & ACTIVE) == 0)
761     break;
762     if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
763     closeExceptionally(ex);
764     break;
765     }
766     }
767     }
768     }
769     }
770    
771     /**
772     * Nulls out most fields, mainly to avoid garbage retention
773     * until publisher unsubscribes.
        *
        * In the code visible here it is called from cancel and
        * closeExceptionally right after ctl has been set to DISABLED.
        * Also unparks the thread recorded in waiter, if any.
774     */
775     final void detach() {
776     pendingError = null;
777     subscriber = null;
778     executor = null;
779     array = null;
        // read waiter once into a local so the null check and unpark use the same thread
780     Thread w = waiter;
781     if (w != null) {
782     waiter = null;
783     LockSupport.unpark(w); // force wakeup
784     }
785     }
786    
787     public void request(long n) {
        // Adds n to the outstanding demand (saturating at Long.MAX_VALUE) and,
        // if items are buffered and no consumer task is active, claims the
        // ACTIVE bit and starts one. A negative n closes the subscription with
        // an IllegalArgumentException; n == 0 does nothing.
788     if (n > 0L) {
        // outer loop: retry until the demand CAS succeeds
789     for (;;) {
790     long prev = demand, d;
791     if ((d = prev + n) < prev) // saturate
792     d = Long.MAX_VALUE;
793     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
        // inner loop: decide whether a consumer task must be started
794     for (int c, h;;) {
        // already active or disabled, or demand is back to zero: nothing to start
795     if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
796     demand == 0L)
797     break;
        // buffer non-empty: claim ACTIVE, then hand off to startOrDisable
798     if ((h = head) != tail) {
799     if (U.compareAndSwapInt(this, CTL, c,
800     c | ACTIVE)) {
801     startOrDisable();
802     break;
803     }
804     }
        // buffer empty and both indices unchanged on re-read: nothing to consume
805     else if (head == h && tail == h)
806     break;
807     }
808     break;
809     }
810     }
811     }
812     else if (n < 0L)
813     closeExceptionally(new IllegalArgumentException(
814     "negative subscription request"));
815     }
816    
817     public void cancel() {
818     for (int c;;) {
819     if ((c = ctl) < 0)
820     break;
821     else if ((c & ACTIVE) != 0) {
822     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
823     break; // cause consumer task to exit
824     }
825     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
826     detach();
827     break;
828     }
829     }
830     }
831    
832     final void closeExceptionally(Throwable ex) {
833     pendingError = ex;
834     for (int c;;) {
835     if ((c = ctl) < 0)
836     break;
837     else if ((c & ACTIVE) != 0) {
838     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
839     break; // cause consumer task to exit
840     }
841     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
842     Flow.Subscriber<? super T> s = subscriber;
843     if (s != null) {
844     try {
845     s.onError(ex);
846 jsr166 1.2 } catch (Throwable ignore) {
847 dl 1.1 }
848     }
849     detach();
850     break;
851     }
852     }
853     }
854    
855     final void close() {
856     for (int c;;) {
857     if ((c = ctl) < 0)
858     break;
859     if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
860     if ((c & ACTIVE) == 0)
861     startOrDisable();
862     break;
863     }
864     }
865     }
866    
867     final int estimateAvailable() {
868     int n;
869     return (ctl >= 0 && (n = tail - head) > 0) ? n : 0;
870     }
871    
872     // ManagedBlocker support
873    
874     public final boolean isReleasable() {
875     T item = putItem;
876     if (item != null) {
877     if ((putStat = offer(item)) == 0)
878     return false;
879     putItem = null;
880     }
881     return true;
882     }
883    
884     public final boolean block() {
885     T item = putItem;
886     if (item != null) {
887     putItem = null;
888     long nanos = timeout;
889     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
890     while ((putStat = offer(item)) == 0) {
891     if (Thread.interrupted()) {
892     wasInterrupted = true;
893     if (nanos > 0L)
894     break;
895     }
896     else if (nanos > 0L &&
897     (nanos = deadline - System.nanoTime()) <= 0L)
898     break;
899     else if (waiter == null)
900     waiter = Thread.currentThread();
901     else {
902     if (nanos > 0L)
903     LockSupport.parkNanos(this, nanos);
904     else
905     LockSupport.park(this);
906     waiter = null;
907     }
908     }
909     }
910     waiter = null;
911     return true;
912     }
913    
914     final int submit(T item) {
915     putItem = item;
916     timeout = 0L;
917     wasInterrupted = false;
918     try {
919     ForkJoinPool.managedBlock(this);
920     } catch (InterruptedException cantHappen) {
921     }
922     if (wasInterrupted)
923     Thread.currentThread().interrupt();
924     return putStat;
925     }
926    
927     final int timedOffer(T item, long nanos) {
928     if (nanos <= 0L)
929     return 0;
930     putItem = item;
931     timeout = nanos;
932     wasInterrupted = false;
933     try {
934     ForkJoinPool.managedBlock(this);
935     } catch (InterruptedException cantHappen) {
936     }
937     if (wasInterrupted)
938     Thread.currentThread().interrupt();
939     return putStat;
940     }
941    
942     /**
         * Consume loop called only from ConsumerTask.
         *
         * Returns at once if the subscriber field is null. Otherwise
         * loops, on each pass taking a snapshot of demand, ctl, head,
         * tail and array and then doing exactly one of:
         *  - ctl negative: detach and exit;
         *  - ERROR bit set: disable and report pendingError (if any);
         *  - buffer empty: release ACTIVE and exit, or signal onComplete
         *    when COMPLETE is set;
         *  - no demand: release ACTIVE and exit;
         *  - otherwise: take one item, decrement demand, wake a waiting
         *    producer and call onNext.
         * Throwables from subscriber callbacks are caught here and never
         * propagate to the caller.
         */
943     final void consume() {
944     Flow.Subscriber<? super T> s;
945     if ((s = subscriber) != null) { // else disabled
946     for (;;) {
947     long d = demand; // read volatile fields in acceptable order
948     int c = ctl;
949     int h = head;
950     int t = tail;
951     Object[] a = array;
952     int i, n; Object x; Thread w;
        // Terminal state: drop references and stop.
953     if (c < 0) {
954     detach();
955     break;
956     }
        // Error or cancel was requested while this task was active.
        // ctl is written directly; the loop then runs again
        // (presumably DISABLED is negative, so the next pass takes
        // the detach branch above -- confirm against its definition).
957     else if ((c & ERROR) != 0) {
958     Throwable ex = pendingError;
959     ctl = DISABLED; // no need for CAS
        // pendingError may be null here (cancel() sets ERROR without
        // recording an exception); in that case onError is skipped.
960     if (ex != null) {
961     try {
962     s.onError(ex);
963 jsr166 1.2 } catch (Throwable ignore) {
964 dl 1.1 }
965     }
966     }
967     else if (h == t) { // empty
        // Re-read tail, then give up ACTIVE. Note that the CAS argument
        // "c &= ~ACTIVE" also updates the local c to the new value.
968     if (h == tail &&
969     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) {
        // After releasing: if an item arrived or ctl changed, try to
        // take ACTIVE back and keep looping; exit if ACTIVE or
        // DISABLED is already set, or if the re-acquire CAS fails.
970     if (h != tail || c != (c = ctl)) { // recheck
971     if ((c & (ACTIVE | DISABLED)) != 0 ||
972     !U.compareAndSwapInt(this, CTL, c,
973     c | ACTIVE))
974     break;
975     }
        // Still empty and closed: disable and signal completion,
        // ignoring anything onComplete throws.
976     else if ((c & COMPLETE) != 0) {
977     ctl = DISABLED;
978     try {
979     s.onComplete();
980 jsr166 1.2 } catch (Throwable ignore) {
981 dl 1.1 }
982     }
983     else
984     break;
985     }
986     }
987     else if (d == 0L) { // can't take
        // Same release/recheck protocol as the empty case, keyed on
        // demand: exit when ACTIVE was released and either demand is
        // still zero with ctl unchanged, or ACTIVE/DISABLED is set,
        // or ACTIVE could not be re-acquired.
988     if (demand == 0L &&
989     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) &&
990     ((demand == 0L && c == (c = ctl)) || // recheck
991     (c & (ACTIVE | DISABLED)) != 0 ||
992     !U.compareAndSwapInt(this, CTL, c, c | ACTIVE)))
993     break;
994     }
        // Take path: the slot index is h masked by (length - 1). The
        // item is claimed by CASing the slot from x to null; if any
        // condition fails the loop simply retries with fresh reads.
995     else if (a != null &&
996     (n = a.length) > 0 &&
997     (x = a[i = h & (n - 1)]) != null &&
998     U.compareAndSwapObject(
999     a, (((long)i) << ASHIFT) + ABASE, x, null)) {
        // Advance head, then consume one unit of demand.
1000     U.putOrderedInt(this, HEAD, h + 1);
1001     while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1002     d = demand; // almost never fails
1003     if ((w = waiter) != null) {
1004     waiter = null;
1005     LockSupport.unpark(w); // release producer
1006     }
        // Deliver the item. If onNext throws, the subscription is
        // disabled; the throwable itself is not passed on anywhere.
1007     try {
1008     @SuppressWarnings("unchecked") T y = (T) x;
1009     s.onNext(y);
1010 jsr166 1.2 } catch (Throwable ex) {
1011 dl 1.1 ctl = DISABLED;
1012     }
1013     }
1014     }
1015     }
1016     }
1017    
// Unsafe mechanics: field offsets used by the CAS and ordered-write
// calls above, plus base/shift for addressing Object[] slots.
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long CTL;
private static final long TAIL;
private static final long HEAD;
private static final long DEMAND;
private static final int ABASE;
private static final int ASHIFT;

static {
    try {
        final Class<?> k = BufferedSubscription.class;
        CTL = U.objectFieldOffset(k.getDeclaredField("ctl"));
        TAIL = U.objectFieldOffset(k.getDeclaredField("tail"));
        HEAD = U.objectFieldOffset(k.getDeclaredField("head"));
        DEMAND = U.objectFieldOffset(k.getDeclaredField("demand"));

        final Class<?> ak = Object[].class;
        ABASE = U.arrayBaseOffset(ak);
        final int scale = U.arrayIndexScale(ak);
        // The shift-based slot addressing requires a power-of-two scale.
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}
1047     }
1048    
1049     static final class TransformProcessor<U,R> implements Flow.Processor<U,R> {
1050     final Function<? super U, ? extends R> fn;
1051     final SubmissionPublisher<R> sink;
1052     Flow.Subscription subscription;
1053     final long requestSize;
1054     long count;
1055     TransformProcessor(long requestSize,
1056     Function<? super U, ? extends R> fn,
1057     SubmissionPublisher<R> sink) {
1058     this.fn = fn;
1059     this.sink = sink;
1060     this.requestSize = requestSize;
1061     this.count = requestSize >>> 1;
1062     }
1063     public void subscribe(Flow.Subscriber<? super R> subscriber) {
1064     sink.subscribe(subscriber);
1065     }
1066     public void onSubscribe(Flow.Subscription subscription) {
1067     (this.subscription = subscription).request(requestSize);
1068     }
1069     public void onNext(U item) {
1070     Flow.Subscription s = subscription;
1071     if (s == null)
1072     sink.closeExceptionally(
1073     new IllegalStateException("onNext without subscription"));
1074     else {
1075     try {
1076     if (--count <= 0)
1077     s.request(count = requestSize);
1078     sink.submit(fn.apply(item));
1079     } catch (Throwable ex) {
1080     try {
1081     s.cancel();
1082     } finally {
1083     sink.closeExceptionally(ex);
1084     }
1085     }
1086     }
1087     }
1088     public void onError(Throwable ex) {
1089     sink.closeExceptionally(ex);
1090     }
1091     public void onComplete() {
1092     sink.close();
1093     }
1094     }
1095     }