next = b.next;
if (b.isDisabled()) { // remove
if (pred == null)
clients = next;
else
pred.next = next;
}
else if (subscriber.equals(b.subscriber)) {
present = true;
break;
}
else
pred = b;
b = next;
}
if (!present) {
if (pred == null)
clients = subscription;
else
pred.next = subscription;
subscription.onSubscribe();
}
}
}
if (present)
subscriber.onError(new IllegalStateException("Already subscribed"));
else if (complete)
subscription.onComplete();
}
/**
* Publishes the given item to each current subscriber by
* asynchronously invoking its onNext method, blocking
* uninterruptibly while resources for any subscriber are
* unavailable. This method returns an estimate of the maximum lag
* (number of items submitted but not yet consumed) among all
* current subscribers. This value is at least one (accounting for
* this submitted item) if there are any subscribers, else zero.
*
* If the Executor for this publisher throws a
* RejectedExecutionException (or any other RuntimeException or
* Error) when attempting to asynchronously notify subscribers,
* then this exception is rethrown.
*
* @param item the (non-null) item to publish
* @return the estimated maximum lag among subscribers
* @throws IllegalStateException if closed
* @throws NullPointerException if item is null
* @throws RejectedExecutionException if thrown by Executor
*/
public int submit(T item) {
    /*
     * To reduce head-of-line blocking, try offer() on each,
     * place saturated ones in retries list, and later wait
     * them out.
     */
    if (item == null) throw new NullPointerException();
    int lag = 0;
    boolean complete;
    synchronized (this) {
        complete = closed;
        BufferedSubscription b = clients, pred = null;
        BufferedSubscription retries = null, rtail = null;
        if (!complete) {
            while (b != null) {
                int stat;
                BufferedSubscription next = b.next;
                if ((stat = b.offer(item)) < 0) {
                    // Subscription is disabled: unlink it from the client list.
                    if (pred == null)
                        clients = next;
                    else
                        pred.next = next;
                }
                else {
                    if (stat == 0) {
                        // Saturated: append to the retry list. Reset the
                        // link first; a previous retry pass that was
                        // aborted by an exception can leave a stale
                        // nextRetry behind, which would otherwise chain
                        // in subscriptions that already accepted this
                        // item and deliver it to them twice.
                        b.nextRetry = null;
                        if (rtail == null)
                            retries = b;
                        else
                            rtail.nextRetry = b;
                        rtail = b;
                        // Report a full buffer as the lag for this one.
                        stat = maxBufferCapacity;
                    }
                    if (stat > lag)
                        lag = stat;
                    pred = b;
                }
                b = next;
            }
            // Wait out the saturated subscriptions (still under the lock).
            if (retries != null)
                retrySubmit(retries, item);
        }
    }
    if (complete)
        throw new IllegalStateException("Closed");
    else
        return lag;
}
/**
* Calls submit on each subscription on retry list.
*/
private void retrySubmit(BufferedSubscription retries, T item) {
    BufferedSubscription current = retries;
    while (current != null) {
        // Grab the successor and unlink before the (possibly blocking) submit.
        BufferedSubscription following = current.nextRetry;
        current.nextRetry = null;
        current.submit(item);
        current = following;
    }
}
/**
* Publishes the given item, if possible, to each current
* subscriber by asynchronously invoking its onNext method. The
* item may be dropped by one or more subscribers if resource
* limits are exceeded, in which case the given handler (if
* non-null) is invoked, and if it returns true, retried once.
* Other calls to methods in this class by other threads are
* blocked while the handler is invoked. Unless recovery is
* assured, options are usually limited to logging the error
* and/or issuing an onError signal to the subscriber.
*
* This method returns a status indicator: If negative, it
* represents the (negative) number of drops (failed attempts to
* issue the item to a subscriber). Otherwise it is an estimate of
* the maximum lag (number of items submitted but not yet
* consumed) among all current subscribers. This value is at least
* one (accounting for this submitted item) if there are any
* subscribers, else zero.
*
* <p>If the Executor for this publisher throws a
* RejectedExecutionException (or any other RuntimeException or
* Error) when attempting to asynchronously notify subscribers, or
* the drop handler throws an exception when processing a dropped
* item, then this exception is rethrown.
*
* @param item the (non-null) item to publish
* @param onDrop if non-null, the handler invoked upon a drop to a
* subscriber, with arguments of the subscriber and item; if it
* returns true, an offer is re-attempted (once)
* @return if negative, the (negative) number of drops; otherwise
* an estimate of maximum lag
* @throws IllegalStateException if closed
* @throws NullPointerException if item is null
* @throws RejectedExecutionException if thrown by Executor
*/
public int offer(T item,
BiPredicate, ? super T> onDrop) {
if (item == null) throw new NullPointerException();
int lag = 0, drops = 0;
boolean complete;
synchronized (this) {
complete = closed;
BufferedSubscription b = clients, pred = null, next;
if (!complete) {
for (; b != null; b = next) {
int stat;
next = b.next;
if ((stat = b.offer(item)) == 0 &&
onDrop != null &&
onDrop.test(b.subscriber, item))
stat = b.offer(item);
if (stat < 0) {
if (pred == null)
clients = next;
else
pred.next = next;
}
else {
if (stat == 0)
++drops;
else if (stat > lag)
lag = stat;
pred = b;
}
}
}
}
if (complete)
throw new IllegalStateException("Closed");
else
return (drops > 0) ? -drops : lag;
}
/**
* Publishes the given item, if possible, to each current
* subscriber by asynchronously invoking its onNext method,
* blocking while resources for any subscription are unavailable,
* up to the specified timeout or the caller thread is
* interrupted, at which point the given handler (if non-null) is
* invoked, and if it returns true, retried once. (The drop
* handler may distinguish timeouts from interrupts by checking
* whether the current thread is interrupted.) Other calls to
* methods in this class by other threads are blocked while the
* handler is invoked. Unless recovery is assured, options are
* usually limited to logging the error and/or issuing an onError
* signal to the subscriber.
*
* This method returns a status indicator: If negative, it
* represents the (negative) number of drops (failed attempts to
* issue the item to a subscriber). Otherwise it is an estimate of
* the maximum lag (number of items submitted but not yet
* consumed) among all current subscribers. This value is at least
* one (accounting for this submitted item) if there are any
* subscribers, else zero.
*
* <p>If the Executor for this publisher throws a
* RejectedExecutionException (or any other RuntimeException or
* Error) when attempting to asynchronously notify subscribers, or
* the drop handler throws an exception when processing a dropped
* item, then this exception is rethrown.
*
* @param item the (non-null) item to publish
* @param timeout how long to wait for resources for any subscriber
* before giving up, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @param onDrop if non-null, the handler invoked upon a drop to a
* subscriber, with arguments of the subscriber and item; if it
* returns true, an offer is re-attempted (once)
* @return if negative, the (negative) number of drops; otherwise
* an estimate of maximum lag
* @throws IllegalStateException if closed
* @throws NullPointerException if item is null
* @throws RejectedExecutionException if thrown by Executor
*/
public int offer(T item, long timeout, TimeUnit unit,
BiPredicate, ? super T> onDrop) {
// Same idea as submit
if (item == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
if (nanos <= 0L)
return offer(item, onDrop);
int lag = 0, drops = 0;
boolean complete;
synchronized (this) {
complete = closed;
BufferedSubscription b = clients, pred = null;
BufferedSubscription retries = null, rtail = null;
if (!complete) {
while (b != null) {
int stat;
BufferedSubscription next = b.next;
if ((stat = b.offer(item)) < 0) {
if (pred == null)
clients = next;
else
pred.next = next;
}
else {
if (stat == 0) {
if (rtail == null)
retries = b;
else
rtail.nextRetry = b;
rtail = b;
stat = maxBufferCapacity;
}
if (stat > lag)
lag = stat;
pred = b;
}
b = next;
}
if (retries != null)
drops = retryTimedOffer(retries, item, nanos, onDrop);
}
}
if (complete)
throw new IllegalStateException("Closed");
else
return (drops > 0) ? -drops : lag;
}
/**
* Calls timedOffer on each subscription on retry list, retrying
* offer if onDrop true.
* @return number of drops
*/
private int retryTimedOffer(BufferedSubscription retries, T item,
long nanos,
BiPredicate, ? super T> onDrop) {
int drops = 0;
for (BufferedSubscription r = retries; r != null;) {
BufferedSubscription nextRetry = r.nextRetry;
r.nextRetry = null;
if (r.timedOffer(item, nanos) == 0 &&
(onDrop == null || !onDrop.test(r.subscriber, item) ||
r.offer(item) == 0))
++drops;
r = nextRetry;
}
return drops;
}
/**
* Unless already closed, issues onComplete signals to current
* subscribers, and disallows subsequent attempts to publish.
*/
public void close() {
    if (closed)
        return;                        // already closed: nothing to signal
    BufferedSubscription pending;
    synchronized (this) {
        // Detach the whole client list and mark closed in one step.
        pending = clients;
        clients = null;
        closed = true;
    }
    // Signal completion outside the lock.
    for (BufferedSubscription s = pending, following; s != null; s = following) {
        following = s.next;
        s.onComplete();
    }
}
/**
* Unless already closed, issues onError signals to current
* subscribers with the given error, and disallows subsequent
* attempts to publish.
*
* @param error the onError argument sent to subscribers
* @throws NullPointerException if error is null
*/
public void closeExceptionally(Throwable error) {
    if (error == null)
        throw new NullPointerException();
    if (closed)
        return;                        // already closed: nothing to signal
    BufferedSubscription pending;
    synchronized (this) {
        // Detach the whole client list and mark closed in one step.
        pending = clients;
        clients = null;
        closed = true;
    }
    // Deliver the error outside the lock.
    for (BufferedSubscription s = pending, following; s != null; s = following) {
        following = s.next;
        s.onError(error);
    }
}
/**
* Returns true if this publisher is not accepting submissions.
*
* @return true if closed
*/
public boolean isClosed() {
// Reads the closed flag directly; no lock is taken here.
return closed;
}
/**
* Returns true if this publisher has any subscribers.
*
* @return true if this publisher has any subscribers
*/
public boolean hasSubscribers() {
    if (closed)
        return false;
    synchronized (this) {
        BufferedSubscription cur = clients;
        while (cur != null) {
            if (!cur.isDisabled())
                return true;           // first live subscription found
            // The scan stops at the first live node, so every disabled
            // node seen here is at the head of the list: drop it.
            cur = cur.next;
            clients = cur;
        }
    }
    return false;
}
/**
* Returns the number of current subscribers.
*
* @return the number of current subscribers
*/
public int getNumberOfSubscribers() {
    if (closed)
        return 0;
    int live = 0;
    synchronized (this) {
        BufferedSubscription lastLive = null;
        BufferedSubscription cur = clients;
        while (cur != null) {
            BufferedSubscription following = cur.next;
            if (!cur.isDisabled()) {
                ++live;
                lastLive = cur;
            }
            else if (lastLive != null)
                lastLive.next = following;   // unlink disabled interior node
            else
                clients = following;         // unlink disabled head node
            cur = following;
        }
    }
    return live;
}
/**
* Returns the Executor used for asynchronous delivery.
*
* @return the Executor used for asynchronous delivery
*/
public Executor getExecutor() {
// Hands back the publisher's executor field as-is.
return executor;
}
/**
* Returns the maximum per-subscriber buffer capacity.
*
* @return the maximum per-subscriber buffer capacity
*/
public int getMaxBufferCapacity() {
// Hands back the publisher's maxBufferCapacity field as-is.
return maxBufferCapacity;
}
/**
* Returns a list of current subscribers.
*
* @return list of current subscribers
*/
public List> getSubscribers() {
ArrayList> subs = new ArrayList<>();
synchronized (this) {
BufferedSubscription pred = null, next;
for (BufferedSubscription b = clients; b != null; b = next) {
next = b.next;
if (b.isDisabled()) {
if (pred == null)
clients = next;
else
pred.next = next;
}
else
subs.add(b.subscriber);
}
}
return subs;
}
/**
* Returns true if the given Subscriber is currently subscribed.
*
* @param subscriber the subscriber
* @return true if currently subscribed
* @throws NullPointerException if subscriber is null
*/
public boolean isSubscribed(Flow.Subscriber super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
if (!closed) {
synchronized (this) {
BufferedSubscription pred = null, next;
for (BufferedSubscription b = clients; b != null; b = next) {
next = b.next;
if (b.isDisabled()) {
if (pred == null)
clients = next;
else
pred.next = next;
}
else if (subscriber.equals(b.subscriber))
return true;
else
pred = b;
}
}
}
return false;
}
/**
* Returns an estimate of the minimum number of items requested
* (via {@link Flow.Subscription#request}) but not yet produced,
* among all current subscribers.
*
* @return the estimate, or zero if no subscribers
*/
public long estimateMinimumDemand() {
    long min = Long.MAX_VALUE;
    boolean nonEmpty = false;
    synchronized (this) {
        BufferedSubscription pred = null, next;
        for (BufferedSubscription b = clients; b != null; b = next) {
            int n; long d;
            next = b.next;
            if ((n = b.estimateLag()) < 0) {
                // Negative lag means disabled: unlink the node.
                if (pred == null)
                    clients = next;
                else
                    pred.next = next;
            }
            else {
                // Outstanding demand not yet covered by buffered items.
                if ((d = b.demand - n) < min)
                    min = d;
                nonEmpty = true;
                // Track the last live node. Without this, unlinking a
                // later disabled node would reset the list head and drop
                // the live subscriptions that precede it.
                pred = b;
            }
        }
    }
    return nonEmpty ? min : 0;
}
/**
* Returns an estimate of the maximum number of items produced but
* not yet consumed among all current subscribers.
*
* @return the estimate
*/
public int estimateMaximumLag() {
    int max = 0;
    synchronized (this) {
        BufferedSubscription pred = null, next;
        for (BufferedSubscription b = clients; b != null; b = next) {
            int n;
            next = b.next;
            if ((n = b.estimateLag()) < 0) {
                // Negative lag means disabled: unlink the node.
                if (pred == null)
                    clients = next;
                else
                    pred.next = next;
            }
            else {
                if (n > max)
                    max = n;
                // Track the last live node. Without this, unlinking a
                // later disabled node would reset the list head and drop
                // the live subscriptions that precede it.
                pred = b;
            }
        }
    }
    return max;
}
/**
* A task for consuming buffer items and signals, created and
* executed whenever they become available. A task consumes as
* many items/signals as possible before terminating, at which
* point another task is created when needed. The dual Runnable
* and ForkJoinTask declaration saves overhead when executed by
* ForkJoinPools, without impacting other kinds of Executors.
*/
@SuppressWarnings("serial")
static final class ConsumerTask extends ForkJoinTask<Void>
    implements Runnable {
    // The subscription whose buffer this task drains.
    final BufferedSubscription consumer;

    ConsumerTask(BufferedSubscription consumer) {
        this.consumer = consumer;
    }

    // ForkJoinTask plumbing: the task carries no result.
    @Override
    public final Void getRawResult() { return null; }

    @Override
    public final void setRawResult(Void v) {}

    // ForkJoinPool entry point. Always returns false after running the
    // consume loop.
    @Override
    public final boolean exec() { consumer.consume(); return false; }

    // Plain Executor entry point: runs the same consume loop.
    @Override
    public final void run() { consumer.consume(); }
}
/**
* A bounded (ring) buffer with integrated control to start a
* consumer task whenever items are available. The buffer
* algorithm is similar to one used inside ForkJoinPool,
* specialized for the case of at most one concurrent producer and
* consumer, and power of two buffer sizes. This allows methods to
* operate without locks even while supporting resizing, blocking,
* task-triggering, and garbage-free buffers (nulling out elements
* when consumed), although supporting these does impose a bit of
* overhead compared to plain fixed-size ring buffers.
*
* The publisher guarantees a single producer via its lock. We
* ensure in this class that there is at most one consumer. The
* request and cancel methods must be fully thread-safe but are
* coded to exploit the most common case in which they are only
* called by consumers (usually within onNext).
*
* Execution control is managed using the ACTIVE ctl bit. We
* ensure that a task is active when consumable items (and
* usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
* there is demand (unfilled requests). This is complicated on
* the creation side by the possibility of exceptions when trying
* to execute tasks. These eventually force DISABLED state, but
* sometimes not directly. On the task side, termination (clearing
* ACTIVE) that would otherwise race with producers or request()
* calls uses the KEEPALIVE bit to force a recheck.
*
* The ctl field also manages run state. When DISABLED, no further
* updates are possible. Disabling may be preceded by setting
* ERROR or COMPLETE (or both -- ERROR has precedence), in which
* case the associated Subscriber methods are invoked, possibly
* synchronously if there is no active consumer task (including
* cases where execute() failed). The cancel() method is supported
* by treating as ERROR but suppressing onError signal.
*
* Support for blocking also exploits the fact that there is only
* one possible waiter. ManagedBlocker-compatible control fields
* are placed in this class itself rather than in wait-nodes.
* Blocking control relies on the "waiter" field. Producers set
* the field before trying to block, but must then recheck (via
* offer) before parking. Signalling then just unparks and clears
* waiter field. If the producer and consumer are both in the same
* ForkJoinPool, the producer attempts to help run consumer tasks
* that it forked before blocking.
*
* This class uses @Contended and heuristic field declaration
* ordering to reduce memory contention on BufferedSubscription
* itself, but it does not currently attempt to avoid memory
* contention (especially including card-marks) among buffer
* elements, that can significantly slow down some usages.
* Addressing this may require allocating substantially more space
* than users expect.
*/
// NOTE(review): generic type arguments appear to have been stripped from
// this listing (e.g. "Flow.Subscriber super T>", "ForkJoinTask> t",
// "(ConsumerTask>)t"); confirm against the original source before compiling.
@SuppressWarnings("serial")
@sun.misc.Contended
static final class BufferedSubscription
implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
// Order-sensitive field declarations
long timeout; // > 0 if timed wait
volatile long demand; // # unfilled requests
int maxCapacity; // reduced on OOME
int putStat; // offer result for ManagedBlocker
volatile int ctl; // atomic run state flags
volatile int head; // next position to take
volatile int tail; // next position to put
Object[] array; // buffer: null if disabled
Flow.Subscriber super T> subscriber; // null if disabled
Executor executor; // null if disabled
volatile Throwable pendingError; // holds until onError issued
volatile Thread waiter; // blocked producer thread
T putItem; // for offer within ManagedBlocker
BufferedSubscription next; // used only by publisher
BufferedSubscription nextRetry; // used only by publisher
// ctl values
static final int ACTIVE = 0x01; // consumer task active
static final int KEEPALIVE = 0x02; // force termination recheck
static final int DISABLED = 0x04; // final state
static final int ERROR = 0x08; // signal onError then disable
static final int SUBSCRIBE = 0x10; // signal onSubscribe
static final int COMPLETE = 0x20; // signal onComplete when done
static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
/**
* Initial/Minimum buffer capacity. Must be a power of two, at least 2.
*/
static final int MINCAP = 8;
/**
* Records the subscriber, executor and capacity bound. The buffer
* array is not allocated here; growAndAdd creates it when offer
* first finds the array null.
*/
BufferedSubscription(Flow.Subscriber super T> subscriber,
Executor executor, int maxBufferCapacity) {
this.subscriber = subscriber;
this.executor = executor;
this.maxCapacity = maxBufferCapacity;
}
/** Returns true if ctl is exactly DISABLED. */
final boolean isDisabled() {
return ctl == DISABLED;
}
/**
* Returns estimated number of buffered items, or -1 if
* disabled
*/
final int estimateLag() {
int n;
// A non-positive tail - head difference is reported as zero.
return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
}
/**
* Tries to add item and start consumer task if necessary.
* @return -1 if disabled, 0 if dropped, else estimated lag
*/
final int offer(T item) {
Object[] a = array;
// size is the item count the buffer would hold after this put
int t = tail, size = t + 1 - head, stat, cap, i;
// No array yet, or not enough room: take the create/expand path.
if (a == null || (cap = a.length) < size || (i = t & (cap - 1)) < 0)
stat = growAndAdd(a, item);
else {
a[i] = item;
U.putOrderedInt(this, TAIL, t + 1);
stat = size;
}
// After a successful put, call startOnOffer unless both ACTIVE and
// KEEPALIVE are already set.
return (stat > 0 && (ctl & (ACTIVE|KEEPALIVE)) != (ACTIVE|KEEPALIVE)) ?
startOnOffer(stat) : stat;
}
/**
* Tries to create or expand buffer, then adds item if possible
* @return -1 if ERROR or DISABLED is set, 0 if the item could not be
* added, else a positive value
*/
private int growAndAdd(Object[] a, T item) {
int cap, stat;
if ((ctl & (ERROR | DISABLED)) != 0) {
cap = 0;
stat = -1;
}
else if (a == null) {
cap = 0;
stat = 1;
}
else if ((cap = a.length) >= maxCapacity)
stat = 0; // cannot grow
else
stat = cap + 1;
if (stat > 0) {
// Double an existing buffer; otherwise start at MINCAP, or at
// maxCapacity if that is smaller (but never below 2).
int newCap = (cap > 0 ? cap << 1 :
maxCapacity >= MINCAP ? MINCAP :
maxCapacity >= 2 ? maxCapacity : 2);
if (newCap <= cap)
stat = 0;
else {
Object[] newArray = null;
try {
newArray = new Object[newCap];
} catch (Throwable ex) { // try to cope with OOME
}
if (newArray == null) {
if (cap > 0)
maxCapacity = cap; // avoid continuous failure
stat = 0;
}
else {
array = newArray;
int t = tail;
int newMask = newCap - 1;
if (a != null && cap > 0) {
int mask = cap - 1;
// Move each remaining element by CASing the old slot to
// null; slots that are already null, or whose CAS fails,
// are skipped.
for (int j = head; j != t; ++j) {
Object x; // races with consumer
int k = j & mask;
if ((x = a[k]) != null &&
U.compareAndSwapObject(
a, (((long)k) << ASHIFT) + ABASE,
x, null))
newArray[j & newMask] = x;
}
}
newArray[t & newMask] = item;
tail = t + 1;
}
}
}
return stat;
}
/**
* Spins/helps/blocks while offer returns 0. Called only if
* initial offer returns 0. Tries helping if either the
* producer and consumers are in same ForkJoinPool, or
* consumers are in commonPool.
*
*/
final int submit(T item) {
Executor e = executor;
Thread thread = Thread.currentThread();
int stat = 0;
if (e instanceof ForkJoinPool) {
// Both branches below run the same helping loop: while the next
// local task is a ConsumerTask, retry the offer and, if it still
// returns 0 and the task can be unforked, run that task's
// consume() directly.
if ((thread instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)thread).getPool() == e) {
ForkJoinTask> t;
while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
(t instanceof ConsumerTask)) {
if ((stat = offer(item)) != 0 || !t.tryUnfork())
break;
((ConsumerTask>)t).consumer.consume();
}
}
else if (e == ForkJoinPool.commonPool()) {
ForkJoinTask> t;
while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
(t instanceof ConsumerTask)) {
if ((stat = offer(item)) != 0 || !t.tryUnfork())
break;
((ConsumerTask>)t).consumer.consume();
}
}
}
// Still not accepted: block via the ManagedBlocker methods of this
// object (isReleasable/block), with timeout 0 meaning untimed.
if (stat == 0 && (stat = offer(item)) == 0) {
putItem = item;
timeout = 0L;
try {
ForkJoinPool.managedBlock(this);
} catch (InterruptedException ie) {
timeout = INTERRUPTED;
}
stat = putStat;
// A negative timeout is the INTERRUPTED sentinel: restore the
// caller's interrupt status.
if (timeout < 0L)
Thread.currentThread().interrupt();
}
return stat;
}
/**
* Timeout version; similar to submit
*/
final int timedOffer(T item, long nanos) {
Executor e = executor;
Thread thread = Thread.currentThread();
int stat = 0;
if ((e instanceof ForkJoinPool) &&
(((thread instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)thread).getPool() == e) ||
e == ForkJoinPool.commonPool())) {
ForkJoinTask> t;
long deadline = System.nanoTime() + nanos;
// Helping loop as in submit, but also stops once the deadline
// has passed; nanos is updated to the remaining time.
while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
(t instanceof ConsumerTask)) {
if ((stat = offer(item)) != 0 ||
(nanos = deadline - System.nanoTime()) <= 0L ||
!t.tryUnfork())
break;
((ConsumerTask>)t).consumer.consume();
}
}
// Block only if the offer still fails and time remains.
if (stat == 0 && (stat = offer(item)) == 0 &&
(timeout = nanos) > 0L) {
putItem = item;
try {
ForkJoinPool.managedBlock(this);
} catch (InterruptedException ie) {
timeout = INTERRUPTED;
}
stat = putStat;
if (timeout < 0L)
Thread.currentThread().interrupt();
}
return stat;
}
/**
* Tries to start consumer task after offer.
* @return -1 if now disabled, else argument
*/
private int startOnOffer(int stat) {
for (;;) {
Executor e; int c;
if ((c = ctl) == DISABLED || (e = executor) == null) {
stat = -1;
break;
}
else if ((c & ACTIVE) != 0) { // ensure keep-alive
if ((c & KEEPALIVE) != 0 ||
U.compareAndSwapInt(this, CTL, c,
c | KEEPALIVE))
break;
}
// No task is started when there is no demand or nothing buffered.
else if (demand == 0L || tail == head)
break;
else if (U.compareAndSwapInt(this, CTL, c,
c | (ACTIVE | KEEPALIVE))) {
try {
e.execute(new ConsumerTask(this));
break;
} catch (RuntimeException | Error ex) { // back out
// Clear the ACTIVE bit again (CAS loop), then rethrow to the
// caller.
do {} while ((c = ctl) >= 0 &&
(c & ACTIVE) != 0 &&
!U.compareAndSwapInt(this, CTL, c,
c & ~ACTIVE));
throw ex;
}
}
}
return stat;
}
/**
* Nulls out most fields, mainly to avoid garbage retention
* until publisher unsubscribes, but also to help cleanly stop
* upon error by nulling required components.
*/
private void detach() {
Thread w = waiter;
array = null;
executor = null;
subscriber = null;
pendingError = null;
waiter = null;
if (w != null)
LockSupport.unpark(w); // force wakeup
}
/**
* Issues error signal, asynchronously if a task is running,
* else synchronously.
*/
final void onError(Throwable ex) {
for (int c;;) {
if ((c = ctl) == DISABLED)
break;
else if ((c & ACTIVE) != 0) {
// Record the error before publishing the ERROR bit.
pendingError = ex;
if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
break; // cause consumer task to exit
}
else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
// No active task: signal in the calling thread. Exceptions
// thrown by the subscriber's onError are ignored.
Flow.Subscriber super T> s = subscriber;
if (s != null && ex != null) {
try {
s.onError(ex);
} catch (Throwable ignore) {
}
}
detach();
break;
}
}
}
/**
* Tries to start consumer task upon a signal or request;
* disables on failure.
*/
private void startOrDisable() {
Executor e;
if ((e = executor) != null) { // skip if already disabled
try {
e.execute(new ConsumerTask(this));
} catch (Throwable ex) { // back out and force signal
// Clear ACTIVE, then report the failure through onError.
for (int c;;) {
if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
break;
if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
onError(ex);
break;
}
}
}
}
}
/**
* Sets the COMPLETE bit (together with ACTIVE and KEEPALIVE) unless
* disabled, starting a consumer task if ACTIVE was not already set.
*/
final void onComplete() {
for (int c;;) {
if ((c = ctl) == DISABLED)
break;
if (U.compareAndSwapInt(this, CTL, c,
c | (ACTIVE | KEEPALIVE | COMPLETE))) {
if ((c & ACTIVE) == 0)
startOrDisable();
break;
}
}
}
/**
* Sets the SUBSCRIBE bit (together with ACTIVE and KEEPALIVE) unless
* disabled, starting a consumer task if ACTIVE was not already set.
*/
final void onSubscribe() {
for (int c;;) {
if ((c = ctl) == DISABLED)
break;
if (U.compareAndSwapInt(this, CTL, c,
c | (ACTIVE | KEEPALIVE | SUBSCRIBE))) {
if ((c & ACTIVE) == 0)
startOrDisable();
break;
}
}
}
/**
* Causes consumer task to exit if active (without reporting
* onError unless there is already a pending error), and
* disables.
*/
public void cancel() {
for (int c;;) {
if ((c = ctl) == DISABLED)
break;
else if ((c & ACTIVE) != 0) {
// Unlike onError, pendingError is not written here.
if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
break;
}
else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
detach();
break;
}
}
}
/**
* Adds to demand and possibly starts task.
* A negative argument is reported via onError with an
* IllegalArgumentException; zero is ignored.
*/
public void request(long n) {
if (n > 0L) {
for (;;) {
long prev = demand, d;
if ((d = prev + n) < prev) // saturate
d = Long.MAX_VALUE;
if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
// Demand recorded; now make sure a consumer task will see it.
while (d != 0L) {
int c, h;
if ((c = ctl) == DISABLED)
break;
else if ((c & ACTIVE) != 0) {
if ((c & KEEPALIVE) != 0 ||
U.compareAndSwapInt(this, CTL, c,
c | KEEPALIVE))
break;
}
else if ((h = head) != tail) {
if (U.compareAndSwapInt(this, CTL, c,
c | (ACTIVE | KEEPALIVE))) {
startOrDisable();
break;
}
}
else if (head == h && tail == h)
break; // else stale
d = demand;
}
break;
}
}
}
else if (n < 0L)
onError(new IllegalArgumentException(
"negative subscription request"));
}
public final boolean isReleasable() { // for ManagedBlocker
T item = putItem;
if (item != null) { // A few randomized spins
for (int spins = MINCAP, r = 0;;) {
if ((putStat = offer(item)) != 0) {
putItem = null;
break;
}
else if (r == 0)
r = ThreadLocalRandom.nextSecondarySeed();
else {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
// Give up spinning (and report not releasable) once the
// spin budget is used up.
if (r >= 0 && --spins <= 0)
return false;
}
}
}
return true;
}
public final boolean block() { // for ManagedBlocker
T item = putItem;
if (item != null) {
putItem = null;
long nanos = timeout;
// A deadline is only computed for timed waits (timeout > 0).
long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
while ((putStat = offer(item)) == 0) {
if (Thread.interrupted()) {
// Record the interrupt; only a timed wait stops here, an
// untimed wait keeps retrying.
timeout = INTERRUPTED;
if (nanos > 0L)
break;
}
else if (nanos > 0L &&
(nanos = deadline - System.nanoTime()) <= 0L)
break;
// Publish the waiter first, then loop to re-offer before parking.
else if (waiter == null)
waiter = Thread.currentThread();
else {
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
else
LockSupport.park(this);
waiter = null;
}
}
}
waiter = null;
return true;
}
/** Consumer loop called only from ConsumerTask */
final void consume() {
Flow.Subscriber super T> s;
if ((s = subscriber) != null) { // else disabled
for (int h = head;;) {
long d = demand;
int c, n, i; Object[] a; Object x; Thread w;
// Control signals are handled before any item.
if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
if ((c & ERROR) != 0) {
Throwable ex = pendingError;
ctl = DISABLED; // no need for CAS
if (ex != null) { // null if errorless cancel
try {
s.onError(ex);
} catch (Throwable ignore) {
}
}
}
else if ((c & SUBSCRIBE) != 0) {
if (U.compareAndSwapInt(this, CTL, c,
c & ~SUBSCRIBE)) {
try {
s.onSubscribe(this);
} catch (Throwable ex) {
ctl = DISABLED; // disable on throw
}
}
}
else {
// Only DISABLED remains set: release fields and leave the loop.
detach();
break;
}
}
else if (h == tail) { // apparently empty
if ((c & KEEPALIVE) != 0) // recheck
U.compareAndSwapInt(this, CTL, c, c & ~KEEPALIVE);
else if ((c & COMPLETE) != 0) {
ctl = DISABLED;
try {
s.onComplete();
} catch (Throwable ignore) {
}
}
// Nothing left to do: clear ACTIVE and exit the task.
else if (U.compareAndSwapInt(this, CTL, c,
c & ~ACTIVE))
break;
}
else if (d == 0L) { // can't consume
if (demand == 0L) { // recheck
if ((c & KEEPALIVE) != 0)
U.compareAndSwapInt(this, CTL, c,
c & ~KEEPALIVE);
else if (U.compareAndSwapInt(this, CTL, c,
c & ~ACTIVE))
break;
}
}
else if ((a = array) == null || (n = a.length) == 0 ||
(x = a[i = h & (n - 1)]) == null)
; // stale; retry
else if ((c & KEEPALIVE) == 0)
U.compareAndSwapInt(this, CTL, c, c | KEEPALIVE);
// Claim the slot by CASing it to null, then advance head,
// decrement demand, and wake any blocked producer before
// calling onNext.
else if (U.compareAndSwapObject(
a, (((long)i) << ASHIFT) + ABASE, x, null)) {
U.putOrderedInt(this, HEAD, ++h);
while (!U.compareAndSwapLong(this, DEMAND, d, d - 1L))
d = demand; // almost never fails
if ((w = waiter) != null) {
waiter = null;
LockSupport.unpark(w); // release producer
}
try {
@SuppressWarnings("unchecked") T y = (T) x;
s.onNext(y);
} catch (Throwable ex) { // disable on throw
ctl = DISABLED;
}
}
}
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long CTL;
private static final long TAIL;
private static final long HEAD;
private static final long DEMAND;
private static final int ABASE;
private static final int ASHIFT;
// Resolves the field offsets and the array base/shift used by the
// Unsafe calls above.
static {
try {
CTL = U.objectFieldOffset
(BufferedSubscription.class.getDeclaredField("ctl"));
TAIL = U.objectFieldOffset
(BufferedSubscription.class.getDeclaredField("tail"));
HEAD = U.objectFieldOffset
(BufferedSubscription.class.getDeclaredField("head"));
DEMAND = U.objectFieldOffset
(BufferedSubscription.class.getDeclaredField("demand"));
ABASE = U.arrayBaseOffset(Object[].class);
int scale = U.arrayIndexScale(Object[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// log2 of the element scale, so an index can be shifted into a byte offset
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
}