--- jsr166/src/jsr166y/Phaser.java 2009/07/21 00:15:14 1.14 +++ jsr166/src/jsr166y/Phaser.java 2010/11/06 16:12:10 1.50 @@ -7,157 +7,195 @@ package jsr166y; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; -import sun.misc.Unsafe; -import java.lang.reflect.*; /** - * A reusable synchronization barrier, similar in functionality to a + * A reusable synchronization barrier, similar in functionality to * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and * {@link java.util.concurrent.CountDownLatch CountDownLatch} * but supporting more flexible usage. * - *
Registration. Unlike the case for other barriers, the + * number of parties registered to synchronize on a phaser + * may vary over time. Tasks may be registered at any time (using + * methods {@link #register}, {@link #bulkRegister}, or forms of + * constructors establishing initial numbers of parties), and + * optionally deregistered upon any arrival (using {@link + * #arriveAndDeregister}). As is the case with most basic + * synchronization constructs, registration and deregistration affect + * only internal counts; they do not establish any further internal + * bookkeeping, so tasks cannot query whether they are registered. + * (However, you can introduce such bookkeeping by subclassing this + * class.) + * + *
Synchronization. Like a {@code CyclicBarrier}, a {@code + * Phaser} may be repeatedly awaited. Method {@link + * #arriveAndAwaitAdvance} has effect analogous to {@link + * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each + * generation of a {@code Phaser} has an associated phase number. The + * phase number starts at zero, and advances when all parties arrive + * at the barrier, wrapping around to zero after reaching {@code + * Integer.MAX_VALUE}. The use of phase numbers enables independent + * control of actions upon arrival at a barrier and upon awaiting + * others, via two kinds of methods that may be invoked by any + * registered party: * *
Termination. A {@code Phaser} may enter a + * termination state in which all synchronization methods + * immediately return without updating phaser state or waiting for + * advance, and indicating (via a negative phase value) that execution + * is complete. Termination is triggered when an invocation of {@code + * onAdvance} returns {@code true}. As illustrated below, when + * phasers control actions with a fixed number of iterations, it is + * often convenient to override this method to cause termination when + * the current phase number reaches a threshold. Method {@link + * #forceTermination} is also available to abruptly release waiting + * threads and allow them to terminate. * - *
Tiering. Phasers may be tiered (i.e., arranged + * in tree structures) to reduce contention. Phasers with large * numbers of parties that would otherwise experience heavy - * synchronization contention costs may instead be arranged in trees. - * This will typically greatly increase throughput even though it - * incurs somewhat greater per-operation overhead. - * - *
Monitoring. While synchronization methods may be invoked + * only by registered parties, the current state of a phaser may be + * monitored by any caller. At any given moment there are {@link + * #getRegisteredParties} parties in total, of which {@link + * #getArrivedParties} have arrived at the current phase ({@link + * #getPhase}). When the remaining ({@link #getUnarrivedParties}) + * parties arrive, the phase advances. The values returned by these + * methods may reflect transient states and so are not in general + * useful for synchronization control. Method {@link #toString} + * returns snapshots of these state queries in a form convenient for + * informal monitoring. * *
Sample usages: * - *
A Phaser may be used instead of a {@code CountDownLatch} to control - * a one-shot action serving a variable number of parties. The typical - * idiom is for the method setting this up to first register, then - * start the actions, then deregister, as in: + *
A {@code Phaser} may be used instead of a {@code CountDownLatch} + * to control a one-shot action serving a variable number of parties. + * The typical idiom is for the method setting this up to first + * register, then start the actions, then deregister, as in: * *
{@code - * void runTasks(List* *list) { + * void runTasks(List tasks) { * final Phaser phaser = new Phaser(1); // "1" to register self - * for (Runnable r : list) { + * // create and start threads + * for (Runnable task : tasks) { * phaser.register(); * new Thread() { * public void run() { * phaser.arriveAndAwaitAdvance(); // await all creation - * r.run(); - * phaser.arriveAndDeregister(); // signal completion + * task.run(); * } * }.start(); * } * - * doSomethingOnBehalfOfWorkers(); - * phaser.arrive(); // allow threads to start - * int p = phaser.arriveAndDeregister(); // deregister self ... - * p = phaser.awaitAdvance(p); // ... and await arrival - * otherActions(); // do other things while tasks execute - * phaser.awaitAdvance(p); // await final completion + * // allow threads to start and deregister self + * phaser.arriveAndDeregister(); * }}
One way to cause a set of threads to repeatedly perform actions * for a given number of iterations is to override {@code onAdvance}: * *
{@code - * void startTasks(List* - *list, final int iterations) { + * void startTasks(List tasks, final int iterations) { * final Phaser phaser = new Phaser() { - * public boolean onAdvance(int phase, int registeredParties) { + * protected boolean onAdvance(int phase, int registeredParties) { * return phase >= iterations || registeredParties == 0; * } * }; * phaser.register(); - * for (Runnable r : list) { + * for (final Runnable task : tasks) { * phaser.register(); * new Thread() { * public void run() { * do { - * r.run(); + * task.run(); * phaser.arriveAndAwaitAdvance(); - * } while(!phaser.isTerminated(); + * } while (!phaser.isTerminated()); * } * }.start(); * } * phaser.arriveAndDeregister(); // deregister self, don't wait * }}
To create a set of tasks using a tree of Phasers, + * If the main task must later await termination, it + * may re-register and then execute a similar loop: + *
{@code + * // ... + * phaser.register(); + * while (!phaser.isTerminated()) + * phaser.arriveAndAwaitAdvance();}+ * + *
Related constructions may be used to await particular phase numbers + * in contexts where you are sure that the phase will never wrap around + * {@code Integer.MAX_VALUE}. For example: + * + *
{@code + * void awaitPhase(Phaser phaser, int phase) { + * int p = phaser.register(); // assumes caller not already registered + * while (p < phase) { + * if (phaser.isTerminated()) + * // ... deal with unexpected termination + * else + * p = phaser.arriveAndAwaitAdvance(); + * } + * phaser.arriveAndDeregister(); + * }}+ * + * + *
To create a set of tasks using a tree of phasers, * you could use code of the following form, assuming a - * Task class with a constructor accepting a Phaser that - * it registers for upon construction: + * Task class with a constructor accepting a phaser that + * it registers with upon construction: + * *
{@code - * void build(Task[] actions, int lo, int hi, Phaser b) { - * int step = (hi - lo) / TASKS_PER_PHASER; - * if (step > 1) { - * int i = lo; - * while (i < hi) { - * int r = Math.min(i + step, hi); - * build(actions, i, r, new Phaser(b)); - * i = r; + * void build(Task[] actions, int lo, int hi, Phaser ph) { + * if (hi - lo > TASKS_PER_PHASER) { + * for (int i = lo; i < hi; i += TASKS_PER_PHASER) { + * int j = Math.min(i + TASKS_PER_PHASER, hi); + * build(actions, i, j, new Phaser(ph)); * } * } else { * for (int i = lo; i < hi; ++i) - * actions[i] = new Task(b); - * // assumes new Task(b) performs b.register() + * actions[i] = new Task(ph); + * // assumes new Task(ph) performs ph.register() * } * } * // .. initially called, for n tasks via @@ -168,13 +206,14 @@ import java.lang.reflect.*; * be appropriate for extremely small per-barrier task bodies (thus * high rates), or up to hundreds for extremely large ones. * - *- * *
Implementation notes: This implementation restricts the
* maximum number of parties to 65535. Attempts to register additional
- * parties result in IllegalStateExceptions. However, you can and
+ * parties result in {@code IllegalStateException}. However, you can and
* should create tiered phasers to accommodate arbitrarily large sets
* of participants.
+ *
+ * @since 1.7
+ * @author Doug Lea
*/
public class Phaser {
/*
@@ -203,20 +242,19 @@ public class Phaser {
*/
private volatile long state;
- private static final int ushortBits = 16;
private static final int ushortMask = 0xffff;
private static final int phaseMask = 0x7fffffff;
private static int unarrivedOf(long s) {
- return (int)(s & ushortMask);
+ return (int) (s & ushortMask);
}
private static int partiesOf(long s) {
- return ((int)s) >>> 16;
+ return ((int) s) >>> 16;
}
private static int phaseOf(long s) {
- return (int)(s >>> 32);
+ return (int) (s >>> 32);
}
private static int arrivedOf(long s) {
@@ -224,13 +262,13 @@ public class Phaser {
}
private static long stateFor(int phase, int parties, int unarrived) {
- return ((((long)phase) << 32) | (((long)parties) << 16) |
- (long)unarrived);
+ return ((((long) phase) << 32) | (((long) parties) << 16) |
+ (long) unarrived);
}
private static long trippedStateFor(int phase, int parties) {
- long lp = (long)parties;
- return (((long)phase) << 32) | (lp << 16) | lp;
+ long lp = (long) parties;
+ return (((long) phase) << 32) | (lp << 16) | lp;
}
/**
@@ -247,7 +285,7 @@ public class Phaser {
private final Phaser parent;
/**
- * The root of Phaser tree. Equals this if not in a tree. Used to
+ * The root of phaser tree. Equals this if not in a tree. Used to
* support faster state push-down.
*/
private final Phaser root;
@@ -256,14 +294,15 @@ public class Phaser {
/**
* Heads of Treiber stacks for waiting threads. To eliminate
- * contention while releasing some threads while adding others, we
+ * contention when releasing some threads while adding others, we
* use two of them, alternating across even and odd phases.
+ * Subphasers share queues with root to speed up releases.
*/
- private final AtomicReference The arguments to this method provide the state of the phaser
+ * prevailing for the current transition. The results and effects
+ * of invoking phase-related methods (including {@code getPhase}
+ * as well as arrival, registration, and waiting methods) from
+ * within {@code onAdvance} are unspecified and should not be
+ * relied on. Similarly, while it is possible to override this
+ * method to produce side-effects visible to participating tasks,
+ * it is in general safe to do so only in designs in which all
+ * parties register before any arrive, and all {@link
+ * #awaitAdvance} at each phase.
*
- * The default version returns true when the number of
+ * The default version returns {@code true} when the number of
* registered parties is zero. Normally, overrides that arrange
* termination for other reasons should also preserve this
* property.
*
- * You may override this method to perform an action with side
- * effects visible to participating tasks, but it is in general
- * only sensible to do so in designs where all parties register
- * before any arrive, and all {@code awaitAdvance} at each phase.
- * Otherwise, you cannot ensure lack of interference. In
- * particular, this method may be invoked more than once per
- * transition if other parties successfully register while the
- * invocation of this method is in progress, thus postponing the
- * transition until those parties also arrive, re-triggering this
- * method.
- *
* @param phase the phase number on entering the barrier
* @param registeredParties the current number of registered parties
* @return {@code true} if this barrier should terminate
@@ -764,6 +801,7 @@ public class Phaser {
volatile boolean wasInterrupted = false;
volatile Thread thread; // nulled to cancel wait
QNode next;
+
QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long startTime, long nanos) {
this.phaser = phaser;
@@ -774,12 +812,14 @@ public class Phaser {
this.nanos = nanos;
thread = Thread.currentThread();
}
+
public boolean isReleasable() {
return (thread == null ||
phaser.getPhase() != phase ||
(interruptible && wasInterrupted) ||
(timed && (nanos - (System.nanoTime() - startTime)) <= 0));
}
+
public boolean block() {
if (Thread.interrupted()) {
wasInterrupted = true;
@@ -796,6 +836,7 @@ public class Phaser {
}
return isReleasable();
}
+
void signal() {
Thread t = thread;
if (t != null) {
@@ -803,16 +844,17 @@ public class Phaser {
LockSupport.unpark(t);
}
}
+
boolean doWait() {
if (thread != null) {
try {
- ForkJoinPool.managedBlock(this, false);
+ ForkJoinPool.managedBlock(this);
} catch (InterruptedException ie) {
+ wasInterrupted = true; // can't currently happen
}
}
return wasInterrupted;
}
-
}
/**
@@ -838,6 +880,12 @@ public class Phaser {
}
/**
+ * The number of times to spin before blocking waiting for advance.
+ */
+ static final int MAX_SPINS =
+ Runtime.getRuntime().availableProcessors() == 1 ? 0 : 1 << 8;
+
+ /**
* Enqueues node and waits unless aborted or signalled.
*
* @return current phase
@@ -846,16 +894,21 @@ public class Phaser {
QNode node = null;
boolean queued = false;
boolean interrupted = false;
+ int spins = MAX_SPINS;
int p;
while ((p = getPhase()) == phase) {
if (Thread.interrupted())
interrupted = true;
+ else if (spins > 0) {
+ if (--spins == 0)
+ Thread.yield();
+ }
else if (node == null)
node = new QNode(this, phase, false, false, 0, 0);
else if (!queued)
queued = tryEnqueue(node);
- else
- interrupted = node.doWait();
+ else if (node.doWait())
+ interrupted = true;
}
if (node != null)
node.thread = null;
@@ -873,16 +926,21 @@ public class Phaser {
QNode node = null;
boolean queued = false;
boolean interrupted = false;
+ int spins = MAX_SPINS;
int p;
while ((p = getPhase()) == phase && !interrupted) {
if (Thread.interrupted())
interrupted = true;
+ else if (spins > 0) {
+ if (--spins == 0)
+ Thread.yield();
+ }
else if (node == null)
node = new QNode(this, phase, true, false, 0, 0);
else if (!queued)
queued = tryEnqueue(node);
- else
- interrupted = node.doWait();
+ else if (node.doWait())
+ interrupted = true;
}
if (node != null)
node.thread = null;
@@ -903,18 +961,23 @@ public class Phaser {
QNode node = null;
boolean queued = false;
boolean interrupted = false;
+ int spins = MAX_SPINS;
int p;
while ((p = getPhase()) == phase && !interrupted) {
if (Thread.interrupted())
interrupted = true;
else if (nanos - (System.nanoTime() - startTime) <= 0)
break;
+ else if (spins > 0) {
+ if (--spins == 0)
+ Thread.yield();
+ }
else if (node == null)
node = new QNode(this, phase, true, true, startTime, nanos);
else if (!queued)
queued = tryEnqueue(node);
- else
- interrupted = node.doWait();
+ else if (node.doWait())
+ interrupted = true;
}
if (node != null)
node.thread = null;
@@ -927,49 +990,48 @@ public class Phaser {
return p;
}
- // Temporary Unsafe mechanics for preliminary release
- private static Unsafe getUnsafe() throws Throwable {
+ // Unsafe mechanics
+
+ private static final sun.misc.Unsafe UNSAFE = getUnsafe();
+ private static final long stateOffset =
+ objectFieldOffset("state", Phaser.class);
+
+ private static long objectFieldOffset(String field, Class> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+
+ /**
+ * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
+ * Replace with a simple call to Unsafe.getUnsafe when integrating
+ * into a jdk.
+ *
+ * @return a sun.misc.Unsafe
+ */
+ private static sun.misc.Unsafe getUnsafe() {
try {
- return Unsafe.getUnsafe();
+ return sun.misc.Unsafe.getUnsafe();
} catch (SecurityException se) {
try {
return java.security.AccessController.doPrivileged
- (new java.security.PrivilegedExceptionAction