/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ package jsr166y; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; import sun.misc.Unsafe; import java.lang.reflect.*; /** * A thread managed by a {@link ForkJoinPool}. This class is * subclassable solely for the sake of adding functionality -- there * are no overridable methods dealing with scheduling or * execution. However, you can override initialization and termination * cleanup methods surrounding the main task processing loop. If you * do create such a subclass, you will also need to supply a custom * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. * */ public class ForkJoinWorkerThread extends Thread { /* * Algorithm overview: * * 1. Work-Stealing: Work-stealing queues are special forms of * Deques that support only three of the four possible * end-operations -- push, pop, and deq (aka steal), and only do * so under the constraints that push and pop are called only from * the owning thread, while deq may be called from other threads. * (If you are unfamiliar with them, you probably want to read * Herlihy and Shavit's book "The Art of Multiprocessor * programming", chapter 16 describing these in more detail before * proceeding.) The main work-stealing queue design is roughly * similar to "Dynamic Circular Work-Stealing Deque" by David * Chase and Yossi Lev, SPAA 2005 * (http://research.sun.com/scalable/pubs/index.html). The main * difference ultimately stems from gc requirements that we null * out taken slots as soon as we can, to maintain as small a * footprint as possible even in programs generating huge numbers * of tasks. To accomplish this, we shift the CAS arbitrating pop * vs deq (steal) from being on the indices ("base" and "sp") to * the slots themselves (mainly via method "casSlotNull()"). So, * both a successful pop and deq mainly entail CAS'ing a nonnull * slot to null. Because we rely on CASes of references, we do * not need tag bits on base or sp. They are simple ints as used * in any circular array-based queue (see for example ArrayDeque). * Updates to the indices must still be ordered in a way that * guarantees that (sp - base) > 0 means the queue is empty, but * otherwise may err on the side of possibly making the queue * appear nonempty when a push, pop, or deq have not fully * committed. Note that this means that the deq operation, * considered individually, is not wait-free. One thief cannot * successfully continue until another in-progress one (or, if * previously empty, a push) completes. However, in the * aggregate, we ensure at least probablistic non-blockingness. If * an attempted steal fails, a thief always chooses a different * random victim target to try next. So, in order for one thief to * progress, it suffices for any in-progress deq or new push on * any empty queue to complete. One reason this works well here is * that apparently-nonempty often means soon-to-be-stealable, * which gives threads a chance to activate if necessary before * stealing (see below). * * Efficient implementation of this approach currently relies on * an uncomfortable amount of "Unsafe" mechanics. To maintain * correct orderings, reads and writes of variable base require * volatile ordering. Variable sp does not require volatile write * but needs cheaper store-ordering on writes. Because they are * protected by volatile base reads, reads of the queue array and * its slots do not need volatile load semantics, but writes (in * push) require store order and CASes (in pop and deq) require * (volatile) CAS semantics. Since these combinations aren't * supported using ordinary volatiles, the only way to accomplish * these effciently is to use direct Unsafe calls. (Using external * AtomicIntegers and AtomicReferenceArrays for the indices and * array is significantly slower because of memory locality and * indirection effects.) Further, performance on most platforms is * very sensitive to placement and sizing of the (resizable) queue * array. Even though these queues don't usually become all that * big, the initial size must be large enough to counteract cache * contention effects across multiple queues (especially in the * presence of GC cardmarking). Also, to improve thread-locality, * queues are currently initialized immediately after the thread * gets the initial signal to start processing tasks. However, * all queue-related methods except pushTask are written in a way * that allows them to instead be lazily allocated and/or disposed * of when empty. All together, these low-level implementation * choices produce as much as a factor of 4 performance * improvement compared to naive implementations, and enable the * processing of billions of tasks per second, sometimes at the * expense of ugliness. * * 2. Run control: The primary run control is based on a global * counter (activeCount) held by the pool. It uses an algorithm * similar to that in Herlihy and Shavit section 17.6 to cause * threads to eventually block when all threads declare they are * inactive. (See variable "scans".) For this to work, threads * must be declared active when executing tasks, and before * stealing a task. They must be inactive before blocking on the * Pool Barrier (awaiting a new submission or other Pool * event). In between, there is some free play which we take * advantage of to avoid contention and rapid flickering of the * global activeCount: If inactive, we activate only if a victim * queue appears to be nonempty (see above). Similarly, a thread * tries to inactivate only after a full scan of other threads. * The net effect is that contention on activeCount is rarely a * measurable performance issue. (There are also a few other cases * where we scan for work rather than retry/block upon * contention.) * * 3. Selection control. We maintain policy of always choosing to * run local tasks rather than stealing, and always trying to * steal tasks before trying to run a new submission. All steals * are currently performed in randomly-chosen deq-order. It may be * worthwhile to bias these with locality / anti-locality * information, but doing this well probably requires more * lower-level information from JVMs than currently provided. */ /** * Capacity of work-stealing queue array upon initialization. * Must be a power of two. Initial size must be at least 2, but is * padded to minimize cache effects. */ private static final int INITIAL_QUEUE_CAPACITY = 1 << 13; /** * Maximum work-stealing queue array size. Must be less than or * equal to 1 << 28 to ensure lack of index wraparound. (This * is less than usual bounds, because we need leftshift by 3 * to be in int range). */ private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28; /** * The pool this thread works in. Accessed directly by ForkJoinTask */ final ForkJoinPool pool; /** * The work-stealing queue array. Size must be a power of two. * Initialized when thread starts, to improve memory locality. */ private ForkJoinTask[] queue; /** * Index (mod queue.length) of next queue slot to push to or pop * from. It is written only by owner thread, via ordered store. * Both sp and base are allowed to wrap around on overflow, but * (sp - base) still estimates size. */ private volatile int sp; /** * Index (mod queue.length) of least valid queue slot, which is * always the next position to steal from if nonempty. */ private volatile int base; /** * Activity status. When true, this worker is considered active. * Must be false upon construction. It must be true when executing * tasks, and BEFORE stealing a task. It must be false before * calling pool.sync */ private boolean active; /** * Run state of this worker. Supports simple versions of the usual * shutdown/shutdownNow control. */ private volatile int runState; /** * Seed for random number generator for choosing steal victims. * Uses Marsaglia xorshift. Must be nonzero upon initialization. */ private int seed; /** * Number of steals, transferred to pool when idle */ private int stealCount; /** * Index of this worker in pool array. Set once by pool before * running, and accessed directly by pool during cleanup etc */ int poolIndex; /** * The last barrier event waited for. Accessed in pool callback * methods, but only by current thread. */ long lastEventCount; /** * Creates a ForkJoinWorkerThread operating in the given pool. * @param pool the pool this thread works in * @throws NullPointerException if pool is null */ protected ForkJoinWorkerThread(ForkJoinPool pool) { if (pool == null) throw new NullPointerException(); this.pool = pool; // Note: poolIndex is set by pool during construction // Remaining initialization is deferred to onStart } // Public access methods /** * Returns the pool hosting this thread * @return the pool */ public ForkJoinPool getPool() { return pool; } /** * Returns the index number of this thread in its pool. The * returned value ranges from zero to the maximum number of * threads (minus one) that have ever been created in the pool. * This method may be useful for applications that track status or * collect results per-worker rather than per-task. * @return the index number. */ public int getPoolIndex() { return poolIndex; } // Runstate management // Runstate values. Order matters private static final int RUNNING = 0; private static final int SHUTDOWN = 1; private static final int TERMINATING = 2; private static final int TERMINATED = 3; final boolean isShutdown() { return runState >= SHUTDOWN; } final boolean isTerminating() { return runState >= TERMINATING; } final boolean isTerminated() { return runState == TERMINATED; } final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); } final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); } /** * Transition to at least the given state. Return true if not * already at least given state. */ private boolean transitionRunStateTo(int state) { for (;;) { int s = runState; if (s >= state) return false; if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state)) return true; } } /** * Try to set status to active; fail on contention */ private boolean tryActivate() { if (!active) { if (!pool.tryIncrementActiveCount()) return false; active = true; } return true; } /** * Try to set status to active; fail on contention */ private boolean tryInactivate() { if (active) { if (!pool.tryDecrementActiveCount()) return false; active = false; } return true; } /** * Computes next value for random victim probe. Scans don't * require a very high quality generator, but also not a crummy * one. Marsaglia xor-shift is cheap and works well. */ private static int xorShift(int r) { r ^= r << 1; r ^= r >>> 3; r ^= r << 10; return r; } // Lifecycle methods /** * This method is required to be public, but should never be * called explicitly. It performs the main run loop to execute * ForkJoinTasks. */ public void run() { Throwable exception = null; try { onStart(); pool.sync(this); // await first pool event mainLoop(); } catch (Throwable ex) { exception = ex; } finally { onTermination(exception); } } /** * Execute tasks until shut down. */ private void mainLoop() { while (!isShutdown()) { ForkJoinTask t = pollTask(); if (t != null || (t = pollSubmission()) != null) t.quietlyExec(); else if (tryInactivate()) pool.sync(this); } } /** * Initializes internal state after construction but before * processing any tasks. If you override this method, you must * invoke super.onStart() at the beginning of the method. * Initialization requires care: Most fields must have legal * default values, to ensure that attempted accesses from other * threads work correctly even before this thread starts * processing tasks. */ protected void onStart() { // Allocate while starting to improve chances of thread-local // isolation queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; // Initial value of seed need not be especially random but // should differ across workers and must be nonzero int p = poolIndex + 1; seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits } /** * Perform cleanup associated with termination of this worker * thread. If you override this method, you must invoke * super.onTermination at the end of the overridden method. * * @param exception the exception causing this thread to abort due * to an unrecoverable error, or null if completed normally. */ protected void onTermination(Throwable exception) { // Execute remaining local tasks unless aborting or terminating while (exception == null && !pool.isTerminating() && base != sp) { try { ForkJoinTask t = popTask(); if (t != null) t.quietlyExec(); } catch(Throwable ex) { exception = ex; } } // Cancel other tasks, transition status, notify pool, and // propagate exception to uncaught exception handler try { do;while (!tryInactivate()); // ensure inactive cancelTasks(); runState = TERMINATED; pool.workerTerminated(this); } catch (Throwable ex) { // Shouldn't ever happen if (exception == null) // but if so, at least rethrown exception = ex; } finally { if (exception != null) ForkJoinTask.rethrowException(exception); } } // Intrinsics-based support for queue operations. /** * Add in store-order the given task at given slot of q to * null. Caller must ensure q is nonnull and index is in range. */ private static void setSlot(ForkJoinTask[] q, int i, ForkJoinTask t){ _unsafe.putOrderedObject(q, (i << qShift) + qBase, t); } /** * CAS given slot of q to null. Caller must ensure q is nonnull * and index is in range. */ private static boolean casSlotNull(ForkJoinTask[] q, int i, ForkJoinTask t) { return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null); } /** * Sets sp in store-order. */ private void storeSp(int s) { _unsafe.putOrderedInt(this, spOffset, s); } // Main queue methods /** * Pushes a task. Called only by current thread. * @param t the task. Caller must ensure nonnull */ final void pushTask(ForkJoinTask t) { ForkJoinTask[] q = queue; int mask = q.length - 1; int s = sp; setSlot(q, s & mask, t); storeSp(++s); if ((s -= base) == 1) pool.signalWork(); else if (s >= mask) growQueue(); } /** * Tries to take a task from the base of the queue, failing if * either empty or contended. * @return a task, or null if none or contended. */ private ForkJoinTask deqTask() { ForkJoinTask t; ForkJoinTask[] q; int i; int b; if (sp != (b = base) && (q = queue) != null && // must read q after b (t = q[i = (q.length - 1) & b]) != null && casSlotNull(q, i, t)) { base = b + 1; return t; } return null; } /** * Returns a popped task, or null if empty. Ensures active status * if nonnull. Called only by current thread. */ final ForkJoinTask popTask() { int s = sp; while (s != base) { if (tryActivate()) { ForkJoinTask[] q = queue; int mask = q.length - 1; int i = (s - 1) & mask; ForkJoinTask t = q[i]; if (t == null || !casSlotNull(q, i, t)) break; storeSp(s - 1); return t; } } return null; } /** * Specialized version of popTask to pop only if * topmost element is the given task. Called only * by current thread while active. * @param t the task. Caller must ensure nonnull */ final boolean unpushTask(ForkJoinTask t) { ForkJoinTask[] q = queue; int mask = q.length - 1; int s = sp - 1; if (casSlotNull(q, s & mask, t)) { storeSp(s); return true; } return false; } /** * Returns next task to pop. */ final ForkJoinTask peekTask() { ForkJoinTask[] q = queue; return q == null? null : q[(sp - 1) & (q.length - 1)]; } /** * Doubles queue array size. Transfers elements by emulating * steals (deqs) from old array and placing, oldest first, into * new array. */ private void growQueue() { ForkJoinTask[] oldQ = queue; int oldSize = oldQ.length; int newSize = oldSize << 1; if (newSize > MAXIMUM_QUEUE_CAPACITY) throw new RejectedExecutionException("Queue capacity exceeded"); ForkJoinTask[] newQ = queue = new ForkJoinTask[newSize]; int b = base; int bf = b + oldSize; int oldMask = oldSize - 1; int newMask = newSize - 1; do { int oldIndex = b & oldMask; ForkJoinTask t = oldQ[oldIndex]; if (t != null && !casSlotNull(oldQ, oldIndex, t)) t = null; setSlot(newQ, b & newMask, t); } while (++b != bf); pool.signalWork(); } /** * Tries to steal a task from another worker. Starts at a random * index of workers array, and probes workers until finding one * with non-empty queue or finding that all are empty. It * randomly selects the first n probes. If these are empty, it * resorts to a full circular traversal, which is necessary to * accurately set active status by caller. Also restarts if pool * events occurred since last scan, which forces refresh of * workers array, in case barrier was associated with resize. * * This method must be both fast and quiet -- usually avoiding * memory accesses that could disrupt cache sharing etc other than * those needed to check for and take tasks. This accounts for, * among other things, updating random seed in place without * storing it until exit. * * @return a task, or null if none found */ private ForkJoinTask scan() { ForkJoinTask t = null; int r = seed; // extract once to keep scan quiet ForkJoinWorkerThread[] ws; // refreshed on outer loop int mask; // must be power 2 minus 1 and > 0 outer:do { if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) { int idx = r; int probes = ~mask; // use random index while negative for (;;) { r = xorShift(r); // update random seed ForkJoinWorkerThread v = ws[mask & idx]; if (v == null || v.sp == v.base) { if (probes <= mask) idx = (probes++ < 0)? r : (idx + 1); else break; } else if (!tryActivate() || (t = v.deqTask()) == null) continue outer; // restart on contention else break outer; } } } while (pool.hasNewSyncEvent(this)); // retry on pool events seed = r; return t; } /** * Pops or steals a task * @return a task, if available */ final ForkJoinTask pollTask() { ForkJoinTask t = popTask(); if (t == null && (t = scan()) != null) ++stealCount; return t; } /** * Returns a pool submission, if one exists, activating first. * @return a submission, if available */ private ForkJoinTask pollSubmission() { ForkJoinPool p = pool; while (p.hasQueuedSubmissions()) { ForkJoinTask t; if (tryActivate() && (t = p.pollSubmission()) != null) return t; } return null; } // Methods accessed only by Pool /** * Removes and cancels all tasks in queue. Can be called from any * thread. */ final void cancelTasks() { ForkJoinTask t; while (base != sp && (t = deqTask()) != null) t.cancelIgnoringExceptions(); } /** * Get and clear steal count for accumulation by pool. Called * only when known to be idle (in pool.sync and termination). */ final int getAndClearStealCount() { int sc = stealCount; stealCount = 0; return sc; } /** * Returns true if at least one worker in the given array appears * to have at least one queued task. * @param ws array of workers */ static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) { if (ws != null) { int len = ws.length; for (int j = 0; j < 2; ++j) { // need two passes for clean sweep for (int i = 0; i < len; ++i) { ForkJoinWorkerThread w = ws[i]; if (w != null && w.sp != w.base) return true; } } } return false; } // Support methods for ForkJoinTask /** * Returns an estimate of the number of tasks in the queue. */ final int getQueueSize() { int n = sp - base; return n < 0? 0 : n; // suppress momentarily negative values } /** * Returns an estimate of the number of tasks, offset by a * function of number of idle workers. */ final int getEstimatedSurplusTaskCount() { // The halving approximates weighting idle vs non-idle workers return (sp - base) - (pool.getIdleThreadCount() >>> 1); } /** * Scan, returning early if joinMe done */ final ForkJoinTask scanWhileJoining(ForkJoinTask joinMe) { ForkJoinTask t = pollTask(); if (t != null && joinMe.status < 0 && sp == base) { pushTask(t); // unsteal if done and this task would be stealable t = null; } return t; } /** * Runs tasks until pool isQuiescent */ final void helpQuiescePool() { for (;;) { ForkJoinTask t = pollTask(); if (t != null) t.quietlyExec(); else if (tryInactivate() && pool.isQuiescent()) break; } do;while (!tryActivate()); // re-activate on exit } // Temporary Unsafe mechanics for preliminary release private static Unsafe getUnsafe() throws Throwable { try { return Unsafe.getUnsafe(); } catch (SecurityException se) { try { return java.security.AccessController.doPrivileged (new java.security.PrivilegedExceptionAction() { public Unsafe run() throws Exception { return getUnsafePrivileged(); }}); } catch (java.security.PrivilegedActionException e) { throw e.getCause(); } } } private static Unsafe getUnsafePrivileged() throws NoSuchFieldException, IllegalAccessException { Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); return (Unsafe) f.get(null); } private static long fieldOffset(String fieldName) throws NoSuchFieldException { return _unsafe.objectFieldOffset (ForkJoinWorkerThread.class.getDeclaredField(fieldName)); } static final Unsafe _unsafe; static final long baseOffset; static final long spOffset; static final long runStateOffset; static final long qBase; static final int qShift; static { try { _unsafe = getUnsafe(); baseOffset = fieldOffset("base"); spOffset = fieldOffset("sp"); runStateOffset = fieldOffset("runState"); qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class); int s = _unsafe.arrayIndexScale(ForkJoinTask[].class); if ((s & (s-1)) != 0) throw new Error("data type scale not a power of two"); qShift = 31 - Integer.numberOfLeadingZeros(s); } catch (Throwable e) { throw new RuntimeException("Could not initialize intrinsics", e); } } }