--- jsr166/src/jsr166y/Phaser.java 2010/11/05 23:01:47 1.49 +++ jsr166/src/jsr166y/Phaser.java 2010/11/06 16:12:10 1.50 @@ -298,12 +298,11 @@ public class Phaser { * use two of them, alternating across even and odd phases. * Subphasers share queues with root to speed up releases. */ - private final AtomicReference evenQ = new AtomicReference(); - private final AtomicReference oddQ = new AtomicReference(); + private final AtomicReference evenQ; + private final AtomicReference oddQ; private AtomicReference queueFor(int phase) { - Phaser r = root; - return ((phase & 1) == 0) ? r.evenQ : r.oddQ; + return ((phase & 1) == 0) ? evenQ : oddQ; } /** @@ -325,16 +324,11 @@ public class Phaser { while ((phase = phaseOf(s)) >= 0 && (rootPhase = phaseOf(root.state)) != phase && (rootPhase < 0 || unarrivedOf(s) == 0)) { - long parentState = par.getReconciledState(); - int parentPhase = phaseOf(parentState); - int parties = partiesOf(s); - long next = trippedStateFor(parentPhase, parties); - if (phaseOf(root.state) == rootPhase && - parentPhase != phase && - state == s && casState(s, next)) { - releaseWaiters(phase); - if (parties == 0) // exit if the final deregistration - break; + int parentPhase = phaseOf(par.getReconciledState()); + if (parentPhase != phase) { + long next = trippedStateFor(parentPhase, partiesOf(s)); + if (state == s) + UNSAFE.compareAndSwapLong(this, stateOffset, s, next); } s = state; } @@ -348,7 +342,7 @@ public class Phaser { * phaser will need to first register for it. */ public Phaser() { - this(null); + this(null, 0); } /** @@ -372,15 +366,7 @@ public class Phaser { * @param parent the parent phaser */ public Phaser(Phaser parent) { - int phase = 0; - this.parent = parent; - if (parent != null) { - this.root = parent.root; - phase = parent.register(); - } - else - this.root = this; - this.state = trippedStateFor(phase, 0); + this(parent, 0); } /** @@ -397,21 +383,28 @@ public class Phaser { public Phaser(Phaser parent, int parties) { if (parties < 0 || parties > ushortMask) throw new IllegalArgumentException("Illegal number of parties"); - int phase = 0; + int phase; this.parent = parent; if (parent != null) { - this.root = parent.root; + Phaser r = parent.root; + this.root = r; + this.evenQ = r.evenQ; + this.oddQ = r.oddQ; phase = parent.register(); } - else + else { this.root = this; + this.evenQ = new AtomicReference(); + this.oddQ = new AtomicReference(); + phase = 0; + } this.state = trippedStateFor(phase, parties); } /** * Adds a new unarrived party to this phaser. * If an ongoing invocation of {@link #onAdvance} is in progress, - * this method waits until its completion before registering. + * this method may wait until its completion before registering. * * @return the arrival phase number to which this registration applied * @throws IllegalStateException if attempting to register more @@ -424,7 +417,7 @@ public class Phaser { /** * Adds the given number of new unarrived parties to this phaser. * If an ongoing invocation of {@link #onAdvance} is in progress, - * this method waits until its completion before registering. + * this method may wait until its completion before registering. * * @param parties the number of additional parties required to trip barrier * @return the arrival phase number to which this registration applied @@ -452,13 +445,13 @@ public class Phaser { int u = unarrivedOf(s); int unarrived = u + registrations; int parties = p + registrations; - if (par == null || phase == phaseOf(root.state)) { - if (parties > ushortMask || unarrived > ushortMask) - throw new IllegalStateException(badBounds(parties, - unarrived)); - else if (p != 0 && u == 0) // back off if advancing - Thread.yield(); // not worth actually blocking - else if (casState(s, stateFor(phase, parties, unarrived))) + if (u == 0 && p != 0) // if tripped, wait for advance + untimedWait(phase); + else if (parties > ushortMask) + throw new IllegalStateException(badBounds(parties, unarrived)); + else if (par == null || phaseOf(root.state) == phase) { + long next = stateFor(phase, parties, unarrived); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) break; } } @@ -482,22 +475,23 @@ public class Phaser { while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) { int parties = partiesOf(s); int unarrived = unarrivedOf(s) - 1; - if (parties == 0 || unarrived < 0) - throw new IllegalStateException(badBounds(parties, - unarrived)); - else if (unarrived > 0) { // Not the last arrival - if (casState(s, s - 1)) // s-1 adds one arrival - break; + if (unarrived > 0) { // Not the last arrival + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s - 1)) + break; // s-1 adds one arrival } + else if (unarrived < 0) + throw new IllegalStateException(badBounds(parties, unarrived)); else if (par == null) { // directly trip - if (casState(s, trippedStateFor(onAdvance(phase, parties) ? -1 : - ((phase + 1) & phaseMask), - parties))) { + long next = trippedStateFor(onAdvance(phase, parties) ? -1 : + ((phase + 1) & phaseMask), + parties); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) { releaseWaiters(phase); break; } } - else if (phaseOf(root.state) == phase && casState(s, s - 1)) { + else if (phaseOf(root.state) == phase && + UNSAFE.compareAndSwapLong(this, stateOffset, s, s - 1)) { par.arrive(); // cascade to parent reconcileState(); break; @@ -527,29 +521,32 @@ public class Phaser { while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) { int parties = partiesOf(s) - 1; int unarrived = unarrivedOf(s) - 1; - if (parties < 0 || unarrived < 0) - throw new IllegalStateException(badBounds(parties, - unarrived)); - else if (unarrived > 0) { - if (casState(s, stateFor(phase, parties, unarrived))) + if (unarrived > 0) { + long next = stateFor(phase, parties, unarrived); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) break; } + else if (unarrived < 0) + throw new IllegalStateException(badBounds(parties, unarrived)); else if (par == null) { - if (casState(s, trippedStateFor(onAdvance(phase, parties)? -1: - (phase + 1) & phaseMask, - parties))) { + long next = trippedStateFor(onAdvance(phase, parties)? -1: + (phase + 1) & phaseMask, + parties); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) { releaseWaiters(phase); break; } } - else if (phaseOf(root.state) == phase && - casState(s, stateFor(phase, parties, 0))) { - if (parties == 0) - par.arriveAndDeregister(); - else - par.arrive(); - reconcileState(); - break; + else if (phaseOf(root.state) == phase) { + long next = stateFor(phase, parties, 0); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) { + if (parties == 0) + par.arriveAndDeregister(); + else + par.arrive(); + reconcileState(); + break; + } } } return phase; @@ -576,8 +573,7 @@ public class Phaser { * Awaits the phase of the barrier to advance from the given phase * value, returning immediately if the current phase of the * barrier is not equal to the given phase value or this barrier - * is terminated. It is an unenforced usage error for an - * unregistered party to invoke this method. + * is terminated. * * @param phase an arrival phase number, or negative value if * terminated; this argument is normally the value returned by a @@ -599,8 +595,7 @@ public class Phaser { * value, throwing {@code InterruptedException} if interrupted * while waiting, or returning immediately if the current phase of * the barrier is not equal to the given phase value or this - * barrier is terminated. It is an unenforced usage error for an - * unregistered party to invoke this method. + * barrier is terminated. * * @param phase an arrival phase number, or negative value if * terminated; this argument is normally the value returned by a @@ -625,8 +620,7 @@ public class Phaser { * InterruptedException} if interrupted while waiting, or * returning immediately if the current phase of the barrier is * not equal to the given phase value or this barrier is - * terminated. It is an unenforced usage error for an - * unregistered party to invoke this method. + * terminated. * * @param phase an arrival phase number, or negative value if * terminated; this argument is normally the value returned by a @@ -663,7 +657,9 @@ public class Phaser { Phaser r = root; // force at root then reconcile long s; while (phaseOf(s = r.state) >= 0) - r.casState(s, stateFor(-1, partiesOf(s), unarrivedOf(s))); + UNSAFE.compareAndSwapLong(r, stateOffset, s, + stateFor(-1, partiesOf(s), + unarrivedOf(s))); reconcileState(); releaseWaiters(0); // ensure wakeups on both queues releaseWaiters(1); @@ -750,10 +746,15 @@ public class Phaser { * which case no advance occurs. * *

The arguments to this method provide the state of the phaser - * prevailing for the current transition. (When called from within - * an implementation of {@code onAdvance} the values returned by - * methods such as {@code getPhase} may or may not reliably - * indicate the state to which this transition applies.) + * prevailing for the current transition. The results and effects + * of invoking phase-related methods (including {@code getPhase} + * as well as arrival, registration, and waiting methods) from + * within {@code onAdvance} are unspecified and should not be + * relied on. Similarly, while it is possible to override this + * method to produce side-effects visible to participating tasks, + * it is in general safe to do so only in designs in which all + * parties register before any arrive, and all {@link + * #awaitAdvance} at each phase. * *

The default version returns {@code true} when the number of * registered parties is zero. Normally, overrides that arrange @@ -879,6 +880,12 @@ public class Phaser { } /** + * The number of times to spin before blocking waiting for advance. + */ + static final int MAX_SPINS = + Runtime.getRuntime().availableProcessors() == 1 ? 0 : 1 << 8; + + /** * Enqueues node and waits unless aborted or signalled. * * @return current phase @@ -887,10 +894,15 @@ public class Phaser { QNode node = null; boolean queued = false; boolean interrupted = false; + int spins = MAX_SPINS; int p; while ((p = getPhase()) == phase) { if (Thread.interrupted()) interrupted = true; + else if (spins > 0) { + if (--spins == 0) + Thread.yield(); + } else if (node == null) node = new QNode(this, phase, false, false, 0, 0); else if (!queued) @@ -914,10 +926,15 @@ public class Phaser { QNode node = null; boolean queued = false; boolean interrupted = false; + int spins = MAX_SPINS; int p; while ((p = getPhase()) == phase && !interrupted) { if (Thread.interrupted()) interrupted = true; + else if (spins > 0) { + if (--spins == 0) + Thread.yield(); + } else if (node == null) node = new QNode(this, phase, true, false, 0, 0); else if (!queued) @@ -944,12 +961,17 @@ public class Phaser { QNode node = null; boolean queued = false; boolean interrupted = false; + int spins = MAX_SPINS; int p; while ((p = getPhase()) == phase && !interrupted) { if (Thread.interrupted()) interrupted = true; else if (nanos - (System.nanoTime() - startTime) <= 0) break; + else if (spins > 0) { + if (--spins == 0) + Thread.yield(); + } else if (node == null) node = new QNode(this, phase, true, true, startTime, nanos); else if (!queued) @@ -974,10 +996,6 @@ public class Phaser { private static final long stateOffset = objectFieldOffset("state", Phaser.class); - private final boolean casState(long cmp, long val) { - return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val); - } - private static long objectFieldOffset(String field, Class klazz) { try { return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));