--- jsr166/src/jsr166y/Phaser.java 2011/09/21 12:30:39 1.75 +++ jsr166/src/jsr166y/Phaser.java 2011/10/15 21:46:25 1.76 @@ -276,13 +276,14 @@ public class Phaser { // some special values private static final int ONE_ARRIVAL = 1; private static final int ONE_PARTY = 1 << PARTIES_SHIFT; + private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY; private static final int EMPTY = 1; // The following unpacking methods are usually manually inlined private static int unarrivedOf(long s) { int counts = (int)s; - return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK; + return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); } private static int partiesOf(long s) { @@ -343,26 +344,25 @@ public class Phaser { * Manually tuned to speed up and minimize race windows for the * common case of just decrementing unarrived field. * - * @param deregister false for arrive, true for arriveAndDeregister + * @param adjust value to subtract from state; + * ONE_ARRIVAL for arrive, + * ONE_DEREGISTER for arriveAndDeregister */ - private int doArrive(boolean deregister) { - int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL; + private int doArrive(int adjust) { final Phaser root = this.root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); - int counts = (int)s; - int unarrived = (counts & UNARRIVED_MASK) - 1; if (phase < 0) return phase; - else if (counts == EMPTY || unarrived < 0) { - if (root == this || reconcileState() == s) - throw new IllegalStateException(badArrive(s)); - } - else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) { - long n = s & PARTIES_MASK; // base of next state - int nextUnarrived = (int)n >>> PARTIES_SHIFT; - if (unarrived == 0) { + int counts = (int)s; + int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); + if (unarrived <= 0) + throw new IllegalStateException(badArrive(s)); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { + if (unarrived == 1) { + long n = s & PARTIES_MASK; // base of next state + int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (root == this) { if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; @@ -370,17 +370,18 @@ public class Phaser { n |= EMPTY; else n |= nextUnarrived; - n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT; + int nextPhase = (phase + 1) & MAX_PHASE; + n |= (long)nextPhase << PHASE_SHIFT; UNSAFE.compareAndSwapLong(this, stateOffset, s, n); + releaseWaiters(phase); } else if (nextUnarrived == 0) { // propagate deregistration - phase = parent.doArrive(true); + phase = parent.doArrive(ONE_DEREGISTER); UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY); } else - phase = parent.doArrive(false); - releaseWaiters(phase); + phase = parent.doArrive(ONE_ARRIVAL); } return phase; } @@ -395,7 +396,7 @@ public class Phaser { */ private int doRegister(int registrations) { // adjustment to state - long adj = ((long)registrations << PARTIES_SHIFT) | registrations; + long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (;;) { @@ -405,32 +406,39 @@ public class Phaser { int unarrived = counts & UNARRIVED_MASK; if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); - else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0) + phase = (int)(s >>> PHASE_SHIFT); + if (phase < 0) break; - else if (counts != EMPTY) { // not 1st registration + if (counts != EMPTY) { // not 1st registration if (parent == null || reconcileState() == s) { if (unarrived == 0) // wait out advance root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, - s, s + adj)) + s, s + adjust)) break; } } else if (parent == null) { // 1st root registration - long next = ((long)phase << PHASE_SHIFT) | adj; + long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) break; } else { synchronized (this) { // 1st sub registration if (state == s) { // recheck under lock - parent.doRegister(1); - do { // force current phase + phase = parent.doRegister(1); + if (phase < 0) + break; + // finish registration whenever parent registration + // succeeded, even when racing with termination, + // since these are part of the same "transaction". + while (!UNSAFE.compareAndSwapLong + (this, stateOffset, s, + ((long)phase << PHASE_SHIFT) | adjust)) { + s = state; phase = (int)(root.state >>> PHASE_SHIFT); - // assert phase < 0 || (int)state == EMPTY; - } while (!UNSAFE.compareAndSwapLong - (this, stateOffset, state, - ((long)phase << PHASE_SHIFT) | adj)); + // assert (int)s == EMPTY; + } break; } } @@ -598,7 +606,7 @@ public class Phaser { * of unarrived parties would become negative */ public int arrive() { - return doArrive(false); + return doArrive(ONE_ARRIVAL); } /** @@ -618,7 +626,7 @@ public class Phaser { * of registered or unarrived parties would become negative */ public int arriveAndDeregister() { - return doArrive(true); + return doArrive(ONE_DEREGISTER); } /** @@ -645,17 +653,15 @@ public class Phaser { for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); - int counts = (int)s; - int unarrived = (counts & UNARRIVED_MASK) - 1; if (phase < 0) return phase; - else if (counts == EMPTY || unarrived < 0) { - if (reconcileState() == s) - throw new IllegalStateException(badArrive(s)); - } - else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, - s -= ONE_ARRIVAL)) { - if (unarrived != 0) + int counts = (int)s; + int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); + if (unarrived <= 0) + throw new IllegalStateException(badArrive(s)); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, + s -= ONE_ARRIVAL)) { + if (unarrived > 1) return root.internalAwaitAdvance(phase, null); if (root != this) return parent.arriveAndAwaitAdvance(); @@ -788,8 +794,8 @@ public class Phaser { if (UNSAFE.compareAndSwapLong(root, stateOffset, s, s | TERMINATION_BIT)) { // signal all threads - releaseWaiters(0); - releaseWaiters(1); + releaseWaiters(0); // Waiters on evenQ + releaseWaiters(1); // Waiters on oddQ return; } } @@ -995,7 +1001,7 @@ public class Phaser { /** * Possibly blocks and waits for phase to advance unless aborted. - * Call only from root node. + * Call only on root phaser. * * @param phase current phase * @param node if non-null, the wait node to track interrupt and timeout; @@ -1003,6 +1009,7 @@ public class Phaser { * @return current phase */ private int internalAwaitAdvance(int phase, QNode node) { + // assert root == this; releaseWaiters(phase-1); // ensure old queue clean boolean queued = false; // true when node is enqueued int lastUnarrived = 0; // to increase spins upon change