--- jsr166/src/jsr166y/Phaser.java 2010/11/13 13:10:04 1.54 +++ jsr166/src/jsr166y/Phaser.java 2010/11/15 12:51:54 1.55 @@ -240,12 +240,13 @@ public class Phaser { */ private volatile long state; - private static final int MAX_COUNT = 0xffff; + private static final int MAX_PARTIES = 0xffff; private static final int MAX_PHASE = 0x7fffffff; private static final int PARTIES_SHIFT = 16; private static final int PHASE_SHIFT = 32; - private static final long UNARRIVED_MASK = 0xffffL; - private static final long PARTIES_MASK = 0xffff0000L; + private static final int UNARRIVED_MASK = 0xffff; + private static final int PARTIES_MASK = 0xffff0000; + private static final long LPARTIES_MASK = 0xffff0000L; // long version private static final long ONE_ARRIVAL = 1L; private static final long ONE_PARTY = 1L << PARTIES_SHIFT; private static final long TERMINATION_PHASE = -1L << PHASE_SHIFT; @@ -253,11 +254,11 @@ public class Phaser { // The following unpacking methods are usually manually inlined private static int unarrivedOf(long s) { - return (int) (s & UNARRIVED_MASK); + return ((int) s) & UNARRIVED_MASK; } private static int partiesOf(long s) { - return ((int) (s & PARTIES_MASK)) >>> PARTIES_SHIFT; + return (((int) s) & PARTIES_MASK) >>> PARTIES_SHIFT; } private static int phaseOf(long s) { @@ -307,20 +308,20 @@ public class Phaser { int phase, unarrived; if ((phase = (int)((s = state) >>> PHASE_SHIFT)) < 0) return phase; - else if ((unarrived = (int)(s & UNARRIVED_MASK)) == 0) + else if ((unarrived = ((int)s) & UNARRIVED_MASK) == 0) checkBadArrive(s); - else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= adj)){ + else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) { if (unarrived == 1) { Phaser par; - long p = s & PARTIES_MASK; // unshifted parties field + long p = s & LPARTIES_MASK; // unshifted parties field long lu = p >>> PARTIES_SHIFT; int u = (int)lu; int nextPhase = (phase + 1) & MAX_PHASE; long next = ((long)nextPhase << PHASE_SHIFT) | p | lu; if ((par = parent) == null) { - UNSAFE.compareAndSwapLong - (this, stateOffset, s, onAdvance(phase, u)? - next | TERMINATION_PHASE : next); + if (onAdvance(phase, u)) + next |= TERMINATION_PHASE; // obliterate phase + UNSAFE.compareAndSwapLong(this, stateOffset, s, next); releaseWaiters(phase); } else { @@ -363,10 +364,10 @@ public class Phaser { long s = par == null? state : reconcileState(); if ((phase = (int)(s >>> PHASE_SHIFT)) < 0) return phase; - if ((parties = ((int)(s & PARTIES_MASK)) >>> PARTIES_SHIFT) != 0 && - (s & UNARRIVED_MASK) == 0) + if ((parties = (((int)s) & PARTIES_MASK) >>> PARTIES_SHIFT) != 0 && + (((int)s) & UNARRIVED_MASK) == 0) internalAwaitAdvance(phase, null); // wait for onAdvance - else if (parties + registrations > MAX_COUNT) + else if (parties + registrations > MAX_PARTIES) throw new IllegalStateException(badRegister(s)); else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj)) return phase; @@ -374,16 +375,15 @@ public class Phaser { } /** - * Returns message string for bounds exceptions on registration + * Returns message string for out of bounds exceptions on registration. */ private String badRegister(long s) { return "Attempt to register more than " + - MAX_COUNT + " parties for " + stateToString(s); + MAX_PARTIES + " parties for " + stateToString(s); } /** - * Recursively resolves lagged phase propagation from root if - * necessary. + * Recursively resolves lagged phase propagation from root if necessary. */ private long reconcileState() { Phaser par = parent; @@ -398,13 +398,13 @@ public class Phaser { return s; long pState = par.parent == null? par.state : par.reconcileState(); if (state == s) { - if ((rPhase < 0 || (s & UNARRIVED_MASK) == 0) && + if ((rPhase < 0 || (((int)s) & UNARRIVED_MASK) == 0) && ((pPhase = (int)(pState >>> PHASE_SHIFT)) < 0 || pPhase == ((phase + 1) & MAX_PHASE))) UNSAFE.compareAndSwapLong (this, stateOffset, s, (((long) pPhase) << PHASE_SHIFT) | - (u = s & PARTIES_MASK) | + (u = s & LPARTIES_MASK) | (u >>> PARTIES_SHIFT)); // reset unarrived to parties else releaseWaiters(phase); // help release others @@ -457,7 +457,7 @@ public class Phaser { * or greater than the maximum number of parties supported */ public Phaser(Phaser parent, int parties) { - if (parties < 0 || parties > MAX_COUNT) + if (parties >>> PARTIES_SHIFT != 0) throw new IllegalArgumentException("Illegal number of parties"); int phase; this.parent = parent; @@ -475,7 +475,7 @@ public class Phaser { phase = 0; } long p = (long)parties; - this.state = (((long) phase) << PHASE_SHIFT) | p | (p << PARTIES_SHIFT); + this.state = (((long)phase) << PHASE_SHIFT) | p | (p << PARTIES_SHIFT); } /** @@ -505,7 +505,7 @@ public class Phaser { public int bulkRegister(int parties) { if (parties < 0) throw new IllegalArgumentException(); - if (parties > MAX_COUNT) + if (parties > MAX_PARTIES) throw new IllegalStateException(badRegister(state)); if (parties == 0) return getPhase(); @@ -573,12 +573,14 @@ public class Phaser { * if terminated or argument is negative */ public int awaitAdvance(int phase) { + int p; if (phase < 0) return phase; - int p = (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT); - if (p != phase) + else if ((p = (int)((parent == null? state : reconcileState()) + >>> PHASE_SHIFT)) == phase) + return internalAwaitAdvance(phase, null); + else return p; - return internalAwaitAdvance(phase, null); } /** @@ -597,17 +599,17 @@ public class Phaser { */ public int awaitAdvanceInterruptibly(int phase) throws InterruptedException { + int p; if (phase < 0) return phase; - int p = (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT); - if (p != phase) - return p; - QNode node = new QNode(this, phase, true, false, 0L); - p = internalAwaitAdvance(phase, node); - if (node.wasInterrupted) - throw new InterruptedException(); - else - return p; + if ((p = (int)((parent == null? state : reconcileState()) + >>> PHASE_SHIFT)) == phase) { + QNode node = new QNode(this, phase, true, false, 0L); + p = internalAwaitAdvance(phase, node); + if (node.wasInterrupted) + throw new InterruptedException(); + } + return p; } /** @@ -634,36 +636,42 @@ public class Phaser { long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { long nanos = unit.toNanos(timeout); + int p; if (phase < 0) return phase; - int p = (int)((parent==null? state : reconcileState()) >>> PHASE_SHIFT); - if (p != phase) - return p; - QNode node = new QNode(this, phase, true, true, nanos); - p = internalAwaitAdvance(phase, node); - if (node.wasInterrupted) - throw new InterruptedException(); - else if (p == phase) - throw new TimeoutException(); - else - return p; + if ((p = (int)((parent == null? state : reconcileState()) + >>> PHASE_SHIFT)) == phase) { + QNode node = new QNode(this, phase, true, true, nanos); + p = internalAwaitAdvance(phase, node); + if (node.wasInterrupted) + throw new InterruptedException(); + else if (p == phase) + throw new TimeoutException(); + } + return p; } /** - * Forces this barrier to enter termination state. Counts of - * arrived and registered parties are unaffected. If this phaser - * has a parent, it too is terminated. This method may be useful - * for coordinating recovery after one or more tasks encounter - * unexpected exceptions. + * Forces this barrier to enter termination state. Counts of + * arrived and registered parties are unaffected. If this phaser + * is a member of a tiered set of phasers, then all of the phasers + * in the set are terminated. If this phaser is already + * terminated, this method has no effect. This method may be + * useful for coordinating recovery after one or more tasks + * encounter unexpected exceptions. */ public void forceTermination() { - Phaser r = root; // force at root then reconcile + // Only need to change root state + final Phaser root = this.root; long s; - while ((s = r.state) >= 0) - UNSAFE.compareAndSwapLong(r, stateOffset, s, s | TERMINATION_PHASE); - reconcileState(); - releaseWaiters(0); // signal all threads - releaseWaiters(1); + while ((s = root.state) >= 0) { + if (UNSAFE.compareAndSwapLong(root, stateOffset, + s, s | TERMINATION_PHASE)) { + releaseWaiters(0); // signal all threads + releaseWaiters(1); + return; + } + } } /** @@ -834,11 +842,12 @@ public class Phaser { * avoid it when threads regularly arrive: When a thread in * internalAwaitAdvance notices another arrival before blocking, * and there appear to be enough CPUs available, it spins - * SPINS_PER_ARRIVAL more times before continuing to try to - * block. The value trades off good-citizenship vs big unnecessary - * slowdowns. + * SPINS_PER_ARRIVAL more times before blocking. Plus, even on + * uniprocessors, there is at least one intervening Thread.yield + * before blocking. The value trades off good-citizenship vs big + * unnecessary slowdowns. */ - static final int SPINS_PER_ARRIVAL = NCPU < 2? 1 : 1 << 8; + static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8; /** * Possibly blocks and waits for phase to advance unless aborted. @@ -863,17 +872,20 @@ public class Phaser { releaseWaiters(phase); return p; } - else if ((unarrived = (int)(s & UNARRIVED_MASK)) != lastUnarrived) { - if ((lastUnarrived = unarrived) < NCPU) - spins += SPINS_PER_ARRIVAL; - } - else if (unarrived == 0 && (par = current.parent) != null) { + else if ((unarrived = ((int)s) & UNARRIVED_MASK) == 0 && + (par = current.parent) != null) { current = par; // if all arrived, use parent par = par.parent; lastUnarrived = -1; } - else if (spins > 0) - --spins; + else if (unarrived != lastUnarrived) { + if ((lastUnarrived = unarrived) < NCPU) + spins += SPINS_PER_ARRIVAL; + } + else if (spins > 0) { + if (--spins == (SPINS_PER_ARRIVAL >>> 1)) + Thread.yield(); // yield midway through spin + } else if (node == null) // must be noninterruptible node = new QNode(this, phase, false, false, 0L); else if (node.isReleasable()) {