ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.20
Committed: Tue Jan 20 15:54:33 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.19: +158 -109 lines
Log Message:
Better FJ integration

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.List;
11 import java.util.concurrent.locks.LockSupport;
12 import java.util.function.BiPredicate;
13 import java.util.function.Function;
14 import java.util.function.Supplier;
15
16 /**
17 * A {@link Flow.Publisher} that asynchronously issues submitted items
18 * to current subscribers until it is closed. Each current subscriber
19 * receives newly submitted items in the same order unless drops or
20 * exceptions are encountered. Using a SubmissionPublisher allows
21 * item generators to act as Publishers relying on drop handling
22 * and/or blocking for flow control.
23 *
24 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
25 * constructor for delivery to subscribers. The best choice of
26 * Executor depends on expected usage. If the generator(s) of
27 * submitted items run in separate threads, and the number of
28 * subscribers can be estimated, consider using a {@link
29 * Executors#newFixedThreadPool}. Otherwise consider using a
30 * work-stealing pool (including {@link ForkJoinPool#commonPool}).
31 *
32 * <p>Buffering allows producers and consumers to transiently operate
33 * at different rates. Each subscriber uses an independent buffer.
34 * Buffers are created upon first use with a given initial capacity,
35 * and are resized as needed up to the maximum. (Capacity arguments
36 * may be rounded up to powers of two.) Invocations of {@link
37 * Flow.Subscription#request} do not directly result in buffer
38 * expansion, but risk saturation if unfulfilled requests exceed the
39 * maximum capacity. Choices of buffer parameters rely on expected
40 * rates, resources, and usages, that usually benefit from empirical
41 * testing. As first guesses, consider initial 8 and maximum 1024.
42 *
43 * <p>An overloaded version of method {@code subscribe} allows
44 * subscribers to override executor and buffer parameters for
45 * a new subscription.
46 *
47 * <p>Publication methods support different policies about what to do
48 * when buffers are saturated. Method {@link #submit} blocks until
49 * resources are available. This is simplest, but least
50 * responsive. The {@code offer} methods may drop items (either
51 * immediately or with bounded timeout), but provide an opportunity to
52 * interpose a handler and then retry.
53 *
54 * <p>If any Subscriber method throws an exception, its subscription
55 * is cancelled. If the supplied Executor throws
56 * RejectedExecutionException (or any other RuntimeException or Error)
57 * when attempting to execute a task, or a drop handler throws an
58 * exception when processing a dropped item, then the exception is
59 * rethrown. In these cases, some but not all subscribers may have
60 * received items. It is usually good practice to {@link
61 * #closeExceptionally closeExceptionally} in these cases.
62 *
63 * <p>This class may also serve as a convenient base for subclasses
64 * that generate items, and use the methods in this class to publish
65 * them. For example here is a class that periodically publishes the
66 * items generated from a supplier. (In practice you might add methods
67 * to independently start and stop generation, to share schedulers
68 * among publishers, and so on, or instead use a SubmissionPublisher
69 * as a component rather than a superclass.)
70 *
71 * <pre> {@code
72 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
73 * final ScheduledFuture<?> periodicTask;
74 * final ScheduledExecutorService scheduler;
75 * PeriodicPublisher(Executor executor, int initialBufferCapacity,
76 * int maxBufferCapacity, Supplier<? extends T> supplier,
77 * long period, TimeUnit unit) {
78 * super(executor, initialBufferCapacity, maxBufferCapacity);
79 * scheduler = new ScheduledThreadPoolExecutor(1);
80 * periodicTask = scheduler.scheduleAtFixedRate(
81 * () -> submit(supplier.get()), 0, period, unit);
82 * }
83 * public void close() {
84 * periodicTask.cancel(false);
85 * scheduler.shutdown();
86 * super.close();
87 * }
88 * }}</pre>
89 *
90 * <p>Here is an example of {@link Flow.Processor} subclass (using
91 * single-step requests to its publisher, for simplicity of
92 * illustration):
93 *
94 * <pre> {@code
95 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
96 * implements Flow.Processor<S,T> {
97 * final Function<? super S, ? extends T> function;
98 * Flow.Subscription subscription;
99 * TransformProcessor(Executor executor, int initialBufferCapacity,
100 * int maxBufferCapacity,
101 * Function<? super S, ? extends T> function) {
102 * super(executor, initialBufferCapacity, maxBufferCapacity);
103 * this.function = function;
104 * }
105 * public void onSubscribe(Flow.Subscription subscription) {
106 * (this.subscription = subscription).request(1);
107 * }
108 * public void onNext(S item) {
109 * subscription.request(1);
110 * submit(function.apply(item));
111 * }
112 * public void onError(Throwable ex) { closeExceptionally(ex); }
113 * public void onComplete() { close(); }
114 * }}</pre>
115 *
116 * @param <T> the published item type
117 * @author Doug Lea
118 * @since 1.9
119 */
120 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
121 AutoCloseable {
122 /*
123 * Most mechanics are handled by BufferedSubscription. This class
124 * mainly tracks subscribers and ensures sequentiality, by using
125 * built-in synchronization locks across public methods. (Using
126 * built-in locks works well in the most typical case in which
127 * only one thread submits items).
128 */
129
130 // Ensuring that all arrays have power of two length
131
132 static final int MAXIMUM_BUFFER_CAPACITY = 1 << 30;
133 static final int roundCapacity(int cap) { // to nearest power of 2
134 int n = cap - 1;
135 n |= n >>> 1;
136 n |= n >>> 2;
137 n |= n >>> 4;
138 n |= n >>> 8;
139 n |= n >>> 16;
140 return (n < 0) ? 1 :
141 (n >= MAXIMUM_BUFFER_CAPACITY) ? MAXIMUM_BUFFER_CAPACITY : n + 1;
142 }
143
144 /**
145 * Clients (BufferedSubscriptions) are maintained in a linked list
146 * (via their "next" fields). This works well for publish loops.
147 * It requires O(n) traversal to check for duplicate subscribers,
148 * but we expect that subscribing is much less common than
149 * publishing. Unsubscribing occurs only during traversal loops,
150 * when BufferedSubscription methods or status checks return
151 * negative values signifying that they have been disabled.
152 */
153 BufferedSubscription<T> clients;
154
155 /** Run status, updated only within locks */
156 volatile boolean closed;
157
158 // Parameters for constructing BufferedSubscriptions
159 final Executor executor;
160 final int minBufferCapacity;
161 final int maxBufferCapacity;
162
163 /**
164 * Screens user arguments
165 */
166 static void checkParams(Executor executor,
167 int initialBufferCapacity,
168 int maxBufferCapacity) {
169 if (executor == null)
170 throw new NullPointerException();
171 if (initialBufferCapacity <= 0 || maxBufferCapacity <= 0)
172 throw new IllegalArgumentException("capacity must be positive");
173 if (maxBufferCapacity > MAXIMUM_BUFFER_CAPACITY)
174 throw new IllegalArgumentException("capacity exceeds limit");
175 if (initialBufferCapacity > maxBufferCapacity)
176 throw new IllegalArgumentException("initial cannot exceed max capacity");
177 }
178
179
180 /**
181 * Creates a new SubmissionPublisher using the given Executor for
182 * async delivery to subscribers, and with the given initial and
183 * maximum buffer sizes for each subscriber. In the absence of
184 * other constraints, consider using {@code
185 * ForkJoinPool.commonPool(), 8, 1024}.
186 *
187 * @param executor the executor to use for async delivery,
188 * supporting creation of at least one independent thread
189 * @param initialBufferCapacity the initial capacity for each
190 * subscriber's buffer (the actual capacity may be rounded up to
191 * the nearest power of two)
192 * @param maxBufferCapacity the maximum capacity for each
193 * subscriber's buffer (the actual capacity may be rounded up to
194 * the nearest power of two)
195 * @throws NullPointerException if executor is null
196 * @throws IllegalArgumentException if initialBufferCapacity is
197 * not positive or exceeds maxBufferCapacity, or maxBufferCapacity
198 * exceeds {@code 1<<30} (about 1 billion), the maximum bound for
199 * a power of two array size
200 */
201 public SubmissionPublisher(Executor executor,
202 int initialBufferCapacity,
203 int maxBufferCapacity) {
204 checkParams(executor, initialBufferCapacity, maxBufferCapacity);
205 int minc = roundCapacity(initialBufferCapacity);
206 int maxc = roundCapacity(maxBufferCapacity);
207 this.executor = executor;
208 this.minBufferCapacity = minc;
209 this.maxBufferCapacity = maxc;
210 }
211
212 /**
213 * Adds the given Subscriber unless already subscribed. If
214 * already subscribed, the Subscriber's onError method is invoked
215 * with an IllegalStateException. Otherwise, upon success, the
216 * Subscriber's onSubscribe method is invoked with a new
217 * Subscription. If onSubscribe throws an exception, the
218 * subscription is cancelled. Otherwise, if this
219 * SubmissionPublisher is closed, the subscriber's onComplete
220 * method is then invoked. Subscribers may enable receiving items
221 * by invoking the {@code request} method of the new Subscription,
222 * and may unsubscribe by invoking its cancel method.
223 *
224 * @param subscriber the subscriber
225 * @throws NullPointerException if subscriber is null
226 */
227 public void subscribe(Flow.Subscriber<? super T> subscriber) {
228 doSubscribe(subscriber, executor, minBufferCapacity, maxBufferCapacity);
229 }
230
231 /**
232 * Adds the given subscriber, with the same effects as {@link
233 * #subscribe(Flow.Subscriber)}, but overriding the
234 * executor and/or buffer capacities otherwise used by this
235 * Publisher.
236 *
237 * @param subscriber the subscriber
238 * @param executor the executor to use for async delivery,
239 * supporting creation of at least one independent thread
240 * @param initialBufferCapacity the initial capacity for each
241 * subscriber's buffer (the actual capacity may be rounded up to
242 * the nearest power of two)
243 * @param maxBufferCapacity the maximum capacity for each
244 * subscriber's buffer (the actual capacity may be rounded up to
245 * the nearest power of two)
246 * @throws NullPointerException if executor or subscriber are null
247 * @throws IllegalArgumentException if initialBufferCapacity is
248 * not positive or exceeds maxBufferCapacity, or maxBufferCapacity
249 * exceeds {@code 1<<30} (about 1 billion), the maximum bound for
250 * a power of two array size
251 */
252 public void subscribe(Flow.Subscriber<? super T> subscriber,
253 Executor executor,
254 int initialBufferCapacity,
255 int maxBufferCapacity) {
256 checkParams(executor, initialBufferCapacity, maxBufferCapacity);
257 doSubscribe(subscriber, executor,
258 roundCapacity(initialBufferCapacity),
259 roundCapacity(maxBufferCapacity));
260 }
261
262 /**
263 * Implements both forms of subscribe
264 */
265 final void doSubscribe(Flow.Subscriber<? super T> subscriber,
266 Executor e, int minCap, int maxCap) {
267 if (subscriber == null) throw new NullPointerException();
268 BufferedSubscription<T> subscription =
269 new BufferedSubscription<T>(subscriber, e, minCap, maxCap);
270 boolean present = false, complete;
271 synchronized (this) {
272 complete = closed;
273 BufferedSubscription<T> pred = null, next;
274 for (BufferedSubscription<T> b = clients; ; b = next) {
275 if (b == null) {
276 if (pred == null)
277 clients = subscription;
278 else
279 pred.next = subscription;
280 subscription.onSubscribe();
281 break;
282 }
283 next = b.next;
284 if (b.isDisabled()) { // remove
285 if (pred == null)
286 clients = next;
287 else
288 pred.next = next;
289 }
290 else if (subscriber.equals(b.subscriber)) {
291 present = true;
292 break;
293 }
294 pred = b;
295 }
296 }
297 if (present)
298 subscriber.onError(new IllegalStateException("Already subscribed"));
299 else if (complete)
300 subscription.onComplete();
301 }
302
303 /**
304 * Publishes the given item to each current subscriber by
305 * asynchronously invoking its onNext method, blocking
306 * uninterruptibly while resources for any subscriber are
307 * unavailable.
308 *
309 * <p>If the Executor for this publisher throws a
310 * RejectedExecutionException (or any other RuntimeException or
311 * Error) when attempting to asynchronously notify subscribers,
312 * then this exception is rethrown.
313 *
314 * @param item the (non-null) item to publish
315 * @throws IllegalStateException if closed
316 * @throws NullPointerException if item is null
317 * @throws RejectedExecutionException if thrown by Executor
318 */
319 public void submit(T item) {
320 if (item == null) throw new NullPointerException();
321 synchronized (this) {
322 if (closed)
323 throw new IllegalStateException("Closed");
324 BufferedSubscription<T> pred = null, next;
325 for (BufferedSubscription<T> b = clients; b != null; b = next) {
326 int stat;
327 next = b.next;
328 if ((stat = b.submit(item)) < 0) {
329 if (pred == null)
330 clients = next;
331 else
332 pred.next = next;
333 }
334 else
335 pred = b;
336 }
337 }
338 }
339
340 /**
341 * Publishes the given item, if possible, to each current
342 * subscriber by asynchronously invoking its onNext method. The
343 * item may be dropped by one or more subscribers if resource
344 * limits are exceeded, in which case the given handler (if
345 * non-null) is invoked, and if it returns true, retried once.
346 * Other calls to methods in this class by other threads are
347 * blocked while the handler is invoked. Unless recovery is
348 * assured, options are usually limited to logging the error
349 * and/or issuing an onError signal to the subscriber.
350 *
351 * <p>If the Executor for this publisher throws a
352 * RejectedExecutionException (or any other RuntimeException or
353 * Error) when attempting to asynchronously notify subscribers, or
354 * the drop handler throws an exception when processing a dropped
355 * item, then this exception is rethrown.
356 *
357 * @param item the (non-null) item to publish
358 * @param onDrop if non-null, the handler invoked upon a drop to a
359 * subscriber, with arguments of the subscriber and item; if it
360 * returns true, an offer is re-attempted (once)
361 * @return the number of drops (failed attempts to issue the item
362 * to a subscriber)
363 * @throws IllegalStateException if closed
364 * @throws NullPointerException if item is null
365 * @throws RejectedExecutionException if thrown by Executor
366 */
367 public int offer(T item,
368 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
369 if (item == null) throw new NullPointerException();
370 int drops = 0;
371 synchronized (this) {
372 if (closed)
373 throw new IllegalStateException("Closed");
374 BufferedSubscription<T> pred = null, next;
375 for (BufferedSubscription<T> b = clients; b != null; b = next) {
376 int stat;
377 next = b.next;
378 if ((stat = b.offer(item)) == 0 &&
379 onDrop != null &&
380 onDrop.test(b.subscriber, item))
381 stat = b.offer(item);
382 if (stat < 0) {
383 if (pred == null)
384 clients = next;
385 else
386 pred.next = next;
387 }
388 else {
389 pred = b;
390 if (stat == 0)
391 ++drops;
392 }
393 }
394 return drops;
395 }
396 }
397
398 /**
399 * Publishes the given item, if possible, to each current
400 * subscriber by asynchronously invoking its onNext method,
401 * blocking while resources for any subscription are unavailable,
402 * up to the specified timeout or the caller thread is
403 * interrupted, at which point the given handler (if non-null) is
404 * invoked, and if it returns true, retried once. (The drop
405 * handler may distinguish timeouts from interrupts by checking
406 * whether the current thread is interrupted.) Other calls to
407 * methods in this class by other threads are blocked while the
408 * handler is invoked. Unless recovery is assured, options are
409 * usually limited to logging the error and/or issuing an onError
410 * signal to the subscriber.
411 *
412 * <p>If the Executor for this publisher throws a
413 * RejectedExecutionException (or any other RuntimeException or
414 * Error) when attempting to asynchronously notify subscribers, or
415 * the drop handler throws an exception when processing a dropped
416 * item, then this exception is rethrown.
417 *
418 * @param item the (non-null) item to publish
419 * @param timeout how long to wait for resources for any subscriber
420 * before giving up, in units of {@code unit}
421 * @param unit a {@code TimeUnit} determining how to interpret the
422 * {@code timeout} parameter
423 * @param onDrop if non-null, the handler invoked upon a drop to a
424 * subscriber, with arguments of the subscriber and item; if it
425 * returns true, an offer is re-attempted (once)
426 * @return the number of drops (failed attempts to issue the item
427 * to a subscriber)
428 * @throws IllegalStateException if closed
429 * @throws NullPointerException if item is null
430 * @throws RejectedExecutionException if thrown by Executor
431 */
432 public int offer(T item, long timeout, TimeUnit unit,
433 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
434 if (item == null) throw new NullPointerException();
435 long nanos = unit.toNanos(timeout);
436 int drops = 0;
437 synchronized (this) {
438 if (closed)
439 throw new IllegalStateException("Closed");
440 BufferedSubscription<T> pred = null, next;
441 for (BufferedSubscription<T> b = clients; b != null; b = next) {
442 int stat;
443 next = b.next;
444 if ((stat = b.offerNanos(item, nanos)) == 0 &&
445 onDrop != null && onDrop.test(b.subscriber, item))
446 stat = b.offer(item);
447 if (stat < 0) {
448 if (pred == null)
449 clients = next;
450 else
451 pred.next = next;
452 }
453 else {
454 pred = b;
455 if (stat == 0)
456 ++drops;
457 }
458 }
459 }
460 return drops;
461 }
462
463 /**
464 * Unless already closed, issues onComplete signals to current
465 * subscribers, and disallows subsequent attempts to publish.
466 */
467 public void close() {
468 if (!closed) {
469 BufferedSubscription<T> b, next;
470 synchronized (this) {
471 b = clients;
472 clients = null;
473 closed = true;
474 }
475 while (b != null) {
476 next = b.next;
477 b.onComplete();
478 b = next;
479 }
480 }
481 }
482
483 /**
484 * Unless already closed, issues onError signals to current
485 * subscribers with the given error, and disallows subsequent
486 * attempts to publish.
487 *
488 * @param error the onError argument sent to subscribers
489 * @throws NullPointerException if error is null
490 */
491 public void closeExceptionally(Throwable error) {
492 if (error == null)
493 throw new NullPointerException();
494 if (!closed) {
495 BufferedSubscription<T> b, next;
496 synchronized (this) {
497 b = clients;
498 clients = null;
499 closed = true;
500 }
501 while (b != null) {
502 next = b.next;
503 b.onError(error);
504 b = next;
505 }
506 }
507 }
508
509 /**
510 * Returns true if this publisher is not accepting submissions.
511 *
512 * @return true if closed
513 */
514 public boolean isClosed() {
515 return closed;
516 }
517
518 /**
519 * Returns true if this publisher has any subscribers.
520 *
521 * @return true if this publisher has any subscribers
522 */
523 public boolean hasSubscribers() {
524 boolean nonEmpty = false;
525 if (!closed) {
526 synchronized (this) {
527 BufferedSubscription<T> pred = null, next;
528 for (BufferedSubscription<T> b = clients; b != null; b = next) {
529 next = b.next;
530 if (b.isDisabled()) {
531 if (pred == null)
532 clients = next;
533 else
534 pred.next = next;
535 }
536 else {
537 nonEmpty = true;
538 break;
539 }
540 }
541 }
542 }
543 return nonEmpty;
544 }
545
546 /**
547 * Returns the Executor used for asynchronous delivery.
548 *
549 * @return the Executor used for asynchronous delivery
550 */
551 public Executor getExecutor() {
552 return executor;
553 }
554
555 /**
556 * Returns the initial per-subscriber buffer capacity.
557 *
558 * @return the initial per-subscriber buffer capacity
559 */
560 public int getInitialBufferCapacity() {
561 return minBufferCapacity;
562 }
563
564 /**
565 * Returns the maximum per-subscriber buffer capacity.
566 *
567 * @return the maximum per-subscriber buffer capacity
568 */
569 public int getMaxBufferCapacity() {
570 return maxBufferCapacity;
571 }
572
573 /**
574 * Returns a list of current subscribers.
575 *
576 * @return list of current subscribers
577 */
578 public List<Flow.Subscriber<? super T>> getSubscribers() {
579 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
580 synchronized (this) {
581 BufferedSubscription<T> pred = null, next;
582 for (BufferedSubscription<T> b = clients; b != null; b = next) {
583 next = b.next;
584 if (b.isDisabled()) {
585 if (pred == null)
586 clients = next;
587 else
588 pred.next = next;
589 }
590 else
591 subs.add(b.subscriber);
592 }
593 }
594 return subs;
595 }
596
597 /**
598 * Returns true if the given Subscriber is currently subscribed.
599 *
600 * @param subscriber the subscriber
601 * @return true if currently subscribed
602 */
603 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
604 if (!closed) {
605 synchronized (this) {
606 BufferedSubscription<T> pred = null, next;
607 for (BufferedSubscription<T> b = clients; b != null; b = next) {
608 next = b.next;
609 if (b.isDisabled()) {
610 if (pred == null)
611 clients = next;
612 else
613 pred.next = next;
614 }
615 else if (subscriber == b.subscriber)
616 return true;
617 }
618 }
619 }
620 return false;
621 }
622
623 /**
624 * Returns an estimate of the number of buffered items (those
625 * produced but not consumed), summed across all current
626 * subscribers.
627 *
628 * @return the estimate
629 */
630 public long estimateBuffered() {
631 long sum = 0L;
632 synchronized (this) {
633 BufferedSubscription<T> pred = null, next;
634 for (BufferedSubscription<T> b = clients; b != null; b = next) {
635 next = b.next;
636 if (b.isDisabled()) {
637 if (pred == null)
638 clients = next;
639 else
640 pred.next = next;
641 }
642 else
643 sum += b.estimateBuffered();
644 }
645 }
646 return sum;
647 }
648
649 /**
650 * A bounded (ring) buffer with integrated control to start a
651 * consumer task whenever items are available. The buffer
652 * algorithm is similar to one used inside ForkJoinPool,
653 * specialized for the case of at most one concurrent producer and
654 * consumer, and power of two buffer sizes. This allows methods to
655 * operate without locks even while supporting resizing, blocking,
656 * task-triggering, and garbage-free buffers (nulling out elements
657 * when consumed), although supporting these does impose a bit of
658 * overhead compared to plain fixed-size ring buffers.
659 *
660 * The publisher guarantees a single producer via its lock. We
661 * ensure in this class that there is at most one consumer. The
662 * request and cancel methods must be fully thread-safe but are
663 * coded to exploit the most common case in which they are only
664 * called by consumers (usually within onNext).
665 *
666 * This class also serves as its own consumer task, consuming as
667 * many items/signals as possible before terminating, at which
668 * point it is re-executed when needed. (The dual Runnable and
669 * ForkJoinTask declaration saves overhead when executed by
670 * ForkJoinPools, without impacting other kinds of Executors.)
671 * Execution control is managed using the ACTIVE ctl bit. We
672 * ensure that a task is active when consumable items (and
673 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
674 * there is demand (unfulfilled requests). This is complicated on
675 * the creation side by the possibility of exceptions when trying
676 * to execute tasks. These eventually force DISABLED state, but
677 * sometimes not directly. On the task side, termination (clearing
678 * ACTIVE) may race with producers or request() calls, so in some
679 * cases requires a re-check, re-activating if possible.
680 *
681 * The ctl field also manages run state. When DISABLED, no further
682 * updates are possible. Disabling may be preceded by setting
683 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
684 * case the associated Subscriber methods are invoked, possibly
685 * synchronously if there is no active consumer task (including
686 * cases where execute() failed). The cancel() method is supported
687 * by treating as ERROR but suppressing onError signal.
688 *
689 * Support for blocking also exploits the fact that there is only
690 * one possible waiter. ManagedBlocker-compatible control fields
691 * are placed in this class itself rather than in wait-nodes.
692 * Blocking control relies on the "waiter" field. Producers set
693 * the field before trying to block, but must then recheck (via
694 * offer) before parking. Signalling then just unparks and clears
695 * waiter field. If the producer and consumer are both in the same
696 * ForkJoinPool, the producer attempts to help run consumer tasks
697 * that it forked before blocking.
698 *
699 * This class uses @Contended and heuristic field declaration
700 * ordering to reduce memory contention on BufferedSubscription
701 * itself, but it does not currently attempt to avoid memory
702 * contention (especially including card-marks) among buffer
703 * elements, that can significantly slow down some usages.
704 * Addressing this may require allocating substantially more space
705 * than users expect.
706 */
707 @SuppressWarnings("serial")
708 @sun.misc.Contended
709 static final class BufferedSubscription<T> extends ForkJoinTask<Void>
710 implements Runnable, Flow.Subscription, ForkJoinPool.ManagedBlocker {
711 // Order-sensitive field declarations
712 long timeout; // > 0 if timed wait
713 volatile long demand; // # unfilled requests
714 final int minCapacity; // initial buffer size
715 int maxCapacity; // reduced on OOME
716 int putStat; // offer result for ManagedBlocker
717 volatile int ctl; // atomic run state flags
718 volatile int head; // next position to take
719 volatile int tail; // next position to put
720 volatile Object[] array; // buffer: null if disabled
721 Flow.Subscriber<? super T> subscriber; // null if disabled
722 Executor executor; // null if disabled
723 volatile Throwable pendingError; // holds until onError issued
724 volatile Thread waiter; // blocked producer thread
725 T putItem; // for offer within ManagedBlocker
726 BufferedSubscription<T> next; // used only by publisher
727
728 // ctl values
729 static final int ACTIVE = 0x01; // consumer task active
730 static final int DISABLED = 0x02; // final state
731 static final int ERROR = 0x04; // signal onError then disable
732 static final int SUBSCRIBE = 0x08; // signal onSubscribe
733 static final int COMPLETE = 0x10; // signal onComplete when done
734
735 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
736
737 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
738 Executor executor, int minBufferCapacity,
739 int maxBufferCapacity) {
740 this.subscriber = subscriber;
741 this.executor = executor;
742 this.minCapacity = minBufferCapacity;
743 this.maxCapacity = maxBufferCapacity;
744 }
745
746 final boolean isDisabled() {
747 return ctl == DISABLED;
748 }
749
750 final int estimateBuffered() {
751 int n;
752 return (ctl != DISABLED && (n = tail - head) > 0) ? n : 0;
753 }
754
755 /**
756 * Tries to add item and start consumer task if necessary.
757 * @return -1 if disabled, 0 if dropped, else 1
758 */
759 final int offer(T item) {
760 Object[] a = array;
761 int t = tail, h = head, n, i, stat;
762 if (a != null && (n = a.length) > t - h && (i = t & (n - 1)) >= 0) {
763 a[i] = item;
764 U.putOrderedInt(this, TAIL, t + 1);
765 }
766 else if ((stat = growAndAdd(a, item)) <= 0)
767 return stat;
768 for (int c;;) { // possibly start task
769 Executor e;
770 if (((c = ctl) & ACTIVE) != 0)
771 break;
772 else if (c == DISABLED || (e = executor) == null)
773 return -1;
774 else if (demand == 0L || tail == head)
775 break;
776 else if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) {
777 try {
778 e.execute(this);
779 break;
780 } catch (RuntimeException | Error ex) { // back out
781 do {} while ((c = ctl) >= 0 &&
782 (c & ACTIVE) != 0 &&
783 !U.compareAndSwapInt(this, CTL, c,
784 c & ~ACTIVE));
785 throw ex;
786 }
787 }
788 }
789 return 1;
790 }
791
792 /**
793 * Tries to create or expand buffer, then adds item if possible
794 */
795 final int growAndAdd(Object[] oldArray, T item) {
796 int oldLen, newLen;
797 if (oldArray != null)
798 newLen = (oldLen = oldArray.length) << 1;
799 else if (ctl >= 0) {
800 oldLen = 0;
801 newLen = minCapacity;
802 }
803 else
804 return -1; // disabled
805 if (oldLen >= maxCapacity || newLen <= 0)
806 return 0; // cannot grow
807 Object[] newArray;
808 try {
809 newArray = new Object[newLen];
810 } catch (Throwable ex) { // try to cope with OOME
811 if (oldLen > 0) // avoid continuous failure
812 maxCapacity = oldLen;
813 return 0;
814 }
815 array = newArray;
816 int t = tail, oldMask = oldLen - 1, newMask = newLen - 1;
817 if (oldArray != null && oldMask >= 0 && newMask >= oldMask) {
818 for (int j = head; j != t; ++j) { // races with consumer
819 Object x;
820 int i = j & oldMask;
821 if ((x = oldArray[i]) != null &&
822 U.compareAndSwapObject(oldArray,
823 (((long)i) << ASHIFT) + ABASE,
824 x, null))
825 newArray[j & newMask] = x;
826 }
827 }
828 newArray[t & newMask] = item;
829 tail = t + 1;
830 return 1;
831 }
832
833 /**
834 * Spins/helps/blocks while offer returns 0
835 */
836 final int submit(T item) {
837 int stat;
838 if ((stat = offer(item)) == 0) {
839 Thread thread = Thread.currentThread();
840 if ((thread instanceof ForkJoinWorkerThread) &&
841 ((ForkJoinWorkerThread)thread).getPool() == executor) {
842 for (ForkJoinTask<?> t;;) { // try helping
843 if ((t = ForkJoinTask.peekNextLocalTask()) == null ||
844 !(t instanceof BufferedSubscription))
845 break;
846 if (t.tryUnfork())
847 t.exec();
848 if ((stat = offer(item)) != 0)
849 break;
850 }
851 }
852 if (stat == 0) {
853 putItem = item;
854 timeout = 0L;
855 try {
856 ForkJoinPool.managedBlock(this);
857 } catch (InterruptedException ie) {
858 timeout = INTERRUPTED;
859 }
860 stat = putStat;
861 if (timeout < 0L)
862 Thread.currentThread().interrupt();
863 }
864 }
865 return stat;
866 }
867
868 /**
869 * Timeout version of offer
870 */
871 final int offerNanos(T item, long nanos) {
872 int stat;
873 if ((stat = offer(item)) == 0 && nanos > 0L) {
874 Thread thread = Thread.currentThread();
875 if ((thread instanceof ForkJoinWorkerThread) &&
876 ((ForkJoinWorkerThread)thread).getPool() == executor) {
877 long deadline = System.nanoTime() + nanos;
878 for (ForkJoinTask<?> t;;) { // similar to above
879 if ((t = ForkJoinTask.peekNextLocalTask()) == null ||
880 !(t instanceof BufferedSubscription))
881 break;
882 if (t.tryUnfork())
883 t.exec();
884 if ((stat = offer(item)) != 0 ||
885 (nanos = deadline - System.nanoTime()) <= 0L)
886 break;
887 }
888 }
889 if (stat == 0 && (timeout = nanos) > 0L) {
890 putItem = item;
891 try {
892 ForkJoinPool.managedBlock(this);
893 } catch (InterruptedException ie) {
894 timeout = INTERRUPTED;
895 }
896 stat = putStat;
897 if (timeout < 0L)
898 Thread.currentThread().interrupt();
899 }
900 }
901 return stat;
902 }
903
904 /**
905 * Nulls out most fields, mainly to avoid garbage retention
906 * until publisher unsubscribes, but also to help cleanly stop
907 * upon error by nulling required components.
908 */
909 final void detach() {
910 pendingError = null;
911 subscriber = null;
912 executor = null;
913 array = null;
914 Thread w = waiter;
915 if (w != null) {
916 waiter = null;
917 LockSupport.unpark(w); // force wakeup
918 }
919 }
920
921 /**
922 * Issues error signal, asynchronously if a task is running,
923 * else synchronously
924 */
925 final void onError(Throwable ex) {
926 for (int c;;) {
927 if ((c = ctl) == DISABLED)
928 break;
929 else if ((c & ACTIVE) != 0) {
930 pendingError = ex;
931 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
932 break; // cause consumer task to exit
933 }
934 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
935 Flow.Subscriber<? super T> s = subscriber;
936 if (s != null && ex != null) {
937 try {
938 s.onError(ex);
939 } catch (Throwable ignore) {
940 }
941 }
942 detach();
943 break;
944 }
945 }
946 }
947
948 /**
949 * Tries to start consumer task upon a signal or request;
950 * disables on failure.
951 */
952 final void startOrDisable() {
953 Executor e; // skip if already disabled
954 if ((e = executor) != null) {
955 try {
956 e.execute(this);
957 } catch (Throwable ex) { // back out and force signal
958 for (int c;;) {
959 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
960 break;
961 if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
962 onError(ex);
963 break;
964 }
965 }
966 }
967 }
968 }
969
970 final void onComplete() {
971 for (int c;;) {
972 if ((c = ctl) == DISABLED)
973 break;
974 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
975 if ((c & ACTIVE) == 0)
976 startOrDisable();
977 break;
978 }
979 }
980 }
981
982 final void onSubscribe() {
983 for (int c;;) {
984 if ((c = ctl) == DISABLED)
985 break;
986 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | SUBSCRIBE))) {
987 if ((c & ACTIVE) == 0)
988 startOrDisable();
989 break;
990 }
991 }
992 }
993
994 /**
995 * Causes consumer task to exit if active (without reporting
996 * onError unless there is already a pending error), and
997 * disables.
998 */
999 public void cancel() {
1000 for (int c;;) {
1001 if ((c = ctl) == DISABLED)
1002 break;
1003 else if ((c & ACTIVE) != 0) {
1004 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1005 break;
1006 }
1007 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1008 detach();
1009 break;
1010 }
1011 }
1012 }
1013
1014 /**
1015 * Adds to demand and possibly starts task.
1016 */
1017 public void request(long n) {
1018 if (n > 0L) {
1019 for (;;) {
1020 long prev = demand, d;
1021 if ((d = prev + n) < prev) // saturate
1022 d = Long.MAX_VALUE;
1023 if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1024 for (int c, h;;) {
1025 if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
1026 demand == 0L)
1027 break;
1028 if ((h = head) != tail) {
1029 if (U.compareAndSwapInt(this, CTL, c,
1030 c | ACTIVE)) {
1031 startOrDisable();
1032 break;
1033 }
1034 }
1035 else if (head == h && tail == h)
1036 break;
1037 }
1038 break;
1039 }
1040 }
1041 }
1042 else if (n < 0L)
1043 onError(new IllegalArgumentException(
1044 "negative subscription request"));
1045 }
1046
1047 public final boolean isReleasable() { // for ManagedBlocker
1048 T item = putItem;
1049 if (item != null) {
1050 if ((putStat = offer(item)) == 0)
1051 return false;
1052 putItem = null;
1053 }
1054 return true;
1055 }
1056
1057 public final boolean block() { // for ManagedBlocker
1058 T item = putItem;
1059 if (item != null) {
1060 putItem = null;
1061 long nanos = timeout;
1062 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1063 while ((putStat = offer(item)) == 0) {
1064 if (Thread.interrupted()) {
1065 timeout = INTERRUPTED;
1066 if (nanos > 0L)
1067 break;
1068 }
1069 else if (nanos > 0L &&
1070 (nanos = deadline - System.nanoTime()) <= 0L)
1071 break;
1072 else if (waiter == null)
1073 waiter = Thread.currentThread();
1074 else {
1075 if (nanos > 0L)
1076 LockSupport.parkNanos(this, nanos);
1077 else
1078 LockSupport.park(this);
1079 waiter = null;
1080 }
1081 }
1082 }
1083 waiter = null;
1084 return true;
1085 }
1086
1087 /**
1088 * Consumer task loop; supports resubmission when used as
1089 * ForkJoinTask.
1090 */
1091 public final boolean exec() {
1092 Flow.Subscriber<? super T> s;
1093 if ((s = subscriber) != null) { // else disabled
1094 for (;;) {
1095 long d = demand; // read volatile fields in acceptable order
1096 int c = ctl;
1097 int h = head;
1098 int t = tail;
1099 Object[] a = array;
1100 int i, n; Object x; Thread w;
1101 if ((c & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1102 if ((c & ERROR) != 0) {
1103 Throwable ex = pendingError;
1104 ctl = DISABLED; // no need for CAS
1105 if (ex != null) {
1106 try {
1107 s.onError(ex);
1108 } catch (Throwable ignore) {
1109 }
1110 }
1111 }
1112 else if ((c & SUBSCRIBE) != 0) {
1113 if (U.compareAndSwapInt(this, CTL, c,
1114 c & ~SUBSCRIBE)) {
1115 try {
1116 s.onSubscribe(this);
1117 } catch (Throwable ex) {
1118 ctl = DISABLED;
1119 }
1120 }
1121 }
1122 else {
1123 detach();
1124 break;
1125 }
1126 }
1127 else if (h == t) { // empty
1128 if (h == tail &&
1129 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) {
1130 if (h != tail || c != (c = ctl)) { // recheck
1131 if ((c & (ACTIVE | DISABLED)) != 0 ||
1132 !U.compareAndSwapInt(this, CTL, c,
1133 c | ACTIVE))
1134 break;
1135 }
1136 else if ((c & COMPLETE) != 0) {
1137 ctl = DISABLED;
1138 try {
1139 s.onComplete();
1140 } catch (Throwable ignore) {
1141 }
1142 }
1143 else
1144 break;
1145 }
1146 }
1147 else if (a == null || (n = a.length) == 0 ||
1148 (x = a[i = h & (n - 1)]) == null)
1149 ; // stale; retry
1150 else if (d == 0L) { // can't take
1151 if (demand == 0L &&
1152 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) &&
1153 ((demand == 0L && c == (c = ctl)) || // recheck
1154 (c & (ACTIVE | DISABLED)) != 0 ||
1155 !U.compareAndSwapInt(this, CTL, c, c | ACTIVE)))
1156 break;
1157 }
1158 else if (U.compareAndSwapObject(
1159 a, (((long)i) << ASHIFT) + ABASE, x, null)) {
1160 U.putOrderedInt(this, HEAD, h + 1);
1161 while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1162 d = demand; // almost never fails
1163 if ((w = waiter) != null) {
1164 waiter = null;
1165 LockSupport.unpark(w); // release producer
1166 }
1167 try {
1168 @SuppressWarnings("unchecked") T y = (T) x;
1169 s.onNext(y);
1170 } catch (Throwable ex) { // disable on throw
1171 ctl = DISABLED;
1172 }
1173 }
1174 }
1175 }
1176 return false; // resubmittable; never joined
1177 }
1178
1179 // Runnable and FJ support
1180 public final void run() { exec(); }
1181 public final Void getRawResult() { return null; }
1182 public final void setRawResult(Void v) {}
1183
1184 // Unsafe mechanics
1185 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1186 private static final long CTL;
1187 private static final long TAIL;
1188 private static final long HEAD;
1189 private static final long DEMAND;
1190 private static final int ABASE;
1191 private static final int ASHIFT;
1192
1193 static {
1194 try {
1195 CTL = U.objectFieldOffset
1196 (BufferedSubscription.class.getDeclaredField("ctl"));
1197 TAIL = U.objectFieldOffset
1198 (BufferedSubscription.class.getDeclaredField("tail"));
1199 HEAD = U.objectFieldOffset
1200 (BufferedSubscription.class.getDeclaredField("head"));
1201 DEMAND = U.objectFieldOffset
1202 (BufferedSubscription.class.getDeclaredField("demand"));
1203
1204 ABASE = U.arrayBaseOffset(Object[].class);
1205 int scale = U.arrayIndexScale(Object[].class);
1206 if ((scale & (scale - 1)) != 0)
1207 throw new Error("data type scale not a power of two");
1208 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1209 } catch (ReflectiveOperationException e) {
1210 throw new Error(e);
1211 }
1212 }
1213 }
1214 }