--- jsr166/src/jsr166y/Phaser.java 2009/08/23 20:12:24 1.36 +++ jsr166/src/jsr166y/Phaser.java 2010/11/06 16:12:10 1.50 @@ -7,105 +7,110 @@ package jsr166y; import java.util.concurrent.*; - import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; /** - * A reusable synchronization barrier, similar in functionality to a + * A reusable synchronization barrier, similar in functionality to * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and * {@link java.util.concurrent.CountDownLatch CountDownLatch} * but supporting more flexible usage. * - * - * *

Sample usages: * *

A {@code Phaser} may be used instead of a {@code CountDownLatch} - * to control a one-shot action serving a variable number of - * parties. The typical idiom is for the method setting this up to - * first register, then start the actions, then deregister, as in: + * to control a one-shot action serving a variable number of parties. + * The typical idiom is for the method setting this up to first + * register, then start the actions, then deregister, as in: * *

 {@code
  * void runTasks(List tasks) {
@@ -131,43 +136,66 @@ import java.util.concurrent.locks.LockSu
  *  
 {@code
  * void startTasks(List tasks, final int iterations) {
  *   final Phaser phaser = new Phaser() {
- *     public boolean onAdvance(int phase, int registeredParties) {
+ *     protected boolean onAdvance(int phase, int registeredParties) {
  *       return phase >= iterations || registeredParties == 0;
  *     }
  *   };
  *   phaser.register();
- *   for (Runnable task : tasks) {
+ *   for (final Runnable task : tasks) {
  *     phaser.register();
  *     new Thread() {
  *       public void run() {
  *         do {
  *           task.run();
  *           phaser.arriveAndAwaitAdvance();
- *         } while(!phaser.isTerminated();
+ *         } while (!phaser.isTerminated());
  *       }
  *     }.start();
  *   }
  *   phaser.arriveAndDeregister(); // deregister self, don't wait
  * }}
* + * If the main task must later await termination, it + * may re-register and then execute a similar loop: + *
 {@code
+ *   // ...
+ *   phaser.register();
+ *   while (!phaser.isTerminated())
+ *     phaser.arriveAndAwaitAdvance();}
+ * + *

Related constructions may be used to await particular phase numbers + * in contexts where you are sure that the phase will never wrap around + * {@code Integer.MAX_VALUE}. For example: + * + *

 {@code
+ * void awaitPhase(Phaser phaser, int phase) {
+ *   int p = phaser.register(); // assumes caller not already registered
+ *   while (p < phase) {
+ *     if (phaser.isTerminated())
+ *       // ... deal with unexpected termination
+ *     else
+ *       p = phaser.arriveAndAwaitAdvance();
+ *   }
+ *   phaser.arriveAndDeregister();
+ * }}
+ * + * *

To create a set of tasks using a tree of phasers, * you could use code of the following form, assuming a * Task class with a constructor accepting a phaser that - * it registers for upon construction: + * it registers with upon construction: + * *

 {@code
- * void build(Task[] actions, int lo, int hi, Phaser b) {
- *   int step = (hi - lo) / TASKS_PER_PHASER;
- *   if (step > 1) {
- *     int i = lo;
- *     while (i < hi) {
- *       int r = Math.min(i + step, hi);
- *       build(actions, i, r, new Phaser(b));
- *       i = r;
+ * void build(Task[] actions, int lo, int hi, Phaser ph) {
+ *   if (hi - lo > TASKS_PER_PHASER) {
+ *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
+ *       int j = Math.min(i + TASKS_PER_PHASER, hi);
+ *       build(actions, i, j, new Phaser(ph));
  *     }
  *   } else {
  *     for (int i = lo; i < hi; ++i)
- *       actions[i] = new Task(b);
- *       // assumes new Task(b) performs b.register()
+ *       actions[i] = new Task(ph);
+ *       // assumes new Task(ph) performs ph.register()
  *   }
  * }
  * // .. initially called, for n tasks via
@@ -178,8 +206,6 @@ import java.util.concurrent.locks.LockSu
  * be appropriate for extremely small per-barrier task bodies (thus
  * high rates), or up to hundreds for extremely large ones.
  *
- * 
- * *

Implementation notes: This implementation restricts the * maximum number of parties to 65535. Attempts to register additional * parties result in {@code IllegalStateException}. However, you can and @@ -216,7 +242,6 @@ public class Phaser { */ private volatile long state; - private static final int ushortBits = 16; private static final int ushortMask = 0xffff; private static final int phaseMask = 0x7fffffff; @@ -269,11 +294,12 @@ public class Phaser { /** * Heads of Treiber stacks for waiting threads. To eliminate - * contention while releasing some threads while adding others, we + * contention when releasing some threads while adding others, we * use two of them, alternating across even and odd phases. + * Subphasers share queues with root to speed up releases. */ - private final AtomicReference evenQ = new AtomicReference(); - private final AtomicReference oddQ = new AtomicReference(); + private final AtomicReference evenQ; + private final AtomicReference oddQ; private AtomicReference queueFor(int phase) { return ((phase & 1) == 0) ? evenQ : oddQ; @@ -291,20 +317,20 @@ public class Phaser { * Recursively resolves state. */ private long reconcileState() { - Phaser p = parent; + Phaser par = parent; long s = state; - if (p != null) { - while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) { - long parentState = p.getReconciledState(); - int parentPhase = phaseOf(parentState); - int phase = phaseOf(s = state); - if (phase != parentPhase) { + if (par != null) { + int phase, rootPhase; + while ((phase = phaseOf(s)) >= 0 && + (rootPhase = phaseOf(root.state)) != phase && + (rootPhase < 0 || unarrivedOf(s) == 0)) { + int parentPhase = phaseOf(par.getReconciledState()); + if (parentPhase != phase) { long next = trippedStateFor(parentPhase, partiesOf(s)); - if (casState(s, next)) { - releaseWaiters(phase); - s = next; - } + if (state == s) + UNSAFE.compareAndSwapLong(this, stateOffset, s, next); } + s = state; } } return s; @@ -316,11 +342,11 @@ public class Phaser { * phaser will need to first register for it. */ public Phaser() { - this(null); + this(null, 0); } /** - * Creates a new phaser with the given numbers of registered + * Creates a new phaser with the given number of registered * unarrived parties, initial phase number 0, and no parent. * * @param parties the number of parties required to trip barrier @@ -340,19 +366,11 @@ public class Phaser { * @param parent the parent phaser */ public Phaser(Phaser parent) { - int phase = 0; - this.parent = parent; - if (parent != null) { - this.root = parent.root; - phase = parent.register(); - } - else - this.root = this; - this.state = trippedStateFor(phase, 0); + this(parent, 0); } /** - * Creates a new phaser with the given parent and numbers of + * Creates a new phaser with the given parent and number of * registered unarrived parties. If parent is non-null, this phaser * is registered with the parent and its initial phase number is * the same as that of parent phaser. @@ -365,19 +383,28 @@ public class Phaser { public Phaser(Phaser parent, int parties) { if (parties < 0 || parties > ushortMask) throw new IllegalArgumentException("Illegal number of parties"); - int phase = 0; + int phase; this.parent = parent; if (parent != null) { - this.root = parent.root; + Phaser r = parent.root; + this.root = r; + this.evenQ = r.evenQ; + this.oddQ = r.oddQ; phase = parent.register(); } - else + else { this.root = this; + this.evenQ = new AtomicReference(); + this.oddQ = new AtomicReference(); + phase = 0; + } this.state = trippedStateFor(phase, parties); } /** * Adds a new unarrived party to this phaser. + * If an ongoing invocation of {@link #onAdvance} is in progress, + * this method may wait until its completion before registering. * * @return the arrival phase number to which this registration applied * @throws IllegalStateException if attempting to register more @@ -389,11 +416,14 @@ public class Phaser { /** * Adds the given number of new unarrived parties to this phaser. + * If an ongoing invocation of {@link #onAdvance} is in progress, + * this method may wait until its completion before registering. * - * @param parties the number of parties required to trip barrier + * @param parties the number of additional parties required to trip barrier * @return the arrival phase number to which this registration applied * @throws IllegalStateException if attempting to register more * than the maximum supported number of parties + * @throws IllegalArgumentException if {@code parties < 0} */ public int bulkRegister(int parties) { if (parties < 0) @@ -407,67 +437,65 @@ public class Phaser { * Shared code for register, bulkRegister */ private int doRegister(int registrations) { + Phaser par = parent; + long s; int phase; - for (;;) { - long s = getReconciledState(); - phase = phaseOf(s); - int unarrived = unarrivedOf(s) + registrations; - int parties = partiesOf(s) + registrations; - if (phase < 0) - break; - if (parties > ushortMask || unarrived > ushortMask) + while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) { + int p = partiesOf(s); + int u = unarrivedOf(s); + int unarrived = u + registrations; + int parties = p + registrations; + if (u == 0 && p != 0) // if tripped, wait for advance + untimedWait(phase); + else if (parties > ushortMask) throw new IllegalStateException(badBounds(parties, unarrived)); - if (phase == phaseOf(root.state) && - casState(s, stateFor(phase, parties, unarrived))) - break; + else if (par == null || phaseOf(root.state) == phase) { + long next = stateFor(phase, parties, unarrived); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) + break; + } } return phase; } /** * Arrives at the barrier, but does not wait for others. (You can - * in turn wait for others via {@link #awaitAdvance}). + * in turn wait for others via {@link #awaitAdvance}). It is an + * unenforced usage error for an unregistered party to invoke this + * method. * * @return the arrival phase number, or a negative value if terminated * @throws IllegalStateException if not terminated and the number * of unarrived parties would become negative */ public int arrive() { + Phaser par = parent; + long s; int phase; - for (;;) { - long s = state; - phase = phaseOf(s); - if (phase < 0) - break; + while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) { int parties = partiesOf(s); int unarrived = unarrivedOf(s) - 1; - if (unarrived > 0) { // Not the last arrival - if (casState(s, s - 1)) // s-1 adds one arrival - break; + if (unarrived > 0) { // Not the last arrival + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s - 1)) + break; // s-1 adds one arrival } - else if (unarrived == 0) { // the last arrival - Phaser par = parent; - if (par == null) { // directly trip - if (casState - (s, - trippedStateFor(onAdvance(phase, parties) ? -1 : - ((phase + 1) & phaseMask), parties))) { - releaseWaiters(phase); - break; - } - } - else { // cascade to parent - if (casState(s, s - 1)) { // zeroes unarrived - par.arrive(); - reconcileState(); - break; - } + else if (unarrived < 0) + throw new IllegalStateException(badBounds(parties, unarrived)); + else if (par == null) { // directly trip + long next = trippedStateFor(onAdvance(phase, parties) ? -1 : + ((phase + 1) & phaseMask), + parties); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) { + releaseWaiters(phase); + break; } } - else if (phase != phaseOf(root.state)) // or if unreconciled + else if (phaseOf(root.state) == phase && + UNSAFE.compareAndSwapLong(this, stateOffset, s, s - 1)) { + par.arrive(); // cascade to parent reconcileState(); - else - throw new IllegalStateException(badBounds(parties, unarrived)); + break; + } } return phase; } @@ -478,52 +506,48 @@ public class Phaser { * required to trip the barrier in future phases. If this phaser * has a parent, and deregistration causes this phaser to have * zero parties, this phaser also arrives at and is deregistered - * from its parent. + * from its parent. It is an unenforced usage error for an + * unregistered party to invoke this method. * * @return the arrival phase number, or a negative value if terminated * @throws IllegalStateException if not terminated and the number * of registered or unarrived parties would become negative */ public int arriveAndDeregister() { - // similar code to arrive, but too different to merge + // similar to arrive, but too different to merge Phaser par = parent; + long s; int phase; - for (;;) { - long s = state; - phase = phaseOf(s); - if (phase < 0) - break; + while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) { int parties = partiesOf(s) - 1; int unarrived = unarrivedOf(s) - 1; - if (parties >= 0) { - if (unarrived > 0 || (unarrived == 0 && par != null)) { - if (casState - (s, - stateFor(phase, parties, unarrived))) { - if (unarrived == 0) { - par.arriveAndDeregister(); - reconcileState(); - } - break; - } - continue; - } - if (unarrived == 0) { - if (casState - (s, - trippedStateFor(onAdvance(phase, parties) ? -1 : - ((phase + 1) & phaseMask), parties))) { - releaseWaiters(phase); - break; - } - continue; + if (unarrived > 0) { + long next = stateFor(phase, parties, unarrived); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) + break; + } + else if (unarrived < 0) + throw new IllegalStateException(badBounds(parties, unarrived)); + else if (par == null) { + long next = trippedStateFor(onAdvance(phase, parties)? -1: + (phase + 1) & phaseMask, + parties); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) { + releaseWaiters(phase); + break; } - if (par != null && phase != phaseOf(root.state)) { + } + else if (phaseOf(root.state) == phase) { + long next = stateFor(phase, parties, 0); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) { + if (parties == 0) + par.arriveAndDeregister(); + else + par.arrive(); reconcileState(); - continue; + break; } } - throw new IllegalStateException(badBounds(parties, unarrived)); } return phase; } @@ -532,9 +556,10 @@ public class Phaser { * Arrives at the barrier and awaits others. Equivalent in effect * to {@code awaitAdvance(arrive())}. If you need to await with * interruption or timeout, you can arrange this with an analogous - * construction using one of the other forms of the awaitAdvance - * method. If instead you need to deregister upon arrival use - * {@code arriveAndDeregister}. + * construction using one of the other forms of the {@code + * awaitAdvance} method. If instead you need to deregister upon + * arrival, use {@link #arriveAndDeregister}. It is an unenforced + * usage error for an unregistered party to invoke this method. * * @return the arrival phase number, or a negative number if terminated * @throws IllegalStateException if not terminated and the number @@ -559,22 +584,18 @@ public class Phaser { public int awaitAdvance(int phase) { if (phase < 0) return phase; - long s = getReconciledState(); - int p = phaseOf(s); + int p = getPhase(); if (p != phase) return p; - if (unarrivedOf(s) == 0 && parent != null) - parent.awaitAdvance(phase); - // Fall here even if parent waited, to reconcile and help release return untimedWait(phase); } /** * Awaits the phase of the barrier to advance from the given phase - * value, throwing {@code InterruptedException} if interrupted while - * waiting, or returning immediately if the current phase of the - * barrier is not equal to the given phase value or this barrier - * is terminated. + * value, throwing {@code InterruptedException} if interrupted + * while waiting, or returning immediately if the current phase of + * the barrier is not equal to the given phase value or this + * barrier is terminated. * * @param phase an arrival phase number, or negative value if * terminated; this argument is normally the value returned by a @@ -587,21 +608,19 @@ public class Phaser { throws InterruptedException { if (phase < 0) return phase; - long s = getReconciledState(); - int p = phaseOf(s); + int p = getPhase(); if (p != phase) return p; - if (unarrivedOf(s) == 0 && parent != null) - parent.awaitAdvanceInterruptibly(phase); return interruptibleWait(phase); } /** * Awaits the phase of the barrier to advance from the given phase - * value or the given timeout to elapse, throwing - * {@code InterruptedException} if interrupted while waiting, or - * returning immediately if the current phase of the barrier is not - * equal to the given phase value or this barrier is terminated. + * value or the given timeout to elapse, throwing {@code + * InterruptedException} if interrupted while waiting, or + * returning immediately if the current phase of the barrier is + * not equal to the given phase value or this barrier is + * terminated. * * @param phase an arrival phase number, or negative value if * terminated; this argument is normally the value returned by a @@ -618,15 +637,13 @@ public class Phaser { public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { + long nanos = unit.toNanos(timeout); if (phase < 0) return phase; - long s = getReconciledState(); - int p = phaseOf(s); + int p = getPhase(); if (p != phase) return p; - if (unarrivedOf(s) == 0 && parent != null) - parent.awaitAdvanceInterruptibly(phase, timeout, unit); - return timedWait(phase, unit.toNanos(timeout)); + return timedWait(phase, nanos); } /** @@ -637,20 +654,15 @@ public class Phaser { * unexpected exceptions. */ public void forceTermination() { - for (;;) { - long s = getReconciledState(); - int phase = phaseOf(s); - int parties = partiesOf(s); - int unarrived = unarrivedOf(s); - if (phase < 0 || - casState(s, stateFor(-1, parties, unarrived))) { - releaseWaiters(0); - releaseWaiters(1); - if (parent != null) - parent.forceTermination(); - return; - } - } + Phaser r = root; // force at root then reconcile + long s; + while (phaseOf(s = r.state) >= 0) + UNSAFE.compareAndSwapLong(r, stateOffset, s, + stateFor(-1, partiesOf(s), + unarrivedOf(s))); + reconcileState(); + releaseWaiters(0); // ensure wakeups on both queues + releaseWaiters(1); } /** @@ -670,7 +682,7 @@ public class Phaser { * @return the number of parties */ public int getRegisteredParties() { - return partiesOf(state); + return partiesOf(getReconciledState()); } /** @@ -680,7 +692,7 @@ public class Phaser { * @return the number of arrived parties */ public int getArrivedParties() { - return arrivedOf(state); + return arrivedOf(getReconciledState()); } /** @@ -690,7 +702,7 @@ public class Phaser { * @return the number of unarrived parties */ public int getUnarrivedParties() { - return unarrivedOf(state); + return unarrivedOf(getReconciledState()); } /** @@ -722,26 +734,33 @@ public class Phaser { } /** - * Overridable method to perform an action upon phase advance, and - * to control termination. This method is invoked whenever the - * barrier is tripped (and thus all other waiting parties are - * dormant). If it returns {@code true}, then, rather than advance - * the phase number, this barrier will be set to a final - * termination state, and subsequent calls to {@link #isTerminated} - * will return true. + * Overridable method to perform an action upon impending phase + * advance, and to control termination. This method is invoked + * upon arrival of the party tripping the barrier (when all other + * waiting parties are dormant). If this method returns {@code + * true}, then, rather than advance the phase number, this barrier + * will be set to a final termination state, and subsequent calls + * to {@link #isTerminated} will return true. Any (unchecked) + * Exception or Error thrown by an invocation of this method is + * propagated to the party attempting to trip the barrier, in + * which case no advance occurs. + * + *

The arguments to this method provide the state of the phaser + * prevailing for the current transition. The results and effects + * of invoking phase-related methods (including {@code getPhase} + * as well as arrival, registration, and waiting methods) from + * within {@code onAdvance} are unspecified and should not be + * relied on. Similarly, while it is possible to override this + * method to produce side-effects visible to participating tasks, + * it is in general safe to do so only in designs in which all + * parties register before any arrive, and all {@link + * #awaitAdvance} at each phase. * *

The default version returns {@code true} when the number of * registered parties is zero. Normally, overrides that arrange * termination for other reasons should also preserve this * property. * - *

You may override this method to perform an action with side - * effects visible to participating tasks, but it is in general - * only sensible to do so in designs where all parties register - * before any arrive, and all {@link #awaitAdvance} at each phase. - * Otherwise, you cannot ensure lack of interference from other - * parties during the invocation of this method. - * * @param phase the phase number on entering the barrier * @param registeredParties the current number of registered parties * @return {@code true} if this barrier should terminate @@ -782,6 +801,7 @@ public class Phaser { volatile boolean wasInterrupted = false; volatile Thread thread; // nulled to cancel wait QNode next; + QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long startTime, long nanos) { this.phaser = phaser; @@ -792,12 +812,14 @@ public class Phaser { this.nanos = nanos; thread = Thread.currentThread(); } + public boolean isReleasable() { return (thread == null || phaser.getPhase() != phase || (interruptible && wasInterrupted) || (timed && (nanos - (System.nanoTime() - startTime)) <= 0)); } + public boolean block() { if (Thread.interrupted()) { wasInterrupted = true; @@ -814,6 +836,7 @@ public class Phaser { } return isReleasable(); } + void signal() { Thread t = thread; if (t != null) { @@ -821,16 +844,17 @@ public class Phaser { LockSupport.unpark(t); } } + boolean doWait() { if (thread != null) { try { - ForkJoinPool.managedBlock(this, false); + ForkJoinPool.managedBlock(this); } catch (InterruptedException ie) { + wasInterrupted = true; // can't currently happen } } return wasInterrupted; } - } /** @@ -856,6 +880,12 @@ public class Phaser { } /** + * The number of times to spin before blocking waiting for advance. + */ + static final int MAX_SPINS = + Runtime.getRuntime().availableProcessors() == 1 ? 0 : 1 << 8; + + /** * Enqueues node and waits unless aborted or signalled. * * @return current phase @@ -864,16 +894,21 @@ public class Phaser { QNode node = null; boolean queued = false; boolean interrupted = false; + int spins = MAX_SPINS; int p; while ((p = getPhase()) == phase) { if (Thread.interrupted()) interrupted = true; + else if (spins > 0) { + if (--spins == 0) + Thread.yield(); + } else if (node == null) node = new QNode(this, phase, false, false, 0, 0); else if (!queued) queued = tryEnqueue(node); - else - interrupted = node.doWait(); + else if (node.doWait()) + interrupted = true; } if (node != null) node.thread = null; @@ -891,16 +926,21 @@ public class Phaser { QNode node = null; boolean queued = false; boolean interrupted = false; + int spins = MAX_SPINS; int p; while ((p = getPhase()) == phase && !interrupted) { if (Thread.interrupted()) interrupted = true; + else if (spins > 0) { + if (--spins == 0) + Thread.yield(); + } else if (node == null) node = new QNode(this, phase, true, false, 0, 0); else if (!queued) queued = tryEnqueue(node); - else - interrupted = node.doWait(); + else if (node.doWait()) + interrupted = true; } if (node != null) node.thread = null; @@ -921,18 +961,23 @@ public class Phaser { QNode node = null; boolean queued = false; boolean interrupted = false; + int spins = MAX_SPINS; int p; while ((p = getPhase()) == phase && !interrupted) { if (Thread.interrupted()) interrupted = true; else if (nanos - (System.nanoTime() - startTime) <= 0) break; + else if (spins > 0) { + if (--spins == 0) + Thread.yield(); + } else if (node == null) node = new QNode(this, phase, true, true, startTime, nanos); else if (!queued) queued = tryEnqueue(node); - else - interrupted = node.doWait(); + else if (node.doWait()) + interrupted = true; } if (node != null) node.thread = null; @@ -951,10 +996,6 @@ public class Phaser { private static final long stateOffset = objectFieldOffset("state", Phaser.class); - private final boolean casState(long cmp, long val) { - return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val); - } - private static long objectFieldOffset(String field, Class klazz) { try { return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));