--- jsr166/src/jsr166y/Phaser.java 2010/11/28 21:21:03 1.61 +++ jsr166/src/jsr166y/Phaser.java 2010/11/29 00:52:28 1.62 @@ -347,11 +347,11 @@ public class Phaser { else { parent.doArrive((u == 0) ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL); - if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase || - ((int)(state >>> PHASE_SHIFT) != nextPhase && - !UNSAFE.compareAndSwapLong(this, stateOffset, - s, next))) + if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase) reconcileState(); + else if (state == s) + UNSAFE.compareAndSwapLong(this, stateOffset, s, + next); } } return phase; @@ -413,7 +413,7 @@ public class Phaser { int phase, rPhase; while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 && (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) { - if ((int)(par.state >>> PHASE_SHIFT) != rPhase) + if (par != rt && (int)(par.state >>> PHASE_SHIFT) != rPhase) par.reconcileState(); else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) { long u = s & PARTIES_MASK; // reset unarrived to parties @@ -587,7 +587,7 @@ public class Phaser { * of unarrived parties would become negative */ public int arriveAndAwaitAdvance() { - return awaitAdvance(arrive()); + return awaitAdvance(doArrive(ONE_ARRIVAL)); } /** @@ -603,13 +603,13 @@ public class Phaser { * if terminated or argument is negative */ public int awaitAdvance(int phase) { - Phaser r; + Phaser rt; int p = (int)(state >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase && - (p = (int)((r = root).state >>> PHASE_SHIFT)) == phase) - return r.internalAwaitAdvance(phase, null); + (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) + return rt.internalAwaitAdvance(phase, null); return p; } @@ -629,14 +629,14 @@ public class Phaser { */ public int awaitAdvanceInterruptibly(int phase) throws InterruptedException { - Phaser r; + Phaser rt; int p = (int)(state >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase && - (p = (int)((r = root).state >>> PHASE_SHIFT)) == phase) { + (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) { QNode node = new QNode(this, phase, true, false, 0L); - p = r.internalAwaitAdvance(phase, node); + p = rt.internalAwaitAdvance(phase, node); if (node.wasInterrupted) throw new InterruptedException(); } @@ -667,14 +667,14 @@ public class Phaser { long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { long nanos = unit.toNanos(timeout); - Phaser r; + Phaser rt; int p = (int)(state >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase && - (p = (int)((r = root).state >>> PHASE_SHIFT)) == phase) { + (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) { QNode node = new QNode(this, phase, true, true, nanos); - p = r.internalAwaitAdvance(phase, node); + p = rt.internalAwaitAdvance(phase, node); if (node.wasInterrupted) throw new InterruptedException(); else if (p == phase) @@ -819,7 +819,7 @@ public class Phaser { * @return {@code true} if this barrier should terminate */ protected boolean onAdvance(int phase, int registeredParties) { - return registeredParties <= 0; + return registeredParties == 0; } /** @@ -851,7 +851,7 @@ public class Phaser { * Removes and signals threads from queue for phase. */ private void releaseWaiters(int phase) { - AtomicReference head = queueFor(phase); + AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ; QNode q; int p; while ((q = head.get()) != null && @@ -873,10 +873,8 @@ public class Phaser { * avoid it when threads regularly arrive: When a thread in * internalAwaitAdvance notices another arrival before blocking, * and there appear to be enough CPUs available, it spins - * SPINS_PER_ARRIVAL more times before blocking. Plus, even on - * uniprocessors, there is at least one intervening Thread.yield - * before blocking. The value trades off good-citizenship vs big - * unnecessary slowdowns. + * SPINS_PER_ARRIVAL more times before blocking. The value trades + * off good-citizenship vs big unnecessary slowdowns. */ static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8; @@ -897,31 +895,32 @@ public class Phaser { int p; while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { int unarrived = (int)s & UNARRIVED_MASK; - if (unarrived != lastUnarrived) { + if (node != null && node.isReleasable()) { + p = (int)(state >>> PHASE_SHIFT); + break; // done or aborted + } + else if (node == null && Thread.interrupted()) { + node = new QNode(this, phase, false, false, 0L); + node.wasInterrupted = true; + } + else if (unarrived != lastUnarrived) { if (lastUnarrived == -1) // ensure old queue clean releaseWaiters(phase-1); if ((lastUnarrived = unarrived) < NCPU) spins += SPINS_PER_ARRIVAL; } - else if (spins > 0) { - if (--spins == (SPINS_PER_ARRIVAL >>> 1)) - Thread.yield(); // yield midway through spin - } - else if (node == null) // must be noninterruptible + else if (spins > 0) + --spins; + else if (node == null) // null if noninterruptible mode node = new QNode(this, phase, false, false, 0L); - else if (node.isReleasable()) { - p = (int)(state >>> PHASE_SHIFT); - break; // aborted - } else if (!queued) { // push onto queue - AtomicReference head = queueFor(phase); + AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = head.get(); if (q == null || q.phase == phase) { node.next = q; if ((p = (int)(state >>> PHASE_SHIFT)) != phase) break; // recheck to avoid stale enqueue - else - queued = head.compareAndSet(q, node); + queued = head.compareAndSet(q, node); } } else { @@ -936,7 +935,7 @@ public class Phaser { if (node != null) { if (node.thread != null) node.thread = null; // disable unpark() in node.signal - if (!node.interruptible && node.wasInterrupted) + if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); } if (p != phase) @@ -977,7 +976,7 @@ public class Phaser { else { if (Thread.interrupted()) wasInterrupted = true; - if (interruptible && wasInterrupted) + if (wasInterrupted && interruptible) t = null; else if (timed) { if (nanos > 0) {