--- jsr166/src/jsr166y/ForkJoinWorkerThread.java 2009/07/21 00:15:14 1.11 +++ jsr166/src/jsr166y/ForkJoinWorkerThread.java 2009/07/31 19:52:39 1.24 @@ -5,12 +5,10 @@ */ package jsr166y; -import java.util.*; + import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; + +import java.util.Collection; /** * A thread managed by a {@link ForkJoinPool}. This class is @@ -21,6 +19,8 @@ import java.lang.reflect.*; * create such a subclass, you will also need to supply a custom * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. * + * @since 1.7 + * @author Doug Lea */ public class ForkJoinWorkerThread extends Thread { /* @@ -65,6 +65,12 @@ public class ForkJoinWorkerThread extend * which gives threads a chance to activate if necessary before * stealing (see below). * + * This approach also enables support for "async mode" where local + * task processing is in FIFO, not LIFO order; simply by using a + * version of deq rather than pop when locallyFifo is true (as set + * by the ForkJoinPool). This allows use in message-passing + * frameworks in which tasks are never joined. + * * Efficient implementation of this approach currently relies on * an uncomfortable amount of "Unsafe" mechanics. To maintain * correct orderings, reads and writes of variable base require @@ -137,7 +143,7 @@ public class ForkJoinWorkerThread extend private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28; /** - * The pool this thread works in. Accessed directly by ForkJoinTask + * The pool this thread works in. Accessed directly by ForkJoinTask. */ final ForkJoinPool pool; @@ -165,7 +171,7 @@ public class ForkJoinWorkerThread extend * Activity status. When true, this worker is considered active. * Must be false upon construction. It must be true when executing * tasks, and BEFORE stealing a task. It must be false before - * calling pool.sync + * calling pool.sync. */ private boolean active; @@ -188,7 +194,7 @@ public class ForkJoinWorkerThread extend /** * Index of this worker in pool array. Set once by pool before - * running, and accessed directly by pool during cleanup etc + * running, and accessed directly by pool during cleanup etc. */ int poolIndex; @@ -265,15 +271,16 @@ public class ForkJoinWorkerThread extend final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); } /** - * Transitions to at least the given state. Returns true if not - * already at least at given state. + * Transitions to at least the given state. + * + * @return {@code true} if not already at least at given state */ private boolean transitionRunStateTo(int state) { for (;;) { int s = runState; if (s >= state) return false; - if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state)) + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state)) return true; } } @@ -291,7 +298,7 @@ public class ForkJoinWorkerThread extend } /** - * Tries to set status to active; fails on contention. + * Tries to set status to inactive; fails on contention. */ private boolean tryInactivate() { if (active) { @@ -308,10 +315,9 @@ public class ForkJoinWorkerThread extend * one. Marsaglia xor-shift is cheap and works well. */ private static int xorShift(int r) { - r ^= r << 1; - r ^= r >>> 3; - r ^= r << 10; - return r; + r ^= (r << 13); + r ^= (r >>> 17); + return r ^ (r << 5); } // Lifecycle methods @@ -369,10 +375,10 @@ public class ForkJoinWorkerThread extend /** * Performs cleanup associated with termination of this worker * thread. If you override this method, you must invoke - * super.onTermination at the end of the overridden method. + * {@code super.onTermination} at the end of the overridden method. * * @param exception the exception causing this thread to abort due - * to an unrecoverable error, or null if completed normally + * to an unrecoverable error, or {@code null} if completed normally */ protected void onTermination(Throwable exception) { // Execute remaining local tasks unless aborting or terminating @@ -381,14 +387,14 @@ public class ForkJoinWorkerThread extend ForkJoinTask t = popTask(); if (t != null) t.quietlyExec(); - } catch(Throwable ex) { + } catch (Throwable ex) { exception = ex; } } // Cancel other tasks, transition status, notify pool, and // propagate exception to uncaught exception handler try { - do;while (!tryInactivate()); // ensure inactive + do {} while (!tryInactivate()); // ensure inactive cancelTasks(); runState = TERMINATED; pool.workerTerminated(this); @@ -408,8 +414,8 @@ public class ForkJoinWorkerThread extend * Caller must ensure q is non-null and index is in range. */ private static void setSlot(ForkJoinTask[] q, int i, - ForkJoinTask t){ - _unsafe.putOrderedObject(q, (i << qShift) + qBase, t); + ForkJoinTask t) { + UNSAFE.putOrderedObject(q, (i << qShift) + qBase, t); } /** @@ -418,14 +424,14 @@ public class ForkJoinWorkerThread extend */ private static boolean casSlotNull(ForkJoinTask[] q, int i, ForkJoinTask t) { - return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null); + return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null); } /** * Sets sp in store-order. */ private void storeSp(int s) { - _unsafe.putOrderedInt(this, spOffset, s); + UNSAFE.putOrderedInt(this, spOffset, s); } // Main queue methods @@ -469,6 +475,28 @@ public class ForkJoinWorkerThread extend } /** + * Tries to take a task from the base of own queue, activating if + * necessary, failing only if empty. Called only by current thread. + * + * @return a task, or null if none + */ + final ForkJoinTask locallyDeqTask() { + int b; + while (sp != (b = base)) { + if (tryActivate()) { + ForkJoinTask[] q = queue; + int i = (q.length - 1) & b; + ForkJoinTask t = q[i]; + if (t != null && casSlotNull(q, i, t)) { + base = b + 1; + return t; + } + } + } + return null; + } + + /** * Returns a popped task, or null if empty. Ensures active status * if non-null. Called only by current thread. */ @@ -508,14 +536,14 @@ public class ForkJoinWorkerThread extend } /** - * Returns next task. + * Returns next task or null if empty or contended */ final ForkJoinTask peekTask() { ForkJoinTask[] q = queue; if (q == null) return null; int mask = q.length - 1; - int i = locallyFifo? base : (sp - 1); + int i = locallyFifo ? base : (sp - 1); return q[i & mask]; } @@ -578,7 +606,7 @@ public class ForkJoinWorkerThread extend ForkJoinWorkerThread v = ws[mask & idx]; if (v == null || v.sp == v.base) { if (probes <= mask) - idx = (probes++ < 0)? r : (idx + 1); + idx = (probes++ < 0) ? r : (idx + 1); else break; } @@ -599,7 +627,7 @@ public class ForkJoinWorkerThread extend * @return a task, if available */ final ForkJoinTask pollTask() { - ForkJoinTask t = locallyFifo? deqTask() : popTask(); + ForkJoinTask t = locallyFifo ? locallyDeqTask() : popTask(); if (t == null && (t = scan()) != null) ++stealCount; return t; @@ -611,7 +639,7 @@ public class ForkJoinWorkerThread extend * @return a task, if available */ final ForkJoinTask pollLocalTask() { - return locallyFifo? deqTask() : popTask(); + return locallyFifo ? locallyDeqTask() : popTask(); } /** @@ -646,7 +674,7 @@ public class ForkJoinWorkerThread extend * * @return the number of tasks drained */ - final int drainTasksTo(Collection> c) { + final int drainTasksTo(Collection> c) { int n = 0; ForkJoinTask t; while (base != sp && (t = deqTask()) != null) { @@ -667,8 +695,9 @@ public class ForkJoinWorkerThread extend } /** - * Returns true if at least one worker in the given array appears - * to have at least one queued task. + * Returns {@code true} if at least one worker in the given array + * appears to have at least one queued task. + * * @param ws array of workers */ static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) { @@ -691,8 +720,8 @@ public class ForkJoinWorkerThread extend * Returns an estimate of the number of tasks in the queue. */ final int getQueueSize() { - int n = sp - base; - return n < 0? 0 : n; // suppress momentarily negative values + // suppress momentarily negative values + return Math.max(0, sp - base); } /** @@ -705,7 +734,7 @@ public class ForkJoinWorkerThread extend } /** - * Scans, returning early if joinMe done + * Scans, returning early if joinMe done. */ final ForkJoinTask scanWhileJoining(ForkJoinTask joinMe) { ForkJoinTask t = pollTask(); @@ -717,7 +746,7 @@ public class ForkJoinWorkerThread extend } /** - * Runs tasks until pool isQuiescent. + * Runs tasks until {@code pool.isQuiescent()}. */ final void helpQuiescePool() { for (;;) { @@ -727,58 +756,63 @@ public class ForkJoinWorkerThread extend else if (tryInactivate() && pool.isQuiescent()) break; } - do;while (!tryActivate()); // re-activate on exit + do {} while (!tryActivate()); // re-activate on exit + } + + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long spOffset = + objectFieldOffset("sp", ForkJoinWorkerThread.class); + private static final long runStateOffset = + objectFieldOffset("runState", ForkJoinWorkerThread.class); + private static final long qBase; + private static final int qShift; + + static { + qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class); + int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class); + if ((s & (s-1)) != 0) + throw new Error("data type scale not a power of two"); + qShift = 31 - Integer.numberOfLeadingZeros(s); + } + + private static long objectFieldOffset(String field, Class klazz) { + try { + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; + } } - // Temporary Unsafe mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { try { - return Unsafe.getUnsafe(); + return sun.misc.Unsafe.getUnsafe(); } catch (SecurityException se) { try { return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); }}); } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } } - - private static Unsafe getUnsafePrivileged() - throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } - - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return _unsafe.objectFieldOffset - (ForkJoinWorkerThread.class.getDeclaredField(fieldName)); - } - - static final Unsafe _unsafe; - static final long baseOffset; - static final long spOffset; - static final long runStateOffset; - static final long qBase; - static final int qShift; - static { - try { - _unsafe = getUnsafe(); - baseOffset = fieldOffset("base"); - spOffset = fieldOffset("sp"); - runStateOffset = fieldOffset("runState"); - qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class); - int s = _unsafe.arrayIndexScale(ForkJoinTask[].class); - if ((s & (s-1)) != 0) - throw new Error("data type scale not a power of two"); - qShift = 31 - Integer.numberOfLeadingZeros(s); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); - } - } }