--- jsr166/src/jsr166y/ForkJoinWorkerThread.java 2009/07/16 15:32:34 1.7 +++ jsr166/src/jsr166y/ForkJoinWorkerThread.java 2009/07/31 19:52:39 1.24 @@ -5,12 +5,10 @@ */ package jsr166y; -import java.util.*; + import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; + +import java.util.Collection; /** * A thread managed by a {@link ForkJoinPool}. This class is @@ -21,6 +19,8 @@ import java.lang.reflect.*; * create such a subclass, you will also need to supply a custom * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. * + * @since 1.7 + * @author Doug Lea */ public class ForkJoinWorkerThread extends Thread { /* @@ -44,7 +44,7 @@ public class ForkJoinWorkerThread extend * of tasks. To accomplish this, we shift the CAS arbitrating pop * vs deq (steal) from being on the indices ("base" and "sp") to * the slots themselves (mainly via method "casSlotNull()"). So, - * both a successful pop and deq mainly entail CAS'ing a nonnull + * both a successful pop and deq mainly entail CAS'ing a non-null * slot to null. Because we rely on CASes of references, we do * not need tag bits on base or sp. They are simple ints as used * in any circular array-based queue (see for example ArrayDeque). @@ -56,7 +56,7 @@ public class ForkJoinWorkerThread extend * considered individually, is not wait-free. One thief cannot * successfully continue until another in-progress one (or, if * previously empty, a push) completes. However, in the - * aggregate, we ensure at least probablistic non-blockingness. If + * aggregate, we ensure at least probabilistic non-blockingness. If * an attempted steal fails, a thief always chooses a different * random victim target to try next. So, in order for one thief to * progress, it suffices for any in-progress deq or new push on @@ -65,6 +65,12 @@ public class ForkJoinWorkerThread extend * which gives threads a chance to activate if necessary before * stealing (see below). * + * This approach also enables support for "async mode" where local + * task processing is in FIFO, not LIFO order; simply by using a + * version of deq rather than pop when locallyFifo is true (as set + * by the ForkJoinPool). This allows use in message-passing + * frameworks in which tasks are never joined. + * * Efficient implementation of this approach currently relies on * an uncomfortable amount of "Unsafe" mechanics. To maintain * correct orderings, reads and writes of variable base require @@ -75,7 +81,7 @@ public class ForkJoinWorkerThread extend * push) require store order and CASes (in pop and deq) require * (volatile) CAS semantics. Since these combinations aren't * supported using ordinary volatiles, the only way to accomplish - * these effciently is to use direct Unsafe calls. (Using external + * these efficiently is to use direct Unsafe calls. (Using external * AtomicIntegers and AtomicReferenceArrays for the indices and * array is significantly slower because of memory locality and * indirection effects.) Further, performance on most platforms is @@ -137,7 +143,7 @@ public class ForkJoinWorkerThread extend private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28; /** - * The pool this thread works in. Accessed directly by ForkJoinTask + * The pool this thread works in. Accessed directly by ForkJoinTask. */ final ForkJoinPool pool; @@ -165,7 +171,7 @@ public class ForkJoinWorkerThread extend * Activity status. When true, this worker is considered active. * Must be false upon construction. It must be true when executing * tasks, and BEFORE stealing a task. It must be false before - * calling pool.sync + * calling pool.sync. */ private boolean active; @@ -188,7 +194,7 @@ public class ForkJoinWorkerThread extend /** * Index of this worker in pool array. Set once by pool before - * running, and accessed directly by pool during cleanup etc + * running, and accessed directly by pool during cleanup etc. */ int poolIndex; @@ -205,6 +211,7 @@ public class ForkJoinWorkerThread extend /** * Creates a ForkJoinWorkerThread operating in the given pool. + * * @param pool the pool this thread works in * @throws NullPointerException if pool is null */ @@ -218,7 +225,8 @@ public class ForkJoinWorkerThread extend // Public access methods /** - * Returns the pool hosting this thread + * Returns the pool hosting this thread. + * * @return the pool */ public ForkJoinPool getPool() { @@ -231,7 +239,8 @@ public class ForkJoinWorkerThread extend * threads (minus one) that have ever been created in the pool. * This method may be useful for applications that track status or * collect results per-worker rather than per-task. - * @return the index number. + * + * @return the index number */ public int getPoolIndex() { return poolIndex; @@ -239,7 +248,8 @@ public class ForkJoinWorkerThread extend /** * Establishes local first-in-first-out scheduling mode for forked - * tasks that are never joined. + * tasks that are never joined. + * * @param async if true, use locally FIFO scheduling */ void setAsyncMode(boolean async) { @@ -261,21 +271,22 @@ public class ForkJoinWorkerThread extend final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); } /** - * Transition to at least the given state. Return true if not - * already at least given state. + * Transitions to at least the given state. + * + * @return {@code true} if not already at least at given state */ private boolean transitionRunStateTo(int state) { for (;;) { int s = runState; if (s >= state) return false; - if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state)) + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state)) return true; } } /** - * Try to set status to active; fail on contention + * Tries to set status to active; fails on contention. */ private boolean tryActivate() { if (!active) { @@ -287,7 +298,7 @@ public class ForkJoinWorkerThread extend } /** - * Try to set status to active; fail on contention + * Tries to set status to inactive; fails on contention. */ private boolean tryInactivate() { if (active) { @@ -299,15 +310,14 @@ public class ForkJoinWorkerThread extend } /** - * Computes next value for random victim probe. Scans don't + * Computes next value for random victim probe. Scans don't * require a very high quality generator, but also not a crummy - * one. Marsaglia xor-shift is cheap and works well. + * one. Marsaglia xor-shift is cheap and works well. */ private static int xorShift(int r) { - r ^= r << 1; - r ^= r >>> 3; - r ^= r << 10; - return r; + r ^= (r << 13); + r ^= (r >>> 17); + return r ^ (r << 5); } // Lifecycle methods @@ -331,7 +341,7 @@ public class ForkJoinWorkerThread extend } /** - * Execute tasks until shut down. + * Executes tasks until shut down. */ private void mainLoop() { while (!isShutdown()) { @@ -363,12 +373,12 @@ public class ForkJoinWorkerThread extend } /** - * Perform cleanup associated with termination of this worker + * Performs cleanup associated with termination of this worker * thread. If you override this method, you must invoke - * super.onTermination at the end of the overridden method. + * {@code super.onTermination} at the end of the overridden method. * * @param exception the exception causing this thread to abort due - * to an unrecoverable error, or null if completed normally. + * to an unrecoverable error, or {@code null} if completed normally */ protected void onTermination(Throwable exception) { // Execute remaining local tasks unless aborting or terminating @@ -377,14 +387,14 @@ public class ForkJoinWorkerThread extend ForkJoinTask t = popTask(); if (t != null) t.quietlyExec(); - } catch(Throwable ex) { + } catch (Throwable ex) { exception = ex; } } // Cancel other tasks, transition status, notify pool, and // propagate exception to uncaught exception handler try { - do;while (!tryInactivate()); // ensure inactive + do {} while (!tryInactivate()); // ensure inactive cancelTasks(); runState = TERMINATED; pool.workerTerminated(this); @@ -400,35 +410,36 @@ public class ForkJoinWorkerThread extend // Intrinsics-based support for queue operations. /** - * Add in store-order the given task at given slot of q to - * null. Caller must ensure q is nonnull and index is in range. + * Adds in store-order the given task at given slot of q to null. + * Caller must ensure q is non-null and index is in range. */ private static void setSlot(ForkJoinTask[] q, int i, - ForkJoinTask t){ - _unsafe.putOrderedObject(q, (i << qShift) + qBase, t); + ForkJoinTask t) { + UNSAFE.putOrderedObject(q, (i << qShift) + qBase, t); } /** - * CAS given slot of q to null. Caller must ensure q is nonnull + * CAS given slot of q to null. Caller must ensure q is non-null * and index is in range. */ private static boolean casSlotNull(ForkJoinTask[] q, int i, ForkJoinTask t) { - return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null); + return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null); } /** * Sets sp in store-order. */ private void storeSp(int s) { - _unsafe.putOrderedInt(this, spOffset, s); + UNSAFE.putOrderedInt(this, spOffset, s); } // Main queue methods /** * Pushes a task. Called only by current thread. - * @param t the task. Caller must ensure nonnull + * + * @param t the task. Caller must ensure non-null. */ final void pushTask(ForkJoinTask t) { ForkJoinTask[] q = queue; @@ -445,7 +456,8 @@ public class ForkJoinWorkerThread extend /** * Tries to take a task from the base of the queue, failing if * either empty or contended. - * @return a task, or null if none or contended. + * + * @return a task, or null if none or contended */ final ForkJoinTask deqTask() { ForkJoinTask t; @@ -463,8 +475,30 @@ public class ForkJoinWorkerThread extend } /** + * Tries to take a task from the base of own queue, activating if + * necessary, failing only if empty. Called only by current thread. + * + * @return a task, or null if none + */ + final ForkJoinTask locallyDeqTask() { + int b; + while (sp != (b = base)) { + if (tryActivate()) { + ForkJoinTask[] q = queue; + int i = (q.length - 1) & b; + ForkJoinTask t = q[i]; + if (t != null && casSlotNull(q, i, t)) { + base = b + 1; + return t; + } + } + } + return null; + } + + /** * Returns a popped task, or null if empty. Ensures active status - * if nonnull. Called only by current thread. + * if non-null. Called only by current thread. */ final ForkJoinTask popTask() { int s = sp; @@ -487,7 +521,8 @@ public class ForkJoinWorkerThread extend * Specialized version of popTask to pop only if * topmost element is the given task. Called only * by current thread while active. - * @param t the task. Caller must ensure nonnull + * + * @param t the task. Caller must ensure non-null. */ final boolean unpushTask(ForkJoinTask t) { ForkJoinTask[] q = queue; @@ -501,14 +536,14 @@ public class ForkJoinWorkerThread extend } /** - * Returns next task. + * Returns next task or null if empty or contended */ final ForkJoinTask peekTask() { ForkJoinTask[] q = queue; if (q == null) return null; int mask = q.length - 1; - int i = locallyFifo? base : (sp - 1); + int i = locallyFifo ? base : (sp - 1); return q[i & mask]; } @@ -571,7 +606,7 @@ public class ForkJoinWorkerThread extend ForkJoinWorkerThread v = ws[mask & idx]; if (v == null || v.sp == v.base) { if (probes <= mask) - idx = (probes++ < 0)? r : (idx + 1); + idx = (probes++ < 0) ? r : (idx + 1); else break; } @@ -587,26 +622,29 @@ public class ForkJoinWorkerThread extend } /** - * gets and removes a local or stolen a task + * Gets and removes a local or stolen task. + * * @return a task, if available */ final ForkJoinTask pollTask() { - ForkJoinTask t = locallyFifo? deqTask() : popTask(); + ForkJoinTask t = locallyFifo ? locallyDeqTask() : popTask(); if (t == null && (t = scan()) != null) ++stealCount; return t; } /** - * gets a local task + * Gets a local task. + * * @return a task, if available */ final ForkJoinTask pollLocalTask() { - return locallyFifo? deqTask() : popTask(); + return locallyFifo ? locallyDeqTask() : popTask(); } /** * Returns a pool submission, if one exists, activating first. + * * @return a submission, if available */ private ForkJoinTask pollSubmission() { @@ -632,10 +670,11 @@ public class ForkJoinWorkerThread extend } /** - * Drains tasks to given collection c + * Drains tasks to given collection c. + * * @return the number of tasks drained */ - final int drainTasksTo(Collection> c) { + final int drainTasksTo(Collection> c) { int n = 0; ForkJoinTask t; while (base != sp && (t = deqTask()) != null) { @@ -646,7 +685,7 @@ public class ForkJoinWorkerThread extend } /** - * Get and clear steal count for accumulation by pool. Called + * Gets and clears steal count for accumulation by pool. Called * only when known to be idle (in pool.sync and termination). */ final int getAndClearStealCount() { @@ -656,8 +695,9 @@ public class ForkJoinWorkerThread extend } /** - * Returns true if at least one worker in the given array appears - * to have at least one queued task. + * Returns {@code true} if at least one worker in the given array + * appears to have at least one queued task. + * * @param ws array of workers */ static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) { @@ -680,8 +720,8 @@ public class ForkJoinWorkerThread extend * Returns an estimate of the number of tasks in the queue. */ final int getQueueSize() { - int n = sp - base; - return n < 0? 0 : n; // suppress momentarily negative values + // suppress momentarily negative values + return Math.max(0, sp - base); } /** @@ -694,7 +734,7 @@ public class ForkJoinWorkerThread extend } /** - * Scan, returning early if joinMe done + * Scans, returning early if joinMe done. */ final ForkJoinTask scanWhileJoining(ForkJoinTask joinMe) { ForkJoinTask t = pollTask(); @@ -706,7 +746,7 @@ public class ForkJoinWorkerThread extend } /** - * Runs tasks until pool isQuiescent + * Runs tasks until {@code pool.isQuiescent()}. */ final void helpQuiescePool() { for (;;) { @@ -716,58 +756,63 @@ public class ForkJoinWorkerThread extend else if (tryInactivate() && pool.isQuiescent()) break; } - do;while (!tryActivate()); // re-activate on exit + do {} while (!tryActivate()); // re-activate on exit } - // Temporary Unsafe mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long spOffset = + objectFieldOffset("sp", ForkJoinWorkerThread.class); + private static final long runStateOffset = + objectFieldOffset("runState", ForkJoinWorkerThread.class); + private static final long qBase; + private static final int qShift; + + static { + qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class); + int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class); + if ((s & (s-1)) != 0) + throw new Error("data type scale not a power of two"); + qShift = 31 - Integer.numberOfLeadingZeros(s); + } + + private static long objectFieldOffset(String field, Class klazz) { try { - return Unsafe.getUnsafe(); + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); } catch (SecurityException se) { try { return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); }}); } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } } - - private static Unsafe getUnsafePrivileged() - throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } - - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return _unsafe.objectFieldOffset - (ForkJoinWorkerThread.class.getDeclaredField(fieldName)); - } - - static final Unsafe _unsafe; - static final long baseOffset; - static final long spOffset; - static final long runStateOffset; - static final long qBase; - static final int qShift; - static { - try { - _unsafe = getUnsafe(); - baseOffset = fieldOffset("base"); - spOffset = fieldOffset("sp"); - runStateOffset = fieldOffset("runState"); - qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class); - int s = _unsafe.arrayIndexScale(ForkJoinTask[].class); - if ((s & (s-1)) != 0) - throw new Error("data type scale not a power of two"); - qShift = 31 - Integer.numberOfLeadingZeros(s); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); - } - } }