ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.6
Committed: Thu Jan 15 17:31:03 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.5: +1 -1 lines
Log Message:
javadoc punctuation

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     import java.util.concurrent.locks.LockSupport;
10     import java.util.function.BiPredicate;
11     import java.util.function.Function;
12     import java.util.List;
13     import java.util.ArrayList;
14    
15     /**
16     * A {@link Flow.Publisher} that asynchronously issues submitted items
17     * to current subscribers until it is closed. Each current subscriber
18     * receives newly submitted items in the same order unless drops or
19     * exceptions are encountered. Using a SubmissionPublisher allows
20     * item generators to act as Publishers, although without integrated
21     * flow control. Instead they rely on drop handling and/or blocking.
22     * This class may also serve as a base for subclasses that generate
23     * items, and use the methods in this class to publish them.
24     *
25     * <p>A SubmissionPublisher uses the Executor supplied in its
26     * constructor for delivery to subscribers. The best choice of
27     * Executor depends on expected usage. If the generator(s) of
28     * submitted items run in separate threads, and the number of
29     * subscribers can be estimated, consider using a {@link
30     * Executors#newFixedThreadPool}. Otherwise consider using a
31     * work-stealing pool (including {@link ForkJoinPool#commonPool}).
32     *
33     * <p>Buffering allows producers and consumers to transiently operate
34     * at different rates. Each subscriber uses an independent buffer.
35     * Buffers are created upon first use with a given initial capacity,
36     * and are resized as needed up to the maximum. (Capacity arguments
37     * may be rounded up to powers of two.) Invocations of {@code
38     * subscription.request} do not directly result in buffer expansion,
39     * but risk saturation if unfulfilled requests exceed the maximum
40     * capacity. Choices of buffer parameters rely on expected rates,
41     * resources, and usages, that usually benefit from empirical testing.
42     * As first guesses, consider initial 8 and maximum 1024.
43     *
44     * <p>Publication methods support different policies about what to do
45     * when buffers are saturated. Method {@link #submit} blocks until
46     * resources are available. This is simplest (and often appropriate
47     * for relay stages; see {@link #newTransformProcessor}), but least
48     * responsive. The {@code offer} methods may either immediately, or
49     * with bounded timeout, drop items, but provide an opportunity to
50     * interpose a handler and then retry. While the handler is invoked,
51     * other calls to methods in this class by other threads are blocked.
52     * Unless recovery is assured, options are usually limited to logging
53     * the error and/or issuing an onError signal to the subscriber.
54     *
55     * <p>If any Subscriber method throws an exception, its subscription
56     * is cancelled. If the supplied Executor throws
57     * RejectedExecutionException (or any other RuntimeException or Error)
58     * when attempting to execute a task, or a drop handler throws an
59     * exception when processing a dropped item, then the exception is
60     * rethrown. In these cases, some but not all subscribers may have
61     * received items. It is usually good practice to {@link
62     * #closeExceptionally} in these cases.
63     *
64     * @param <T> the published item type
65     * @author Doug Lea
66     * @since 1.9
67     */
68     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
69     AutoCloseable {
70     /*
71     * Most mechanics are handled by BufferedSubscription. This class
72     * mainly ensures sequentiality by using built-in synchronization
73     * locks across public methods. (Using built-in locks works well
74     * in the most typical case in which only one thread submits
75     * items).
76     */
77    
78     // Ensuring that all arrays have power of two length
79    
80     static final int MAXIMUM_BUFFER_CAPACITY = 1 << 30;
81     static final int roundCapacity(int cap) { // to nearest power of 2
82     int n = cap - 1;
83     n |= n >>> 1;
84     n |= n >>> 2;
85     n |= n >>> 4;
86     n |= n >>> 8;
87     n |= n >>> 16;
88     return (n < 0) ? 1 :
89     (n >= MAXIMUM_BUFFER_CAPACITY) ? MAXIMUM_BUFFER_CAPACITY : n + 1;
90     }
91    
92     /**
93     * Clients (BufferedSubscriptions) are maintained in a linked list
94     * (via their "next" fields). This works well for publish loops.
95     * It requires O(n) traversal to check for duplicate subscribers,
96     * but we expect that subscribing is much less common than
97     * publishing. Unlinking occurs lazily while the list is traversed
98     * under the publisher lock (publish loops, subscribe, and the query
99     * methods), for subscriptions found disabled (cancelled or closed).
100     */
101     BufferedSubscription<T> clients;
102    
103     // Parameters for constructing BufferedSubscriptions
104     final Executor executor; // handed to every new BufferedSubscription
105     final int minBufferCapacity; // initial capacity after roundCapacity
106     final int maxBufferCapacity; // maximum capacity after roundCapacity
107    
108     /** Run status, updated only within locks (but also read without locking) */
109     volatile boolean closed;
110    
/**
 * Creates a new SubmissionPublisher that delivers to subscribers
 * asynchronously via the given Executor, giving each subscriber a
 * buffer with the given initial and maximum sizes. In the absence
 * of other constraints, consider using {@code
 * ForkJoinPool.commonPool(), 8, 1024}.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param initialBufferCapacity the initial capacity for each
 * subscriber's buffer (the actual capacity may be rounded up to
 * the nearest power of two)
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the actual capacity may be rounded up to
 * the nearest power of two)
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if initialBufferCapacity is
 * not positive or exceeds maxBufferCapacity, or maxBufferCapacity
 * exceeds {@code 1<<30} (about 1 billion), the maximum bound for
 * a power of two array size
 */
public SubmissionPublisher(Executor executor,
                           int initialBufferCapacity,
                           int maxBufferCapacity) {
    if (executor == null) {
        throw new NullPointerException();
    }
    boolean bothPositive =
        initialBufferCapacity > 0 && maxBufferCapacity > 0;
    if (!bothPositive) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    if (maxBufferCapacity > MAXIMUM_BUFFER_CAPACITY) {
        throw new IllegalArgumentException("capacity exceeds limit");
    }
    if (initialBufferCapacity > maxBufferCapacity) {
        throw new IllegalArgumentException("initial cannot exceed max capacity");
    }
    // Validation passed; store the executor and the power-of-two capacities.
    this.executor = executor;
    this.minBufferCapacity = roundCapacity(initialBufferCapacity);
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
}
149    
150     /**
151     * Adds the given Subscriber unless already subscribed. If
152     * already subscribed, the Subscriber's onError method is invoked
153     * with an IllegalStateException. Otherwise, upon success, the
154     * Subscriber's onSubscribe method is invoked with a new
155     * Subscription (upon exception, the exception is rethrown and the
156     * Subscriber remains unsubscribed). If this SubmissionPublisher
157     * is closed, the subscriber's onComplete method is then invoked.
158     * Subscribers may enable receiving items by invoking the request
159     * method of the returned Subscription, and may unsubscribe by
160     * invoking its cancel method.
161     *
162     * @param subscriber the subscriber
163     * @throws NullPointerException if subscriber is null
164     */
165     public void subscribe(Flow.Subscriber<? super T> subscriber) {
166     if (subscriber == null) throw new NullPointerException();
167     BufferedSubscription<T> sub = new BufferedSubscription<T>(
168     subscriber, executor, minBufferCapacity, maxBufferCapacity);
169     boolean present = false, clsd;
170 jsr166 1.2 synchronized (this) {
171 dl 1.1 clsd = closed;
172     BufferedSubscription<T> pred = null, next;
173     for (BufferedSubscription<T> b = clients; b != null; b = next) {
174     next = b.next;
175     if (b.ctl < 0) { // disabled; remove
176     if (pred == null)
177     clients = next;
178     else
179     pred.next = next;
180 jsr166 1.2 }
181 dl 1.1 else if (subscriber == b.subscriber) {
182     present = true;
183     break;
184     }
185     pred = b;
186     }
187     if (!present) {
188     subscriber.onSubscribe(sub); // don't link on exception
189     if (!clsd) {
190     if (pred == null)
191     clients = sub;
192     else
193     pred.next = sub;
194     }
195     }
196     }
197     if (clsd)
198     subscriber.onComplete();
199     else if (present)
200     subscriber.onError(new IllegalStateException("Already subscribed"));
201     }
202    
203     /**
204     * Publishes the given item, if possible, to each current
205     * subscriber by asynchronously invoking its onNext method. The
206     * item may be dropped by one or more subscribers if resource
207     * limits are exceeded, in which case the given handler (if
208     * non-null) is invoked, and if it returns true, retried once.
209     *
210     * <p>If the Executor for this publisher throws a
211     * RejectedExecutionException (or any other RuntimeException or
212     * Error) when attempting to asynchronously notify subscribers, or
213     * the drop handler throws an exception when processing a dropped
214     * item, then this exception is rethrown.
215     *
216     * @param item the (non-null) item to publish
217     * @param onDrop if non-null, the handler invoked upon a drop to a
218     * subscriber, with arguments of the subscriber and item; if it
219     * returns true, an offer is re-attempted (once)
220 jsr166 1.4 * @return the number of drops (failed attempts to issue the item
221     * to a subscriber)
222 dl 1.1 * @throws IllegalStateException if closed
223     * @throws NullPointerException if item is null
224     * @throws RejectedExecutionException if thrown by Executor
225     */
226     public int offer(T item,
227     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
228     if (item == null) throw new NullPointerException();
229     int drops = 0;
230 jsr166 1.2 synchronized (this) {
231 dl 1.1 if (closed)
232     throw new IllegalStateException("Closed");
233     BufferedSubscription<T> pred = null, next;
234     for (BufferedSubscription<T> b = clients; b != null; b = next) {
235     int stat;
236     next = b.next;
237     if ((stat = b.offer(item)) == 0 &&
238     onDrop != null &&
239     onDrop.test(b.subscriber, item))
240     stat = b.offer(item);
241     if (stat < 0) {
242     if (pred == null)
243     clients = next;
244     else
245     pred.next = next;
246     }
247     else {
248     pred = b;
249     if (stat == 0)
250     ++drops;
251     }
252     }
253     return drops;
254     }
255     }
256    
257     /**
258     * Publishes the given item, if possible, to each current
259     * subscriber by asynchronously invoking its onNext method,
260     * blocking while resources for any subscription are unavailable,
261     * up to the specified timeout or the caller thread is
262 jsr166 1.4 * interrupted, at which point the given handler (if non-null)
263 dl 1.1 * is invoked, and if it returns true, retried once.
264     *
265     * <p>If the Executor for this publisher throws a
266     * RejectedExecutionException (or any other RuntimeException or
267     * Error) when attempting to asynchronously notify subscribers, or
268     * the drop handler throws an exception when processing a dropped
269     * item, then this exception is rethrown.
270     *
271     * @param item the (non-null) item to publish
272     * @param timeout how long to wait for resources for any subscriber
273     * before giving up, in units of {@code unit}
274     * @param unit a {@code TimeUnit} determining how to interpret the
275     * {@code timeout} parameter
276     * @param onDrop if non-null, the handler invoked upon a drop to a
277     * subscriber, with arguments of the subscriber and item; if it
278     * returns true, an offer is re-attempted (once)
279 jsr166 1.4 * @return the number of drops (failed attempts to issue the item
280     * to a subscriber)
281 dl 1.1 * @throws IllegalStateException if closed
282     * @throws NullPointerException if item is null
283     * @throws RejectedExecutionException if thrown by Executor
284     */
285     public int offer(T item, long timeout, TimeUnit unit,
286     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
287     if (item == null) throw new NullPointerException();
288     long nanos = unit.toNanos(timeout);
289     int drops = 0;
290 jsr166 1.2 synchronized (this) {
291 dl 1.1 if (closed)
292     throw new IllegalStateException("Closed");
293     BufferedSubscription<T> pred = null, next;
294     for (BufferedSubscription<T> b = clients; b != null; b = next) {
295     int stat;
296     next = b.next;
297     if ((stat = b.offer(item)) == 0 &&
298     (stat = b.timedOffer(item, nanos)) == 0 &&
299     onDrop != null &&
300     onDrop.test(b.subscriber, item))
301     stat = b.offer(item);
302     if (stat < 0) {
303     if (pred == null)
304     clients = next;
305     else
306     pred.next = next;
307     }
308     else {
309     pred = b;
310     if (stat == 0)
311     ++drops;
312     }
313     }
314     }
315     return drops;
316     }
317    
318     /**
319     * Publishes the given item to each current subscriber by
320     * asynchronously invoking its onNext method, blocking
321     * uninterruptibly while resources for any subscriber are
322     * unavailable.
323     *
324     * <p>If the Executor for this publisher throws a
325     * RejectedExecutionException (or any other RuntimeException or
326     * Error) when attempting to asynchronously notify subscribers,
327     * then this exception is rethrown.
328     *
329     * @param item the (non-null) item to publish
330     * @throws IllegalStateException if closed
331     * @throws NullPointerException if item is null
332     * @throws RejectedExecutionException if thrown by Executor
333     */
334     public void submit(T item) {
335     if (item == null) throw new NullPointerException();
336 jsr166 1.2 synchronized (this) {
337 dl 1.1 if (closed)
338     throw new IllegalStateException("Closed");
339     BufferedSubscription<T> pred = null, next;
340     for (BufferedSubscription<T> b = clients; b != null; b = next) {
341     int stat;
342     next = b.next;
343     if ((stat = b.offer(item)) == 0)
344     stat = b.submit(item);
345     if (stat < 0) {
346     if (pred == null)
347     clients = next;
348     else
349     pred.next = next;
350     }
351     else
352     pred = b;
353     }
354     }
355     }
356    
357     /**
358     * Unless already closed, issues onComplete signals to current
359     * subscribers, and disallows subsequent attempts to publish.
360     */
361     public void close() {
362     if (!closed) {
363     BufferedSubscription<T> b, next;
364 jsr166 1.2 synchronized (this) {
365 dl 1.1 b = clients;
366     clients = null;
367     closed = true;
368     }
369     while (b != null) {
370     next = b.next;
371     b.close();
372     b = next;
373     }
374     }
375     }
376    
377     /**
378     * Unless already closed, issues onError signals to current
379     * subscribers with the given error, and disallows subsequent
380     * attempts to publish.
381     *
382 jsr166 1.5 * @param error the onError argument sent to subscribers
383 dl 1.1 * @throws NullPointerException if error is null
384     */
385     public void closeExceptionally(Throwable error) {
386     if (error == null)
387     throw new NullPointerException();
388     if (!closed) {
389     BufferedSubscription<T> b, next;
390 jsr166 1.2 synchronized (this) {
391 dl 1.1 b = clients;
392     clients = null;
393     closed = true;
394     }
395     while (b != null) {
396     next = b.next;
397     b.closeExceptionally(error);
398     b = next;
399     }
400     }
401     }
402    
403     /**
404     * Returns true if this publisher is not accepting submissions.
405     *
406     * @return true if closed
407     */
408     public boolean isClosed() {
409     return closed;
410     }
411    
412     /**
413 jsr166 1.6 * Returns true if this publisher has any subscribers.
414 dl 1.1 *
415     * @return true if this publisher has any subscribers
416     */
417     public boolean hasSubscribers() {
418     boolean nonEmpty = false;
419     if (!closed) {
420 jsr166 1.2 synchronized (this) {
421 dl 1.1 BufferedSubscription<T> pred = null, next;
422     for (BufferedSubscription<T> b = clients; b != null; b = next) {
423     next = b.next;
424     if (b.ctl < 0) {
425     if (pred == null)
426     clients = next;
427     else
428     pred.next = next;
429 jsr166 1.2 }
430 dl 1.1 else {
431     nonEmpty = true;
432     break;
433     }
434     }
435     }
436     }
437     return nonEmpty;
438     }
439    
440     /**
441     * Returns the Executor used for asynchronous delivery.
442     *
443     * @return the Executor used for asynchronous delivery
444     */
445     public Executor getExecutor() {
446     return executor;
447     }
448    
449     /*
450 jsr166 1.5 * Returns the initial per-subscriber buffer capacity.
451 dl 1.1 *
452 jsr166 1.5 * @return the initial per-subscriber buffer capacity
453 dl 1.1 */
454     public int getInitialBufferCapacity() {
455     return minBufferCapacity;
456     }
457    
458     /*
459 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
460 dl 1.1 *
461 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
462 dl 1.1 */
463     public int getMaxBufferCapacity() {
464     return maxBufferCapacity;
465     }
466    
467     /**
468     * Returns a list of current subscribers.
469     *
470     * @return list of current subscribers
471     */
472     public List<Flow.Subscriber<? super T>> getSubscribers() {
473     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
474 jsr166 1.2 synchronized (this) {
475 dl 1.1 BufferedSubscription<T> pred = null, next;
476     for (BufferedSubscription<T> b = clients; b != null; b = next) {
477     next = b.next;
478     if (b.ctl < 0) {
479     if (pred == null)
480     clients = next;
481     else
482     pred.next = next;
483 jsr166 1.2 }
484 dl 1.1 else
485     subs.add(b.subscriber);
486     }
487     }
488     return subs;
489     }
490 jsr166 1.2
491 dl 1.1 /**
492     * Returns true if the given Subscriber is currently subscribed.
493     *
494     * @param subscriber the subscriber
495     * @return true if currently subscribed
496     */
497     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
498 jsr166 1.2 if (!closed) {
499     synchronized (this) {
500 dl 1.1 BufferedSubscription<T> pred = null, next;
501     for (BufferedSubscription<T> b = clients; b != null; b = next) {
502     next = b.next;
503     if (b.ctl < 0) {
504     if (pred == null)
505     clients = next;
506     else
507     pred.next = next;
508 jsr166 1.2 }
509 dl 1.1 else if (subscriber == b.subscriber)
510     return true;
511     }
512     }
513     }
514     return false;
515     }
516    
517     /**
518     * If the given subscription is managed by a SubmissionPublisher,
519     * returns an estimate of the number of items produced but not yet
520     * consumed; otherwise returns zero. This method is designed only
521     * for monitoring purposes, not for control.
522     *
523     * @param subscription the subscription
524     * @return the estimate, or zero if the subscription is of an
525     * unknown type.
526     */
527     public static int estimateAvailable(Flow.Subscription subscription) {
528     if (subscription instanceof BufferedSubscription)
529     return ((BufferedSubscription)subscription).estimateAvailable();
530     else
531     return 0;
532     }
533    
534     /**
535     * Returns a new {@link Flow.Processor} that uses this
536     * SubmissionPublisher to relay transformed items from its source
537     * using method {@link submit}, and using the given request size
538     * for requests to its source subscription. (Typically,
539     * requestSizes should range from the initial and maximum buffer
540     * capacity of this SubmissionPublisher.)
541     *
542 jsr166 1.3 * @param <S> the subscribed item type
543 dl 1.1 * @param requestSize the request size for subscriptions
544     * with the source
545     * @param transform the transform function
546     * @return the new Processor
547     * @throws NullPointerException if transform is null
548     * @throws IllegalArgumentException if requestSize not positive
549     */
550     public <S> Flow.Processor<S,T> newTransformProcessor(
551     long requestSize,
552     Function<? super S, ? extends T> transform) {
553     if (requestSize <= 0L)
554     throw new IllegalArgumentException("requestSize must be positive");
555     if (transform == null)
556     throw new NullPointerException();
557     return new TransformProcessor<S,T>(requestSize, transform, this);
558     }
559    
560     /**
561     * A task for consuming buffer items and signals, created and
562     * executed whenever they become available. A task consumes as
563     * many items/signals as possible before terminating, at which
564     * point another task is created when needed. The dual Runnable
565     * and ForkJoinTask declaration saves overhead when executed by
566     * ForkJoinPools, without impacting other kinds of Executors.
567     */
568     @SuppressWarnings("serial")
569     static final class ConsumerTask<T> extends ForkJoinTask<Void>
570     implements Runnable {
571     final BufferedSubscription<T> s;
572     ConsumerTask(BufferedSubscription<T> s) { this.s = s; }
573     public final Void getRawResult() { return null; }
574     public final void setRawResult(Void v) {}
575     public final boolean exec() { s.consume(); return true; }
576     public final void run() { s.consume(); }
577     }
578    
579     /**
580     * A bounded (ring) buffer with integrated control to start
581     * consumer tasks whenever items are available. The buffer
582     * algorithm is similar to one used inside ForkJoinPool,
583     * specialized for the case of at most one concurrent producer and
584     * consumer, and power of two buffer sizes. This allows methods to
585     * operate without locks even while supporting resizing, blocking,
586     * task-triggering, and garbage-free buffers (nulling out elements
587     * when consumed), although supporting these does impose a bit of
588     * overhead compared to plain fixed-size ring buffers.
589     *
590     * The publisher guarantees a single producer via its lock. We
591     * ensure in this class that there is at most one consumer. The
592     * request and cancel methods must be fully thread-safe but are
593     * coded to exploit the most common case in which they are only
594     * called by consumers (usually within onNext).
595     *
596     * Control over creating ConsumerTasks is managed using the ACTIVE
597     * ctl bit. We ensure that a task is active when consumable items
598     * (and usually, ERROR or COMPLETE signals) are present and there
599     * is demand (unfulfilled requests). This is complicated on the
600     * creation side by the possibility of exceptions when trying to
601     * execute tasks. These eventually force DISABLED state, but
602     * sometimes not directly. On the task side, termination (clearing
603     * ACTIVE) may race with producers or request() calls, so in some
604     * cases requires a re-check, re-activating if possible.
605     *
606     * The ctl field also manages run state. When DISABLED, no further
607     * updates are possible (to simplify checks, DISABLED is defined
608     * as a negative value). Disabling may be preceded by setting
609     * ERROR or COMPLETE (or both -- ERROR has precedence), in which
610     * case the associated Subscriber methods are invoked, possibly
611     * synchronously if there is no active consumer task (including
612     * cases where execute() failed).
613     *
614     * Support for blocking also exploits the fact that there is only
615     * one possible waiter. ManagedBlocker-compatible control fields
616     * are placed in this class itself rather than in wait-nodes.
617     * Blocking control relies on the "waiter" field. Producers set
618     * the field before trying to block, but must then recheck (via
619     * offer) before parking. Signalling then just unparks and clears
620     * waiter field.
621     *
622     * This class uses @Contended and heuristic field declaration
623     * ordering to reduce memory contention on BufferedSubscription
624     * itself, but it does not currently attempt to avoid memory
625     * contention (especially including card-marks) among buffer
626     * elements, that can significantly slow down some usages.
627     * Addressing this may require allocating substantially more space
628     * than users expect.
629     */
630     @sun.misc.Contended
631     static final class BufferedSubscription<T> implements Flow.Subscription,
632     ForkJoinPool.ManagedBlocker {
633     // Order-sensitive field declarations (do not reorder)
634     long timeout; // wait time in nanoseconds; > 0 if timed wait
635     volatile long demand; // # unfilled requests; saturates at Long.MAX_VALUE
636     final int minCapacity; // initial buffer size
637     int maxCapacity; // lowered to the current length if a resize allocation fails (OOME)
638     int putStat; // offer result for ManagedBlocker
639     volatile int ctl; // atomic run state flags (see ctl values below)
640     volatile int head; // next position to take
641     volatile int tail; // next position to put
642     boolean wasInterrupted; // true if interrupted while waiting
643     volatile Object[] array; // buffer: created lazily by growAndOffer; null if disabled
644     Flow.Subscriber<? super T> subscriber; // null if disabled
645     Executor executor; // null if disabled
646     volatile Throwable pendingError; // holds until onError issued
647     volatile Thread waiter; // blocked producer thread
648     T putItem; // for offer within ManagedBlocker
649     BufferedSubscription<T> next; // used only by publisher
650    
651     // ctl values
652     static final int ACTIVE = 0x01; // consumer task active
653     static final int ERROR = 0x02; // signal pending error
654     static final int COMPLETE = 0x04; // signal completion when done
655     static final int DISABLED = 0x80000000; // sign bit: must be negative so that (ctl < 0) tests for disabled
656    
/**
 * Creates a subscription for the given subscriber. Only the
 * arguments are recorded here; no buffer array is assigned.
 */
BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                     Executor executor, int minBufferCapacity,
                     int maxBufferCapacity) {
    // Buffer sizing limits
    this.minCapacity = minBufferCapacity;
    this.maxCapacity = maxBufferCapacity;
    // Delivery target and the executor that runs consumer tasks
    this.executor = executor;
    this.subscriber = subscriber;
}
665    
666     /**
667     * @return -1 if disabled, 0 if dropped, else 1
668     */
669     final int offer(T item) {
670     Object[] a = array;
671     int t = tail, h = head, mask;
672     if (a == null || (mask = a.length - 1) < 0 || t - h >= mask)
673     return growAndOffer(item);
674     else {
675     a[t & mask] = item;
676     U.putOrderedInt(this, TAIL, t + 1);
677     return (ctl & ACTIVE) != 0 ? 1 : startOnOffer();
678     }
679     }
680    
681     /**
682     * Creates or expands the buffer if possible, then offers the item.
        * The first buffer has minCapacity slots; each expansion doubles
        * the length and moves the unconsumed elements into the new array.
        *
        * @return -1 if disabled (no buffer and ctl negative); 0 if the
        * item is dropped because the buffer is already at maxCapacity,
        * the doubled length overflows, or allocation fails; otherwise
        * the result of startOnOffer
683     */
684     final int growAndOffer(T item) {
685     int oldLen, len;
686     Object[] oldA = array, a = null;
687     if (oldA != null)
688     len = (oldLen = oldA.length) << 1; // double the existing length
689     else if (ctl < 0)
690     return -1; // disabled
691     else {
692     oldLen = 0;
693     len = minCapacity; // first use: start at the initial capacity
694     }
695     if (oldLen >= maxCapacity || len <= 0) // at maximum, or doubling overflowed
696     return 0;
697     try {
698     a = new Object[len];
699     } catch (Throwable ex) { // try to cope with OOME
700     if (oldLen != 0) // avoid continuous failure
701     maxCapacity = oldLen; // stop trying to grow past the current length
702     return 0;
703     }
704     array = a; // install the new buffer before transferring elements
705     int oldMask = oldLen - 1, mask = len - 1, oldTail = tail, j = head;
706     if (oldA != null && oldMask >= 0 && mask > oldMask) {
707     for (Object x; j != oldTail; ++j) { // races with consumer
708     int i = j & oldMask;
709     if ((x = oldA[i]) != null && // skip slots that are already empty
710     U.compareAndSwapObject(oldA, // CAS to null so each element is moved at most once
711     (((long)i) << ASHIFT) + ABASE,
712     x, null)) // on CAS failure the slot is skipped (presumably taken by the consumer; consume() is not visible here)
713     a[j & mask] = x; // same logical index, re-masked for the larger array
714     }
715     }
716     a[j & mask] = item; // j is oldTail after a transfer, otherwise head
717     U.putOrderedInt(this, TAIL, j + 1);
718     return startOnOffer();
719     }
720    
721     /**
722 jsr166 1.5 * Tries to start consumer task if items are available.
723     * Backs out and relays exception if executor fails.
724 dl 1.1 */
725     final int startOnOffer() {
726     for (;;) {
727     int c; Executor e;
728     if (((c = ctl) & ACTIVE) != 0 || demand == 0L || tail == head)
729     break;
730     if (c < 0 || (e = executor) == null)
731     return -1;
732     if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) {
733     try {
734     e.execute(new ConsumerTask<T>(this));
735     break;
736     } catch (RuntimeException | Error ex) { // back out
737     for (;;) {
738     if ((c = ctl) < 0 || (c & ACTIVE) == 0 ||
739     U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
740     throw ex;
741     }
742     }
743     }
744     }
745     return 1;
746     }
747    
748     /**
749     * Tries to start consumer task upon a signal or request;
750     * disables on failure
751     */
752     final void startOrDisable() {
753     Executor e; // skip if already disabled
754     if ((e = executor) != null) {
755     try {
756     e.execute(new ConsumerTask<T>(this));
757 jsr166 1.2 } catch (Throwable ex) { // back out and force signal
758 dl 1.1 for (int c;;) {
759     if ((c = ctl) < 0 || (c & ACTIVE) == 0)
760     break;
761     if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
762     closeExceptionally(ex);
763     break;
764     }
765     }
766     }
767     }
768     }
769    
770     /**
771     * Nulls out most fields, mainly to avoid garbage retention
772     * until publisher unsubscribes.
773     */
774     final void detach() {
775     pendingError = null;
776     subscriber = null;
777     executor = null;
778     array = null;
779     Thread w = waiter;
780     if (w != null) {
781     waiter = null;
782     LockSupport.unpark(w); // force wakeup
783     }
784     }
785    
786     public void request(long n) {
787     if (n > 0L) {
788     for (;;) {
789     long prev = demand, d;
790     if ((d = prev + n) < prev) // saturate
791     d = Long.MAX_VALUE;
792     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
793     for (int c, h;;) {
794     if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
795     demand == 0L)
796     break;
797     if ((h = head) != tail) {
798     if (U.compareAndSwapInt(this, CTL, c,
799     c | ACTIVE)) {
800     startOrDisable();
801     break;
802     }
803     }
804     else if (head == h && tail == h)
805     break;
806     }
807     break;
808     }
809     }
810     }
811     else if (n < 0L)
812     closeExceptionally(new IllegalArgumentException(
813     "negative subscription request"));
814     }
815    
816     public void cancel() {
817     for (int c;;) {
818     if ((c = ctl) < 0)
819     break;
820     else if ((c & ACTIVE) != 0) {
821     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
822     break; // cause consumer task to exit
823     }
824     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
825     detach();
826     break;
827     }
828     }
829     }
830    
831     final void closeExceptionally(Throwable ex) {
832     pendingError = ex;
833     for (int c;;) {
834     if ((c = ctl) < 0)
835     break;
836     else if ((c & ACTIVE) != 0) {
837     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
838     break; // cause consumer task to exit
839     }
840     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
841     Flow.Subscriber<? super T> s = subscriber;
842     if (s != null) {
843     try {
844     s.onError(ex);
845 jsr166 1.2 } catch (Throwable ignore) {
846 dl 1.1 }
847     }
848     detach();
849     break;
850     }
851     }
852     }
853    
854     final void close() {
855     for (int c;;) {
856     if ((c = ctl) < 0)
857     break;
858     if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
859     if ((c & ACTIVE) == 0)
860     startOrDisable();
861     break;
862     }
863     }
864     }
865    
866     final int estimateAvailable() {
867     int n;
868     return (ctl >= 0 && (n = tail - head) > 0) ? n : 0;
869     }
870    
871     // ManagedBlocker support
872    
873     public final boolean isReleasable() {
874     T item = putItem;
875     if (item != null) {
876     if ((putStat = offer(item)) == 0)
877     return false;
878     putItem = null;
879     }
880     return true;
881     }
882    
883     public final boolean block() {
884     T item = putItem;
885     if (item != null) {
886     putItem = null;
887     long nanos = timeout;
888     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
889     while ((putStat = offer(item)) == 0) {
890     if (Thread.interrupted()) {
891     wasInterrupted = true;
892     if (nanos > 0L)
893     break;
894     }
895     else if (nanos > 0L &&
896     (nanos = deadline - System.nanoTime()) <= 0L)
897     break;
898     else if (waiter == null)
899     waiter = Thread.currentThread();
900     else {
901     if (nanos > 0L)
902     LockSupport.parkNanos(this, nanos);
903     else
904     LockSupport.park(this);
905     waiter = null;
906     }
907     }
908     }
909     waiter = null;
910     return true;
911     }
912    
913     final int submit(T item) {
914     putItem = item;
915     timeout = 0L;
916     wasInterrupted = false;
917     try {
918     ForkJoinPool.managedBlock(this);
919     } catch (InterruptedException cantHappen) {
920     }
921     if (wasInterrupted)
922     Thread.currentThread().interrupt();
923     return putStat;
924     }
925    
926     final int timedOffer(T item, long nanos) {
927     if (nanos <= 0L)
928     return 0;
929     putItem = item;
930     timeout = nanos;
931     wasInterrupted = false;
932     try {
933     ForkJoinPool.managedBlock(this);
934     } catch (InterruptedException cantHappen) {
935     }
936     if (wasInterrupted)
937     Thread.currentThread().interrupt();
938     return putStat;
939     }
940    
941     /**
     * Consume loop called only from ConsumerTask.
     *
     * Does nothing if the subscriber field is already null. Otherwise
     * loops, taking a snapshot of demand, ctl, head, tail and array on
     * each pass and handling exactly one of these cases:
     * negative ctl (detach and exit), ERROR bit set, empty buffer,
     * zero demand, or delivery of one buffered item through onNext.
     * Exceptions thrown by the subscriber's onError/onComplete are
     * ignored; an exception thrown by onNext sets ctl to DISABLED.
     */
942     final void consume() {
943     Flow.Subscriber<? super T> s;
944     if ((s = subscriber) != null) { // else disabled
945     for (;;) {
946     long d = demand; // read volatile fields in acceptable order
947     int c = ctl;
948     int h = head;
949     int t = tail;
950     Object[] a = array;
951     int i, n; Object x; Thread w;
        // Case 1: ctl is negative -- release references and stop.
952     if (c < 0) {
953     detach();
954     break;
955     }
        // Case 2: ERROR bit set (by cancel() or closeExceptionally()).
        // ctl is overwritten with DISABLED and onError is called only if
        // a pending error was recorded. The loop then runs once more;
        // presumably DISABLED is negative so that pass detaches and
        // exits -- confirm against the constant's definition.
956     else if ((c & ERROR) != 0) {
957     Throwable ex = pendingError;
958     ctl = DISABLED; // no need for CAS
959     if (ex != null) {
960     try {
961     s.onError(ex);
962 jsr166 1.2 } catch (Throwable ignore) {
963 dl 1.1 }
964     }
965     }
        // Case 3: snapshot says the buffer is empty. If tail still
        // matches, clear ACTIVE via CAS (c is updated to the new value).
966     else if (h == t) { // empty
967     if (h == tail &&
968     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) {
        // After giving up ACTIVE, look again: if tail moved or ctl
        // changed, try to re-claim ACTIVE and keep looping; exit if
        // ACTIVE/DISABLED is already set or the re-claim CAS fails.
969     if (h != tail || c != (c = ctl)) { // recheck
970     if ((c & (ACTIVE | DISABLED)) != 0 ||
971     !U.compareAndSwapInt(this, CTL, c,
972     c | ACTIVE))
973     break;
974     }
        // Nothing changed and COMPLETE is set: disable and signal
        // onComplete; the loop continues to the next pass.
975     else if ((c & COMPLETE) != 0) {
976     ctl = DISABLED;
977     try {
978     s.onComplete();
979 jsr166 1.2 } catch (Throwable ignore) {
980 dl 1.1 }
981     }
        // Nothing changed and not complete: exit with ACTIVE cleared.
982     else
983     break;
984     }
985     }
        // Case 4: items are buffered but demand is zero. Same
        // clear-ACTIVE / recheck / re-claim pattern as the empty case,
        // keyed on demand instead of tail.
986     else if (d == 0L) { // can't take
987     if (demand == 0L &&
988     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) &&
989     ((demand == 0L && c == (c = ctl)) || // recheck
990     (c & (ACTIVE | DISABLED)) != 0 ||
991     !U.compareAndSwapInt(this, CTL, c, c | ACTIVE)))
992     break;
993     }
        // Case 5: take the element at slot (h & (length - 1)) by CASing
        // it to null. If any condition fails, nothing is done on this
        // pass and the loop simply retries with a fresh snapshot.
994     else if (a != null &&
995     (n = a.length) > 0 &&
996     (x = a[i = h & (n - 1)]) != null &&
997     U.compareAndSwapObject(
998     a, (((long)i) << ASHIFT) + ABASE, x, null)) {
        // Slot claimed: advance head, then decrement demand by one.
999     U.putOrderedInt(this, HEAD, h + 1);
1000     while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1001     d = demand; // almost never fails
        // Wake the thread recorded in waiter, if any, after clearing
        // the field.
1002     if ((w = waiter) != null) {
1003     waiter = null;
1004     LockSupport.unpark(w); // release producer
1005     }
        // Deliver the item. A throwing onNext is not forwarded to
        // onError here; ctl is just set to DISABLED.
1006     try {
1007     @SuppressWarnings("unchecked") T y = (T) x;
1008     s.onNext(y);
1009 jsr166 1.2 } catch (Throwable ex) {
1010 dl 1.1 ctl = DISABLED;
1011     }
1012     }
1013     }
1014     }
1015     }
1016    
// Unsafe mechanics: field offsets and Object[] addressing constants
private static final sun.misc.Unsafe U;
private static final long CTL;
private static final long TAIL;
private static final long HEAD;
private static final long DEMAND;
private static final int ABASE;
private static final int ASHIFT;

static {
    try {
        U = sun.misc.Unsafe.getUnsafe();

        // Offsets of the fields that are updated via CAS / ordered writes.
        final Class<?> owner = BufferedSubscription.class;
        CTL = U.objectFieldOffset(owner.getDeclaredField("ctl"));
        TAIL = U.objectFieldOffset(owner.getDeclaredField("tail"));
        HEAD = U.objectFieldOffset(owner.getDeclaredField("head"));
        DEMAND = U.objectFieldOffset(owner.getDeclaredField("demand"));

        // Base offset and log2(element scale) for raw Object[] access.
        final Class<?> arrayType = Object[].class;
        ABASE = U.arrayBaseOffset(arrayType);
        final int elementScale = U.arrayIndexScale(arrayType);
        final boolean powerOfTwo = (elementScale & (elementScale - 1)) == 0;
        if (!powerOfTwo) {
            throw new Error("data type scale not a power of two");
        }
        ASHIFT = 31 - Integer.numberOfLeadingZeros(elementScale);
    } catch (Exception e) {
        throw new Error(e);
    }
}
1048     }
1049    
1050     static final class TransformProcessor<U,R> implements Flow.Processor<U,R> {
1051     final Function<? super U, ? extends R> fn;
1052     final SubmissionPublisher<R> sink;
1053     Flow.Subscription subscription;
1054     final long requestSize;
1055     long count;
1056     TransformProcessor(long requestSize,
1057     Function<? super U, ? extends R> fn,
1058     SubmissionPublisher<R> sink) {
1059     this.fn = fn;
1060     this.sink = sink;
1061     this.requestSize = requestSize;
1062     this.count = requestSize >>> 1;
1063     }
1064     public void subscribe(Flow.Subscriber<? super R> subscriber) {
1065     sink.subscribe(subscriber);
1066     }
1067     public void onSubscribe(Flow.Subscription subscription) {
1068     (this.subscription = subscription).request(requestSize);
1069     }
1070     public void onNext(U item) {
1071     Flow.Subscription s = subscription;
1072     if (s == null)
1073     sink.closeExceptionally(
1074     new IllegalStateException("onNext without subscription"));
1075     else {
1076     try {
1077     if (--count <= 0)
1078     s.request(count = requestSize);
1079     sink.submit(fn.apply(item));
1080     } catch (Throwable ex) {
1081     try {
1082     s.cancel();
1083     } finally {
1084     sink.closeExceptionally(ex);
1085     }
1086     }
1087     }
1088     }
1089     public void onError(Throwable ex) {
1090     sink.closeExceptionally(ex);
1091     }
1092     public void onComplete() {
1093     sink.close();
1094     }
1095     }
1096     }