--- jsr166/src/jsr166y/Phaser.java 2009/08/25 16:32:28 1.44 +++ jsr166/src/jsr166y/Phaser.java 2010/11/05 23:01:47 1.49 @@ -7,7 +7,6 @@ package jsr166y; import java.util.concurrent.*; - import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; @@ -109,9 +108,9 @@ import java.util.concurrent.locks.LockSu *

Sample usages: * *

A {@code Phaser} may be used instead of a {@code CountDownLatch} - * to control a one-shot action serving a variable number of - * parties. The typical idiom is for the method setting this up to - * first register, then start the actions, then deregister, as in: + * to control a one-shot action serving a variable number of parties. + * The typical idiom is for the method setting this up to first + * register, then start the actions, then deregister, as in: * *

 {@code
  * void runTasks(List tasks) {
@@ -142,14 +141,14 @@ import java.util.concurrent.locks.LockSu
  *     }
  *   };
  *   phaser.register();
- *   for (Runnable task : tasks) {
+ *   for (final Runnable task : tasks) {
  *     phaser.register();
  *     new Thread() {
  *       public void run() {
  *         do {
  *           task.run();
  *           phaser.arriveAndAwaitAdvance();
- *         } while(!phaser.isTerminated();
+ *         } while (!phaser.isTerminated());
  *       }
  *     }.start();
  *   }
@@ -158,35 +157,34 @@ import java.util.concurrent.locks.LockSu
  *
  * If the main task must later await termination, it
  * may re-register and then execute a similar loop:
- * 
 {@code
+ *  
 {@code
  *   // ...
  *   phaser.register();
  *   while (!phaser.isTerminated())
- *     phaser.arriveAndAwaitAdvance();
- * }
+ * phaser.arriveAndAwaitAdvance();}
* - * Related constructions may be used to await particular phase numbers + *

Related constructions may be used to await particular phase numbers * in contexts where you are sure that the phase will never wrap around * {@code Integer.MAX_VALUE}. For example: * - *

 {@code
- *   void awaitPhase(Phaser phaser, int phase) {
- *     int p = phaser.register(); // assumes caller not already registered
- *     while (p < phase) {
- *       if (phaser.isTerminated())
- *         // ... deal with unexpected termination
- *       else
- *         p = phaser.arriveAndAwaitAdvance();
- *     }
- *     phaser.arriveAndDeregister();
+ *  
 {@code
+ * void awaitPhase(Phaser phaser, int phase) {
+ *   int p = phaser.register(); // assumes caller not already registered
+ *   while (p < phase) {
+ *     if (phaser.isTerminated())
+ *       // ... deal with unexpected termination
+ *     else
+ *       p = phaser.arriveAndAwaitAdvance();
  *   }
- * }
+ * phaser.arriveAndDeregister(); + * }}
* * *

To create a set of tasks using a tree of phasers, * you could use code of the following form, assuming a * Task class with a constructor accepting a phaser that - * it registers for upon construction: + * it registers with upon construction: + * *

 {@code
  * void build(Task[] actions, int lo, int hi, Phaser ph) {
  *   if (hi - lo > TASKS_PER_PHASER) {
@@ -208,8 +206,6 @@ import java.util.concurrent.locks.LockSu
  * be appropriate for extremely small per-barrier task bodies (thus
  * high rates), or up to hundreds for extremely large ones.
  *
- * 
- * *

Implementation notes: This implementation restricts the * maximum number of parties to 65535. Attempts to register additional * parties result in {@code IllegalStateException}. However, you can and @@ -298,14 +294,16 @@ public class Phaser { /** * Heads of Treiber stacks for waiting threads. To eliminate - * contention while releasing some threads while adding others, we + * contention when releasing some threads while adding others, we * use two of them, alternating across even and odd phases. + * Subphasers share queues with root to speed up releases. */ private final AtomicReference evenQ = new AtomicReference(); private final AtomicReference oddQ = new AtomicReference(); private AtomicReference queueFor(int phase) { - return ((phase & 1) == 0) ? evenQ : oddQ; + Phaser r = root; + return ((phase & 1) == 0) ? r.evenQ : r.oddQ; } /** @@ -320,20 +318,25 @@ public class Phaser { * Recursively resolves state. */ private long reconcileState() { - Phaser p = parent; + Phaser par = parent; long s = state; - if (p != null) { - while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) { - long parentState = p.getReconciledState(); + if (par != null) { + int phase, rootPhase; + while ((phase = phaseOf(s)) >= 0 && + (rootPhase = phaseOf(root.state)) != phase && + (rootPhase < 0 || unarrivedOf(s) == 0)) { + long parentState = par.getReconciledState(); int parentPhase = phaseOf(parentState); - int phase = phaseOf(s = state); - if (phase != parentPhase) { - long next = trippedStateFor(parentPhase, partiesOf(s)); - if (casState(s, next)) { - releaseWaiters(phase); - s = next; - } + int parties = partiesOf(s); + long next = trippedStateFor(parentPhase, parties); + if (phaseOf(root.state) == rootPhase && + parentPhase != phase && + state == s && casState(s, next)) { + releaseWaiters(phase); + if (parties == 0) // exit if the final deregistration + break; } + s = state; } } return s; @@ -349,7 +352,7 @@ public class Phaser { } /** - * Creates a new phaser with the given numbers of registered + * Creates a new phaser with the given number of registered * unarrived parties, initial phase number 0, and no parent. * * @param parties the number of parties required to trip barrier @@ -381,7 +384,7 @@ public class Phaser { } /** - * Creates a new phaser with the given parent and numbers of + * Creates a new phaser with the given parent and number of * registered unarrived parties. If parent is non-null, this phaser * is registered with the parent and its initial phase number is * the same as that of parent phaser. @@ -407,6 +410,8 @@ public class Phaser { /** * Adds a new unarrived party to this phaser. + * If an ongoing invocation of {@link #onAdvance} is in progress, + * this method waits until its completion before registering. * * @return the arrival phase number to which this registration applied * @throws IllegalStateException if attempting to register more @@ -418,11 +423,14 @@ public class Phaser { /** * Adds the given number of new unarrived parties to this phaser. + * If an ongoing invocation of {@link #onAdvance} is in progress, + * this method waits until its completion before registering. * - * @param parties the number of parties required to trip barrier + * @param parties the number of additional parties required to trip barrier * @return the arrival phase number to which this registration applied * @throws IllegalStateException if attempting to register more * than the maximum supported number of parties + * @throws IllegalArgumentException if {@code parties < 0} */ public int bulkRegister(int parties) { if (parties < 0) @@ -436,19 +444,23 @@ public class Phaser { * Shared code for register, bulkRegister */ private int doRegister(int registrations) { + Phaser par = parent; + long s; int phase; - for (;;) { - long s = getReconciledState(); - phase = phaseOf(s); - int unarrived = unarrivedOf(s) + registrations; - int parties = partiesOf(s) + registrations; - if (phase < 0) - break; - if (parties > ushortMask || unarrived > ushortMask) - throw new IllegalStateException(badBounds(parties, unarrived)); - if (phase == phaseOf(root.state) && - casState(s, stateFor(phase, parties, unarrived))) - break; + while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) { + int p = partiesOf(s); + int u = unarrivedOf(s); + int unarrived = u + registrations; + int parties = p + registrations; + if (par == null || phase == phaseOf(root.state)) { + if (parties > ushortMask || unarrived > ushortMask) + throw new IllegalStateException(badBounds(parties, + unarrived)); + else if (p != 0 && u == 0) // back off if advancing + Thread.yield(); // not worth actually blocking + else if (casState(s, stateFor(phase, parties, unarrived))) + break; + } } return phase; } @@ -464,41 +476,32 @@ public class Phaser { * of unarrived parties would become negative */ public int arrive() { + Phaser par = parent; + long s; int phase; - for (;;) { - long s = state; - phase = phaseOf(s); - if (phase < 0) - break; + while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) { int parties = partiesOf(s); int unarrived = unarrivedOf(s) - 1; - if (unarrived > 0) { // Not the last arrival - if (casState(s, s - 1)) // s-1 adds one arrival + if (parties == 0 || unarrived < 0) + throw new IllegalStateException(badBounds(parties, + unarrived)); + else if (unarrived > 0) { // Not the last arrival + if (casState(s, s - 1)) // s-1 adds one arrival break; } - else if (unarrived == 0) { // the last arrival - Phaser par = parent; - if (par == null) { // directly trip - if (casState - (s, - trippedStateFor(onAdvance(phase, parties) ? -1 : - ((phase + 1) & phaseMask), parties))) { - releaseWaiters(phase); - break; - } - } - else { // cascade to parent - if (casState(s, s - 1)) { // zeroes unarrived - par.arrive(); - reconcileState(); - break; - } + else if (par == null) { // directly trip + if (casState(s, trippedStateFor(onAdvance(phase, parties) ? -1 : + ((phase + 1) & phaseMask), + parties))) { + releaseWaiters(phase); + break; } } - else if (phase != phaseOf(root.state)) // or if unreconciled + else if (phaseOf(root.state) == phase && casState(s, s - 1)) { + par.arrive(); // cascade to parent reconcileState(); - else - throw new IllegalStateException(badBounds(parties, unarrived)); + break; + } } return phase; } @@ -517,45 +520,37 @@ public class Phaser { * of registered or unarrived parties would become negative */ public int arriveAndDeregister() { - // similar code to arrive, but too different to merge + // similar to arrive, but too different to merge Phaser par = parent; + long s; int phase; - for (;;) { - long s = state; - phase = phaseOf(s); - if (phase < 0) - break; + while ((phase = phaseOf(s = par==null? state:reconcileState())) >= 0) { int parties = partiesOf(s) - 1; int unarrived = unarrivedOf(s) - 1; - if (parties >= 0) { - if (unarrived > 0 || (unarrived == 0 && par != null)) { - if (casState - (s, - stateFor(phase, parties, unarrived))) { - if (unarrived == 0) { - par.arriveAndDeregister(); - reconcileState(); - } - break; - } - continue; - } - if (unarrived == 0) { - if (casState - (s, - trippedStateFor(onAdvance(phase, parties) ? -1 : - ((phase + 1) & phaseMask), parties))) { - releaseWaiters(phase); - break; - } - continue; - } - if (par != null && phase != phaseOf(root.state)) { - reconcileState(); - continue; + if (parties < 0 || unarrived < 0) + throw new IllegalStateException(badBounds(parties, + unarrived)); + else if (unarrived > 0) { + if (casState(s, stateFor(phase, parties, unarrived))) + break; + } + else if (par == null) { + if (casState(s, trippedStateFor(onAdvance(phase, parties)? -1: + (phase + 1) & phaseMask, + parties))) { + releaseWaiters(phase); + break; } } - throw new IllegalStateException(badBounds(parties, unarrived)); + else if (phaseOf(root.state) == phase && + casState(s, stateFor(phase, parties, 0))) { + if (parties == 0) + par.arriveAndDeregister(); + else + par.arrive(); + reconcileState(); + break; + } } return phase; } @@ -564,10 +559,10 @@ public class Phaser { * Arrives at the barrier and awaits others. Equivalent in effect * to {@code awaitAdvance(arrive())}. If you need to await with * interruption or timeout, you can arrange this with an analogous - * construction using one of the other forms of the awaitAdvance - * method. If instead you need to deregister upon arrival use - * {@code arriveAndDeregister}. It is an unenforced usage error - * for an unregistered party to invoke this method. + * construction using one of the other forms of the {@code + * awaitAdvance} method. If instead you need to deregister upon + * arrival, use {@link #arriveAndDeregister}. It is an unenforced + * usage error for an unregistered party to invoke this method. * * @return the arrival phase number, or a negative number if terminated * @throws IllegalStateException if not terminated and the number @@ -593,13 +588,9 @@ public class Phaser { public int awaitAdvance(int phase) { if (phase < 0) return phase; - long s = getReconciledState(); - int p = phaseOf(s); + int p = getPhase(); if (p != phase) return p; - if (unarrivedOf(s) == 0 && parent != null) - parent.awaitAdvance(phase); - // Fall here even if parent waited, to reconcile and help release return untimedWait(phase); } @@ -622,12 +613,9 @@ public class Phaser { throws InterruptedException { if (phase < 0) return phase; - long s = getReconciledState(); - int p = phaseOf(s); + int p = getPhase(); if (p != phase) return p; - if (unarrivedOf(s) == 0 && parent != null) - parent.awaitAdvanceInterruptibly(phase); return interruptibleWait(phase); } @@ -655,15 +643,13 @@ public class Phaser { public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { + long nanos = unit.toNanos(timeout); if (phase < 0) return phase; - long s = getReconciledState(); - int p = phaseOf(s); + int p = getPhase(); if (p != phase) return p; - if (unarrivedOf(s) == 0 && parent != null) - parent.awaitAdvanceInterruptibly(phase, timeout, unit); - return timedWait(phase, unit.toNanos(timeout)); + return timedWait(phase, nanos); } /** @@ -674,20 +660,13 @@ public class Phaser { * unexpected exceptions. */ public void forceTermination() { - for (;;) { - long s = getReconciledState(); - int phase = phaseOf(s); - int parties = partiesOf(s); - int unarrived = unarrivedOf(s); - if (phase < 0 || - casState(s, stateFor(-1, parties, unarrived))) { - releaseWaiters(0); - releaseWaiters(1); - if (parent != null) - parent.forceTermination(); - return; - } - } + Phaser r = root; // force at root then reconcile + long s; + while (phaseOf(s = r.state) >= 0) + r.casState(s, stateFor(-1, partiesOf(s), unarrivedOf(s))); + reconcileState(); + releaseWaiters(0); // ensure wakeups on both queues + releaseWaiters(1); } /** @@ -707,7 +686,7 @@ public class Phaser { * @return the number of parties */ public int getRegisteredParties() { - return partiesOf(state); + return partiesOf(getReconciledState()); } /** @@ -717,7 +696,7 @@ public class Phaser { * @return the number of arrived parties */ public int getArrivedParties() { - return arrivedOf(state); + return arrivedOf(getReconciledState()); } /** @@ -727,7 +706,7 @@ public class Phaser { * @return the number of unarrived parties */ public int getUnarrivedParties() { - return unarrivedOf(state); + return unarrivedOf(getReconciledState()); } /** @@ -781,14 +760,6 @@ public class Phaser { * termination for other reasons should also preserve this * property. * - *

You may override this method to perform an action with side - * effects visible to participating tasks, but doing so requires - * care: Method {@code onAdvance} may be invoked more than once - * per transition. Further, unless all parties register before - * any arrive, and all {@link #awaitAdvance} at each phase, then - * you cannot ensure lack of interference from other parties - * during the invocation of this method. - * * @param phase the phase number on entering the barrier * @param registeredParties the current number of registered parties * @return {@code true} if this barrier should terminate @@ -829,6 +800,7 @@ public class Phaser { volatile boolean wasInterrupted = false; volatile Thread thread; // nulled to cancel wait QNode next; + QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long startTime, long nanos) { this.phaser = phaser; @@ -839,12 +811,14 @@ public class Phaser { this.nanos = nanos; thread = Thread.currentThread(); } + public boolean isReleasable() { return (thread == null || phaser.getPhase() != phase || (interruptible && wasInterrupted) || (timed && (nanos - (System.nanoTime() - startTime)) <= 0)); } + public boolean block() { if (Thread.interrupted()) { wasInterrupted = true; @@ -861,6 +835,7 @@ public class Phaser { } return isReleasable(); } + void signal() { Thread t = thread; if (t != null) { @@ -868,16 +843,17 @@ public class Phaser { LockSupport.unpark(t); } } + boolean doWait() { if (thread != null) { try { - ForkJoinPool.managedBlock(this, false); + ForkJoinPool.managedBlock(this); } catch (InterruptedException ie) { + wasInterrupted = true; // can't currently happen } } return wasInterrupted; } - } /** @@ -919,8 +895,8 @@ public class Phaser { node = new QNode(this, phase, false, false, 0, 0); else if (!queued) queued = tryEnqueue(node); - else - interrupted = node.doWait(); + else if (node.doWait()) + interrupted = true; } if (node != null) node.thread = null; @@ -946,8 +922,8 @@ public class Phaser { node = new QNode(this, phase, true, false, 0, 0); else if (!queued) queued = tryEnqueue(node); - else - interrupted = node.doWait(); + else if (node.doWait()) + interrupted = true; } if (node != null) node.thread = null; @@ -978,8 +954,8 @@ public class Phaser { node = new QNode(this, phase, true, true, startTime, nanos); else if (!queued) queued = tryEnqueue(node); - else - interrupted = node.doWait(); + else if (node.doWait()) + interrupted = true; } if (node != null) node.thread = null;