ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.19
Committed: Tue Jan 20 01:22:28 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.18: +116 -93 lines
Log Message:
subscribe async when possible

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.List;
11 import java.util.concurrent.locks.LockSupport;
12 import java.util.function.BiPredicate;
13 import java.util.function.Function;
14 import java.util.function.Supplier;
15
16 /**
17 * A {@link Flow.Publisher} that asynchronously issues submitted items
18 * to current subscribers until it is closed. Each current subscriber
19 * receives newly submitted items in the same order unless drops or
20 * exceptions are encountered. Using a SubmissionPublisher allows
21 * item generators to act as Publishers relying on drop handling
22 * and/or blocking for flow control.
23 *
24 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
25 * constructor for delivery to subscribers. The best choice of
26 * Executor depends on expected usage. If the generator(s) of
27 * submitted items run in separate threads, and the number of
28 * subscribers can be estimated, consider using a {@link
29 * Executors#newFixedThreadPool}. Otherwise consider using a
30 * work-stealing pool (including {@link ForkJoinPool#commonPool}).
31 *
32 * <p>Buffering allows producers and consumers to transiently operate
33 * at different rates. Each subscriber uses an independent buffer.
34 * Buffers are created upon first use with a given initial capacity,
35 * and are resized as needed up to the maximum. (Capacity arguments
36 * may be rounded up to powers of two.) Invocations of {@link
37 * Flow.Subscription#request} do not directly result in buffer
38 * expansion, but risk saturation if unfulfilled requests exceed the
39 * maximum capacity. Choices of buffer parameters rely on expected
40 * rates, resources, and usages, that usually benefit from empirical
41 * testing. As first guesses, consider initial 8 and maximum 1024.
42 *
43 * <p>An overloaded version of method {@code subscribe} allows
44 * subscribers to override executor and buffer parameters for
45 * a new subscription.
46 *
47 * <p>Publication methods support different policies about what to do
48 * when buffers are saturated. Method {@link #submit} blocks until
49 * resources are available. This is simplest, but least
50 * responsive. The {@code offer} methods may drop items (either
51 * immediately or with bounded timeout), but provide an opportunity to
52 * interpose a handler and then retry.
53 *
54 * <p>If any Subscriber method throws an exception, its subscription
55 * is cancelled. If the supplied Executor throws
56 * RejectedExecutionException (or any other RuntimeException or Error)
57 * when attempting to execute a task, or a drop handler throws an
58 * exception when processing a dropped item, then the exception is
59 * rethrown. In these cases, some but not all subscribers may have
60 * received items. It is usually good practice to {@link
61 * #closeExceptionally closeExceptionally} in these cases.
62 *
63 * <p>This class may also serve as a convenient base for subclasses
64 * that generate items, and use the methods in this class to publish
65 * them. For example here is a class that periodically publishes the
66 * items generated from a supplier. (In practice you might add methods
67 * to independently start and stop generation, to share schedulers
68 * among publishers, and so on, or instead use a SubmissionPublisher
69 * as a component rather than a superclass.)
70 *
71 * <pre> {@code
72 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
73 * final ScheduledFuture<?> periodicTask;
74 * final ScheduledExecutorService scheduler;
75 * PeriodicPublisher(Executor executor, int initialBufferCapacity,
76 * int maxBufferCapacity, Supplier<? extends T> supplier,
77 * long period, TimeUnit unit) {
78 * super(executor, initialBufferCapacity, maxBufferCapacity);
79 * scheduler = new ScheduledThreadPoolExecutor(1);
80 * periodicTask = scheduler.scheduleAtFixedRate(
81 * () -> submit(supplier.get()), 0, period, unit);
82 * }
83 * public void close() {
84 * periodicTask.cancel(false);
85 * scheduler.shutdown();
86 * super.close();
87 * }
88 * }}</pre>
89 *
90 * <p>Here is an example of {@link Flow.Processor} subclass (using
91 * single-step requests to its publisher, for simplicity of
92 * illustration):
93 *
94 * <pre> {@code
95 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
96 * implements Flow.Processor<S,T> {
97 * final Function<? super S, ? extends T> function;
98 * Flow.Subscription subscription;
99 * TransformProcessor(Executor executor, int initialBufferCapacity,
100 * int maxBufferCapacity,
101 * Function<? super S, ? extends T> function) {
102 * super(executor, initialBufferCapacity, maxBufferCapacity);
103 * this.function = function;
104 * }
105 * public void onSubscribe(Flow.Subscription subscription) {
106 * (this.subscription = subscription).request(1);
107 * }
108 * public void onNext(S item) {
109 * subscription.request(1);
110 * submit(function.apply(item));
111 * }
112 * public void onError(Throwable ex) { closeExceptionally(ex); }
113 * public void onComplete() { close(); }
114 * }}</pre>
115 *
116 * @param <T> the published item type
117 * @author Doug Lea
118 * @since 1.9
119 */
120 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
121 AutoCloseable {
122 /*
123 * Most mechanics are handled by BufferedSubscription. This class
124 * mainly tracks subscribers and ensures sequentiality, by using
125 * built-in synchronization locks across public methods. (Using
126 * built-in locks works well in the most typical case in which
127 * only one thread submits items).
128 */
129
130 // Ensuring that all arrays have power of two length
131
132 static final int MAXIMUM_BUFFER_CAPACITY = 1 << 30;
133 static final int roundCapacity(int cap) { // to nearest power of 2
134 int n = cap - 1;
135 n |= n >>> 1;
136 n |= n >>> 2;
137 n |= n >>> 4;
138 n |= n >>> 8;
139 n |= n >>> 16;
140 return (n < 0) ? 1 :
141 (n >= MAXIMUM_BUFFER_CAPACITY) ? MAXIMUM_BUFFER_CAPACITY : n + 1;
142 }
143
/**
 * Clients (BufferedSubscriptions) are maintained in a linked list
 * (via their "next" fields). This works well for publish loops.
 * It requires O(n) traversal to check for duplicate subscribers,
 * but we expect that subscribing is much less common than
 * publishing. Unsubscribing occurs only during traversal loops,
 * when BufferedSubscription methods or status checks return
 * negative values signifying that they have been disabled.
 *
 * This field is the head of that list; the methods of this class
 * read and relink it only while synchronized on this publisher.
 */
BufferedSubscription<T> clients;

// Parameters for constructing BufferedSubscriptions
// (defaults used by the single-argument subscribe method)
final Executor executor;
// Capacities as stored by the constructor, i.e. after roundCapacity
final int minBufferCapacity;
final int maxBufferCapacity;

/**
 * Run status, updated only within locks. Declared volatile because
 * several methods (close, hasSubscribers, isSubscribed, isClosed)
 * read it without holding the lock.
 */
volatile boolean closed;
162
163 /**
164 * Creates a new SubmissionPublisher using the given Executor for
165 * async delivery to subscribers, and with the given initial and
166 * maximum buffer sizes for each subscriber. In the absence of
167 * other constraints, consider using {@code
168 * ForkJoinPool.commonPool(), 8, 1024}.
169 *
170 * @param executor the executor to use for async delivery,
171 * supporting creation of at least one independent thread
172 * @param initialBufferCapacity the initial capacity for each
173 * subscriber's buffer (the actual capacity may be rounded up to
174 * the nearest power of two)
175 * @param maxBufferCapacity the maximum capacity for each
176 * subscriber's buffer (the actual capacity may be rounded up to
177 * the nearest power of two)
178 * @throws NullPointerException if executor is null
179 * @throws IllegalArgumentException if initialBufferCapacity is
180 * not positive or exceeds maxBufferCapacity, or maxBufferCapacity
181 * exceeds {@code 1<<30} (about 1 billion), the maximum bound for
182 * a power of two array size
183 */
public SubmissionPublisher(Executor executor,
                           int initialBufferCapacity,
                           int maxBufferCapacity) {
    // Reject bad arguments before any field is assigned
    checkSubscriptionParams(executor, initialBufferCapacity,
                            maxBufferCapacity);
    this.executor = executor;
    // Store the power-of-two capacities actually used for buffers
    this.minBufferCapacity = roundCapacity(initialBufferCapacity);
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
}
195
196 static void checkSubscriptionParams(Executor executor,
197 int initialBufferCapacity,
198 int maxBufferCapacity) {
199 if (executor == null)
200 throw new NullPointerException();
201 if (initialBufferCapacity <= 0 || maxBufferCapacity <= 0)
202 throw new IllegalArgumentException("capacity must be positive");
203 if (maxBufferCapacity > MAXIMUM_BUFFER_CAPACITY)
204 throw new IllegalArgumentException("capacity exceeds limit");
205 if (initialBufferCapacity > maxBufferCapacity)
206 throw new IllegalArgumentException("initial cannot exceed max capacity");
207 }
208
209 /**
210 * Adds the given Subscriber unless already subscribed. If
211 * already subscribed, the Subscriber's onError method is invoked
212 * with an IllegalStateException. Otherwise, upon success, the
213 * Subscriber's onSubscribe method is invoked with a new
214 * Subscription (upon exception, the subscription is
215 * cancelled). If this SubmissionPublisher is closed, the
216 * subscriber's onComplete method is then invoked. Subscribers
217 * may enable receiving items by invoking the {@code request}
218 * method of the new Subscription, and may unsubscribe by invoking
219 * its cancel method.
220 *
221 * @param subscriber the subscriber
222 * @throws NullPointerException if subscriber is null
223 */
224 public void subscribe(Flow.Subscriber<? super T> subscriber) {
225 doSubscribe(subscriber, executor, minBufferCapacity, maxBufferCapacity);
226 }
227
228 /**
229 * Adds the given subscriber, with the same effects as {@link
230 * #subscribe(Flow.Subscriber)}, but overriding the
231 * executor and/or buffer capacities otherwise used by this
232 * Publisher.
233 *
234 * @param subscriber the subscriber
235 * @param executor the executor to use for async delivery,
236 * supporting creation of at least one independent thread
237 * @param initialBufferCapacity the initial capacity for each
238 * subscriber's buffer (the actual capacity may be rounded up to
239 * the nearest power of two)
240 * @param maxBufferCapacity the maximum capacity for each
241 * subscriber's buffer (the actual capacity may be rounded up to
242 * the nearest power of two)
243 * @throws NullPointerException if executor or subscriber are null
244 * @throws IllegalArgumentException if initialBufferCapacity is
245 * not positive or exceeds maxBufferCapacity, or maxBufferCapacity
246 * exceeds {@code 1<<30} (about 1 billion), the maximum bound for
247 * a power of two array size
248 */
249 public void subscribe(Flow.Subscriber<? super T> subscriber,
250 Executor executor,
251 int initialBufferCapacity,
252 int maxBufferCapacity) {
253 checkSubscriptionParams(executor, initialBufferCapacity, maxBufferCapacity);
254 doSubscribe(subscriber, executor,
255 roundCapacity(initialBufferCapacity),
256 roundCapacity(maxBufferCapacity));
257 }
258
259 /**
260 * Implements both forms of subscribe
261 */
262 final void doSubscribe(Flow.Subscriber<? super T> subscriber,
263 Executor e, int minCap, int maxCap) {
264 if (subscriber == null) throw new NullPointerException();
265 BufferedSubscription<T> sbn =
266 new BufferedSubscription<T>(subscriber, e, minCap, maxCap);
267 boolean present = false;
268 synchronized (this) {
269 BufferedSubscription<T> pred = null, next;
270 for (BufferedSubscription<T> b = clients; ; b = next) {
271 if (b == null) {
272 if (pred == null)
273 clients = sbn;
274 else
275 pred.next = sbn;
276 sbn.onSubscribe();
277 if (closed)
278 sbn.onComplete();
279 break;
280 }
281 next = b.next;
282 if (b.isDisabled()) { // remove
283 if (pred == null)
284 clients = next;
285 else
286 pred.next = next;
287 }
288 else if (subscriber == b.subscriber) {
289 present = true;
290 break;
291 }
292 pred = b;
293 }
294 }
295 if (present)
296 subscriber.onError(new IllegalStateException("Already subscribed"));
297 }
298
299 /**
300 * Publishes the given item to each current subscriber by
301 * asynchronously invoking its onNext method, blocking
302 * uninterruptibly while resources for any subscriber are
303 * unavailable.
304 *
305 * <p>If the Executor for this publisher throws a
306 * RejectedExecutionException (or any other RuntimeException or
307 * Error) when attempting to asynchronously notify subscribers,
308 * then this exception is rethrown.
309 *
310 * @param item the (non-null) item to publish
311 * @throws IllegalStateException if closed
312 * @throws NullPointerException if item is null
313 * @throws RejectedExecutionException if thrown by Executor
314 */
315 public void submit(T item) {
316 if (item == null) throw new NullPointerException();
317 synchronized (this) {
318 if (closed)
319 throw new IllegalStateException("Closed");
320 BufferedSubscription<T> pred = null, next;
321 for (BufferedSubscription<T> b = clients; b != null; b = next) {
322 int stat;
323 next = b.next;
324 if ((stat = b.submit(item)) < 0) {
325 if (pred == null)
326 clients = next;
327 else
328 pred.next = next;
329 }
330 else
331 pred = b;
332 }
333 }
334 }
335
336 /**
337 * Publishes the given item, if possible, to each current
338 * subscriber by asynchronously invoking its onNext method. The
339 * item may be dropped by one or more subscribers if resource
340 * limits are exceeded, in which case the given handler (if
341 * non-null) is invoked, and if it returns true, retried once.
342 * Other calls to methods in this class by other threads are
343 * blocked while the handler is invoked. Unless recovery is
344 * assured, options are usually limited to logging the error
345 * and/or issuing an onError signal to the subscriber.
346 *
347 * <p>If the Executor for this publisher throws a
348 * RejectedExecutionException (or any other RuntimeException or
349 * Error) when attempting to asynchronously notify subscribers, or
350 * the drop handler throws an exception when processing a dropped
351 * item, then this exception is rethrown.
352 *
353 * @param item the (non-null) item to publish
354 * @param onDrop if non-null, the handler invoked upon a drop to a
355 * subscriber, with arguments of the subscriber and item; if it
356 * returns true, an offer is re-attempted (once)
357 * @return the number of drops (failed attempts to issue the item
358 * to a subscriber)
359 * @throws IllegalStateException if closed
360 * @throws NullPointerException if item is null
361 * @throws RejectedExecutionException if thrown by Executor
362 */
363 public int offer(T item,
364 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
365 if (item == null) throw new NullPointerException();
366 int drops = 0;
367 synchronized (this) {
368 if (closed)
369 throw new IllegalStateException("Closed");
370 BufferedSubscription<T> pred = null, next;
371 for (BufferedSubscription<T> b = clients; b != null; b = next) {
372 int stat;
373 next = b.next;
374 if ((stat = b.offer(item)) == 0 &&
375 onDrop != null &&
376 onDrop.test(b.subscriber, item))
377 stat = b.offer(item);
378 if (stat < 0) {
379 if (pred == null)
380 clients = next;
381 else
382 pred.next = next;
383 }
384 else {
385 pred = b;
386 if (stat == 0)
387 ++drops;
388 }
389 }
390 return drops;
391 }
392 }
393
394 /**
395 * Publishes the given item, if possible, to each current
396 * subscriber by asynchronously invoking its onNext method,
397 * blocking while resources for any subscription are unavailable,
398 * up to the specified timeout or the caller thread is
399 * interrupted, at which point the given handler (if non-null) is
400 * invoked, and if it returns true, retried once. (The drop
401 * handler may distinguish timeouts from interrupts by checking
402 * whether the current thread is interrupted.) Other calls to
403 * methods in this class by other threads are blocked while the
404 * handler is invoked. Unless recovery is assured, options are
405 * usually limited to logging the error and/or issuing an onError
406 * signal to the subscriber.
407 *
408 * <p>If the Executor for this publisher throws a
409 * RejectedExecutionException (or any other RuntimeException or
410 * Error) when attempting to asynchronously notify subscribers, or
411 * the drop handler throws an exception when processing a dropped
412 * item, then this exception is rethrown.
413 *
414 * @param item the (non-null) item to publish
415 * @param timeout how long to wait for resources for any subscriber
416 * before giving up, in units of {@code unit}
417 * @param unit a {@code TimeUnit} determining how to interpret the
418 * {@code timeout} parameter
419 * @param onDrop if non-null, the handler invoked upon a drop to a
420 * subscriber, with arguments of the subscriber and item; if it
421 * returns true, an offer is re-attempted (once)
422 * @return the number of drops (failed attempts to issue the item
423 * to a subscriber)
424 * @throws IllegalStateException if closed
425 * @throws NullPointerException if item is null
426 * @throws RejectedExecutionException if thrown by Executor
427 */
428 public int offer(T item, long timeout, TimeUnit unit,
429 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
430 if (item == null) throw new NullPointerException();
431 long nanos = unit.toNanos(timeout);
432 int drops = 0;
433 synchronized (this) {
434 if (closed)
435 throw new IllegalStateException("Closed");
436 BufferedSubscription<T> pred = null, next;
437 for (BufferedSubscription<T> b = clients; b != null; b = next) {
438 int stat;
439 next = b.next;
440 if ((stat = b.offerNanos(item, nanos)) == 0 &&
441 onDrop != null && onDrop.test(b.subscriber, item))
442 stat = b.offer(item);
443 if (stat < 0) {
444 if (pred == null)
445 clients = next;
446 else
447 pred.next = next;
448 }
449 else {
450 pred = b;
451 if (stat == 0)
452 ++drops;
453 }
454 }
455 }
456 return drops;
457 }
458
459 /**
460 * Unless already closed, issues onComplete signals to current
461 * subscribers, and disallows subsequent attempts to publish.
462 */
463 public void close() {
464 if (!closed) {
465 BufferedSubscription<T> b, next;
466 synchronized (this) {
467 b = clients;
468 clients = null;
469 closed = true;
470 }
471 while (b != null) {
472 next = b.next;
473 b.onComplete();
474 b = next;
475 }
476 }
477 }
478
479 /**
480 * Unless already closed, issues onError signals to current
481 * subscribers with the given error, and disallows subsequent
482 * attempts to publish.
483 *
484 * @param error the onError argument sent to subscribers
485 * @throws NullPointerException if error is null
486 */
487 public void closeExceptionally(Throwable error) {
488 if (error == null)
489 throw new NullPointerException();
490 if (!closed) {
491 BufferedSubscription<T> b, next;
492 synchronized (this) {
493 b = clients;
494 clients = null;
495 closed = true;
496 }
497 while (b != null) {
498 next = b.next;
499 b.onError(error);
500 b = next;
501 }
502 }
503 }
504
505 /**
506 * Returns true if this publisher is not accepting submissions.
507 *
508 * @return true if closed
509 */
510 public boolean isClosed() {
511 return closed;
512 }
513
514 /**
515 * Returns true if this publisher has any subscribers.
516 *
517 * @return true if this publisher has any subscribers
518 */
519 public boolean hasSubscribers() {
520 boolean nonEmpty = false;
521 if (!closed) {
522 synchronized (this) {
523 BufferedSubscription<T> pred = null, next;
524 for (BufferedSubscription<T> b = clients; b != null; b = next) {
525 next = b.next;
526 if (b.isDisabled()) {
527 if (pred == null)
528 clients = next;
529 else
530 pred.next = next;
531 }
532 else {
533 nonEmpty = true;
534 break;
535 }
536 }
537 }
538 }
539 return nonEmpty;
540 }
541
542 /**
543 * Returns the Executor used for asynchronous delivery.
544 *
545 * @return the Executor used for asynchronous delivery
546 */
547 public Executor getExecutor() {
548 return executor;
549 }
550
551 /**
552 * Returns the initial per-subscriber buffer capacity.
553 *
554 * @return the initial per-subscriber buffer capacity
555 */
556 public int getInitialBufferCapacity() {
557 return minBufferCapacity;
558 }
559
560 /**
561 * Returns the maximum per-subscriber buffer capacity.
562 *
563 * @return the maximum per-subscriber buffer capacity
564 */
565 public int getMaxBufferCapacity() {
566 return maxBufferCapacity;
567 }
568
569 /**
570 * Returns a list of current subscribers.
571 *
572 * @return list of current subscribers
573 */
574 public List<Flow.Subscriber<? super T>> getSubscribers() {
575 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
576 synchronized (this) {
577 BufferedSubscription<T> pred = null, next;
578 for (BufferedSubscription<T> b = clients; b != null; b = next) {
579 next = b.next;
580 if (b.isDisabled()) {
581 if (pred == null)
582 clients = next;
583 else
584 pred.next = next;
585 }
586 else
587 subs.add(b.subscriber);
588 }
589 }
590 return subs;
591 }
592
593 /**
594 * Returns true if the given Subscriber is currently subscribed.
595 *
596 * @param subscriber the subscriber
597 * @return true if currently subscribed
598 */
599 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
600 if (!closed) {
601 synchronized (this) {
602 BufferedSubscription<T> pred = null, next;
603 for (BufferedSubscription<T> b = clients; b != null; b = next) {
604 next = b.next;
605 if (b.isDisabled()) {
606 if (pred == null)
607 clients = next;
608 else
609 pred.next = next;
610 }
611 else if (subscriber == b.subscriber)
612 return true;
613 }
614 }
615 }
616 return false;
617 }
618
619 /**
620 * Returns an estimate of the number of buffered items (those
621 * produced but not consumed), summed across all current
622 * subscribers.
623 *
624 * @return the estimate
625 */
626 public long estimateBuffered() {
627 long sum = 0L;
628 synchronized (this) {
629 BufferedSubscription<T> pred = null, next;
630 for (BufferedSubscription<T> b = clients; b != null; b = next) {
631 next = b.next;
632 if (b.isDisabled()) {
633 if (pred == null)
634 clients = next;
635 else
636 pred.next = next;
637 }
638 else
639 sum += b.estimateBuffered();
640 }
641 }
642 return sum;
643 }
644
645 /**
646 * A bounded (ring) buffer with integrated control to start a
647 * consumer task whenever items are available. The buffer
648 * algorithm is similar to one used inside ForkJoinPool,
649 * specialized for the case of at most one concurrent producer and
650 * consumer, and power of two buffer sizes. This allows methods to
651 * operate without locks even while supporting resizing, blocking,
652 * task-triggering, and garbage-free buffers (nulling out elements
653 * when consumed), although supporting these does impose a bit of
654 * overhead compared to plain fixed-size ring buffers.
655 *
656 * The publisher guarantees a single producer via its lock. We
657 * ensure in this class that there is at most one consumer. The
658 * request and cancel methods must be fully thread-safe but are
659 * coded to exploit the most common case in which they are only
660 * called by consumers (usually within onNext).
661 *
662 * This class also serves as its own consumer task, consuming as
663 * many items/signals as possible before terminating, at which
664 * point it is re-executed when needed. (The dual Runnable and
665 * ForkJoinTask declaration saves overhead when executed by
666 * ForkJoinPools, without impacting other kinds of Executors.)
667 * Execution control is managed using the ACTIVE ctl bit. We
668 * ensure that a task is active when consumable items (and
669 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
670 * there is demand (unfulfilled requests). This is complicated on
671 * the creation side by the possibility of exceptions when trying
672 * to execute tasks. These eventually force DISABLED state, but
673 * sometimes not directly. On the task side, termination (clearing
674 * ACTIVE) may race with producers or request() calls, so in some
675 * cases requires a re-check, re-activating if possible.
676 *
677 * The ctl field also manages run state. When DISABLED, no further
678 * updates are possible (to simplify checks, DISABLED is defined
679 * as a negative value). Disabling may be preceded by setting
680 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
681 * case the associated Subscriber methods are invoked, possibly
682 * synchronously if there is no active consumer task (including
683 * cases where execute() failed).
684 *
685 * Support for blocking also exploits the fact that there is only
686 * one possible waiter. ManagedBlocker-compatible control fields
687 * are placed in this class itself rather than in wait-nodes.
688 * Blocking control relies on the "waiter" field. Producers set
689 * the field before trying to block, but must then recheck (via
690 * offer) before parking. Signalling then just unparks and clears
691 * waiter field.
692 *
693 * This class uses @Contended and heuristic field declaration
694 * ordering to reduce memory contention on BufferedSubscription
695 * itself, but it does not currently attempt to avoid memory
696 * contention (especially including card-marks) among buffer
697 * elements, that can significantly slow down some usages.
698 * Addressing this may require allocating substantially more space
699 * than users expect.
700 */
701 @SuppressWarnings("serial")
702 @sun.misc.Contended
703 static final class BufferedSubscription<T> extends ForkJoinTask<Void>
704 implements Runnable, Flow.Subscription, ForkJoinPool.ManagedBlocker {
// Order-sensitive field declarations
volatile long demand;             // # unfilled requests
long timeout;                     // > 0 if timed wait; block() stores INTERRUPTED here on interrupt
final int minCapacity;            // initial buffer size
int maxCapacity;                  // reduced on OOME
int putStat;                      // offer result for ManagedBlocker
volatile int ctl;                 // atomic run state flags
volatile int head;                // next position to take
volatile int tail;                // next position to put
volatile Object[] array;          // buffer: null if disabled
Flow.Subscriber<? super T> subscriber; // null if disabled
Executor executor;                // null if disabled
volatile Throwable pendingError;  // holds until onError issued
volatile Thread waiter;           // blocked producer thread
T putItem;                        // for offer within ManagedBlocker
BufferedSubscription<T> next;     // used only by publisher

// ctl values (bit flags, combined with | and updated by CAS on ctl)
static final int ACTIVE    = 0x01;       // consumer task active
static final int ERROR     = 0x02;       // signal onError
static final int SUBSCRIBE = 0x04;       // signal onSubscribe
static final int COMPLETE  = 0x08;       // signal onComplete when done
static final int DISABLED  = 0x80000000; // must be negative (sign bit, so "ctl < 0" tests it)

static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
730
BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                     Executor executor, int minBufferCapacity,
                     int maxBufferCapacity) {
    // Capacities are stored as given; no buffer array is allocated here
    this.minCapacity = minBufferCapacity;
    this.maxCapacity = maxBufferCapacity;
    this.executor = executor;
    this.subscriber = subscriber;
}
739
740 final boolean isDisabled() {
741 return ctl < 0;
742 }
743
744 /**
745 * Tries to add item and start consumer task if necessary.
746 * @return -1 if disabled, 0 if dropped, else 1
747 */
final int offer(T item) {
    Object[] a = array;
    int t = tail, h = head, n, i, stat;
    // Fast path: a buffer exists and holds fewer than n items (t - h is
    // the current count). The ">= 0" test on the masked index is always
    // true for a non-empty array; presumably it is a hint to the JIT --
    // TODO confirm.
    if (a != null && (n = a.length) > t - h && (i = t & (n - 1)) >= 0) {
        a[i] = item;
        // Publish the new tail after the slot write, using an ordered
        // (rather than plain volatile) store
        U.putOrderedInt(this, TAIL, t + 1);
    }
    // Slow path: no buffer yet or buffer full; propagate 0 (dropped)
    // or -1 (disabled) from growAndAdd
    else if ((stat = growAndAdd(a, item)) <= 0)
        return stat;
    for (int c;;) {                  // possibly start task
        Executor e;
        if (((c = ctl) & ACTIVE) != 0)
            break;                   // a consumer task is already marked active
        else if (c < 0 || (e = executor) == null)
            return -1;               // disabled
        else if (demand == 0L || tail == head)
            break;                   // no demand, or nothing buffered: no task needed
        else if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) {
            // This thread set ACTIVE, so it must launch the task
            try {
                e.execute(this);
                break;
            } catch (RuntimeException | Error ex) { // back out
                // Clear ACTIVE again (unless disabled or already cleared
                // meanwhile), then rethrow to the caller
                do {} while ((c = ctl) >= 0 &&
                             (c & ACTIVE) != 0 &&
                             !U.compareAndSwapInt(this, CTL, c,
                                                  c & ~ACTIVE));
                throw ex;
            }
        }
        // CAS failed: ctl changed concurrently; re-read and retry
    }
    return 1;
}
780
781 /**
782 * Tries to create or expand buffer, then adds item if possible
783 */
/*
 * Returns 1 if the item was added, 0 if the buffer could not be
 * created or grown (at maximum capacity, size overflow, or allocation
 * failure), and -1 if there is no buffer because this subscription
 * is disabled.
 */
final int growAndAdd(Object[] oldArray, T item) {
    int oldLen, newLen;
    if (oldArray != null)
        newLen = (oldLen = oldArray.length) << 1;   // double
    else if (ctl >= 0) {
        oldLen = 0;
        newLen = minCapacity;                       // first allocation
    }
    else
        return -1;                                  // disabled
    // newLen <= 0 catches overflow of the doubling shift
    if (oldLen >= maxCapacity || newLen <= 0)
        return 0;                                   // cannot grow
    Object[] newArray;
    try {
        newArray = new Object[newLen];
    } catch (Throwable ex) {                        // try to cope with OOME
        if (oldLen > 0)                             // avoid continuous failure
            maxCapacity = oldLen;
        return 0;
    }
    array = newArray;
    int t = tail, oldMask = oldLen - 1, newMask = newLen - 1;
    if (oldArray != null && oldMask >= 0 && newMask >= oldMask) {
        for (int j = head; j != t; ++j) {           // races with consumer
            Object x;
            int i = j & oldMask;
            // Move an element only if this thread wins the CAS that nulls
            // the old slot; a failed CAS means the slot changed
            // concurrently, so the element is not copied
            if ((x = oldArray[i]) != null &&
                U.compareAndSwapObject(oldArray,
                                       (((long)i) << ASHIFT) + ABASE,
                                       x, null))
                newArray[j & newMask] = x;
        }
    }
    newArray[t & newMask] = item;
    tail = t + 1;                                   // volatile write publishes the item
    return 1;
}
821
822 /**
823 * Nulls out most fields, mainly to avoid garbage retention
824 * until publisher unsubscribes.
825 */
826 final void detach() {
827 pendingError = null;
828 subscriber = null;
829 executor = null;
830 array = null;
831 Thread w = waiter;
832 if (w != null) {
833 waiter = null;
834 LockSupport.unpark(w); // force wakeup
835 }
836 }
837
838 /**
839 * Issues error signal, asynchronously if a task is running,
840 * else synchronously
841 */
842 final void onError(Throwable ex) {
843 for (int c;;) {
844 if ((c = ctl) < 0)
845 break;
846 else if ((c & ACTIVE) != 0) {
847 pendingError = ex;
848 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
849 break; // cause consumer task to exit
850 }
851 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
852 Flow.Subscriber<? super T> s = subscriber;
853 if (s != null && ex != null) {
854 try {
855 s.onError(ex);
856 } catch (Throwable ignore) {
857 }
858 }
859 detach();
860 break;
861 }
862 }
863 }
864
865 /**
866 * Tries to start consumer task upon a signal or request;
867 * disables on failure.
868 */
869 final void startOrDisable() {
870 Executor e; // skip if already disabled
871 if ((e = executor) != null) {
872 try {
873 e.execute(this);
874 } catch (Throwable ex) { // back out and force signal
875 for (int c;;) {
876 if ((c = ctl) < 0 || (c & ACTIVE) == 0)
877 break;
878 if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
879 onError(ex);
880 break;
881 }
882 }
883 }
884 }
885 }
886
887 final void onComplete() {
888 for (int c;;) {
889 if ((c = ctl) < 0)
890 break;
891 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
892 if ((c & ACTIVE) == 0)
893 startOrDisable();
894 break;
895 }
896 }
897 }
898
899 final void onSubscribe() {
900 for (int c;;) {
901 if ((c = ctl) < 0)
902 break;
903 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | SUBSCRIBE))) {
904 if ((c & ACTIVE) == 0)
905 startOrDisable();
906 break;
907 }
908 }
909 }
910
911 /**
912 * Causes consumer task to exit if active (without reporting
913 * onError unless there is already a pending error), and
914 * disables.
915 */
916 public void cancel() {
917 for (int c;;) {
918 if ((c = ctl) < 0)
919 break;
920 else if ((c & ACTIVE) != 0) {
921 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
922 break;
923 }
924 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
925 detach();
926 break;
927 }
928 }
929 }
930
931 /**
932 * Adds to demand and possibly starts task.
933 */
934 public void request(long n) {
935 if (n > 0L) {
936 for (;;) {
937 long prev = demand, d;
938 if ((d = prev + n) < prev) // saturate
939 d = Long.MAX_VALUE;
940 if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
941 for (int c, h;;) {
942 if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
943 demand == 0L)
944 break;
945 if ((h = head) != tail) {
946 if (U.compareAndSwapInt(this, CTL, c,
947 c | ACTIVE)) {
948 startOrDisable();
949 break;
950 }
951 }
952 else if (head == h && tail == h)
953 break;
954 }
955 break;
956 }
957 }
958 }
959 else if (n < 0L)
960 onError(new IllegalArgumentException(
961 "negative subscription request"));
962 }
963
964 final int estimateBuffered() {
965 int n;
966 return (ctl >= 0 && (n = tail - head) > 0) ? n : 0;
967 }
968
969 // ManagedBlocker support
970
971 public final boolean isReleasable() {
972 T item = putItem;
973 if (item != null) {
974 if ((putStat = offer(item)) == 0)
975 return false;
976 putItem = null;
977 }
978 return true;
979 }
980
981 public final boolean block() {
982 T item = putItem;
983 if (item != null) {
984 putItem = null;
985 long nanos = timeout;
986 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
987 while ((putStat = offer(item)) == 0) {
988 if (Thread.interrupted()) {
989 timeout = INTERRUPTED;
990 if (nanos > 0L)
991 break;
992 }
993 else if (nanos > 0L &&
994 (nanos = deadline - System.nanoTime()) <= 0L)
995 break;
996 else if (waiter == null)
997 waiter = Thread.currentThread();
998 else {
999 if (nanos > 0L)
1000 LockSupport.parkNanos(this, nanos);
1001 else
1002 LockSupport.park(this);
1003 waiter = null;
1004 }
1005 }
1006 }
1007 waiter = null;
1008 return true;
1009 }
1010
1011 final int submit(T item) {
1012 int stat;
1013 if ((stat = offer(item)) == 0) {
1014 putItem = item;
1015 timeout = 0L;
1016 try {
1017 ForkJoinPool.managedBlock(this);
1018 } catch (InterruptedException cantHappen) {
1019 }
1020 stat = putStat;
1021 if (timeout < 0L)
1022 Thread.currentThread().interrupt();
1023 }
1024 return stat;
1025 }
1026
1027 final int offerNanos(T item, long nanos) {
1028 int stat;
1029 if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
1030 putItem = item;
1031 try {
1032 ForkJoinPool.managedBlock(this);
1033 } catch (InterruptedException cantHappen) {
1034 }
1035 stat = putStat;
1036 if (timeout < 0L)
1037 Thread.currentThread().interrupt();
1038 }
1039 return stat;
1040 }
1041
1042 /**
1043 * Consumer task loop; supports resubmission when used as
1044 * ForkJoinTask.
 *
 * Does nothing if subscriber is null. Otherwise each pass re-reads
 * demand, ctl, head, tail and array, then handles exactly one
 * condition, tested in this order: disabled (ctl < 0), ERROR bit
 * set, SUBSCRIBE bit set, empty buffer, stale slot, zero demand,
 * or delivery of one item through onNext. The loop ends after
 * detach() in the disabled case, or after the ACTIVE bit has been
 * cleared and the recheck finds no reason to continue. Throwables
 * from the subscriber's onError and onComplete are ignored; a
 * throw from onSubscribe or onNext sets ctl to DISABLED.
 *
 * @return false always
1045 */
1046 public final boolean exec() {
1047 Flow.Subscriber<? super T> s;
1048 if ((s = subscriber) != null) { // else disabled
1049 for (;;) {
1050 long d = demand; // read volatile fields in acceptable order
1051 int c = ctl;
1052 int h = head;
1053 int t = tail;
1054 Object[] a = array;
1055 int i, n; Object x; Thread w;
1056 if (c < 0) { // already disabled: detach and stop
1057 detach();
1058 break;
1059 }
1060 else if ((c & ERROR) != 0) { // set by onError() or cancel() while ACTIVE
1061 Throwable ex = pendingError;
1062 ctl = DISABLED; // no need for CAS
1063 if (ex != null) { // cancel() sets ERROR without storing a pendingError
1064 try {
1065 s.onError(ex);
1066 } catch (Throwable ignore) {
1067 }
1068 }
1069 } // next pass sees c < 0 and detaches
1070 else if ((c & SUBSCRIBE) != 0) { // onSubscribe signal pending
1071 if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
1072 try {
1073 s.onSubscribe(this);
1074 } catch (Throwable ex) {
1075 ctl = DISABLED; // disable on throw
1076 }
1077 }
1078 }
1079 else if (h == t) { // empty
1080 if (h == tail &&
1081 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) { // drop ACTIVE; c now holds the value written
1082 if (h != tail || c != (c = ctl)) { // recheck
1083 if ((c & (ACTIVE | DISABLED)) != 0 ||
1084 !U.compareAndSwapInt(this, CTL, c,
1085 c | ACTIVE))
1086 break; // ctl shows ACTIVE/DISABLED, or the CAS restoring ACTIVE failed
1087 }
1088 else if ((c & COMPLETE) != 0) { // COMPLETE is set by onComplete()
1089 ctl = DISABLED;
1090 try {
1091 s.onComplete();
1092 } catch (Throwable ignore) {
1093 }
1094 } // next pass sees c < 0 and detaches
1095 else
1096 break; // idle: exit with ACTIVE cleared
1097 }
1098 }
1099 else if (a == null || (n = a.length) == 0 ||
1100 (x = a[i = h & (n - 1)]) == null)
1101 ; // stale; retry
1102 else if (d == 0L) { // can't take
1103 if (demand == 0L &&
1104 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) && // drop ACTIVE; c now holds the value written
1105 ((demand == 0L && c == (c = ctl)) || // recheck
1106 (c & (ACTIVE | DISABLED)) != 0 ||
1107 !U.compareAndSwapInt(this, CTL, c, c | ACTIVE)))
1108 break; // exit unless state changed and ACTIVE was re-acquired
1109 }
1110 else if (U.compareAndSwapObject(
1111 a, (((long)i) << ASHIFT) + ABASE, x, null)) { // claim slot i by nulling it
1112 U.putOrderedInt(this, HEAD, h + 1); // advance head past the taken slot
1113 while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L)) // consume one unit of demand
1114 d = demand; // almost never fails
1115 if ((w = waiter) != null) {
1116 waiter = null;
1117 LockSupport.unpark(w); // release producer
1118 }
1119 try {
1120 @SuppressWarnings("unchecked") T y = (T) x;
1121 s.onNext(y);
1122 } catch (Throwable ex) { // disable on throw
1123 ctl = DISABLED;
1124 }
1125 }
1126 }
1127 }
1128 return false; // resubmittable; never joined
1129 }
1130
1131 public final void run() { exec(); }
1132 public final Void getRawResult() { return null; }
1133 public final void setRawResult(Void v) {}
1134
// Unsafe mechanics: field offsets for CAS/ordered writes and
// base/shift values for addressing Object[] elements.
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long CTL;
private static final long TAIL;
private static final long HEAD;
private static final long DEMAND;
private static final int ABASE;
private static final int ASHIFT;

static {
    Class<?> k = BufferedSubscription.class;
    try {
        CTL = U.objectFieldOffset(k.getDeclaredField("ctl"));
        TAIL = U.objectFieldOffset(k.getDeclaredField("tail"));
        HEAD = U.objectFieldOffset(k.getDeclaredField("head"));
        DEMAND = U.objectFieldOffset(k.getDeclaredField("demand"));
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }

    // Element i is addressed as (i << ASHIFT) + ABASE, so the
    // element scale is rejected unless (scale & (scale - 1)) == 0.
    ABASE = U.arrayBaseOffset(Object[].class);
    int scale = U.arrayIndexScale(Object[].class);
    if ((scale & (scale - 1)) != 0)
        throw new Error("data type scale not a power of two");
    ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
}
1164 }
1165 }