--- jsr166/src/jsr166y/Phaser.java 2010/11/24 15:48:01 1.58 +++ jsr166/src/jsr166y/Phaser.java 2010/11/27 16:46:53 1.59 @@ -79,7 +79,9 @@ import java.util.concurrent.locks.LockSu * immediately return without updating phaser state or waiting for * advance, and indicating (via a negative phase value) that execution * is complete. Termination is triggered when an invocation of {@code - * onAdvance} returns {@code true}. As illustrated below, when + * onAdvance} returns {@code true}. The default implementation returns + * {@code true} if a deregistration has caused the number of + * registered parties to become zero. As illustrated below, when * phasers control actions with a fixed number of iterations, it is * often convenient to override this method to cause termination when * the current phase number reaches a threshold. Method {@link @@ -240,15 +242,15 @@ public class Phaser { */ private volatile long state; - private static final int MAX_PARTIES = 0xffff; - private static final int MAX_PHASE = 0x7fffffff; - private static final int PARTIES_SHIFT = 16; - private static final int PHASE_SHIFT = 32; - private static final int UNARRIVED_MASK = 0xffff; - private static final long PARTIES_MASK = 0xffff0000L; // for masking long - private static final long ONE_ARRIVAL = 1L; - private static final long ONE_PARTY = 1L << PARTIES_SHIFT; - private static final long TERMINATION_PHASE = -1L << PHASE_SHIFT; + private static final int MAX_PARTIES = 0xffff; + private static final int MAX_PHASE = 0x7fffffff; + private static final int PARTIES_SHIFT = 16; + private static final int PHASE_SHIFT = 32; + private static final int UNARRIVED_MASK = 0xffff; // to mask ints + private static final long PARTIES_MASK = 0xffff0000L; // to mask longs + private static final long ONE_ARRIVAL = 1L; + private static final long ONE_PARTY = 1L << PARTIES_SHIFT; + private static final long TERMINATION_BIT = 1L << 63; // The following unpacking methods are usually manually inlined @@ -293,6 +295,22 @@ public class Phaser { } /** + * Returns message string for bounds exceptions on arrival. + */ + private String badArrive(long s) { + return "Attempted arrival of unregistered party for " + + stateToString(s); + } + + /** + * Returns message string for bounds exceptions on registration. + */ + private String badRegister(long s) { + return "Attempt to register more than " + + MAX_PARTIES + " parties for " + stateToString(s); + } + + /** * Main implementation for methods arrive and arriveAndDeregister. * Manually tuned to speed up and minimize race windows for the * common case of just decrementing unarrived field. @@ -304,12 +322,14 @@ public class Phaser { private int doArrive(long adj) { for (;;) { long s = state; + int unarrived = (int)s & UNARRIVED_MASK; int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; - int unarrived = (int)s & UNARRIVED_MASK; - if (unarrived == 0) - checkBadArrive(s); + else if (unarrived == 0) { + if (reconcileState() == s) // recheck + throw new IllegalStateException(badArrive(s)); + } else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) { if (unarrived == 1) { long p = s & PARTIES_MASK; // unshifted parties field @@ -320,7 +340,7 @@ public class Phaser { final Phaser parent = this.parent; if (parent == null) { if (onAdvance(phase, u)) - next |= TERMINATION_PHASE; // obliterate phase + next |= TERMINATION_BIT; UNSAFE.compareAndSwapLong(this, stateOffset, s, next); releaseWaiters(phase); } @@ -340,17 +360,6 @@ public class Phaser { } /** - * Rechecks state and throws bounds exceptions on arrival -- called - * only if unarrived is apparently zero. - */ - private void checkBadArrive(long s) { - if (reconcileState() == s) - throw new IllegalStateException - ("Attempted arrival of unregistered party for " + - stateToString(s)); - } - - /** * Implementation of register, bulkRegister * * @param registrations number to add to both parties and @@ -366,24 +375,34 @@ public class Phaser { int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; - else if (parties != 0 && ((int)s & UNARRIVED_MASK) == 0) - internalAwaitAdvance(phase, null); // wait for onAdvance else if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); - else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj)) - return phase; + else if ((parties == 0 && parent == null) || // first reg of root + ((int)s & UNARRIVED_MASK) != 0) { // not advancing + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj)) + return phase; + } + else if (parties != 0) // wait for onAdvance + internalAwaitAdvance(phase, null); + else { // 1st registration of child + synchronized(this) { // register parent first + if (reconcileState() == s) { // recheck under lock + parent.doRegister(1); // OK if throws IllegalState + for (;;) { // simpler form of outer loop + s = reconcileState(); + phase = (int)(s >>> PHASE_SHIFT); + if (phase < 0 || + UNSAFE.compareAndSwapLong(this, stateOffset, + s, s + adj)) + return phase; + } + } + } + } } } /** - * Returns message string for out of bounds exceptions on registration. - */ - private String badRegister(long s) { - return "Attempt to register more than " + - MAX_PARTIES + " parties for " + stateToString(s); - } - - /** * Recursively resolves lagged phase propagation from root if necessary. */ private long reconcileState() { @@ -400,10 +419,7 @@ public class Phaser { long u = s & PARTIES_MASK; // reset unarrived to parties long next = ((((long) rPhase) << PHASE_SHIFT) | u | (u >>> PARTIES_SHIFT)); - if (state == s && - UNSAFE.compareAndSwapLong(this, stateOffset, - s, s = next)) - break; + UNSAFE.compareAndSwapLong(this, stateOffset, s, next); } s = state; } @@ -433,6 +449,12 @@ public class Phaser { } /** + * Creates a new phaser with the given parent, and without any + * initially registered parties. Any thread using this phaser + * will need to first register for it, at which point, if the + * given parent is non-null, this phaser will also be registered + * with the parent. + * * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}. * * @param parent the parent phaser @@ -443,20 +465,9 @@ public class Phaser { /** * Creates a new phaser with the given parent and number of - * registered unarrived parties. If parent is non-null, this - * phaser is registered with the parent and its initial phase - * number is the same as that of parent phaser. If the number of - * parties is zero, the parent phaser will not proceed until this - * child phaser registers parties and advances, or this child - * phaser deregisters with its parent, or the parent is otherwise - * terminated. This child Phaser will be deregistered from its - * parent automatically upon any invocation of the child's {@link - * #arriveAndDeregister} method that results in the child's number - * of registered parties becoming zero. (Although rarely - * appropriate, this child may also explicity deregister from its - * parent using {@code getParent().arriveAndDeregister()}.) After - * deregistration, the child cannot re-register. (Instead, you can - * create a new child Phaser.) + * registered unarrived parties. If parent is non-null and + * the number of parties is non-zero, this phaser is registered + * with the parent. * * @param parent the parent phaser * @param parties the number of parties required to trip barrier @@ -473,7 +484,7 @@ public class Phaser { this.root = r; this.evenQ = r.evenQ; this.oddQ = r.oddQ; - phase = parent.doRegister(1); + phase = (parties == 0) ? parent.getPhase() : parent.doRegister(1); } else { this.root = this; @@ -486,9 +497,12 @@ public class Phaser { } /** - * Adds a new unarrived party to this phaser. - * If an ongoing invocation of {@link #onAdvance} is in progress, - * this method may wait until its completion before registering. + * Adds a new unarrived party to this phaser. If an ongoing + * invocation of {@link #onAdvance} is in progress, this method + * may wait until its completion before registering. If this + * phaser has a parent, and this phaser previously had no + * registered parties, this phaser is also registered with its + * parent. * * @return the arrival phase number to which this registration applied * @throws IllegalStateException if attempting to register more @@ -502,6 +516,10 @@ public class Phaser { * Adds the given number of new unarrived parties to this phaser. * If an ongoing invocation of {@link #onAdvance} is in progress, * this method may wait until its completion before registering. + * If this phaser has a parent, and the given number of parities + * is greater than zero, and this phaser previously had no + * registered parties, this phaser is also registered with its + * parent. * * @param parties the number of additional parties required to trip barrier * @return the arrival phase number to which this registration applied @@ -512,16 +530,18 @@ public class Phaser { public int bulkRegister(int parties) { if (parties < 0) throw new IllegalArgumentException(); - if (parties == 0) + else if (parties == 0) return getPhase(); return doRegister(parties); } /** * Arrives at the barrier, but does not wait for others. (You can - * in turn wait for others via {@link #awaitAdvance}). It is an - * unenforced usage error for an unregistered party to invoke this - * method. + * in turn wait for others via {@link #awaitAdvance}). It is a + * usage error for an unregistered party to invoke this + * method. However, it is possible that this error will result in + * an {code IllegalStateException} only when some other + * party arrives. * * @return the arrival phase number, or a negative value if terminated * @throws IllegalStateException if not terminated and the number @@ -537,8 +557,10 @@ public class Phaser { * required to trip the barrier in future phases. If this phaser * has a parent, and deregistration causes this phaser to have * zero parties, this phaser also arrives at and is deregistered - * from its parent. It is an unenforced usage error for an - * unregistered party to invoke this method. + * from its parent. It is a usage error for an unregistered party + * to invoke this method. However, it is possible that this error + * will result in an {code IllegalStateException} only when some + * other party arrives. * * @return the arrival phase number, or a negative value if terminated * @throws IllegalStateException if not terminated and the number @@ -554,8 +576,11 @@ public class Phaser { * interruption or timeout, you can arrange this with an analogous * construction using one of the other forms of the {@code * awaitAdvance} method. If instead you need to deregister upon - * arrival, use {@link #arriveAndDeregister}. It is an unenforced - * usage error for an unregistered party to invoke this method. + * arrival, use {@link #arriveAndDeregister}. It is a usage error + * for an unregistered party to invoke this method. However, it is + * possible that this error will result in an {code + * IllegalStateException} only when some other party + * arrives. * * @return the arrival phase number, or a negative number if terminated * @throws IllegalStateException if not terminated and the number @@ -668,7 +693,7 @@ public class Phaser { long s; while ((s = root.state) >= 0) { if (UNSAFE.compareAndSwapLong(root, stateOffset, - s, s | TERMINATION_PHASE)) { + s, s | TERMINATION_BIT)) { releaseWaiters(0); // signal all threads releaseWaiters(1); return; @@ -766,10 +791,18 @@ public class Phaser { * {@code onAdvance} is invoked only for its root Phaser on each * advance. * - *

The default version returns {@code true} when the number of - * registered parties is zero. Normally, overrides that arrange - * termination for other reasons should also preserve this - * property. + *

To support the most common use cases, the default + * implementation of this method returns {@code true} when the + * number of registered parties has become zero as the result of a + * party invoking {@code arriveAndDeregister}. You can disable + * this behavior, thus enabling continuation upon future + * registrations, by overriding this method to always return + * {@code false}: + * + *

 {@code
+     * Phaser phaser = new Phaser() {
+     *   protected boolean onAdvance(int phase, int parties) { return false; }
+     * }}
* * @param phase the phase number on entering the barrier * @param registeredParties the current number of registered parties