--- jsr166/src/jsr166y/Phaser.java 2010/11/06 16:12:10 1.50 +++ jsr166/src/jsr166y/Phaser.java 2010/11/24 15:48:01 1.58 @@ -6,7 +6,8 @@ package jsr166y; -import java.util.concurrent.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; @@ -85,9 +86,9 @@ import java.util.concurrent.locks.LockSu * #forceTermination} is also available to abruptly release waiting * threads and allow them to terminate. * - *
Tiering. Phasers may be tiered (i.e., arranged - * in tree structures) to reduce contention. Phasers with large - * numbers of parties that would otherwise experience heavy + *
Tiering. Phasers may be tiered (i.e.,
+ * constructed in tree structures) to reduce contention. Phasers with
+ * large numbers of parties that would otherwise experience heavy
* synchronization contention costs may instead be set up so that
* groups of sub-phasers share a common parent. This may greatly
* increase throughput even though it incurs greater per-operation
@@ -226,59 +227,47 @@ public class Phaser {
* Barrier state representation. Conceptually, a barrier contains
* four values:
*
- * * parties -- the number of parties to wait (16 bits)
- * * unarrived -- the number of parties yet to hit barrier (16 bits)
- * * phase -- the generation of the barrier (31 bits)
- * * terminated -- set if barrier is terminated (1 bit)
+ * * unarrived -- the number of parties yet to hit barrier (bits 0-15)
+ * * parties -- the number of parties to wait (bits 16-31)
+ * * phase -- the generation of the barrier (bits 32-62)
+ * * terminated -- set if barrier is terminated (bit 63 / sign)
*
* However, to efficiently maintain atomicity, these values are
* packed into a single (atomic) long. Termination uses the sign
* bit of 32 bit representation of phase, so phase is set to -1 on
* termination. Good performance relies on keeping state decoding
* and encoding simple, and keeping race windows short.
- *
- * Note: there are some cheats in arrive() that rely on unarrived
- * count being lowest 16 bits.
*/
private volatile long state;
- private static final int ushortMask = 0xffff;
- private static final int phaseMask = 0x7fffffff;
+ private static final int MAX_PARTIES = 0xffff;
+ private static final int MAX_PHASE = 0x7fffffff;
+ private static final int PARTIES_SHIFT = 16;
+ private static final int PHASE_SHIFT = 32;
+ private static final int UNARRIVED_MASK = 0xffff;
+ private static final long PARTIES_MASK = 0xffff0000L; // for masking long
+ private static final long ONE_ARRIVAL = 1L;
+ private static final long ONE_PARTY = 1L << PARTIES_SHIFT;
+ private static final long TERMINATION_PHASE = -1L << PHASE_SHIFT;
+
+ // The following unpacking methods are usually manually inlined
private static int unarrivedOf(long s) {
- return (int) (s & ushortMask);
+ return (int)s & UNARRIVED_MASK;
}
private static int partiesOf(long s) {
- return ((int) s) >>> 16;
+ return (int)s >>> PARTIES_SHIFT;
}
private static int phaseOf(long s) {
- return (int) (s >>> 32);
+ return (int) (s >>> PHASE_SHIFT);
}
private static int arrivedOf(long s) {
return partiesOf(s) - unarrivedOf(s);
}
- private static long stateFor(int phase, int parties, int unarrived) {
- return ((((long) phase) << 32) | (((long) parties) << 16) |
- (long) unarrived);
- }
-
- private static long trippedStateFor(int phase, int parties) {
- long lp = (long) parties;
- return (((long) phase) << 32) | (lp << 16) | lp;
- }
-
- /**
- * Returns message string for bad bounds exceptions.
- */
- private static String badBounds(int parties, int unarrived) {
- return ("Attempt to set " + unarrived +
- " unarrived of " + parties + " parties");
- }
-
/**
* The parent of this phaser, or null if none
*/
@@ -290,8 +279,6 @@ public class Phaser {
*/
private final Phaser root;
- // Wait queues
-
/**
* Heads of Treiber stacks for waiting threads. To eliminate
* contention when releasing some threads while adding others, we
@@ -306,29 +293,117 @@ public class Phaser {
}
/**
- * Returns current state, first resolving lagged propagation from
- * root if necessary.
+ * Main implementation for methods arrive and arriveAndDeregister.
+ * Manually tuned to speed up and minimize race windows for the
+ * common case of just decrementing unarrived field.
+ *
+ * @param adj - adjustment to apply to state -- either
+ * ONE_ARRIVAL (for arrive) or
+ * ONE_ARRIVAL|ONE_PARTY (for arriveAndDeregister)
+ */
+ private int doArrive(long adj) {
+ for (;;) {
+ long s = state;
+ int phase = (int)(s >>> PHASE_SHIFT);
+ if (phase < 0)
+ return phase;
+ int unarrived = (int)s & UNARRIVED_MASK;
+ if (unarrived == 0)
+ checkBadArrive(s);
+ else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
+ if (unarrived == 1) {
+ long p = s & PARTIES_MASK; // unshifted parties field
+ long lu = p >>> PARTIES_SHIFT;
+ int u = (int)lu;
+ int nextPhase = (phase + 1) & MAX_PHASE;
+ long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
+ final Phaser parent = this.parent;
+ if (parent == null) {
+ if (onAdvance(phase, u))
+ next |= TERMINATION_PHASE; // obliterate phase
+ UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
+ releaseWaiters(phase);
+ }
+ else {
+ parent.doArrive((u == 0) ?
+ ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
+ if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase ||
+ ((int)(state >>> PHASE_SHIFT) != nextPhase &&
+ !UNSAFE.compareAndSwapLong(this, stateOffset,
+ s, next)))
+ reconcileState();
+ }
+ }
+ return phase;
+ }
+ }
+ }
+
+ /**
+ * Rechecks state and throws bounds exceptions on arrival -- called
+ * only if unarrived is apparently zero.
*/
- private long getReconciledState() {
- return (parent == null) ? state : reconcileState();
+ private void checkBadArrive(long s) {
+ if (reconcileState() == s)
+ throw new IllegalStateException
+ ("Attempted arrival of unregistered party for " +
+ stateToString(s));
}
/**
- * Recursively resolves state.
+ * Implementation of register, bulkRegister
+ *
+ * @param registrations number to add to both parties and
+ * unarrived fields. Must be greater than zero.
+ */
+ private int doRegister(int registrations) {
+ // adjustment to state
+ long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
+ final Phaser parent = this.parent;
+ for (;;) {
+ long s = (parent == null) ? state : reconcileState();
+ int parties = (int)s >>> PARTIES_SHIFT;
+ int phase = (int)(s >>> PHASE_SHIFT);
+ if (phase < 0)
+ return phase;
+ else if (parties != 0 && ((int)s & UNARRIVED_MASK) == 0)
+ internalAwaitAdvance(phase, null); // wait for onAdvance
+ else if (registrations > MAX_PARTIES - parties)
+ throw new IllegalStateException(badRegister(s));
+ else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj))
+ return phase;
+ }
+ }
+
+ /**
+ * Returns message string for out of bounds exceptions on registration.
+ */
+ private String badRegister(long s) {
+ return "Attempt to register more than " +
+ MAX_PARTIES + " parties for " + stateToString(s);
+ }
+
+ /**
+ * Recursively resolves lagged phase propagation from root if necessary.
*/
private long reconcileState() {
Phaser par = parent;
long s = state;
if (par != null) {
- int phase, rootPhase;
- while ((phase = phaseOf(s)) >= 0 &&
- (rootPhase = phaseOf(root.state)) != phase &&
- (rootPhase < 0 || unarrivedOf(s) == 0)) {
- int parentPhase = phaseOf(par.getReconciledState());
- if (parentPhase != phase) {
- long next = trippedStateFor(parentPhase, partiesOf(s));
- if (state == s)
- UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
+ Phaser rt = root;
+ int phase, rPhase;
+ while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 &&
+ (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
+ if ((int)(par.state >>> PHASE_SHIFT) != rPhase)
+ par.reconcileState();
+ else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) {
+ long u = s & PARTIES_MASK; // reset unarrived to parties
+ long next = ((((long) rPhase) << PHASE_SHIFT) | u |
+ (u >>> PARTIES_SHIFT));
+ if (state == s &&
+ UNSAFE.compareAndSwapLong(this, stateOffset,
+ s, s = next))
+ break;
}
s = state;
}
@@ -358,10 +433,7 @@ public class Phaser {
}
/**
- * Creates a new phaser with the given parent, without any
- * initially registered parties. If parent is non-null this phaser
- * is registered with the parent and its initial phase number is
- * the same as that of parent phaser.
+ * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
*
* @param parent the parent phaser
*/
@@ -371,9 +443,20 @@ public class Phaser {
/**
* Creates a new phaser with the given parent and number of
- * registered unarrived parties. If parent is non-null, this phaser
- * is registered with the parent and its initial phase number is
- * the same as that of parent phaser.
+ * registered unarrived parties. If parent is non-null, this
+ * phaser is registered with the parent and its initial phase
+ * number is the same as that of parent phaser. If the number of
+ * parties is zero, the parent phaser will not proceed until this
+ * child phaser registers parties and advances, or this child
+ * phaser deregisters with its parent, or the parent is otherwise
+ * terminated. This child Phaser will be deregistered from its
+ * parent automatically upon any invocation of the child's {@link
+ * #arriveAndDeregister} method that results in the child's number
+ * of registered parties becoming zero. (Although rarely
+ * appropriate, this child may also explicity deregister from its
+ * parent using {@code getParent().arriveAndDeregister()}.) After
+ * deregistration, the child cannot re-register. (Instead, you can
+ * create a new child Phaser.)
*
* @param parent the parent phaser
* @param parties the number of parties required to trip barrier
@@ -381,7 +464,7 @@ public class Phaser {
* or greater than the maximum number of parties supported
*/
public Phaser(Phaser parent, int parties) {
- if (parties < 0 || parties > ushortMask)
+ if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase;
this.parent = parent;
@@ -390,7 +473,7 @@ public class Phaser {
this.root = r;
this.evenQ = r.evenQ;
this.oddQ = r.oddQ;
- phase = parent.register();
+ phase = parent.doRegister(1);
}
else {
this.root = this;
@@ -398,7 +481,8 @@ public class Phaser {
this.oddQ = new AtomicReference The arguments to this method provide the state of the phaser
- * prevailing for the current transition. The results and effects
- * of invoking phase-related methods (including {@code getPhase}
- * as well as arrival, registration, and waiting methods) from
+ * prevailing for the current transition. The effects of invoking
+ * arrival, registration, and waiting methods on this Phaser from
* within {@code onAdvance} are unspecified and should not be
- * relied on. Similarly, while it is possible to override this
- * method to produce side-effects visible to participating tasks,
- * it is in general safe to do so only in designs in which all
- * parties register before any arrive, and all {@link
- * #awaitAdvance} at each phase.
+ * relied on.
+ *
+ * If this Phaser is a member of a tiered set of Phasers, then
+ * {@code onAdvance} is invoked only for its root Phaser on each
+ * advance.
*
* The default version returns {@code true} when the number of
* registered parties is zero. Normally, overrides that arrange
@@ -779,14 +789,119 @@ public class Phaser {
* @return a string identifying this barrier, as well as its state
*/
public String toString() {
- long s = getReconciledState();
+ return stateToString(reconcileState());
+ }
+
+ /**
+ * Implementation of toString and string-based error messages
+ */
+ private String stateToString(long s) {
return super.toString() +
"[phase = " + phaseOf(s) +
" parties = " + partiesOf(s) +
" arrived = " + arrivedOf(s) + "]";
}
- // methods for waiting
+ // Waiting mechanics
+
+ /**
+ * Removes and signals threads from queue for phase.
+ */
+ private void releaseWaiters(int phase) {
+ AtomicReference