ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.16
Committed: Sun Jan 18 15:54:13 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.15: +258 -303 lines
Log Message:
Misc refactorings; TransformProcessors now as example, not method

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 jsr166 1.10 import java.util.ArrayList;
10     import java.util.List;
11 dl 1.1 import java.util.concurrent.locks.LockSupport;
12     import java.util.function.BiPredicate;
13     import java.util.function.Function;
14 dl 1.15 import java.util.function.Supplier;
15 dl 1.1
16     /**
17     * A {@link Flow.Publisher} that asynchronously issues submitted items
18     * to current subscribers until it is closed. Each current subscriber
19     * receives newly submitted items in the same order unless drops or
20     * exceptions are encountered. Using a SubmissionPublisher allows
21     * item generators to act as Publishers, although without integrated
22     * flow control. Instead they rely on drop handling and/or blocking.
23     *
24 dl 1.16 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
25 dl 1.1 * constructor for delivery to subscribers. The best choice of
26     * Executor depends on expected usage. If the generator(s) of
27     * submitted items run in separate threads, and the number of
28     * subscribers can be estimated, consider using a {@link
29     * Executors#newFixedThreadPool}. Otherwise consider using a
30     * work-stealing pool (including {@link ForkJoinPool#commonPool}).
31     *
32     * <p>Buffering allows producers and consumers to transiently operate
33     * at different rates. Each subscriber uses an independent buffer.
34     * Buffers are created upon first use with a given initial capacity,
35     * and are resized as needed up to the maximum. (Capacity arguments
36 dl 1.16 * may be rounded up to powers of two.) Invocations of {@link
37     * Flow.Subscription#request} do not directly result in buffer
38     * expansion, but risk saturation if unfulfilled requests exceed the
39     * maximum capacity. Choices of buffer parameters rely on expected
40     * rates, resources, and usages, that usually benefit from empirical
41     * testing. As first guesses, consider initial 8 and maximum 1024.
42 dl 1.1 *
43     * <p>Publication methods support different policies about what to do
44     * when buffers are saturated. Method {@link #submit} blocks until
45 dl 1.16 * resources are available. This is simplest, but least
46     * responsive. The {@code offer} methods may either immediately, or
47     * with bounded timeout, drop items, but provide an opportunity to
48     * interpose a handler and then retry.
49 dl 1.1 *
50     * <p>If any Subscriber method throws an exception, its subscription
51     * is cancelled. If the supplied Executor throws
52     * RejectedExecutionException (or any other RuntimeException or Error)
53     * when attempting to execute a task, or a drop handler throws an
54     * exception when processing a dropped item, then the exception is
55     * rethrown. In these cases, some but not all subscribers may have
56     * received items. It is usually good practice to {@link
57 jsr166 1.11 * #closeExceptionally closeExceptionally} in these cases.
58 dl 1.1 *
59 dl 1.15 * <p>This class may also serve as a convenient base for subclasses
60     * that generate items, and use the methods in this class to publish
61     * them. For example here is a class that periodically publishes the
62     * items generated from a supplier. (In practice you might add methods
63     * to independently start and stop generation, to share schedulers
64 dl 1.16 * among publishers, and so on, or instead use a SubmissionPublisher
65     * as a component rather than a superclass.)
66 dl 1.15 *
67     * <pre> {@code
68     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
69     * final ScheduledFuture<?> periodicTask;
70     * final ScheduledExecutorService scheduler;
71     * PeriodicPublisher(Executor executor, int initialBufferCapacity,
72     * int maxBufferCapacity, Supplier<? extends T> supplier,
73     * long period, TimeUnit unit) {
74     * super(executor, initialBufferCapacity, maxBufferCapacity);
75     * scheduler = new ScheduledThreadPoolExecutor(1);
76     * periodicTask = scheduler.scheduleAtFixedRate(
77     * () -> submit(supplier.get()), 0, period, unit);
78     * }
79     * public void close() {
80     * periodicTask.cancel(false);
81     * scheduler.shutdown();
82     * super.close();
83     * }
84     * }}</pre>
85     *
86 dl 1.16 * <p>Here is an example of {@link Flow.Processor} subclass (using
87     * single-step requests to its publisher, for simplicity of
88     * illustration):
89     *
90     * <pre> {@code
91     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
92     * implements Flow.Processor<S,T> {
93     * final Function<? super S, ? extends T> function;
94     * Flow.Subscription subscription;
95     * TransformProcessor(Executor executor, int initialBufferCapacity,
96     * int maxBufferCapacity,
97     * Function<? super S, ? extends T> function) {
98     * super(executor, initialBufferCapacity, maxBufferCapacity);
99     * this.function = function;
100     * }
101     * public void onSubscribe(Flow.Subscription subscription) {
102     * (this.subscription = subscription).request(1);
103     * }
104     * public void onNext(S item) {
105     * subscription.request(1);
106     * submit(function.apply(item));
107     * }
108     * public void onError(Throwable ex) { closeExceptionally(ex); }
109     * public void onComplete() { close(); }
110     * }}</pre>
111     *
112 dl 1.1 * @param <T> the published item type
113     * @author Doug Lea
114     * @since 1.9
115     */
116     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
117     AutoCloseable {
118     /*
119     * Most mechanics are handled by BufferedSubscription. This class
120     * mainly ensures sequentiality by using built-in synchronization
121     * locks across public methods. (Using built-in locks works well
122     * in the most typical case in which only one thread submits
123     * items).
124     */
125    
126     // Ensuring that all arrays have power of two length
127    
128     static final int MAXIMUM_BUFFER_CAPACITY = 1 << 30;
129     static final int roundCapacity(int cap) { // to nearest power of 2
130     int n = cap - 1;
131     n |= n >>> 1;
132     n |= n >>> 2;
133     n |= n >>> 4;
134     n |= n >>> 8;
135     n |= n >>> 16;
136     return (n < 0) ? 1 :
137     (n >= MAXIMUM_BUFFER_CAPACITY) ? MAXIMUM_BUFFER_CAPACITY : n + 1;
138     }
139    
140     /**
141     * Clients (BufferedSubscriptions) are maintained in a linked list
142     * (via their "next" fields). This works well for publish loops.
143     * It requires O(n) traversal to check for duplicate subscribers,
144     * but we expect that subscribing is much less common than
145 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
146     * when BufferedSubscription methods or status checks return
147     * negative values signifying that they have been disabled.
148 dl 1.1 */
149     BufferedSubscription<T> clients;
150    
151     // Parameters for constructing BufferedSubscriptions
152     final Executor executor;
153     final int minBufferCapacity;
154     final int maxBufferCapacity;
155    
156     /** Run status, updated only within locks */
157     volatile boolean closed;
158    
/**
 * Creates a new SubmissionPublisher that delivers items to
 * subscribers asynchronously via the given Executor, giving each
 * subscriber a buffer with the given initial and maximum sizes.
 * When there are no other constraints, consider using {@code
 * ForkJoinPool.commonPool(), 8, 1024}.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param initialBufferCapacity the initial capacity for each
 * subscriber's buffer (the actual capacity may be rounded up to
 * the nearest power of two)
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the actual capacity may be rounded up to
 * the nearest power of two)
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if either capacity is not
 * positive, if maxBufferCapacity exceeds {@code 1<<30} (about
 * 1 billion), the maximum bound for a power of two array size,
 * or if initialBufferCapacity exceeds maxBufferCapacity
 */
public SubmissionPublisher(Executor executor,
                           int initialBufferCapacity,
                           int maxBufferCapacity) {
    if (executor == null)
        throw new NullPointerException();
    // Argument checks run in a fixed order so that the reported
    // message is stable for any given combination of bad values.
    if (initialBufferCapacity <= 0 || maxBufferCapacity <= 0)
        throw new IllegalArgumentException("capacity must be positive");
    if (maxBufferCapacity > MAXIMUM_BUFFER_CAPACITY)
        throw new IllegalArgumentException("capacity exceeds limit");
    if (initialBufferCapacity > maxBufferCapacity)
        throw new IllegalArgumentException("initial cannot exceed max capacity");
    this.executor = executor;
    // Both bounds are stored already rounded to powers of two.
    this.minBufferCapacity = roundCapacity(initialBufferCapacity);
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
}
197    
198     /**
199     * Adds the given Subscriber unless already subscribed. If
200     * already subscribed, the Subscriber's onError method is invoked
201     * with an IllegalStateException. Otherwise, upon success, the
202     * Subscriber's onSubscribe method is invoked with a new
203     * Subscription (upon exception, the exception is rethrown and the
204     * Subscriber remains unsubscribed). If this SubmissionPublisher
205     * is closed, the subscriber's onComplete method is then invoked.
206 dl 1.13 * Subscribers may enable receiving items by invoking the {@code
207     * request} method of the new Subscription, and may unsubscribe by
208 dl 1.1 * invoking its cancel method.
209     *
210     * @param subscriber the subscriber
211     * @throws NullPointerException if subscriber is null
212     */
213     public void subscribe(Flow.Subscriber<? super T> subscriber) {
214     if (subscriber == null) throw new NullPointerException();
215     BufferedSubscription<T> sub = new BufferedSubscription<T>(
216     subscriber, executor, minBufferCapacity, maxBufferCapacity);
217     boolean present = false, clsd;
218 jsr166 1.2 synchronized (this) {
219 dl 1.1 BufferedSubscription<T> pred = null, next;
220 dl 1.16 if (!(clsd = closed)) {
221     for (BufferedSubscription<T> b = clients; b != null; b = next) {
222     next = b.next;
223     if (b.ctl < 0) { // disabled; remove
224     if (pred == null)
225     clients = next;
226     else
227     pred.next = next;
228     }
229     else if (subscriber == b.subscriber) {
230     present = true;
231     break;
232     }
233     pred = b;
234 dl 1.1 }
235     }
236     if (!present) {
237     subscriber.onSubscribe(sub); // don't link on exception
238     if (!clsd) {
239     if (pred == null)
240     clients = sub;
241     else
242     pred.next = sub;
243     }
244     }
245     }
246     if (clsd)
247     subscriber.onComplete();
248     else if (present)
249     subscriber.onError(new IllegalStateException("Already subscribed"));
250     }
251    
252     /**
253 dl 1.16 * Publishes the given item to each current subscriber by
254     * asynchronously invoking its onNext method, blocking
255     * uninterruptibly while resources for any subscriber are
256     * unavailable.
257     *
258     * <p>If the Executor for this publisher throws a
259     * RejectedExecutionException (or any other RuntimeException or
260     * Error) when attempting to asynchronously notify subscribers,
261     * then this exception is rethrown.
262     *
263     * @param item the (non-null) item to publish
264     * @throws IllegalStateException if closed
265     * @throws NullPointerException if item is null
266     * @throws RejectedExecutionException if thrown by Executor
267     */
268     public void submit(T item) {
269     if (item == null) throw new NullPointerException();
270     synchronized (this) {
271     if (closed)
272     throw new IllegalStateException("Closed");
273     BufferedSubscription<T> pred = null, next;
274     for (BufferedSubscription<T> b = clients; b != null; b = next) {
275     int stat;
276     next = b.next;
277     if ((stat = b.submit(item)) < 0) {
278     if (pred == null)
279     clients = next;
280     else
281     pred.next = next;
282     }
283     else
284     pred = b;
285     }
286     }
287     }
288    
289     /**
290 dl 1.1 * Publishes the given item, if possible, to each current
291     * subscriber by asynchronously invoking its onNext method. The
292     * item may be dropped by one or more subscribers if resource
293     * limits are exceeded, in which case the given handler (if
294     * non-null) is invoked, and if it returns true, retried once.
295 dl 1.16 * Other calls to methods in this class by other threads are
296     * blocked while the handler is invoked. Unless recovery is
297     * assured, options are usually limited to logging the error
298     * and/or issuing an onError signal to the subscriber.
299 dl 1.1 *
300     * <p>If the Executor for this publisher throws a
301     * RejectedExecutionException (or any other RuntimeException or
302     * Error) when attempting to asynchronously notify subscribers, or
303     * the drop handler throws an exception when processing a dropped
304     * item, then this exception is rethrown.
305     *
306     * @param item the (non-null) item to publish
307     * @param onDrop if non-null, the handler invoked upon a drop to a
308     * subscriber, with arguments of the subscriber and item; if it
309     * returns true, an offer is re-attempted (once)
310 jsr166 1.4 * @return the number of drops (failed attempts to issue the item
311     * to a subscriber)
312 dl 1.1 * @throws IllegalStateException if closed
313     * @throws NullPointerException if item is null
314     * @throws RejectedExecutionException if thrown by Executor
315     */
316     public int offer(T item,
317     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
318     if (item == null) throw new NullPointerException();
319     int drops = 0;
320 jsr166 1.2 synchronized (this) {
321 dl 1.1 if (closed)
322     throw new IllegalStateException("Closed");
323     BufferedSubscription<T> pred = null, next;
324     for (BufferedSubscription<T> b = clients; b != null; b = next) {
325     int stat;
326     next = b.next;
327     if ((stat = b.offer(item)) == 0 &&
328     onDrop != null &&
329     onDrop.test(b.subscriber, item))
330     stat = b.offer(item);
331     if (stat < 0) {
332     if (pred == null)
333     clients = next;
334     else
335     pred.next = next;
336     }
337     else {
338     pred = b;
339     if (stat == 0)
340     ++drops;
341     }
342     }
343     return drops;
344     }
345     }
346    
347     /**
348     * Publishes the given item, if possible, to each current
349     * subscriber by asynchronously invoking its onNext method,
350     * blocking while resources for any subscription are unavailable,
351     * up to the specified timeout or the caller thread is
352 dl 1.16 * interrupted, at which point the given handler (if non-null) is
353     * invoked, and if it returns true, retried once. (The drop
354     * handler may distinguish timeouts from interrupts by checking
355     * whether the current thread is interrupted.) Other calls to
356     * methods in this class by other threads are blocked while the
357     * handler is invoked. Unless recovery is assured, options are
358     * usually limited to logging the error and/or issuing an onError
359     * signal to the subscriber.
360 dl 1.1 *
361     * <p>If the Executor for this publisher throws a
362     * RejectedExecutionException (or any other RuntimeException or
363     * Error) when attempting to asynchronously notify subscribers, or
364     * the drop handler throws an exception when processing a dropped
365     * item, then this exception is rethrown.
366     *
367     * @param item the (non-null) item to publish
368     * @param timeout how long to wait for resources for any subscriber
369     * before giving up, in units of {@code unit}
370     * @param unit a {@code TimeUnit} determining how to interpret the
371     * {@code timeout} parameter
372     * @param onDrop if non-null, the handler invoked upon a drop to a
373     * subscriber, with arguments of the subscriber and item; if it
374     * returns true, an offer is re-attempted (once)
375 jsr166 1.4 * @return the number of drops (failed attempts to issue the item
376     * to a subscriber)
377 dl 1.1 * @throws IllegalStateException if closed
378     * @throws NullPointerException if item is null
379     * @throws RejectedExecutionException if thrown by Executor
380     */
381     public int offer(T item, long timeout, TimeUnit unit,
382     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
383     if (item == null) throw new NullPointerException();
384     long nanos = unit.toNanos(timeout);
385     int drops = 0;
386 jsr166 1.2 synchronized (this) {
387 dl 1.1 if (closed)
388     throw new IllegalStateException("Closed");
389     BufferedSubscription<T> pred = null, next;
390     for (BufferedSubscription<T> b = clients; b != null; b = next) {
391     int stat;
392     next = b.next;
393 dl 1.16 if ((stat = b.timedOffer(item, nanos)) == 0 &&
394     onDrop != null && onDrop.test(b.subscriber, item))
395 dl 1.1 stat = b.offer(item);
396     if (stat < 0) {
397     if (pred == null)
398     clients = next;
399     else
400     pred.next = next;
401     }
402     else {
403     pred = b;
404     if (stat == 0)
405     ++drops;
406     }
407     }
408     }
409     return drops;
410     }
411    
412     /**
413     * Unless already closed, issues onComplete signals to current
414     * subscribers, and disallows subsequent attempts to publish.
415     */
416     public void close() {
417     if (!closed) {
418     BufferedSubscription<T> b, next;
419 jsr166 1.2 synchronized (this) {
420 dl 1.1 b = clients;
421     clients = null;
422     closed = true;
423     }
424     while (b != null) {
425     next = b.next;
426     b.close();
427     b = next;
428     }
429     }
430     }
431    
432     /**
433     * Unless already closed, issues onError signals to current
434     * subscribers with the given error, and disallows subsequent
435     * attempts to publish.
436     *
437 jsr166 1.5 * @param error the onError argument sent to subscribers
438 dl 1.1 * @throws NullPointerException if error is null
439     */
440     public void closeExceptionally(Throwable error) {
441     if (error == null)
442     throw new NullPointerException();
443     if (!closed) {
444     BufferedSubscription<T> b, next;
445 jsr166 1.2 synchronized (this) {
446 dl 1.1 b = clients;
447     clients = null;
448     closed = true;
449     }
450     while (b != null) {
451     next = b.next;
452     b.closeExceptionally(error);
453     b = next;
454     }
455     }
456     }
457    
458     /**
459     * Returns true if this publisher is not accepting submissions.
460     *
461     * @return true if closed
462     */
463     public boolean isClosed() {
464     return closed;
465     }
466    
467     /**
468 jsr166 1.6 * Returns true if this publisher has any subscribers.
469 dl 1.1 *
470     * @return true if this publisher has any subscribers
471     */
472     public boolean hasSubscribers() {
473     boolean nonEmpty = false;
474     if (!closed) {
475 jsr166 1.2 synchronized (this) {
476 dl 1.1 BufferedSubscription<T> pred = null, next;
477     for (BufferedSubscription<T> b = clients; b != null; b = next) {
478     next = b.next;
479     if (b.ctl < 0) {
480     if (pred == null)
481     clients = next;
482     else
483     pred.next = next;
484 jsr166 1.2 }
485 dl 1.1 else {
486     nonEmpty = true;
487     break;
488     }
489     }
490     }
491     }
492     return nonEmpty;
493     }
494    
495     /**
496     * Returns the Executor used for asynchronous delivery.
497     *
498     * @return the Executor used for asynchronous delivery
499     */
500     public Executor getExecutor() {
501     return executor;
502     }
503    
504 jsr166 1.7 /**
505 jsr166 1.5 * Returns the initial per-subscriber buffer capacity.
506 dl 1.1 *
507 jsr166 1.5 * @return the initial per-subscriber buffer capacity
508 dl 1.1 */
509     public int getInitialBufferCapacity() {
510     return minBufferCapacity;
511     }
512    
513 jsr166 1.7 /**
514 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
515 dl 1.1 *
516 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
517 dl 1.1 */
518     public int getMaxBufferCapacity() {
519     return maxBufferCapacity;
520     }
521    
522     /**
523     * Returns a list of current subscribers.
524     *
525     * @return list of current subscribers
526     */
527     public List<Flow.Subscriber<? super T>> getSubscribers() {
528     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
529 jsr166 1.2 synchronized (this) {
530 dl 1.1 BufferedSubscription<T> pred = null, next;
531     for (BufferedSubscription<T> b = clients; b != null; b = next) {
532     next = b.next;
533     if (b.ctl < 0) {
534     if (pred == null)
535     clients = next;
536     else
537     pred.next = next;
538 jsr166 1.2 }
539 dl 1.1 else
540     subs.add(b.subscriber);
541     }
542     }
543     return subs;
544     }
545 jsr166 1.2
546 dl 1.1 /**
547     * Returns true if the given Subscriber is currently subscribed.
548     *
549     * @param subscriber the subscriber
550     * @return true if currently subscribed
551     */
552     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
553 jsr166 1.2 if (!closed) {
554     synchronized (this) {
555 dl 1.1 BufferedSubscription<T> pred = null, next;
556     for (BufferedSubscription<T> b = clients; b != null; b = next) {
557     next = b.next;
558     if (b.ctl < 0) {
559     if (pred == null)
560     clients = next;
561     else
562     pred.next = next;
563 jsr166 1.2 }
564 dl 1.1 else if (subscriber == b.subscriber)
565     return true;
566     }
567     }
568     }
569     return false;
570     }
571    
572     /**
573     * If the given subscription is managed by a SubmissionPublisher,
574     * returns an estimate of the number of items produced but not yet
575     * consumed; otherwise returns zero. This method is designed only
576     * for monitoring purposes, not for control.
577     *
578     * @param subscription the subscription
579     * @return the estimate, or zero if the subscription is of an
580 jsr166 1.8 * unknown type
581 dl 1.1 */
582     public static int estimateAvailable(Flow.Subscription subscription) {
583     if (subscription instanceof BufferedSubscription)
584     return ((BufferedSubscription)subscription).estimateAvailable();
585     else
586     return 0;
587     }
588    
589     /**
590 dl 1.15 * A bounded (ring) buffer with integrated control to start a
591     * consumer task whenever items are available. The buffer
592 dl 1.1 * algorithm is similar to one used inside ForkJoinPool,
593     * specialized for the case of at most one concurrent producer and
594     * consumer, and power of two buffer sizes. This allows methods to
595     * operate without locks even while supporting resizing, blocking,
596     * task-triggering, and garbage-free buffers (nulling out elements
597     * when consumed), although supporting these does impose a bit of
598     * overhead compared to plain fixed-size ring buffers.
599     *
600     * The publisher guarantees a single producer via its lock. We
601     * ensure in this class that there is at most one consumer. The
602     * request and cancel methods must be fully thread-safe but are
603     * coded to exploit the most common case in which they are only
604     * called by consumers (usually within onNext).
605     *
606 dl 1.15 * This class also serves as its own consumer task, consuming as
607     * many items/signals as possible before terminating, at which
608     * point it is re-executed when needed. (The dual Runnable
609     * and ForkJoinTask declaration saves overhead when executed by
610     * ForkJoinPools, without impacting other kinds of Executors.)
611     * Execution control is managed using the ACTIVE ctl bit. We
612     * ensure that a task is active when consumable items (and
613     * usually, ERROR or COMPLETE signals) are present and there is
614     * demand (unfulfilled requests). This is complicated on the
615 dl 1.1 * creation side by the possibility of exceptions when trying to
616     * execute tasks. These eventually force DISABLED state, but
617     * sometimes not directly. On the task side, termination (clearing
618     * ACTIVE) may race with producers or request() calls, so in some
619     * cases requires a re-check, re-activating if possible.
620     *
621     * The ctl field also manages run state. When DISABLED, no further
622     * updates are possible (to simplify checks, DISABLED is defined
623     * as a negative value). Disabling may be preceded by setting
624     * ERROR or COMPLETE (or both -- ERROR has precedence), in which
625     * case the associated Subscriber methods are invoked, possibly
626     * synchronously if there is no active consumer task (including
627     * cases where execute() failed).
628     *
629     * Support for blocking also exploits the fact that there is only
630     * one possible waiter. ManagedBlocker-compatible control fields
631     * are placed in this class itself rather than in wait-nodes.
632     * Blocking control relies on the "waiter" field. Producers set
633     * the field before trying to block, but must then recheck (via
634     * offer) before parking. Signalling then just unparks and clears
635     * waiter field.
636     *
637     * This class uses @Contended and heuristic field declaration
638     * ordering to reduce memory contention on BufferedSubscription
639     * itself, but it does not currently attempt to avoid memory
640     * contention (especially including card-marks) among buffer
641     * elements, that can significantly slow down some usages.
642     * Addressing this may require allocating substantially more space
643     * than users expect.
644     */
645 dl 1.15 @SuppressWarnings("serial")
646 dl 1.1 @sun.misc.Contended
647 dl 1.15 static final class BufferedSubscription<T> extends ForkJoinTask<Void>
648     implements Runnable, Flow.Subscription, ForkJoinPool.ManagedBlocker {
649 dl 1.1 // Order-sensitive field declarations
650     long timeout; // > 0 if timed wait
651     volatile long demand; // # unfilled requests
652     final int minCapacity; // initial buffer size
653     int maxCapacity; // reduced on OOME
654     int putStat; // offer result for ManagedBlocker
655     volatile int ctl; // atomic run state flags
656     volatile int head; // next position to take
657     volatile int tail; // next position to put
658     volatile Object[] array; // buffer: null if disabled
659     Flow.Subscriber<? super T> subscriber; // null if disabled
660     Executor executor; // null if disabled
661     volatile Throwable pendingError; // holds until onError issued
662     volatile Thread waiter; // blocked producer thread
663     T putItem; // for offer within ManagedBlocker
664     BufferedSubscription<T> next; // used only by publisher
665    
666     // ctl values
667     static final int ACTIVE = 0x01; // consumer task active
668     static final int ERROR = 0x02; // signal pending error
669     static final int COMPLETE = 0x04; // signal completion when done
670     static final int DISABLED = 0x80000000; // must be negative
671    
672 dl 1.16 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
673    
/**
 * Creates a subscription for the given subscriber.  Only the
 * collaborators and capacity bounds are recorded here; no buffer
 * array is allocated by this constructor.
 *
 * @param subscriber the subscriber to deliver to
 * @param executor the executor used to run the consumer task
 * @param minBufferCapacity the initial buffer size
 * @param maxBufferCapacity the maximum buffer size
 */
BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                     Executor executor, int minBufferCapacity,
                     int maxBufferCapacity) {
    this.minCapacity = minBufferCapacity;
    this.maxCapacity = maxBufferCapacity;
    this.executor = executor;
    this.subscriber = subscriber;
}
682    
683     /**
684 dl 1.16 * Tries to add item and start consumer task if necessary.
        * The item is stored in the next free slot of the current buffer;
        * when there is no buffer yet, or it is full, growAndAdd is used
        * instead.  After a successful add, the ACTIVE bit of ctl is set
        * and this object is handed to the executor, unless a task is
        * already active, there is no demand, or the buffer is empty.
        * If the executor throws a RuntimeException or Error, the ACTIVE
        * bit is cleared again (unless disabled) and the exception is
        * rethrown.
        *
        * @param item the item to add
685 dl 1.1 * @return -1 if disabled, 0 if dropped, else 1
686     */
687     final int offer(T item) {
688     Object[] a = array;
689 dl 1.16 int t = tail, h = head, n, i, stat;
        // Fast path: a buffer exists and holds fewer than n items
        // (t - h are in use), so store at the masked tail index.
690     if (a != null && (n = a.length) > t - h && (i = t & (n - 1)) >= 0) {
691     a[i] = item;
        // Publish the new tail after the element store (ordered write).
692 dl 1.1 U.putOrderedInt(this, TAIL, t + 1);
693     }
        // Slow path: no buffer or no free slot.  A non-positive result
        // (0 = dropped, -1 = disabled) is returned to the caller as is.
694 dl 1.16 else if ((stat = growAndAdd(a, item)) <= 0)
695     return stat;
696     for (int c;;) { // possibly start task
697     Executor e;
        // A consumer task is already marked active: nothing to start.
698     if (((c = ctl) & ACTIVE) != 0)
699     break;
        // Negative ctl means DISABLED; a null executor means detached.
700     else if (c < 0 || (e = executor) == null)
701     return -1;
        // No outstanding requests, or nothing buffered: no task needed.
702     else if (demand == 0L || tail == head)
703     break;
        // Claim the ACTIVE bit first; only the CAS winner calls execute.
        // A failed CAS re-reads ctl and re-evaluates from the top.
704     else if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) {
705     try {
706     e.execute(this);
707     break;
708     } catch (RuntimeException | Error ex) { // back out
        // Retry clearing ACTIVE until the CAS succeeds, or until ctl
        // shows DISABLED or ACTIVE already cleared; then propagate.
709     do {} while ((c = ctl) >= 0 &&
710     (c & ACTIVE) != 0 &&
711     !U.compareAndSwapInt(this, CTL, c,
712     c & ~ACTIVE));
713     throw ex;
714     }
715     }
716     }
717     return 1;
718 dl 1.1 }
719    
720     /**
721 dl 1.16 * Tries to create or expand buffer, then adds item if possible
        *
        * A missing buffer is created with minCapacity slots; an existing
        * one is replaced by an array of twice its length, into which the
        * elements between head and tail are moved.
        *
        * @param oldArray the current buffer, or null if none exists yet
        * @param item the item to add
        * @return -1 if disabled; 0 if the buffer cannot grow (already
        * at maxCapacity, doubled length overflowed, or allocation
        * failed); 1 if the item was added
722 dl 1.1 */
723 dl 1.16 final int growAndAdd(Object[] oldArray, T item) {
724     int oldLen, newLen;
725     if (oldArray != null)
726     newLen = (oldLen = oldArray.length) << 1;
        // No buffer yet: allocate the initial one unless ctl is negative.
727     else if (ctl >= 0) {
728 dl 1.1 oldLen = 0;
729 dl 1.16 newLen = minCapacity;
730 dl 1.1 }
731 dl 1.16 else
732     return -1; // disabled
        // newLen <= 0 guards against the doubling shift overflowing.
733     if (oldLen >= maxCapacity || newLen <= 0)
734     return 0; // cannot grow
735     Object[] newArray;
736 dl 1.1 try {
737 dl 1.16 newArray = new Object[newLen];
738     } catch (Throwable ex) { // try to cope with OOME
        // Cap maxCapacity at the current size so that later calls fail
        // the capacity check above instead of attempting to allocate.
739     if (oldLen > 0) // avoid continuous failure
740 dl 1.1 maxCapacity = oldLen;
741     return 0;
742     }
        // The new array is installed before any elements are moved.
743 dl 1.16 array = newArray;
744     int t = tail, oldMask = oldLen - 1, newMask = newLen - 1;
745     if (oldArray != null && oldMask >= 0 && newMask >= oldMask) {
746     for (int j = head; j != t; ++j) { // races with consumer
747     Object x;
748 dl 1.1 int i = j & oldMask;
        // Move an element only if this thread wins the CAS that nulls
        // its old slot; a slot that is already null, or whose CAS is
        // lost, is skipped.
749 dl 1.16 if ((x = oldArray[i]) != null &&
750     U.compareAndSwapObject(oldArray,
751 dl 1.1 (((long)i) << ASHIFT) + ABASE,
752     x, null))
753 dl 1.16 newArray[j & newMask] = x;
754 dl 1.1 }
755     }
        // Store the new item at the tail slot, then advance tail.
756 dl 1.16 newArray[t & newMask] = item;
757     tail = t + 1;
758     return 1;
759 dl 1.1 }
760    
761     /**
762 dl 1.16 * Nulls out most fields, mainly to avoid garbage retention
763     * until publisher unsubscribes.
764 dl 1.1 */
765 dl 1.16 final void detach() {
766     pendingError = null;
767     subscriber = null;
768     executor = null;
769     array = null;
770     Thread w = waiter;
771     if (w != null) {
772     waiter = null;
773     LockSupport.unpark(w); // force wakeup
774 dl 1.1 }
775     }
776    
777     /**
778     * Tries to start consumer task upon a signal or request;
779 jsr166 1.12 * disables on failure.
        *
        * Does nothing when the executor field is null.  If execute throws,
        * the ACTIVE bit is cleared and closeExceptionally is invoked with
        * the thrown exception; this back-out is skipped when ctl is
        * already negative or ACTIVE is no longer set.
780 dl 1.1 */
781     final void startOrDisable() {
782     Executor e; // skip if already disabled
783     if ((e = executor) != null) {
784     try {
785 dl 1.15 e.execute(this);
786 jsr166 1.2 } catch (Throwable ex) { // back out and force signal
787 dl 1.1 for (int c;;) {
        // Already disabled, or ACTIVE already cleared: nothing to undo.
788     if ((c = ctl) < 0 || (c & ACTIVE) == 0)
789     break;
        // With ACTIVE cleared, closeExceptionally takes its non-active
        // branch, provided ACTIVE is not set again in the meantime.
790     if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
791     closeExceptionally(ex);
792     break;
793     }
794     }
795     }
796     }
797     }
798    
799 dl 1.16 final void close() {
800 dl 1.1 for (int c;;) {
801     if ((c = ctl) < 0)
802     break;
803 dl 1.16 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
804     if ((c & ACTIVE) == 0)
805     startOrDisable();
806 dl 1.1 break;
807     }
808     }
809     }
810    
811     final void closeExceptionally(Throwable ex) {
812     for (int c;;) {
813     if ((c = ctl) < 0)
814     break;
815     else if ((c & ACTIVE) != 0) {
816 dl 1.16 pendingError = ex;
817 dl 1.1 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
818     break; // cause consumer task to exit
819     }
820     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
821     Flow.Subscriber<? super T> s = subscriber;
822 dl 1.16 if (s != null && ex != null) {
823 dl 1.1 try {
824     s.onError(ex);
825 jsr166 1.2 } catch (Throwable ignore) {
826 dl 1.1 }
827     }
828     detach();
829     break;
830     }
831     }
832     }
833    
834 dl 1.16 /**
835     * Causes consumer task to exit if active (without reporting
836     * onError unless there is already a pending error), and
837     * disables.
838     */
839     public void cancel() {
840 dl 1.1 for (int c;;) {
841     if ((c = ctl) < 0)
842     break;
843 dl 1.16 else if ((c & ACTIVE) != 0) {
844     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
845     break;
846     }
847     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
848     detach();
849 dl 1.1 break;
850     }
851     }
852     }
853    
854 dl 1.16 /**
855     * Adds to demand and possibly starts task.
856     */
857     public void request(long n) {
858     if (n > 0L) {
859     for (;;) {
860     long prev = demand, d;
861     if ((d = prev + n) < prev) // saturate
862     d = Long.MAX_VALUE;
863     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
864     for (int c, h;;) {
865     if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
866     demand == 0L)
867     break;
868     if ((h = head) != tail) {
869     if (U.compareAndSwapInt(this, CTL, c,
870     c | ACTIVE)) {
871     startOrDisable();
872     break;
873     }
874     }
875     else if (head == h && tail == h)
876     break;
877     }
878     break;
879     }
880     }
881     }
882     else if (n < 0L)
883     closeExceptionally(new IllegalArgumentException(
884     "negative subscription request"));
885     }
886    
887 dl 1.1 final int estimateAvailable() {
888     int n;
889     return (ctl >= 0 && (n = tail - head) > 0) ? n : 0;
890     }
891    
892     // ManagedBlocker support
893    
894     public final boolean isReleasable() {
895     T item = putItem;
896     if (item != null) {
897     if ((putStat = offer(item)) == 0)
898     return false;
899     putItem = null;
900     }
901     return true;
902     }
903    
904     public final boolean block() {
905     T item = putItem;
906     if (item != null) {
907     putItem = null;
908     long nanos = timeout;
909     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
910     while ((putStat = offer(item)) == 0) {
911     if (Thread.interrupted()) {
912 dl 1.16 timeout = INTERRUPTED;
913 dl 1.1 if (nanos > 0L)
914     break;
915     }
916     else if (nanos > 0L &&
917     (nanos = deadline - System.nanoTime()) <= 0L)
918     break;
919     else if (waiter == null)
920     waiter = Thread.currentThread();
921     else {
922     if (nanos > 0L)
923     LockSupport.parkNanos(this, nanos);
924     else
925     LockSupport.park(this);
926     waiter = null;
927     }
928     }
929     }
930     waiter = null;
931     return true;
932     }
933    
934     final int submit(T item) {
935 dl 1.16 int stat;
936     if ((stat = offer(item)) == 0) {
937     putItem = item;
938     timeout = 0L;
939     try {
940     ForkJoinPool.managedBlock(this);
941     } catch (InterruptedException cantHappen) {
942     }
943     stat = putStat;
944     if (timeout < 0L)
945     Thread.currentThread().interrupt();
946 dl 1.1 }
947 dl 1.16 return stat;
948 dl 1.1 }
949    
950     final int timedOffer(T item, long nanos) {
951 dl 1.16 int stat;
952     if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
953     putItem = item;
954     try {
955     ForkJoinPool.managedBlock(this);
956     } catch (InterruptedException cantHappen) {
957     }
958     stat = putStat;
959     if (timeout < 0L)
960     Thread.currentThread().interrupt();
961 dl 1.1 }
962 dl 1.16 return stat;
963 dl 1.1 }
964    
965 dl 1.16 /**
966     * Consumer task loop; supports resubmission when used as
967     * ForkJoinTask.
     *
     * <p>Each iteration takes a snapshot of demand, ctl, head, tail
     * and the buffer array and then handles exactly one case:
     * disabled (detach and exit), ERROR set (report any pending error
     * and disable), buffer empty (give up ACTIVE, or complete if
     * COMPLETE is set), stale slot (retry), no demand (give up
     * ACTIVE), or an item that can be claimed and passed to
     * {@code onNext}. Exceptions thrown by {@code onError} and
     * {@code onComplete} are ignored; an exception thrown by
     * {@code onNext} disables the subscription.
     *
     * @return always false
968     */
969     public final boolean exec() {
970 dl 1.1 Flow.Subscriber<? super T> s;
971     if ((s = subscriber) != null) { // else disabled
972     for (;;) {
973     long d = demand; // read volatile fields in acceptable order
974     int c = ctl;
975     int h = head;
976     int t = tail;
977     Object[] a = array;
978     int i, n; Object x; Thread w;
    // Case 1: negative ctl is treated as disabled; release references
    // and leave the loop.
979     if (c < 0) {
980     detach();
981     break;
982     }
    // Case 2: ERROR bit set. ctl becomes DISABLED and the pending
    // error, if any, is reported; the loop then continues, and
    // presumably case 1 detaches on the next pass (that requires
    // DISABLED to be negative, which is not visible here).
983     else if ((c & ERROR) != 0) {
984     Throwable ex = pendingError;
985     ctl = DISABLED; // no need for CAS
986     if (ex != null) {
987     try {
988     s.onError(ex);
989 jsr166 1.2 } catch (Throwable ignore) {
990 dl 1.1 }
991     }
992     }
    // Case 3: snapshot shows an empty buffer. Try to clear ACTIVE,
    // then recheck tail and ctl: if either changed, try to take ACTIVE
    // back and keep going (exit if another party holds ACTIVE, the
    // subscription is disabled, or the CAS fails). If nothing changed
    // and COMPLETE is set, disable and signal onComplete; otherwise
    // exit idle.
993     else if (h == t) { // empty
994     if (h == tail &&
995     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) {
996     if (h != tail || c != (c = ctl)) { // recheck
997     if ((c & (ACTIVE | DISABLED)) != 0 ||
998     !U.compareAndSwapInt(this, CTL, c,
999     c | ACTIVE))
1000     break;
1001     }
1002     else if ((c & COMPLETE) != 0) {
1003     ctl = DISABLED;
1004     try {
1005     s.onComplete();
1006 jsr166 1.2 } catch (Throwable ignore) {
1007 dl 1.1 }
1008     }
1009     else
1010     break;
1011     }
1012     }
    // Case 4: the array reference is null or empty, or the slot at
    // head is null; treat the snapshot as stale and loop again. As a
    // side effect this assigns n, i and x for the branches below.
1013 dl 1.16 else if (a == null || (n = a.length) == 0 ||
1014     (x = a[i = h & (n - 1)]) == null)
1015     ; // stale; retry
    // Case 5: an item is present but demand is zero. Clear ACTIVE and
    // exit unless demand or ctl changed in the meantime and ACTIVE can
    // be reacquired.
1016 dl 1.1 else if (d == 0L) { // can't take
1017     if (demand == 0L &&
1018     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) &&
1019     ((demand == 0L && c == (c = ctl)) || // recheck
1020     (c & (ACTIVE | DISABLED)) != 0 ||
1021     !U.compareAndSwapInt(this, CTL, c, c | ACTIVE)))
1022     break;
1023     }
    // Case 6: claim the item by CASing its slot to null. On success
    // advance head, decrement demand, wake any thread recorded in
    // waiter, and deliver the item. A failed CAS simply retries the
    // loop.
1024 dl 1.16 else if (U.compareAndSwapObject(
1025 dl 1.1 a, (((long)i) << ASHIFT) + ABASE, x, null)) {
1026     U.putOrderedInt(this, HEAD, h + 1);
1027     while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1028     d = demand; // almost never fails
1029     if ((w = waiter) != null) {
1030     waiter = null;
1031     LockSupport.unpark(w); // release producer
1032     }
1033     try {
1034     @SuppressWarnings("unchecked") T y = (T) x;
1035     s.onNext(y);
1036 dl 1.16 } catch (Throwable ex) { // disable on throw
1037 dl 1.1 ctl = DISABLED;
1038     }
1039     }
1040     }
1041     }
1042 dl 1.16 return false; // resubmittable; never joined
1043 dl 1.1 }
1044    
1045 dl 1.16 public final void run() { exec(); }
1046 dl 1.15 public final Void getRawResult() { return null; }
1047     public final void setRawResult(Void v) {}
1048    
1049 jsr166 1.9 // Unsafe mechanics
1050     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1051 dl 1.1 private static final long CTL;
1052     private static final long TAIL;
1053     private static final long HEAD;
1054     private static final long DEMAND;
1055     private static final int ABASE;
1056     private static final int ASHIFT;
1057    
1058     static {
1059     try {
1060     CTL = U.objectFieldOffset
1061 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("ctl"));
1062 dl 1.1 TAIL = U.objectFieldOffset
1063 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("tail"));
1064 dl 1.1 HEAD = U.objectFieldOffset
1065 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("head"));
1066 dl 1.1 DEMAND = U.objectFieldOffset
1067 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("demand"));
1068    
1069     ABASE = U.arrayBaseOffset(Object[].class);
1070     int scale = U.arrayIndexScale(Object[].class);
1071 dl 1.1 if ((scale & (scale - 1)) != 0)
1072     throw new Error("data type scale not a power of two");
1073     ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1074 jsr166 1.9 } catch (ReflectiveOperationException e) {
1075 dl 1.1 throw new Error(e);
1076     }
1077     }
1078     }
1079     }