ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
Revision: 1.15
Committed: Sat Jan 17 15:05:20 2015 UTC (9 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.14: +60 -35 lines
Log Message:
Reduce overhead; add example

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.List;
11 import java.util.concurrent.locks.LockSupport;
12 import java.util.function.BiPredicate;
13 import java.util.function.Function;
14 import java.util.function.Supplier;
15
16 /**
17 * A {@link Flow.Publisher} that asynchronously issues submitted items
18 * to current subscribers until it is closed. Each current subscriber
19 * receives newly submitted items in the same order unless drops or
20 * exceptions are encountered. Using a SubmissionPublisher allows
21 * item generators to act as Publishers, although without integrated
22 * flow control. Instead they rely on drop handling and/or blocking.
23 *
24 * <p>A SubmissionPublisher uses the Executor supplied in its
25 * constructor for delivery to subscribers. The best choice of
26 * Executor depends on expected usage. If the generator(s) of
27 * submitted items run in separate threads, and the number of
28 * subscribers can be estimated, consider using a {@link
29 * Executors#newFixedThreadPool}. Otherwise consider using a
30 * work-stealing pool (including {@link ForkJoinPool#commonPool}).
31 *
32 * <p>Buffering allows producers and consumers to transiently operate
33 * at different rates. Each subscriber uses an independent buffer.
34 * Buffers are created upon first use with a given initial capacity,
35 * and are resized as needed up to the maximum. (Capacity arguments
36 * may be rounded up to powers of two.) Invocations of {@code
37 * subscription.request} do not directly result in buffer expansion,
38 * but risk saturation if unfulfilled requests exceed the maximum
39 * capacity. Choices of buffer parameters rely on expected rates,
40 * resources, and usages, that usually benefit from empirical testing.
41 * As first guesses, consider initial 8 and maximum 1024.
42 *
43 * <p>Publication methods support different policies about what to do
44 * when buffers are saturated. Method {@link #submit} blocks until
45 * resources are available. This is simplest (and often appropriate
46 * for relay stages; see {@link #newTransformProcessor newTransformProcessor}),
47 * but least responsive. The {@code offer} methods may either
48 * immediately, or with bounded timeout, drop items, but provide an
49 * opportunity to interpose a handler and then retry. While the
50 * handler is invoked, other calls to methods in this class by other
51 * threads are blocked. Unless recovery is assured, options are
52 * usually limited to logging the error and/or issuing an onError
53 * signal to the subscriber.
54 *
55 * <p>If any Subscriber method throws an exception, its subscription
56 * is cancelled. If the supplied Executor throws
57 * RejectedExecutionException (or any other RuntimeException or Error)
58 * when attempting to execute a task, or a drop handler throws an
59 * exception when processing a dropped item, then the exception is
60 * rethrown. In these cases, some but not all subscribers may have
61 * received items. It is usually good practice to {@link
62 * #closeExceptionally closeExceptionally} in these cases.
63 *
64 * <p>This class may also serve as a convenient base for subclasses
65 * that generate items, and use the methods in this class to publish
66 * them. For example here is a class that periodically publishes the
67 * items generated from a supplier. (In practice you might add methods
68 * to independently start and stop generation, to share schedulers
69 * among publishers, and so on.)
70 *
71 * <pre> {@code
72 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
73 * final ScheduledFuture<?> periodicTask;
74 * final ScheduledExecutorService scheduler;
75 * PeriodicPublisher(Executor executor, int initialBufferCapacity,
76 * int maxBufferCapacity, Supplier<? extends T> supplier,
77 * long period, TimeUnit unit) {
78 * super(executor, initialBufferCapacity, maxBufferCapacity);
79 * scheduler = new ScheduledThreadPoolExecutor(1);
80 * periodicTask = scheduler.scheduleAtFixedRate(
81 * () -> submit(supplier.get()), 0, period, unit);
82 * }
83 * public void close() {
84 * periodicTask.cancel(false);
85 * scheduler.shutdown();
86 * super.close();
87 * }
88 * }}</pre>
89 *
90 * @param <T> the published item type
91 * @author Doug Lea
92 * @since 1.9
93 */
94 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
95 AutoCloseable {
96 /*
97 * Most mechanics are handled by BufferedSubscription. This class
98 * mainly ensures sequentiality by using built-in synchronization
99 * locks across public methods. (Using built-in locks works well
100 * in the most typical case in which only one thread submits
101 * items).
102 */
103
104 // Ensuring that all arrays have power of two length
105
106 static final int MAXIMUM_BUFFER_CAPACITY = 1 << 30;
107 static final int roundCapacity(int cap) { // to nearest power of 2
108 int n = cap - 1;
109 n |= n >>> 1;
110 n |= n >>> 2;
111 n |= n >>> 4;
112 n |= n >>> 8;
113 n |= n >>> 16;
114 return (n < 0) ? 1 :
115 (n >= MAXIMUM_BUFFER_CAPACITY) ? MAXIMUM_BUFFER_CAPACITY : n + 1;
116 }
117
118 /**
119 * Clients (BufferedSubscriptions) are maintained in a linked list
120 * (via their "next" fields). This works well for publish loops.
121 * It requires O(n) traversal to check for duplicate subscribers,
122 * but we expect that subscribing is much less common than
123 * publishing. Unsubscribing occurs only during traversal loops,
124 * when BufferedSubscription methods or status checks return
125 * negative values signifying that they have been disabled.
126 */
127 BufferedSubscription<T> clients;
128
129 // Parameters for constructing BufferedSubscriptions
130 final Executor executor;
131 final int minBufferCapacity;
132 final int maxBufferCapacity;
133
134 /** Run status, updated only within locks */
135 volatile boolean closed;
136
137 /**
138 * Creates a new SubmissionPublisher using the given Executor for
139 * async delivery to subscribers, and with the given initial and
140 * maximum buffer sizes for each subscriber. In the absence of
141 * other constraints, consider using {@code
142 * ForkJoinPool.commonPool(), 8, 1024}.
143 *
144 * @param executor the executor to use for async delivery,
145 * supporting creation of at least one independent thread
146 * @param initialBufferCapacity the initial capacity for each
147 * subscriber's buffer (the actual capacity may be rounded up to
148 * the nearest power of two)
149 * @param maxBufferCapacity the maximum capacity for each
150 * subscriber's buffer (the actual capacity may be rounded up to
151 * the nearest power of two)
152 * @throws NullPointerException if executor is null
153 * @throws IllegalArgumentException if initialBufferCapacity is
154 * not positive or exceeds maxBufferCapacity, or maxBufferCapacity
155 * exceeds {@code 1<<30} (about 1 billion), the maximum bound for
156 * a power of two array size
157 */
158 public SubmissionPublisher(Executor executor,
159 int initialBufferCapacity,
160 int maxBufferCapacity) {
161 if (executor == null)
162 throw new NullPointerException();
163 if (initialBufferCapacity <= 0 || maxBufferCapacity <= 0)
164 throw new IllegalArgumentException("capacity must be positive");
165 if (maxBufferCapacity > MAXIMUM_BUFFER_CAPACITY)
166 throw new IllegalArgumentException("capacity exceeds limit");
167 if (initialBufferCapacity > maxBufferCapacity)
168 throw new IllegalArgumentException("initial cannot exceed max capacity");
169 int minc = roundCapacity(initialBufferCapacity);
170 int maxc = roundCapacity(maxBufferCapacity);
171 this.executor = executor;
172 this.minBufferCapacity = minc;
173 this.maxBufferCapacity = maxc;
174 }
175
176 /**
177 * Adds the given Subscriber unless already subscribed. If
178 * already subscribed, the Subscriber's onError method is invoked
179 * with an IllegalStateException. Otherwise, upon success, the
180 * Subscriber's onSubscribe method is invoked with a new
181 * Subscription (upon exception, the exception is rethrown and the
182 * Subscriber remains unsubscribed). If this SubmissionPublisher
183 * is closed, the subscriber's onComplete method is then invoked.
184 * Subscribers may enable receiving items by invoking the {@code
185 * request} method of the new Subscription, and may unsubscribe by
186 * invoking its cancel method.
187 *
188 * @param subscriber the subscriber
189 * @throws NullPointerException if subscriber is null
190 */
191 public void subscribe(Flow.Subscriber<? super T> subscriber) {
192 if (subscriber == null) throw new NullPointerException();
193 BufferedSubscription<T> sub = new BufferedSubscription<T>(
194 subscriber, executor, minBufferCapacity, maxBufferCapacity);
195 boolean present = false, clsd;
196 synchronized (this) {
197 clsd = closed;
198 BufferedSubscription<T> pred = null, next;
199 for (BufferedSubscription<T> b = clients; b != null; b = next) {
200 next = b.next;
201 if (b.ctl < 0) { // disabled; remove
202 if (pred == null)
203 clients = next;
204 else
205 pred.next = next;
206 }
207 else if (subscriber == b.subscriber) {
208 present = true;
209 break;
210 }
211 pred = b;
212 }
213 if (!present) {
214 subscriber.onSubscribe(sub); // don't link on exception
215 if (!clsd) {
216 if (pred == null)
217 clients = sub;
218 else
219 pred.next = sub;
220 }
221 }
222 }
223 if (clsd)
224 subscriber.onComplete();
225 else if (present)
226 subscriber.onError(new IllegalStateException("Already subscribed"));
227 }
228
229 /**
230 * Publishes the given item, if possible, to each current
231 * subscriber by asynchronously invoking its onNext method. The
232 * item may be dropped by one or more subscribers if resource
233 * limits are exceeded, in which case the given handler (if
234 * non-null) is invoked, and if it returns true, retried once.
235 *
236 * <p>If the Executor for this publisher throws a
237 * RejectedExecutionException (or any other RuntimeException or
238 * Error) when attempting to asynchronously notify subscribers, or
239 * the drop handler throws an exception when processing a dropped
240 * item, then this exception is rethrown.
241 *
242 * @param item the (non-null) item to publish
243 * @param onDrop if non-null, the handler invoked upon a drop to a
244 * subscriber, with arguments of the subscriber and item; if it
245 * returns true, an offer is re-attempted (once)
246 * @return the number of drops (failed attempts to issue the item
247 * to a subscriber)
248 * @throws IllegalStateException if closed
249 * @throws NullPointerException if item is null
250 * @throws RejectedExecutionException if thrown by Executor
251 */
252 public int offer(T item,
253 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
254 if (item == null) throw new NullPointerException();
255 int drops = 0;
256 synchronized (this) {
257 if (closed)
258 throw new IllegalStateException("Closed");
259 BufferedSubscription<T> pred = null, next;
260 for (BufferedSubscription<T> b = clients; b != null; b = next) {
261 int stat;
262 next = b.next;
263 if ((stat = b.offer(item)) == 0 &&
264 onDrop != null &&
265 onDrop.test(b.subscriber, item))
266 stat = b.offer(item);
267 if (stat < 0) {
268 if (pred == null)
269 clients = next;
270 else
271 pred.next = next;
272 }
273 else {
274 pred = b;
275 if (stat == 0)
276 ++drops;
277 }
278 }
279 return drops;
280 }
281 }
282
283 /**
284 * Publishes the given item, if possible, to each current
285 * subscriber by asynchronously invoking its onNext method,
286 * blocking while resources for any subscription are unavailable,
287 * up to the specified timeout or the caller thread is
288 * interrupted, at which point the given handler (if non-null)
289 * is invoked, and if it returns true, retried once.
290 *
291 * <p>If the Executor for this publisher throws a
292 * RejectedExecutionException (or any other RuntimeException or
293 * Error) when attempting to asynchronously notify subscribers, or
294 * the drop handler throws an exception when processing a dropped
295 * item, then this exception is rethrown.
296 *
297 * @param item the (non-null) item to publish
298 * @param timeout how long to wait for resources for any subscriber
299 * before giving up, in units of {@code unit}
300 * @param unit a {@code TimeUnit} determining how to interpret the
301 * {@code timeout} parameter
302 * @param onDrop if non-null, the handler invoked upon a drop to a
303 * subscriber, with arguments of the subscriber and item; if it
304 * returns true, an offer is re-attempted (once)
305 * @return the number of drops (failed attempts to issue the item
306 * to a subscriber)
307 * @throws IllegalStateException if closed
308 * @throws NullPointerException if item is null
309 * @throws RejectedExecutionException if thrown by Executor
310 */
311 public int offer(T item, long timeout, TimeUnit unit,
312 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
313 if (item == null) throw new NullPointerException();
314 long nanos = unit.toNanos(timeout);
315 int drops = 0;
316 synchronized (this) {
317 if (closed)
318 throw new IllegalStateException("Closed");
319 BufferedSubscription<T> pred = null, next;
320 for (BufferedSubscription<T> b = clients; b != null; b = next) {
321 int stat;
322 next = b.next;
323 if ((stat = b.offer(item)) == 0 &&
324 (stat = b.timedOffer(item, nanos)) == 0 &&
325 onDrop != null &&
326 onDrop.test(b.subscriber, item))
327 stat = b.offer(item);
328 if (stat < 0) {
329 if (pred == null)
330 clients = next;
331 else
332 pred.next = next;
333 }
334 else {
335 pred = b;
336 if (stat == 0)
337 ++drops;
338 }
339 }
340 }
341 return drops;
342 }
343
344 /**
345 * Publishes the given item to each current subscriber by
346 * asynchronously invoking its onNext method, blocking
347 * uninterruptibly while resources for any subscriber are
348 * unavailable.
349 *
350 * <p>If the Executor for this publisher throws a
351 * RejectedExecutionException (or any other RuntimeException or
352 * Error) when attempting to asynchronously notify subscribers,
353 * then this exception is rethrown.
354 *
355 * @param item the (non-null) item to publish
356 * @throws IllegalStateException if closed
357 * @throws NullPointerException if item is null
358 * @throws RejectedExecutionException if thrown by Executor
359 */
360 public void submit(T item) {
361 if (item == null) throw new NullPointerException();
362 synchronized (this) {
363 if (closed)
364 throw new IllegalStateException("Closed");
365 BufferedSubscription<T> pred = null, next;
366 for (BufferedSubscription<T> b = clients; b != null; b = next) {
367 int stat;
368 next = b.next;
369 if ((stat = b.offer(item)) == 0)
370 stat = b.submit(item);
371 if (stat < 0) {
372 if (pred == null)
373 clients = next;
374 else
375 pred.next = next;
376 }
377 else
378 pred = b;
379 }
380 }
381 }
382
383 /**
384 * Unless already closed, issues onComplete signals to current
385 * subscribers, and disallows subsequent attempts to publish.
386 */
387 public void close() {
388 if (!closed) {
389 BufferedSubscription<T> b, next;
390 synchronized (this) {
391 b = clients;
392 clients = null;
393 closed = true;
394 }
395 while (b != null) {
396 next = b.next;
397 b.close();
398 b = next;
399 }
400 }
401 }
402
403 /**
404 * Unless already closed, issues onError signals to current
405 * subscribers with the given error, and disallows subsequent
406 * attempts to publish.
407 *
408 * @param error the onError argument sent to subscribers
409 * @throws NullPointerException if error is null
410 */
411 public void closeExceptionally(Throwable error) {
412 if (error == null)
413 throw new NullPointerException();
414 if (!closed) {
415 BufferedSubscription<T> b, next;
416 synchronized (this) {
417 b = clients;
418 clients = null;
419 closed = true;
420 }
421 while (b != null) {
422 next = b.next;
423 b.closeExceptionally(error);
424 b = next;
425 }
426 }
427 }
428
429 /**
430 * Returns true if this publisher is not accepting submissions.
431 *
432 * @return true if closed
433 */
434 public boolean isClosed() {
435 return closed;
436 }
437
438 /**
439 * Returns true if this publisher has any subscribers.
440 *
441 * @return true if this publisher has any subscribers
442 */
443 public boolean hasSubscribers() {
444 boolean nonEmpty = false;
445 if (!closed) {
446 synchronized (this) {
447 BufferedSubscription<T> pred = null, next;
448 for (BufferedSubscription<T> b = clients; b != null; b = next) {
449 next = b.next;
450 if (b.ctl < 0) {
451 if (pred == null)
452 clients = next;
453 else
454 pred.next = next;
455 }
456 else {
457 nonEmpty = true;
458 break;
459 }
460 }
461 }
462 }
463 return nonEmpty;
464 }
465
466 /**
467 * Returns the Executor used for asynchronous delivery.
468 *
469 * @return the Executor used for asynchronous delivery
470 */
471 public Executor getExecutor() {
472 return executor;
473 }
474
475 /**
476 * Returns the initial per-subscriber buffer capacity.
477 *
478 * @return the initial per-subscriber buffer capacity
479 */
480 public int getInitialBufferCapacity() {
481 return minBufferCapacity;
482 }
483
484 /**
485 * Returns the maximum per-subscriber buffer capacity.
486 *
487 * @return the maximum per-subscriber buffer capacity
488 */
489 public int getMaxBufferCapacity() {
490 return maxBufferCapacity;
491 }
492
493 /**
494 * Returns a list of current subscribers.
495 *
496 * @return list of current subscribers
497 */
498 public List<Flow.Subscriber<? super T>> getSubscribers() {
499 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
500 synchronized (this) {
501 BufferedSubscription<T> pred = null, next;
502 for (BufferedSubscription<T> b = clients; b != null; b = next) {
503 next = b.next;
504 if (b.ctl < 0) {
505 if (pred == null)
506 clients = next;
507 else
508 pred.next = next;
509 }
510 else
511 subs.add(b.subscriber);
512 }
513 }
514 return subs;
515 }
516
517 /**
518 * Returns true if the given Subscriber is currently subscribed.
519 *
520 * @param subscriber the subscriber
521 * @return true if currently subscribed
522 */
523 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
524 if (!closed) {
525 synchronized (this) {
526 BufferedSubscription<T> pred = null, next;
527 for (BufferedSubscription<T> b = clients; b != null; b = next) {
528 next = b.next;
529 if (b.ctl < 0) {
530 if (pred == null)
531 clients = next;
532 else
533 pred.next = next;
534 }
535 else if (subscriber == b.subscriber)
536 return true;
537 }
538 }
539 }
540 return false;
541 }
542
543 /**
544 * If the given subscription is managed by a SubmissionPublisher,
545 * returns an estimate of the number of items produced but not yet
546 * consumed; otherwise returns zero. This method is designed only
547 * for monitoring purposes, not for control.
548 *
549 * @param subscription the subscription
550 * @return the estimate, or zero if the subscription is of an
551 * unknown type
552 */
553 public static int estimateAvailable(Flow.Subscription subscription) {
554 if (subscription instanceof BufferedSubscription)
555 return ((BufferedSubscription)subscription).estimateAvailable();
556 else
557 return 0;
558 }
559
560 /**
561 * Returns a new {@link Flow.Processor} that uses this
562 * SubmissionPublisher to relay transformed items from its source
563 * using method {@link submit}, and using the given request size
564 * for requests to its source subscription. (Typically,
565 * requestSizes should range from the initial and maximum buffer
566 * capacity of this SubmissionPublisher.)
567 *
568 * @param <S> the subscribed item type
569 * @param requestSize the request size for subscriptions
570 * with the source
571 * @param transform the transform function
572 * @return the new Processor
573 * @throws NullPointerException if transform is null
574 * @throws IllegalArgumentException if requestSize not positive
575 */
576 public <S> Flow.Processor<S,T> newTransformProcessor(
577 long requestSize,
578 Function<? super S, ? extends T> transform) {
579 if (requestSize <= 0L)
580 throw new IllegalArgumentException("requestSize must be positive");
581 if (transform == null)
582 throw new NullPointerException();
583 return new TransformProcessor<S,T>(requestSize, transform, this);
584 }
585
586 /**
587 * A bounded (ring) buffer with integrated control to start a
588 * consumer task whenever items are available. The buffer
589 * algorithm is similar to one used inside ForkJoinPool,
590 * specialized for the case of at most one concurrent producer and
591 * consumer, and power of two buffer sizes. This allows methods to
592 * operate without locks even while supporting resizing, blocking,
593 * task-triggering, and garbage-free buffers (nulling out elements
594 * when consumed), although supporting these does impose a bit of
595 * overhead compared to plain fixed-size ring buffers.
596 *
597 * The publisher guarantees a single producer via its lock. We
598 * ensure in this class that there is at most one consumer. The
599 * request and cancel methods must be fully thread-safe but are
600 * coded to exploit the most common case in which they are only
601 * called by consumers (usually within onNext).
602 *
603 * This class also serves as its own consumer task, consuming as
604 * many items/signals as possible before terminating, at which
605 * point it is re-executed created when needed. (The dual Runnable
606 * and ForkJoinTask declaration saves overhead when executed by
607 * ForkJoinPools, without impacting other kinds of Executors.)
608 * Execution control is managed using the ACTIVE ctl bit. We
609 * ensure that a task is active when consumable items (and
610 * usually, ERROR or COMPLETE signals) are present and there is
611 * demand (unfulfilled requests). This is complicated on the
612 * creation side by the possibility of exceptions when trying to
613 * execute tasks. These eventually force DISABLED state, but
614 * sometimes not directly. On the task side, termination (clearing
615 * ACTIVE) may race with producers or request() calls, so in some
616 * cases requires a re-check, re-activating if possible.
617 *
618 * The ctl field also manages run state. When DISABLED, no further
619 * updates are possible (to simplify checks, DISABLED is defined
620 * as a negative value). Disabling may be preceded by setting
621 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
622 * case the associated Subscriber methods are invoked, possibly
623 * synchronously if there is no active consumer task (including
624 * cases where execute() failed).
625 *
626 * Support for blocking also exploits the fact that there is only
627 * one possible waiter. ManagedBlocker-compatible control fields
628 * are placed in this class itself rather than in wait-nodes.
629 * Blocking control relies on the "waiter" field. Producers set
630 * the field before trying to block, but must then recheck (via
631 * offer) before parking. Signalling then just unparks and clears
632 * waiter field.
633 *
634 * This class uses @Contended and heuristic field declaration
635 * ordering to reduce memory contention on BufferedSubscription
636 * itself, but it does not currently attempt to avoid memory
637 * contention (especially including card-marks) among buffer
638 * elements, that can significantly slow down some usages.
639 * Addressing this may require allocating substantially more space
640 * than users expect.
641 */
642 @SuppressWarnings("serial")
643 @sun.misc.Contended
644 static final class BufferedSubscription<T> extends ForkJoinTask<Void>
645 implements Runnable, Flow.Subscription, ForkJoinPool.ManagedBlocker {
646 // Order-sensitive field declarations
647 long timeout; // > 0 if timed wait
648 volatile long demand; // # unfilled requests
649 final int minCapacity; // initial buffer size
650 int maxCapacity; // reduced on OOME
651 int putStat; // offer result for ManagedBlocker
652 volatile int ctl; // atomic run state flags
653 volatile int head; // next position to take
654 volatile int tail; // next position to put
655 boolean wasInterrupted; // true if interrupted while waiting
656 volatile Object[] array; // buffer: null if disabled
657 Flow.Subscriber<? super T> subscriber; // null if disabled
658 Executor executor; // null if disabled
659 volatile Throwable pendingError; // holds until onError issued
660 volatile Thread waiter; // blocked producer thread
661 T putItem; // for offer within ManagedBlocker
662 BufferedSubscription<T> next; // used only by publisher
663
664 // ctl values
665 static final int ACTIVE = 0x01; // consumer task active
666 static final int ERROR = 0x02; // signal pending error
667 static final int COMPLETE = 0x04; // signal completion when done
668 static final int DISABLED = 0x80000000; // must be negative
669
670 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
671 Executor executor, int minBufferCapacity,
672 int maxBufferCapacity) {
673 this.subscriber = subscriber;
674 this.executor = executor;
675 this.minCapacity = minBufferCapacity;
676 this.maxCapacity = maxBufferCapacity;
677 }
678
679 /**
680 * @return -1 if disabled, 0 if dropped, else 1
681 */
682 final int offer(T item) {
683 Object[] a = array;
684 int t = tail, h = head, mask;
685 if (a == null || (mask = a.length - 1) < 0 || t - h >= mask)
686 return growAndOffer(item);
687 else {
688 a[t & mask] = item;
689 U.putOrderedInt(this, TAIL, t + 1);
690 return (ctl & ACTIVE) != 0 ? 1 : startOnOffer();
691 }
692 }
693
694 /**
695 * Creates or expands buffer if possible, then offers.
696 */
697 final int growAndOffer(T item) {
698 int oldLen, len;
699 Object[] oldA = array, a = null;
700 if (oldA != null)
701 len = (oldLen = oldA.length) << 1;
702 else if (ctl < 0)
703 return -1; // disabled
704 else {
705 oldLen = 0;
706 len = minCapacity;
707 }
708 if (oldLen >= maxCapacity || len <= 0)
709 return 0;
710 try {
711 a = new Object[len];
712 } catch (Throwable ex) { // try to cope with OOME
713 if (oldLen != 0) // avoid continuous failure
714 maxCapacity = oldLen;
715 return 0;
716 }
717 array = a;
718 int oldMask = oldLen - 1, mask = len - 1, oldTail = tail, j = head;
719 if (oldA != null && oldMask >= 0 && mask > oldMask) {
720 for (Object x; j != oldTail; ++j) { // races with consumer
721 int i = j & oldMask;
722 if ((x = oldA[i]) != null &&
723 U.compareAndSwapObject(oldA,
724 (((long)i) << ASHIFT) + ABASE,
725 x, null))
726 a[j & mask] = x;
727 }
728 }
729 a[j & mask] = item;
730 U.putOrderedInt(this, TAIL, j + 1);
731 return (ctl & ACTIVE) != 0 ? 1 : startOnOffer();
732 }
733
734 /**
735 * Tries to start consumer task if items are available.
736 * Backs out and relays exception if executor fails.
737 */
738 final int startOnOffer() {
739 if (demand != 0L) {
740 for (int c;;) {
741 Executor e = executor;
742 if (((c = ctl) & ACTIVE) != 0)
743 break;
744 else if (c < 0 || e == null)
745 return -1;
746 else if (U.compareAndSwapInt(this, CTL, c, c | ACTIVE)) {
747 try {
748 e.execute(this);
749 break;
750 } catch (RuntimeException | Error ex) { // back out
751 do {} while ((c = ctl) >= 0 &&
752 (c & ACTIVE) != 0 &&
753 !U.compareAndSwapInt(this, CTL, c,
754 c & ~ACTIVE));
755 throw ex;
756 }
757 }
758 else if (demand == 0L || tail == head)
759 break;
760 }
761 }
762 return 1;
763 }
764
765 /**
766 * Tries to start consumer task upon a signal or request;
767 * disables on failure.
768 */
769 final void startOrDisable() {
770 Executor e; // skip if already disabled
771 if ((e = executor) != null) {
772 try {
773 e.execute(this);
774 } catch (Throwable ex) { // back out and force signal
775 for (int c;;) {
776 if ((c = ctl) < 0 || (c & ACTIVE) == 0)
777 break;
778 if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
779 closeExceptionally(ex);
780 break;
781 }
782 }
783 }
784 }
785 }
786
787 /**
788 * Nulls out most fields, mainly to avoid garbage retention
789 * until publisher unsubscribes.
790 */
791 final void detach() {
792 pendingError = null;
793 subscriber = null;
794 executor = null;
795 array = null;
796 Thread w = waiter;
797 if (w != null) {
798 waiter = null;
799 LockSupport.unpark(w); // force wakeup
800 }
801 }
802
803 public void request(long n) {
804 if (n > 0L) {
805 for (;;) {
806 long prev = demand, d;
807 if ((d = prev + n) < prev) // saturate
808 d = Long.MAX_VALUE;
809 if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
810 for (int c, h;;) {
811 if (((c = ctl) & (ACTIVE | DISABLED)) != 0 ||
812 demand == 0L)
813 break;
814 if ((h = head) != tail) {
815 if (U.compareAndSwapInt(this, CTL, c,
816 c | ACTIVE)) {
817 startOrDisable();
818 break;
819 }
820 }
821 else if (head == h && tail == h)
822 break;
823 }
824 break;
825 }
826 }
827 }
828 else if (n < 0L)
829 closeExceptionally(new IllegalArgumentException(
830 "negative subscription request"));
831 }
832
833 public void cancel() {
834 for (int c;;) {
835 if ((c = ctl) < 0)
836 break;
837 else if ((c & ACTIVE) != 0) {
838 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
839 break; // cause consumer task to exit
840 }
841 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
842 detach();
843 break;
844 }
845 }
846 }
847
848 final void closeExceptionally(Throwable ex) {
849 pendingError = ex;
850 for (int c;;) {
851 if ((c = ctl) < 0)
852 break;
853 else if ((c & ACTIVE) != 0) {
854 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
855 break; // cause consumer task to exit
856 }
857 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
858 Flow.Subscriber<? super T> s = subscriber;
859 if (s != null) {
860 try {
861 s.onError(ex);
862 } catch (Throwable ignore) {
863 }
864 }
865 detach();
866 break;
867 }
868 }
869 }
870
871 final void close() {
872 for (int c;;) {
873 if ((c = ctl) < 0)
874 break;
875 if (U.compareAndSwapInt(this, CTL, c, c | (ACTIVE | COMPLETE))) {
876 if ((c & ACTIVE) == 0)
877 startOrDisable();
878 break;
879 }
880 }
881 }
882
883 final int estimateAvailable() {
884 int n;
885 return (ctl >= 0 && (n = tail - head) > 0) ? n : 0;
886 }
887
888 // ManagedBlocker support
889
890 public final boolean isReleasable() {
891 T item = putItem;
892 if (item != null) {
893 if ((putStat = offer(item)) == 0)
894 return false;
895 putItem = null;
896 }
897 return true;
898 }
899
900 public final boolean block() {
901 T item = putItem;
902 if (item != null) {
903 putItem = null;
904 long nanos = timeout;
905 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
906 while ((putStat = offer(item)) == 0) {
907 if (Thread.interrupted()) {
908 wasInterrupted = true;
909 if (nanos > 0L)
910 break;
911 }
912 else if (nanos > 0L &&
913 (nanos = deadline - System.nanoTime()) <= 0L)
914 break;
915 else if (waiter == null)
916 waiter = Thread.currentThread();
917 else {
918 if (nanos > 0L)
919 LockSupport.parkNanos(this, nanos);
920 else
921 LockSupport.park(this);
922 waiter = null;
923 }
924 }
925 }
926 waiter = null;
927 return true;
928 }
929
930 final int submit(T item) {
931 putItem = item;
932 timeout = 0L;
933 wasInterrupted = false;
934 try {
935 ForkJoinPool.managedBlock(this);
936 } catch (InterruptedException cantHappen) {
937 }
938 if (wasInterrupted)
939 Thread.currentThread().interrupt();
940 return putStat;
941 }
942
943 final int timedOffer(T item, long nanos) {
944 if (nanos <= 0L)
945 return 0;
946 putItem = item;
947 timeout = nanos;
948 wasInterrupted = false;
949 try {
950 ForkJoinPool.managedBlock(this);
951 } catch (InterruptedException cantHappen) {
952 }
953 if (wasInterrupted)
954 Thread.currentThread().interrupt();
955 return putStat;
956 }
957
958 /** Consumer task loop */
959 public final void run() {
960 Flow.Subscriber<? super T> s;
961 if ((s = subscriber) != null) { // else disabled
962 for (;;) {
963 long d = demand; // read volatile fields in acceptable order
964 int c = ctl;
965 int h = head;
966 int t = tail;
967 Object[] a = array;
968 int i, n; Object x; Thread w;
969 if (c < 0) {
970 detach();
971 break;
972 }
973 else if ((c & ERROR) != 0) {
974 Throwable ex = pendingError;
975 ctl = DISABLED; // no need for CAS
976 if (ex != null) {
977 try {
978 s.onError(ex);
979 } catch (Throwable ignore) {
980 }
981 }
982 }
983 else if (h == t) { // empty
984 if (h == tail &&
985 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE)) {
986 if (h != tail || c != (c = ctl)) { // recheck
987 if ((c & (ACTIVE | DISABLED)) != 0 ||
988 !U.compareAndSwapInt(this, CTL, c,
989 c | ACTIVE))
990 break;
991 }
992 else if ((c & COMPLETE) != 0) {
993 ctl = DISABLED;
994 try {
995 s.onComplete();
996 } catch (Throwable ignore) {
997 }
998 }
999 else
1000 break;
1001 }
1002 }
1003 else if (d == 0L) { // can't take
1004 if (demand == 0L &&
1005 U.compareAndSwapInt(this, CTL, c, c &= ~ACTIVE) &&
1006 ((demand == 0L && c == (c = ctl)) || // recheck
1007 (c & (ACTIVE | DISABLED)) != 0 ||
1008 !U.compareAndSwapInt(this, CTL, c, c | ACTIVE)))
1009 break;
1010 }
1011 else if (a != null &&
1012 (n = a.length) > 0 &&
1013 (x = a[i = h & (n - 1)]) != null &&
1014 U.compareAndSwapObject(
1015 a, (((long)i) << ASHIFT) + ABASE, x, null)) {
1016 U.putOrderedInt(this, HEAD, h + 1);
1017 while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
1018 d = demand; // almost never fails
1019 if ((w = waiter) != null) {
1020 waiter = null;
1021 LockSupport.unpark(w); // release producer
1022 }
1023 try {
1024 @SuppressWarnings("unchecked") T y = (T) x;
1025 s.onNext(y);
1026 } catch (Throwable ex) {
1027 ctl = DISABLED;
1028 }
1029 }
1030 }
1031 }
1032 }
1033
1034 /** Allows resubmission when used as ForkJoinTask */
1035 public final boolean exec() {
1036 try {
1037 run();
1038 } catch (Throwable ex) {
1039 reinitialize();
1040 }
1041 return false; // must return false
1042 }
1043 public final Void getRawResult() { return null; }
1044 public final void setRawResult(Void v) {}
1045
1046 // Unsafe mechanics
1047 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1048 private static final long CTL;
1049 private static final long TAIL;
1050 private static final long HEAD;
1051 private static final long DEMAND;
1052 private static final int ABASE;
1053 private static final int ASHIFT;
1054
1055 static {
1056 try {
1057 CTL = U.objectFieldOffset
1058 (BufferedSubscription.class.getDeclaredField("ctl"));
1059 TAIL = U.objectFieldOffset
1060 (BufferedSubscription.class.getDeclaredField("tail"));
1061 HEAD = U.objectFieldOffset
1062 (BufferedSubscription.class.getDeclaredField("head"));
1063 DEMAND = U.objectFieldOffset
1064 (BufferedSubscription.class.getDeclaredField("demand"));
1065
1066 ABASE = U.arrayBaseOffset(Object[].class);
1067 int scale = U.arrayIndexScale(Object[].class);
1068 if ((scale & (scale - 1)) != 0)
1069 throw new Error("data type scale not a power of two");
1070 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1071 } catch (ReflectiveOperationException e) {
1072 throw new Error(e);
1073 }
1074 }
1075 }
1076
1077 static final class TransformProcessor<U,R> implements Flow.Processor<U,R> {
1078 final Function<? super U, ? extends R> fn;
1079 final SubmissionPublisher<R> sink;
1080 Flow.Subscription subscription;
1081 final long requestSize;
1082 long count;
1083 TransformProcessor(long requestSize,
1084 Function<? super U, ? extends R> fn,
1085 SubmissionPublisher<R> sink) {
1086 this.fn = fn;
1087 this.sink = sink;
1088 this.requestSize = requestSize;
1089 this.count = requestSize >>> 1;
1090 }
1091 public void subscribe(Flow.Subscriber<? super R> subscriber) {
1092 sink.subscribe(subscriber);
1093 }
1094 public void onSubscribe(Flow.Subscription subscription) {
1095 (this.subscription = subscription).request(requestSize);
1096 }
1097 public void onNext(U item) {
1098 Flow.Subscription s = subscription;
1099 if (s == null)
1100 sink.closeExceptionally(
1101 new IllegalStateException("onNext without subscription"));
1102 else {
1103 try {
1104 if (--count <= 0)
1105 s.request(count = requestSize);
1106 sink.submit(fn.apply(item));
1107 } catch (Throwable ex) {
1108 try {
1109 s.cancel();
1110 } finally {
1111 sink.closeExceptionally(ex);
1112 }
1113 }
1114 }
1115 }
1116 public void onError(Throwable ex) {
1117 sink.closeExceptionally(ex);
1118 }
1119 public void onComplete() {
1120 sink.close();
1121 }
1122 }
1123
1124 }