--- jsr166/src/jsr166y/Phaser.java 2010/12/04 15:25:08 1.68
+++ jsr166/src/jsr166y/Phaser.java 2011/10/15 21:46:25 1.76
@@ -1,7 +1,7 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
- * http://creativecommons.org/licenses/publicdomain
+ * http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166y;
@@ -77,17 +77,17 @@ import java.util.concurrent.locks.LockSu
*
Termination. A phaser may enter a termination
* state, that may be checked using method {@link #isTerminated}. Upon
* termination, all synchronization methods immediately return without
- * waiting for advance, as indicated by a negative return
- * value. Similarly, attempts to register upon termination have no
- * effect. Termination is triggered when an invocation of {@code
- * onAdvance} returns {@code true}. The default implementation returns
- * {@code true} if a deregistration has caused the number of
- * registered parties to become zero. As illustrated below, when
- * phasers control actions with a fixed number of iterations, it is
- * often convenient to override this method to cause termination when
- * the current phase number reaches a threshold. Method {@link
- * #forceTermination} is also available to abruptly release waiting
- * threads and allow them to terminate.
+ * waiting for advance, as indicated by a negative return value.
+ * Similarly, attempts to register upon termination have no effect.
+ * Termination is triggered when an invocation of {@code onAdvance}
+ * returns {@code true}. The default implementation returns {@code
+ * true} if a deregistration has caused the number of registered
+ * parties to become zero. As illustrated below, when phasers control
+ * actions with a fixed number of iterations, it is often convenient
+ * to override this method to cause termination when the current phase
+ * number reaches a threshold. Method {@link #forceTermination} is
+ * also available to abruptly release waiting threads and allow them
+ * to terminate.
*
*
Tiering. Phasers may be tiered (i.e.,
* constructed in tree structures) to reduce contention. Phasers with
@@ -130,7 +130,7 @@ import java.util.concurrent.locks.LockSu
* void runTasks(List tasks) {
* final Phaser phaser = new Phaser(1); // "1" to register self
* // create and start threads
- * for (Runnable task : tasks) {
+ * for (final Runnable task : tasks) {
* phaser.register();
* new Thread() {
* public void run() {
@@ -237,15 +237,15 @@ public class Phaser {
*/
/**
- * Primary state representation, holding four fields:
+ * Primary state representation, holding four bit-fields:
*
- * * unarrived -- the number of parties yet to hit barrier (bits 0-15)
- * * parties -- the number of parties to wait (bits 16-31)
- * * phase -- the generation of the barrier (bits 32-62)
- * * terminated -- set if barrier is terminated (bit 63 / sign)
+ * unarrived -- the number of parties yet to hit barrier (bits 0-15)
+ * parties -- the number of parties to wait (bits 16-31)
+ * phase -- the generation of the barrier (bits 32-62)
+ * terminated -- set if barrier is terminated (bit 63 / sign)
*
* Except that a phaser with no registered parties is
- * distinguished with the otherwise illegal state of having zero
+ * distinguished by the otherwise illegal state of having zero
* parties and one unarrived parties (encoded as EMPTY below).
*
* To efficiently maintain atomicity, these values are packed into
@@ -260,15 +260,13 @@ public class Phaser {
* parent.
*
* The phase of a subphaser is allowed to lag that of its
- * ancestors until it is actually accessed. Method reconcileState
- * is usually attempted only only when the number of unarrived
- * parties appears to be zero, which indicates a potential lag in
- * updating phase after the root advanced.
+ * ancestors until it is actually accessed -- see method
+ * reconcileState.
*/
private volatile long state;
private static final int MAX_PARTIES = 0xffff;
- private static final int MAX_PHASE = 0x7fffffff;
+ private static final int MAX_PHASE = Integer.MAX_VALUE;
private static final int PARTIES_SHIFT = 16;
private static final int PHASE_SHIFT = 32;
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
@@ -278,13 +276,14 @@ public class Phaser {
// some special values
private static final int ONE_ARRIVAL = 1;
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
+ private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
private static final int EMPTY = 1;
// The following unpacking methods are usually manually inlined
private static int unarrivedOf(long s) {
int counts = (int)s;
- return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK;
+ return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
private static int partiesOf(long s) {
@@ -292,7 +291,7 @@ public class Phaser {
}
private static int phaseOf(long s) {
- return (int) (s >>> PHASE_SHIFT);
+ return (int)(s >>> PHASE_SHIFT);
}
private static int arrivedOf(long s) {
@@ -345,37 +344,44 @@ public class Phaser {
* Manually tuned to speed up and minimize race windows for the
* common case of just decrementing unarrived field.
*
- * @param deregister false for arrive, true for arriveAndDeregister
+ * @param adjust value to subtract from state;
+ * ONE_ARRIVAL for arrive,
+ * ONE_DEREGISTER for arriveAndDeregister
*/
- private int doArrive(boolean deregister) {
- int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
+ private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
- int counts = (int)s;
- int unarrived = (counts & UNARRIVED_MASK) - 1;
if (phase < 0)
return phase;
- else if (counts == EMPTY || unarrived < 0) {
- if (root == this || reconcileState() == s)
- throw new IllegalStateException(badArrive(s));
- }
- else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
- if (unarrived == 0) {
+ int counts = (int)s;
+ int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
+ if (unarrived <= 0)
+ throw new IllegalStateException(badArrive(s));
+ if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
+ if (unarrived == 1) {
long n = s & PARTIES_MASK; // base of next state
- int nextUnarrived = ((int)n) >>> PARTIES_SHIFT;
- if (root != this)
- return parent.doArrive(nextUnarrived == 0);
- if (onAdvance(phase, nextUnarrived))
- n |= TERMINATION_BIT;
- else if (nextUnarrived == 0)
- n |= EMPTY;
+ int nextUnarrived = (int)n >>> PARTIES_SHIFT;
+ if (root == this) {
+ if (onAdvance(phase, nextUnarrived))
+ n |= TERMINATION_BIT;
+ else if (nextUnarrived == 0)
+ n |= EMPTY;
+ else
+ n |= nextUnarrived;
+ int nextPhase = (phase + 1) & MAX_PHASE;
+ n |= (long)nextPhase << PHASE_SHIFT;
+ UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
+ releaseWaiters(phase);
+ }
+ else if (nextUnarrived == 0) { // propagate deregistration
+ phase = parent.doArrive(ONE_DEREGISTER);
+ UNSAFE.compareAndSwapLong(this, stateOffset,
+ s, s | EMPTY);
+ }
else
- n |= nextUnarrived;
- n |= ((long)((phase + 1) & MAX_PHASE)) << PHASE_SHIFT;
- UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
- releaseWaiters(phase);
+ phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
@@ -390,42 +396,49 @@ public class Phaser {
*/
private int doRegister(int registrations) {
// adjustment to state
- long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
- Phaser par = parent;
+ long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
+ final Phaser parent = this.parent;
int phase;
for (;;) {
- long s = state;
+ long s = (parent == null) ? state : reconcileState();
int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
- else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
+ phase = (int)(s >>> PHASE_SHIFT);
+ if (phase < 0)
break;
- else if (counts != EMPTY) { // not 1st registration
- if (par == null || reconcileState() == s) {
+ if (counts != EMPTY) { // not 1st registration
+ if (parent == null || reconcileState() == s) {
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
- s, s + adj))
+ s, s + adjust))
break;
}
}
- else if (par == null) { // 1st root registration
- long next = (((long) phase) << PHASE_SHIFT) | adj;
+ else if (parent == null) { // 1st root registration
+ long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
else {
synchronized (this) { // 1st sub registration
if (state == s) { // recheck under lock
- par.doRegister(1);
- do { // force current phase
+ phase = parent.doRegister(1);
+ if (phase < 0)
+ break;
+ // finish registration whenever parent registration
+ // succeeded, even when racing with termination,
+ // since these are part of the same "transaction".
+ while (!UNSAFE.compareAndSwapLong
+ (this, stateOffset, s,
+ ((long)phase << PHASE_SHIFT) | adjust)) {
+ s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
- // assert phase < 0 || (int)state == EMPTY;
- } while (!UNSAFE.compareAndSwapLong
- (this, stateOffset, state,
- (((long) phase) << PHASE_SHIFT) | adj));
+ // assert (int)s == EMPTY;
+ }
break;
}
}
@@ -436,29 +449,33 @@ public class Phaser {
/**
* Resolves lagged phase propagation from root if necessary.
+ * Reconciliation normally occurs when root has advanced but
+ * subphasers have not yet done so, in which case they must finish
+ * their own advance by setting unarrived to parties (or if
+ * parties is zero, resetting to unregistered EMPTY state).
+ * However, this method may also be called when "floating"
+ * subphasers with possibly some unarrived parties are merely
+ * catching up to current phase, in which case counts are
+ * unaffected.
+ *
+ * @return reconciled state
*/
private long reconcileState() {
- Phaser rt = root;
+ final Phaser root = this.root;
long s = state;
- if (rt != this) {
- int phase;
- while ((phase = (int)(rt.state >>> PHASE_SHIFT)) !=
- (int)(s >>> PHASE_SHIFT)) {
- // assert phase < 0 || unarrivedOf(s) == 0
- long t; // to reread s
- long p = s & PARTIES_MASK; // unshifted parties field
- long n = (((long) phase) << PHASE_SHIFT) | p;
- if (phase >= 0) {
- if (p == 0L)
- n |= EMPTY; // reset to empty
- else
- n |= p >>> PARTIES_SHIFT; // set unarr to parties
- }
- if ((t = state) == s &&
- UNSAFE.compareAndSwapLong(this, stateOffset, s, s = n))
- break;
- s = t;
- }
+ if (root != this) {
+ int phase, u, p;
+ // CAS root phase with current parties; possibly trip unarrived
+ while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
+ (int)(s >>> PHASE_SHIFT) &&
+ !UNSAFE.compareAndSwapLong
+ (this, stateOffset, s,
+ s = (((long)phase << PHASE_SHIFT) |
+ (s & PARTIES_MASK) |
+ ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
+ ((u = (int)s & UNARRIVED_MASK) == 0 && phase >= 0) ?
+ p : u))))
+ s = state;
}
return s;
}
@@ -524,10 +541,10 @@ public class Phaser {
this.evenQ = new AtomicReference();
this.oddQ = new AtomicReference();
}
- this.state = (parties == 0) ? (long) EMPTY :
- ((((long) phase) << PHASE_SHIFT) |
- (((long) parties) << PARTIES_SHIFT) |
- ((long) parties));
+ this.state = (parties == 0) ? (long)EMPTY :
+ ((long)phase << PHASE_SHIFT) |
+ ((long)parties << PARTIES_SHIFT) |
+ ((long)parties);
}
/**
@@ -541,7 +558,7 @@ public class Phaser {
*
* @return the arrival phase number to which this registration
* applied. If this value is negative, then this phaser has
- * terminated, in which casem registration has no effect.
+ * terminated, in which case registration has no effect.
* @throws IllegalStateException if attempting to register more
* than the maximum supported number of parties
*/
@@ -563,7 +580,7 @@ public class Phaser {
* advance to the next phase
* @return the arrival phase number to which this registration
* applied. If this value is negative, then this phaser has
- * terminated, in which casem registration has no effect.
+ * terminated, in which case registration has no effect.
* @throws IllegalStateException if attempting to register more
* than the maximum supported number of parties
* @throws IllegalArgumentException if {@code parties < 0}
@@ -589,7 +606,7 @@ public class Phaser {
* of unarrived parties would become negative
*/
public int arrive() {
- return doArrive(false);
+ return doArrive(ONE_ARRIVAL);
}
/**
@@ -609,7 +626,7 @@ public class Phaser {
* of registered or unarrived parties would become negative
*/
public int arriveAndDeregister() {
- return doArrive(true);
+ return doArrive(ONE_DEREGISTER);
}
/**
@@ -636,22 +653,20 @@ public class Phaser {
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
- int counts = (int)s;
- int unarrived = (counts & UNARRIVED_MASK) - 1;
if (phase < 0)
return phase;
- else if (counts == EMPTY || unarrived < 0) {
- if (reconcileState() == s)
- throw new IllegalStateException(badArrive(s));
- }
- else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
- s -= ONE_ARRIVAL)) {
- if (unarrived != 0)
+ int counts = (int)s;
+ int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
+ if (unarrived <= 0)
+ throw new IllegalStateException(badArrive(s));
+ if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
+ s -= ONE_ARRIVAL)) {
+ if (unarrived > 1)
return root.internalAwaitAdvance(phase, null);
if (root != this)
return parent.arriveAndAwaitAdvance();
long n = s & PARTIES_MASK; // base of next state
- int nextUnarrived = ((int)n) >>> PARTIES_SHIFT;
+ int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
@@ -682,7 +697,8 @@ public class Phaser {
*/
public int awaitAdvance(int phase) {
final Phaser root = this.root;
- int p = (int)((root == this? state : reconcileState()) >>> PHASE_SHIFT);
+ long s = (root == this) ? state : reconcileState();
+ int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
@@ -708,7 +724,8 @@ public class Phaser {
public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException {
final Phaser root = this.root;
- int p = (int)((root == this? state : reconcileState()) >>> PHASE_SHIFT);
+ long s = (root == this) ? state : reconcileState();
+ int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase) {
@@ -745,7 +762,8 @@ public class Phaser {
throws InterruptedException, TimeoutException {
long nanos = unit.toNanos(timeout);
final Phaser root = this.root;
- int p = (int)((root == this? state : reconcileState()) >>> PHASE_SHIFT);
+ long s = (root == this) ? state : reconcileState();
+ int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase) {
@@ -773,11 +791,11 @@ public class Phaser {
final Phaser root = this.root;
long s;
while ((s = root.state) >= 0) {
- long next = (s & ~((long)UNARRIVED_MASK)) | TERMINATION_BIT;
- if (UNSAFE.compareAndSwapLong(root, stateOffset, s, next)) {
+ if (UNSAFE.compareAndSwapLong(root, stateOffset,
+ s, s | TERMINATION_BIT)) {
// signal all threads
- releaseWaiters(0);
- releaseWaiters(1);
+ releaseWaiters(0); // Waiters on evenQ
+ releaseWaiters(1); // Waiters on oddQ
return;
}
}
@@ -807,7 +825,8 @@ public class Phaser {
/**
* Returns the number of registered parties that have arrived at
- * the current phase of this phaser.
+ * the current phase of this phaser. If this phaser has terminated,
+ * the returned value is meaningless and arbitrary.
*
* @return the number of arrived parties
*/
@@ -817,7 +836,8 @@ public class Phaser {
/**
* Returns the number of registered parties that have not yet
- * arrived at the current phase of this phaser.
+ * arrived at the current phase of this phaser. If this phaser has
+ * terminated, the returned value is meaningless and arbitrary.
*
* @return the number of unarrived parties
*/
@@ -981,7 +1001,7 @@ public class Phaser {
/**
* Possibly blocks and waits for phase to advance unless aborted.
- * Call only from root node.
+ * Call only on root phaser.
*
* @param phase current phase
* @param node if non-null, the wait node to track interrupt and timeout;
@@ -989,6 +1009,7 @@ public class Phaser {
* @return current phase
*/
private int internalAwaitAdvance(int phase, QNode node) {
+ // assert root == this;
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
@@ -1102,18 +1123,16 @@ public class Phaser {
// Unsafe mechanics
- private static final sun.misc.Unsafe UNSAFE = getUnsafe();
- private static final long stateOffset =
- objectFieldOffset("state", Phaser.class);
-
- private static long objectFieldOffset(String field, Class> klazz) {
+ private static final sun.misc.Unsafe UNSAFE;
+ private static final long stateOffset;
+ static {
try {
- return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
- } catch (NoSuchFieldException e) {
- // Convert Exception to corresponding Error
- NoSuchFieldError error = new NoSuchFieldError(field);
- error.initCause(e);
- throw error;
+ UNSAFE = getUnsafe();
+ Class> k = Phaser.class;
+ stateOffset = UNSAFE.objectFieldOffset
+ (k.getDeclaredField("state"));
+ } catch (Exception e) {
+ throw new Error(e);
}
}