ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.17
Committed: Mon Jan 19 19:47:06 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.16: +34 -25 lines
Log Message:
estimateAvailable now estimateBuffered

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9 jsr166 1.10 import java.util.ArrayList;
10     import java.util.List;
11 dl 1.1 import java.util.concurrent.locks.LockSupport;
12     import java.util.function.BiPredicate;
13     import java.util.function.Function;
14 dl 1.15 import java.util.function.Supplier;
15 dl 1.1
16     /**
17     * A {@link Flow.Publisher} that asynchronously issues submitted items
18     * to current subscribers until it is closed. Each current subscriber
19     * receives newly submitted items in the same order unless drops or
20     * exceptions are encountered. Using a SubmissionPublisher allows
21 dl 1.17 * item generators to act as Publishers relying on drop handling
22     * and/or blocking for flow control.
23 dl 1.1 *
24 dl 1.16 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
25 dl 1.1 * constructor for delivery to subscribers. The best choice of
26     * Executor depends on expected usage. If the generator(s) of
27     * submitted items run in separate threads, and the number of
28     * subscribers can be estimated, consider using a {@link
29     * Executors#newFixedThreadPool}. Otherwise consider using a
30     * work-stealing pool (including {@link ForkJoinPool#commonPool}).
31     *
32     * <p>Buffering allows producers and consumers to transiently operate
33     * at different rates. Each subscriber uses an independent buffer.
34     * Buffers are created upon first use with a given initial capacity,
35     * and are resized as needed up to the maximum. (Capacity arguments
36 dl 1.16 * may be rounded up to powers of two.) Invocations of {@link
37     * Flow.Subscription#request} do not directly result in buffer
38     * expansion, but risk saturation if unfulfilled requests exceed the
39     * maximum capacity. Choices of buffer parameters rely on expected
40     * rates, resources, and usages, that usually benefit from empirical
41     * testing. As first guesses, consider initial 8 and maximum 1024.
42 dl 1.1 *
43     * <p>Publication methods support different policies about what to do
44     * when buffers are saturated. Method {@link #submit} blocks until
45 dl 1.16 * resources are available. This is simplest, but least
46 dl 1.17 * responsive. The {@code offer} methods may drop items (either
47     * immediately or with bounded timeout), but provide an opportunity to
48 dl 1.16 * interpose a handler and then retry.
49 dl 1.1 *
50     * <p>If any Subscriber method throws an exception, its subscription
51     * is cancelled. If the supplied Executor throws
52     * RejectedExecutionException (or any other RuntimeException or Error)
53     * when attempting to execute a task, or a drop handler throws an
54     * exception when processing a dropped item, then the exception is
55     * rethrown. In these cases, some but not all subscribers may have
56     * received items. It is usually good practice to {@link
57 jsr166 1.11 * #closeExceptionally closeExceptionally} in these cases.
58 dl 1.1 *
59 dl 1.15 * <p>This class may also serve as a convenient base for subclasses
60     * that generate items, and use the methods in this class to publish
61     * them. For example here is a class that periodically publishes the
62     * items generated from a supplier. (In practice you might add methods
63     * to independently start and stop generation, to share schedulers
64 dl 1.16 * among publishers, and so on, or instead use a SubmissionPublisher
65     * as a component rather than a superclass.)
66 dl 1.15 *
67     * <pre> {@code
68     * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
69     * final ScheduledFuture<?> periodicTask;
70     * final ScheduledExecutorService scheduler;
71     * PeriodicPublisher(Executor executor, int initialBufferCapacity,
72     * int maxBufferCapacity, Supplier<? extends T> supplier,
73     * long period, TimeUnit unit) {
74     * super(executor, initialBufferCapacity, maxBufferCapacity);
75     * scheduler = new ScheduledThreadPoolExecutor(1);
76     * periodicTask = scheduler.scheduleAtFixedRate(
77     * () -> submit(supplier.get()), 0, period, unit);
78     * }
79     * public void close() {
80     * periodicTask.cancel(false);
81     * scheduler.shutdown();
82     * super.close();
83     * }
84     * }}</pre>
85     *
86 dl 1.16 * <p>Here is an example of a {@link Flow.Processor} subclass (using
87     * single-step requests to its publisher, for simplicity of
88     * illustration):
89     *
90     * <pre> {@code
91     * class TransformProcessor<S,T> extends SubmissionPublisher<T>
92     * implements Flow.Processor<S,T> {
93     * final Function<? super S, ? extends T> function;
94     * Flow.Subscription subscription;
95     * TransformProcessor(Executor executor, int initialBufferCapacity,
96     * int maxBufferCapacity,
97     * Function<? super S, ? extends T> function) {
98     * super(executor, initialBufferCapacity, maxBufferCapacity);
99     * this.function = function;
100     * }
101     * public void onSubscribe(Flow.Subscription subscription) {
102     * (this.subscription = subscription).request(1);
103     * }
104     * public void onNext(S item) {
105     * subscription.request(1);
106     * submit(function.apply(item));
107     * }
108     * public void onError(Throwable ex) { closeExceptionally(ex); }
109     * public void onComplete() { close(); }
110     * }}</pre>
111     *
112 dl 1.1 * @param <T> the published item type
113     * @author Doug Lea
114     * @since 1.9
115     */
116     public class SubmissionPublisher<T> implements Flow.Publisher<T>,
117     AutoCloseable {
118     /*
119     * Most mechanics are handled by BufferedSubscription. This class
120 dl 1.17 * mainly tracks subscribers and ensures sequentiality, by using
121     * built-in synchronization locks across public methods. (Using
122     * built-in locks works well in the most typical case in which
123     * only one thread submits items).
124 dl 1.1 */
125    
126     // Ensuring that all arrays have power of two length
127    
128     static final int MAXIMUM_BUFFER_CAPACITY = 1 << 30;
129     static final int roundCapacity(int cap) { // to nearest power of 2
130     int n = cap - 1;
131     n |= n >>> 1;
132     n |= n >>> 2;
133     n |= n >>> 4;
134     n |= n >>> 8;
135     n |= n >>> 16;
136     return (n < 0) ? 1 :
137     (n >= MAXIMUM_BUFFER_CAPACITY) ? MAXIMUM_BUFFER_CAPACITY : n + 1;
138     }
139    
140     /**
141     * Clients (BufferedSubscriptions) are maintained in a linked list
142     * (via their "next" fields). This works well for publish loops.
143     * It requires O(n) traversal to check for duplicate subscribers,
144     * but we expect that subscribing is much less common than
145 dl 1.13 * publishing. Unsubscribing occurs only during traversal loops,
146     * when BufferedSubscription methods or status checks return
147     * negative values signifying that they have been disabled.
148 dl 1.1 */
149     BufferedSubscription<T> clients;
150    
151     // Parameters for constructing BufferedSubscriptions
152     final Executor executor;
153     final int minBufferCapacity;
154     final int maxBufferCapacity;
155    
156     /** Run status, updated only within locks */
157     volatile boolean closed;
158    
/**
 * Creates a new SubmissionPublisher that delivers to subscribers
 * asynchronously through the given Executor, giving each subscriber
 * a buffer with the given initial and maximum sizes. When nothing
 * else constrains the choice, consider {@code
 * ForkJoinPool.commonPool(), 8, 1024}.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param initialBufferCapacity the initial capacity for each
 * subscriber's buffer (the actual capacity may be rounded up to
 * a power of two)
 * @param maxBufferCapacity the maximum capacity for each
 * subscriber's buffer (the actual capacity may be rounded up to
 * a power of two)
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if either capacity is not
 * positive, if initialBufferCapacity exceeds maxBufferCapacity, or
 * if maxBufferCapacity exceeds {@code 1<<30} (about 1 billion), the
 * maximum bound for a power of two array size
 */
public SubmissionPublisher(Executor executor,
                           int initialBufferCapacity,
                           int maxBufferCapacity) {
    if (executor == null)
        throw new NullPointerException();
    // Checks run in a fixed order so the reported message is stable.
    if (Math.min(initialBufferCapacity, maxBufferCapacity) <= 0)
        throw new IllegalArgumentException("capacity must be positive");
    if (maxBufferCapacity > MAXIMUM_BUFFER_CAPACITY)
        throw new IllegalArgumentException("capacity exceeds limit");
    if (initialBufferCapacity > maxBufferCapacity)
        throw new IllegalArgumentException("initial cannot exceed max capacity");
    this.executor = executor;
    this.minBufferCapacity = roundCapacity(initialBufferCapacity);
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
}
197    
198     /**
199     * Adds the given Subscriber unless already subscribed. If
200     * already subscribed, the Subscriber's onError method is invoked
201     * with an IllegalStateException. Otherwise, upon success, the
202     * Subscriber's onSubscribe method is invoked with a new
203     * Subscription (upon exception, the exception is rethrown and the
204     * Subscriber remains unsubscribed). If this SubmissionPublisher
205     * is closed, the subscriber's onComplete method is then invoked.
206 dl 1.13 * Subscribers may enable receiving items by invoking the {@code
207     * request} method of the new Subscription, and may unsubscribe by
208 dl 1.1 * invoking its cancel method.
209     *
210     * @param subscriber the subscriber
211     * @throws NullPointerException if subscriber is null
212     */
213     public void subscribe(Flow.Subscriber<? super T> subscriber) {
214     if (subscriber == null) throw new NullPointerException();
215     BufferedSubscription<T> sub = new BufferedSubscription<T>(
216     subscriber, executor, minBufferCapacity, maxBufferCapacity);
217     boolean present = false, clsd;
218 jsr166 1.2 synchronized (this) {
219 dl 1.1 BufferedSubscription<T> pred = null, next;
220 dl 1.16 if (!(clsd = closed)) {
221     for (BufferedSubscription<T> b = clients; b != null; b = next) {
222     next = b.next;
223     if (b.ctl < 0) { // disabled; remove
224     if (pred == null)
225     clients = next;
226     else
227     pred.next = next;
228     }
229     else if (subscriber == b.subscriber) {
230     present = true;
231     break;
232     }
233     pred = b;
234 dl 1.1 }
235     }
236     if (!present) {
237     subscriber.onSubscribe(sub); // don't link on exception
238     if (!clsd) {
239     if (pred == null)
240     clients = sub;
241     else
242     pred.next = sub;
243     }
244     }
245     }
246     if (clsd)
247     subscriber.onComplete();
248     else if (present)
249     subscriber.onError(new IllegalStateException("Already subscribed"));
250     }
251    
252     /**
253 dl 1.16 * Publishes the given item to each current subscriber by
254     * asynchronously invoking its onNext method, blocking
255     * uninterruptibly while resources for any subscriber are
256     * unavailable.
257     *
258     * <p>If the Executor for this publisher throws a
259     * RejectedExecutionException (or any other RuntimeException or
260     * Error) when attempting to asynchronously notify subscribers,
261     * then this exception is rethrown.
262     *
263     * @param item the (non-null) item to publish
264     * @throws IllegalStateException if closed
265     * @throws NullPointerException if item is null
266     * @throws RejectedExecutionException if thrown by Executor
267     */
268     public void submit(T item) {
269     if (item == null) throw new NullPointerException();
270     synchronized (this) {
271     if (closed)
272     throw new IllegalStateException("Closed");
273     BufferedSubscription<T> pred = null, next;
274     for (BufferedSubscription<T> b = clients; b != null; b = next) {
275     int stat;
276     next = b.next;
277     if ((stat = b.submit(item)) < 0) {
278     if (pred == null)
279     clients = next;
280     else
281     pred.next = next;
282     }
283     else
284     pred = b;
285     }
286     }
287     }
288    
289     /**
290 dl 1.1 * Publishes the given item, if possible, to each current
291     * subscriber by asynchronously invoking its onNext method. The
292     * item may be dropped by one or more subscribers if resource
293     * limits are exceeded, in which case the given handler (if
294     * non-null) is invoked, and if it returns true, retried once.
295 dl 1.16 * Other calls to methods in this class by other threads are
296     * blocked while the handler is invoked. Unless recovery is
297     * assured, options are usually limited to logging the error
298     * and/or issuing an onError signal to the subscriber.
299 dl 1.1 *
300     * <p>If the Executor for this publisher throws a
301     * RejectedExecutionException (or any other RuntimeException or
302     * Error) when attempting to asynchronously notify subscribers, or
303     * the drop handler throws an exception when processing a dropped
304     * item, then this exception is rethrown.
305     *
306     * @param item the (non-null) item to publish
307     * @param onDrop if non-null, the handler invoked upon a drop to a
308     * subscriber, with arguments of the subscriber and item; if it
309     * returns true, an offer is re-attempted (once)
310 jsr166 1.4 * @return the number of drops (failed attempts to issue the item
311     * to a subscriber)
312 dl 1.1 * @throws IllegalStateException if closed
313     * @throws NullPointerException if item is null
314     * @throws RejectedExecutionException if thrown by Executor
315     */
316     public int offer(T item,
317     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
318     if (item == null) throw new NullPointerException();
319     int drops = 0;
320 jsr166 1.2 synchronized (this) {
321 dl 1.1 if (closed)
322     throw new IllegalStateException("Closed");
323     BufferedSubscription<T> pred = null, next;
324     for (BufferedSubscription<T> b = clients; b != null; b = next) {
325     int stat;
326     next = b.next;
327     if ((stat = b.offer(item)) == 0 &&
328     onDrop != null &&
329     onDrop.test(b.subscriber, item))
330     stat = b.offer(item);
331     if (stat < 0) {
332     if (pred == null)
333     clients = next;
334     else
335     pred.next = next;
336     }
337     else {
338     pred = b;
339     if (stat == 0)
340     ++drops;
341     }
342     }
343     return drops;
344     }
345     }
346    
347     /**
348     * Publishes the given item, if possible, to each current
349     * subscriber by asynchronously invoking its onNext method,
350     * blocking while resources for any subscription are unavailable,
351     * up to the specified timeout or the caller thread is
352 dl 1.16 * interrupted, at which point the given handler (if non-null) is
353     * invoked, and if it returns true, retried once. (The drop
354     * handler may distinguish timeouts from interrupts by checking
355     * whether the current thread is interrupted.) Other calls to
356     * methods in this class by other threads are blocked while the
357     * handler is invoked. Unless recovery is assured, options are
358     * usually limited to logging the error and/or issuing an onError
359     * signal to the subscriber.
360 dl 1.1 *
361     * <p>If the Executor for this publisher throws a
362     * RejectedExecutionException (or any other RuntimeException or
363     * Error) when attempting to asynchronously notify subscribers, or
364     * the drop handler throws an exception when processing a dropped
365     * item, then this exception is rethrown.
366     *
367     * @param item the (non-null) item to publish
368     * @param timeout how long to wait for resources for any subscriber
369     * before giving up, in units of {@code unit}
370     * @param unit a {@code TimeUnit} determining how to interpret the
371     * {@code timeout} parameter
372     * @param onDrop if non-null, the handler invoked upon a drop to a
373     * subscriber, with arguments of the subscriber and item; if it
374     * returns true, an offer is re-attempted (once)
375 jsr166 1.4 * @return the number of drops (failed attempts to issue the item
376     * to a subscriber)
377 dl 1.1 * @throws IllegalStateException if closed
378     * @throws NullPointerException if item is null
379     * @throws RejectedExecutionException if thrown by Executor
380     */
381     public int offer(T item, long timeout, TimeUnit unit,
382     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
383     if (item == null) throw new NullPointerException();
384     long nanos = unit.toNanos(timeout);
385     int drops = 0;
386 jsr166 1.2 synchronized (this) {
387 dl 1.1 if (closed)
388     throw new IllegalStateException("Closed");
389     BufferedSubscription<T> pred = null, next;
390     for (BufferedSubscription<T> b = clients; b != null; b = next) {
391     int stat;
392     next = b.next;
393 dl 1.16 if ((stat = b.timedOffer(item, nanos)) == 0 &&
394     onDrop != null && onDrop.test(b.subscriber, item))
395 dl 1.1 stat = b.offer(item);
396     if (stat < 0) {
397     if (pred == null)
398     clients = next;
399     else
400     pred.next = next;
401     }
402     else {
403     pred = b;
404     if (stat == 0)
405     ++drops;
406     }
407     }
408     }
409     return drops;
410     }
411    
412     /**
413     * Unless already closed, issues onComplete signals to current
414     * subscribers, and disallows subsequent attempts to publish.
415     */
416     public void close() {
417     if (!closed) {
418     BufferedSubscription<T> b, next;
419 jsr166 1.2 synchronized (this) {
420 dl 1.1 b = clients;
421     clients = null;
422     closed = true;
423     }
424     while (b != null) {
425     next = b.next;
426     b.close();
427     b = next;
428     }
429     }
430     }
431    
432     /**
433     * Unless already closed, issues onError signals to current
434     * subscribers with the given error, and disallows subsequent
435     * attempts to publish.
436     *
437 jsr166 1.5 * @param error the onError argument sent to subscribers
438 dl 1.1 * @throws NullPointerException if error is null
439     */
440     public void closeExceptionally(Throwable error) {
441     if (error == null)
442     throw new NullPointerException();
443     if (!closed) {
444     BufferedSubscription<T> b, next;
445 jsr166 1.2 synchronized (this) {
446 dl 1.1 b = clients;
447     clients = null;
448     closed = true;
449     }
450     while (b != null) {
451     next = b.next;
452     b.closeExceptionally(error);
453     b = next;
454     }
455     }
456     }
457    
458     /**
459     * Returns true if this publisher is not accepting submissions.
460     *
461     * @return true if closed
462     */
463     public boolean isClosed() {
464     return closed;
465     }
466    
467     /**
468 jsr166 1.6 * Returns true if this publisher has any subscribers.
469 dl 1.1 *
470     * @return true if this publisher has any subscribers
471     */
472     public boolean hasSubscribers() {
473     boolean nonEmpty = false;
474     if (!closed) {
475 jsr166 1.2 synchronized (this) {
476 dl 1.1 BufferedSubscription<T> pred = null, next;
477     for (BufferedSubscription<T> b = clients; b != null; b = next) {
478     next = b.next;
479     if (b.ctl < 0) {
480     if (pred == null)
481     clients = next;
482     else
483     pred.next = next;
484 jsr166 1.2 }
485 dl 1.1 else {
486     nonEmpty = true;
487     break;
488     }
489     }
490     }
491     }
492     return nonEmpty;
493     }
494    
495     /**
496     * Returns the Executor used for asynchronous delivery.
497     *
498     * @return the Executor used for asynchronous delivery
499     */
500     public Executor getExecutor() {
501     return executor;
502     }
503    
504 jsr166 1.7 /**
505 jsr166 1.5 * Returns the initial per-subscriber buffer capacity.
506 dl 1.1 *
507 jsr166 1.5 * @return the initial per-subscriber buffer capacity
508 dl 1.1 */
509     public int getInitialBufferCapacity() {
510     return minBufferCapacity;
511     }
512    
513 jsr166 1.7 /**
514 jsr166 1.5 * Returns the maximum per-subscriber buffer capacity.
515 dl 1.1 *
516 jsr166 1.5 * @return the maximum per-subscriber buffer capacity
517 dl 1.1 */
518     public int getMaxBufferCapacity() {
519     return maxBufferCapacity;
520     }
521    
522     /**
523     * Returns a list of current subscribers.
524     *
525     * @return list of current subscribers
526     */
527     public List<Flow.Subscriber<? super T>> getSubscribers() {
528     ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
529 jsr166 1.2 synchronized (this) {
530 dl 1.1 BufferedSubscription<T> pred = null, next;
531     for (BufferedSubscription<T> b = clients; b != null; b = next) {
532     next = b.next;
533     if (b.ctl < 0) {
534     if (pred == null)
535     clients = next;
536     else
537     pred.next = next;
538 jsr166 1.2 }
539 dl 1.1 else
540     subs.add(b.subscriber);
541     }
542     }
543     return subs;
544     }
545 jsr166 1.2
546 dl 1.1 /**
547     * Returns true if the given Subscriber is currently subscribed.
548     *
549     * @param subscriber the subscriber
550     * @return true if currently subscribed
551     */
552     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
553 jsr166 1.2 if (!closed) {
554     synchronized (this) {
555 dl 1.1 BufferedSubscription<T> pred = null, next;
556     for (BufferedSubscription<T> b = clients; b != null; b = next) {
557     next = b.next;
558     if (b.ctl < 0) {
559     if (pred == null)
560     clients = next;
561     else
562     pred.next = next;
563 jsr166 1.2 }
564 dl 1.1 else if (subscriber == b.subscriber)
565     return true;
566     }
567     }
568     }
569     return false;
570     }
571    
572     /**
573 dl 1.17 * Returns an estimate of the number of buffered items (those
574     * produced but not consumed), summed across all current
575     * subscribers.
576     *
577     * @return the estimate
578 dl 1.1 */
579 dl 1.17 public long estimateBuffered() {
580     long sum = 0L;
581     synchronized (this) {
582     BufferedSubscription<T> pred = null, next;
583     for (BufferedSubscription<T> b = clients; b != null; b = next) {
584     next = b.next;
585     if (b.ctl < 0) {
586     if (pred == null)
587     clients = next;
588     else
589     pred.next = next;
590     }
591     else
592     sum += b.estimateBuffered();
593     }
594     }
595     return sum;
596 dl 1.1 }
597    
598     /**
599 dl 1.15 * A bounded (ring) buffer with integrated control to start a
600     * consumer task whenever items are available. The buffer
601 dl 1.1 * algorithm is similar to one used inside ForkJoinPool,
602     * specialized for the case of at most one concurrent producer and
603     * consumer, and power of two buffer sizes. This allows methods to
604     * operate without locks even while supporting resizing, blocking,
605     * task-triggering, and garbage-free buffers (nulling out elements
606     * when consumed), although supporting these does impose a bit of
607     * overhead compared to plain fixed-size ring buffers.
608     *
609     * The publisher guarantees a single producer via its lock. We
610     * ensure in this class that there is at most one consumer. The
611     * request and cancel methods must be fully thread-safe but are
612     * coded to exploit the most common case in which they are only
613     * called by consumers (usually within onNext).
614     *
615 dl 1.15 * This class also serves as its own consumer task, consuming as
616     * many items/signals as possible before terminating, at which
617 dl 1.17 * point it is re-executed when needed. (The dual Runnable and
618     * ForkJoinTask declaration saves overhead when executed by
619 dl 1.15 * ForkJoinPools, without impacting other kinds of Executors.)
620     * Execution control is managed using the ACTIVE ctl bit. We
621     * ensure that a task is active when consumable items (and
622     * usually, ERROR or COMPLETE signals) are present and there is
623     * demand (unfulfilled requests). This is complicated on the
624 dl 1.1 * creation side by the possibility of exceptions when trying to
625     * execute tasks. These eventually force DISABLED state, but
626     * sometimes not directly. On the task side, termination (clearing
627     * ACTIVE) may race with producers or request() calls, so in some
628     * cases requires a re-check, re-activating if possible.
629     *
630     * The ctl field also manages run state. When DISABLED, no further
631     * updates are possible (to simplify checks, DISABLED is defined
632     * as a negative value). Disabling may be preceded by setting
633     * ERROR or COMPLETE (or both -- ERROR has precedence), in which
634     * case the associated Subscriber methods are invoked, possibly
635     * synchronously if there is no active consumer task (including
636     * cases where execute() failed).
637     *
638     * Support for blocking also exploits the fact that there is only
639     * one possible waiter. ManagedBlocker-compatible control fields
640     * are placed in this class itself rather than in wait-nodes.
641     * Blocking control relies on the "waiter" field. Producers set
642     * the field before trying to block, but must then recheck (via
643     * offer) before parking. Signalling then just unparks and clears
644     * waiter field.
645     *
646     * This class uses @Contended and heuristic field declaration
647     * ordering to reduce memory contention on BufferedSubscription
648     * itself, but it does not currently attempt to avoid memory
649     * contention (especially including card-marks) among buffer
650     * elements, that can significantly slow down some usages.
651     * Addressing this may require allocating substantially more space
652     * than users expect.
653     */
654 dl 1.15 @SuppressWarnings("serial")
655 dl 1.1 @sun.misc.Contended
656 dl 1.15 static final class BufferedSubscription<T> extends ForkJoinTask<Void>
657     implements Runnable, Flow.Subscription, ForkJoinPool.ManagedBlocker {
658 dl 1.1 // Order-sensitive field declarations
659 dl 1.17 volatile long demand; // # unfilled requests
660 dl 1.1 long timeout; // > 0 if timed wait
661     final int minCapacity; // initial buffer size
662     int maxCapacity; // reduced on OOME
663     int putStat; // offer result for ManagedBlocker
664     volatile int ctl; // atomic run state flags
665     volatile int head; // next position to take
666     volatile int tail; // next position to put
667     volatile Object[] array; // buffer: null if disabled
668     Flow.Subscriber<? super T> subscriber; // null if disabled
669     Executor executor; // null if disabled
670     volatile Throwable pendingError; // holds until onError issued
671     volatile Thread waiter; // blocked producer thread
672     T putItem; // for offer within ManagedBlocker
673     BufferedSubscription<T> next; // used only by publisher
674    
675     // ctl values
676     static final int ACTIVE = 0x01; // consumer task active
677     static final int ERROR = 0x02; // signal pending error
678     static final int COMPLETE = 0x04; // signal completion when done
679     static final int DISABLED = 0x80000000; // must be negative
680    
681 dl 1.16 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
682    
/**
 * Creates a subscription for the given subscriber. Only the
 * arguments are recorded here; no buffer array is allocated.
 *
 * @param subscriber the subscriber receiving items and signals
 * @param executor the executor used to run the consumer task
 * @param minBufferCapacity the initial buffer size
 * @param maxBufferCapacity the upper bound on buffer size
 */
BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                     Executor executor, int minBufferCapacity,
                     int maxBufferCapacity) {
    this.minCapacity = minBufferCapacity;
    this.maxCapacity = maxBufferCapacity;
    this.executor = executor;
    this.subscriber = subscriber;
}
691    
692     /**
693 dl 1.16 * Tries to add item and start consumer task if necessary.
694 dl 1.1 * @return -1 if disabled, 0 if dropped, else 1
695     */
696     final int offer(T item) {
697     Object[] a = array;
698 dl 1.16 int t = tail, h = head, n, i, stat;
699     if (a != null && (n = a.length) > t - h && (i = t & (n - 1)) >= 0) {
700     a[i] = item;
701 dl 1.1 U.putOrderedInt(this, TAIL, t + 1);
702     }
703 dl 1.16 else if ((stat = growAndAdd(a, item)) <= 0)
704     return stat;
705     for (int c;;) { // possibly start task
706     Executor e;
707     if (((c = ctl) & ACTIVE) != 0)
708     break;
709     else if (c < 0 || (e = executor) == null)
710     return -1;
711     else if (demand == 0L || tail == head)
712     break;
713     else if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) {
714     try {
715     e.execute(this);
716     break;
717     } catch (RuntimeException | Error ex) { // back out
718     do {} while ((c = ctl) >= 0 &&
719     (c & ACTIVE) != 0 &&
720     !U.compareAndSwapInt(this, CTL, c,
721     c & ~ACTIVE));
722     throw ex;
723     }
724     }
725     }
726     return 1;
727 dl 1.1 }
728    
729     /**
730 dl 1.16 * Tries to create or expand buffer, then adds item if possible
731 dl 1.1 */
732 dl 1.16 final int growAndAdd(Object[] oldArray, T item) {
733     int oldLen, newLen;
734     if (oldArray != null)
735     newLen = (oldLen = oldArray.length) << 1;
736     else if (ctl >= 0) {
737 dl 1.1 oldLen = 0;
738 dl 1.16 newLen = minCapacity;
739 dl 1.1 }
740 dl 1.16 else
741     return -1; // disabled
742     if (oldLen >= maxCapacity || newLen <= 0)
743     return 0; // cannot grow
744     Object[] newArray;
745 dl 1.1 try {
746 dl 1.16 newArray = new Object[newLen];
747     } catch (Throwable ex) { // try to cope with OOME
748     if (oldLen > 0) // avoid continuous failure
749 dl 1.1 maxCapacity = oldLen;
750     return 0;
751     }
752 dl 1.16 array = newArray;
753     int t = tail, oldMask = oldLen - 1, newMask = newLen - 1;
754     if (oldArray != null && oldMask >= 0 && newMask >= oldMask) {
755     for (int j = head; j != t; ++j) { // races with consumer
756     Object x;
757 dl 1.1 int i = j & oldMask;
758 dl 1.16 if ((x = oldArray[i]) != null &&
759     U.compareAndSwapObject(oldArray,
760 dl 1.1 (((long)i) << ASHIFT) + ABASE,
761     x, null))
762 dl 1.16 newArray[j & newMask] = x;
763 dl 1.1 }
764     }
765 dl 1.16 newArray[t & newMask] = item;
766     tail = t + 1;
767     return 1;
768 dl 1.1 }
769    
770     /**
771 dl 1.16 * Nulls out most fields, mainly to avoid garbage retention
772     * until publisher unsubscribes.
773 dl 1.1 */
774 dl 1.16 final void detach() {
775     pendingError = null;
776     subscriber = null;
777     executor = null;
778     array = null;
779     Thread w = waiter;
780     if (w != null) {
781     waiter = null;
782     LockSupport.unpark(w); // force wakeup
783 dl 1.1 }
784     }
785    
786     /**
787     * Tries to start consumer task upon a signal or request;
788 jsr166 1.12 * disables on failure.
789 dl 1.1 */
790     final void startOrDisable() {
791     Executor e; // skip if already disabled
792     if ((e = executor) != null) {
793     try {
794 dl 1.15 e.execute(this);
795 jsr166 1.2 } catch (Throwable ex) { // back out and force signal
796 dl 1.1 for (int c;;) {
797     if ((c = ctl) < 0 || (c & ACTIVE) == 0)
798     break;
799     if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
800     closeExceptionally(ex);
801     break;
802     }
803     }
804     }
805     }
806     }
807    
808 dl 1.16 final void close() {
809 dl 1.1 for (int c;;) {
810     if ((c = ctl) < 0)
811     break;
812 dl 1.16 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
813     if ((c & ACTIVE) == 0)
814     startOrDisable();
815 dl 1.1 break;
816     }
817     }
818     }
819    
820     final void closeExceptionally(Throwable ex) {
821     for (int c;;) {
822     if ((c = ctl) < 0)
823     break;
824     else if ((c & ACTIVE) != 0) {
825 dl 1.16 pendingError = ex;
826 dl 1.1 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
827     break; // cause consumer task to exit
828     }
829     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
830     Flow.Subscriber<? super T> s = subscriber;
831 dl 1.16 if (s != null && ex != null) {
832 dl 1.1 try {
833     s.onError(ex);
834 jsr166 1.2 } catch (Throwable ignore) {
835 dl 1.1 }
836     }
837     detach();
838     break;
839     }
840     }
841     }
842    
843 dl 1.16 /**
844     * Causes consumer task to exit if active (without reporting
845     * onError unless there is already a pending error), and
846     * disables.
847     */
848     public void cancel() {
849 dl 1.1 for (int c;;) {
850     if ((c = ctl) < 0)
851     break;
852 dl 1.16 else if ((c & ACTIVE) != 0) {
853     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
854     break;
855     }
856     else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
857     detach();
858 dl 1.1 break;
859     }
860     }
861     }
862    
863 dl 1.16 /**
864     * Adds to demand and possibly starts task.
865     */
866     public void request(long n) {
867     if (n > 0L) {
868     for (;;) {
869     long prev = demand, d;
870     if ((d = prev + n) < prev) // saturate
871     d = Long.MAX_VALUE;
872     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
873     for (int c, h;;) {
874     if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
875     demand == 0L)
876     break;
877     if ((h = head) != tail) {
878     if (U.compareAndSwapInt(this, CTL, c,
879     c | ACTIVE)) {
880     startOrDisable();
881     break;
882     }
883     }
884     else if (head == h && tail == h)
885     break;
886     }
887     break;
888     }
889     }
890     }
891     else if (n < 0L)
892     closeExceptionally(new IllegalArgumentException(
893     "negative subscription request"));
894     }
895    
896 dl 1.17 final int estimateBuffered() {
897 dl 1.1 int n;
898     return (ctl >= 0 && (n = tail - head) > 0) ? n : 0;
899     }
900    
901     // ManagedBlocker support
902    
903     public final boolean isReleasable() {
904     T item = putItem;
905     if (item != null) {
906     if ((putStat = offer(item)) == 0)
907     return false;
908     putItem = null;
909     }
910     return true;
911     }
912    
/**
 * Takes the item left in {@code putItem} and keeps offering it
 * until {@code offer} returns nonzero, parking between attempts.
 * The result of the last attempt is left in {@code putStat}.
 *
 * <p>A positive {@code timeout} bounds the wait; otherwise the
 * wait is unbounded. An interrupt is recorded by storing
 * INTERRUPTED into {@code timeout}; it ends a bounded wait but
 * not an unbounded one.
 *
 * @return true always
 */
913     public final boolean block() {
914     T item = putItem;
915     if (item != null) {
916     putItem = null;
917     long nanos = timeout; // positive means a bounded wait
918     long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
919     while ((putStat = offer(item)) == 0) {
920     if (Thread.interrupted()) { // also clears the thread's interrupt status
921 dl 1.16 timeout = INTERRUPTED; // NOTE(review): presumably negative, since callers test timeout < 0L - confirm
922 dl 1.1 if (nanos > 0L) // bounded wait: give up on interrupt
923     break;
924     }
925     else if (nanos > 0L &&
926     (nanos = deadline - System.nanoTime()) <= 0L) // bounded wait has expired
927     break;
928     else if (waiter == null)
929     waiter = Thread.currentThread(); // register first; offer is retried once more before parking
930     else {
931     if (nanos > 0L)
932     LockSupport.parkNanos(this, nanos);
933     else
934     LockSupport.park(this);
935     waiter = null; // re-register on the next pass if still unable to add
936     }
937     }
938     }
939     waiter = null;
940     return true;
941     }
942    
943     final int submit(T item) {
944 dl 1.16 int stat;
945     if ((stat = offer(item)) == 0) {
946     putItem = item;
947     timeout = 0L;
948     try {
949     ForkJoinPool.managedBlock(this);
950     } catch (InterruptedException cantHappen) {
951     }
952     stat = putStat;
953     if (timeout < 0L)
954     Thread.currentThread().interrupt();
955 dl 1.1 }
956 dl 1.16 return stat;
957 dl 1.1 }
958    
959     final int timedOffer(T item, long nanos) {
960 dl 1.16 int stat;
961     if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
962     putItem = item;
963     try {
964     ForkJoinPool.managedBlock(this);
965     } catch (InterruptedException cantHappen) {
966     }
967     stat = putStat;
968     if (timeout < 0L)
969     Thread.currentThread().interrupt();
970 dl 1.1 }
971 dl 1.16 return stat;
972 dl 1.1 }
973    
974 dl 1.16 /**
975     * Consumer task loop; supports resubmission when used as
976     * ForkJoinTask.
     *
     * <p>Does nothing if {@code subscriber} is null. Otherwise each
     * pass reads demand, ctl, head, tail and array, then does one of:
     * <ul>
     * <li>ctl negative: detach and exit;
     * <li>ERROR set: set ctl to DISABLED and pass any pending error
     * to {@code onError};
     * <li>buffer empty: clear ACTIVE and exit, or, if COMPLETE is
     * set, set ctl to DISABLED and call {@code onComplete};
     * <li>head slot not readable: retry;
     * <li>no demand: clear ACTIVE and exit;
     * <li>otherwise: claim the head item, decrement demand, wake any
     * waiting producer and call {@code onNext}.
     * </ul>
     * After ACTIVE is cleared the state is rechecked, and the loop
     * continues if ACTIVE can be reacquired.
     *
     * @return false always
977     */
978     public final boolean exec() {
979 dl 1.1 Flow.Subscriber<? super T> s;
980     if ((s = subscriber) != null) { // else disabled
981     for (;;) {
982     long d = demand; // read volatile fields in acceptable order
983     int c = ctl;
984     int h = head;
985     int t = tail;
986     Object[] a = array;
987     int i, n; Object x; Thread w;
988     if (c < 0) { // disabled: release references and stop
989     detach();
990     break;
991     }
992     else if ((c & ERROR) != 0) {
993     Throwable ex = pendingError; // may be null, e.g. cancel() sets ERROR without storing an error
994     ctl = DISABLED; // no need for CAS
995     if (ex != null) {
996     try {
997     s.onError(ex);
998 jsr166 1.2 } catch (Throwable ignore) {
999 dl 1.1 }
1000     }
1001     } // the next pass sees c < 0 and detaches
1002     else if (h == t) { // empty
1003     if (h == tail &&
1004     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) { // expected value is read before c is updated
1005     if (h != tail || c != (c = ctl)) { // recheck
1006     if ((c & (ACTIVE | DISABLED)) != 0 || // ACTIVE or DISABLED now set in ctl: leave
1007     !U.compareAndSwapInt(this, CTL, c,
1008     c | ACTIVE)) // otherwise try to take ACTIVE back and keep looping
1009     break;
1010     }
1011     else if ((c & COMPLETE) != 0) { // closed and drained
1012     ctl = DISABLED;
1013     try {
1014     s.onComplete();
1015 jsr166 1.2 } catch (Throwable ignore) {
1016 dl 1.1 }
1017     }
1018     else
1019     break; // idle: nothing buffered and not closed
1020     }
1021     }
1022 dl 1.16 else if (a == null || (n = a.length) == 0 ||
1023     (x = a[i = h & (n - 1)]) == null) // head slot empty in the array that was read
1024     ; // stale; retry
1025 dl 1.1 else if (d == 0L) { // can't take
1026     if (demand == 0L &&
1027     U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) && // give up ACTIVE while there is no demand
1028     ((demand == 0L && c == (c = ctl)) || // recheck
1029     (c & (ACTIVE | DISABLED)) != 0 || // ACTIVE or DISABLED now set in ctl: leave
1030     !U.compareAndSwapInt(this, CTL, c, c | ACTIVE))) // could not take ACTIVE back
1031     break;
1032     }
1033 dl 1.16 else if (U.compareAndSwapObject(
1034 dl 1.1 a, (((long)i) << ASHIFT) + ABASE, x, null)) { // claim the item by nulling its slot
1035     U.putOrderedInt(this, HEAD, h + 1);
1036     while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1037     d = demand; // almost never fails
1038     if ((w = waiter) != null) {
1039     waiter = null;
1040     LockSupport.unpark(w); // release producer
1041     }
1042     try {
1043     @SuppressWarnings("unchecked") T y = (T) x;
1044     s.onNext(y);
1045 dl 1.16 } catch (Throwable ex) { // disable on throw
1046 dl 1.1 ctl = DISABLED; // the throwable is not passed to onError here
1047     }
1048     }
1049     }
1050     }
1051 dl 1.16 return false; // resubmittable; never joined
1052 dl 1.1 }
1053    
1054 dl 1.16 public final void run() { exec(); }
1055 dl 1.15 public final Void getRawResult() { return null; }
1056     public final void setRawResult(Void v) {}
1057    
1058 jsr166 1.9 // Unsafe mechanics
1059     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1060 dl 1.1 private static final long CTL;
1061     private static final long TAIL;
1062     private static final long HEAD;
1063     private static final long DEMAND;
1064     private static final int ABASE;
1065     private static final int ASHIFT;
1066    
1067     static {
1068     try {
1069     CTL = U.objectFieldOffset
1070 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("ctl"));
1071 dl 1.1 TAIL = U.objectFieldOffset
1072 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("tail"));
1073 dl 1.1 HEAD = U.objectFieldOffset
1074 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("head"));
1075 dl 1.1 DEMAND = U.objectFieldOffset
1076 jsr166 1.9 (BufferedSubscription.class.getDeclaredField("demand"));
1077    
1078     ABASE = U.arrayBaseOffset(Object[].class);
1079     int scale = U.arrayIndexScale(Object[].class);
1080 dl 1.1 if ((scale & (scale - 1)) != 0)
1081     throw new Error("data type scale not a power of two");
1082     ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1083 jsr166 1.9 } catch (ReflectiveOperationException e) {
1084 dl 1.1 throw new Error(e);
1085     }
1086     }
1087     }
1088     }