--- jsr166/src/jsr166y/Phaser.java 2010/10/24 21:45:39 1.48 +++ jsr166/src/jsr166y/Phaser.java 2010/11/05 23:01:47 1.49 @@ -7,7 +7,6 @@ package jsr166y; import java.util.concurrent.*; - import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; @@ -295,14 +294,16 @@ public class Phaser { /** * Heads of Treiber stacks for waiting threads. To eliminate - * contention while releasing some threads while adding others, we + * contention when releasing some threads while adding others, we * use two of them, alternating across even and odd phases. + * Subphasers share queues with root to speed up releases. */ private final AtomicReference evenQ = new AtomicReference(); private final AtomicReference oddQ = new AtomicReference(); private AtomicReference queueFor(int phase) { - return ((phase & 1) == 0) ? evenQ : oddQ; + Phaser r = root; + return ((phase & 1) == 0) ? r.evenQ : r.oddQ; } /** @@ -317,20 +318,25 @@ public class Phaser { * Recursively resolves state. */ private long reconcileState() { - Phaser p = parent; + Phaser par = parent; long s = state; - if (p != null) { - while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) { - long parentState = p.getReconciledState(); + if (par != null) { + int phase, rootPhase; + while ((phase = phaseOf(s)) >= 0 && + (rootPhase = phaseOf(root.state)) != phase && + (rootPhase < 0 || unarrivedOf(s) == 0)) { + long parentState = par.getReconciledState(); int parentPhase = phaseOf(parentState); - int phase = phaseOf(s = state); - if (phase != parentPhase) { - long next = trippedStateFor(parentPhase, partiesOf(s)); - if (casState(s, next)) { - releaseWaiters(phase); - s = next; - } + int parties = partiesOf(s); + long next = trippedStateFor(parentPhase, parties); + if (phaseOf(root.state) == rootPhase && + parentPhase != phase && + state == s && casState(s, next)) { + releaseWaiters(phase); + if (parties == 0) // exit if the final deregistration + break; } + s = state; } } return s; @@ -404,6 +410,8 @@ public class Phaser { /** * Adds a new unarrived party to this phaser. + * If an ongoing invocation of {@link #onAdvance} is in progress, + * this method waits until its completion before registering. * * @return the arrival phase number to which this registration applied * @throws IllegalStateException if attempting to register more @@ -415,6 +423,8 @@ public class Phaser { /** * Adds the given number of new unarrived parties to this phaser. + * If an ongoing invocation of {@link #onAdvance} is in progress, + * this method waits until its completion before registering. * * @param parties the number of additional parties required to trip barrier * @return the arrival phase number to which this registration applied @@ -434,19 +444,23 @@ public class Phaser { * Shared code for register, bulkRegister */ private int doRegister(int registrations) { + Phaser par = parent; + long s; int phase; - for (;;) { - long s = getReconciledState(); - phase = phaseOf(s); - int unarrived = unarrivedOf(s) + registrations; - int parties = partiesOf(s) + registrations; - if (phase < 0) - break; - if (parties > ushortMask || unarrived > ushortMask) - throw new IllegalStateException(badBounds(parties, unarrived)); - if (phase == phaseOf(root.state) && - casState(s, stateFor(phase, parties, unarrived))) - break; + while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) { + int p = partiesOf(s); + int u = unarrivedOf(s); + int unarrived = u + registrations; + int parties = p + registrations; + if (par == null || phase == phaseOf(root.state)) { + if (parties > ushortMask || unarrived > ushortMask) + throw new IllegalStateException(badBounds(parties, + unarrived)); + else if (p != 0 && u == 0) // back off if advancing + Thread.yield(); // not worth actually blocking + else if (casState(s, stateFor(phase, parties, unarrived))) + break; + } } return phase; } @@ -462,41 +476,32 @@ public class Phaser { * of unarrived parties would become negative */ public int arrive() { + Phaser par = parent; + long s; int phase; - for (;;) { - long s = state; - phase = phaseOf(s); - if (phase < 0) - break; + while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) { int parties = partiesOf(s); int unarrived = unarrivedOf(s) - 1; - if (unarrived > 0) { // Not the last arrival - if (casState(s, s - 1)) // s-1 adds one arrival + if (parties == 0 || unarrived < 0) + throw new IllegalStateException(badBounds(parties, + unarrived)); + else if (unarrived > 0) { // Not the last arrival + if (casState(s, s - 1)) // s-1 adds one arrival break; } - else if (unarrived == 0) { // the last arrival - Phaser par = parent; - if (par == null) { // directly trip - if (casState - (s, - trippedStateFor(onAdvance(phase, parties) ? -1 : - ((phase + 1) & phaseMask), parties))) { - releaseWaiters(phase); - break; - } - } - else { // cascade to parent - if (casState(s, s - 1)) { // zeroes unarrived - par.arrive(); - reconcileState(); - break; - } + else if (par == null) { // directly trip + if (casState(s, trippedStateFor(onAdvance(phase, parties) ? -1 : + ((phase + 1) & phaseMask), + parties))) { + releaseWaiters(phase); + break; } } - else if (phase != phaseOf(root.state)) // or if unreconciled + else if (phaseOf(root.state) == phase && casState(s, s - 1)) { + par.arrive(); // cascade to parent reconcileState(); - else - throw new IllegalStateException(badBounds(parties, unarrived)); + break; + } } return phase; } @@ -515,45 +520,37 @@ public class Phaser { * of registered or unarrived parties would become negative */ public int arriveAndDeregister() { - // similar code to arrive, but too different to merge + // similar to arrive, but too different to merge Phaser par = parent; + long s; int phase; - for (;;) { - long s = state; - phase = phaseOf(s); - if (phase < 0) - break; + while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) { int parties = partiesOf(s) - 1; int unarrived = unarrivedOf(s) - 1; - if (parties >= 0) { - if (unarrived > 0 || (unarrived == 0 && par != null)) { - if (casState - (s, - stateFor(phase, parties, unarrived))) { - if (unarrived == 0) { - par.arriveAndDeregister(); - reconcileState(); - } - break; - } - continue; - } - if (unarrived == 0) { - if (casState - (s, - trippedStateFor(onAdvance(phase, parties) ? -1 : - ((phase + 1) & phaseMask), parties))) { - releaseWaiters(phase); - break; - } - continue; - } - if (par != null && phase != phaseOf(root.state)) { - reconcileState(); - continue; + if (parties < 0 || unarrived < 0) + throw new IllegalStateException(badBounds(parties, + unarrived)); + else if (unarrived > 0) { + if (casState(s, stateFor(phase, parties, unarrived))) + break; + } + else if (par == null) { + if (casState(s, trippedStateFor(onAdvance(phase, parties)? -1: + (phase + 1) & phaseMask, + parties))) { + releaseWaiters(phase); + break; } } - throw new IllegalStateException(badBounds(parties, unarrived)); + else if (phaseOf(root.state) == phase && + casState(s, stateFor(phase, parties, 0))) { + if (parties == 0) + par.arriveAndDeregister(); + else + par.arrive(); + reconcileState(); + break; + } } return phase; } @@ -591,13 +588,9 @@ public class Phaser { public int awaitAdvance(int phase) { if (phase < 0) return phase; - long s = getReconciledState(); - int p = phaseOf(s); + int p = getPhase(); if (p != phase) return p; - if (unarrivedOf(s) == 0 && parent != null) - parent.awaitAdvance(phase); - // Fall here even if parent waited, to reconcile and help release return untimedWait(phase); } @@ -620,12 +613,9 @@ public class Phaser { throws InterruptedException { if (phase < 0) return phase; - long s = getReconciledState(); - int p = phaseOf(s); + int p = getPhase(); if (p != phase) return p; - if (unarrivedOf(s) == 0 && parent != null) - parent.awaitAdvanceInterruptibly(phase); return interruptibleWait(phase); } @@ -653,15 +643,13 @@ public class Phaser { public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { + long nanos = unit.toNanos(timeout); if (phase < 0) return phase; - long s = getReconciledState(); - int p = phaseOf(s); + int p = getPhase(); if (p != phase) return p; - if (unarrivedOf(s) == 0 && parent != null) - parent.awaitAdvanceInterruptibly(phase, timeout, unit); - return timedWait(phase, unit.toNanos(timeout)); + return timedWait(phase, nanos); } /** @@ -672,20 +660,13 @@ public class Phaser { * unexpected exceptions. */ public void forceTermination() { - for (;;) { - long s = getReconciledState(); - int phase = phaseOf(s); - int parties = partiesOf(s); - int unarrived = unarrivedOf(s); - if (phase < 0 || - casState(s, stateFor(-1, parties, unarrived))) { - releaseWaiters(0); - releaseWaiters(1); - if (parent != null) - parent.forceTermination(); - return; - } - } + Phaser r = root; // force at root then reconcile + long s; + while (phaseOf(s = r.state) >= 0) + r.casState(s, stateFor(-1, partiesOf(s), unarrivedOf(s))); + reconcileState(); + releaseWaiters(0); // ensure wakeups on both queues + releaseWaiters(1); } /** @@ -705,7 +686,7 @@ public class Phaser { * @return the number of parties */ public int getRegisteredParties() { - return partiesOf(state); + return partiesOf(getReconciledState()); } /** @@ -715,7 +696,7 @@ public class Phaser { * @return the number of arrived parties */ public int getArrivedParties() { - return arrivedOf(state); + return arrivedOf(getReconciledState()); } /** @@ -725,7 +706,7 @@ public class Phaser { * @return the number of unarrived parties */ public int getUnarrivedParties() { - return unarrivedOf(state); + return unarrivedOf(getReconciledState()); } /** @@ -779,15 +760,6 @@ public class Phaser { * termination for other reasons should also preserve this * property. * - *

You may override this method to perform an action with side - * effects visible to participating tasks, but it is only sensible - * to do so in designs where all parties register before any - * arrive, and all {@link #awaitAdvance} at each phase. - * Otherwise, you cannot ensure lack of interference from other - * parties during the invocation of this method. Additionally, - * method {@code onAdvance} may be invoked more than once per - * transition if registrations are intermixed with arrivals. - * * @param phase the phase number on entering the barrier * @param registeredParties the current number of registered parties * @return {@code true} if this barrier should terminate