ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.24
Committed: Thu Jan 22 14:27:07 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.23: +46 -17 lines
Log Message:
Reduce head-of-line blocking

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.List;
11 import java.util.concurrent.locks.LockSupport;
12 import java.util.function.BiPredicate;
13 import java.util.function.Function;
14 import java.util.function.Supplier;
15
16 /**
17 * A {@link Flow.Publisher} that asynchronously issues submitted
18 * (non-null) items to current subscribers until it is closed. Each
19 * current subscriber receives newly submitted items in the same order
20 * unless drops or exceptions are encountered. Using a
21 * SubmissionPublisher allows item generators to act as Publishers
22 * relying on drop handling and/or blocking for flow control.
23 *
24 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
25 * constructor for delivery to subscribers. The best choice of
26 * Executor depends on expected usage. If the generator(s) of
27 * submitted items run in separate threads, and the number of
28 * subscribers can be estimated, consider using a {@link
29 * Executors#newFixedThreadPool}. Otherwise consider using a
30 * work-stealing pool (including {@link ForkJoinPool#commonPool}).
31 *
32 * <p>Buffering allows producers and consumers to transiently operate
33 * at different rates. Each subscriber uses an independent buffer.
34 * Buffers are created upon first use and expanded as needed up to the
35 * given maximum. (The enforced capacity may be rounded up to the
36 * nearest power of two and/or bounded by the largest value supported
37 * by this implementation.) Invocations of {@link
38 * Flow.Subscription#request} do not directly result in buffer
39 * expansion, but risk saturation if unfilled requests exceed the
40 * maximum capacity. Choices of buffer parameters rely on expected
41 * rates, resources, and usages, that usually benefit from empirical
42 * testing. As a first guess, consider a value of 64.
43 *
44 * <p>Publication methods support different policies about what to do
45 * when buffers are saturated. Method {@link #submit} blocks until
46 * resources are available. This is simplest, but least
47 * responsive. The {@code offer} methods may drop items (either
48 * immediately or with bounded timeout), but provide an opportunity to
49 * interpose a handler and then retry.
50 *
51 * <p>If any Subscriber method throws an exception, its subscription
52 * is cancelled. If the supplied Executor throws
53 * RejectedExecutionException (or any other RuntimeException or Error)
54 * when attempting to execute a task, or a drop handler throws an
55 * exception when processing a dropped item, then the exception is
56 * rethrown. In these cases, some but not all subscribers may have
57 * received items. It is usually good practice to {@link
58 * #closeExceptionally closeExceptionally} in these cases.
59 *
60 * <p>This class may also serve as a convenient base for subclasses
61 * that generate items, and use the methods in this class to publish
62 * them. For example here is a class that periodically publishes the
63 * items generated from a supplier. (In practice you might add methods
64 * to independently start and stop generation, to share schedulers
65 * among publishers, and so on, or instead use a SubmissionPublisher
66 * as a component rather than a superclass.)
67 *
68 * <pre> {@code
69 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
70 * final ScheduledFuture<?> periodicTask;
71 * final ScheduledExecutorService scheduler;
72 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
73 * Supplier<? extends T> supplier,
74 * long period, TimeUnit unit) {
75 * super(executor, maxBufferCapacity);
76 * scheduler = new ScheduledThreadPoolExecutor(1);
77 * periodicTask = scheduler.scheduleAtFixedRate(
78 * () -> submit(supplier.get()), 0, period, unit);
79 * }
80 * public void close() {
81 * periodicTask.cancel(false);
82 * scheduler.shutdown();
83 * super.close();
84 * }
85 * }}</pre>
86 *
87 * <p>Here is an example of a {@link Flow.Processor} implementation.
88 * It uses single-step requests to its publisher for simplicity of
89 * illustration. A more adaptive version could monitor flow using the
90 * lag estimate returned from {@code submit} and/or other utility
91 * methods.
92 *
93 * <pre> {@code
94 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
95 * implements Flow.Processor<S,T> {
96 * final Function<? super S, ? extends T> function;
97 * Flow.Subscription subscription;
98 * TransformProcessor(Executor executor, int maxBufferCapacity,
99 * Function<? super S, ? extends T> function) {
100 * super(executor, maxBufferCapacity);
101 * this.function = function;
102 * }
103 * public void onSubscribe(Flow.Subscription subscription) {
104 * (this.subscription = subscription).request(1);
105 * }
106 * public void onNext(S item) {
107 * subscription.request(1);
108 * submit(function.apply(item));
109 * }
110 * public void onError(Throwable ex) { closeExceptionally(ex); }
111 * public void onComplete() { close(); }
112 * }}</pre>
113 *
114 * @param <T> the published item type
115 * @author Doug Lea
116 * @since 1.9
117 */
118 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
119 AutoCloseable {
120 /*
121 * Most mechanics are handled by BufferedSubscription. This class
122 * mainly tracks subscribers and ensures sequentiality, by using
123 * built-in synchronization locks across public methods. (Using
124 * built-in locks works well in the most typical case in which
125 * only one thread submits items).
126 */
127
128 // Ensuring that all arrays have power of two length
129
130 static final int MAXIMUM_BUFFER_CAPACITY = 1 << 30;
131 static final int roundCapacity(int cap) { // to nearest power of 2
132 int n = cap - 1;
133 n |= n >>> 1;
134 n |= n >>> 2;
135 n |= n >>> 4;
136 n |= n >>> 8;
137 n |= n >>> 16;
138 return (n <= 0) ? 2 : // at least 2
139 (n >= MAXIMUM_BUFFER_CAPACITY) ? MAXIMUM_BUFFER_CAPACITY : n + 1;
140 }
141
142 /**
143 * Clients (BufferedSubscriptions) are maintained in a linked list
144 * (via their "next" fields). This works well for publish loops.
145 * It requires O(n) traversal to check for duplicate subscribers,
146 * but we expect that subscribing is much less common than
147 * publishing. Unsubscribing occurs only during traversal loops,
148 * when BufferedSubscription methods or status checks return
149 * negative values signifying that they have been disabled.
150 */
151 BufferedSubscription<T> clients;
152
153 /** Run status, updated only within locks */
154 volatile boolean closed;
155
156 // Parameters for constructing BufferedSubscriptions
157 final Executor executor;
158 final int maxBufferCapacity;
159
160 /**
161 * Creates a new SubmissionPublisher using the given Executor for
162 * async delivery to subscribers, and with the given maximum
163 * buffer size for each subscriber. In the absence of other
164 * constraints, consider using {@code ForkJoinPool.commonPool(),
165 * 64}.
166 *
167 * @param executor the executor to use for async delivery,
168 * supporting creation of at least one independent thread
169 * @param maxBufferCapacity the maximum capacity for each
170 * subscriber's buffer (the enforced capacity may be rounded up to
171 * the nearest power of two and/or bounded by the largest value
172 * supported by this implementation; method {@link #getMaxBufferCapacity}
173 * returns the actual value)
174 * @throws NullPointerException if executor is null
175 * @throws IllegalArgumentException if maxBufferCapacity not
176 * positive
177 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
    // Argument checks, in this order: a null executor is reported first.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity < 1) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    // Store the power-of-two capacity actually enforced, not the raw argument.
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.executor = executor;
}
186
187 /**
188 * Adds the given Subscriber unless already subscribed. If
189 * already subscribed, the Subscriber's onError method is invoked
190 * with an IllegalStateException. Otherwise, upon success, the
191 * Subscriber's onSubscribe method is invoked with a new
192 * Subscription. If onSubscribe throws an exception, the
193 * subscription is cancelled. Otherwise, if this
194 * SubmissionPublisher is closed, the subscriber's onComplete
195 * method is then invoked. Subscribers may enable receiving items
196 * by invoking the {@code request} method of the new Subscription,
197 * and may unsubscribe by invoking its cancel method.
198 *
199 * @param subscriber the subscriber
200 * @throws NullPointerException if subscriber is null
201 */
202 public void subscribe(Flow.Subscriber<? super T> subscriber) {
203 if (subscriber == null) throw new NullPointerException();
204 BufferedSubscription<T> subscription =
205 new BufferedSubscription<T>(subscriber, executor, maxBufferCapacity);
206 boolean present = false, complete;
207 synchronized (this) {
208 complete = closed;
209 BufferedSubscription<T> pred = null, next;
210 for (BufferedSubscription<T> b = clients; ; b = next) {
211 if (b == null) {
212 if (pred == null)
213 clients = subscription;
214 else
215 pred.next = subscription;
216 subscription.onSubscribe();
217 break;
218 }
219 next = b.next;
220 if (b.isDisabled()) { // remove
221 if (pred == null)
222 clients = next;
223 else
224 pred.next = next;
225 }
226 else if (subscriber.equals(b.subscriber)) {
227 present = true;
228 break;
229 }
230 pred = b;
231 }
232 }
233 if (present)
234 subscriber.onError(new IllegalStateException("Already subscribed"));
235 else if (complete)
236 subscription.onComplete();
237 }
238
239 /**
240 * Publishes the given item to each current subscriber by
241 * asynchronously invoking its onNext method, blocking
242 * uninterruptibly while resources for any subscriber are
243 * unavailable. This method returns an estimate of the maximum lag
244 * (number of items submitted but not yet consumed) among all
245 * current subscribers. This value is at least one (accounting for
246 * this submitted item) if there are any subscribers, else zero.
247 *
248 * <p>If the Executor for this publisher throws a
249 * RejectedExecutionException (or any other RuntimeException or
250 * Error) when attempting to asynchronously notify subscribers,
251 * then this exception is rethrown.
252 *
253 * @param item the (non-null) item to publish
254 * @return the estimated maximum lag among subscribers
255 * @throws IllegalStateException if closed
256 * @throws NullPointerException if item is null
257 * @throws RejectedExecutionException if thrown by Executor
258 */
259 public int submit(T item) {
260 if (item == null) throw new NullPointerException();
261 int lag = 0;
262 synchronized (this) {
263 if (closed)
264 throw new IllegalStateException("Closed");
265 /*
266 * To reduce head-of-line blocking, try offer() on each,
267 * place saturated ones in retries list, and later wait
268 * them out.
269 */
270 BufferedSubscription<T> b = clients, retries = null,
271 rtail = null, pred = null, next;
272 for ( ; b != null; b = next) {
273 int stat;
274 next = b.next;
275 if ((stat = b.offer(item)) < 0) {
276 if (pred == null)
277 clients = next;
278 else
279 pred.next = next;
280 }
281 else {
282 if (stat == 0) {
283 if (rtail == null)
284 retries = b;
285 else
286 rtail.nextRetry = b;
287 rtail = b;
288 stat = maxBufferCapacity;
289 }
290 if (stat > lag)
291 lag = stat;
292 pred = b;
293 }
294 }
295 if (retries != null)
296 retrySubmit(retries, item);
297 }
298 return lag;
299 }
300
301 /**
302 * Calls submit on each subscription on retry list.
303 */
304 private void retrySubmit(BufferedSubscription<T> retries, T item) {
305 for (BufferedSubscription<T> r = retries; r != null;) {
306 BufferedSubscription<T> nextRetry = r.nextRetry;
307 r.nextRetry = null;
308 r.submit(item);
309 r = nextRetry;
310 }
311 }
312
313 /**
314 * Publishes the given item, if possible, to each current
315 * subscriber by asynchronously invoking its onNext method. The
316 * item may be dropped by one or more subscribers if resource
317 * limits are exceeded, in which case the given handler (if
318 * non-null) is invoked, and if it returns true, retried once.
319 * Other calls to methods in this class by other threads are
320 * blocked while the handler is invoked. Unless recovery is
321 * assured, options are usually limited to logging the error
322 * and/or issuing an onError signal to the subscriber.
323 *
324 * <p>This method returns a status indicator: If negative, it
325 * represents the (negative) number of drops (failed attempts to
326 * issue the item to a subscriber). Otherwise it is an estimate of
327 * the maximum lag (number of items submitted but not yet
328 * consumed) among all current subscribers. This value is at least
329 * one (accounting for this submitted item) if there are any
330 * subscribers, else zero.
331 *
332 * <p>If the Executor for this publisher throws a
333 * RejectedExecutionException (or any other RuntimeException or
334 * Error) when attempting to asynchronously notify subscribers, or
335 * the drop handler throws an exception when processing a dropped
336 * item, then this exception is rethrown.
337 *
338 * @param item the (non-null) item to publish
339 * @param onDrop if non-null, the handler invoked upon a drop to a
340 * subscriber, with arguments of the subscriber and item; if it
341 * returns true, an offer is re-attempted (once)
342 * @return if negative, the (negative) number of drops; otherwise
343 * an estimate of maximum lag
344 * @throws IllegalStateException if closed
345 * @throws NullPointerException if item is null
346 * @throws RejectedExecutionException if thrown by Executor
347 */
348 public int offer(T item,
349 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
350 if (item == null) throw new NullPointerException();
351 int ret = 0;
352 synchronized (this) {
353 if (closed)
354 throw new IllegalStateException("Closed");
355 BufferedSubscription<T> pred = null, next;
356 for (BufferedSubscription<T> b = clients; b != null; b = next) {
357 int stat;
358 next = b.next;
359 if ((stat = b.offer(item)) == 0 &&
360 onDrop != null &&
361 onDrop.test(b.subscriber, item))
362 stat = b.offer(item);
363 if (stat < 0) {
364 if (pred == null)
365 clients = next;
366 else
367 pred.next = next;
368 }
369 else {
370 pred = b;
371 if (stat == 0)
372 ret = (ret >= 0) ? -1 : ret - 1;
373 else if (ret >= 0 && stat > ret)
374 ret = stat;
375 }
376 }
377 return ret;
378 }
379 }
380
381 /**
382 * Publishes the given item, if possible, to each current
383 * subscriber by asynchronously invoking its onNext method,
384 * blocking while resources for any subscription are unavailable,
385 * up to the specified timeout or the caller thread is
386 * interrupted, at which point the given handler (if non-null) is
387 * invoked, and if it returns true, retried once. (The drop
388 * handler may distinguish timeouts from interrupts by checking
389 * whether the current thread is interrupted.) Other calls to
390 * methods in this class by other threads are blocked while the
391 * handler is invoked. Unless recovery is assured, options are
392 * usually limited to logging the error and/or issuing an onError
393 * signal to the subscriber.
394 *
395 * <p>This method returns a status indicator: If negative, it
396 * represents the (negative) number of drops (failed attempts to
397 * issue the item to a subscriber). Otherwise it is an estimate of
398 * the maximum lag (number of items submitted but not yet
399 * consumed) among all current subscribers. This value is at least
400 * one (accounting for this submitted item) if there are any
401 * subscribers, else zero.
402 *
403 * <p>If the Executor for this publisher throws a
404 * RejectedExecutionException (or any other RuntimeException or
405 * Error) when attempting to asynchronously notify subscribers, or
406 * the drop handler throws an exception when processing a dropped
407 * item, then this exception is rethrown.
408 *
409 * @param item the (non-null) item to publish
410 * @param timeout how long to wait for resources for any subscriber
411 * before giving up, in units of {@code unit}
412 * @param unit a {@code TimeUnit} determining how to interpret the
413 * {@code timeout} parameter
414 * @param onDrop if non-null, the handler invoked upon a drop to a
415 * subscriber, with arguments of the subscriber and item; if it
416 * returns true, an offer is re-attempted (once)
417 * @return if negative, the (negative) number of drops; otherwise
418 * an estimate of maximum lag
419 * @throws IllegalStateException if closed
420 * @throws NullPointerException if item is null
421 * @throws RejectedExecutionException if thrown by Executor
422 */
423 public int offer(T item, long timeout, TimeUnit unit,
424 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
425 if (item == null) throw new NullPointerException();
426 long nanos = unit.toNanos(timeout);
427 int ret = 0;
428 synchronized (this) {
429 if (closed)
430 throw new IllegalStateException("Closed");
431 BufferedSubscription<T> pred = null, next;
432 for (BufferedSubscription<T> b = clients; b != null; b = next) {
433 int stat;
434 next = b.next;
435 if ((stat = b.offerNanos(item, nanos)) == 0 &&
436 onDrop != null && onDrop.test(b.subscriber, item))
437 stat = b.offer(item);
438 if (stat < 0) {
439 if (pred == null)
440 clients = next;
441 else
442 pred.next = next;
443 }
444 else {
445 pred = b;
446 if (stat == 0)
447 ret = (ret >= 0) ? -1 : ret - 1;
448 else if (ret >= 0 && stat > ret)
449 ret = stat;
450 }
451 }
452 }
453 return ret;
454 }
455
456 /**
457 * Unless already closed, issues onComplete signals to current
458 * subscribers, and disallows subsequent attempts to publish.
459 */
460 public void close() {
461 if (!closed) {
462 BufferedSubscription<T> b, next;
463 synchronized (this) {
464 b = clients;
465 clients = null;
466 closed = true;
467 }
468 while (b != null) {
469 next = b.next;
470 b.onComplete();
471 b = next;
472 }
473 }
474 }
475
476 /**
477 * Unless already closed, issues onError signals to current
478 * subscribers with the given error, and disallows subsequent
479 * attempts to publish.
480 *
481 * @param error the onError argument sent to subscribers
482 * @throws NullPointerException if error is null
483 */
484 public void closeExceptionally(Throwable error) {
485 if (error == null)
486 throw new NullPointerException();
487 if (!closed) {
488 BufferedSubscription<T> b, next;
489 synchronized (this) {
490 b = clients;
491 clients = null;
492 closed = true;
493 }
494 while (b != null) {
495 next = b.next;
496 b.onError(error);
497 b = next;
498 }
499 }
500 }
501
502 /**
503 * Returns true if this publisher is not accepting submissions.
504 *
505 * @return true if closed
506 */
507 public boolean isClosed() {
508 return closed; // closed is declared volatile, so it is read here without taking the lock
509 }
510
511 /**
512 * Returns true if this publisher has any subscribers.
513 *
514 * @return true if this publisher has any subscribers
515 */
516 public boolean hasSubscribers() {
517 boolean nonEmpty = false;
518 if (!closed) {
519 synchronized (this) {
520 BufferedSubscription<T> pred = null, next;
521 for (BufferedSubscription<T> b = clients; b != null; b = next) {
522 next = b.next;
523 if (b.isDisabled()) {
524 if (pred == null)
525 clients = next;
526 else
527 pred.next = next;
528 }
529 else {
530 nonEmpty = true;
531 break;
532 }
533 }
534 }
535 }
536 return nonEmpty;
537 }
538
539 /**
540 * Returns the number of current subscribers.
541 *
542 * @return the number of current subscribers
543 */
544 public int getNumberOfSubscribers() {
545 int count = 0;
546 if (!closed) {
547 synchronized (this) {
548 BufferedSubscription<T> pred = null, next;
549 for (BufferedSubscription<T> b = clients; b != null; b = next) {
550 next = b.next;
551 if (b.isDisabled()) {
552 if (pred == null)
553 clients = next;
554 else
555 pred.next = next;
556 }
557 else {
558 pred = b;
559 ++count;
560 }
561 }
562 }
563 }
564 return count;
565 }
566
567 /**
568 * Returns the Executor used for asynchronous delivery.
569 *
570 * @return the Executor used for asynchronous delivery
571 */
572 public Executor getExecutor() {
573 return executor; // final field, assigned once in the constructor
574 }
575
576 /**
577 * Returns the maximum per-subscriber buffer capacity.
578 *
579 * @return the maximum per-subscriber buffer capacity
580 */
581 public int getMaxBufferCapacity() {
582 return maxBufferCapacity; // the value produced by roundCapacity in the constructor, not the raw argument
583 }
584
585 /**
586 * Returns a list of current subscribers.
587 *
588 * @return list of current subscribers
589 */
590 public List<Flow.Subscriber<? super T>> getSubscribers() {
591 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
592 synchronized (this) {
593 BufferedSubscription<T> pred = null, next;
594 for (BufferedSubscription<T> b = clients; b != null; b = next) {
595 next = b.next;
596 if (b.isDisabled()) {
597 if (pred == null)
598 clients = next;
599 else
600 pred.next = next;
601 }
602 else
603 subs.add(b.subscriber);
604 }
605 }
606 return subs;
607 }
608
609 /**
610 * Returns true if the given Subscriber is currently subscribed.
611 *
612 * @param subscriber the subscriber
613 * @return true if currently subscribed
614 * @throws NullPointerException if subscriber is null
615 */
616 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
617 if (subscriber == null) throw new NullPointerException();
618 if (!closed) {
619 synchronized (this) {
620 BufferedSubscription<T> pred = null, next;
621 for (BufferedSubscription<T> b = clients; b != null; b = next) {
622 next = b.next;
623 if (b.isDisabled()) {
624 if (pred == null)
625 clients = next;
626 else
627 pred.next = next;
628 }
629 else if (subscriber.equals(b.subscriber))
630 return true;
631 else
632 pred = b;
633 }
634 }
635 }
636 return false;
637 }
638
639 /**
640 * Returns an estimate of the minimum number of items requested
641 * (via {@link Flow.Subscription#request}) but not yet produced,
642 * among all current subscribers.
643 *
644 * @return the estimate, or zero if no subscribers
645 */
646 public long estimateMinimumDemand() {
647 long min = Long.MAX_VALUE;
648 boolean nonEmpty = false;
649 synchronized (this) {
650 BufferedSubscription<T> pred = null, next;
651 for (BufferedSubscription<T> b = clients; b != null; b = next) {
652 int n; long d;
653 next = b.next;
654 if ((n = b.estimateLag()) < 0) {
655 if (pred == null)
656 clients = next;
657 else
658 pred.next = next;
659 }
660 else {
661 if ((d = b.demand - n) < min)
662 min = d;
663 nonEmpty = true;
664 }
665 }
666 }
667 return nonEmpty ? min : 0;
668 }
669
670 /**
671 * Returns an estimate of the maximum number of items produced but
672 * not yet consumed among all current subscribers.
673 *
674 * @return the estimate
675 */
676 public int estimateMaximumLag() {
677 int max = 0;
678 synchronized (this) {
679 BufferedSubscription<T> pred = null, next;
680 for (BufferedSubscription<T> b = clients; b != null; b = next) {
681 int n;
682 next = b.next;
683 if ((n = b.estimateLag()) < 0) {
684 if (pred == null)
685 clients = next;
686 else
687 pred.next = next;
688 }
689 else if (n > max)
690 max = n;
691 }
692 }
693 return max;
694 }
695
696 /**
697 * A bounded (ring) buffer with integrated control to start a
698 * consumer task whenever items are available. The buffer
699 * algorithm is similar to one used inside ForkJoinPool,
700 * specialized for the case of at most one concurrent producer and
701 * consumer, and power of two buffer sizes. This allows methods to
702 * operate without locks even while supporting resizing, blocking,
703 * task-triggering, and garbage-free buffers (nulling out elements
704 * when consumed), although supporting these does impose a bit of
705 * overhead compared to plain fixed-size ring buffers.
706 *
707 * The publisher guarantees a single producer via its lock. We
708 * ensure in this class that there is at most one consumer. The
709 * request and cancel methods must be fully thread-safe but are
710 * coded to exploit the most common case in which they are only
711 * called by consumers (usually within onNext).
712 *
713 * This class also serves as its own consumer task, consuming as
714 * many items/signals as possible before terminating, at which
715 * point it is re-executed when needed. (The dual Runnable and
716 * ForkJoinTask declaration saves overhead when executed by
717 * ForkJoinPools, without impacting other kinds of Executors.)
718 * Execution control is managed using the ACTIVE ctl bit. We
719 * ensure that a task is active when consumable items (and
720 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
721 * there is demand (unfilled requests). This is complicated on
722 * the creation side by the possibility of exceptions when trying
723 * to execute tasks. These eventually force DISABLED state, but
724 * sometimes not directly. On the task side, termination (clearing
725 * ACTIVE) may race with producers or request() calls, so in some
726 * cases requires a re-check, re-activating if possible.
727 *
728 * The ctl field also manages run state. When DISABLED, no further
729 * updates are possible. Disabling may be preceded by setting
730 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
731 * case the associated Subscriber methods are invoked, possibly
732 * synchronously if there is no active consumer task (including
733 * cases where execute() failed). The cancel() method is supported
734 * by treating as ERROR but suppressing onError signal.
735 *
736 * Support for blocking also exploits the fact that there is only
737 * one possible waiter. ManagedBlocker-compatible control fields
738 * are placed in this class itself rather than in wait-nodes.
739 * Blocking control relies on the "waiter" field. Producers set
740 * the field before trying to block, but must then recheck (via
741 * offer) before parking. Signalling then just unparks and clears
742 * waiter field. If the producer and consumer are both in the same
743 * ForkJoinPool, the producer attempts to help run consumer tasks
744 * that it forked before blocking.
745 *
746 * This class uses @Contended and heuristic field declaration
747 * ordering to reduce memory contention on BufferedSubscription
748 * itself, but it does not currently attempt to avoid memory
749 * contention (especially including card-marks) among buffer
750 * elements, that can significantly slow down some usages.
751 * Addressing this may require allocating substantially more space
752 * than users expect.
753 */
754 @SuppressWarnings("serial")
755 @sun.misc.Contended
756 static final class BufferedSubscription<T> extends ForkJoinTask<Void>
757 implements Runnable, Flow.Subscription, ForkJoinPool.ManagedBlocker {
758 // Order-sensitive field declarations
759 long timeout; // > 0 if timed wait
760 volatile long demand; // # unfilled requests
761 int maxCapacity; // reduced on OOME
762 int putStat; // offer result for ManagedBlocker
763 volatile int ctl; // atomic run state flags
764 volatile int head; // next position to take
765 volatile int tail; // next position to put
766 volatile Object[] array; // buffer: null if disabled
767 Flow.Subscriber<? super T> subscriber; // null if disabled
768 Executor executor; // null if disabled
769 volatile Throwable pendingError; // holds until onError issued
770 volatile Thread waiter; // blocked producer thread
771 T putItem; // for offer within ManagedBlocker
772 BufferedSubscription<T> next; // used only by publisher
773 BufferedSubscription<T> nextRetry;// used only by publisher
774
775 // ctl values
776 static final int ACTIVE = 0x01; // consumer task active
777 static final int DISABLED = 0x02; // final state
778 static final int ERROR = 0x04; // signal onError then disable
779 static final int SUBSCRIBE = 0x08; // signal onSubscribe
780 static final int COMPLETE = 0x10; // signal onComplete when done
781
782 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
783
784 /**
785 * Initial/Minimum buffer capacity. Must be a power of two, at least 2.
786 */
787 static final int MINCAP = 8;
788
BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                     Executor executor, int maxBufferCapacity) {
    // Only records its arguments; the buffer array is not allocated here.
    this.maxCapacity = maxBufferCapacity;
    this.executor = executor;
    this.subscriber = subscriber;
}
795
796 final boolean isDisabled() {
797 return ctl == DISABLED; // true only when ctl is exactly DISABLED, i.e. no other flag bit is set
798 }
799
800 /**
801 * Returns estimated number of buffered items, or -1 if
802 * disabled
803 */
804 final int estimateLag() {
805 int n;
806 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
807 }
808
809 /**
810 * Tries to add item and start consumer task if necessary.
811 * @return -1 if disabled, 0 if dropped, else estimated lag
812 */
813 final int offer(T item) { // returns -1 if disabled, 0 if the item was dropped, else a positive lag estimate
814 Object[] a = array; // one read of the volatile buffer reference; null if not yet created or detached
815 int t = tail, stat = t + 1 - head, n, i; // stat: number of buffered items counting this one
816 if (a != null && (n = a.length) >= stat && (i = t & (n - 1)) >= 0) { // fast path: buffer exists and has room; (n - 1) is used as an index mask
817 a[i] = item;
818 U.putOrderedInt(this, TAIL, t + 1); // stores t + 1 via the TAIL offset (U and TAIL are declared outside this view; presumably the tail field)
819 }
820 else if ((stat = growAndAdd(a, item)) <= 0) // no buffer or no room: try to create/expand and add there
821 return stat; // 0 = could not grow (drop), -1 = disabled
822 for (int c;;) { // possibly start task
823 Executor e;
824 if (((c = ctl) & ACTIVE) != 0) // a consumer task is already active
825 break;
826 else if (c == DISABLED || (e = executor) == null) // executor is nulled by detach()
827 return -1;
828 else if (demand == 0L || tail == head) // nothing requested, or nothing left to consume
829 break;
830 else if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) { // set ACTIVE before handing the task over; a failed CAS loops and re-reads ctl
831 try {
832 e.execute(this);
833 break;
834 } catch (RuntimeException | Error ex) { // back out
835 do {} while ((c = ctl) >= 0 &&
836 (c & ACTIVE) != 0 &&
837 !U.compareAndSwapInt(this, CTL, c,
838 c & ~ACTIVE)); // spin until ACTIVE has been cleared, by this CAS or by someone else
839 throw ex; // the executor's failure is rethrown to the caller
840 }
841 }
842 }
843 return stat;
844 }
845
846 /**
847 * Tries to create or expand buffer, then adds item if possible
848 */
849 final int growAndAdd(Object[] oldArray, T item) { // returns -1 if disabled, 0 if the buffer cannot grow, else oldLen + 1
850 int oldLen, newLen;
851 if (oldArray == null) { // no buffer yet: choose the initial size
852 oldLen = 0;
853 newLen = (maxCapacity >= MINCAP ? MINCAP :
854 maxCapacity >= 2 ? maxCapacity : 2); // MINCAP, or maxCapacity when that is smaller, but never below 2
855 }
856 else if ((oldLen = oldArray.length) >= maxCapacity ||
857 (newLen = oldLen << 1) <= 0) // already at the cap, or doubling overflowed int
858 return 0; // cannot grow
859 if (ctl == DISABLED)
860 return -1;
861 Object[] newArray;
862 try {
863 newArray = new Object[newLen];
864 } catch (Throwable ex) { // try to cope with OOME
865 if (oldLen > 0)
866 maxCapacity = oldLen; // avoid continuous failure
867 return 0;
868 }
869 array = newArray; // volatile write: the new buffer is installed before old elements are moved
870 int t = tail, oldMask = oldLen - 1, newMask = newLen - 1;
871 if (oldArray != null && oldMask >= 0 && newMask >= oldMask) {
872 for (int j = head; j != t; ++j) { // races with consumer
873 Object x;
874 int i = j & oldMask;
875 if ((x = oldArray[i]) != null &&
876 U.compareAndSwapObject(oldArray,
877 (((long)i) << ASHIFT) + ABASE,
878 x, null)) // copy an element only if this thread wins the CAS that nulls its old slot
879 newArray[j & newMask] = x; // same position j, re-masked for the new length
880 }
881 }
882 newArray[t & newMask] = item;
883 tail = t + 1;
884 return oldLen + 1; // positive value that offer() passes on as its lag estimate
885 }
886
887 /**
888 * Spins/helps/blocks while offer returns 0
889 */
890 final int submit(T item) { // returns the last offer status obtained, directly or via putStat
891 int stat;
892 if ((stat = offer(item)) == 0) { // buffer saturated: try helping, then block
893 Thread thread = Thread.currentThread();
894 if ((thread instanceof ForkJoinWorkerThread) &&
895 ((ForkJoinWorkerThread)thread).getPool() == executor) { // caller is a worker of the very pool used as this subscription's executor
896 for (ForkJoinTask<?> t;;) { // try helping
897 if ((t = ForkJoinTask.peekNextLocalTask()) == null ||
898 !(t instanceof BufferedSubscription)) // stop unless the next local task is a BufferedSubscription
899 break;
900 if (t.tryUnfork())
901 ((BufferedSubscription<?>)t).exec(); // run that consumer task directly in this thread
902 if ((stat = offer(item)) != 0)
903 break;
904 }
905 }
906 if (stat == 0) {
907 putItem = item; // field documented as the item for offer within ManagedBlocker; its reader is outside this view
908 timeout = 0L; // the timeout field is documented as > 0 only for timed waits
909 try {
910 ForkJoinPool.managedBlock(this);
911 } catch (InterruptedException ie) {
912 timeout = INTERRUPTED;
913 }
914 stat = putStat; // field documented as the offer result for ManagedBlocker
915 if (timeout < 0L)
916 Thread.currentThread().interrupt(); // re-assert interrupt status; NOTE(review): timeout may also be set negative by the blocker methods outside this view - confirm
917 }
918 }
919 return stat;
920 }
921
922 /**
923 * Timeout version of offer
924 */
925 final int offerNanos(T item, long nanos) {
926 int stat;
927 if ((stat = offer(item)) == 0 && nanos > 0L) {
928 Thread thread = Thread.currentThread();
929 if ((thread instanceof ForkJoinWorkerThread) &&
930 ((ForkJoinWorkerThread)thread).getPool() == executor) {
931 long deadline = System.nanoTime() + nanos;
932 for (ForkJoinTask<?> t;;) { // similar to above
933 if ((t = ForkJoinTask.peekNextLocalTask()) == null ||
934 !(t instanceof BufferedSubscription))
935 break;
936 if (t.tryUnfork())
937 ((BufferedSubscription<?>)t).exec();
938 if ((stat = offer(item)) != 0 ||
939 (nanos = deadline - System.nanoTime()) <= 0L)
940 break;
941 }
942 }
943 if (stat == 0 && (timeout = nanos) > 0L) {
944 putItem = item;
945 try {
946 ForkJoinPool.managedBlock(this);
947 } catch (InterruptedException ie) {
948 timeout = INTERRUPTED;
949 }
950 stat = putStat;
951 if (timeout < 0L)
952 Thread.currentThread().interrupt();
953 }
954 }
955 return stat;
956 }
957
958 /**
959 * Nulls out most fields, mainly to avoid garbage retention
960 * until publisher unsubscribes, but also to help cleanly stop
961 * upon error by nulling required components.
962 */
963 final void detach() {
964 pendingError = null;
965 subscriber = null;
966 executor = null;
967 array = null;
968 Thread w = waiter;
969 if (w != null) {
970 waiter = null;
971 LockSupport.unpark(w); // force wakeup
972 }
973 }
974
975 /**
976 * Issues error signal, asynchronously if a task is running,
977 * else synchronously
978 */
979 final void onError(Throwable ex) {
980 for (int c;;) {
981 if ((c = ctl) == DISABLED)
982 break;
983 else if ((c & ACTIVE) != 0) {
984 pendingError = ex;
985 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
986 break; // cause consumer task to exit
987 }
988 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
989 Flow.Subscriber<? super T> s = subscriber;
990 if (s != null && ex != null) {
991 try {
992 s.onError(ex);
993 } catch (Throwable ignore) {
994 }
995 }
996 detach();
997 break;
998 }
999 }
1000 }
1001
1002 /**
1003 * Tries to start consumer task upon a signal or request;
1004 * disables on failure.
1005 */
1006 final void startOrDisable() {
1007 Executor e; // skip if already disabled
1008 if ((e = executor) != null) {
1009 try {
1010 e.execute(this);
1011 } catch (Throwable ex) { // back out and force signal
1012 for (int c;;) {
1013 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1014 break;
1015 if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1016 onError(ex);
1017 break;
1018 }
1019 }
1020 }
1021 }
1022 }
1023
1024 final void onComplete() {
1025 for (int c;;) {
1026 if ((c = ctl) == DISABLED)
1027 break;
1028 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
1029 if ((c & ACTIVE) == 0)
1030 startOrDisable();
1031 break;
1032 }
1033 }
1034 }
1035
1036 final void onSubscribe() {
1037 for (int c;;) {
1038 if ((c = ctl) == DISABLED)
1039 break;
1040 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | SUBSCRIBE))) {
1041 if ((c & ACTIVE) == 0)
1042 startOrDisable();
1043 break;
1044 }
1045 }
1046 }
1047
1048 /**
1049 * Causes consumer task to exit if active (without reporting
1050 * onError unless there is already a pending error), and
1051 * disables.
1052 */
1053 public void cancel() {
1054 for (int c;;) {
1055 if ((c = ctl) == DISABLED)
1056 break;
1057 else if ((c & ACTIVE) != 0) {
1058 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1059 break;
1060 }
1061 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1062 detach();
1063 break;
1064 }
1065 }
1066 }
1067
1068 /**
1069 * Adds to demand and possibly starts task.
1070 */
1071 public void request(long n) {
1072 if (n > 0L) {
1073 for (;;) {
1074 long prev = demand, d;
1075 if ((d = prev + n) < prev) // saturate
1076 d = Long.MAX_VALUE;
1077 if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1078 for (int c, h;;) {
1079 if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
1080 demand == 0L)
1081 break;
1082 if ((h = head) != tail) {
1083 if (U.compareAndSwapInt(this, CTL, c,
1084 c | ACTIVE)) {
1085 startOrDisable();
1086 break;
1087 }
1088 }
1089 else if (head == h && tail == h)
1090 break;
1091 }
1092 break;
1093 }
1094 }
1095 }
1096 else if (n < 0L)
1097 onError(new IllegalArgumentException(
1098 "negative subscription request"));
1099 }
1100
1101 public final boolean isReleasable() { // for ManagedBlocker
1102 T item = putItem;
1103 if (item != null) {
1104 if ((putStat = offer(item)) == 0)
1105 return false;
1106 putItem = null;
1107 }
1108 return true;
1109 }
1110
1111 public final boolean block() { // for ManagedBlocker
// Takes the pending putItem (if any) and keeps calling offer(item) until it
// returns nonzero, the timed wait expires, or a timed wait is interrupted.
// The last offer result is left in putStat. Always returns true.
1112 T item = putItem;
1113 if (item != null) {
1114 putItem = null;
// timeout > 0 selects a timed wait; otherwise the wait is untimed
1115 long nanos = timeout;
1116 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1117 while ((putStat = offer(item)) == 0) {
// interruption is recorded in timeout; only a timed wait stops because of
// it, an untimed wait keeps retrying
1118 if (Thread.interrupted()) {
1119 timeout = INTERRUPTED;
1120 if (nanos > 0L)
1121 break;
1122 }
// timed wait: recompute the remaining time and give up once it is used up
1123 else if (nanos > 0L &&
1124 (nanos = deadline - System.nanoTime()) <= 0L)
1125 break;
// register as waiter first; the loop then retries offer once more before
// this thread actually parks
1126 else if (waiter == null)
1127 waiter = Thread.currentThread();
1128 else {
1129 if (nanos > 0L)
1130 LockSupport.parkNanos(this, nanos);
1131 else
1132 LockSupport.park(this);
// clear the registration after waking; it is set again on a later pass
// if offer still returns 0
1133 waiter = null;
1134 }
1135 }
1136 }
1137 waiter = null;
1138 return true;
1139 }
1140
1141 /**
1142 * Consumer task loop; supports resubmission when used as
1143 * ForkJoinTask.
1144 */
1145 public final boolean exec() {
// Consumer loop. Each pass snapshots demand, ctl, head, tail and array and
// then does one of: handle a control signal, release ACTIVE because the
// buffer is empty or there is no demand, retry on a stale snapshot, or take
// one item and pass it to the subscriber's onNext. Does nothing when
// subscriber is null. Always returns false.
1146 Flow.Subscriber<? super T> s;
1147 if ((s = subscriber) != null) { // else disabled
1148 for (;;) {
1149 long d = demand; // read volatile fields in acceptable order
1150 int c = ctl;
1151 int h = head;
1152 int t = tail;
1153 Object[] a = array;
1154 int i, n; Object x; Thread w;
// --- control signals take priority over item delivery ---
1155 if ((c & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1156 if ((c & ERROR) != 0) {
// read the pending error, disable, then report it (if non-null);
// anything thrown by the subscriber's onError is ignored
1157 Throwable ex = pendingError;
1158 ctl = DISABLED; // no need for CAS
1159 if (ex != null) {
1160 try {
1161 s.onError(ex);
1162 } catch (Throwable ignore) {
1163 }
1164 }
1165 }
1166 else if ((c & SUBSCRIBE) != 0) {
// clear SUBSCRIBE first so onSubscribe is invoked only by the pass
// whose CAS succeeded
1167 if (U.compareAndSwapInt(this, CTL, c,
1168 c & ~SUBSCRIBE)) {
1169 try {
1170 s.onSubscribe(this);
1171 } catch (Throwable ex) {
1172 ctl = DISABLED; // disable on throw
1173 }
1174 }
1175 }
1176 else {
// neither ERROR nor SUBSCRIBE matched, so the DISABLED test did:
// release references and leave the loop
1177 detach();
1178 break;
1179 }
1180 }
1181 else if (h == t) { // empty
// confirm emptiness and give up ACTIVE; note that c is updated to the
// value just written by the CAS
1182 if (h == tail &&
1183 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) {
// something changed after releasing (tail moved or ctl differs): try to
// take ACTIVE back and keep going; exit if another task holds it, the
// subscription is disabled, or the CAS fails
1184 if (h != tail || c != (c = ctl)) { // recheck
1185 if ((c & (ACTIVE | DISABLED)) != 0 ||
1186 !U.compareAndSwapInt(this, CTL, c,
1187 c | ACTIVE))
1188 break;
1189 }
// empty with COMPLETE set: disable and signal onComplete, ignoring
// anything the subscriber throws
1190 else if ((c & COMPLETE) != 0) {
1191 ctl = DISABLED;
1192 try {
1193 s.onComplete();
1194 } catch (Throwable ignore) {
1195 }
1196 }
1197 else
1198 break;
1199 }
1200 }
// snapshot is unusable (no array, zero length, or the head slot is null):
// fall through and re-read everything on the next pass
1201 else if (a == null || (n = a.length) == 0 ||
1202 (x = a[i = h & (n - 1)]) == null)
1203 ; // stale; retry
1204 else if (d == 0L) { // can't take
// no demand: release ACTIVE and exit, unless demand or ctl changed in the
// meantime and ACTIVE can be re-acquired, in which case the loop continues
1205 if (demand == 0L &&
1206 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) &&
1207 ((demand == 0L && c == (c = ctl)) || // recheck
1208 (c & (ACTIVE | DISABLED)) != 0 ||
1209 !U.compareAndSwapInt(this, CTL, c, c | ACTIVE)))
1210 break;
1211 }
// claim the item by nulling its slot; on CAS failure nothing is taken and
// the loop retries
1212 else if (U.compareAndSwapObject(
1213 a, (((long)i) << ASHIFT) + ABASE, x, null)) {
1214 U.putOrderedInt(this, HEAD, h + 1);
// consume one unit of demand
1215 while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1216 d = demand; // almost never fails
// a slot was freed, so wake a parked producer if one is registered
1217 if ((w = waiter) != null) {
1218 waiter = null;
1219 LockSupport.unpark(w); // release producer
1220 }
1221 try {
1222 @SuppressWarnings("unchecked") T y = (T) x;
1223 s.onNext(y);
1224 } catch (Throwable ex) { // disable on throw
1225 ctl = DISABLED;
1226 }
1227 }
1228 }
1229 }
1230 return false; // resubmittable; never joined
1231 }
1232
1233 // Runnable and FJ support
1234 public final void run() { exec(); }
1235 public final Void getRawResult() { return null; }
1236 public final void setRawResult(Void v) {}
1237
1238 // Unsafe mechanics
// Raw field offsets and array addressing constants used by the CAS and
// ordered-write calls in this class.
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long CTL;
private static final long TAIL;
private static final long HEAD;
private static final long DEMAND;
private static final int ABASE;
private static final int ASHIFT;

static {
    try {
        Class<?> k = BufferedSubscription.class;
        CTL = U.objectFieldOffset(k.getDeclaredField("ctl"));
        TAIL = U.objectFieldOffset(k.getDeclaredField("tail"));
        HEAD = U.objectFieldOffset(k.getDeclaredField("head"));
        DEMAND = U.objectFieldOffset(k.getDeclaredField("demand"));

        Class<?> ak = Object[].class;
        ABASE = U.arrayBaseOffset(ak);
        int scale = U.arrayIndexScale(ak);
        // element offsets are computed as (index << ASHIFT) + ABASE, which
        // is only valid when the element scale is a power of two
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}
1267 }
1268 }