--- jsr166/src/jsr166y/Phaser.java 2010/11/29 15:47:19 1.63 +++ jsr166/src/jsr166y/Phaser.java 2011/06/01 21:04:30 1.74 @@ -1,7 +1,7 @@ /* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain + * http://creativecommons.org/publicdomain/zero/1.0/ */ package jsr166y; @@ -75,9 +75,10 @@ import java.util.concurrent.locks.LockSu * * *
Termination. A phaser may enter a termination - * state in which all synchronization methods immediately return - * without updating phaser state or waiting for advance, and - * indicating (via a negative phase value) that execution is complete. + * state, that may be checked using method {@link #isTerminated}. Upon + * termination, all synchronization methods immediately return without + * waiting for advance, as indicated by a negative return value. + * Similarly, attempts to register upon termination have no effect. * Termination is triggered when an invocation of {@code onAdvance} * returns {@code true}. The default implementation returns {@code * true} if a deregistration has caused the number of registered @@ -96,6 +97,16 @@ import java.util.concurrent.locks.LockSu * increase throughput even though it incurs greater per-operation * overhead. * + *
In a tree of tiered phasers, registration and deregistration of + * child phasers with their parent are managed automatically. + * Whenever the number of registered parties of a child phaser becomes + * non-zero (as established in the {@link #Phaser(Phaser,int)} + * constructor, {@link #register}, or {@link #bulkRegister}), the + * child phaser is registered with its parent. Whenever the number of + * registered parties becomes zero as the result of an invocation of + * {@link #arriveAndDeregister}, the child phaser is deregistered + * from its parent. + * *
Monitoring. While synchronization methods may be invoked
* only by registered parties, the current state of a phaser may be
* monitored by any caller. At any given moment there are {@link
@@ -119,7 +130,7 @@ import java.util.concurrent.locks.LockSu
* void runTasks(List To create a set of tasks using a tree of phasers,
- * you could use code of the following form, assuming a
- * Task class with a constructor accepting a {@code Phaser} that
- * it registers with upon construction:
+ * To create a set of {@code n} tasks using a tree of phasers, you
+ * could use code of the following form, assuming a Task class with a
+ * constructor accepting a {@code Phaser} that it registers with upon
+ * construction. After invocation of {@code build(new Task[n], 0, n,
+ * new Phaser())}, these tasks could then be started, for example by
+ * submitting to a pool:
*
* The arguments to this method provide the state of the phaser
* prevailing for the current transition. The effects of invoking
@@ -854,12 +933,10 @@ public class Phaser {
*/
private void releaseWaiters(int phase) {
QNode q; // first element of queue
- int p; // its phase
Thread t; // its thread
AtomicReference {@code
- * void build(Task[] actions, int lo, int hi, Phaser ph) {
+ * void build(Task[] tasks, int lo, int hi, Phaser ph) {
* if (hi - lo > TASKS_PER_PHASER) {
* for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
* int j = Math.min(i + TASKS_PER_PHASER, hi);
- * build(actions, i, j, new Phaser(ph));
+ * build(tasks, i, j, new Phaser(ph));
* }
* } else {
* for (int i = lo; i < hi; ++i)
- * actions[i] = new Task(ph);
+ * tasks[i] = new Task(ph);
* // assumes new Task(ph) performs ph.register()
* }
- * }
- * // .. initially called, for n tasks via
- * build(new Task[n], 0, n, new Phaser());}
+ * }}
*
* The best value of {@code TASKS_PER_PHASER} depends mainly on
* expected synchronization rates. A value as low as four may
@@ -226,35 +237,52 @@ public class Phaser {
*/
/**
- * Primary state representation, holding four fields:
+ * Primary state representation, holding four bit-fields:
*
- * * unarrived -- the number of parties yet to hit barrier (bits 0-15)
- * * parties -- the number of parties to wait (bits 16-31)
- * * phase -- the generation of the barrier (bits 32-62)
- * * terminated -- set if barrier is terminated (bit 63 / sign)
- *
- * However, to efficiently maintain atomicity, these values are
- * packed into a single (atomic) long. Termination uses the sign
- * bit of 32 bit representation of phase, so phase is set to -1 on
- * termination. Good performance relies on keeping state decoding
- * and encoding simple, and keeping race windows short.
+ * unarrived -- the number of parties yet to hit barrier (bits 0-15)
+ * parties -- the number of parties to wait (bits 16-31)
+ * phase -- the generation of the barrier (bits 32-62)
+ * terminated -- set if barrier is terminated (bit 63 / sign)
+ *
+ * Except that a phaser with no registered parties is
+ * distinguished by the otherwise illegal state of having zero
+ * parties and one unarrived parties (encoded as EMPTY below).
+ *
+ * To efficiently maintain atomicity, these values are packed into
+ * a single (atomic) long. Good performance relies on keeping
+ * state decoding and encoding simple, and keeping race windows
+ * short.
+ *
+ * All state updates are performed via CAS except initial
+ * registration of a sub-phaser (i.e., one with a non-null
+ * parent). In this (relatively rare) case, we use built-in
+ * synchronization to lock while first registering with its
+ * parent.
+ *
+ * The phase of a subphaser is allowed to lag that of its
+ * ancestors until it is actually accessed -- see method
+ * reconcileState.
*/
private volatile long state;
private static final int MAX_PARTIES = 0xffff;
- private static final int MAX_PHASE = 0x7fffffff;
+ private static final int MAX_PHASE = Integer.MAX_VALUE;
private static final int PARTIES_SHIFT = 16;
private static final int PHASE_SHIFT = 32;
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
- private static final long ONE_ARRIVAL = 1L;
- private static final long ONE_PARTY = 1L << PARTIES_SHIFT;
private static final long TERMINATION_BIT = 1L << 63;
+ // some special values
+ private static final int ONE_ARRIVAL = 1;
+ private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
+ private static final int EMPTY = 1;
+
// The following unpacking methods are usually manually inlined
private static int unarrivedOf(long s) {
- return (int)s & UNARRIVED_MASK;
+ int counts = (int)s;
+ return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK;
}
private static int partiesOf(long s) {
@@ -262,11 +290,13 @@ public class Phaser {
}
private static int phaseOf(long s) {
- return (int) (s >>> PHASE_SHIFT);
+ return (int)(s >>> PHASE_SHIFT);
}
private static int arrivedOf(long s) {
- return partiesOf(s) - unarrivedOf(s);
+ int counts = (int)s;
+ return (counts == EMPTY) ? 0 :
+ (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}
/**
@@ -275,8 +305,7 @@ public class Phaser {
private final Phaser parent;
/**
- * The root of phaser tree. Equals this if not in a tree. Used to
- * support faster state push-down.
+ * The root of phaser tree. Equals this if not in a tree.
*/
private final Phaser root;
@@ -314,44 +343,37 @@ public class Phaser {
* Manually tuned to speed up and minimize race windows for the
* common case of just decrementing unarrived field.
*
- * @param adj - adjustment to apply to state -- either
- * ONE_ARRIVAL (for arrive) or
- * ONE_ARRIVAL|ONE_PARTY (for arriveAndDeregister)
+ * @param deregister false for arrive, true for arriveAndDeregister
*/
- private int doArrive(long adj) {
+ private int doArrive(boolean deregister) {
+ int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
+ final Phaser root = this.root;
for (;;) {
- long s = state;
- int unarrived = (int)s & UNARRIVED_MASK;
+ long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
+ int counts = (int)s;
+ int unarrived = (counts & UNARRIVED_MASK) - 1;
if (phase < 0)
return phase;
- else if (unarrived == 0) {
- if (reconcileState() == s) // recheck
+ else if (counts == EMPTY || unarrived < 0) {
+ if (root == this || reconcileState() == s)
throw new IllegalStateException(badArrive(s));
}
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
- if (unarrived == 1) {
- long p = s & PARTIES_MASK; // unshifted parties field
- long lu = p >>> PARTIES_SHIFT;
- int u = (int)lu;
- int nextPhase = (phase + 1) & MAX_PHASE;
- long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
- final Phaser parent = this.parent;
- if (parent == null) {
- if (onAdvance(phase, u))
- next |= TERMINATION_BIT;
- UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
- releaseWaiters(phase);
- }
- else {
- parent.doArrive((u == 0) ?
- ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
- if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase)
- reconcileState();
- else if (state == s)
- UNSAFE.compareAndSwapLong(this, stateOffset, s,
- next);
- }
+ if (unarrived == 0) {
+ long n = s & PARTIES_MASK; // base of next state
+ int nextUnarrived = (int)n >>> PARTIES_SHIFT;
+ if (root != this)
+ return parent.doArrive(nextUnarrived == 0);
+ if (onAdvance(phase, nextUnarrived))
+ n |= TERMINATION_BIT;
+ else if (nextUnarrived == 0)
+ n |= EMPTY;
+ else
+ n |= nextUnarrived;
+ n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;
+ UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
+ releaseWaiters(phase);
}
return phase;
}
@@ -368,60 +390,76 @@ public class Phaser {
// adjustment to state
long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
+ int phase;
for (;;) {
- long s = (parent == null) ? state : reconcileState();
- int parties = (int)s >>> PARTIES_SHIFT;
- int phase = (int)(s >>> PHASE_SHIFT);
- if (phase < 0)
- return phase;
- else if (registrations > MAX_PARTIES - parties)
+ long s = state;
+ int counts = (int)s;
+ int parties = counts >>> PARTIES_SHIFT;
+ int unarrived = counts & UNARRIVED_MASK;
+ if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
- else if ((parties == 0 && parent == null) || // first reg of root
- ((int)s & UNARRIVED_MASK) != 0) { // not advancing
- if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj))
- return phase;
+ else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
+ break;
+ else if (counts != EMPTY) { // not 1st registration
+ if (parent == null || reconcileState() == s) {
+ if (unarrived == 0) // wait out advance
+ root.internalAwaitAdvance(phase, null);
+ else if (UNSAFE.compareAndSwapLong(this, stateOffset,
+ s, s + adj))
+ break;
+ }
+ }
+ else if (parent == null) { // 1st root registration
+ long next = ((long)phase << PHASE_SHIFT) | adj;
+ if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
+ break;
}
- else if (parties != 0) // wait for onAdvance
- root.internalAwaitAdvance(phase, null);
- else { // 1st registration of child
- synchronized (this) { // register parent first
- if (reconcileState() == s) { // recheck under lock
- parent.doRegister(1); // OK if throws IllegalState
- for (;;) { // simpler form of outer loop
- s = reconcileState();
- phase = (int)(s >>> PHASE_SHIFT);
- if (phase < 0 ||
- UNSAFE.compareAndSwapLong(this, stateOffset,
- s, s + adj))
- return phase;
- }
+ else {
+ synchronized (this) { // 1st sub registration
+ if (state == s) { // recheck under lock
+ parent.doRegister(1);
+ do { // force current phase
+ phase = (int)(root.state >>> PHASE_SHIFT);
+ // assert phase < 0 || (int)state == EMPTY;
+ } while (!UNSAFE.compareAndSwapLong
+ (this, stateOffset, state,
+ ((long)phase << PHASE_SHIFT) | adj));
+ break;
}
}
}
}
+ return phase;
}
/**
- * Recursively resolves lagged phase propagation from root if necessary.
+ * Resolves lagged phase propagation from root if necessary.
+ * Reconciliation normally occurs when root has advanced but
+ * subphasers have not yet done so, in which case they must finish
+ * their own advance by setting unarrived to parties (or if
+ * parties is zero, resetting to unregistered EMPTY state).
+ * However, this method may also be called when "floating"
+ * subphasers with possibly some unarrived parties are merely
+ * catching up to current phase, in which case counts are
+ * unaffected.
+ *
+ * @return reconciled state
*/
private long reconcileState() {
- Phaser par = parent;
+ final Phaser root = this.root;
long s = state;
- if (par != null) {
- Phaser rt = root;
- int phase, rPhase;
- while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 &&
- (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
- if (par != rt && (int)(par.state >>> PHASE_SHIFT) != rPhase)
- par.reconcileState();
- else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) {
- long u = s & PARTIES_MASK; // reset unarrived to parties
- long next = ((((long) rPhase) << PHASE_SHIFT) | u |
- (u >>> PARTIES_SHIFT));
- UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
- }
+ if (root != this) {
+ int phase, u, p;
+ // CAS root phase with current parties; possibly trip unarrived
+ while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
+ (int)(s >>> PHASE_SHIFT) &&
+ !UNSAFE.compareAndSwapLong
+ (this, stateOffset, s,
+ s = (((long)phase << PHASE_SHIFT) |
+ (s & PARTIES_MASK) |
+ ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
+ (u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
s = state;
- }
}
return s;
}
@@ -459,15 +497,9 @@ public class Phaser {
/**
* Creates a new phaser with the given parent and number of
- * registered unarrived parties. Registration and deregistration
- * of this child phaser with its parent are managed automatically.
- * If the given parent is non-null, whenever this child phaser has
- * any registered parties (as established in this constructor,
- * {@link #register}, or {@link #bulkRegister}), this child phaser
- * is registered with its parent. Whenever the number of
- * registered parties becomes zero as the result of an invocation
- * of {@link #arriveAndDeregister}, this child phaser is
- * deregistered from its parent.
+ * registered unarrived parties. When the given parent is non-null
+ * and the given number of parties is greater than zero, this
+ * child phaser is registered with its parent.
*
* @param parent the parent phaser
* @param parties the number of parties required to advance to the
@@ -478,22 +510,25 @@ public class Phaser {
public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
- long s = ((long) parties) | (((long) parties) << PARTIES_SHIFT);
+ int phase = 0;
this.parent = parent;
if (parent != null) {
- Phaser r = parent.root;
- this.root = r;
- this.evenQ = r.evenQ;
- this.oddQ = r.oddQ;
+ final Phaser root = parent.root;
+ this.root = root;
+ this.evenQ = root.evenQ;
+ this.oddQ = root.oddQ;
if (parties != 0)
- s |= ((long)(parent.doRegister(1))) << PHASE_SHIFT;
+ phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference