ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.30
Committed: Sun Jan 25 23:37:31 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.29: +55 -45 lines
Log Message:
Javadoc improvements

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.List;
11 import java.util.concurrent.locks.LockSupport;
12 import java.util.function.BiPredicate;
13 import java.util.function.Function;
14 import java.util.function.Supplier;
15
16 /**
17 * A {@link Flow.Publisher} that asynchronously issues submitted
18 * (non-null) items to current subscribers until it is closed. Each
19 * current subscriber receives newly submitted items in the same order
20 * unless drops or exceptions are encountered. Using a
21 * SubmissionPublisher allows item generators to act as Publishers
22 * relying on drop handling and/or blocking for flow control.
23 *
24 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
25 * constructor for delivery to subscribers. The best choice of
26 * Executor depends on expected usage. If the generator(s) of
27 * submitted items run in separate threads, and the number of
28 * subscribers can be estimated, consider using a {@link
29 * Executors#newFixedThreadPool}. Otherwise consider using
30 * the default {@link ForkJoinPool#commonPool}.
31 *
32 * <p>Buffering allows producers and consumers to transiently operate
33 * at different rates. Each subscriber uses an independent buffer.
34 * Buffers are created upon first use and expanded as needed up to the
35 * given maximum. (The enforced capacity may be rounded up to the
36 * nearest power of two and/or bounded by the largest value supported
37 * by this implementation.) Invocations of {@link
38 * Flow.Subscription#request} do not directly result in buffer
39 * expansion, but risk saturation if unfilled requests exceed the
40 * maximum capacity. The default value of {@link
41 * Flow#defaultBufferSize} may provide a useful starting point for
42 * choosing a capacity based on expected rates, resources, and usages.
43 *
44 * <p>Publication methods support different policies about what to do
45 * when buffers are saturated. Method {@link #submit} blocks until
46 * resources are available. This is simplest, but least responsive.
47 * The {@code offer} methods may drop items (either immediately or
48 * with bounded timeout), but provide an opportunity to interpose a
49 * handler and then retry.
50 *
51 * <p>If any Subscriber method throws an exception, its subscription
52 * is cancelled. If the supplied Executor throws {@link
53 * RejectedExecutionException} (or any other RuntimeException or
54 * Error) when attempting to execute a task, or a drop handler throws
55 * an exception when processing a dropped item, then the exception is
56 * rethrown. In these cases, some but not all subscribers may have
57 * received the published item. It is usually good practice to {@link
58 * #closeExceptionally closeExceptionally} in these cases.
59 *
60 * <p>This class may also serve as a convenient base for subclasses
61 * that generate items, and use the methods in this class to publish
62 * them. For example here is a class that periodically publishes the
63 * items generated from a supplier. (In practice you might add methods
64 * to independently start and stop generation, to share schedulers
65 * among publishers, and so on, or use a SubmissionPublisher as a
66 * component rather than a superclass.)
67 *
68 * <pre> {@code
69 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
70 * final ScheduledFuture<?> periodicTask;
71 * final ScheduledExecutorService scheduler;
72 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
73 * Supplier<? extends T> supplier,
74 * long period, TimeUnit unit) {
75 * super(executor, maxBufferCapacity);
76 * scheduler = new ScheduledThreadPoolExecutor(1);
77 * periodicTask = scheduler.scheduleAtFixedRate(
78 * () -> submit(supplier.get()), 0, period, unit);
79 * }
80 * public void close() {
81 * periodicTask.cancel(false);
82 * scheduler.shutdown();
83 * super.close();
84 * }
85 * }}</pre>
86 *
87 * <p>Here is an example of a {@link Flow.Processor} implementation.
88 * It uses single-step requests to its publisher for simplicity of
89 * illustration. A more adaptive version could monitor flow using the
90 * lag estimate returned from {@code submit}, along with other utility
91 * methods.
92 *
93 * <pre> {@code
94 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
95 * implements Flow.Processor<S,T> {
96 * final Function<? super S, ? extends T> function;
97 * Flow.Subscription subscription;
98 * TransformProcessor(Executor executor, int maxBufferCapacity,
99 * Function<? super S, ? extends T> function) {
100 * super(executor, maxBufferCapacity);
101 * this.function = function;
102 * }
103 * public void onSubscribe(Flow.Subscription subscription) {
104 * (this.subscription = subscription).request(1);
105 * }
106 * public void onNext(S item) {
107 * subscription.request(1);
108 * submit(function.apply(item));
109 * }
110 * public void onError(Throwable ex) { closeExceptionally(ex); }
111 * public void onComplete() { close(); }
112 * }}</pre>
113 *
114 * @param <T> the published item type
115 * @author Doug Lea
116 * @since 1.9
117 */
118 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
119 AutoCloseable {
120 /*
121 * Most mechanics are handled by BufferedSubscription. This class
122 * mainly tracks subscribers and ensures sequentiality, by using
123 * built-in synchronization locks across public methods. (Using
124 * built-in locks works well in the most typical case in which
125 * only one thread submits items).
126 */
127
128 /** The largest possible power of two array size. */
129 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
130
131 /** Round capacity to power of 2, at least 2, at most limit. */
132 static final int roundCapacity(int cap) { // to nearest power of 2
133 int n = cap - 1;
134 n |= n >>> 1;
135 n |= n >>> 2;
136 n |= n >>> 4;
137 n |= n >>> 8;
138 n |= n >>> 16;
139 return (n <= 0) ? 2 : // at least 2
140 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
141 }
142
143 /**
144 * Clients (BufferedSubscriptions) are maintained in a linked list
145 * (via their "next" fields). This works well for publish loops.
146 * It requires O(n) traversal to check for duplicate subscribers,
147 * but we expect that subscribing is much less common than
148 * publishing. Unsubscribing occurs only during traversal loops,
149 * when BufferedSubscription methods return negative values
150 * signifying that they have been disabled. To reduce
151 * head-of-line blocking, submit and offer methods first call
152 * BufferedSubscription.offer on each subscriber, and place
153 * saturated ones in retries list (using nextRetry field), and
154 * retry, possibly blocking or dropping.
155 */
156 BufferedSubscription<T> clients;
157
158 /** Run status, updated only within locks */
159 volatile boolean closed;
160
161 // Parameters for constructing BufferedSubscriptions
162 final Executor executor;
163 final int maxBufferCapacity;
164
165 /**
166 * Creates a new SubmissionPublisher using the given Executor for
167 * async delivery to subscribers, and with the given maximum
168 * buffer size for each subscriber.
169 *
170 * @param executor the executor to use for async delivery,
171 * supporting creation of at least one independent thread
172 * @param maxBufferCapacity the maximum capacity for each
173 * subscriber's buffer (the enforced capacity may be rounded up to
174 * the nearest power of two and/or bounded by the largest value
175 * supported by this implementation; method {@link #getMaxBufferCapacity}
176 * returns the actual value)
177 * @throws NullPointerException if executor is null
178 * @throws IllegalArgumentException if maxBufferCapacity not
179 * positive
180 */
181 public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
182 if (executor == null)
183 throw new NullPointerException();
184 if (maxBufferCapacity <= 0)
185 throw new IllegalArgumentException("capacity must be positive");
186 this.executor = executor;
187 this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
188 }
189
190 /**
191 * Creates a new SubmissionPublisher using the {@link
192 * ForkJoinPool#commonPool()} for async delivery to subscribers,
193 * and with maximum buffer capacity of {@link
194 * Flow#defaultBufferSize}.
195 */
196 public SubmissionPublisher() {
197 this(ForkJoinPool.commonPool(), Flow.defaultBufferSize());
198 }
199
200 /**
201 * Adds the given Subscriber unless already subscribed. If
202 * already subscribed, the Subscriber's {@code onError} method is
203 * invoked on the existing subscription with an {@link
204 * IllegalStateException}. Otherwise, upon success, the
205 * Subscriber's {@code onSubscribe} method is invoked
206 * asynchronously with a new {@link Flow.Subscription}. If {@code
207 * onSubscribe} throws an exception, the subscription is
208 * cancelled. Otherwise, if this SubmissionPublisher is closed,
209 * the subscriber's {@code onComplete} method is then invoked.
210 * Subscribers may enable receiving items by invoking the {@code
211 * request} method of the new Subscription, and may unsubscribe by
212 * invoking its {@code cancel} method.
213 *
214 * @param subscriber the subscriber
215 * @throws NullPointerException if subscriber is null
216 */
217 public void subscribe(Flow.Subscriber<? super T> subscriber) {
218 if (subscriber == null) throw new NullPointerException();
219 BufferedSubscription<T> subscription =
220 new BufferedSubscription<T>(subscriber, executor, maxBufferCapacity);
221 synchronized (this) {
222 for (BufferedSubscription<T> b = clients, pred = null;;) {
223 if (b == null) {
224 subscription.onSubscribe();
225 if (closed)
226 subscription.onComplete();
227 else if (pred == null)
228 clients = subscription;
229 else
230 pred.next = subscription;
231 break;
232 }
233 BufferedSubscription<T> next = b.next;
234 if (b.isDisabled()) { // remove
235 b.next = null; // detach
236 if (pred == null)
237 clients = next;
238 else
239 pred.next = next;
240 }
241 else if (subscriber.equals(b.subscriber)) {
242 b.onError(new IllegalStateException("Duplicate subscribe"));
243 break;
244 }
245 else
246 pred = b;
247 b = next;
248 }
249 }
250 }
251
252 /**
253 * Publishes the given item to each current subscriber by
254 * asynchronously invoking its {@link Flow.Subscriber#onNext}
255 * method, blocking uninterruptibly while resources for any
256 * subscriber are unavailable. This method returns an estimate of
257 * the maximum lag (number of items submitted but not yet
258 * consumed) among all current subscribers. This value is at least
259 * one (accounting for this submitted item) if there are any
260 * subscribers, else zero.
261 *
262 * <p>If the Executor for this publisher throws a
263 * RejectedExecutionException (or any other RuntimeException or
264 * Error) when attempting to asynchronously notify subscribers,
265 * then this exception is rethrown, in which case some but not all
266 * subscribers may have received this item.
267 *
268 * @param item the (non-null) item to publish
269 * @return the estimated maximum lag among subscribers
270 * @throws IllegalStateException if closed
271 * @throws NullPointerException if item is null
272 * @throws RejectedExecutionException if thrown by Executor
273 */
274 public int submit(T item) {
275 if (item == null) throw new NullPointerException();
276 int lag = 0;
277 boolean complete;
278 synchronized (this) {
279 complete = closed;
280 BufferedSubscription<T> b = clients;
281 if (!complete) {
282 BufferedSubscription<T> pred = null, r = null, rtail = null;
283 while (b != null) {
284 BufferedSubscription<T> next = b.next;
285 int stat = b.offer(item);
286 if (stat < 0) { // disabled
287 b.next = null;
288 if (pred == null)
289 clients = next;
290 else
291 pred.next = next;
292 }
293 else {
294 if (stat > lag)
295 lag = stat;
296 else if (stat == 0) { // place on retry list
297 b.nextRetry = null;
298 if (rtail == null)
299 r = b;
300 else
301 rtail.nextRetry = b;
302 rtail = b;
303 }
304 pred = b;
305 }
306 b = next;
307 }
308 while (r != null) {
309 BufferedSubscription<T> nextRetry = r.nextRetry;
310 r.nextRetry = null;
311 int stat = r.submit(item);
312 if (stat > lag)
313 lag = stat;
314 else if (stat < 0 && clients == r)
315 clients = r.next; // postpone internal unsubscribes
316 r = nextRetry;
317 }
318 }
319 }
320 if (complete)
321 throw new IllegalStateException("Closed");
322 else
323 return lag;
324 }
325
326 /**
327 * Publishes the given item, if possible, to each current
328 * subscriber by asynchronously invoking its {@link
329 * Flow.Subscriber#onNext} method. The item may be dropped by one
330 * or more subscribers if resource limits are exceeded, in which
331 * case the given handler (if non-null) is invoked, and if it
332 * returns true, retried once. Other calls to methods in this
333 * class by other threads are blocked while the handler is
334 * invoked. Unless recovery is assured, options are usually
335 * limited to logging the error and/or issuing an {@code onError}
336 * signal to the subscriber.
337 *
338 * <p>This method returns a status indicator: If negative, it
339 * represents the (negative) number of drops (failed attempts to
340 * issue the item to a subscriber). Otherwise it is an estimate of
341 * the maximum lag (number of items submitted but not yet
342 * consumed) among all current subscribers. This value is at least
343 * one (accounting for this submitted item) if there are any
344 * subscribers, else zero.
345 *
346 * <p>If the Executor for this publisher throws a
347 * RejectedExecutionException (or any other RuntimeException or
348 * Error) when attempting to asynchronously notify subscribers, or
349 * the drop handler throws an exception when processing a dropped
350 * item, then this exception is rethrown.
351 *
352 * @param item the (non-null) item to publish
353 * @param onDrop if non-null, the handler invoked upon a drop to a
354 * subscriber, with arguments of the subscriber and item; if it
355 * returns true, an offer is re-attempted (once)
356 * @return if negative, the (negative) number of drops; otherwise
357 * an estimate of maximum lag
358 * @throws IllegalStateException if closed
359 * @throws NullPointerException if item is null
360 * @throws RejectedExecutionException if thrown by Executor
361 */
362 public int offer(T item,
363 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
364 return doOffer(0L, item, onDrop);
365 }
366
367 /**
368 * Publishes the given item, if possible, to each current
369 * subscriber by asynchronously invoking its {@link
370 * Flow.Subscriber#onNext} method, blocking while resources for
371 * any subscription are unavailable, up to the specified timeout
372 * or the caller thread is interrupted, at which point the given
373 * handler (if non-null) is invoked, and if it returns true,
374 * retried once. (The drop handler may distinguish timeouts from
375 * interrupts by checking whether the current thread is
376 * interrupted.) Other calls to methods in this class by other
377 * threads are blocked while the handler is invoked. Unless
378 * recovery is assured, options are usually limited to logging the
379 * error and/or issuing an {@code onError} signal to the
380 * subscriber.
381 *
382 * <p>This method returns a status indicator: If negative, it
383 * represents the (negative) number of drops (failed attempts to
384 * issue the item to a subscriber). Otherwise it is an estimate of
385 * the maximum lag (number of items submitted but not yet
386 * consumed) among all current subscribers. This value is at least
387 * one (accounting for this submitted item) if there are any
388 * subscribers, else zero.
389 *
390 * <p>If the Executor for this publisher throws a
391 * RejectedExecutionException (or any other RuntimeException or
392 * Error) when attempting to asynchronously notify subscribers, or
393 * the drop handler throws an exception when processing a dropped
394 * item, then this exception is rethrown.
395 *
396 * @param item the (non-null) item to publish
397 * @param timeout how long to wait for resources for any subscriber
398 * before giving up, in units of {@code unit}
399 * @param unit a {@code TimeUnit} determining how to interpret the
400 * {@code timeout} parameter
401 * @param onDrop if non-null, the handler invoked upon a drop to a
402 * subscriber, with arguments of the subscriber and item; if it
403 * returns true, an offer is re-attempted (once)
404 * @return if negative, the (negative) number of drops; otherwise
405 * an estimate of maximum lag
406 * @throws IllegalStateException if closed
407 * @throws NullPointerException if item is null
408 * @throws RejectedExecutionException if thrown by Executor
409 */
410 public int offer(T item, long timeout, TimeUnit unit,
411 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
412 return doOffer(unit.toNanos(timeout), item, onDrop);
413 }
414
415 /** Common implementation for both forms of offer */
416 final int doOffer(long nanos, T item,
417 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
418 if (item == null) throw new NullPointerException();
419 int lag = 0, drops = 0;
420 boolean complete;
421 synchronized (this) {
422 complete = closed;
423 BufferedSubscription<T> b = clients;
424 if (!complete) {
425 BufferedSubscription<T> pred = null, r = null, rtail = null;
426 while (b != null) {
427 BufferedSubscription<T> next = b.next;
428 int stat = b.offer(item);
429 if (stat < 0) {
430 b.next = null;
431 if (pred == null)
432 clients = next;
433 else
434 pred.next = next;
435 }
436 else {
437 if (stat > lag)
438 lag = stat;
439 else if (stat == 0) {
440 b.nextRetry = null;
441 if (rtail == null)
442 r = b;
443 else
444 rtail.nextRetry = b;
445 rtail = b;
446 }
447 else if (stat > lag)
448 lag = stat;
449 pred = b;
450 }
451 b = next;
452 }
453 while (r != null) {
454 BufferedSubscription<T> nextRetry = r.nextRetry;
455 r.nextRetry = null;
456 int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
457 r.offer(item);
458 if (stat == 0 && onDrop != null &&
459 onDrop.test(r.subscriber, item))
460 stat = r.offer(item);
461 if (stat == 0)
462 ++drops;
463 else if (stat > lag)
464 lag = stat;
465 else if (stat < 0 && clients == r)
466 clients = r.next;
467 r = nextRetry;
468 }
469 }
470 }
471 if (complete)
472 throw new IllegalStateException("Closed");
473 else
474 return (drops > 0) ? -drops : lag;
475 }
476
477 /**
478 * Unless already closed, issues {@link
479 * Flow.Subscriber#onComplete} signals to current subscribers, and
480 * disallows subsequent attempts to publish. Upon return, this
481 * method does <em>NOT</em> guarantee that all subscribers have
482 * yet completed.
483 */
484 public void close() {
485 if (!closed) {
486 BufferedSubscription<T> b, next;
487 synchronized (this) {
488 b = clients;
489 clients = null;
490 closed = true;
491 }
492 while (b != null) {
493 next = b.next;
494 b.onComplete();
495 b = next;
496 }
497 }
498 }
499
500 /**
501 * Unless already closed, issues {@link Flow.Subscriber#onError}
502 * signals to current subscribers with the given error, and
503 * disallows subsequent attempts to publish. Upon return, this
504 * method does <em>NOT</em> guarantee that all subscribers have
505 * yet completed.
506 *
507 * @param error the {@code onError} argument sent to subscribers
508 * @throws NullPointerException if error is null
509 */
510 public void closeExceptionally(Throwable error) {
511 if (error == null)
512 throw new NullPointerException();
513 if (!closed) {
514 BufferedSubscription<T> b, next;
515 synchronized (this) {
516 b = clients;
517 clients = null;
518 closed = true;
519 }
520 while (b != null) {
521 next = b.next;
522 b.onError(error);
523 b = next;
524 }
525 }
526 }
527
528 /**
529 * Returns true if this publisher is not accepting submissions.
530 *
531 * @return true if closed
532 */
533 public boolean isClosed() {
534 return closed;
535 }
536
537 /**
538 * Returns true if this publisher has any subscribers.
539 *
540 * @return true if this publisher has any subscribers
541 */
542 public boolean hasSubscribers() {
543 boolean nonEmpty = false;
544 if (!closed) {
545 synchronized (this) {
546 BufferedSubscription<T> pred = null, next;
547 for (BufferedSubscription<T> b = clients; b != null; b = next) {
548 next = b.next;
549 if (b.isDisabled()) {
550 b.next = null;
551 if (pred == null)
552 clients = next;
553 else
554 pred.next = next;
555 }
556 else {
557 nonEmpty = true;
558 break;
559 }
560 }
561 }
562 }
563 return nonEmpty;
564 }
565
566 /**
567 * Returns the number of current subscribers.
568 *
569 * @return the number of current subscribers
570 */
571 public int getNumberOfSubscribers() {
572 int count = 0;
573 if (!closed) {
574 synchronized (this) {
575 BufferedSubscription<T> pred = null, next;
576 for (BufferedSubscription<T> b = clients; b != null; b = next) {
577 next = b.next;
578 if (b.isDisabled()) {
579 b.next = null;
580 if (pred == null)
581 clients = next;
582 else
583 pred.next = next;
584 }
585 else {
586 pred = b;
587 ++count;
588 }
589 }
590 }
591 }
592 return count;
593 }
594
595 /**
596 * Returns the Executor used for asynchronous delivery.
597 *
598 * @return the Executor used for asynchronous delivery
599 */
600 public Executor getExecutor() {
601 return executor;
602 }
603
604 /**
605 * Returns the maximum per-subscriber buffer capacity.
606 *
607 * @return the maximum per-subscriber buffer capacity
608 */
609 public int getMaxBufferCapacity() {
610 return maxBufferCapacity;
611 }
612
613 /**
614 * Returns a list of current subscribers for monitoring and
615 * tracking purposes, not for invoking {@link Flow.Subscriber}
616 * methods on the subscribers.
617 *
618 * @return list of current subscribers
619 */
620 public List<Flow.Subscriber<? super T>> getSubscribers() {
621 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
622 synchronized (this) {
623 BufferedSubscription<T> pred = null, next;
624 for (BufferedSubscription<T> b = clients; b != null; b = next) {
625 next = b.next;
626 if (b.isDisabled()) {
627 b.next = null;
628 if (pred == null)
629 clients = next;
630 else
631 pred.next = next;
632 }
633 else
634 subs.add(b.subscriber);
635 }
636 }
637 return subs;
638 }
639
640 /**
641 * Returns true if the given Subscriber is currently subscribed.
642 *
643 * @param subscriber the subscriber
644 * @return true if currently subscribed
645 * @throws NullPointerException if subscriber is null
646 */
647 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
648 if (subscriber == null) throw new NullPointerException();
649 if (!closed) {
650 synchronized (this) {
651 BufferedSubscription<T> pred = null, next;
652 for (BufferedSubscription<T> b = clients; b != null; b = next) {
653 next = b.next;
654 if (b.isDisabled()) {
655 b.next = null;
656 if (pred == null)
657 clients = next;
658 else
659 pred.next = next;
660 }
661 else if (subscriber.equals(b.subscriber))
662 return true;
663 else
664 pred = b;
665 }
666 }
667 }
668 return false;
669 }
670
671 /**
672 * Returns an estimate of the minimum number of items requested
673 * (via {@link Flow.Subscription#request}) but not yet produced,
674 * among all current subscribers.
675 *
676 * @return the estimate, or zero if no subscribers
677 */
678 public long estimateMinimumDemand() {
679 long min = Long.MAX_VALUE;
680 boolean nonEmpty = false;
681 synchronized (this) {
682 BufferedSubscription<T> pred = null, next;
683 for (BufferedSubscription<T> b = clients; b != null; b = next) {
684 int n; long d;
685 next = b.next;
686 if ((n = b.estimateLag()) < 0) {
687 b.next = null;
688 if (pred == null)
689 clients = next;
690 else
691 pred.next = next;
692 }
693 else {
694 if ((d = b.demand - n) < min)
695 min = d;
696 nonEmpty = true;
697 }
698 }
699 }
700 return nonEmpty ? min : 0;
701 }
702
703 /**
704 * Returns an estimate of the maximum number of items produced but
705 * not yet consumed among all current subscribers.
706 *
707 * @return the estimate
708 */
709 public int estimateMaximumLag() {
710 int max = 0;
711 synchronized (this) {
712 BufferedSubscription<T> pred = null, next;
713 for (BufferedSubscription<T> b = clients; b != null; b = next) {
714 int n;
715 next = b.next;
716 if ((n = b.estimateLag()) < 0) {
717 b.next = null;
718 if (pred == null)
719 clients = next;
720 else
721 pred.next = next;
722 }
723 else if (n > max)
724 max = n;
725 }
726 }
727 return max;
728 }
729
730 /**
731 * A task for consuming buffer items and signals, created and
732 * executed whenever they become available. A task consumes as
733 * many items/signals as possible before terminating, at which
734 * point another task is created when needed. The dual Runnable
735 * and ForkJoinTask declaration saves overhead when executed by
736 * ForkJoinPools, without impacting other kinds of Executors.
737 */
738 @SuppressWarnings("serial")
739 static final class ConsumerTask<T> extends ForkJoinTask<Void>
740 implements Runnable {
741 final BufferedSubscription<T> consumer;
742 ConsumerTask(BufferedSubscription<T> consumer) {
743 this.consumer = consumer;
744 }
745 public final Void getRawResult() { return null; }
746 public final void setRawResult(Void v) {}
747 public final boolean exec() { consumer.consume(); return false; }
748 public final void run() { consumer.consume(); }
749 }
750
751 /**
752 * A bounded (ring) buffer with integrated control to start a
753 * consumer task whenever items are available. The buffer
754 * algorithm is similar to one used inside ForkJoinPool,
755 * specialized for the case of at most one concurrent producer and
756 * consumer, and power of two buffer sizes. This allows methods to
757 * operate without locks even while supporting resizing, blocking,
758 * task-triggering, and garbage-free buffers (nulling out elements
759 * when consumed), although supporting these does impose a bit of
760 * overhead compared to plain fixed-size ring buffers.
761 *
762 * The publisher guarantees a single producer via its lock. We
763 * ensure in this class that there is at most one consumer. The
764 * request and cancel methods must be fully thread-safe but are
765 * coded to exploit the most common case in which they are only
766 * called by consumers (usually within onNext).
767 *
768 * Execution control is managed using the ACTIVE ctl bit. We
769 * ensure that a task is active when consumable items (and
770 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
771 * there is demand (unfilled requests). This is complicated on
772 * the creation side by the possibility of exceptions when trying
773 * to execute tasks. These eventually force DISABLED state, but
774 * sometimes not directly. On the task side, termination (clearing
775 * ACTIVE) that would otherwise race with producers or request()
776 * calls uses the CONSUME keep-alive bit to force a recheck.
777 *
778 * The ctl field also manages run state. When DISABLED, no further
779 * updates are possible. Disabling may be preceded by setting
780 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
781 * case the associated Subscriber methods are invoked, possibly
782 * synchronously if there is no active consumer task (including
783 * cases where execute() failed). The cancel() method is supported
784 * by treating as ERROR but suppressing onError signal.
785 *
786 * Support for blocking also exploits the fact that there is only
787 * one possible waiter. ManagedBlocker-compatible control fields
788 * are placed in this class itself rather than in wait-nodes.
789 * Blocking control relies on the "waiter" field. Producers set
790 * the field before trying to block, but must then recheck (via
791 * offer) before parking. Signalling then just unparks and clears
792 * waiter field. If the producer and consumer are both in the same
793 * ForkJoinPool, or consumers are running in commonPool, the
794 * producer attempts to help run consumer tasks that it forked
795 * before blocking. To avoid potential cycles, only one level of
796 * helping is currently supported.
797 *
798 * This class uses @Contended and heuristic field declaration
799 * ordering to reduce memory contention on BufferedSubscription
800 * itself, but it does not currently attempt to avoid memory
801 * contention (especially including card-marks) among buffer
802 * elements, that can significantly slow down some usages.
803 * Addressing this may require allocating substantially more space
804 * than users expect.
805 */
806 @SuppressWarnings("serial")
807 @sun.misc.Contended
808 static final class BufferedSubscription<T>
809 implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
810 // Order-sensitive field declarations
811 long timeout; // > 0 if timed wait
812 volatile long demand; // # unfilled requests
813 int maxCapacity; // reduced on OOME
814 int putStat; // offer result for ManagedBlocker
815 int helpDepth; // nested helping depth (at most 1)
816 volatile int ctl; // atomic run state flags
817 volatile int head; // next position to take
818 volatile int tail; // next position to put
819 Object[] array; // buffer: null if disabled
820 Flow.Subscriber<? super T> subscriber; // null if disabled
821 Executor executor; // null if disabled
822 volatile Throwable pendingError; // holds until onError issued
823 volatile Thread waiter; // blocked producer thread
824 T putItem; // for offer within ManagedBlocker
825 BufferedSubscription<T> next; // used only by publisher
826 BufferedSubscription<T> nextRetry; // used only by publisher
827
828 // ctl values
829 static final int ACTIVE = 0x01; // consumer task active
830 static final int CONSUME = 0x02; // keep-alive for consumer task
831 static final int DISABLED = 0x04; // final state
832 static final int ERROR = 0x08; // signal onError then disable
833 static final int SUBSCRIBE = 0x10; // signal onSubscribe
834 static final int COMPLETE = 0x20; // signal onComplete when done
835
836 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
837
838 /**
839 * Initial/Minimum buffer capacity. Must be a power of two, at least 2.
840 */
841 static final int MINCAP = 8;
842
843 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
844 Executor executor, int maxBufferCapacity) {
845 this.subscriber = subscriber;
846 this.executor = executor;
847 this.maxCapacity = maxBufferCapacity;
848 }
849
850 final boolean isDisabled() {
851 return ctl == DISABLED;
852 }
853
854 /**
855 * Returns estimated number of buffered items, or -1 if
856 * disabled
857 */
858 final int estimateLag() {
859 int n;
860 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
861 }
862
863 /**
864 * Tries to add item and start consumer task if necessary.
865 * @return -1 if disabled, 0 if dropped, else estimated lag
866 */
867 final int offer(T item) {
868 Object[] a = array;
869 int t = tail, size = t + 1 - head, stat, cap, i;
870 if (a == null || (cap = a.length) < size || (i = t & (cap - 1)) < 0)
871 stat = growAndAdd(a, item);
872 else {
873 a[i] = item;
874 U.putOrderedInt(this, TAIL, t + 1);
875 stat = size;
876 }
877 return (stat > 0 &&
878 (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
879 startOnOffer(stat) : stat;
880 }
881
882 /**
883 * Tries to create or expand buffer, then adds item if possible.
884 */
885 private int growAndAdd(Object[] a, T item) {
886 int cap, stat;
887 if ((ctl & (ERROR | DISABLED)) != 0) {
888 cap = 0;
889 stat = -1;
890 }
891 else if (a == null) {
892 cap = 0;
893 stat = 1;
894 }
895 else if ((cap = a.length) >= maxCapacity)
896 stat = 0; // cannot grow
897 else
898 stat = cap + 1;
899 if (stat > 0) {
900 int newCap = (cap > 0 ? cap << 1 :
901 maxCapacity >= MINCAP ? MINCAP :
902 maxCapacity >= 2 ? maxCapacity : 2);
903 if (newCap <= cap)
904 stat = 0;
905 else {
906 Object[] newArray = null;
907 try {
908 newArray = new Object[newCap];
909 } catch (Throwable ex) { // try to cope with OOME
910 }
911 if (newArray == null) {
912 if (cap > 0)
913 maxCapacity = cap; // avoid continuous failure
914 stat = 0;
915 }
916 else {
917 array = newArray;
918 int t = tail;
919 int newMask = newCap - 1;
920 if (a != null && cap > 0) {
921 int mask = cap - 1;
922 for (int j = head; j != t; ++j) {
923 Object x; // races with consumer
924 int k = j & mask;
925 if ((x = a[k]) != null &&
926 U.compareAndSwapObject(
927 a, (((long)k) << ASHIFT) + ABASE,
928 x, null))
929 newArray[j & newMask] = x;
930 }
931 }
932 newArray[t & newMask] = item;
933 tail = t + 1;
934 }
935 }
936 }
937 return stat;
938 }
939
940 /**
941 * Spins/helps/blocks while offer returns 0. Called only if
942 * initial offer return 0.
943 */
944 final int submit(T item) {
945 int stat = 0;
946 Executor e = executor;
947 if (helpDepth == 0 && (e instanceof ForkJoinPool)) {
948 Thread thread = Thread.currentThread();
949 // split cases to help compiler specialization
950 if ((thread instanceof ForkJoinWorkerThread) &&
951 ((ForkJoinWorkerThread)thread).getPool() == e) {
952 ForkJoinTask<?> t;
953 while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
954 (t instanceof ConsumerTask)) {
955 if ((stat = offer(item)) != 0 || !t.tryUnfork())
956 break;
957 ((ConsumerTask<?>)t).consumer.helpConsume();
958 }
959 }
960 else if (e == ForkJoinPool.commonPool()) {
961 ForkJoinTask<?> t;
962 while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
963 (t instanceof ConsumerTask)) {
964 if ((stat = offer(item)) != 0 || !t.tryUnfork())
965 break;
966 ((ConsumerTask<?>)t).consumer.helpConsume();
967 }
968 }
969 }
970 if (stat == 0 && (stat = offer(item)) == 0) {
971 putItem = item;
972 timeout = 0L;
973 try {
974 ForkJoinPool.managedBlock(this);
975 } catch (InterruptedException ie) {
976 timeout = INTERRUPTED;
977 }
978 stat = putStat;
979 if (timeout < 0L)
980 Thread.currentThread().interrupt();
981 }
982 return stat;
983 }
984
985 /**
986 * Timeout version; similar to submit
987 */
988 final int timedOffer(T item, long nanos) {
989 int stat = 0;
990 Executor e = executor;
991 if (helpDepth == 0 && (e instanceof ForkJoinPool)) {
992 Thread thread = Thread.currentThread();
993 if (((thread instanceof ForkJoinWorkerThread) &&
994 ((ForkJoinWorkerThread)thread).getPool() == e) ||
995 e == ForkJoinPool.commonPool()) {
996 ForkJoinTask<?> t;
997 long deadline = System.nanoTime() + nanos;
998 while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
999 (t instanceof ConsumerTask)) {
1000 if ((stat = offer(item)) != 0 ||
1001 (nanos = deadline - System.nanoTime()) <= 0L ||
1002 !t.tryUnfork())
1003 break;
1004 ((ConsumerTask<?>)t).consumer.helpConsume();
1005 }
1006 }
1007 }
1008 if (stat == 0 && (stat = offer(item)) == 0 &&
1009 (timeout = nanos) > 0L) {
1010 putItem = item;
1011 try {
1012 ForkJoinPool.managedBlock(this);
1013 } catch (InterruptedException ie) {
1014 timeout = INTERRUPTED;
1015 }
1016 stat = putStat;
1017 if (timeout < 0L)
1018 Thread.currentThread().interrupt();
1019 }
1020 return stat;
1021 }
1022
1023 /** Version of consume called when helping in submit or timedOffer */
1024 private void helpConsume() {
1025 helpDepth = 1; // only one level allowed
1026 consume();
1027 helpDepth = 0;
1028 }
1029
1030 /**
1031 * Tries to start consumer task after offer.
1032 * @return -1 if now disabled, else argument
1033 */
1034 private int startOnOffer(int stat) {
1035 for (;;) {
1036 Executor e; int c;
1037 if ((c = ctl) == DISABLED || (e = executor) == null) {
1038 stat = -1;
1039 break;
1040 }
1041 else if ((c & ACTIVE) != 0) { // ensure keep-alive
1042 if ((c & CONSUME) != 0 ||
1043 U.compareAndSwapInt(this, CTL, c,
1044 c | CONSUME))
1045 break;
1046 }
1047 else if (demand == 0L || tail == head)
1048 break;
1049 else if (U.compareAndSwapInt(this, CTL, c,
1050 c | (ACTIVE | CONSUME))) {
1051 try {
1052 e.execute(new ConsumerTask<T>(this));
1053 break;
1054 } catch (RuntimeException | Error ex) { // back out
1055 do {} while (((c = ctl) & DISABLED) == 0 &&
1056 (c & ACTIVE) != 0 &&
1057 !U.compareAndSwapInt(this, CTL, c,
1058 c & ~ACTIVE));
1059 throw ex;
1060 }
1061 }
1062 }
1063 return stat;
1064 }
1065
1066 /**
1067 * Nulls out most fields, mainly to avoid garbage retention
1068 * until publisher unsubscribes, but also to help cleanly stop
1069 * upon error by nulling required components.
1070 */
1071 private void detach() {
1072 Thread w = waiter;
1073 array = null;
1074 executor = null;
1075 subscriber = null;
1076 pendingError = null;
1077 waiter = null;
1078 if (w != null)
1079 LockSupport.unpark(w); // force wakeup
1080 }
1081
1082 /**
1083 * Issues error signal, asynchronously if a task is running,
1084 * else synchronously.
1085 */
1086 final void onError(Throwable ex) {
1087 for (int c;;) {
1088 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1089 break;
1090 else if ((c & ACTIVE) != 0) {
1091 pendingError = ex;
1092 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1093 break; // cause consumer task to exit
1094 }
1095 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1096 Flow.Subscriber<? super T> s = subscriber;
1097 if (s != null && ex != null) {
1098 try {
1099 s.onError(ex);
1100 } catch (Throwable ignore) {
1101 }
1102 }
1103 detach();
1104 break;
1105 }
1106 }
1107 }
1108
1109 /**
1110 * Tries to start consumer task upon a signal or request;
1111 * disables on failure.
1112 */
1113 private void startOrDisable() {
1114 Executor e;
1115 if ((e = executor) != null) { // skip if already disabled
1116 try {
1117 e.execute(new ConsumerTask<T>(this));
1118 } catch (Throwable ex) { // back out and force signal
1119 for (int c;;) {
1120 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1121 break;
1122 if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1123 onError(ex);
1124 break;
1125 }
1126 }
1127 }
1128 }
1129 }
1130
1131 final void onComplete() {
1132 for (int c;;) {
1133 if ((c = ctl) == DISABLED)
1134 break;
1135 if (U.compareAndSwapInt(this, CTL, c,
1136 c | (ACTIVE | CONSUME | COMPLETE))) {
1137 if ((c & ACTIVE) == 0)
1138 startOrDisable();
1139 break;
1140 }
1141 }
1142 }
1143
1144 final void onSubscribe() {
1145 for (int c;;) {
1146 if ((c = ctl) == DISABLED)
1147 break;
1148 if (U.compareAndSwapInt(this, CTL, c,
1149 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1150 if ((c & ACTIVE) == 0)
1151 startOrDisable();
1152 break;
1153 }
1154 }
1155 }
1156
1157 /**
1158 * Causes consumer task to exit if active (without reporting
1159 * onError unless there is already a pending error), and
1160 * disables.
1161 */
1162 public void cancel() {
1163 for (int c;;) {
1164 if ((c = ctl) == DISABLED)
1165 break;
1166 else if ((c & ACTIVE) != 0) {
1167 if (U.compareAndSwapInt(this, CTL, c,
1168 c | (CONSUME | ERROR)))
1169 break;
1170 }
1171 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1172 detach();
1173 break;
1174 }
1175 }
1176 }
1177
1178 /**
1179 * Adds to demand and possibly starts task.
1180 */
1181 public void request(long n) {
1182 if (n > 0L) {
1183 for (;;) {
1184 long prev = demand, d;
1185 if ((d = prev + n) < prev) // saturate
1186 d = Long.MAX_VALUE;
1187 if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1188 while (d != 0L) {
1189 int c, h;
1190 if ((c = ctl) == DISABLED)
1191 break;
1192 else if ((c & ACTIVE) != 0) {
1193 if ((c & CONSUME) != 0 ||
1194 U.compareAndSwapInt(this, CTL, c,
1195 c | CONSUME))
1196 break;
1197 }
1198 else if ((h = head) != tail) {
1199 if (U.compareAndSwapInt(this, CTL, c,
1200 c | (ACTIVE|CONSUME))) {
1201 startOrDisable();
1202 break;
1203 }
1204 }
1205 else if (head == h && tail == h)
1206 break; // else stale
1207 d = demand;
1208 }
1209 break;
1210 }
1211 }
1212 }
1213 else if (n < 0L)
1214 onError(new IllegalArgumentException(
1215 "negative subscription request"));
1216 }
1217
1218 public final boolean isReleasable() { // for ManagedBlocker
1219 T item = putItem;
1220 if (item != null) { // A few randomized spins
1221 for (int spins = MINCAP, r = 0;;) {
1222 if ((putStat = offer(item)) != 0) {
1223 putItem = null;
1224 break;
1225 }
1226 else if (r == 0)
1227 r = ThreadLocalRandom.nextSecondarySeed();
1228 else {
1229 r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
1230 if (r >= 0 && --spins <= 0)
1231 return false;
1232 }
1233 }
1234 }
1235 return true;
1236 }
1237
1238 public final boolean block() { // for ManagedBlocker
1239 T item = putItem;
1240 if (item != null) {
1241 putItem = null;
1242 long nanos = timeout;
1243 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1244 while ((putStat = offer(item)) == 0) {
1245 if (Thread.interrupted()) {
1246 timeout = INTERRUPTED;
1247 if (nanos > 0L)
1248 break;
1249 }
1250 else if (nanos > 0L &&
1251 (nanos = deadline - System.nanoTime()) <= 0L)
1252 break;
1253 else if (waiter == null)
1254 waiter = Thread.currentThread();
1255 else {
1256 if (nanos > 0L)
1257 LockSupport.parkNanos(this, nanos);
1258 else
1259 LockSupport.park(this);
1260 waiter = null;
1261 }
1262 }
1263 }
1264 waiter = null;
1265 return true;
1266 }
1267
1268 /**
1269 * Consumer loop, called from ConsumerTask, or indirectly
1270 * via helpConsume when helping during submit.
1271 */
1272 final void consume() {
1273 Flow.Subscriber<? super T> s;
1274 if ((s = subscriber) != null) { // else disabled
1275 for (int h = head;;) {
1276 long d = demand;
1277 int c, n, i; Object[] a; Object x; Thread w;
1278 if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1279 if ((c & ERROR) != 0) {
1280 Throwable ex = pendingError;
1281 ctl = DISABLED; // no need for CAS
1282 if (ex != null) { // null if errorless cancel
1283 try {
1284 s.onError(ex);
1285 } catch (Throwable ignore) {
1286 }
1287 }
1288 }
1289 else if ((c & SUBSCRIBE) != 0) {
1290 if (U.compareAndSwapInt(this, CTL, c,
1291 c & ~SUBSCRIBE)) {
1292 try {
1293 s.onSubscribe(this);
1294 } catch (Throwable ex) {
1295 onError(ex);
1296 }
1297 }
1298 }
1299 else {
1300 detach();
1301 break;
1302 }
1303 }
1304 else if (h == tail) { // apparently empty
1305 if ((c & CONSUME) != 0) // recheck
1306 U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1307 else if ((c & COMPLETE) != 0) {
1308 if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1309 try {
1310 s.onComplete();
1311 } catch (Throwable ignore) {
1312 }
1313 }
1314 }
1315 else if (U.compareAndSwapInt(this, CTL, c,
1316 c & ~ACTIVE))
1317 break;
1318 }
1319 else if (d == 0L) { // can't consume
1320 if (demand == 0L) { // recheck
1321 if ((c & CONSUME) != 0)
1322 U.compareAndSwapInt(this, CTL, c,
1323 c & ~CONSUME);
1324 else if (U.compareAndSwapInt(this, CTL, c,
1325 c & ~ACTIVE))
1326 break;
1327 }
1328 }
1329 else if ((a = array) == null || (n = a.length) == 0 ||
1330 (x = a[i = h & (n - 1)]) == null)
1331 ; // stale; retry
1332 else if ((c & CONSUME) == 0)
1333 U.compareAndSwapInt(this, CTL, c, c | CONSUME);
1334 else if (U.compareAndSwapObject(
1335 a, (((long)i) << ASHIFT) + ABASE, x, null)) {
1336 U.putOrderedInt(this, HEAD, ++h);
1337 while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1338 d = demand; // almost never fails
1339 if ((w = waiter) != null) {
1340 waiter = null;
1341 LockSupport.unpark(w); // release producer
1342 }
1343 try {
1344 @SuppressWarnings("unchecked") T y = (T) x;
1345 s.onNext(y);
1346 } catch (Throwable ex) {
1347 onError(ex);
1348 }
1349 }
1350 }
1351 }
1352 }
1353
1354 // Unsafe mechanics
1355 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1356 private static final long CTL;
1357 private static final long TAIL;
1358 private static final long HEAD;
1359 private static final long DEMAND;
1360 private static final int ABASE;
1361 private static final int ASHIFT;
1362
1363 static {
1364 try {
1365 CTL = U.objectFieldOffset
1366 (BufferedSubscription.class.getDeclaredField("ctl"));
1367 TAIL = U.objectFieldOffset
1368 (BufferedSubscription.class.getDeclaredField("tail"));
1369 HEAD = U.objectFieldOffset
1370 (BufferedSubscription.class.getDeclaredField("head"));
1371 DEMAND = U.objectFieldOffset
1372 (BufferedSubscription.class.getDeclaredField("demand"));
1373
1374 ABASE = U.arrayBaseOffset(Object[].class);
1375 int scale = U.arrayIndexScale(Object[].class);
1376 if ((scale & (scale - 1)) != 0)
1377 throw new Error("data type scale not a power of two");
1378 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1379 } catch (ReflectiveOperationException e) {
1380 throw new Error(e);
1381 }
1382 }
1383 }
1384 }