ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.14
Committed: Fri Jan 16 17:36:37 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.13: +4 -4 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.List;
11 import java.util.concurrent.locks.LockSupport;
12 import java.util.function.BiPredicate;
13 import java.util.function.Function;
14
15 /**
16 * A {@link Flow.Publisher} that asynchronously issues submitted items
17 * to current subscribers until it is closed. Each current subscriber
18 * receives newly submitted items in the same order unless drops or
19 * exceptions are encountered. Using a SubmissionPublisher allows
20 * item generators to act as Publishers, although without integrated
21 * flow control. Instead they rely on drop handling and/or blocking.
22 * This class may also serve as a base for subclasses that generate
23 * items, and use the methods in this class to publish them.
24 *
25 * <p>A SubmissionPublisher uses the Executor supplied in its
26 * constructor for delivery to subscribers. The best choice of
27 * Executor depends on expected usage. If the generator(s) of
28 * submitted items run in separate threads, and the number of
29 * subscribers can be estimated, consider using a {@link
30 * Executors#newFixedThreadPool}. Otherwise consider using a
31 * work-stealing pool (including {@link ForkJoinPool#commonPool}).
32 *
33 * <p>Buffering allows producers and consumers to transiently operate
34 * at different rates. Each subscriber uses an independent buffer.
35 * Buffers are created upon first use with a given initial capacity,
36 * and are resized as needed up to the maximum. (Capacity arguments
37 * may be rounded up to powers of two.) Invocations of {@code
38 * subscription.request} do not directly result in buffer expansion,
39 * but risk saturation if unfulfilled requests exceed the maximum
40 * capacity. Choices of buffer parameters rely on expected rates,
41 * resources, and usages, that usually benefit from empirical testing.
42 * As first guesses, consider initial 8 and maximum 1024.
43 *
44 * <p>Publication methods support different policies about what to do
45 * when buffers are saturated. Method {@link #submit} blocks until
46 * resources are available. This is simplest (and often appropriate
47 * for relay stages; see {@link #newTransformProcessor newTransformProcessor}),
48 * but least responsive. The {@code offer} methods may either
49 * immediately, or with bounded timeout, drop items, but provide an
50 * opportunity to interpose a handler and then retry. While the
51 * handler is invoked, other calls to methods in this class by other
52 * threads are blocked. Unless recovery is assured, options are
53 * usually limited to logging the error and/or issuing an onError
54 * signal to the subscriber.
55 *
56 * <p>If any Subscriber method throws an exception, its subscription
57 * is cancelled. If the supplied Executor throws
58 * RejectedExecutionException (or any other RuntimeException or Error)
59 * when attempting to execute a task, or a drop handler throws an
60 * exception when processing a dropped item, then the exception is
61 * rethrown. In these cases, some but not all subscribers may have
62 * received items. It is usually good practice to {@link
63 * #closeExceptionally closeExceptionally} in these cases.
64 *
65 * @param <T> the published item type
66 * @author Doug Lea
67 * @since 1.9
68 */
69 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
70 AutoCloseable {
71 /*
72 * Most mechanics are handled by BufferedSubscription. This class
73 * mainly ensures sequentiality by using built-in synchronization
74 * locks across public methods. (Using built-in locks works well
75 * in the most typical case in which only one thread submits
76 * items).
77 */
78
79 // Ensuring that all arrays have power of two length
80
81 static final int MAXIMUM_BUFFER_CAPACITY = 1 << 30;
82 static final int roundCapacity(int cap) { // to nearest power of 2
83 int n = cap - 1;
84 n |= n >>> 1;
85 n |= n >>> 2;
86 n |= n >>> 4;
87 n |= n >>> 8;
88 n |= n >>> 16;
89 return (n < 0) ? 1 :
90 (n >= MAXIMUM_BUFFER_CAPACITY) ? MAXIMUM_BUFFER_CAPACITY : n + 1;
91 }
92
93 /**
94 * Clients (BufferedSubscriptions) are maintained in a linked list
95 * (via their "next" fields). This works well for publish loops.
96 * It requires O(n) traversal to check for duplicate subscribers,
97 * but we expect that subscribing is much less common than
98 * publishing. Unsubscribing occurs only during traversal loops,
99 * when BufferedSubscription methods or status checks return
100 * negative values signifying that they have been disabled.
101 */
102 BufferedSubscription<T> clients;
103
104 // Parameters for constructing BufferedSubscriptions
105 final Executor executor;
106 final int minBufferCapacity;
107 final int maxBufferCapacity;
108
109 /** Run status, updated only within locks */
110 volatile boolean closed;
111
/**
 * Creates a new SubmissionPublisher that delivers items to
 * subscribers asynchronously through the given Executor, giving
 * each subscriber a buffer with the given initial and maximum
 * sizes. When nothing else constrains the choice, consider
 * {@code ForkJoinPool.commonPool(), 8, 1024}.
 *
 * @param executor the executor to use for async delivery,
 * supporting creation of at least one independent thread
 * @param initialBufferCapacity the initial capacity of each
 * subscriber's buffer (may be rounded up to a power of two)
 * @param maxBufferCapacity the maximum capacity of each
 * subscriber's buffer (may be rounded up to a power of two)
 * @throws NullPointerException if executor is null
 * @throws IllegalArgumentException if either capacity is not
 * positive, if maxBufferCapacity exceeds {@code 1<<30} (the
 * largest power-of-two array size), or if initialBufferCapacity
 * exceeds maxBufferCapacity
 */
public SubmissionPublisher(Executor executor,
                           int initialBufferCapacity,
                           int maxBufferCapacity) {
    if (executor == null)
        throw new NullPointerException();
    // Argument checks run in a fixed order so the reported message is stable.
    if (initialBufferCapacity <= 0 || maxBufferCapacity <= 0)
        throw new IllegalArgumentException("capacity must be positive");
    if (maxBufferCapacity > MAXIMUM_BUFFER_CAPACITY)
        throw new IllegalArgumentException("capacity exceeds limit");
    if (initialBufferCapacity > maxBufferCapacity)
        throw new IllegalArgumentException("initial cannot exceed max capacity");
    this.minBufferCapacity = roundCapacity(initialBufferCapacity);
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.executor = executor;
}
150
151 /**
152 * Adds the given Subscriber unless already subscribed. If
153 * already subscribed, the Subscriber's onError method is invoked
154 * with an IllegalStateException. Otherwise, upon success, the
155 * Subscriber's onSubscribe method is invoked with a new
156 * Subscription (upon exception, the exception is rethrown and the
157 * Subscriber remains unsubscribed). If this SubmissionPublisher
158 * is closed, the subscriber's onComplete method is then invoked.
159 * Subscribers may enable receiving items by invoking the {@code
160 * request} method of the new Subscription, and may unsubscribe by
161 * invoking its cancel method.
162 *
163 * @param subscriber the subscriber
164 * @throws NullPointerException if subscriber is null
165 */
166 public void subscribe(Flow.Subscriber<? super T> subscriber) {
167 if (subscriber == null) throw new NullPointerException();
168 BufferedSubscription<T> sub = new BufferedSubscription<T>(
169 subscriber, executor, minBufferCapacity, maxBufferCapacity);
170 boolean present = false, clsd;
171 synchronized (this) {
172 clsd = closed;
173 BufferedSubscription<T> pred = null, next;
174 for (BufferedSubscription<T> b = clients; b != null; b = next) {
175 next = b.next;
176 if (b.ctl < 0) { // disabled; remove
177 if (pred == null)
178 clients = next;
179 else
180 pred.next = next;
181 }
182 else if (subscriber == b.subscriber) {
183 present = true;
184 break;
185 }
186 pred = b;
187 }
188 if (!present) {
189 subscriber.onSubscribe(sub); // don't link on exception
190 if (!clsd) {
191 if (pred == null)
192 clients = sub;
193 else
194 pred.next = sub;
195 }
196 }
197 }
198 if (clsd)
199 subscriber.onComplete();
200 else if (present)
201 subscriber.onError(new IllegalStateException("Already subscribed"));
202 }
203
204 /**
205 * Publishes the given item, if possible, to each current
206 * subscriber by asynchronously invoking its onNext method. The
207 * item may be dropped by one or more subscribers if resource
208 * limits are exceeded, in which case the given handler (if
209 * non-null) is invoked, and if it returns true, retried once.
210 *
211 * <p>If the Executor for this publisher throws a
212 * RejectedExecutionException (or any other RuntimeException or
213 * Error) when attempting to asynchronously notify subscribers, or
214 * the drop handler throws an exception when processing a dropped
215 * item, then this exception is rethrown.
216 *
217 * @param item the (non-null) item to publish
218 * @param onDrop if non-null, the handler invoked upon a drop to a
219 * subscriber, with arguments of the subscriber and item; if it
220 * returns true, an offer is re-attempted (once)
221 * @return the number of drops (failed attempts to issue the item
222 * to a subscriber)
223 * @throws IllegalStateException if closed
224 * @throws NullPointerException if item is null
225 * @throws RejectedExecutionException if thrown by Executor
226 */
227 public int offer(T item,
228 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
229 if (item == null) throw new NullPointerException();
230 int drops = 0;
231 synchronized (this) {
232 if (closed)
233 throw new IllegalStateException("Closed");
234 BufferedSubscription<T> pred = null, next;
235 for (BufferedSubscription<T> b = clients; b != null; b = next) {
236 int stat;
237 next = b.next;
238 if ((stat = b.offer(item)) == 0 &&
239 onDrop != null &&
240 onDrop.test(b.subscriber, item))
241 stat = b.offer(item);
242 if (stat < 0) {
243 if (pred == null)
244 clients = next;
245 else
246 pred.next = next;
247 }
248 else {
249 pred = b;
250 if (stat == 0)
251 ++drops;
252 }
253 }
254 return drops;
255 }
256 }
257
258 /**
259 * Publishes the given item, if possible, to each current
260 * subscriber by asynchronously invoking its onNext method,
261 * blocking while resources for any subscription are unavailable,
262 * up to the specified timeout or the caller thread is
263 * interrupted, at which point the given handler (if non-null)
264 * is invoked, and if it returns true, retried once.
265 *
266 * <p>If the Executor for this publisher throws a
267 * RejectedExecutionException (or any other RuntimeException or
268 * Error) when attempting to asynchronously notify subscribers, or
269 * the drop handler throws an exception when processing a dropped
270 * item, then this exception is rethrown.
271 *
272 * @param item the (non-null) item to publish
273 * @param timeout how long to wait for resources for any subscriber
274 * before giving up, in units of {@code unit}
275 * @param unit a {@code TimeUnit} determining how to interpret the
276 * {@code timeout} parameter
277 * @param onDrop if non-null, the handler invoked upon a drop to a
278 * subscriber, with arguments of the subscriber and item; if it
279 * returns true, an offer is re-attempted (once)
280 * @return the number of drops (failed attempts to issue the item
281 * to a subscriber)
282 * @throws IllegalStateException if closed
283 * @throws NullPointerException if item is null
284 * @throws RejectedExecutionException if thrown by Executor
285 */
286 public int offer(T item, long timeout, TimeUnit unit,
287 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
288 if (item == null) throw new NullPointerException();
289 long nanos = unit.toNanos(timeout);
290 int drops = 0;
291 synchronized (this) {
292 if (closed)
293 throw new IllegalStateException("Closed");
294 BufferedSubscription<T> pred = null, next;
295 for (BufferedSubscription<T> b = clients; b != null; b = next) {
296 int stat;
297 next = b.next;
298 if ((stat = b.offer(item)) == 0 &&
299 (stat = b.timedOffer(item, nanos)) == 0 &&
300 onDrop != null &&
301 onDrop.test(b.subscriber, item))
302 stat = b.offer(item);
303 if (stat < 0) {
304 if (pred == null)
305 clients = next;
306 else
307 pred.next = next;
308 }
309 else {
310 pred = b;
311 if (stat == 0)
312 ++drops;
313 }
314 }
315 }
316 return drops;
317 }
318
319 /**
320 * Publishes the given item to each current subscriber by
321 * asynchronously invoking its onNext method, blocking
322 * uninterruptibly while resources for any subscriber are
323 * unavailable.
324 *
325 * <p>If the Executor for this publisher throws a
326 * RejectedExecutionException (or any other RuntimeException or
327 * Error) when attempting to asynchronously notify subscribers,
328 * then this exception is rethrown.
329 *
330 * @param item the (non-null) item to publish
331 * @throws IllegalStateException if closed
332 * @throws NullPointerException if item is null
333 * @throws RejectedExecutionException if thrown by Executor
334 */
335 public void submit(T item) {
336 if (item == null) throw new NullPointerException();
337 synchronized (this) {
338 if (closed)
339 throw new IllegalStateException("Closed");
340 BufferedSubscription<T> pred = null, next;
341 for (BufferedSubscription<T> b = clients; b != null; b = next) {
342 int stat;
343 next = b.next;
344 if ((stat = b.offer(item)) == 0)
345 stat = b.submit(item);
346 if (stat < 0) {
347 if (pred == null)
348 clients = next;
349 else
350 pred.next = next;
351 }
352 else
353 pred = b;
354 }
355 }
356 }
357
358 /**
359 * Unless already closed, issues onComplete signals to current
360 * subscribers, and disallows subsequent attempts to publish.
361 */
362 public void close() {
363 if (!closed) {
364 BufferedSubscription<T> b, next;
365 synchronized (this) {
366 b = clients;
367 clients = null;
368 closed = true;
369 }
370 while (b != null) {
371 next = b.next;
372 b.close();
373 b = next;
374 }
375 }
376 }
377
378 /**
379 * Unless already closed, issues onError signals to current
380 * subscribers with the given error, and disallows subsequent
381 * attempts to publish.
382 *
383 * @param error the onError argument sent to subscribers
384 * @throws NullPointerException if error is null
385 */
386 public void closeExceptionally(Throwable error) {
387 if (error == null)
388 throw new NullPointerException();
389 if (!closed) {
390 BufferedSubscription<T> b, next;
391 synchronized (this) {
392 b = clients;
393 clients = null;
394 closed = true;
395 }
396 while (b != null) {
397 next = b.next;
398 b.closeExceptionally(error);
399 b = next;
400 }
401 }
402 }
403
404 /**
405 * Returns true if this publisher is not accepting submissions.
406 *
407 * @return true if closed
408 */
409 public boolean isClosed() {
410 return closed;
411 }
412
413 /**
414 * Returns true if this publisher has any subscribers.
415 *
416 * @return true if this publisher has any subscribers
417 */
418 public boolean hasSubscribers() {
419 boolean nonEmpty = false;
420 if (!closed) {
421 synchronized (this) {
422 BufferedSubscription<T> pred = null, next;
423 for (BufferedSubscription<T> b = clients; b != null; b = next) {
424 next = b.next;
425 if (b.ctl < 0) {
426 if (pred == null)
427 clients = next;
428 else
429 pred.next = next;
430 }
431 else {
432 nonEmpty = true;
433 break;
434 }
435 }
436 }
437 }
438 return nonEmpty;
439 }
440
441 /**
442 * Returns the Executor used for asynchronous delivery.
443 *
444 * @return the Executor used for asynchronous delivery
445 */
446 public Executor getExecutor() {
447 return executor;
448 }
449
450 /**
451 * Returns the initial per-subscriber buffer capacity.
452 *
453 * @return the initial per-subscriber buffer capacity
454 */
455 public int getInitialBufferCapacity() {
456 return minBufferCapacity;
457 }
458
459 /**
460 * Returns the maximum per-subscriber buffer capacity.
461 *
462 * @return the maximum per-subscriber buffer capacity
463 */
464 public int getMaxBufferCapacity() {
465 return maxBufferCapacity;
466 }
467
468 /**
469 * Returns a list of current subscribers.
470 *
471 * @return list of current subscribers
472 */
473 public List<Flow.Subscriber<? super T>> getSubscribers() {
474 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
475 synchronized (this) {
476 BufferedSubscription<T> pred = null, next;
477 for (BufferedSubscription<T> b = clients; b != null; b = next) {
478 next = b.next;
479 if (b.ctl < 0) {
480 if (pred == null)
481 clients = next;
482 else
483 pred.next = next;
484 }
485 else
486 subs.add(b.subscriber);
487 }
488 }
489 return subs;
490 }
491
492 /**
493 * Returns true if the given Subscriber is currently subscribed.
494 *
495 * @param subscriber the subscriber
496 * @return true if currently subscribed
497 */
498 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
499 if (!closed) {
500 synchronized (this) {
501 BufferedSubscription<T> pred = null, next;
502 for (BufferedSubscription<T> b = clients; b != null; b = next) {
503 next = b.next;
504 if (b.ctl < 0) {
505 if (pred == null)
506 clients = next;
507 else
508 pred.next = next;
509 }
510 else if (subscriber == b.subscriber)
511 return true;
512 }
513 }
514 }
515 return false;
516 }
517
518 /**
519 * If the given subscription is managed by a SubmissionPublisher,
520 * returns an estimate of the number of items produced but not yet
521 * consumed; otherwise returns zero. This method is designed only
522 * for monitoring purposes, not for control.
523 *
524 * @param subscription the subscription
525 * @return the estimate, or zero if the subscription is of an
526 * unknown type
527 */
528 public static int estimateAvailable(Flow.Subscription subscription) {
529 if (subscription instanceof BufferedSubscription)
530 return ((BufferedSubscription)subscription).estimateAvailable();
531 else
532 return 0;
533 }
534
535 /**
536 * Returns a new {@link Flow.Processor} that uses this
537 * SubmissionPublisher to relay transformed items from its source
538 * using method {@link submit}, and using the given request size
539 * for requests to its source subscription. (Typically,
540 * requestSizes should range from the initial and maximum buffer
541 * capacity of this SubmissionPublisher.)
542 *
543 * @param <S> the subscribed item type
544 * @param requestSize the request size for subscriptions
545 * with the source
546 * @param transform the transform function
547 * @return the new Processor
548 * @throws NullPointerException if transform is null
549 * @throws IllegalArgumentException if requestSize not positive
550 */
551 public <S> Flow.Processor<S,T> newTransformProcessor(
552 long requestSize,
553 Function<? super S, ? extends T> transform) {
554 if (requestSize <= 0L)
555 throw new IllegalArgumentException("requestSize must be positive");
556 if (transform == null)
557 throw new NullPointerException();
558 return new TransformProcessor<S,T>(requestSize, transform, this);
559 }
560
561 /**
562 * A task for consuming buffer items and signals, created and
563 * executed whenever they become available. A task consumes as
564 * many items/signals as possible before terminating, at which
565 * point another task is created when needed. The dual Runnable
566 * and ForkJoinTask declaration saves overhead when executed by
567 * ForkJoinPools, without impacting other kinds of Executors.
568 */
569 @SuppressWarnings("serial")
570 static final class ConsumerTask<T> extends ForkJoinTask<Void>
571 implements Runnable {
572 final BufferedSubscription<T> s;
573 ConsumerTask(BufferedSubscription<T> s) { this.s = s; }
574 public final Void getRawResult() { return null; }
575 public final void setRawResult(Void v) {}
576 public final boolean exec() { s.consume(); return false; }
577 public final void run() { s.consume(); }
578 }
579
580 /**
581 * A bounded (ring) buffer with integrated control to start
582 * consumer tasks whenever items are available. The buffer
583 * algorithm is similar to one used inside ForkJoinPool,
584 * specialized for the case of at most one concurrent producer and
585 * consumer, and power of two buffer sizes. This allows methods to
586 * operate without locks even while supporting resizing, blocking,
587 * task-triggering, and garbage-free buffers (nulling out elements
588 * when consumed), although supporting these does impose a bit of
589 * overhead compared to plain fixed-size ring buffers.
590 *
591 * The publisher guarantees a single producer via its lock. We
592 * ensure in this class that there is at most one consumer. The
593 * request and cancel methods must be fully thread-safe but are
594 * coded to exploit the most common case in which they are only
595 * called by consumers (usually within onNext).
596 *
597 * Control over creating ConsumerTasks is managed using the ACTIVE
598 * ctl bit. We ensure that a task is active when consumable items
599 * (and usually, ERROR or COMPLETE signals) are present and there
600 * is demand (unfulfilled requests). This is complicated on the
601 * creation side by the possibility of exceptions when trying to
602 * execute tasks. These eventually force DISABLED state, but
603 * sometimes not directly. On the task side, termination (clearing
604 * ACTIVE) may race with producers or request() calls, so in some
605 * cases requires a re-check, re-activating if possible.
606 *
607 * The ctl field also manages run state. When DISABLED, no further
608 * updates are possible (to simplify checks, DISABLED is defined
609 * as a negative value). Disabling may be preceded by setting
610 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
611 * case the associated Subscriber methods are invoked, possibly
612 * synchronously if there is no active consumer task (including
613 * cases where execute() failed).
614 *
615 * Support for blocking also exploits the fact that there is only
616 * one possible waiter. ManagedBlocker-compatible control fields
617 * are placed in this class itself rather than in wait-nodes.
618 * Blocking control relies on the "waiter" field. Producers set
619 * the field before trying to block, but must then recheck (via
620 * offer) before parking. Signalling then just unparks and clears
621 * waiter field.
622 *
623 * This class uses @Contended and heuristic field declaration
624 * ordering to reduce memory contention on BufferedSubscription
625 * itself, but it does not currently attempt to avoid memory
626 * contention (especially including card-marks) among buffer
627 * elements, that can significantly slow down some usages.
628 * Addressing this may require allocating substantially more space
629 * than users expect.
630 */
631 @sun.misc.Contended
632 static final class BufferedSubscription<T> implements Flow.Subscription,
633 ForkJoinPool.ManagedBlocker {
634 // Order-sensitive field declarations
635 long timeout; // > 0 if timed wait
636 volatile long demand; // # unfilled requests
637 final int minCapacity; // initial buffer size
638 int maxCapacity; // reduced on OOME
639 int putStat; // offer result for ManagedBlocker
640 volatile int ctl; // atomic run state flags
641 volatile int head; // next position to take
642 volatile int tail; // next position to put
643 boolean wasInterrupted; // true if interrupted while waiting
644 volatile Object[] array; // buffer: null if disabled
645 Flow.Subscriber<? super T> subscriber; // null if disabled
646 Executor executor; // null if disabled
647 volatile Throwable pendingError; // holds until onError issued
648 volatile Thread waiter; // blocked producer thread
649 T putItem; // for offer within ManagedBlocker
650 BufferedSubscription<T> next; // used only by publisher
651
652 // ctl values
653 static final int ACTIVE = 0x01; // consumer task active
654 static final int ERROR = 0x02; // signal pending error
655 static final int COMPLETE = 0x04; // signal completion when done
656 static final int DISABLED = 0x80000000; // must be negative
657
/**
 * Creates a subscription for the given subscriber. No buffer is
 * allocated here; the array field stays null until an item is
 * offered.
 */
BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                     Executor executor, int minBufferCapacity,
                     int maxBufferCapacity) {
    this.minCapacity = minBufferCapacity;
    this.maxCapacity = maxBufferCapacity;
    this.executor = executor;
    this.subscriber = subscriber;
}
666
667 /**
668 * @return -1 if disabled, 0 if dropped, else 1
669 */
670 final int offer(T item) {
671 Object[] a = array;
672 int t = tail, h = head, mask;
673 if (a == null || (mask = a.length - 1) < 0 || t - h >= mask)
674 return growAndOffer(item);
675 else {
676 a[t & mask] = item;
677 U.putOrderedInt(this, TAIL, t + 1);
678 return (ctl & ACTIVE) != 0 ? 1 : startOnOffer();
679 }
680 }
681
682 /**
 * Creates the buffer on first use, or replaces it with one of twice
 * the length, and then stores the item.
 *
 * @param item the item to add
 * @return -1 if there is no buffer and ctl is negative (disabled);
 * 0 if the item is dropped because the current length has reached
 * maxCapacity, the doubled length is not positive, or allocation
 * failed; otherwise 1 if a consumer task is already active, else
 * the result of startOnOffer()
 */
685 final int growAndOffer(T item) {
686 int oldLen, len;
687 Object[] oldA = array, a = null;
688 if (oldA != null)
689 len = (oldLen = oldA.length) << 1; // double the existing length
690 else if (ctl < 0)
691 return -1; // disabled
692 else {
693 oldLen = 0;
694 len = minCapacity; // first use: start at the initial capacity
695 }
// Refuse to grow past the limit; len <= 0 catches overflow of the doubling.
696 if (oldLen >= maxCapacity || len <= 0)
697 return 0;
698 try {
699 a = new Object[len];
700 } catch (Throwable ex) { // try to cope with OOME
701 if (oldLen != 0) // avoid continuous failure
702 maxCapacity = oldLen;
703 return 0;
704 }
// Install the new array before transferring the elements.
705 array = a;
706 int oldMask = oldLen - 1, mask = len - 1, oldTail = tail, j = head;
707 if (oldA != null && oldMask >= 0 && mask > oldMask) {
// Move each remaining element from the old array. A slot is copied
// only if this thread wins the CAS that nulls it out, so an element
// that has already been taken from the old array is not duplicated.
708 for (Object x; j != oldTail; ++j) { // races with consumer
709 int i = j & oldMask;
710 if ((x = oldA[i]) != null &&
711 U.compareAndSwapObject(oldA,
712 (((long)i) << ASHIFT) + ABASE,
713 x, null))
714 a[j & mask] = x;
715 }
716 }
// j is the slot for the new item: oldTail after a transfer, else head.
717 a[j & mask] = item;
718 U.putOrderedInt(this, TAIL, j + 1);
719 return (ctl & ACTIVE) != 0 ? 1 : startOnOffer();
720 }
721
722 /**
 * Tries to start a consumer task if items are available.
 * Nothing is attempted while demand is zero. If the executor
 * throws a RuntimeException or Error, the ACTIVE bit is cleared
 * again and the exception is rethrown to the caller.
 *
 * @return -1 if ctl is negative (disabled), otherwise 1
 */
726 final int startOnOffer() {
727 if (demand != 0L) {
728 ConsumerTask<T> task = new ConsumerTask<T>(this);
729 for (int c;;) {
730 if (((c = ctl) & ACTIVE) != 0) // a consumer task is already active
731 break;
732 else if (c < 0)
733 return -1;
734 else if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) {
735 try {
736 executor.execute(task);
737 break;
738 } catch (RuntimeException | Error ex) { // back out
// Clear ACTIVE unless ctl went negative or ACTIVE is already clear.
739 do {} while ((c = ctl) >= 0 &&
740 (c & ACTIVE) != 0 &&
741 !U.compareAndSwapInt(this, CTL, c,
742 c & ~ACTIVE));
743 throw ex;
744 }
745 }
// The CAS failed: give up if there is no demand or nothing buffered,
// otherwise loop and re-read ctl.
746 else if (demand == 0L || tail == head)
747 break;
748 }
749 }
750 return 1;
751 }
752
753 /**
754 * Tries to start consumer task upon a signal or request;
755 * disables on failure.
756 */
757 final void startOrDisable() {
758 Executor e; // skip if already disabled
759 if ((e = executor) != null) {
760 try {
761 e.execute(new ConsumerTask<T>(this));
762 } catch (Throwable ex) { // back out and force signal
763 for (int c;;) {
764 if ((c = ctl) < 0 || (c & ACTIVE) == 0)
765 break;
766 if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
767 closeExceptionally(ex);
768 break;
769 }
770 }
771 }
772 }
773 }
774
775 /**
776 * Nulls out most fields, mainly to avoid garbage retention
777 * until publisher unsubscribes.
778 */
779 final void detach() {
780 pendingError = null;
781 subscriber = null;
782 executor = null;
783 array = null;
784 Thread w = waiter;
785 if (w != null) {
786 waiter = null;
787 LockSupport.unpark(w); // force wakeup
788 }
789 }
790
/**
 * Adds {@code n} to the outstanding demand, saturating at
 * Long.MAX_VALUE. Then, if no consumer task is active, the
 * subscription is not disabled and items are buffered, sets ACTIVE
 * and starts a consumer task. A negative {@code n} closes this
 * subscription exceptionally with an IllegalArgumentException; a
 * zero {@code n} has no effect.
 *
 * @param n the number of additional items requested
 */
791 public void request(long n) {
792 if (n > 0L) {
793 for (;;) {
794 long prev = demand, d;
795 if ((d = prev + n) < prev) // saturate
796 d = Long.MAX_VALUE;
797 if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
798 for (int c, h;;) {
// Nothing to do if a task is active, the subscription is disabled,
// or the demand has been used up in the meantime.
799 if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
800 demand == 0L)
801 break;
802 if ((h = head) != tail) { // items are buffered
803 if (U.compareAndSwapInt(this, CTL, c,
804 c | ACTIVE)) {
805 startOrDisable();
806 break;
807 }
808 }
// Buffer seen empty twice with an unchanged head: stop.
809 else if (head == h && tail == h)
810 break;
811 }
812 break;
813 }
814 }
815 }
816 else if (n < 0L)
817 closeExceptionally(new IllegalArgumentException(
818 "negative subscription request"));
819 }
820
821 public void cancel() {
822 for (int c;;) {
823 if ((c = ctl) < 0)
824 break;
825 else if ((c & ACTIVE) != 0) {
826 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
827 break; // cause consumer task to exit
828 }
829 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
830 detach();
831 break;
832 }
833 }
834 }
835
836 final void closeExceptionally(Throwable ex) {
837 pendingError = ex;
838 for (int c;;) {
839 if ((c = ctl) < 0)
840 break;
841 else if ((c & ACTIVE) != 0) {
842 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
843 break; // cause consumer task to exit
844 }
845 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
846 Flow.Subscriber<? super T> s = subscriber;
847 if (s != null) {
848 try {
849 s.onError(ex);
850 } catch (Throwable ignore) {
851 }
852 }
853 detach();
854 break;
855 }
856 }
857 }
858
859 final void close() {
860 for (int c;;) {
861 if ((c = ctl) < 0)
862 break;
863 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
864 if ((c & ACTIVE) == 0)
865 startOrDisable();
866 break;
867 }
868 }
869 }
870
871 final int estimateAvailable() {
872 int n;
873 return (ctl >= 0 && (n = tail - head) > 0) ? n : 0;
874 }
875
876 // ManagedBlocker support
877
878 public final boolean isReleasable() {
879 T item = putItem;
880 if (item != null) {
881 if ((putStat = offer(item)) == 0)
882 return false;
883 putItem = null;
884 }
885 return true;
886 }
887
/**
 * ManagedBlocker wait: repeatedly offers the pending item,
 * recording each result in putStat, until an offer returns a
 * nonzero status, the timeout (when the timeout field is positive)
 * runs out, or, for a timed wait only, the thread is interrupted.
 * An interrupt is remembered in wasInterrupted; an untimed wait
 * keeps retrying after one.
 *
 * @return always true
 */
888 public final boolean block() {
889 T item = putItem;
890 if (item != null) {
891 putItem = null;
892 long nanos = timeout;
893 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
894 while ((putStat = offer(item)) == 0) {
895 if (Thread.interrupted()) {
896 wasInterrupted = true;
897 if (nanos > 0L) // only timed waits give up on interrupt
898 break;
899 }
900 else if (nanos > 0L &&
901 (nanos = deadline - System.nanoTime()) <= 0L)
902 break; // timed out
// Register as waiter first; the loop then re-offers before parking.
903 else if (waiter == null)
904 waiter = Thread.currentThread();
905 else {
906 if (nanos > 0L)
907 LockSupport.parkNanos(this, nanos);
908 else
909 LockSupport.park(this);
910 waiter = null;
911 }
912 }
913 }
914 waiter = null;
915 return true;
916 }
917
918 final int submit(T item) {
919 putItem = item;
920 timeout = 0L;
921 wasInterrupted = false;
922 try {
923 ForkJoinPool.managedBlock(this);
924 } catch (InterruptedException cantHappen) {
925 }
926 if (wasInterrupted)
927 Thread.currentThread().interrupt();
928 return putStat;
929 }
930
931 final int timedOffer(T item, long nanos) {
932 if (nanos <= 0L)
933 return 0;
934 putItem = item;
935 timeout = nanos;
936 wasInterrupted = false;
937 try {
938 ForkJoinPool.managedBlock(this);
939 } catch (InterruptedException cantHappen) {
940 }
941 if (wasInterrupted)
942 Thread.currentThread().interrupt();
943 return putStat;
944 }
945
946 /**
 * Consume loop called only from ConsumerTask. Each pass reads the
 * control fields and then does exactly one of: detach and exit if
 * disabled; deliver a pending error; handle an empty buffer
 * (clearing ACTIVE and possibly issuing onComplete); handle zero
 * demand; or take one item and pass it to onNext.
 */
947 final void consume() {
948 Flow.Subscriber<? super T> s;
949 if ((s = subscriber) != null) { // else disabled
950 for (;;) {
951 long d = demand; // read volatile fields in acceptable order
952 int c = ctl;
953 int h = head;
954 int t = tail;
955 Object[] a = array;
956 int i, n; Object x; Thread w;
957 if (c < 0) {
958 detach();
959 break;
960 }
961 else if ((c & ERROR) != 0) {
// ERROR is set by cancel() and closeExceptionally(); cancel() itself
// does not assign pendingError, so ex may be null here.
962 Throwable ex = pendingError;
963 ctl = DISABLED; // no need for CAS
964 if (ex != null) {
965 try {
966 s.onError(ex);
967 } catch (Throwable ignore) {
968 }
969 }
// The next pass sees ctl < 0, detaches and exits.
970 }
971 else if (h == t) { // empty
// Clear ACTIVE, then look again in case an item or a ctl change arrived.
972 if (h == tail &&
973 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) {
974 if (h != tail || c != (c = ctl)) { // recheck
// Something changed: keep running only if this thread can set ACTIVE again.
975 if ((c & (ACTIVE | DISABLED)) != 0 ||
976 !U.compareAndSwapInt(this, CTL, c,
977 c | ACTIVE))
978 break;
979 }
980 else if ((c & COMPLETE) != 0) {
// Drained with COMPLETE set: disable and signal completion.
981 ctl = DISABLED;
982 try {
983 s.onComplete();
984 } catch (Throwable ignore) {
985 }
986 }
987 else
988 break;
989 }
990 }
991 else if (d == 0L) { // can't take
// No demand: clear ACTIVE and exit unless demand or ctl changed meanwhile
// and this thread can set ACTIVE again.
992 if (demand == 0L &&
993 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) &&
994 ((demand == 0L && c == (c = ctl)) || // recheck
995 (c & (ACTIVE | DISABLED)) != 0 ||
996 !U.compareAndSwapInt(this, CTL, c, c | ACTIVE)))
997 break;
998 }
// Take the element at head by CASing its slot to null.
999 else if (a != null &&
1000 (n = a.length) > 0 &&
1001 (x = a[i = h & (n - 1)]) != null &&
1002 U.compareAndSwapObject(
1003 a, (((long)i) << ASHIFT) + ABASE, x, null)) {
1004 U.putOrderedInt(this, HEAD, h + 1);
1005 while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1006 d = demand; // almost never fails
1007 if ((w = waiter) != null) {
1008 waiter = null;
1009 LockSupport.unpark(w); // release producer
1010 }
1011 try {
1012 @SuppressWarnings("unchecked") T y = (T) x;
1013 s.onNext(y);
1014 } catch (Throwable ex) {
// An exception from onNext disables the subscription.
1015 ctl = DISABLED;
1016 }
1017 }
1018 }
1019 }
1020 }
1021
1022 // Unsafe mechanics: field offsets used for the CAS and ordered-write calls above
1023 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1024 private static final long CTL;
1025 private static final long TAIL;
1026 private static final long HEAD;
1027 private static final long DEMAND;
1028 private static final int ABASE;
1029 private static final int ASHIFT;
1030
// Looks up the offsets of the ctl, tail, head and demand fields, plus
// the base offset and index shift for Object[] elements. Fails class
// initialization with an Error if a field is missing or the array
// index scale is not a power of two.
1031 static {
1032 try {
1033 CTL = U.objectFieldOffset
1034 (BufferedSubscription.class.getDeclaredField("ctl"));
1035 TAIL = U.objectFieldOffset
1036 (BufferedSubscription.class.getDeclaredField("tail"));
1037 HEAD = U.objectFieldOffset
1038 (BufferedSubscription.class.getDeclaredField("head"));
1039 DEMAND = U.objectFieldOffset
1040 (BufferedSubscription.class.getDeclaredField("demand"));
1041
1042 ABASE = U.arrayBaseOffset(Object[].class);
1043 int scale = U.arrayIndexScale(Object[].class);
1044 if ((scale & (scale - 1)) != 0)
1045 throw new Error("data type scale not a power of two");
// ASHIFT is log2(scale), so (index << ASHIFT) + ABASE addresses an element.
1046 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1047 } catch (ReflectiveOperationException e) {
1048 throw new Error(e);
1049 }
1050 }
1051 }
1052
1053 static final class TransformProcessor<U,R> implements Flow.Processor<U,R> {
1054 final Function<? super U, ? extends R> fn;
1055 final SubmissionPublisher<R> sink;
1056 Flow.Subscription subscription;
1057 final long requestSize;
1058 long count;
1059 TransformProcessor(long requestSize,
1060 Function<? super U, ? extends R> fn,
1061 SubmissionPublisher<R> sink) {
1062 this.fn = fn;
1063 this.sink = sink;
1064 this.requestSize = requestSize;
1065 this.count = requestSize >>> 1;
1066 }
1067 public void subscribe(Flow.Subscriber<? super R> subscriber) {
1068 sink.subscribe(subscriber);
1069 }
1070 public void onSubscribe(Flow.Subscription subscription) {
1071 (this.subscription = subscription).request(requestSize);
1072 }
1073 public void onNext(U item) {
1074 Flow.Subscription s = subscription;
1075 if (s == null)
1076 sink.closeExceptionally(
1077 new IllegalStateException("onNext without subscription"));
1078 else {
1079 try {
1080 if (--count <= 0)
1081 s.request(count = requestSize);
1082 sink.submit(fn.apply(item));
1083 } catch (Throwable ex) {
1084 try {
1085 s.cancel();
1086 } finally {
1087 sink.closeExceptionally(ex);
1088 }
1089 }
1090 }
1091 }
1092 public void onError(Throwable ex) {
1093 sink.closeExceptionally(ex);
1094 }
1095 public void onComplete() {
1096 sink.close();
1097 }
1098 }
1099 }