ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.32
Committed: Mon Jan 26 13:16:15 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.31: +12 -6 lines
Log Message:
Fix loops

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.List;
11 import java.util.concurrent.locks.LockSupport;
12 import java.util.function.BiPredicate;
13 import java.util.function.Function;
14 import java.util.function.Supplier;
15
16 /**
17 * A {@link Flow.Publisher} that asynchronously issues submitted
18 * (non-null) items to current subscribers until it is closed. Each
19 * current subscriber receives newly submitted items in the same order
20 * unless drops or exceptions are encountered. Using a
21 * SubmissionPublisher allows item generators to act as Publishers
22 * relying on drop handling and/or blocking for flow control.
23 *
24 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
25 * constructor for delivery to subscribers. The best choice of
26 * Executor depends on expected usage. If the generator(s) of
27 * submitted items run in separate threads, and the number of
28 * subscribers can be estimated, consider using a {@link
29 * Executors#newFixedThreadPool}. Otherwise consider using
30 * the default {@link ForkJoinPool#commonPool}.
31 *
32 * <p>Buffering allows producers and consumers to transiently operate
33 * at different rates. Each subscriber uses an independent buffer.
34 * Buffers are created upon first use and expanded as needed up to the
35 * given maximum. (The enforced capacity may be rounded up to the
36 * nearest power of two and/or bounded by the largest value supported
37 * by this implementation.) Invocations of {@link
38 * Flow.Subscription#request} do not directly result in buffer
39 * expansion, but risk saturation if unfilled requests exceed the
40 * maximum capacity. The default value of {@link
41 * Flow#defaultBufferSize} may provide a useful starting point for
42 * choosing a capacity based on expected rates, resources, and usages.
43 *
44 * <p>Publication methods support different policies about what to do
45 * when buffers are saturated. Method {@link #submit} blocks until
46 * resources are available. This is simplest, but least responsive.
47 * The {@code offer} methods may drop items (either immediately or
48 * with bounded timeout), but provide an opportunity to interpose a
49 * handler and then retry.
50 *
51 * <p>If any Subscriber method throws an exception, its subscription
52 * is cancelled. If the supplied Executor throws {@link
53 * RejectedExecutionException} (or any other RuntimeException or
54 * Error) when attempting to execute a task, or a drop handler throws
55 * an exception when processing a dropped item, then the exception is
56 * rethrown. In these cases, not all subscribers will have been issued
57 * the published item. It is usually good practice to {@link
58 * #closeExceptionally closeExceptionally} in these cases.
59 *
60 * <p>This class may also serve as a convenient base for subclasses
61 * that generate items, and use the methods in this class to publish
62 * them. For example here is a class that periodically publishes the
63 * items generated from a supplier. (In practice you might add methods
64 * to independently start and stop generation, to share schedulers
65 * among publishers, and so on, or use a SubmissionPublisher as a
66 * component rather than a superclass.)
67 *
68 * <pre> {@code
69 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
70 * final ScheduledFuture<?> periodicTask;
71 * final ScheduledExecutorService scheduler;
72 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
73 * Supplier<? extends T> supplier,
74 * long period, TimeUnit unit) {
75 * super(executor, maxBufferCapacity);
76 * scheduler = new ScheduledThreadPoolExecutor(1);
77 * periodicTask = scheduler.scheduleAtFixedRate(
78 * () -> submit(supplier.get()), 0, period, unit);
79 * }
80 * public void close() {
81 * periodicTask.cancel(false);
82 * scheduler.shutdown();
83 * super.close();
84 * }
85 * }}</pre>
86 *
87 * <p>Here is an example of a {@link Flow.Processor} implementation.
88 * It uses single-step requests to its publisher for simplicity of
89 * illustration. A more adaptive version could monitor flow using the
90 * lag estimate returned from {@code submit}, along with other utility
91 * methods.
92 *
93 * <pre> {@code
94 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
95 * implements Flow.Processor<S,T> {
96 * final Function<? super S, ? extends T> function;
97 * Flow.Subscription subscription;
98 * TransformProcessor(Executor executor, int maxBufferCapacity,
99 * Function<? super S, ? extends T> function) {
100 * super(executor, maxBufferCapacity);
101 * this.function = function;
102 * }
103 * public void onSubscribe(Flow.Subscription subscription) {
104 * (this.subscription = subscription).request(1);
105 * }
106 * public void onNext(S item) {
107 * subscription.request(1);
108 * submit(function.apply(item));
109 * }
110 * public void onError(Throwable ex) { closeExceptionally(ex); }
111 * public void onComplete() { close(); }
112 * }}</pre>
113 *
114 * @param <T> the published item type
115 * @author Doug Lea
116 * @since 1.9
117 */
118 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
119 AutoCloseable {
120 /*
121 * Most mechanics are handled by BufferedSubscription. This class
122 * mainly tracks subscribers and ensures sequentiality, by using
123 * built-in synchronization locks across public methods. (Using
124 * built-in locks works well in the most typical case in which
125 * only one thread submits items).
126 */
127
128 /** The largest possible power of two array size. */
129 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
130
131 /** Round capacity to power of 2, at least 2, at most limit. */
132 static final int roundCapacity(int cap) { // to nearest power of 2
133 int n = cap - 1;
134 n |= n >>> 1;
135 n |= n >>> 2;
136 n |= n >>> 4;
137 n |= n >>> 8;
138 n |= n >>> 16;
139 return (n <= 0) ? 2 : // at least 2
140 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
141 }
142
143 /**
144 * Clients (BufferedSubscriptions) are maintained in a linked list
145 * (via their "next" fields). This works well for publish loops.
146 * It requires O(n) traversal to check for duplicate subscribers,
147 * but we expect that subscribing is much less common than
148 * publishing. Unsubscribing occurs only during traversal loops,
149 * when BufferedSubscription methods return negative values
150 * signifying that they have been disabled. To reduce
151 * head-of-line blocking, submit and offer methods first call
152 * BufferedSubscription.offer on each subscriber, and place
153 * saturated ones in retries list (using nextRetry field), and
154 * retry, possibly blocking or dropping.
155 */
156 BufferedSubscription<T> clients;
157
158 /** Run status, updated only within locks */
159 volatile boolean closed;
160
161 // Parameters for constructing BufferedSubscriptions
162 final Executor executor;
163 final int maxBufferCapacity;
164
165 /**
166 * Creates a new SubmissionPublisher using the given Executor for
167 * async delivery to subscribers, and with the given maximum
168 * buffer size for each subscriber.
169 *
170 * @param executor the executor to use for async delivery,
171 * supporting creation of at least one independent thread
172 * @param maxBufferCapacity the maximum capacity for each
173 * subscriber's buffer (the enforced capacity may be rounded up to
174 * the nearest power of two and/or bounded by the largest value
175 * supported by this implementation; method {@link #getMaxBufferCapacity}
176 * returns the actual value)
177 * @throws NullPointerException if executor is null
178 * @throws IllegalArgumentException if maxBufferCapacity not
179 * positive
180 */
181 public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
182 if (executor == null)
183 throw new NullPointerException();
184 if (maxBufferCapacity <= 0)
185 throw new IllegalArgumentException("capacity must be positive");
186 this.executor = executor;
187 this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
188 }
189
190 /**
191 * Creates a new SubmissionPublisher using the {@link
192 * ForkJoinPool#commonPool()} for async delivery to subscribers,
193 * and with maximum buffer capacity of {@link
194 * Flow#defaultBufferSize}.
195 */
196 public SubmissionPublisher() {
197 this(ForkJoinPool.commonPool(), Flow.defaultBufferSize());
198 }
199
200 /**
201 * Adds the given Subscriber unless already subscribed. If
202 * already subscribed, the Subscriber's {@code onError} method is
203 * invoked on the existing subscription with an {@link
204 * IllegalStateException}. Otherwise, upon success, the
205 * Subscriber's {@code onSubscribe} method is invoked
206 * asynchronously with a new {@link Flow.Subscription}. If {@code
207 * onSubscribe} throws an exception, the subscription is
208 * cancelled. Otherwise, if this SubmissionPublisher is closed,
209 * the subscriber's {@code onComplete} method is then invoked.
210 * Subscribers may enable receiving items by invoking the {@code
211 * request} method of the new Subscription, and may unsubscribe by
212 * invoking its {@code cancel} method.
213 *
214 * @param subscriber the subscriber
215 * @throws NullPointerException if subscriber is null
216 */
217 public void subscribe(Flow.Subscriber<? super T> subscriber) {
218 if (subscriber == null) throw new NullPointerException();
219 BufferedSubscription<T> subscription =
220 new BufferedSubscription<T>(subscriber, executor, maxBufferCapacity);
221 synchronized (this) {
222 for (BufferedSubscription<T> b = clients, pred = null;;) {
223 if (b == null) {
224 subscription.onSubscribe();
225 if (closed)
226 subscription.onComplete();
227 else if (pred == null)
228 clients = subscription;
229 else
230 pred.next = subscription;
231 break;
232 }
233 BufferedSubscription<T> next = b.next;
234 if (b.isDisabled()) { // remove
235 b.next = null; // detach
236 if (pred == null)
237 clients = next;
238 else
239 pred.next = next;
240 }
241 else if (subscriber.equals(b.subscriber)) {
242 b.onError(new IllegalStateException("Duplicate subscribe"));
243 break;
244 }
245 else
246 pred = b;
247 b = next;
248 }
249 }
250 }
251
252 /**
253 * Publishes the given item to each current subscriber by
254 * asynchronously invoking its {@link Flow.Subscriber#onNext}
255 * method, blocking uninterruptibly while resources for any
256 * subscriber are unavailable. This method returns an estimate of
257 * the maximum lag (number of items submitted but not yet
258 * consumed) among all current subscribers. This value is at least
259 * one (accounting for this submitted item) if there are any
260 * subscribers, else zero.
261 *
262 * <p>If the Executor for this publisher throws a
263 * RejectedExecutionException (or any other RuntimeException or
264 * Error) when attempting to asynchronously notify subscribers,
265 * then this exception is rethrown, in which case not all
266 * subscribers will have been issued this item.
267 *
268 * @param item the (non-null) item to publish
269 * @return the estimated maximum lag among subscribers
270 * @throws IllegalStateException if closed
271 * @throws NullPointerException if item is null
272 * @throws RejectedExecutionException if thrown by Executor
273 */
274 public int submit(T item) {
275 if (item == null) throw new NullPointerException();
276 int lag = 0;
277 boolean complete;
278 synchronized (this) {
279 complete = closed;
280 BufferedSubscription<T> b = clients;
281 if (!complete) {
282 BufferedSubscription<T> pred = null, r = null, rtail = null;
283 while (b != null) {
284 BufferedSubscription<T> next = b.next;
285 int stat = b.offer(item);
286 if (stat < 0) { // disabled
287 b.next = null;
288 if (pred == null)
289 clients = next;
290 else
291 pred.next = next;
292 }
293 else {
294 if (stat > lag)
295 lag = stat;
296 else if (stat == 0) { // place on retry list
297 b.nextRetry = null;
298 if (rtail == null)
299 r = b;
300 else
301 rtail.nextRetry = b;
302 rtail = b;
303 }
304 pred = b;
305 }
306 b = next;
307 }
308 while (r != null) {
309 BufferedSubscription<T> nextRetry = r.nextRetry;
310 r.nextRetry = null;
311 int stat = r.submit(item);
312 if (stat > lag)
313 lag = stat;
314 else if (stat < 0 && clients == r)
315 clients = r.next; // postpone internal unsubscribes
316 r = nextRetry;
317 }
318 }
319 }
320 if (complete)
321 throw new IllegalStateException("Closed");
322 else
323 return lag;
324 }
325
326 /**
327 * Publishes the given item, if possible, to each current
328 * subscriber by asynchronously invoking its {@link
329 * Flow.Subscriber#onNext} method. The item may be dropped by one
330 * or more subscribers if resource limits are exceeded, in which
331 * case the given handler (if non-null) is invoked, and if it
332 * returns true, retried once. Other calls to methods in this
333 * class by other threads are blocked while the handler is
334 * invoked. Unless recovery is assured, options are usually
335 * limited to logging the error and/or issuing an {@code onError}
336 * signal to the subscriber.
337 *
338 * <p>This method returns a status indicator: If negative, it
339 * represents the (negative) number of drops (failed attempts to
340 * issue the item to a subscriber). Otherwise it is an estimate of
341 * the maximum lag (number of items submitted but not yet
342 * consumed) among all current subscribers. This value is at least
343 * one (accounting for this submitted item) if there are any
344 * subscribers, else zero.
345 *
346 * <p>If the Executor for this publisher throws a
347 * RejectedExecutionException (or any other RuntimeException or
348 * Error) when attempting to asynchronously notify subscribers, or
349 * the drop handler throws an exception when processing a dropped
350 * item, then this exception is rethrown.
351 *
352 * @param item the (non-null) item to publish
353 * @param onDrop if non-null, the handler invoked upon a drop to a
354 * subscriber, with arguments of the subscriber and item; if it
355 * returns true, an offer is re-attempted (once)
356 * @return if negative, the (negative) number of drops; otherwise
357 * an estimate of maximum lag
358 * @throws IllegalStateException if closed
359 * @throws NullPointerException if item is null
360 * @throws RejectedExecutionException if thrown by Executor
361 */
362 public int offer(T item,
363 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
364 return doOffer(0L, item, onDrop);
365 }
366
367 /**
368 * Publishes the given item, if possible, to each current
369 * subscriber by asynchronously invoking its {@link
370 * Flow.Subscriber#onNext} method, blocking while resources for
371 * any subscription are unavailable, up to the specified timeout
372 * or the caller thread is interrupted, at which point the given
373 * handler (if non-null) is invoked, and if it returns true,
374 * retried once. (The drop handler may distinguish timeouts from
375 * interrupts by checking whether the current thread is
376 * interrupted.) Other calls to methods in this class by other
377 * threads are blocked while the handler is invoked. Unless
378 * recovery is assured, options are usually limited to logging the
379 * error and/or issuing an {@code onError} signal to the
380 * subscriber.
381 *
382 * <p>This method returns a status indicator: If negative, it
383 * represents the (negative) number of drops (failed attempts to
384 * issue the item to a subscriber). Otherwise it is an estimate of
385 * the maximum lag (number of items submitted but not yet
386 * consumed) among all current subscribers. This value is at least
387 * one (accounting for this submitted item) if there are any
388 * subscribers, else zero.
389 *
390 * <p>If the Executor for this publisher throws a
391 * RejectedExecutionException (or any other RuntimeException or
392 * Error) when attempting to asynchronously notify subscribers, or
393 * the drop handler throws an exception when processing a dropped
394 * item, then this exception is rethrown.
395 *
396 * @param item the (non-null) item to publish
397 * @param timeout how long to wait for resources for any subscriber
398 * before giving up, in units of {@code unit}
399 * @param unit a {@code TimeUnit} determining how to interpret the
400 * {@code timeout} parameter
401 * @param onDrop if non-null, the handler invoked upon a drop to a
402 * subscriber, with arguments of the subscriber and item; if it
403 * returns true, an offer is re-attempted (once)
404 * @return if negative, the (negative) number of drops; otherwise
405 * an estimate of maximum lag
406 * @throws IllegalStateException if closed
407 * @throws NullPointerException if item is null
408 * @throws RejectedExecutionException if thrown by Executor
409 */
410 public int offer(T item, long timeout, TimeUnit unit,
411 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
412 return doOffer(unit.toNanos(timeout), item, onDrop);
413 }
414
415 /** Common implementation for both forms of offer */
416 final int doOffer(long nanos, T item,
417 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
418 if (item == null) throw new NullPointerException();
419 int lag = 0, drops = 0;
420 boolean complete;
421 synchronized (this) {
422 complete = closed;
423 BufferedSubscription<T> b = clients;
424 if (!complete) {
425 BufferedSubscription<T> pred = null, r = null, rtail = null;
426 while (b != null) {
427 BufferedSubscription<T> next = b.next;
428 int stat = b.offer(item);
429 if (stat < 0) {
430 b.next = null;
431 if (pred == null)
432 clients = next;
433 else
434 pred.next = next;
435 }
436 else {
437 if (stat > lag)
438 lag = stat;
439 else if (stat == 0) {
440 b.nextRetry = null;
441 if (rtail == null)
442 r = b;
443 else
444 rtail.nextRetry = b;
445 rtail = b;
446 }
447 else if (stat > lag)
448 lag = stat;
449 pred = b;
450 }
451 b = next;
452 }
453 while (r != null) {
454 BufferedSubscription<T> nextRetry = r.nextRetry;
455 r.nextRetry = null;
456 int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
457 r.offer(item);
458 if (stat == 0 && onDrop != null &&
459 onDrop.test(r.subscriber, item))
460 stat = r.offer(item);
461 if (stat == 0)
462 ++drops;
463 else if (stat > lag)
464 lag = stat;
465 else if (stat < 0 && clients == r)
466 clients = r.next;
467 r = nextRetry;
468 }
469 }
470 }
471 if (complete)
472 throw new IllegalStateException("Closed");
473 else
474 return (drops > 0) ? -drops : lag;
475 }
476
477 /**
478 * Unless already closed, issues {@link
479 * Flow.Subscriber#onComplete} signals to current subscribers, and
480 * disallows subsequent attempts to publish. Upon return, this
481 * method does <em>NOT</em> guarantee that all subscribers have
482 * yet completed.
483 */
484 public void close() {
485 if (!closed) {
486 BufferedSubscription<T> b;
487 synchronized (this) {
488 b = clients;
489 clients = null;
490 closed = true;
491 }
492 while (b != null) {
493 BufferedSubscription<T> next = b.next;
494 b.next = null;
495 b.onComplete();
496 b = next;
497 }
498 }
499 }
500
501 /**
502 * Unless already closed, issues {@link Flow.Subscriber#onError}
503 * signals to current subscribers with the given error, and
504 * disallows subsequent attempts to publish. Upon return, this
505 * method does <em>NOT</em> guarantee that all subscribers have
506 * yet completed.
507 *
508 * @param error the {@code onError} argument sent to subscribers
509 * @throws NullPointerException if error is null
510 */
511 public void closeExceptionally(Throwable error) {
512 if (error == null)
513 throw new NullPointerException();
514 if (!closed) {
515 BufferedSubscription<T> b;
516 synchronized (this) {
517 b = clients;
518 clients = null;
519 closed = true;
520 }
521 while (b != null) {
522 BufferedSubscription<T> next = b.next;
523 b.next = null;
524 b.onError(error);
525 b = next;
526 }
527 }
528 }
529
530 /**
531 * Returns true if this publisher is not accepting submissions.
532 *
533 * @return true if closed
534 */
535 public boolean isClosed() {
536 return closed;
537 }
538
539 /**
540 * Returns true if this publisher has any subscribers.
541 *
542 * @return true if this publisher has any subscribers
543 */
544 public boolean hasSubscribers() {
545 boolean nonEmpty = false;
546 if (!closed) {
547 synchronized (this) {
548 for (BufferedSubscription<T> b = clients; b != null;) {
549 BufferedSubscription<T> next = b.next;
550 if (b.isDisabled()) {
551 b.next = null;
552 b = clients = next;
553 }
554 else {
555 nonEmpty = true;
556 break;
557 }
558 }
559 }
560 }
561 return nonEmpty;
562 }
563
564 /**
565 * Returns the number of current subscribers.
566 *
567 * @return the number of current subscribers
568 */
569 public int getNumberOfSubscribers() {
570 int count = 0;
571 if (!closed) {
572 synchronized (this) {
573 BufferedSubscription<T> pred = null, next;
574 for (BufferedSubscription<T> b = clients; b != null; b = next) {
575 next = b.next;
576 if (b.isDisabled()) {
577 b.next = null;
578 if (pred == null)
579 clients = next;
580 else
581 pred.next = next;
582 }
583 else {
584 pred = b;
585 ++count;
586 }
587 }
588 }
589 }
590 return count;
591 }
592
593 /**
594 * Returns the Executor used for asynchronous delivery.
595 *
596 * @return the Executor used for asynchronous delivery
597 */
598 public Executor getExecutor() {
599 return executor;
600 }
601
602 /**
603 * Returns the maximum per-subscriber buffer capacity.
604 *
605 * @return the maximum per-subscriber buffer capacity
606 */
607 public int getMaxBufferCapacity() {
608 return maxBufferCapacity;
609 }
610
611 /**
612 * Returns a list of current subscribers for monitoring and
613 * tracking purposes, not for invoking {@link Flow.Subscriber}
614 * methods on the subscribers.
615 *
616 * @return list of current subscribers
617 */
618 public List<Flow.Subscriber<? super T>> getSubscribers() {
619 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
620 synchronized (this) {
621 BufferedSubscription<T> pred = null, next;
622 for (BufferedSubscription<T> b = clients; b != null; b = next) {
623 next = b.next;
624 if (b.isDisabled()) {
625 b.next = null;
626 if (pred == null)
627 clients = next;
628 else
629 pred.next = next;
630 }
631 else
632 subs.add(b.subscriber);
633 }
634 }
635 return subs;
636 }
637
638 /**
639 * Returns true if the given Subscriber is currently subscribed.
640 *
641 * @param subscriber the subscriber
642 * @return true if currently subscribed
643 * @throws NullPointerException if subscriber is null
644 */
645 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
646 if (subscriber == null) throw new NullPointerException();
647 if (!closed) {
648 synchronized (this) {
649 BufferedSubscription<T> pred = null, next;
650 for (BufferedSubscription<T> b = clients; b != null; b = next) {
651 next = b.next;
652 if (b.isDisabled()) {
653 b.next = null;
654 if (pred == null)
655 clients = next;
656 else
657 pred.next = next;
658 }
659 else if (subscriber.equals(b.subscriber))
660 return true;
661 else
662 pred = b;
663 }
664 }
665 }
666 return false;
667 }
668
669 /**
670 * Returns an estimate of the minimum number of items requested
671 * (via {@link Flow.Subscription#request}) but not yet produced,
672 * among all current subscribers.
673 *
674 * @return the estimate, or zero if no subscribers
675 */
676 public long estimateMinimumDemand() {
677 long min = Long.MAX_VALUE;
678 boolean nonEmpty = false;
679 synchronized (this) {
680 BufferedSubscription<T> pred = null, next;
681 for (BufferedSubscription<T> b = clients; b != null; b = next) {
682 int n; long d;
683 next = b.next;
684 if ((n = b.estimateLag()) < 0) {
685 b.next = null;
686 if (pred == null)
687 clients = next;
688 else
689 pred.next = next;
690 }
691 else {
692 if ((d = b.demand - n) < min)
693 min = d;
694 nonEmpty = true;
695 pred = b;
696 }
697 }
698 }
699 return nonEmpty ? min : 0;
700 }
701
702 /**
703 * Returns an estimate of the maximum number of items produced but
704 * not yet consumed among all current subscribers.
705 *
706 * @return the estimate
707 */
708 public int estimateMaximumLag() {
709 int max = 0;
710 synchronized (this) {
711 BufferedSubscription<T> pred = null, next;
712 for (BufferedSubscription<T> b = clients; b != null; b = next) {
713 int n;
714 next = b.next;
715 if ((n = b.estimateLag()) < 0) {
716 b.next = null;
717 if (pred == null)
718 clients = next;
719 else
720 pred.next = next;
721 }
722 else {
723 if (n > max)
724 max = n;
725 pred = b;
726 }
727 }
728 }
729 return max;
730 }
731
732 /**
733 * A task for consuming buffer items and signals, created and
734 * executed whenever they become available. A task consumes as
735 * many items/signals as possible before terminating, at which
736 * point another task is created when needed. The dual Runnable
737 * and ForkJoinTask declaration saves overhead when executed by
738 * ForkJoinPools, without impacting other kinds of Executors.
739 */
740 @SuppressWarnings("serial")
741 static final class ConsumerTask<T> extends ForkJoinTask<Void>
742 implements Runnable {
743 final BufferedSubscription<T> consumer;
744 ConsumerTask(BufferedSubscription<T> consumer) {
745 this.consumer = consumer;
746 }
747 public final Void getRawResult() { return null; }
748 public final void setRawResult(Void v) {}
749 public final boolean exec() { consumer.consume(); return false; }
750 public final void run() { consumer.consume(); }
751 }
752
753 /**
754 * A bounded (ring) buffer with integrated control to start a
755 * consumer task whenever items are available. The buffer
756 * algorithm is similar to one used inside ForkJoinPool,
757 * specialized for the case of at most one concurrent producer and
758 * consumer, and power of two buffer sizes. This allows methods to
759 * operate without locks even while supporting resizing, blocking,
760 * task-triggering, and garbage-free buffers (nulling out elements
761 * when consumed), although supporting these does impose a bit of
762 * overhead compared to plain fixed-size ring buffers.
763 *
764 * The publisher guarantees a single producer via its lock. We
765 * ensure in this class that there is at most one consumer. The
766 * request and cancel methods must be fully thread-safe but are
767 * coded to exploit the most common case in which they are only
768 * called by consumers (usually within onNext).
769 *
770 * Execution control is managed using the ACTIVE ctl bit. We
771 * ensure that a task is active when consumable items (and
772 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
773 * there is demand (unfilled requests). This is complicated on
774 * the creation side by the possibility of exceptions when trying
775 * to execute tasks. These eventually force DISABLED state, but
776 * sometimes not directly. On the task side, termination (clearing
777 * ACTIVE) that would otherwise race with producers or request()
778 * calls uses the CONSUME keep-alive bit to force a recheck.
779 *
780 * The ctl field also manages run state. When DISABLED, no further
781 * updates are possible. Disabling may be preceded by setting
782 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
783 * case the associated Subscriber methods are invoked, possibly
784 * synchronously if there is no active consumer task (including
785 * cases where execute() failed). The cancel() method is supported
786 * by treating as ERROR but suppressing onError signal.
787 *
788 * Support for blocking also exploits the fact that there is only
789 * one possible waiter. ManagedBlocker-compatible control fields
790 * are placed in this class itself rather than in wait-nodes.
791 * Blocking control relies on the "waiter" field. Producers set
792 * the field before trying to block, but must then recheck (via
793 * offer) before parking. Signalling then just unparks and clears
794 * waiter field. If the producer and consumer are both in the same
795 * ForkJoinPool, or consumers are running in commonPool, the
796 * producer attempts to help run consumer tasks that it forked
797 * before blocking. To avoid potential cycles, only one level of
798 * helping is currently supported.
799 *
800 * This class uses @Contended and heuristic field declaration
801 * ordering to reduce false-sharing-based memory contention among
802 * instances of BufferedSubscription, but it does not currently
803 * attempt to avoid memory contention among buffers. This field
804 * and element packing can hurt performance especially when each
805 * publisher has only one client operating at a high rate.
806 * Addressing this may require allocating substantially more space
807 * than users expect.
808 */
809 @SuppressWarnings("serial")
810 @sun.misc.Contended
811 static final class BufferedSubscription<T>
812 implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
813 // Order-sensitive field declarations
814 long timeout; // > 0 if timed wait
815 volatile long demand; // # unfilled requests
816 int maxCapacity; // reduced on OOME
817 int putStat; // offer result for ManagedBlocker
818 int helpDepth; // nested helping depth (at most 1)
819 volatile int ctl; // atomic run state flags
820 volatile int head; // next position to take
821 volatile int tail; // next position to put
822 Object[] array; // buffer: null if disabled
823 Flow.Subscriber<? super T> subscriber; // null if disabled
824 Executor executor; // null if disabled
825 volatile Throwable pendingError; // holds until onError issued
826 volatile Thread waiter; // blocked producer thread
827 T putItem; // for offer within ManagedBlocker
828 BufferedSubscription<T> next; // used only by publisher
829 BufferedSubscription<T> nextRetry; // used only by publisher
830
831 // ctl values
832 static final int ACTIVE = 0x01; // consumer task active
833 static final int CONSUME = 0x02; // keep-alive for consumer task
834 static final int DISABLED = 0x04; // final state
835 static final int ERROR = 0x08; // signal onError then disable
836 static final int SUBSCRIBE = 0x10; // signal onSubscribe
837 static final int COMPLETE = 0x20; // signal onComplete when done
838
839 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
840
841 /**
842 * Initial/Minimum buffer capacity. Must be a power of two, at least 2.
843 */
844 static final int MINCAP = 8;
845
846 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
847 Executor executor, int maxBufferCapacity) {
848 this.subscriber = subscriber;
849 this.executor = executor;
850 this.maxCapacity = maxBufferCapacity;
851 }
852
853 final boolean isDisabled() {
854 return ctl == DISABLED;
855 }
856
857 /**
858 * Returns estimated number of buffered items, or -1 if
859 * disabled
860 */
861 final int estimateLag() {
862 int n;
863 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
864 }
865
866 /**
867 * Tries to add item and start consumer task if necessary.
868 * @return -1 if disabled, 0 if dropped, else estimated lag
869 */
870 final int offer(T item) {
871 Object[] a = array;
872 int t = tail, size = t + 1 - head, stat, cap, i;
873 if (a == null || (cap = a.length) < size || (i = t & (cap - 1)) < 0)
874 stat = growAndAdd(a, item);
875 else {
876 a[i] = item;
877 U.putOrderedInt(this, TAIL, t + 1);
878 stat = size;
879 }
880 return (stat > 0 &&
881 (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
882 startOnOffer(stat) : stat;
883 }
884
885 /**
886 * Tries to create or expand buffer, then adds item if possible.
887 */
888 private int growAndAdd(Object[] a, T item) {
889 int cap, stat;
890 if ((ctl & (ERROR | DISABLED)) != 0) {
891 cap = 0;
892 stat = -1;
893 }
894 else if (a == null) {
895 cap = 0;
896 stat = 1;
897 }
898 else if ((cap = a.length) >= maxCapacity)
899 stat = 0; // cannot grow
900 else
901 stat = cap + 1;
902 if (stat > 0) {
903 int newCap = (cap > 0 ? cap << 1 :
904 maxCapacity >= MINCAP ? MINCAP :
905 maxCapacity >= 2 ? maxCapacity : 2);
906 if (newCap <= cap)
907 stat = 0;
908 else {
909 Object[] newArray = null;
910 try {
911 newArray = new Object[newCap];
912 } catch (Throwable ex) { // try to cope with OOME
913 }
914 if (newArray == null) {
915 if (cap > 0)
916 maxCapacity = cap; // avoid continuous failure
917 stat = 0;
918 }
919 else {
920 array = newArray;
921 int t = tail;
922 int newMask = newCap - 1;
923 if (a != null && cap > 0) {
924 int mask = cap - 1;
925 for (int j = head; j != t; ++j) {
926 Object x; // races with consumer
927 int k = j & mask;
928 if ((x = a[k]) != null &&
929 U.compareAndSwapObject(
930 a, (((long)k) << ASHIFT) + ABASE,
931 x, null))
932 newArray[j & newMask] = x;
933 }
934 }
935 newArray[t & newMask] = item;
936 tail = t + 1;
937 }
938 }
939 }
940 return stat;
941 }
942
943 /**
944 * Spins/helps/blocks while offer returns 0. Called only if
945 * initial offer return 0.
946 */
947 final int submit(T item) {
948 int stat = 0;
949 Executor e = executor;
950 if (helpDepth == 0 && (e instanceof ForkJoinPool)) {
951 Thread thread = Thread.currentThread();
952 // split cases to help compiler specialization
953 if ((thread instanceof ForkJoinWorkerThread) &&
954 ((ForkJoinWorkerThread)thread).getPool() == e) {
955 ForkJoinTask<?> t;
956 while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
957 (t instanceof ConsumerTask)) {
958 if ((stat = offer(item)) != 0 || !t.tryUnfork())
959 break;
960 ((ConsumerTask<?>)t).consumer.helpConsume();
961 }
962 }
963 else if (e == ForkJoinPool.commonPool()) {
964 ForkJoinTask<?> t;
965 while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
966 (t instanceof ConsumerTask)) {
967 if ((stat = offer(item)) != 0 || !t.tryUnfork())
968 break;
969 ((ConsumerTask<?>)t).consumer.helpConsume();
970 }
971 }
972 }
973 if (stat == 0 && (stat = offer(item)) == 0) {
974 putItem = item;
975 timeout = 0L;
976 try {
977 ForkJoinPool.managedBlock(this);
978 } catch (InterruptedException ie) {
979 timeout = INTERRUPTED;
980 }
981 stat = putStat;
982 if (timeout < 0L)
983 Thread.currentThread().interrupt();
984 }
985 return stat;
986 }
987
988 /**
989 * Timeout version; similar to submit
990 */
991 final int timedOffer(T item, long nanos) {
992 int stat = 0;
993 Executor e = executor;
994 if (helpDepth == 0 && (e instanceof ForkJoinPool)) {
995 Thread thread = Thread.currentThread();
996 if (((thread instanceof ForkJoinWorkerThread) &&
997 ((ForkJoinWorkerThread)thread).getPool() == e) ||
998 e == ForkJoinPool.commonPool()) {
999 ForkJoinTask<?> t;
1000 long deadline = System.nanoTime() + nanos;
1001 while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
1002 (t instanceof ConsumerTask)) {
1003 if ((stat = offer(item)) != 0 ||
1004 (nanos = deadline - System.nanoTime()) <= 0L ||
1005 !t.tryUnfork())
1006 break;
1007 ((ConsumerTask<?>)t).consumer.helpConsume();
1008 }
1009 }
1010 }
1011 if (stat == 0 && (stat = offer(item)) == 0 &&
1012 (timeout = nanos) > 0L) {
1013 putItem = item;
1014 try {
1015 ForkJoinPool.managedBlock(this);
1016 } catch (InterruptedException ie) {
1017 timeout = INTERRUPTED;
1018 }
1019 stat = putStat;
1020 if (timeout < 0L)
1021 Thread.currentThread().interrupt();
1022 }
1023 return stat;
1024 }
1025
1026 /** Version of consume called when helping in submit or timedOffer */
1027 private void helpConsume() {
1028 helpDepth = 1; // only one level allowed
1029 consume();
1030 helpDepth = 0;
1031 }
1032
1033 /**
1034 * Tries to start consumer task after offer.
1035 * @return -1 if now disabled, else argument
1036 */
1037 private int startOnOffer(int stat) {
1038 for (;;) {
1039 Executor e; int c;
1040 if ((c = ctl) == DISABLED || (e = executor) == null) {
1041 stat = -1;
1042 break;
1043 }
1044 else if ((c & ACTIVE) != 0) { // ensure keep-alive
1045 if ((c & CONSUME) != 0 ||
1046 U.compareAndSwapInt(this, CTL, c,
1047 c | CONSUME))
1048 break;
1049 }
1050 else if (demand == 0L || tail == head)
1051 break;
1052 else if (U.compareAndSwapInt(this, CTL, c,
1053 c | (ACTIVE | CONSUME))) {
1054 try {
1055 e.execute(new ConsumerTask<T>(this));
1056 break;
1057 } catch (RuntimeException | Error ex) { // back out
1058 do {} while (((c = ctl) & DISABLED) == 0 &&
1059 (c & ACTIVE) != 0 &&
1060 !U.compareAndSwapInt(this, CTL, c,
1061 c & ~ACTIVE));
1062 throw ex;
1063 }
1064 }
1065 }
1066 return stat;
1067 }
1068
1069 /**
1070 * Nulls out most fields, mainly to avoid garbage retention
1071 * until publisher unsubscribes, but also to help cleanly stop
1072 * upon error by nulling required components.
1073 */
1074 private void detach() {
1075 Thread w = waiter;
1076 array = null;
1077 executor = null;
1078 subscriber = null;
1079 pendingError = null;
1080 waiter = null;
1081 if (w != null)
1082 LockSupport.unpark(w); // force wakeup
1083 }
1084
1085 /**
1086 * Issues error signal, asynchronously if a task is running,
1087 * else synchronously.
1088 */
1089 final void onError(Throwable ex) {
1090 for (int c;;) {
1091 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1092 break;
1093 else if ((c & ACTIVE) != 0) {
1094 pendingError = ex;
1095 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1096 break; // cause consumer task to exit
1097 }
1098 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1099 Flow.Subscriber<? super T> s = subscriber;
1100 if (s != null && ex != null) {
1101 try {
1102 s.onError(ex);
1103 } catch (Throwable ignore) {
1104 }
1105 }
1106 detach();
1107 break;
1108 }
1109 }
1110 }
1111
1112 /**
1113 * Tries to start consumer task upon a signal or request;
1114 * disables on failure.
1115 */
1116 private void startOrDisable() {
1117 Executor e;
1118 if ((e = executor) != null) { // skip if already disabled
1119 try {
1120 e.execute(new ConsumerTask<T>(this));
1121 } catch (Throwable ex) { // back out and force signal
1122 for (int c;;) {
1123 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1124 break;
1125 if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1126 onError(ex);
1127 break;
1128 }
1129 }
1130 }
1131 }
1132 }
1133
1134 final void onComplete() {
1135 for (int c;;) {
1136 if ((c = ctl) == DISABLED)
1137 break;
1138 if (U.compareAndSwapInt(this, CTL, c,
1139 c | (ACTIVE | CONSUME | COMPLETE))) {
1140 if ((c & ACTIVE) == 0)
1141 startOrDisable();
1142 break;
1143 }
1144 }
1145 }
1146
1147 final void onSubscribe() {
1148 for (int c;;) {
1149 if ((c = ctl) == DISABLED)
1150 break;
1151 if (U.compareAndSwapInt(this, CTL, c,
1152 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1153 if ((c & ACTIVE) == 0)
1154 startOrDisable();
1155 break;
1156 }
1157 }
1158 }
1159
1160 /**
1161 * Causes consumer task to exit if active (without reporting
1162 * onError unless there is already a pending error), and
1163 * disables.
1164 */
1165 public void cancel() {
1166 for (int c;;) {
1167 if ((c = ctl) == DISABLED)
1168 break;
1169 else if ((c & ACTIVE) != 0) {
1170 if (U.compareAndSwapInt(this, CTL, c,
1171 c | (CONSUME | ERROR)))
1172 break;
1173 }
1174 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1175 detach();
1176 break;
1177 }
1178 }
1179 }
1180
1181 /**
1182 * Adds to demand and possibly starts task.
1183 */
1184 public void request(long n) {
1185 if (n > 0L) {
1186 for (;;) {
1187 long prev = demand, d;
1188 if ((d = prev + n) < prev) // saturate
1189 d = Long.MAX_VALUE;
1190 if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1191 for (int c, h;;) {
1192 if ((c = ctl) == DISABLED)
1193 break;
1194 else if ((c & ACTIVE) != 0) {
1195 if ((c & CONSUME) != 0 ||
1196 U.compareAndSwapInt(this, CTL, c,
1197 c | CONSUME))
1198 break;
1199 }
1200 else if ((h = head) != tail) {
1201 if (U.compareAndSwapInt(this, CTL, c,
1202 c | (ACTIVE|CONSUME))) {
1203 startOrDisable();
1204 break;
1205 }
1206 }
1207 else if (head == h && tail == h)
1208 break; // else stale
1209 if (demand == 0L)
1210 break;
1211 }
1212 break;
1213 }
1214 }
1215 }
1216 else if (n < 0L)
1217 onError(new IllegalArgumentException(
1218 "negative subscription request"));
1219 }
1220
1221 public final boolean isReleasable() { // for ManagedBlocker
1222 T item = putItem;
1223 if (item != null) { // A few randomized spins
1224 for (int spins = MINCAP, r = 0;;) {
1225 if ((putStat = offer(item)) != 0) {
1226 putItem = null;
1227 break;
1228 }
1229 else if (r == 0)
1230 r = ThreadLocalRandom.nextSecondarySeed();
1231 else {
1232 r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
1233 if (r >= 0 && --spins <= 0)
1234 return false;
1235 }
1236 }
1237 }
1238 return true;
1239 }
1240
1241 public final boolean block() { // for ManagedBlocker
1242 T item = putItem;
1243 if (item != null) {
1244 putItem = null;
1245 long nanos = timeout;
1246 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1247 while ((putStat = offer(item)) == 0) {
1248 if (Thread.interrupted()) {
1249 timeout = INTERRUPTED;
1250 if (nanos > 0L)
1251 break;
1252 }
1253 else if (nanos > 0L &&
1254 (nanos = deadline - System.nanoTime()) <= 0L)
1255 break;
1256 else if (waiter == null)
1257 waiter = Thread.currentThread();
1258 else {
1259 if (nanos > 0L)
1260 LockSupport.parkNanos(this, nanos);
1261 else
1262 LockSupport.park(this);
1263 waiter = null;
1264 }
1265 }
1266 }
1267 waiter = null;
1268 return true;
1269 }
1270
1271 /**
1272 * Consumer loop, called from ConsumerTask, or indirectly
1273 * via helpConsume when helping during submit.
1274 */
1275 final void consume() {
1276 Flow.Subscriber<? super T> s;
1277 if ((s = subscriber) != null) { // else disabled
1278 for (int h = head;;) {
1279 long d = demand;
1280 int c, n, i; Object[] a; Object x; Thread w;
1281 if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1282 if ((c & ERROR) != 0) {
1283 Throwable ex = pendingError;
1284 ctl = DISABLED; // no need for CAS
1285 if (ex != null) { // null if errorless cancel
1286 try {
1287 s.onError(ex);
1288 } catch (Throwable ignore) {
1289 }
1290 }
1291 }
1292 else if ((c & SUBSCRIBE) != 0) {
1293 if (U.compareAndSwapInt(this, CTL, c,
1294 c & ~SUBSCRIBE)) {
1295 try {
1296 s.onSubscribe(this);
1297 } catch (Throwable ex) {
1298 onError(ex);
1299 }
1300 }
1301 }
1302 else {
1303 detach();
1304 break;
1305 }
1306 }
1307 else if (h == tail) { // apparently empty
1308 if ((c & CONSUME) != 0) // recheck
1309 U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1310 else if ((c & COMPLETE) != 0) {
1311 if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1312 try {
1313 s.onComplete();
1314 } catch (Throwable ignore) {
1315 }
1316 }
1317 }
1318 else if (U.compareAndSwapInt(this, CTL, c,
1319 c & ~ACTIVE))
1320 break;
1321 }
1322 else if (d == 0L) { // can't consume
1323 if (demand == 0L) { // recheck
1324 if ((c & CONSUME) != 0)
1325 U.compareAndSwapInt(this, CTL, c,
1326 c & ~CONSUME);
1327 else if (U.compareAndSwapInt(this, CTL, c,
1328 c & ~ACTIVE))
1329 break;
1330 }
1331 }
1332 else if ((a = array) == null || (n = a.length) == 0 ||
1333 (x = a[i = h & (n - 1)]) == null)
1334 ; // stale; retry
1335 else if ((c & CONSUME) == 0)
1336 U.compareAndSwapInt(this, CTL, c, c | CONSUME);
1337 else if (U.compareAndSwapObject(
1338 a, (((long)i) << ASHIFT) + ABASE, x, null)) {
1339 U.putOrderedInt(this, HEAD, ++h);
1340 while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1341 d = demand; // almost never fails
1342 if ((w = waiter) != null) {
1343 waiter = null;
1344 LockSupport.unpark(w); // release producer
1345 }
1346 try {
1347 @SuppressWarnings("unchecked") T y = (T) x;
1348 s.onNext(y);
1349 } catch (Throwable ex) {
1350 onError(ex);
1351 }
1352 }
1353 }
1354 }
1355 }
1356
1357 // Unsafe mechanics
1358 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1359 private static final long CTL;
1360 private static final long TAIL;
1361 private static final long HEAD;
1362 private static final long DEMAND;
1363 private static final int ABASE;
1364 private static final int ASHIFT;
1365
1366 static {
1367 try {
1368 CTL = U.objectFieldOffset
1369 (BufferedSubscription.class.getDeclaredField("ctl"));
1370 TAIL = U.objectFieldOffset
1371 (BufferedSubscription.class.getDeclaredField("tail"));
1372 HEAD = U.objectFieldOffset
1373 (BufferedSubscription.class.getDeclaredField("head"));
1374 DEMAND = U.objectFieldOffset
1375 (BufferedSubscription.class.getDeclaredField("demand"));
1376
1377 ABASE = U.arrayBaseOffset(Object[].class);
1378 int scale = U.arrayIndexScale(Object[].class);
1379 if ((scale & (scale - 1)) != 0)
1380 throw new Error("data type scale not a power of two");
1381 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1382 } catch (ReflectiveOperationException e) {
1383 throw new Error(e);
1384 }
1385 }
1386 }
1387 }