--- jsr166/src/jsr166y/Phaser.java 2010/12/03 21:29:34 1.67 +++ jsr166/src/jsr166y/Phaser.java 2011/03/15 19:47:02 1.71 @@ -1,7 +1,7 @@ /* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain + * http://creativecommons.org/publicdomain/zero/1.0/ */ package jsr166y; @@ -75,9 +75,10 @@ import java.util.concurrent.locks.LockSu * * *
Termination. A phaser may enter a termination - * state in which all synchronization methods immediately return - * without updating phaser state or waiting for advance, and - * indicating (via a negative phase value) that execution is complete. + * state, that may be checked using method {@link #isTerminated}. Upon + * termination, all synchronization methods immediately return without + * waiting for advance, as indicated by a negative return value. + * Similarly, attempts to register upon termination have no effect. * Termination is triggered when an invocation of {@code onAdvance} * returns {@code true}. The default implementation returns {@code * true} if a deregistration has caused the number of registered @@ -96,6 +97,16 @@ import java.util.concurrent.locks.LockSu * increase throughput even though it incurs greater per-operation * overhead. * + *
In a tree of tiered phasers, registration and deregistration of + * child phasers with their parent are managed automatically. + * Whenever the number of registered parties of a child phaser becomes + * non-zero (as established in the {@link #Phaser(Phaser,int)} + * constructor, {@link #register}, or {@link #bulkRegister}), the + * child phaser is registered with its parent. Whenever the number of + * registered parties becomes zero as the result of an invocation of + * {@link #arriveAndDeregister}, the child phaser is deregistered + * from its parent. + * *
Monitoring. While synchronization methods may be invoked
* only by registered parties, the current state of a phaser may be
* monitored by any caller. At any given moment there are {@link
@@ -249,10 +260,8 @@ public class Phaser {
* parent.
*
* The phase of a subphaser is allowed to lag that of its
- * ancestors until it is actually accessed. Method reconcileState
- * is usually attempted only only when the number of unarrived
- * parties appears to be zero, which indicates a potential lag in
- * updating phase after the root advanced.
+ * ancestors until it is actually accessed -- see method
+ * reconcileState.
*/
private volatile long state;
@@ -260,6 +269,7 @@ public class Phaser {
private static final int MAX_PHASE = 0x7fffffff;
private static final int PARTIES_SHIFT = 16;
private static final int PHASE_SHIFT = 32;
+ private static final long PHASE_MASK = -1L << PHASE_SHIFT;
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
private static final long TERMINATION_BIT = 1L << 63;
@@ -277,8 +287,7 @@ public class Phaser {
}
private static int partiesOf(long s) {
- int counts = (int)s;
- return (counts == EMPTY) ? 0 : counts >>> PARTIES_SHIFT;
+ return (int)s >>> PARTIES_SHIFT;
}
private static int phaseOf(long s) {
@@ -339,41 +348,37 @@ public class Phaser {
*/
private int doArrive(boolean deregister) {
int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
- long s;
- int phase;
- while ((phase = (int)((s = state) >>> PHASE_SHIFT)) >= 0) {
+ final Phaser root = this.root;
+ for (;;) {
+ long s = (root == this) ? state : reconcileState();
+ int phase = (int)(s >>> PHASE_SHIFT);
int counts = (int)s;
- int unarrived = counts & UNARRIVED_MASK;
- if (counts == EMPTY || unarrived == 0) {
- if (reconcileState() == s)
+ int unarrived = (counts & UNARRIVED_MASK) - 1;
+ if (phase < 0)
+ return phase;
+ else if (counts == EMPTY || unarrived < 0) {
+ if (root == this || reconcileState() == s)
throw new IllegalStateException(badArrive(s));
}
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
- if (unarrived == 1) {
- long n = s & PARTIES_MASK; // unshifted parties field
- int u = ((int)n) >>> PARTIES_SHIFT;
- Phaser par = parent;
- if (par != null) {
- par.doArrive(u == 0);
- reconcileState();
- }
- else {
- n |= (((long)((phase+1) & MAX_PHASE)) << PHASE_SHIFT);
- if (onAdvance(phase, u))
- n |= TERMINATION_BIT;
- else if (u == 0)
- n |= EMPTY; // reset to unregistered
- else
- n |= (long)u; // reset unarr to parties
- // assert state == s || isTerminated();
- UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
- releaseWaiters(phase);
- }
+ if (unarrived == 0) {
+ long n = s & PARTIES_MASK; // base of next state
+ int nextUnarrived = ((int)n) >>> PARTIES_SHIFT;
+ if (root != this)
+ return parent.doArrive(nextUnarrived == 0);
+ if (onAdvance(phase, nextUnarrived))
+ n |= TERMINATION_BIT;
+ else if (nextUnarrived == 0)
+ n |= EMPTY;
+ else
+ n |= nextUnarrived;
+ n |= ((long)((phase + 1) & MAX_PHASE)) << PHASE_SHIFT;
+ UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
+ releaseWaiters(phase);
}
- break;
+ return phase;
}
}
- return phase;
}
/**
@@ -430,29 +435,31 @@ public class Phaser {
/**
* Resolves lagged phase propagation from root if necessary.
+ * Reconciliation normally occurs when root has advanced but
+ * subphasers have not yet done so, in which case they must finish
+ * their own advance by setting unarrived to parties (or if
+ * parties is zero, resetting to unregistered EMPTY state).
+ * However, this method may also be called when "floating"
+ * subphasers with possibly some unarrived parties are merely
+ * catching up to current phase, in which case counts are
+ * unaffected.
+ *
+ * @return reconciled state
*/
private long reconcileState() {
- Phaser rt = root;
+ final Phaser root = this.root;
long s = state;
- if (rt != this) {
- int phase;
- while ((phase = (int)(rt.state >>> PHASE_SHIFT)) !=
- (int)(s >>> PHASE_SHIFT)) {
- // assert phase < 0 || unarrivedOf(s) == 0
- long t; // to reread s
- long p = s & PARTIES_MASK; // unshifted parties field
- long n = (((long) phase) << PHASE_SHIFT) | p;
- if (phase >= 0) {
- if (p == 0L)
- n |= EMPTY; // reset to empty
- else
- n |= p >>> PARTIES_SHIFT; // set unarr to parties
- }
- if ((t = state) == s &&
- UNSAFE.compareAndSwapLong(this, stateOffset, s, s = n))
- break;
- s = t;
- }
+ if (root != this) {
+ int phase, u, p;
+ // CAS root phase with current parties; possibly trip unarrived
+ while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
+ (int)(s >>> PHASE_SHIFT) &&
+ !UNSAFE.compareAndSwapLong
+ (this, stateOffset, s,
+ s = ((((long) phase) << PHASE_SHIFT) | (s & PARTIES_MASK) |
+ ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
+ (u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
+ s = state;
}
return s;
}
@@ -490,15 +497,9 @@ public class Phaser {
/**
* Creates a new phaser with the given parent and number of
- * registered unarrived parties. Registration and deregistration
- * of this child phaser with its parent are managed automatically.
- * If the given parent is non-null, whenever this child phaser has
- * any registered parties (as established in this constructor,
- * {@link #register}, or {@link #bulkRegister}), this child phaser
- * is registered with its parent. Whenever the number of
- * registered parties becomes zero as the result of an invocation
- * of {@link #arriveAndDeregister}, this child phaser is
- * deregistered from its parent.
+ * registered unarrived parties. When the given parent is non-null
+ * and the given number of parties is greater than zero, this
+ * child phaser is registered with its parent.
*
* @param parent the parent phaser
* @param parties the number of parties required to advance to the
@@ -512,10 +513,10 @@ public class Phaser {
int phase = 0;
this.parent = parent;
if (parent != null) {
- Phaser r = parent.root;
- this.root = r;
- this.evenQ = r.evenQ;
- this.oddQ = r.oddQ;
+ final Phaser root = parent.root;
+ this.root = root;
+ this.evenQ = root.evenQ;
+ this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
@@ -524,7 +525,7 @@ public class Phaser {
this.evenQ = new AtomicReference