--- jsr166/src/jsr166y/Phaser.java 2010/12/03 21:29:34 1.67
+++ jsr166/src/jsr166y/Phaser.java 2011/05/25 16:08:03 1.73
@@ -1,7 +1,7 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
- * http://creativecommons.org/licenses/publicdomain
+ * http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166y;
@@ -75,9 +75,10 @@ import java.util.concurrent.locks.LockSu
*
*
*
Termination. A phaser may enter a termination
- * state in which all synchronization methods immediately return
- * without updating phaser state or waiting for advance, and
- * indicating (via a negative phase value) that execution is complete.
+ * state, that may be checked using method {@link #isTerminated}. Upon
+ * termination, all synchronization methods immediately return without
+ * waiting for advance, as indicated by a negative return value.
+ * Similarly, attempts to register upon termination have no effect.
* Termination is triggered when an invocation of {@code onAdvance}
* returns {@code true}. The default implementation returns {@code
* true} if a deregistration has caused the number of registered
@@ -96,6 +97,16 @@ import java.util.concurrent.locks.LockSu
* increase throughput even though it incurs greater per-operation
* overhead.
*
+ *
In a tree of tiered phasers, registration and deregistration of
+ * child phasers with their parent are managed automatically.
+ * Whenever the number of registered parties of a child phaser becomes
+ * non-zero (as established in the {@link #Phaser(Phaser,int)}
+ * constructor, {@link #register}, or {@link #bulkRegister}), the
+ * child phaser is registered with its parent. Whenever the number of
+ * registered parties becomes zero as the result of an invocation of
+ * {@link #arriveAndDeregister}, the child phaser is deregistered
+ * from its parent.
+ *
*
Monitoring. While synchronization methods may be invoked
* only by registered parties, the current state of a phaser may be
* monitored by any caller. At any given moment there are {@link
@@ -119,7 +130,7 @@ import java.util.concurrent.locks.LockSu
* void runTasks(List tasks) {
* final Phaser phaser = new Phaser(1); // "1" to register self
* // create and start threads
- * for (Runnable task : tasks) {
+ * for (final Runnable task : tasks) {
* phaser.register();
* new Thread() {
* public void run() {
@@ -226,15 +237,15 @@ public class Phaser {
*/
/**
- * Primary state representation, holding four fields:
+ * Primary state representation, holding four bit-fields:
*
- * * unarrived -- the number of parties yet to hit barrier (bits 0-15)
- * * parties -- the number of parties to wait (bits 16-31)
- * * phase -- the generation of the barrier (bits 32-62)
- * * terminated -- set if barrier is terminated (bit 63 / sign)
+ * unarrived -- the number of parties yet to hit barrier (bits 0-15)
+ * parties -- the number of parties to wait (bits 16-31)
+ * phase -- the generation of the barrier (bits 32-62)
+ * terminated -- set if barrier is terminated (bit 63 / sign)
*
* Except that a phaser with no registered parties is
- * distinguished with the otherwise illegal state of having zero
+ * distinguished by the otherwise illegal state of having zero
* parties and one unarrived parties (encoded as EMPTY below).
*
* To efficiently maintain atomicity, these values are packed into
@@ -249,15 +260,13 @@ public class Phaser {
* parent.
*
* The phase of a subphaser is allowed to lag that of its
- * ancestors until it is actually accessed. Method reconcileState
- * is usually attempted only only when the number of unarrived
- * parties appears to be zero, which indicates a potential lag in
- * updating phase after the root advanced.
+ * ancestors until it is actually accessed -- see method
+ * reconcileState.
*/
private volatile long state;
private static final int MAX_PARTIES = 0xffff;
- private static final int MAX_PHASE = 0x7fffffff;
+ private static final int MAX_PHASE = Integer.MAX_VALUE;
private static final int PARTIES_SHIFT = 16;
private static final int PHASE_SHIFT = 32;
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
@@ -277,12 +286,11 @@ public class Phaser {
}
private static int partiesOf(long s) {
- int counts = (int)s;
- return (counts == EMPTY) ? 0 : counts >>> PARTIES_SHIFT;
+ return (int)s >>> PARTIES_SHIFT;
}
private static int phaseOf(long s) {
- return (int) (s >>> PHASE_SHIFT);
+ return (int)(s >>> PHASE_SHIFT);
}
private static int arrivedOf(long s) {
@@ -339,41 +347,37 @@ public class Phaser {
*/
private int doArrive(boolean deregister) {
int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
- long s;
- int phase;
- while ((phase = (int)((s = state) >>> PHASE_SHIFT)) >= 0) {
+ final Phaser root = this.root;
+ for (;;) {
+ long s = (root == this) ? state : reconcileState();
+ int phase = (int)(s >>> PHASE_SHIFT);
int counts = (int)s;
- int unarrived = counts & UNARRIVED_MASK;
- if (counts == EMPTY || unarrived == 0) {
- if (reconcileState() == s)
+ int unarrived = (counts & UNARRIVED_MASK) - 1;
+ if (phase < 0)
+ return phase;
+ else if (counts == EMPTY || unarrived < 0) {
+ if (root == this || reconcileState() == s)
throw new IllegalStateException(badArrive(s));
}
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
- if (unarrived == 1) {
- long n = s & PARTIES_MASK; // unshifted parties field
- int u = ((int)n) >>> PARTIES_SHIFT;
- Phaser par = parent;
- if (par != null) {
- par.doArrive(u == 0);
- reconcileState();
- }
- else {
- n |= (((long)((phase+1) & MAX_PHASE)) << PHASE_SHIFT);
- if (onAdvance(phase, u))
- n |= TERMINATION_BIT;
- else if (u == 0)
- n |= EMPTY; // reset to unregistered
- else
- n |= (long)u; // reset unarr to parties
- // assert state == s || isTerminated();
- UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
- releaseWaiters(phase);
- }
+ if (unarrived == 0) {
+ long n = s & PARTIES_MASK; // base of next state
+ int nextUnarrived = (int)n >>> PARTIES_SHIFT;
+ if (root != this)
+ return parent.doArrive(nextUnarrived == 0);
+ if (onAdvance(phase, nextUnarrived))
+ n |= TERMINATION_BIT;
+ else if (nextUnarrived == 0)
+ n |= EMPTY;
+ else
+ n |= nextUnarrived;
+ n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;
+ UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
+ releaseWaiters(phase);
}
- break;
+ return phase;
}
}
- return phase;
}
/**
@@ -385,7 +389,7 @@ public class Phaser {
private int doRegister(int registrations) {
// adjustment to state
long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
- Phaser par = parent;
+ final Phaser parent = this.parent;
int phase;
for (;;) {
long s = state;
@@ -397,7 +401,7 @@ public class Phaser {
else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
break;
else if (counts != EMPTY) { // not 1st registration
- if (par == null || reconcileState() == s) {
+ if (parent == null || reconcileState() == s) {
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
@@ -405,21 +409,21 @@ public class Phaser {
break;
}
}
- else if (par == null) { // 1st root registration
- long next = (((long) phase) << PHASE_SHIFT) | adj;
+ else if (parent == null) { // 1st root registration
+ long next = ((long)phase << PHASE_SHIFT) | adj;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
else {
synchronized (this) { // 1st sub registration
if (state == s) { // recheck under lock
- par.doRegister(1);
+ parent.doRegister(1);
do { // force current phase
phase = (int)(root.state >>> PHASE_SHIFT);
// assert phase < 0 || (int)state == EMPTY;
} while (!UNSAFE.compareAndSwapLong
(this, stateOffset, state,
- (((long) phase) << PHASE_SHIFT) | adj));
+ ((long)phase << PHASE_SHIFT) | adj));
break;
}
}
@@ -430,29 +434,32 @@ public class Phaser {
/**
* Resolves lagged phase propagation from root if necessary.
+ * Reconciliation normally occurs when root has advanced but
+ * subphasers have not yet done so, in which case they must finish
+ * their own advance by setting unarrived to parties (or if
+ * parties is zero, resetting to unregistered EMPTY state).
+ * However, this method may also be called when "floating"
+ * subphasers with possibly some unarrived parties are merely
+ * catching up to current phase, in which case counts are
+ * unaffected.
+ *
+ * @return reconciled state
*/
private long reconcileState() {
- Phaser rt = root;
+ final Phaser root = this.root;
long s = state;
- if (rt != this) {
- int phase;
- while ((phase = (int)(rt.state >>> PHASE_SHIFT)) !=
- (int)(s >>> PHASE_SHIFT)) {
- // assert phase < 0 || unarrivedOf(s) == 0
- long t; // to reread s
- long p = s & PARTIES_MASK; // unshifted parties field
- long n = (((long) phase) << PHASE_SHIFT) | p;
- if (phase >= 0) {
- if (p == 0L)
- n |= EMPTY; // reset to empty
- else
- n |= p >>> PARTIES_SHIFT; // set unarr to parties
- }
- if ((t = state) == s &&
- UNSAFE.compareAndSwapLong(this, stateOffset, s, s = n))
- break;
- s = t;
- }
+ if (root != this) {
+ int phase, u, p;
+ // CAS root phase with current parties; possibly trip unarrived
+ while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
+ (int)(s >>> PHASE_SHIFT) &&
+ !UNSAFE.compareAndSwapLong
+ (this, stateOffset, s,
+ s = (((long)phase << PHASE_SHIFT) |
+ (s & PARTIES_MASK) |
+ ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
+ (u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
+ s = state;
}
return s;
}
@@ -490,15 +497,9 @@ public class Phaser {
/**
* Creates a new phaser with the given parent and number of
- * registered unarrived parties. Registration and deregistration
- * of this child phaser with its parent are managed automatically.
- * If the given parent is non-null, whenever this child phaser has
- * any registered parties (as established in this constructor,
- * {@link #register}, or {@link #bulkRegister}), this child phaser
- * is registered with its parent. Whenever the number of
- * registered parties becomes zero as the result of an invocation
- * of {@link #arriveAndDeregister}, this child phaser is
- * deregistered from its parent.
+ * registered unarrived parties. When the given parent is non-null
+ * and the given number of parties is greater than zero, this
+ * child phaser is registered with its parent.
*
* @param parent the parent phaser
* @param parties the number of parties required to advance to the
@@ -512,10 +513,10 @@ public class Phaser {
int phase = 0;
this.parent = parent;
if (parent != null) {
- Phaser r = parent.root;
- this.root = r;
- this.evenQ = r.evenQ;
- this.oddQ = r.oddQ;
+ final Phaser root = parent.root;
+ this.root = root;
+ this.evenQ = root.evenQ;
+ this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
@@ -524,10 +525,10 @@ public class Phaser {
this.evenQ = new AtomicReference();
this.oddQ = new AtomicReference();
}
- this.state = (parties == 0) ? ((long) EMPTY) :
- ((((long) phase) << PHASE_SHIFT) |
- (((long) parties) << PARTIES_SHIFT) |
- ((long) parties));
+ this.state = (parties == 0) ? (long)EMPTY :
+ ((long)phase << PHASE_SHIFT) |
+ ((long)parties << PARTIES_SHIFT) |
+ ((long)parties);
}
/**
@@ -535,9 +536,13 @@ public class Phaser {
* invocation of {@link #onAdvance} is in progress, this method
* may await its completion before returning. If this phaser has
* a parent, and this phaser previously had no registered parties,
- * this phaser is also registered with its parent.
- *
- * @return the arrival phase number to which this registration applied
+ * this child phaser is also registered with its parent. If
+ * this phaser is terminated, the attempt to register has
+ * no effect, and a negative value is returned.
+ *
+ * @return the arrival phase number to which this registration
+ * applied. If this value is negative, then this phaser has
+ * terminated, in which case registration has no effect.
* @throws IllegalStateException if attempting to register more
* than the maximum supported number of parties
*/
@@ -549,13 +554,17 @@ public class Phaser {
* Adds the given number of new unarrived parties to this phaser.
* If an ongoing invocation of {@link #onAdvance} is in progress,
* this method may await its completion before returning. If this
- * phaser has a parent, and the given number of parties is
- * greater than zero, and this phaser previously had no registered
- * parties, this phaser is also registered with its parent.
+ * phaser has a parent, and the given number of parties is greater
+ * than zero, and this phaser previously had no registered
+ * parties, this child phaser is also registered with its parent.
+ * If this phaser is terminated, the attempt to register has no
+ * effect, and a negative value is returned.
*
* @param parties the number of additional parties required to
* advance to the next phase
- * @return the arrival phase number to which this registration applied
+ * @return the arrival phase number to which this registration
+ * applied. If this value is negative, then this phaser has
+ * terminated, in which case registration has no effect.
* @throws IllegalStateException if attempting to register more
* than the maximum supported number of parties
* @throws IllegalArgumentException if {@code parties < 0}
@@ -617,12 +626,47 @@ public class Phaser {
* IllegalStateException} only upon some subsequent operation on
* this phaser, if ever.
*
- * @return the arrival phase number, or a negative number if terminated
+ * @return the arrival phase number, or the (negative)
+ * {@linkplain #getPhase() current phase} if terminated
* @throws IllegalStateException if not terminated and the number
* of unarrived parties would become negative
*/
public int arriveAndAwaitAdvance() {
- return awaitAdvance(doArrive(false));
+ // Specialization of doArrive+awaitAdvance eliminating some reads/paths
+ final Phaser root = this.root;
+ for (;;) {
+ long s = (root == this) ? state : reconcileState();
+ int phase = (int)(s >>> PHASE_SHIFT);
+ int counts = (int)s;
+ int unarrived = (counts & UNARRIVED_MASK) - 1;
+ if (phase < 0)
+ return phase;
+ else if (counts == EMPTY || unarrived < 0) {
+ if (reconcileState() == s)
+ throw new IllegalStateException(badArrive(s));
+ }
+ else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
+ s -= ONE_ARRIVAL)) {
+ if (unarrived != 0)
+ return root.internalAwaitAdvance(phase, null);
+ if (root != this)
+ return parent.arriveAndAwaitAdvance();
+ long n = s & PARTIES_MASK; // base of next state
+ int nextUnarrived = (int)n >>> PARTIES_SHIFT;
+ if (onAdvance(phase, nextUnarrived))
+ n |= TERMINATION_BIT;
+ else if (nextUnarrived == 0)
+ n |= EMPTY;
+ else
+ n |= nextUnarrived;
+ int nextPhase = (phase + 1) & MAX_PHASE;
+ n |= (long)nextPhase << PHASE_SHIFT;
+ if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
+ return (int)(state >>> PHASE_SHIFT); // terminated
+ releaseWaiters(phase);
+ return nextPhase;
+ }
+ }
}
/**
@@ -633,19 +677,18 @@ public class Phaser {
* @param phase an arrival phase number, or negative value if
* terminated; this argument is normally the value returned by a
* previous call to {@code arrive} or {@code arriveAndDeregister}.
- * @return the next arrival phase number, or a negative value
- * if terminated or argument is negative
+ * @return the next arrival phase number, or the argument if it is
+ * negative, or the (negative) {@linkplain #getPhase() current phase}
+ * if terminated
*/
public int awaitAdvance(int phase) {
- Phaser rt;
- int p = (int)(state >>> PHASE_SHIFT);
+ final Phaser root = this.root;
+ long s = (root == this) ? state : reconcileState();
+ int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
- if (p == phase) {
- if ((p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase)
- return rt.internalAwaitAdvance(phase, null);
- reconcileState();
- }
+ if (p == phase)
+ return root.internalAwaitAdvance(phase, null);
return p;
}
@@ -659,25 +702,23 @@ public class Phaser {
* @param phase an arrival phase number, or negative value if
* terminated; this argument is normally the value returned by a
* previous call to {@code arrive} or {@code arriveAndDeregister}.
- * @return the next arrival phase number, or a negative value
- * if terminated or argument is negative
+ * @return the next arrival phase number, or the argument if it is
+ * negative, or the (negative) {@linkplain #getPhase() current phase}
+ * if terminated
* @throws InterruptedException if thread interrupted while waiting
*/
public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException {
- Phaser rt;
- int p = (int)(state >>> PHASE_SHIFT);
+ final Phaser root = this.root;
+ long s = (root == this) ? state : reconcileState();
+ int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase) {
- if ((p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) {
- QNode node = new QNode(this, phase, true, false, 0L);
- p = rt.internalAwaitAdvance(phase, node);
- if (node.wasInterrupted)
- throw new InterruptedException();
- }
- else
- reconcileState();
+ QNode node = new QNode(this, phase, true, false, 0L);
+ p = root.internalAwaitAdvance(phase, node);
+ if (node.wasInterrupted)
+ throw new InterruptedException();
}
return p;
}
@@ -696,8 +737,9 @@ public class Phaser {
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
- * @return the next arrival phase number, or a negative value
- * if terminated or argument is negative
+ * @return the next arrival phase number, or the argument if it is
+ * negative, or the (negative) {@linkplain #getPhase() current phase}
+ * if terminated
* @throws InterruptedException if thread interrupted while waiting
* @throws TimeoutException if timed out while waiting
*/
@@ -705,21 +747,18 @@ public class Phaser {
long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
long nanos = unit.toNanos(timeout);
- Phaser rt;
- int p = (int)(state >>> PHASE_SHIFT);
+ final Phaser root = this.root;
+ long s = (root == this) ? state : reconcileState();
+ int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase) {
- if ((p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) {
- QNode node = new QNode(this, phase, true, true, nanos);
- p = rt.internalAwaitAdvance(phase, node);
- if (node.wasInterrupted)
- throw new InterruptedException();
- else if (p == phase)
- throw new TimeoutException();
- }
- else
- reconcileState();
+ QNode node = new QNode(this, phase, true, true, nanos);
+ p = root.internalAwaitAdvance(phase, node);
+ if (node.wasInterrupted)
+ throw new InterruptedException();
+ else if (p == phase)
+ throw new TimeoutException();
}
return p;
}
@@ -738,9 +777,10 @@ public class Phaser {
final Phaser root = this.root;
long s;
while ((s = root.state) >= 0) {
- long next = (s & ~(long)(MAX_PARTIES)) | TERMINATION_BIT;
- if (UNSAFE.compareAndSwapLong(root, stateOffset, s, next)) {
- releaseWaiters(0); // signal all threads
+ if (UNSAFE.compareAndSwapLong(root, stateOffset,
+ s, s | TERMINATION_BIT)) {
+ // signal all threads
+ releaseWaiters(0);
releaseWaiters(1);
return;
}
@@ -771,7 +811,8 @@ public class Phaser {
/**
* Returns the number of registered parties that have arrived at
- * the current phase of this phaser.
+ * the current phase of this phaser. If this phaser has terminated,
+ * the returned value is meaningless and arbitrary.
*
* @return the number of arrived parties
*/
@@ -781,7 +822,8 @@ public class Phaser {
/**
* Returns the number of registered parties that have not yet
- * arrived at the current phase of this phaser.
+ * arrived at the current phase of this phaser. If this phaser has
+ * terminated, the returned value is meaningless and arbitrary.
*
* @return the number of unarrived parties
*/
@@ -891,9 +933,7 @@ public class Phaser {
*/
private void releaseWaiters(int phase) {
QNode q; // first element of queue
- int p; // its phase
Thread t; // its thread
- // assert phase != phaseOf(root.state);
AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ;
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
@@ -905,6 +945,30 @@ public class Phaser {
}
}
+ /**
+ * Variant of releaseWaiters that additionally tries to remove any
+ * nodes no longer waiting for advance due to timeout or
+ * interrupt. Currently, nodes are removed only if they are at
+ * head of queue, which suffices to reduce memory footprint in
+ * most usages.
+ *
+ * @return current phase on exit
+ */
+ private int abortWait(int phase) {
+ AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ;
+ for (;;) {
+ Thread t;
+ QNode q = head.get();
+ int p = (int)(root.state >>> PHASE_SHIFT);
+ if (q == null || ((t = q.thread) != null && q.phase == p))
+ return p;
+ if (head.compareAndSet(q, q.next) && t != null) {
+ q.thread = null;
+ LockSupport.unpark(t);
+ }
+ }
+ }
+
/** The number of CPUs, for spin control */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
@@ -973,7 +1037,7 @@ public class Phaser {
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
- return p; // recheck abort
+ return abortWait(phase); // possibly clean up on abort
}
releaseWaiters(phase);
return p;
@@ -1044,18 +1108,16 @@ public class Phaser {
// Unsafe mechanics
- private static final sun.misc.Unsafe UNSAFE = getUnsafe();
- private static final long stateOffset =
- objectFieldOffset("state", Phaser.class);
-
- private static long objectFieldOffset(String field, Class> klazz) {
+ private static final sun.misc.Unsafe UNSAFE;
+ private static final long stateOffset;
+ static {
try {
- return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
- } catch (NoSuchFieldException e) {
- // Convert Exception to corresponding Error
- NoSuchFieldError error = new NoSuchFieldError(field);
- error.initCause(e);
- throw error;
+ UNSAFE = getUnsafe();
+ Class k = Phaser.class;
+ stateOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("state"));
+ } catch (Exception e) {
+ throw new Error(e);
}
}