--- jsr166/src/jsr166y/Phaser.java 2009/01/05 09:11:26 1.9 +++ jsr166/src/jsr166y/Phaser.java 2009/07/25 00:34:00 1.20 @@ -7,10 +7,9 @@ package jsr166y; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; + +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; -import sun.misc.Unsafe; -import java.lang.reflect.*; /** * A reusable synchronization barrier, similar in functionality to a @@ -57,15 +56,16 @@ import java.lang.reflect.*; * effect as providing a barrier action to a CyclicBarrier. * *
  • Phasers may enter a termination state in which all - * await actions immediately return, indicating (via a negative phase - * value) that execution is complete. Termination is triggered by - * executing the overridable {@code onAdvance} method that is invoked - * each time the barrier is about to be tripped. When a Phaser is - * controlling an action with a fixed number of iterations, it is - * often convenient to override this method to cause termination when - * the current phase number reaches a threshold. Method - * {@code forceTermination} is also available to abruptly release - * waiting threads and allow them to terminate. + * actions immediately return without updating phaser state or waiting + * for advance, and indicating (via a negative phase value) that + * execution is complete. Termination is triggered by executing the + * overridable {@code onAdvance} method that is invoked each time the + * barrier is about to be tripped. When a Phaser is controlling an + * action with a fixed number of iterations, it is often convenient to + * override this method to cause termination when the current phase + * number reaches a threshold. Method {@code forceTermination} is also + * available to abruptly release waiting threads and allow them to + * terminate. * *
  • Phasers may be tiered to reduce contention. Phasers with large * numbers of parties that would otherwise experience heavy @@ -81,6 +81,8 @@ import java.lang.reflect.*; * within handlers of those exceptions, often after invoking * {@code forceTermination}. * + *
  • Phasers ensure lack of starvation when used by ForkJoinTasks. + * * * *

    Sample usages: @@ -90,18 +92,18 @@ import java.lang.reflect.*; * idiom is for the method setting this up to first register, then * start the actions, then deregister, as in: * - *

    - *  void runTasks(List<Runnable> list) {
    - *    final Phaser phaser = new Phaser(1); // "1" to register self
    - *    for (Runnable r : list) {
    - *      phaser.register();
    - *      new Thread() {
    - *        public void run() {
    - *          phaser.arriveAndAwaitAdvance(); // await all creation
    - *          r.run();
    - *          phaser.arriveAndDeregister();   // signal completion
    - *        }
    - *      }.start();
    + *  
     {@code
    + * void runTasks(List list) {
    + *   final Phaser phaser = new Phaser(1); // "1" to register self
    + *   for (Runnable r : list) {
    + *     phaser.register();
    + *     new Thread() {
    + *       public void run() {
    + *         phaser.arriveAndAwaitAdvance(); // await all creation
    + *         r.run();
    + *         phaser.arriveAndDeregister();   // signal completion
    + *       }
    + *     }.start();
      *   }
      *
      *   doSomethingOnBehalfOfWorkers();
    @@ -110,59 +112,55 @@ import java.lang.reflect.*;
      *   p = phaser.awaitAdvance(p); // ... and await arrival
      *   otherActions(); // do other things while tasks execute
      *   phaser.awaitAdvance(p); // await final completion
    - * }
    - * 
    + * }}
    * *

    One way to cause a set of threads to repeatedly perform actions * for a given number of iterations is to override {@code onAdvance}: * - *

    - *  void startTasks(List<Runnable> list, final int iterations) {
    - *    final Phaser phaser = new Phaser() {
    - *       public boolean onAdvance(int phase, int registeredParties) {
    - *         return phase >= iterations || registeredParties == 0;
    + *  
     {@code
    + * void startTasks(List list, final int iterations) {
    + *   final Phaser phaser = new Phaser() {
    + *     public boolean onAdvance(int phase, int registeredParties) {
    + *       return phase >= iterations || registeredParties == 0;
    + *     }
    + *   };
    + *   phaser.register();
    + *   for (Runnable r : list) {
    + *     phaser.register();
    + *     new Thread() {
    + *       public void run() {
    + *         do {
    + *           r.run();
    + *           phaser.arriveAndAwaitAdvance();
    + *         } while(!phaser.isTerminated();
      *       }
    - *    };
    - *    phaser.register();
    - *    for (Runnable r : list) {
    - *      phaser.register();
    - *      new Thread() {
    - *        public void run() {
    - *           do {
    - *             r.run();
    - *             phaser.arriveAndAwaitAdvance();
    - *           } while(!phaser.isTerminated();
    - *        }
    - *      }.start();
    + *     }.start();
      *   }
      *   phaser.arriveAndDeregister(); // deregister self, don't wait
    - * }
    - * 
    + * }}
    * *

    To create a set of tasks using a tree of Phasers, * you could use code of the following form, assuming a * Task class with a constructor accepting a Phaser that * it registers for upon construction: - *

    - *  void build(Task[] actions, int lo, int hi, Phaser b) {
    - *    int step = (hi - lo) / TASKS_PER_PHASER;
    - *    if (step > 1) {
    - *       int i = lo;
    - *       while (i < hi) {
    - *         int r = Math.min(i + step, hi);
    - *         build(actions, i, r, new Phaser(b));
    - *         i = r;
    - *       }
    - *    }
    - *    else {
    - *      for (int i = lo; i < hi; ++i)
    - *        actions[i] = new Task(b);
    - *        // assumes new Task(b) performs b.register()
    - *    }
    - *  }
    - *  // .. initially called, for n tasks via
    - *  build(new Task[n], 0, n, new Phaser());
    - * 
    + *
     {@code
    + * void build(Task[] actions, int lo, int hi, Phaser b) {
    + *   int step = (hi - lo) / TASKS_PER_PHASER;
    + *   if (step > 1) {
    + *     int i = lo;
    + *     while (i < hi) {
    + *       int r = Math.min(i + step, hi);
    + *       build(actions, i, r, new Phaser(b));
    + *       i = r;
    + *     }
    + *   } else {
    + *     for (int i = lo; i < hi; ++i)
    + *       actions[i] = new Task(b);
    + *       // assumes new Task(b) performs b.register()
    + *   }
    + * }
    + * // .. initially called, for n tasks via
    + * build(new Task[n], 0, n, new Phaser());}
    * * The best value of {@code TASKS_PER_PHASER} depends mainly on * expected barrier synchronization rates. A value as low as four may @@ -176,6 +174,9 @@ import java.lang.reflect.*; * parties result in IllegalStateExceptions. However, you can and * should create tiered phasers to accommodate arbitrarily large sets * of participants. + * + * @since 1.7 + * @author Doug Lea */ public class Phaser { /* @@ -200,24 +201,24 @@ public class Phaser { * and encoding simple, and keeping race windows short. * * Note: there are some cheats in arrive() that rely on unarrived - * being lowest 16 bits. + * count being lowest 16 bits. */ private volatile long state; private static final int ushortBits = 16; - private static final int ushortMask = (1 << ushortBits) - 1; - private static final int phaseMask = 0x7fffffff; + private static final int ushortMask = 0xffff; + private static final int phaseMask = 0x7fffffff; private static int unarrivedOf(long s) { - return (int)(s & ushortMask); + return (int) (s & ushortMask); } private static int partiesOf(long s) { - return (int)(s & (ushortMask << 16)) >>> 16; + return ((int) s) >>> 16; } private static int phaseOf(long s) { - return (int)(s >>> 32); + return (int) (s >>> 32); } private static int arrivedOf(long s) { @@ -225,17 +226,21 @@ public class Phaser { } private static long stateFor(int phase, int parties, int unarrived) { - return (((long)phase) << 32) | ((parties << 16) | unarrived); + return ((((long) phase) << 32) | (((long) parties) << 16) | + (long) unarrived); } private static long trippedStateFor(int phase, int parties) { - return (((long)phase) << 32) | ((parties << 16) | parties); + long lp = (long) parties; + return (((long) phase) << 32) | (lp << 16) | lp; } - private static IllegalStateException badBounds(int parties, int unarrived) { - return new IllegalStateException - ("Attempt to set " + unarrived + - " unarrived of " + parties + " parties"); + /** + * Returns message string for bad bounds exceptions. + */ + private static String badBounds(int parties, int unarrived) { + return ("Attempt to set " + unarrived + + " unarrived of " + parties + " parties"); } /** @@ -252,7 +257,7 @@ public class Phaser { // Wait queues /** - * Heads of Treiber stacks waiting for nonFJ threads. To eliminate + * Heads of Treiber stacks for waiting threads. To eliminate * contention while releasing some threads while adding others, we * use two of them, alternating across even and odd phases. */ @@ -260,7 +265,7 @@ public class Phaser { private final AtomicReference oddQ = new AtomicReference(); private AtomicReference queueFor(int phase) { - return (phase & 1) == 0? evenQ : oddQ; + return ((phase & 1) == 0) ? evenQ : oddQ; } /** @@ -268,7 +273,7 @@ public class Phaser { * root if necessary. */ private long getReconciledState() { - return parent == null? state : reconcileState(); + return (parent == null) ? state : reconcileState(); } /** @@ -296,7 +301,8 @@ public class Phaser { /** * Creates a new Phaser without any initially registered parties, - * initial phase number 0, and no parent. + * initial phase number 0, and no parent. Any thread using this + * Phaser will need to first register for it. */ public Phaser() { this(null); @@ -305,9 +311,10 @@ public class Phaser { /** * Creates a new Phaser with the given numbers of registered * unarrived parties, initial phase number 0, and no parent. - * @param parties the number of parties required to trip barrier. + * + * @param parties the number of parties required to trip barrier * @throws IllegalArgumentException if parties less than zero - * or greater than the maximum number of parties supported. + * or greater than the maximum number of parties supported */ public Phaser(int parties) { this(null, parties); @@ -318,7 +325,8 @@ public class Phaser { * initially registered parties. If parent is non-null this phaser * is registered with the parent and its initial phase number is * the same as that of parent phaser. - * @param parent the parent phaser. + * + * @param parent the parent phaser */ public Phaser(Phaser parent) { int phase = 0; @@ -334,13 +342,14 @@ public class Phaser { /** * Creates a new Phaser with the given parent and numbers of - * registered unarrived parties. If parent is non-null this phaser + * registered unarrived parties. If parent is non-null, this phaser * is registered with the parent and its initial phase number is * the same as that of parent phaser. - * @param parent the parent phaser. - * @param parties the number of parties required to trip barrier. + * + * @param parent the parent phaser + * @param parties the number of parties required to trip barrier * @throws IllegalArgumentException if parties less than zero - * or greater than the maximum number of parties supported. + * or greater than the maximum number of parties supported */ public Phaser(Phaser parent, int parties) { if (parties < 0 || parties > ushortMask) @@ -358,9 +367,10 @@ public class Phaser { /** * Adds a new unarrived party to this phaser. + * * @return the current barrier phase number upon registration * @throws IllegalStateException if attempting to register more - * than the maximum supported number of parties. + * than the maximum supported number of parties */ public int register() { return doRegister(1); @@ -368,10 +378,11 @@ public class Phaser { /** * Adds the given number of new unarrived parties to this phaser. - * @param parties the number of parties required to trip barrier. + * + * @param parties the number of parties required to trip barrier * @return the current barrier phase number upon registration * @throws IllegalStateException if attempting to register more - * than the maximum supported number of parties. + * than the maximum supported number of parties */ public int bulkRegister(int parties) { if (parties < 0) @@ -394,7 +405,7 @@ public class Phaser { if (phase < 0) break; if (parties > ushortMask || unarrived > ushortMask) - throw badBounds(parties, unarrived); + throw new IllegalStateException(badBounds(parties, unarrived)); if (phase == phaseOf(root.state) && casState(s, stateFor(phase, parties, unarrived))) break; @@ -407,15 +418,17 @@ public class Phaser { * in turn wait for others via {@link #awaitAdvance}). * * @return the barrier phase number upon entry to this method, or a - * negative value if terminated; + * negative value if terminated * @throws IllegalStateException if not terminated and the number - * of unarrived parties would become negative. + * of unarrived parties would become negative */ public int arrive() { int phase; for (;;) { long s = state; phase = phaseOf(s); + if (phase < 0) + break; int parties = partiesOf(s); int unarrived = unarrivedOf(s) - 1; if (unarrived > 0) { // Not the last arrival @@ -427,7 +440,7 @@ public class Phaser { if (par == null) { // directly trip if (casState (s, - trippedStateFor(onAdvance(phase, parties)? -1 : + trippedStateFor(onAdvance(phase, parties) ? -1 : ((phase + 1) & phaseMask), parties))) { releaseWaiters(phase); break; @@ -441,12 +454,10 @@ public class Phaser { } } } - else if (phase < 0) // Don't throw exception if terminated - break; else if (phase != phaseOf(root.state)) // or if unreconciled reconcileState(); else - throw badBounds(parties, unarrived); + throw new IllegalStateException(badBounds(parties, unarrived)); } return phase; } @@ -459,9 +470,9 @@ public class Phaser { * zero parties, this phaser is also deregistered from its parent. * * @return the current barrier phase number upon entry to - * this method, or a negative value if terminated; + * this method, or a negative value if terminated * @throws IllegalStateException if not terminated and the number - * of registered or unarrived parties would become negative. + * of registered or unarrived parties would become negative */ public int arriveAndDeregister() { // similar code to arrive, but too different to merge @@ -470,6 +481,8 @@ public class Phaser { for (;;) { long s = state; phase = phaseOf(s); + if (phase < 0) + break; int parties = partiesOf(s) - 1; int unarrived = unarrivedOf(s) - 1; if (parties >= 0) { @@ -488,21 +501,19 @@ public class Phaser { if (unarrived == 0) { if (casState (s, - trippedStateFor(onAdvance(phase, parties)? -1 : + trippedStateFor(onAdvance(phase, parties) ? -1 : ((phase + 1) & phaseMask), parties))) { releaseWaiters(phase); break; } continue; } - if (phase < 0) - break; if (par != null && phase != phaseOf(root.state)) { reconcileState(); continue; } } - throw badBounds(parties, unarrived); + throw new IllegalStateException(badBounds(parties, unarrived)); } return phase; } @@ -512,9 +523,10 @@ public class Phaser { * to {@code awaitAdvance(arrive())}. If you instead need to * await with interruption of timeout, and/or deregister upon * arrival, you can arrange them using analogous constructions. + * * @return the phase on entry to this method * @throws IllegalStateException if not terminated and the number - * of unarrived parties would become negative. + * of unarrived parties would become negative */ public int arriveAndAwaitAdvance() { return awaitAdvance(arrive()); @@ -524,6 +536,7 @@ public class Phaser { * Awaits the phase of the barrier to advance from the given * value, or returns immediately if argument is negative or this * barrier is terminated. + * * @param phase the phase on entry to this method * @return the phase on exit from this method */ @@ -534,7 +547,7 @@ public class Phaser { int p = phaseOf(s); if (p != phase) return p; - if (unarrivedOf(s) == 0) + if (unarrivedOf(s) == 0 && parent != null) parent.awaitAdvance(phase); // Fall here even if parent waited, to reconcile and help release return untimedWait(phase); @@ -545,18 +558,20 @@ public class Phaser { * value, or returns immediately if argument is negative or this * barrier is terminated, or throws InterruptedException if * interrupted while waiting. + * * @param phase the phase on entry to this method * @return the phase on exit from this method * @throws InterruptedException if thread interrupted while waiting */ - public int awaitAdvanceInterruptibly(int phase) throws InterruptedException { + public int awaitAdvanceInterruptibly(int phase) + throws InterruptedException { if (phase < 0) return phase; long s = getReconciledState(); int p = phaseOf(s); if (p != phase) return p; - if (unarrivedOf(s) != 0) + if (unarrivedOf(s) == 0 && parent != null) parent.awaitAdvanceInterruptibly(phase); return interruptibleWait(phase); } @@ -565,12 +580,14 @@ public class Phaser { * Awaits the phase of the barrier to advance from the given value * or the given timeout elapses, or returns immediately if * argument is negative or this barrier is terminated. + * * @param phase the phase on entry to this method * @return the phase on exit from this method * @throws InterruptedException if thread interrupted while waiting * @throws TimeoutException if timed out while waiting */ - public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) + public int awaitAdvanceInterruptibly(int phase, + long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { if (phase < 0) return phase; @@ -578,7 +595,7 @@ public class Phaser { int p = phaseOf(s); if (p != phase) return p; - if (unarrivedOf(s) == 0) + if (unarrivedOf(s) == 0 && parent != null) parent.awaitAdvanceInterruptibly(phase, timeout, unit); return timedWait(phase, unit.toNanos(timeout)); } @@ -611,6 +628,7 @@ public class Phaser { * Returns the current phase number. The maximum phase number is * {@code Integer.MAX_VALUE}, after which it restarts at * zero. Upon termination, the phase number is negative. + * * @return the phase number, or a negative value if terminated */ public final int getPhase() { @@ -619,6 +637,7 @@ public class Phaser { /** * Returns {@code true} if the current phase number equals the given phase. + * * @param phase the phase * @return {@code true} if the current phase number equals the given phase */ @@ -628,6 +647,7 @@ public class Phaser { /** * Returns the number of parties registered at this barrier. + * * @return the number of parties */ public int getRegisteredParties() { @@ -637,6 +657,7 @@ public class Phaser { /** * Returns the number of parties that have arrived at the current * phase of this barrier. + * * @return the number of arrived parties */ public int getArrivedParties() { @@ -646,6 +667,7 @@ public class Phaser { /** * Returns the number of registered parties that have not yet * arrived at the current phase of this barrier. + * * @return the number of unarrived parties */ public int getUnarrivedParties() { @@ -654,6 +676,7 @@ public class Phaser { /** * Returns the parent of this phaser, or null if none. + * * @return the parent of this phaser, or null if none */ public Phaser getParent() { @@ -663,6 +686,7 @@ public class Phaser { /** * Returns the root ancestor of this phaser, which is the same as * this phaser if it has no parent. + * * @return the root ancestor of this phaser */ public Phaser getRoot() { @@ -671,6 +695,7 @@ public class Phaser { /** * Returns {@code true} if this barrier has been terminated. + * * @return {@code true} if this barrier has been terminated */ public boolean isTerminated() { @@ -729,38 +754,51 @@ public class Phaser { // methods for waiting - /** The number of CPUs, for spin control */ - static final int NCPUS = Runtime.getRuntime().availableProcessors(); - - /** - * The number of times to spin before blocking in timed waits. - * The value is empirically derived. - */ - static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; - - /** - * The number of times to spin before blocking in untimed waits. - * This is greater than timed value because untimed waits spin - * faster since they don't need to check times on each spin. - */ - static final int maxUntimedSpins = maxTimedSpins * 32; - - /** - * The number of nanoseconds for which it is faster to spin - * rather than to use timed park. A rough estimate suffices. - */ - static final long spinForTimeoutThreshold = 1000L; - /** - * Wait nodes for Treiber stack representing wait queue for non-FJ - * tasks. + * Wait nodes for Treiber stack representing wait queue */ - static final class QNode { - QNode next; + static final class QNode implements ForkJoinPool.ManagedBlocker { + final Phaser phaser; + final int phase; + final long startTime; + final long nanos; + final boolean timed; + final boolean interruptible; + volatile boolean wasInterrupted = false; volatile Thread thread; // nulled to cancel wait - QNode() { + QNode next; + QNode(Phaser phaser, int phase, boolean interruptible, + boolean timed, long startTime, long nanos) { + this.phaser = phaser; + this.phase = phase; + this.timed = timed; + this.interruptible = interruptible; + this.startTime = startTime; + this.nanos = nanos; thread = Thread.currentThread(); } + public boolean isReleasable() { + return (thread == null || + phaser.getPhase() != phase || + (interruptible && wasInterrupted) || + (timed && (nanos - (System.nanoTime() - startTime)) <= 0)); + } + public boolean block() { + if (Thread.interrupted()) { + wasInterrupted = true; + if (interruptible) + return true; + } + if (!timed) + LockSupport.park(this); + else { + long waitTime = nanos - (System.nanoTime() - startTime); + if (waitTime <= 0) + return true; + LockSupport.parkNanos(this, waitTime); + } + return isReleasable(); + } void signal() { Thread t = thread; if (t != null) { @@ -768,10 +806,20 @@ public class Phaser { LockSupport.unpark(t); } } + boolean doWait() { + if (thread != null) { + try { + ForkJoinPool.managedBlock(this, false); + } catch (InterruptedException ie) { + } + } + return wasInterrupted; + } + } /** - * Removes and signals waiting threads from wait queue + * Removes and signals waiting threads from wait queue. */ private void releaseWaiters(int phase) { AtomicReference head = queueFor(phase); @@ -783,137 +831,147 @@ public class Phaser { } /** + * Tries to enqueue given node in the appropriate wait queue. + * + * @return true if successful + */ + private boolean tryEnqueue(QNode node) { + AtomicReference head = queueFor(node.phase); + return head.compareAndSet(node.next = head.get(), node); + } + + /** * Enqueues node and waits unless aborted or signalled. + * + * @return current phase */ private int untimedWait(int phase) { - int spins = maxUntimedSpins; QNode node = null; - boolean interrupted = false; boolean queued = false; + boolean interrupted = false; int p; while ((p = getPhase()) == phase) { - interrupted = Thread.interrupted(); - if (node != null) { - if (!queued) { - AtomicReference head = queueFor(phase); - queued = head.compareAndSet(node.next = head.get(), node); - } - else if (node.thread != null) - LockSupport.park(this); - } - else if (spins <= 0) - node = new QNode(); + if (Thread.interrupted()) + interrupted = true; + else if (node == null) + node = new QNode(this, phase, false, false, 0, 0); + else if (!queued) + queued = tryEnqueue(node); else - --spins; + interrupted = node.doWait(); } if (node != null) node.thread = null; + releaseWaiters(phase); if (interrupted) Thread.currentThread().interrupt(); - releaseWaiters(phase); return p; } /** - * Messier interruptible version + * Interruptible version + * @return current phase */ private int interruptibleWait(int phase) throws InterruptedException { - int spins = maxUntimedSpins; QNode node = null; boolean queued = false; boolean interrupted = false; int p; - while ((p = getPhase()) == phase) { - if (interrupted = Thread.interrupted()) - break; - if (node != null) { - if (!queued) { - AtomicReference head = queueFor(phase); - queued = head.compareAndSet(node.next = head.get(), node); - } - else if (node.thread != null) - LockSupport.park(this); - } - else if (spins <= 0) - node = new QNode(); + while ((p = getPhase()) == phase && !interrupted) { + if (Thread.interrupted()) + interrupted = true; + else if (node == null) + node = new QNode(this, phase, true, false, 0, 0); + else if (!queued) + queued = tryEnqueue(node); else - --spins; + interrupted = node.doWait(); } if (node != null) node.thread = null; + if (p != phase || (p = getPhase()) != phase) + releaseWaiters(phase); if (interrupted) throw new InterruptedException(); - releaseWaiters(phase); return p; } /** - * Even messier timeout version. + * Timeout version. + * @return current phase */ private int timedWait(int phase, long nanos) throws InterruptedException, TimeoutException { + long startTime = System.nanoTime(); + QNode node = null; + boolean queued = false; + boolean interrupted = false; int p; - if ((p = getPhase()) == phase) { - long lastTime = System.nanoTime(); - int spins = maxTimedSpins; - QNode node = null; - boolean queued = false; - boolean interrupted = false; - while ((p = getPhase()) == phase) { - if (interrupted = Thread.interrupted()) - break; - long now = System.nanoTime(); - if ((nanos -= now - lastTime) <= 0) - break; - lastTime = now; - if (node != null) { - if (!queued) { - AtomicReference head = queueFor(phase); - queued = head.compareAndSet(node.next = head.get(), node); - } - else if (node.thread != null && - nanos > spinForTimeoutThreshold) { - LockSupport.parkNanos(this, nanos); - } - } - else if (spins <= 0) - node = new QNode(); - else - --spins; - } - if (node != null) - node.thread = null; - if (interrupted) - throw new InterruptedException(); - if (p == phase && (p = getPhase()) == phase) - throw new TimeoutException(); + while ((p = getPhase()) == phase && !interrupted) { + if (Thread.interrupted()) + interrupted = true; + else if (nanos - (System.nanoTime() - startTime) <= 0) + break; + else if (node == null) + node = new QNode(this, phase, true, true, startTime, nanos); + else if (!queued) + queued = tryEnqueue(node); + else + interrupted = node.doWait(); } - releaseWaiters(phase); + if (node != null) + node.thread = null; + if (p != phase || (p = getPhase()) != phase) + releaseWaiters(phase); + if (interrupted) + throw new InterruptedException(); + if (p == phase) + throw new TimeoutException(); return p; } - // Temporary Unsafe mechanics for preliminary release + // Unsafe mechanics for jsr166y 3rd party package. + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + return getUnsafeByReflection(); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } + } - static final Unsafe _unsafe; - static final long stateOffset; + private static sun.misc.Unsafe getUnsafeByReflection() + throws NoSuchFieldException, IllegalAccessException { + java.lang.reflect.Field f = + sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + } - static { + private static long fieldOffset(String fieldName, Class klazz) { try { - if (Phaser.class.getClassLoader() != null) { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - _unsafe = (Unsafe)f.get(null); - } - else - _unsafe = Unsafe.getUnsafe(); - stateOffset = _unsafe.objectFieldOffset - (Phaser.class.getDeclaredField("state")); - } catch (Exception e) { - throw new RuntimeException("Could not initialize intrinsics", e); + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName)); + } catch (NoSuchFieldException e) { + // Convert Exception to Error + NoSuchFieldError error = new NoSuchFieldError(fieldName); + error.initCause(e); + throw error; } } + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + static final long stateOffset = + fieldOffset("state", Phaser.class); + final boolean casState(long cmp, long val) { - return _unsafe.compareAndSwapLong(this, stateOffset, cmp, val); + return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val); } }