ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.4
Committed: Thu Jan 15 17:20:08 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.3: +5 -5 lines
Log Message:
double trouble

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.LockSupport;
10 import java.util.function.BiPredicate;
11 import java.util.function.Function;
12 import java.util.List;
13 import java.util.ArrayList;
14
15 /**
16 * A {@link Flow.Publisher} that asynchronously issues submitted items
17 * to current subscribers until it is closed. Each current subscriber
18 * receives newly submitted items in the same order unless drops or
19 * exceptions are encountered. Using a SubmissionPublisher allows
20 * item generators to act as Publishers, although without integrated
21 * flow control. Instead they rely on drop handling and/or blocking.
22 * This class may also serve as a base for subclasses that generate
23 * items, and use the methods in this class to publish them.
24 *
25 * <p>A SubmissionPublisher uses the Executor supplied in its
26 * constructor for delivery to subscribers. The best choice of
27 * Executor depends on expected usage. If the generator(s) of
28 * submitted items run in separate threads, and the number of
29 * subscribers can be estimated, consider using a {@link
30 * Executors#newFixedThreadPool}. Otherwise consider using a
31 * work-stealing pool (including {@link ForkJoinPool#commonPool}).
32 *
33 * <p>Buffering allows producers and consumers to transiently operate
34 * at different rates. Each subscriber uses an independent buffer.
35 * Buffers are created upon first use with a given initial capacity,
36 * and are resized as needed up to the maximum. (Capacity arguments
37 * may be rounded up to powers of two.) Invocations of {@code
38 * subscription.request} do not directly result in buffer expansion,
39 * but risk saturation if unfulfilled requests exceed the maximum
40 * capacity. Choices of buffer parameters rely on expected rates,
41 * resources, and usages, that usually benefit from empirical testing.
42 * As first guesses, consider initial 8 and maximum 1024.
43 *
44 * <p>Publication methods support different policies about what to do
45 * when buffers are saturated. Method {@link #submit} blocks until
46 * resources are available. This is simplest (and often appropriate
47 * for relay stages; see {@link #newTransformProcessor}), but least
48 * responsive. The {@code offer} methods may either immediately, or
49 * with bounded timeout, drop items, but provide an opportunity to
50 * interpose a handler and then retry. While the handler is invoked,
51 * other calls to methods in this class by other threads are blocked.
52 * Unless recovery is assured, options are usually limited to logging
53 * the error and/or issuing an onError signal to the subscriber.
54 *
55 * <p>If any Subscriber method throws an exception, its subscription
56 * is cancelled. If the supplied Executor throws
57 * RejectedExecutionException (or any other RuntimeException or Error)
58 * when attempting to execute a task, or a drop handler throws an
59 * exception when processing a dropped item, then the exception is
60 * rethrown. In these cases, some but not all subscribers may have
61 * received items. It is usually good practice to {@link
62 * #closeExceptionally} in these cases.
63 *
64 * @param <T> the published item type
65 * @author Doug Lea
66 * @since 1.9
67 */
68 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
69 AutoCloseable {
70 /*
71 * Most mechanics are handled by BufferedSubscription. This class
72 * mainly ensures sequentiality by using built-in synchronization
73 * locks across public methods. (Using built-in locks works well
74 * in the most typical case in which only one thread submits
75 * items).
76 */
77
78 // Ensuring that all arrays have power of two length
79
80 static final int MAXIMUM_BUFFER_CAPACITY = 1 << 30;
81 static final int roundCapacity(int cap) { // to nearest power of 2
82 int n = cap - 1;
83 n |= n >>> 1;
84 n |= n >>> 2;
85 n |= n >>> 4;
86 n |= n >>> 8;
87 n |= n >>> 16;
88 return (n < 0) ? 1 :
89 (n >= MAXIMUM_BUFFER_CAPACITY) ? MAXIMUM_BUFFER_CAPACITY : n + 1;
90 }
91
92 /**
93 * Clients (BufferedSubscriptions) are maintained in a linked list
94 * (via their "next" fields). This works well for publish loops.
95 * It requires O(n) traversal to check for duplicate subscribers,
96 * but we expect that subscribing is much less common than
97 * publishing. Unsubscribing occurs only during publish loops,
98 * when BufferedSubscription methods return negative values
99 * signifying that they have been disabled (cancelled or closed).
100 */
101 BufferedSubscription<T> clients;
102
103 // Parameters for constructing BufferedSubscriptions
104 final Executor executor;
105 final int minBufferCapacity;
106 final int maxBufferCapacity;
107
108 /** Run status, updated only within locks */
109 volatile boolean closed;
110
111 /**
112 * Creates a new SubmissionPublisher using the given Executor for
113 * async delivery to subscribers, and with the given initial and
114 * maximum buffer sizes for each subscriber. In the absence of
115 * other constraints, consider using {@code
116 * ForrkJoinPool.commonPool(), 8, 1024}.
117 *
118 * @param executor the executor to use for async delivery,
119 * supporting creation of at least one independent thread
120 * @param initialBufferCapacity the initial capacity for each
121 * subscriber's buffer (the actual capacity may be rounded up to
122 * the nearest power of two)
123 * @param maxBufferCapacity the maximum capacity for each
124 * subscriber's buffer (the actual capacity may be rounded up to
125 * the nearest power of two)
126 * @throws NullPointerException if executor is null
127 * @throws IllegalArgumentException if initialBufferCapacity is
128 * not positive or exceeds maxBufferCapacity, or maxBufferCapacity
129 * exceeds {@code 1<<30} (about 1 billion), the maximum bound for
130 * a power of two array size
131 */
132 public SubmissionPublisher(Executor executor,
133 int initialBufferCapacity,
134 int maxBufferCapacity) {
135 if (executor == null)
136 throw new NullPointerException();
137 if (initialBufferCapacity <= 0 || maxBufferCapacity <= 0)
138 throw new IllegalArgumentException("capacity must be positive");
139 if (maxBufferCapacity > MAXIMUM_BUFFER_CAPACITY)
140 throw new IllegalArgumentException("capacity exceeds limit");
141 if (initialBufferCapacity > maxBufferCapacity)
142 throw new IllegalArgumentException("initial cannot exceed max capacity");
143 int minc = roundCapacity(initialBufferCapacity);
144 int maxc = roundCapacity(maxBufferCapacity);
145 this.executor = executor;
146 this.minBufferCapacity = minc;
147 this.maxBufferCapacity = maxc;
148 }
149
150 /**
151 * Adds the given Subscriber unless already subscribed. If
152 * already subscribed, the Subscriber's onError method is invoked
153 * with an IllegalStateException. Otherwise, upon success, the
154 * Subscriber's onSubscribe method is invoked with a new
155 * Subscription (upon exception, the exception is rethrown and the
156 * Subscriber remains unsubscribed). If this SubmissionPublisher
157 * is closed, the subscriber's onComplete method is then invoked.
158 * Subscribers may enable receiving items by invoking the request
159 * method of the returned Subscription, and may unsubscribe by
160 * invoking its cancel method.
161 *
162 * @param subscriber the subscriber
163 * @throws NullPointerException if subscriber is null
164 */
165 public void subscribe(Flow.Subscriber<? super T> subscriber) {
166 if (subscriber == null) throw new NullPointerException();
167 BufferedSubscription<T> sub = new BufferedSubscription<T>(
168 subscriber, executor, minBufferCapacity, maxBufferCapacity);
169 boolean present = false, clsd;
170 synchronized (this) {
171 clsd = closed;
172 BufferedSubscription<T> pred = null, next;
173 for (BufferedSubscription<T> b = clients; b != null; b = next) {
174 next = b.next;
175 if (b.ctl < 0) { // disabled; remove
176 if (pred == null)
177 clients = next;
178 else
179 pred.next = next;
180 }
181 else if (subscriber == b.subscriber) {
182 present = true;
183 break;
184 }
185 pred = b;
186 }
187 if (!present) {
188 subscriber.onSubscribe(sub); // don't link on exception
189 if (!clsd) {
190 if (pred == null)
191 clients = sub;
192 else
193 pred.next = sub;
194 }
195 }
196 }
197 if (clsd)
198 subscriber.onComplete();
199 else if (present)
200 subscriber.onError(new IllegalStateException("Already subscribed"));
201 }
202
203 /**
204 * Publishes the given item, if possible, to each current
205 * subscriber by asynchronously invoking its onNext method. The
206 * item may be dropped by one or more subscribers if resource
207 * limits are exceeded, in which case the given handler (if
208 * non-null) is invoked, and if it returns true, retried once.
209 *
210 * <p>If the Executor for this publisher throws a
211 * RejectedExecutionException (or any other RuntimeException or
212 * Error) when attempting to asynchronously notify subscribers, or
213 * the drop handler throws an exception when processing a dropped
214 * item, then this exception is rethrown.
215 *
216 * @param item the (non-null) item to publish
217 * @param onDrop if non-null, the handler invoked upon a drop to a
218 * subscriber, with arguments of the subscriber and item; if it
219 * returns true, an offer is re-attempted (once)
220 * @return the number of drops (failed attempts to issue the item
221 * to a subscriber)
222 * @throws IllegalStateException if closed
223 * @throws NullPointerException if item is null
224 * @throws RejectedExecutionException if thrown by Executor
225 */
226 public int offer(T item,
227 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
228 if (item == null) throw new NullPointerException();
229 int drops = 0;
230 synchronized (this) {
231 if (closed)
232 throw new IllegalStateException("Closed");
233 BufferedSubscription<T> pred = null, next;
234 for (BufferedSubscription<T> b = clients; b != null; b = next) {
235 int stat;
236 next = b.next;
237 if ((stat = b.offer(item)) == 0 &&
238 onDrop != null &&
239 onDrop.test(b.subscriber, item))
240 stat = b.offer(item);
241 if (stat < 0) {
242 if (pred == null)
243 clients = next;
244 else
245 pred.next = next;
246 }
247 else {
248 pred = b;
249 if (stat == 0)
250 ++drops;
251 }
252 }
253 return drops;
254 }
255 }
256
257 /**
258 * Publishes the given item, if possible, to each current
259 * subscriber by asynchronously invoking its onNext method,
260 * blocking while resources for any subscription are unavailable,
261 * up to the specified timeout or the caller thread is
262 * interrupted, at which point the given handler (if non-null)
263 * is invoked, and if it returns true, retried once.
264 *
265 * <p>If the Executor for this publisher throws a
266 * RejectedExecutionException (or any other RuntimeException or
267 * Error) when attempting to asynchronously notify subscribers, or
268 * the drop handler throws an exception when processing a dropped
269 * item, then this exception is rethrown.
270 *
271 * @param item the (non-null) item to publish
272 * @param timeout how long to wait for resources for any subscriber
273 * before giving up, in units of {@code unit}
274 * @param unit a {@code TimeUnit} determining how to interpret the
275 * {@code timeout} parameter
276 * @param onDrop if non-null, the handler invoked upon a drop to a
277 * subscriber, with arguments of the subscriber and item; if it
278 * returns true, an offer is re-attempted (once)
279 * @return the number of drops (failed attempts to issue the item
280 * to a subscriber)
281 * @throws IllegalStateException if closed
282 * @throws NullPointerException if item is null
283 * @throws RejectedExecutionException if thrown by Executor
284 */
285 public int offer(T item, long timeout, TimeUnit unit,
286 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
287 if (item == null) throw new NullPointerException();
288 long nanos = unit.toNanos(timeout);
289 int drops = 0;
290 synchronized (this) {
291 if (closed)
292 throw new IllegalStateException("Closed");
293 BufferedSubscription<T> pred = null, next;
294 for (BufferedSubscription<T> b = clients; b != null; b = next) {
295 int stat;
296 next = b.next;
297 if ((stat = b.offer(item)) == 0 &&
298 (stat = b.timedOffer(item, nanos)) == 0 &&
299 onDrop != null &&
300 onDrop.test(b.subscriber, item))
301 stat = b.offer(item);
302 if (stat < 0) {
303 if (pred == null)
304 clients = next;
305 else
306 pred.next = next;
307 }
308 else {
309 pred = b;
310 if (stat == 0)
311 ++drops;
312 }
313 }
314 }
315 return drops;
316 }
317
318 /**
319 * Publishes the given item to each current subscriber by
320 * asynchronously invoking its onNext method, blocking
321 * uninterruptibly while resources for any subscriber are
322 * unavailable.
323 *
324 * <p>If the Executor for this publisher throws a
325 * RejectedExecutionException (or any other RuntimeException or
326 * Error) when attempting to asynchronously notify subscribers,
327 * then this exception is rethrown.
328 *
329 * @param item the (non-null) item to publish
330 * @throws IllegalStateException if closed
331 * @throws NullPointerException if item is null
332 * @throws RejectedExecutionException if thrown by Executor
333 */
334 public void submit(T item) {
335 if (item == null) throw new NullPointerException();
336 synchronized (this) {
337 if (closed)
338 throw new IllegalStateException("Closed");
339 BufferedSubscription<T> pred = null, next;
340 for (BufferedSubscription<T> b = clients; b != null; b = next) {
341 int stat;
342 next = b.next;
343 if ((stat = b.offer(item)) == 0)
344 stat = b.submit(item);
345 if (stat < 0) {
346 if (pred == null)
347 clients = next;
348 else
349 pred.next = next;
350 }
351 else
352 pred = b;
353 }
354 }
355 }
356
357 /**
358 * Unless already closed, issues onComplete signals to current
359 * subscribers, and disallows subsequent attempts to publish.
360 */
361 public void close() {
362 if (!closed) {
363 BufferedSubscription<T> b, next;
364 synchronized (this) {
365 b = clients;
366 clients = null;
367 closed = true;
368 }
369 while (b != null) {
370 next = b.next;
371 b.close();
372 b = next;
373 }
374 }
375 }
376
377 /**
378 * Unless already closed, issues onError signals to current
379 * subscribers with the given error, and disallows subsequent
380 * attempts to publish.
381 *
382 * @param error the onError argument sent to suibscribers
383 * @throws NullPointerException if error is null
384 */
385 public void closeExceptionally(Throwable error) {
386 if (error == null)
387 throw new NullPointerException();
388 if (!closed) {
389 BufferedSubscription<T> b, next;
390 synchronized (this) {
391 b = clients;
392 clients = null;
393 closed = true;
394 }
395 while (b != null) {
396 next = b.next;
397 b.closeExceptionally(error);
398 b = next;
399 }
400 }
401 }
402
403 /**
404 * Returns true if this publisher is not accepting submissions.
405 *
406 * @return true if closed
407 */
408 public boolean isClosed() {
409 return closed;
410 }
411
412 /**
413 * Returns true if this publisher has any subscribers,
414 *
415 * @return true if this publisher has any subscribers
416 */
417 public boolean hasSubscribers() {
418 boolean nonEmpty = false;
419 if (!closed) {
420 synchronized (this) {
421 BufferedSubscription<T> pred = null, next;
422 for (BufferedSubscription<T> b = clients; b != null; b = next) {
423 next = b.next;
424 if (b.ctl < 0) {
425 if (pred == null)
426 clients = next;
427 else
428 pred.next = next;
429 }
430 else {
431 nonEmpty = true;
432 break;
433 }
434 }
435 }
436 }
437 return nonEmpty;
438 }
439
440 /**
441 * Returns the Executor used for asynchronous delivery.
442 *
443 * @return the Executor used for asynchronous delivery
444 */
445 public Executor getExecutor() {
446 return executor;
447 }
448
449 /*
450 * Returns the initial per-subsciber buffer capacity.
451 *
452 * @return the initial per-subsciber buffer capacity
453 */
454 public int getInitialBufferCapacity() {
455 return minBufferCapacity;
456 }
457
458 /*
459 * Returns the maximum per-subsciber buffer capacity.
460 *
461 * @return the maximum per-subsciber buffer capacity
462 */
463 public int getMaxBufferCapacity() {
464 return maxBufferCapacity;
465 }
466
467 /**
468 * Returns a list of current subscribers.
469 *
470 * @return list of current subscribers
471 */
472 public List<Flow.Subscriber<? super T>> getSubscribers() {
473 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
474 synchronized (this) {
475 BufferedSubscription<T> pred = null, next;
476 for (BufferedSubscription<T> b = clients; b != null; b = next) {
477 next = b.next;
478 if (b.ctl < 0) {
479 if (pred == null)
480 clients = next;
481 else
482 pred.next = next;
483 }
484 else
485 subs.add(b.subscriber);
486 }
487 }
488 return subs;
489 }
490
491 /**
492 * Returns true if the given Subscriber is currently subscribed.
493 *
494 * @param subscriber the subscriber
495 * @return true if currently subscribed
496 */
497 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
498 if (!closed) {
499 synchronized (this) {
500 BufferedSubscription<T> pred = null, next;
501 for (BufferedSubscription<T> b = clients; b != null; b = next) {
502 next = b.next;
503 if (b.ctl < 0) {
504 if (pred == null)
505 clients = next;
506 else
507 pred.next = next;
508 }
509 else if (subscriber == b.subscriber)
510 return true;
511 }
512 }
513 }
514 return false;
515 }
516
517 /**
518 * If the given subscription is managed by a SubmissionPublisher,
519 * returns an estimate of the number of items produced but not yet
520 * consumed; otherwise returns zero. This method is designed only
521 * for monitoring purposes, not for control.
522 *
523 * @param subscription the subscription
524 * @return the estimate, or zero if the subscription is of an
525 * unknown type.
526 */
527 public static int estimateAvailable(Flow.Subscription subscription) {
528 if (subscription instanceof BufferedSubscription)
529 return ((BufferedSubscription)subscription).estimateAvailable();
530 else
531 return 0;
532 }
533
534 /**
535 * Returns a new {@link Flow.Processor} that uses this
536 * SubmissionPublisher to relay transformed items from its source
537 * using method {@link submit}, and using the given request size
538 * for requests to its source subscription. (Typically,
539 * requestSizes should range from the initial and maximum buffer
540 * capacity of this SubmissionPublisher.)
541 *
542 * @param <S> the subscribed item type
543 * @param requestSize the request size for subscriptions
544 * with the source
545 * @param transform the transform function
546 * @return the new Processor
547 * @throws NullPointerException if transform is null
548 * @throws IllegalArgumentException if requestSize not positive
549 */
550 public <S> Flow.Processor<S,T> newTransformProcessor(
551 long requestSize,
552 Function<? super S, ? extends T> transform) {
553 if (requestSize <= 0L)
554 throw new IllegalArgumentException("requestSize must be positive");
555 if (transform == null)
556 throw new NullPointerException();
557 return new TransformProcessor<S,T>(requestSize, transform, this);
558 }
559
560 /**
561 * A task for consuming buffer items and signals, created and
562 * executed whenever they become available. A task consumes as
563 * many items/signals as possible before terminating, at which
564 * point another task is created when needed. The dual Runnable
565 * and ForkJoinTask declaration saves overhead when executed by
566 * ForkJoinPools, without impacting other kinds of Executors.
567 */
568 @SuppressWarnings("serial")
569 static final class ConsumerTask<T> extends ForkJoinTask<Void>
570 implements Runnable {
571 final BufferedSubscription<T> s;
572 ConsumerTask(BufferedSubscription<T> s) { this.s = s; }
573 public final Void getRawResult() { return null; }
574 public final void setRawResult(Void v) {}
575 public final boolean exec() { s.consume(); return true; }
576 public final void run() { s.consume(); }
577 }
578
579 /**
580 * A bounded (ring) buffer with integrated control to start
581 * consumer tasks whenever items are available. The buffer
582 * algorithm is similar to one used inside ForkJoinPool,
583 * specialized for the case of at most one concurrent producer and
584 * consumer, and power of two buffer sizes. This allows methods to
585 * operate without locks even while supporting resizing, blocking,
586 * task-triggering, and garbage-free buffers (nulling out elements
587 * when consumed), although supporting these does impose a bit of
588 * overhead compared to plain fixed-size ring buffers.
589 *
590 * The publisher guarantees a single producer via its lock. We
591 * ensure in this class that there is at most one consumer. The
592 * request and cancel methods must be fully thread-safe but are
593 * coded to exploit the most common case in which they are only
594 * called by consumers (usually within onNext).
595 *
596 * Control over creating ConsumerTasks is managed using the ACTIVE
597 * ctl bit. We ensure that a task is active when consumable items
598 * (and usually, ERROR or COMPLETE signals) are present and there
599 * is demand (unfulfilled requests). This is complicated on the
600 * creation side by the possibility of exceptions when trying to
601 * execute tasks. These eventually force DISABLED state, but
602 * sometimes not directly. On the task side, termination (clearing
603 * ACTIVE) may race with producers or request() calls, so in some
604 * cases requires a re-check, re-activating if possible.
605 *
606 * The ctl field also manages run state. When DISABLED, no further
607 * updates are possible (to simplify checks, DISABLED is defined
608 * as a negative value). Disabling may be preceded by setting
609 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
610 * case the associated Subscriber methods are invoked, possibly
611 * synchronously if there is no active consumer task (including
612 * cases where execute() failed).
613 *
614 * Support for blocking also exploits the fact that there is only
615 * one possible waiter. ManagedBlocker-compatible control fields
616 * are placed in this class itself rather than in wait-nodes.
617 * Blocking control relies on the "waiter" field. Producers set
618 * the field before trying to block, but must then recheck (via
619 * offer) before parking. Signalling then just unparks and clears
620 * waiter field.
621 *
622 * This class uses @Contended and heuristic field declaration
623 * ordering to reduce memory contention on BufferedSubscription
624 * itself, but it does not currently attempt to avoid memory
625 * contention (especially including card-marks) among buffer
626 * elements, that can significantly slow down some usages.
627 * Addressing this may require allocating substantially more space
628 * than users expect.
629 */
630 @sun.misc.Contended
631 static final class BufferedSubscription<T> implements Flow.Subscription,
632 ForkJoinPool.ManagedBlocker {
633 // Order-sensitive field declarations
634 long timeout; // > 0 if timed wait
635 volatile long demand; // # unfilled requests
636 final int minCapacity; // initial buffer size
637 int maxCapacity; // reduced on OOME
638 int putStat; // offer result for ManagedBlocker
639 volatile int ctl; // atomic run state flags
640 volatile int head; // next position to take
641 volatile int tail; // next position to put
642 boolean wasInterrupted; // true if interrupted while waiting
643 volatile Object[] array; // buffer: null if disabled
644 Flow.Subscriber<? super T> subscriber; // null if disabled
645 Executor executor; // null if disabled
646 volatile Throwable pendingError; // holds until onError issued
647 volatile Thread waiter; // blocked producer thread
648 T putItem; // for offer within ManagedBlocker
649 BufferedSubscription<T> next; // used only by publisher
650
651 // ctl values
652 static final int ACTIVE = 0x01; // consumer task active
653 static final int ERROR = 0x02; // signal pending error
654 static final int COMPLETE = 0x04; // signal completion when done
655 static final int DISABLED = 0x80000000; // must be negative
656
/**
 * Creates a subscription for the given subscriber. No buffer is
 * allocated here; the array field is left null.
 *
 * @param subscriber the subscriber to deliver to
 * @param executor the executor used to run consumer tasks
 * @param minBufferCapacity stored as the initial buffer size
 * @param maxBufferCapacity stored as the upper bound on buffer size
 */
BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                     Executor executor,
                     int minBufferCapacity,
                     int maxBufferCapacity) {
    this.minCapacity = minBufferCapacity;
    this.maxCapacity = maxBufferCapacity;
    this.executor = executor;
    this.subscriber = subscriber;
}
665
666 /**
 * Producer-side attempt to append an item to the buffer. Delegates
 * to growAndOffer when there is no usable array or when
 * {@code tail - head} has reached {@code length - 1}; otherwise
 * stores the item, advances tail, and calls startOnOffer unless the
 * ACTIVE bit is already set.
 *
 * @param item the item to enqueue
667 * @return -1 if disabled, 0 if dropped, else 1
668 */
669 final int offer(T item) {
670 Object[] a = array;
671 int t = tail, h = head, mask;
672 if (a == null || (mask = a.length - 1) < 0 || t - h >= mask) // no array yet, or treated as full
673 return growAndOffer(item);
674 else {
675 a[t & mask] = item; // power-of-two length: mask maps position to slot
676 U.putOrderedInt(this, TAIL, t + 1); // ordered store of the new tail after the slot write
677 return (ctl & ACTIVE) != 0 ? 1 : startOnOffer(); // 1 if a consumer task is already flagged active
678 }
679 }
680
681 /**
682 * Create or expand buffer if possible, then offer
 *
 * The new length is twice the old array's length, or minCapacity
 * when no array exists yet. Elements still present between head
 * and tail are moved into the new array before the item is added.
 *
 * @param item the item to enqueue
 * @return -1 if there is no array and ctl is negative (disabled);
 * 0 if the old length already reaches maxCapacity, the doubled
 * length overflows, or allocation fails; otherwise the result of
 * startOnOffer
683 */
684 final int growAndOffer(T item) {
685 int oldLen, len;
686 Object[] oldA = array, a = null;
687 if (oldA != null)
688 len = (oldLen = oldA.length) << 1;
689 else if (ctl < 0)
690 return -1; // disabled
691 else {
692 oldLen = 0;
693 len = minCapacity;
694 }
695 if (oldLen >= maxCapacity || len <= 0) // at the limit, or doubling overflowed int
696 return 0;
697 try {
698 a = new Object[len];
699 } catch (Throwable ex) { // try to cope with OOME
700 if (oldLen != 0) // avoid continuous failure
701 maxCapacity = oldLen;
702 return 0;
703 }
704 array = a; // volatile write: new array is visible from here on
705 int oldMask = oldLen - 1, mask = len - 1, oldTail = tail, j = head;
706 if (oldA != null && oldMask >= 0 && mask > oldMask) {
707 for (Object x; j != oldTail; ++j) { // races with consumer
708 int i = j & oldMask;
709 if ((x = oldA[i]) != null &&
710 U.compareAndSwapObject(oldA,
711 (((long)i) << ASHIFT) + ABASE,
712 x, null)) // copy only elements whose old slot this thread nulled via CAS
713 a[j & mask] = x;
714 }
715 }
716 a[j & mask] = item; // j equals the old tail position here
717 U.putOrderedInt(this, TAIL, j + 1);
718 return startOnOffer();
719 }
720
721 /**
722 * Tries to start consumer task if items are available.
723 * Backs out and relays exception if executor fails
 *
 * @return -1 if ctl is negative or the executor is null; otherwise
 * 1 (a task was already active, there is no demand, the buffer is
 * empty, or a new task was handed to the executor)
724 */
725 final int startOnOffer() {
726 for (;;) {
727 int c; Executor e;
728 if (((c = ctl) & ACTIVE) != 0 || demand == 0L || tail == head) // nothing to start
729 break;
730 if (c < 0 || (e = executor) == null)
731 return -1;
732 if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) { // set ACTIVE before executing; loop again if CAS fails
733 try {
734 e.execute(new ConsumerTask<T>(this));
735 break;
736 } catch (RuntimeException | Error ex) { // back out
737 for (;;) { // rethrow once disabled, ACTIVE already clear, or ACTIVE cleared here
738 if ((c = ctl) < 0 || (c & ACTIVE) == 0 ||
739 U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
740 throw ex;
741 }
742 }
743 }
744 }
745 return 1;
746 }
747
748 /**
749 * Tries to start consumer task upon a signal or request;
750 * disables on failure
 *
 * Callers in this class set the ACTIVE bit before invoking this
 * method. If execute throws, the ACTIVE bit is cleared and the
 * failure is passed to closeExceptionally; nothing is rethrown.
751 */
752 final void startOrDisable() {
753 Executor e; // skip if already disabled
754 if ((e = executor) != null) {
755 try {
756 e.execute(new ConsumerTask<T>(this));
757 } catch (Throwable ex) { // back out and force signal
758 for (int c;;) {
759 if ((c = ctl) < 0 || (c & ACTIVE) == 0) // already disabled, or ACTIVE already clear
760 break;
761 if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
762 closeExceptionally(ex); // report the execute failure through the error path
763 break;
764 }
765 }
766 }
767 }
768 }
769
770 /**
771 * Nulls out most fields, mainly to avoid garbage retention
772 * until publisher unsubscribes.
773 */
774 final void detach() {
775 pendingError = null;
776 subscriber = null;
777 executor = null;
778 array = null;
779 Thread w = waiter;
780 if (w != null) {
781 waiter = null;
782 LockSupport.unpark(w); // force wakeup
783 }
784 }
785
/**
 * Adds n to the outstanding demand, then starts a consumer task if
 * items are buffered and no task is active. A negative n closes the
 * subscription exceptionally with an IllegalArgumentException; an n
 * of zero is ignored by this code.
 *
 * @param n the number of additional items requested
 */
786 public void request(long n) {
787 if (n > 0L) {
788 for (;;) {
789 long prev = demand, d;
790 if ((d = prev + n) < prev) // saturate
791 d = Long.MAX_VALUE;
792 if (U.compareAndSwapLong(this, DEMAND, prev, d)) { // retry the outer loop if demand changed meanwhile
793 for (int c, h;;) {
794 if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
795 demand == 0L) // task already active, disabled, or demand used up
796 break;
797 if ((h = head) != tail) { // items are buffered
798 if (U.compareAndSwapInt(this, CTL, c,
799 c | ACTIVE)) {
800 startOrDisable();
801 break;
802 }
803 }
804 else if (head == h && tail == h) // re-read confirms the buffer is empty
805 break;
806 }
807 break;
808 }
809 }
810 }
811 else if (n < 0L)
812 closeExceptionally(new IllegalArgumentException(
813 "negative subscription request"));
814 }
815
/**
 * Disables this subscription. If a consumer task is flagged active,
 * only the ERROR bit is set and the state change is left to that
 * task; otherwise ctl is switched to DISABLED here and the fields
 * are released via detach. Does nothing if already disabled.
 */
816 public void cancel() {
817 for (int c;;) {
818 if ((c = ctl) < 0) // already disabled
819 break;
820 else if ((c & ACTIVE) != 0) {
821 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
822 break; // cause consumer task to exit
823 }
824 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) { // no active task: disable directly
825 detach();
826 break;
827 }
828 }
829 }
830
/**
 * Records the given exception as the pending error and disables this
 * subscription. If a consumer task is flagged active, only the ERROR
 * bit is set and the signal is left to that task; otherwise ctl is
 * switched to DISABLED and onError is invoked from the calling
 * thread, after which the fields are released via detach.
 *
 * @param ex the exception to pass to the subscriber's onError
 */
831 final void closeExceptionally(Throwable ex) {
832 pendingError = ex; // stored before the ctl update below
833 for (int c;;) {
834 if ((c = ctl) < 0) // already disabled
835 break;
836 else if ((c & ACTIVE) != 0) {
837 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
838 break; // cause consumer task to exit
839 }
840 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) { // no active task: signal synchronously
841 Flow.Subscriber<? super T> s = subscriber;
842 if (s != null) {
843 try {
844 s.onError(ex);
845 } catch (Throwable ignore) { // exceptions thrown by onError are discarded
846 }
847 }
848 detach();
849 break;
850 }
851 }
852 }
853
/**
 * Marks this subscription for completion by setting the COMPLETE and
 * ACTIVE bits, and starts a consumer task if ACTIVE was not set
 * before. Does nothing if already disabled.
 */
854 final void close() {
855 for (int c;;) {
856 if ((c = ctl) < 0) // already disabled
857 break;
858 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
859 if ((c & ACTIVE) == 0) // c is the value before the CAS: no task was active
860 startOrDisable();
861 break;
862 }
863 }
864 }
865
866 final int estimateAvailable() {
867 int n;
868 return (ctl >= 0 && (n = tail - head) > 0) ? n : 0;
869 }
870
871 // ManagedBlocker support
872
873 public final boolean isReleasable() {
874 T item = putItem;
875 if (item != null) {
876 if ((putStat = offer(item)) == 0)
877 return false;
878 putItem = null;
879 }
880 return true;
881 }
882
/**
 * ManagedBlocker hook: repeatedly offers the pending putItem until
 * the offer returns non-zero, parking the calling thread between
 * attempts. A timed wait (timeout > 0) ends on the deadline or on
 * interrupt; an untimed wait records the interrupt in wasInterrupted
 * and keeps retrying. The last offer result is left in putStat.
 *
 * @return always true
 */
883 public final boolean block() {
884 T item = putItem;
885 if (item != null) {
886 putItem = null;
887 long nanos = timeout;
888 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L; // 0 when the wait is untimed
889 while ((putStat = offer(item)) == 0) {
890 if (Thread.interrupted()) {
891 wasInterrupted = true; // remembered so callers can restore the interrupt status
892 if (nanos > 0L)
893 break;
894 }
895 else if (nanos > 0L &&
896 (nanos = deadline - System.nanoTime()) <= 0L) // timed wait expired
897 break;
898 else if (waiter == null)
899 waiter = Thread.currentThread(); // register, then loop to re-offer before parking
900 else {
901 if (nanos > 0L)
902 LockSupport.parkNanos(this, nanos);
903 else
904 LockSupport.park(this);
905 waiter = null;
906 }
907 }
908 }
909 waiter = null;
910 return true;
911 }
912
913 final int submit(T item) {
914 putItem = item;
915 timeout = 0L;
916 wasInterrupted = false;
917 try {
918 ForkJoinPool.managedBlock(this);
919 } catch (InterruptedException cantHappen) {
920 }
921 if (wasInterrupted)
922 Thread.currentThread().interrupt();
923 return putStat;
924 }
925
926 final int timedOffer(T item, long nanos) {
927 if (nanos <= 0L)
928 return 0;
929 putItem = item;
930 timeout = nanos;
931 wasInterrupted = false;
932 try {
933 ForkJoinPool.managedBlock(this);
934 } catch (InterruptedException cantHappen) {
935 }
936 if (wasInterrupted)
937 Thread.currentThread().interrupt();
938 return putStat;
939 }
940
941 /** Consume loop called only from ConsumerTask.
 * Each pass re-reads demand, ctl, head, tail and array, then takes
 * exactly one of these actions: detach and exit if disabled; issue
 * a pending onError; give up the ACTIVE bit (and possibly issue
 * onComplete) when the buffer is empty; give up the ACTIVE bit when
 * there is no demand; or take one item and pass it to onNext. */
942 final void consume() {
943 Flow.Subscriber<? super T> s;
944 if ((s = subscriber) != null) { // else disabled
945 for (;;) {
946 long d = demand; // read volatile fields in acceptable order
947 int c = ctl;
948 int h = head;
949 int t = tail;
950 Object[] a = array;
951 int i, n; Object x; Thread w;
952 if (c < 0) { // disabled: release fields and stop
953 detach();
954 break;
955 }
956 else if ((c & ERROR) != 0) {
957 Throwable ex = pendingError;
958 ctl = DISABLED; // no need for CAS
959 if (ex != null) { // cancel() sets ERROR without storing an exception
960 try {
961 s.onError(ex);
962 } catch (Throwable ignore) {
963 }
964 }
965 }
966 else if (h == t) { // empty
967 if (h == tail &&
968 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) { // still empty: clear ACTIVE
969 if (h != tail || c != (c = ctl)) { // recheck
970 if ((c & (ACTIVE | DISABLED)) != 0 ||
971 !U.compareAndSwapInt(this, CTL, c,
972 c | ACTIVE)) // exit unless this task re-acquires ACTIVE
973 break;
974 }
975 else if ((c & COMPLETE) != 0) { // drained and closed: signal completion
976 ctl = DISABLED;
977 try {
978 s.onComplete();
979 } catch (Throwable ignore) {
980 }
981 }
982 else
983 break;
984 }
985 }
986 else if (d == 0L) { // can't take
987 if (demand == 0L &&
988 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) &&
989 ((demand == 0L && c == (c = ctl)) || // recheck
990 (c & (ACTIVE | DISABLED)) != 0 ||
991 !U.compareAndSwapInt(this, CTL, c, c | ACTIVE))) // exit unless this task re-acquires ACTIVE
992 break;
993 }
994 else if (a != null &&
995 (n = a.length) > 0 &&
996 (x = a[i = h & (n - 1)]) != null &&
997 U.compareAndSwapObject(
998 a, (((long)i) << ASHIFT) + ABASE, x, null)) { // claim the head element by nulling its slot
999 U.putOrderedInt(this, HEAD, h + 1);
1000 while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1001 d = demand; // almost never fails
1002 if ((w = waiter) != null) {
1003 waiter = null;
1004 LockSupport.unpark(w); // release producer
1005 }
1006 try {
1007 @SuppressWarnings("unchecked") T y = (T) x;
1008 s.onNext(y);
1009 } catch (Throwable ex) { // a throwing onNext disables the subscription
1010 ctl = DISABLED;
1011 }
1012 }
1013 }
1014 }
1015 }
1016
1017 // Unsafe setup: field offsets and array addressing constants used by the CAS and ordered-write calls in this class
1018 private static final sun.misc.Unsafe U;
1019 private static final long CTL; // offset of field "ctl"
1020 private static final long TAIL; // offset of field "tail"
1021 private static final long HEAD; // offset of field "head"
1022 private static final long DEMAND; // offset of field "demand"
1023 private static final int ABASE; // base offset of Object[] elements
1024 private static final int ASHIFT; // log2 of the Object[] element scale
1025
1026 static {
1027 try {
1028 U = sun.misc.Unsafe.getUnsafe();
1029 Class<?> k = BufferedSubscription.class;
1030 CTL = U.objectFieldOffset
1031 (k.getDeclaredField("ctl"));
1032 TAIL = U.objectFieldOffset
1033 (k.getDeclaredField("tail"));
1034 HEAD = U.objectFieldOffset
1035 (k.getDeclaredField("head"));
1036 DEMAND = U.objectFieldOffset
1037 (k.getDeclaredField("demand"));
1038 Class<?> ak = Object[].class;
1039 ABASE = U.arrayBaseOffset(ak);
1040 int scale = U.arrayIndexScale(ak);
1041 if ((scale & (scale - 1)) != 0) // element addresses are computed as (index << ASHIFT) + ABASE
1042 throw new Error("data type scale not a power of two");
1043 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1044 } catch (Exception e) { // any lookup failure aborts class initialization
1045 throw new Error(e);
1046 }
1047 }
1048 }
1049
1050 static final class TransformProcessor<U,R> implements Flow.Processor<U,R> {
1051 final Function<? super U, ? extends R> fn;
1052 final SubmissionPublisher<R> sink;
1053 Flow.Subscription subscription;
1054 final long requestSize;
1055 long count;
1056 TransformProcessor(long requestSize,
1057 Function<? super U, ? extends R> fn,
1058 SubmissionPublisher<R> sink) {
1059 this.fn = fn;
1060 this.sink = sink;
1061 this.requestSize = requestSize;
1062 this.count = requestSize >>> 1;
1063 }
1064 public void subscribe(Flow.Subscriber<? super R> subscriber) {
1065 sink.subscribe(subscriber);
1066 }
1067 public void onSubscribe(Flow.Subscription subscription) {
1068 (this.subscription = subscription).request(requestSize);
1069 }
1070 public void onNext(U item) {
1071 Flow.Subscription s = subscription;
1072 if (s == null)
1073 sink.closeExceptionally(
1074 new IllegalStateException("onNext without subscription"));
1075 else {
1076 try {
1077 if (--count <= 0)
1078 s.request(count = requestSize);
1079 sink.submit(fn.apply(item));
1080 } catch (Throwable ex) {
1081 try {
1082 s.cancel();
1083 } finally {
1084 sink.closeExceptionally(ex);
1085 }
1086 }
1087 }
1088 }
1089 public void onError(Throwable ex) {
1090 sink.closeExceptionally(ex);
1091 }
1092 public void onComplete() {
1093 sink.close();
1094 }
1095 }
1096 }