--- jsr166/src/jsr166y/Phaser.java 2011/09/21 12:30:39 1.75 +++ jsr166/src/jsr166y/Phaser.java 2013/01/09 02:51:37 1.79 @@ -17,7 +17,7 @@ import java.util.concurrent.locks.LockSu * {@link java.util.concurrent.CountDownLatch CountDownLatch} * but supporting more flexible usage. * - *
Registration. Unlike the case for other barriers, the + *
Registration. Unlike the case for other barriers, the * number of parties registered to synchronize on a phaser * may vary over time. Tasks may be registered at any time (using * methods {@link #register}, {@link #bulkRegister}, or forms of @@ -30,7 +30,7 @@ import java.util.concurrent.locks.LockSu * (However, you can introduce such bookkeeping by subclassing this * class.) * - *
Synchronization. Like a {@code CyclicBarrier}, a {@code + *
Synchronization. Like a {@code CyclicBarrier}, a {@code * Phaser} may be repeatedly awaited. Method {@link * #arriveAndAwaitAdvance} has effect analogous to {@link * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each @@ -74,7 +74,7 @@ import java.util.concurrent.locks.LockSu * * * - *
Termination. A phaser may enter a termination + *
Termination. A phaser may enter a termination * state, that may be checked using method {@link #isTerminated}. Upon * termination, all synchronization methods immediately return without * waiting for advance, as indicated by a negative return value. @@ -89,7 +89,7 @@ import java.util.concurrent.locks.LockSu * also available to abruptly release waiting threads and allow them * to terminate. * - *
Tiering. Phasers may be tiered (i.e., + *
Tiering. Phasers may be tiered (i.e.,
* constructed in tree structures) to reduce contention. Phasers with
* large numbers of parties that would otherwise experience heavy
* synchronization contention costs may instead be set up so that
@@ -271,18 +271,20 @@ public class Phaser {
private static final int PHASE_SHIFT = 32;
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
+ private static final long COUNTS_MASK = 0xffffffffL;
private static final long TERMINATION_BIT = 1L << 63;
// some special values
private static final int ONE_ARRIVAL = 1;
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
+ private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
private static final int EMPTY = 1;
// The following unpacking methods are usually manually inlined
private static int unarrivedOf(long s) {
int counts = (int)s;
- return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK;
+ return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
private static int partiesOf(long s) {
@@ -343,26 +345,25 @@ public class Phaser {
* Manually tuned to speed up and minimize race windows for the
* common case of just decrementing unarrived field.
*
- * @param deregister false for arrive, true for arriveAndDeregister
+ * @param adjust value to subtract from state;
+ * ONE_ARRIVAL for arrive,
+ * ONE_DEREGISTER for arriveAndDeregister
*/
- private int doArrive(boolean deregister) {
- int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
+ private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
- int counts = (int)s;
- int unarrived = (counts & UNARRIVED_MASK) - 1;
if (phase < 0)
return phase;
- else if (counts == EMPTY || unarrived < 0) {
- if (root == this || reconcileState() == s)
- throw new IllegalStateException(badArrive(s));
- }
- else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
- long n = s & PARTIES_MASK; // base of next state
- int nextUnarrived = (int)n >>> PARTIES_SHIFT;
- if (unarrived == 0) {
+ int counts = (int)s;
+ int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
+ if (unarrived <= 0)
+ throw new IllegalStateException(badArrive(s));
+ if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
+ if (unarrived == 1) {
+ long n = s & PARTIES_MASK; // base of next state
+ int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
@@ -370,17 +371,18 @@ public class Phaser {
n |= EMPTY;
else
n |= nextUnarrived;
- n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;
+ int nextPhase = (phase + 1) & MAX_PHASE;
+ n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
+ releaseWaiters(phase);
}
else if (nextUnarrived == 0) { // propagate deregistration
- phase = parent.doArrive(true);
+ phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
- phase = parent.doArrive(false);
- releaseWaiters(phase);
+ phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
@@ -395,7 +397,7 @@ public class Phaser {
*/
private int doRegister(int registrations) {
// adjustment to state
- long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
+ long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (;;) {
@@ -405,32 +407,39 @@ public class Phaser {
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
- else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
+ phase = (int)(s >>> PHASE_SHIFT);
+ if (phase < 0)
break;
- else if (counts != EMPTY) { // not 1st registration
+ if (counts != EMPTY) { // not 1st registration
if (parent == null || reconcileState() == s) {
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
- s, s + adj))
+ s, s + adjust))
break;
}
}
else if (parent == null) { // 1st root registration
- long next = ((long)phase << PHASE_SHIFT) | adj;
+ long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
else {
synchronized (this) { // 1st sub registration
if (state == s) { // recheck under lock
- parent.doRegister(1);
- do { // force current phase
+ phase = parent.doRegister(1);
+ if (phase < 0)
+ break;
+ // finish registration whenever parent registration
+ // succeeded, even when racing with termination,
+ // since these are part of the same "transaction".
+ while (!UNSAFE.compareAndSwapLong
+ (this, stateOffset, s,
+ ((long)phase << PHASE_SHIFT) | adjust)) {
+ s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
- // assert phase < 0 || (int)state == EMPTY;
- } while (!UNSAFE.compareAndSwapLong
- (this, stateOffset, state,
- ((long)phase << PHASE_SHIFT) | adj));
+ // assert (int)s == EMPTY;
+ }
break;
}
}
@@ -445,10 +454,6 @@ public class Phaser {
* subphasers have not yet done so, in which case they must finish
* their own advance by setting unarrived to parties (or if
* parties is zero, resetting to unregistered EMPTY state).
- * However, this method may also be called when "floating"
- * subphasers with possibly some unarrived parties are merely
- * catching up to current phase, in which case counts are
- * unaffected.
*
* @return reconciled state
*/
@@ -456,17 +461,16 @@ public class Phaser {
final Phaser root = this.root;
long s = state;
if (root != this) {
- int phase, u, p;
- // CAS root phase with current parties; possibly trip unarrived
+ int phase, p;
+ // CAS to root phase with current parties, tripping unarrived
while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
(int)(s >>> PHASE_SHIFT) &&
!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
s = (((long)phase << PHASE_SHIFT) |
- (s & PARTIES_MASK) |
- ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
- ((u = (int)s & UNARRIVED_MASK) == 0 && phase >= 0) ?
- p : u))))
+ ((phase < 0) ? (s & COUNTS_MASK) :
+ (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
+ ((s & PARTIES_MASK) | p))))))
s = state;
}
return s;
@@ -598,7 +602,7 @@ public class Phaser {
* of unarrived parties would become negative
*/
public int arrive() {
- return doArrive(false);
+ return doArrive(ONE_ARRIVAL);
}
/**
@@ -618,7 +622,7 @@ public class Phaser {
* of registered or unarrived parties would become negative
*/
public int arriveAndDeregister() {
- return doArrive(true);
+ return doArrive(ONE_DEREGISTER);
}
/**
@@ -645,17 +649,15 @@ public class Phaser {
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
- int counts = (int)s;
- int unarrived = (counts & UNARRIVED_MASK) - 1;
if (phase < 0)
return phase;
- else if (counts == EMPTY || unarrived < 0) {
- if (reconcileState() == s)
- throw new IllegalStateException(badArrive(s));
- }
- else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
- s -= ONE_ARRIVAL)) {
- if (unarrived != 0)
+ int counts = (int)s;
+ int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
+ if (unarrived <= 0)
+ throw new IllegalStateException(badArrive(s));
+ if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
+ s -= ONE_ARRIVAL)) {
+ if (unarrived > 1)
return root.internalAwaitAdvance(phase, null);
if (root != this)
return parent.arriveAndAwaitAdvance();
@@ -788,8 +790,8 @@ public class Phaser {
if (UNSAFE.compareAndSwapLong(root, stateOffset,
s, s | TERMINATION_BIT)) {
// signal all threads
- releaseWaiters(0);
- releaseWaiters(1);
+ releaseWaiters(0); // Waiters on evenQ
+ releaseWaiters(1); // Waiters on oddQ
return;
}
}
@@ -995,7 +997,7 @@ public class Phaser {
/**
* Possibly blocks and waits for phase to advance unless aborted.
- * Call only from root node.
+ * Call only on root phaser.
*
* @param phase current phase
* @param node if non-null, the wait node to track interrupt and timeout;
@@ -1003,6 +1005,7 @@ public class Phaser {
* @return current phase
*/
private int internalAwaitAdvance(int phase, QNode node) {
+ // assert root == this;
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
@@ -1139,21 +1142,23 @@ public class Phaser {
private static sun.misc.Unsafe getUnsafe() {
try {
return sun.misc.Unsafe.getUnsafe();
- } catch (SecurityException se) {
- try {
- return java.security.AccessController.doPrivileged
- (new java.security
- .PrivilegedExceptionAction