--- jsr166/src/jsr166y/ForkJoinWorkerThread.java	2009/10/06 19:02:48	1.30
+++ jsr166/src/jsr166y/ForkJoinWorkerThread.java	2010/04/05 15:52:26	1.31
@@ -8,7 +8,9 @@ package jsr166y;
 
 import java.util.concurrent.*;
+import java.util.Random;
 import java.util.Collection;
+import java.util.concurrent.locks.LockSupport;
 
 /**
  * A thread managed by a {@link ForkJoinPool}. This class is
@@ -25,46 +27,55 @@ import java.util.Collection;
  */
 public class ForkJoinWorkerThread extends Thread {
     /*
-     * Algorithm overview:
+     * Overview:
      *
-     * 1. Work-Stealing: Work-stealing queues are special forms of
-     * Deques that support only three of the four possible
-     * end-operations -- push, pop, and deq (aka steal), and only do
-     * so under the constraints that push and pop are called only from
-     * the owning thread, while deq may be called from other threads.
-     * (If you are unfamiliar with them, you probably want to read
-     * Herlihy and Shavit's book "The Art of Multiprocessor
-     * programming", chapter 16 describing these in more detail before
-     * proceeding.) The main work-stealing queue design is roughly
-     * similar to "Dynamic Circular Work-Stealing Deque" by David
-     * Chase and Yossi Lev, SPAA 2005
-     * (http://research.sun.com/scalable/pubs/index.html). The main
-     * difference ultimately stems from gc requirements that we null
-     * out taken slots as soon as we can, to maintain as small a
-     * footprint as possible even in programs generating huge numbers
-     * of tasks. To accomplish this, we shift the CAS arbitrating pop
-     * vs deq (steal) from being on the indices ("base" and "sp") to
-     * the slots themselves (mainly via method "casSlotNull()"). So,
-     * both a successful pop and deq mainly entail CAS'ing a non-null
-     * slot to null. Because we rely on CASes of references, we do
-     * not need tag bits on base or sp. They are simple ints as used
-     * in any circular array-based queue (see for example ArrayDeque).
-     * Updates to the indices must still be ordered in a way that
-     * guarantees that (sp - base) > 0 means the queue is empty, but
-     * otherwise may err on the side of possibly making the queue
-     * appear nonempty when a push, pop, or deq have not fully
-     * committed. Note that this means that the deq operation,
-     * considered individually, is not wait-free. One thief cannot
-     * successfully continue until another in-progress one (or, if
-     * previously empty, a push) completes. However, in the
-     * aggregate, we ensure at least probabilistic
-     * non-blockingness. If an attempted steal fails, a thief always
-     * chooses a different random victim target to try next. So, in
-     * order for one thief to progress, it suffices for any
-     * in-progress deq or new push on any empty queue to complete. One
-     * reason this works well here is that apparently-nonempty often
-     * means soon-to-be-stealable, which gives threads a chance to
-     * activate if necessary before stealing (see below).
+     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
+     * ForkJoinTasks. This class includes bookkeeping in support of
+     * worker activation, suspension, and lifecycle control described
+     * in more detail in the internal documentation of class
+     * ForkJoinPool. And as described further below, this class also
+     * includes special-cased support for some ForkJoinTask
+     * methods. But the main mechanics involve work-stealing:
+     *
+     * Work-stealing queues are special forms of Deques that support
+     * only three of the four possible end-operations -- push, pop,
+     * and deq (aka steal), under the further constraints that push
+     * and pop are called only from the owning thread, while deq may
+     * be called from other threads. (If you are unfamiliar with
+     * them, you probably want to read Herlihy and Shavit's book "The
+     * Art of Multiprocessor programming", chapter 16 describing these
+     * in more detail before proceeding.) The main work-stealing
+     * queue design is roughly similar to those in the papers "Dynamic
+     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
+     * (http://research.sun.com/scalable/pubs/index.html) and
+     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
+     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
+     * The main differences ultimately stem from gc requirements that
+     * we null out taken slots as soon as we can, to maintain as small
+     * a footprint as possible even in programs generating huge
+     * numbers of tasks. To accomplish this, we shift the CAS
+     * arbitrating pop vs deq (steal) from being on the indices
+     * ("base" and "sp") to the slots themselves (mainly via method
+     * "casSlotNull()"). So, both a successful pop and deq mainly
+     * entail a CAS of a slot from non-null to null. Because we rely
+     * on CASes of references, we do not need tag bits on base or sp.
+     * They are simple ints as used in any circular array-based queue
+     * (see for example ArrayDeque). Updates to the indices must
+     * still be ordered in a way that guarantees that sp == base means
+     * the queue is empty, but otherwise may err on the side of
+     * possibly making the queue appear nonempty when a push, pop, or
+     * deq have not fully committed. Note that this means that the deq
+     * operation, considered individually, is not wait-free. One thief
+     * cannot successfully continue until another in-progress one (or,
+     * if previously empty, a push) completes. However, in the
+     * aggregate, we ensure at least probabilistic non-blockingness.
+     * If an attempted steal fails, a thief always chooses a different
+     * random victim target to try next. So, in order for one thief to
+     * progress, it suffices for any in-progress deq or new push on
+     * any empty queue to complete. One reason this works well here is
+     * that apparently-nonempty often means soon-to-be-stealable,
+     * which gives threads a chance to set activation status if
+     * necessary before stealing.
      *
      * This approach also enables support for "async mode" where local
     * task processing is in FIFO, not LIFO order; simply by using a
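The slot-CAS discipline described above is easier to see in isolation. The sketch below is illustrative only, not the patch's code: it uses AtomicReferenceArray and volatile indices where the real implementation uses Unsafe intrinsics and ordered stores, and it omits growth, signalling, and activation.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

/**
 * Illustrative work-stealing deque: push/pop only from the owner
 * thread, steal (deq) from any thread. Pop and steal race by CASing
 * the slot itself from non-null to null, so taken slots are nulled
 * immediately and base/sp need no tag bits.
 */
final class MiniWorkQueue {
    private static final int MASK = 63;         // capacity 64, a power of two
    private final AtomicReferenceArray<Runnable> slots =
        new AtomicReferenceArray<>(MASK + 1);
    private volatile int base;                  // next slot to steal from
    private volatile int sp;                    // next slot to push into

    void push(Runnable task) {                  // owner only
        slots.set(sp & MASK, task);
        sp = sp + 1;                            // publish after filling slot
    }

    Runnable pop() {                            // owner only
        int s = sp;
        if (s != base) {
            int i = (s - 1) & MASK;
            Runnable t = slots.get(i);
            if (t != null && slots.compareAndSet(i, t, null)) {
                sp = s - 1;
                return t;
            }
        }
        return null;           // empty, or lost the top slot to a thief
    }

    Runnable steal() {                          // any thread
        int b = base;
        if (b != sp) {
            int i = b & MASK;
            Runnable t = slots.get(i);
            if (t != null && slots.compareAndSet(i, t, null)) {
                base = b + 1;                   // commit the take
                return t;
            }
        }
        return null;           // empty or contended; try another victim
    }
}
```

Stealing always takes from base; the "async mode" mentioned in the surrounding comment simply has the owner take from base as well, yielding FIFO local processing.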
@@ -75,17 +86,18 @@ public class ForkJoinWorkerThread extend
      * Efficient implementation of this approach currently relies on
      * an uncomfortable amount of "Unsafe" mechanics. To maintain
      * correct orderings, reads and writes of variable base require
-     * volatile ordering. Variable sp does not require volatile write
-     * but needs cheaper store-ordering on writes. Because they are
-     * protected by volatile base reads, reads of the queue array and
-     * its slots do not need volatile load semantics, but writes (in
-     * push) require store order and CASes (in pop and deq) require
-     * (volatile) CAS semantics. (See "Idempotent work stealing" by
-     * Michael, Saraswat, and Vechev, PPoPP 2009
-     * http://portal.acm.org/citation.cfm?id=1504186 for an algorithm
-     * with similar properties, but without support for nulling
-     * slots.) Since these combinations aren't supported using
-     * ordinary volatiles, the only way to accomplish these
+     * volatile ordering. Variable sp does not require volatile
+     * writes but still needs store-ordering, which we accomplish by
+     * pre-incrementing sp before filling the slot with an ordered
+     * store. (Pre-incrementing also enables backouts used in
+     * scanWhileJoining.) Because they are protected by volatile base
+     * reads, reads of the queue array and its slots by other threads
+     * do not need volatile load semantics, but writes (in push)
+     * require store order and CASes (in pop and deq) require
+     * (volatile) CAS semantics. (Michael, Saraswat, and Vechev's
+     * algorithm has similar properties, but without support for
+     * nulling slots.) Since these combinations aren't supported
+     * using ordinary volatiles, the only way to accomplish these
      * efficiently is to use direct Unsafe calls. (Using external
      * AtomicIntegers and AtomicReferenceArrays for the indices and
      * array is significantly slower because of memory locality and
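In java.util.concurrent.atomic terms, the write-side orderings described above correspond roughly to lazySet (ordered store) versus volatile writes. A hedged sketch of just the push-side protocol under that mapping; note that AtomicReferenceArray reads are volatile, which is stronger than the plain reads the comment permits, so this only approximates the real cost model:

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class OrderedWriteSketch {
    final AtomicReferenceArray<Runnable> slots = new AtomicReferenceArray<>(64);
    volatile int base;   // reads and writes need full volatile ordering
    int sp;              // owner-private: written without volatile cost

    // Pre-increment sp, then fill the slot with an ordered store.
    // lazySet has the memory semantics of UNSAFE.putOrderedObject: it
    // orders prior stores but is cheaper than a volatile write. Other
    // threads never examine sp or the slot until after they have read
    // the volatile base, which is what makes this publication safe.
    void push(Runnable task) {
        int s = sp++;                 // pre-incremented, as in the patch
        slots.lazySet(s & 63, task);  // ordered store, not volatile store
    }
}
```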
@@ -97,43 +109,34 @@ public class ForkJoinWorkerThread extend
      * initial size must be large enough to counteract cache
      * contention effects across multiple queues (especially in the
      * presence of GC cardmarking). Also, to improve thread-locality,
-     * queues are currently initialized immediately after the thread
-     * gets the initial signal to start processing tasks. However,
-     * all queue-related methods except pushTask are written in a way
-     * that allows them to instead be lazily allocated and/or disposed
-     * of when empty. All together, these low-level implementation
-     * choices produce as much as a factor of 4 performance
-     * improvement compared to naive implementations, and enable the
-     * processing of billions of tasks per second, sometimes at the
-     * expense of ugliness.
-     *
-     * 2. Run control: The primary run control is based on a global
-     * counter (activeCount) held by the pool. It uses an algorithm
-     * similar to that in Herlihy and Shavit section 17.6 to cause
-     * threads to eventually block when all threads declare they are
-     * inactive. For this to work, threads must be declared active
-     * when executing tasks, and before stealing a task. They must be
-     * inactive before blocking on the Pool Barrier (awaiting a new
-     * submission or other Pool event). In between, there is some free
-     * play which we take advantage of to avoid contention and rapid
-     * flickering of the global activeCount: If inactive, we activate
-     * only if a victim queue appears to be nonempty (see above).
-     * Similarly, a thread tries to inactivate only after a full scan
-     * of other threads. The net effect is that contention on
-     * activeCount is rarely a measurable performance issue. (There
-     * are also a few other cases where we scan for work rather than
-     * retry/block upon contention.)
-     *
-     * 3. Selection control. We maintain policy of always choosing to
-     * run local tasks rather than stealing, and always trying to
-     * steal tasks before trying to run a new submission. All steals
-     * are currently performed in randomly-chosen deq-order. It may be
-     * worthwhile to bias these with locality / anti-locality
-     * information, but doing this well probably requires more
-     * lower-level information from JVMs than currently provided.
+     * queues are initialized after starting. All together, these
+     * low-level implementation choices produce as much as a factor of
+     * 4 performance improvement compared to naive implementations,
+     * and enable the processing of billions of tasks per second,
+     * sometimes at the expense of ugliness.
      */
 
     /**
+     * Generator for initial random seeds for random victim
+     * selection. This is used only to create initial seeds. Random
+     * steals use a cheaper xorshift generator per steal attempt. We
+     * expect only rare contention on seedGenerator, so just use a
+     * plain Random.
+     */
+    private static final Random seedGenerator = new Random();
+
+    /**
+     * The timeout value for suspending spares. Spare workers that
+     * remain unsignalled for more than this time may be trimmed
+     * (killed and removed from pool). Since our goal is to avoid
+     * long-term thread buildup, the exact value of timeout does not
+     * matter too much so long as it avoids most false-alarm timeouts
+     * under GC stalls or momentarily high system load.
+     */
+    private static final long SPARE_KEEPALIVE_NANOS =
+        5L * 1000L * 1000L * 1000L; // 5 secs
+
+    /**
      * Capacity of work-stealing queue array upon initialization.
      * Must be a power of two. Initial size must be at least 2, but is
      * padded to minimize cache effects.
@@ -155,65 +158,89 @@ public class ForkJoinWorkerThread extend
 
     /**
      * The work-stealing queue array. Size must be a power of two.
-     * Initialized when thread starts, to improve memory locality.
+     * Initialized in onStart, to improve memory locality.
      */
     private ForkJoinTask<?>[] queue;
 
     /**
-     * Index (mod queue.length) of next queue slot to push to or pop
-     * from. It is written only by owner thread, via ordered store.
-     * Both sp and base are allowed to wrap around on overflow, but
-     * (sp - base) still estimates size.
-     */
-    private volatile int sp;
-
-    /**
      * Index (mod queue.length) of least valid queue slot, which is
      * always the next position to steal from if nonempty.
      */
     private volatile int base;
 
     /**
-     * Activity status. When true, this worker is considered active.
-     * Must be false upon construction. It must be true when executing
-     * tasks, and BEFORE stealing a task. It must be false before
-     * calling pool.sync.
-     */
-    private boolean active;
+     * Index (mod queue.length) of next queue slot to push to or pop
+     * from. It is written only by owner thread, and accessed by other
+     * threads only after reading (volatile) base. Both sp and base
+     * are allowed to wrap around on overflow, but (sp - base) still
+     * estimates size.
+     */
+    private int sp;
 
     /**
-     * Run state of this worker. Supports simple versions of the usual
-     * shutdown/shutdownNow control.
+     * Run state of this worker. In addition to the usual run levels,
+     * tracks if this worker is suspended as a spare, and if it was
+     * killed (trimmed) while suspended. However, "active" status is
+     * maintained separately.
      */
     private volatile int runState;
 
+    private static final int TERMINATING = 0x01;
+    private static final int TERMINATED  = 0x02;
+    private static final int SUSPENDED   = 0x04; // inactive spare
+    private static final int TRIMMED     = 0x08; // killed while suspended
+
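For illustration, here is how these bits compose and transition. The constants mirror the ones above; the demo class itself is not part of the patch:

```java
final class RunStateBitsDemo {
    static final int TERMINATING = 0x01;
    static final int TERMINATED  = 0x02;
    static final int SUSPENDED   = 0x04;
    static final int TRIMMED     = 0x08;

    public static void main(String[] args) {
        int rs = SUSPENDED;                // a suspended spare worker
        // The transition shutdown() performs on a suspended worker:
        rs = (rs & ~SUSPENDED) | (TRIMMED | TERMINATING);
        System.out.println((rs & SUSPENDED) != 0);   // false: no longer suspended
        System.out.println((rs & TRIMMED) != 0);     // true: killed while suspended
        System.out.println((rs & TERMINATING) != 0); // true: shutting down
    }
}
```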
+    /**
+     * Number of LockSupport.park calls to block this thread for
+     * suspension or event waits. Used for internal instrumentation;
+     * currently not exported but included because volatile write upon
+     * park also provides a workaround for a JVM bug.
+     */
+    private volatile int parkCount;
+
+    /**
+     * Number of steals, transferred and reset in pool callbacks
+     * when idle. Accessed directly by pool.
+     */
+    int stealCount;
+
     /**
      * Seed for random number generator for choosing steal victims.
-     * Uses Marsaglia xorshift. Must be nonzero upon initialization.
+     * Uses Marsaglia xorshift. Must be initialized as nonzero.
      */
     private int seed;
 
     /**
-     * Number of steals, transferred to pool when idle
+     * Activity status. When true, this worker is considered active.
+     * Accessed directly by pool. Must be false upon construction.
+     */
+    boolean active;
+
+    /**
+     * True if using local FIFO, not the default LIFO, for local
+     * polling. Shadows value from ForkJoinPool, which resets it if
+     * changed pool-wide.
      */
-    private int stealCount;
+    private boolean locallyFifo;
 
     /**
      * Index of this worker in pool array. Set once by pool before
-     * running, and accessed directly by pool during cleanup etc.
+     * running, and accessed directly by pool to locate this worker in
+     * its workers array.
     */
     int poolIndex;
 
     /**
-     * The last barrier event waited for. Accessed in pool callback
-     * methods, but only by current thread.
+     * The last pool event waited for. Accessed only by pool in
+     * callback methods invoked within this thread.
     */
-    long lastEventCount;
+    int lastEventCount;
 
     /**
-     * True if use local fifo, not default lifo, for local polling
+     * Encoded index and event count of next event waiter. Used only
+     * by ForkJoinPool for managing event waiters.
     */
-    private boolean locallyFifo;
+    volatile long nextWaiter;
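nextWaiter packs two values into one long; the exact bit layout is owned by ForkJoinPool and does not appear in this diff. A plausible illustrative encoding of an event count plus a worker index:

```java
// Hypothetical encoding sketch, not the layout defined in ForkJoinPool.
final class WaiterWordSketch {
    static long encode(int eventCount, int poolIndex) {
        return ((long) eventCount << 32) | (poolIndex & 0xffffffffL);
    }
    static int eventCount(long w) { return (int) (w >>> 32); }
    static int index(long w)      { return (int) w; }

    public static void main(String[] args) {
        long w = encode(7, 3);
        System.out.println(eventCount(w) + " " + index(w)); // prints: 7 3
    }
}
```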
 
     /**
      * Creates a ForkJoinWorkerThread operating in the given pool.
@@ -224,11 +251,24 @@ public class ForkJoinWorkerThread extend
     protected ForkJoinWorkerThread(ForkJoinPool pool) {
         if (pool == null) throw new NullPointerException();
         this.pool = pool;
-        // Note: poolIndex is set by pool during construction
-        // Remaining initialization is deferred to onStart
+        // To avoid exposing construction details to subclasses,
+        // remaining initialization is in start() and onStart()
     }
 
-    // Public access methods
+    /**
+     * Performs additional initialization and starts this thread.
+     */
+    final void start(int poolIndex, boolean locallyFifo,
+                     UncaughtExceptionHandler ueh) {
+        this.poolIndex = poolIndex;
+        this.locallyFifo = locallyFifo;
+        if (ueh != null)
+            setUncaughtExceptionHandler(ueh);
+        setDaemon(true);
+        start();
+    }
+
+    // Public/protected methods
 
     /**
      * Returns the pool hosting this thread.
@@ -253,82 +293,49 @@ public class ForkJoinWorkerThread extend
     }
 
     /**
-     * Establishes local first-in-first-out scheduling mode for forked
-     * tasks that are never joined.
-     *
-     * @param async if true, use locally FIFO scheduling
+     * Initializes internal state after construction but before
+     * processing any tasks. If you override this method, you must
+     * invoke super.onStart() at the beginning of the method.
+     * Initialization requires care: Most fields must have legal
+     * default values, to ensure that attempted accesses from other
+     * threads work correctly even before this thread starts
+     * processing tasks.
      */
-    void setAsyncMode(boolean async) {
-        locallyFifo = async;
-    }
-
-    // Runstate management
-
-    // Runstate values. Order matters
-    private static final int RUNNING     = 0;
-    private static final int SHUTDOWN    = 1;
-    private static final int TERMINATING = 2;
-    private static final int TERMINATED  = 3;
-
-    final boolean isShutdown()    { return runState >= SHUTDOWN; }
-    final boolean isTerminating() { return runState >= TERMINATING; }
-    final boolean isTerminated()  { return runState == TERMINATED; }
-    final boolean shutdown()      { return transitionRunStateTo(SHUTDOWN); }
-    final boolean shutdownNow()   { return transitionRunStateTo(TERMINATING); }
+    protected void onStart() {
+        int rs = seedGenerator.nextInt();
+        seed = rs == 0? 1 : rs; // seed must be nonzero
 
-    /**
-     * Transitions to at least the given state.
-     *
-     * @return {@code true} if not already at least at given state
-     */
-    private boolean transitionRunStateTo(int state) {
-        for (;;) {
-            int s = runState;
-            if (s >= state)
-                return false;
-            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state))
-                return true;
-        }
-    }
+        // Allocate name string and queue array in this thread
+        String pid = Integer.toString(pool.getPoolNumber());
+        String wid = Integer.toString(poolIndex);
+        setName("ForkJoinPool-" + pid + "-worker-" + wid);
 
-    /**
-     * Tries to set status to active; fails on contention.
-     */
-    private boolean tryActivate() {
-        if (!active) {
-            if (!pool.tryIncrementActiveCount())
-                return false;
-            active = true;
-        }
-        return true;
+        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
     }
 
     /**
-     * Tries to set status to inactive; fails on contention.
+     * Performs cleanup associated with termination of this worker
+     * thread. If you override this method, you must invoke
+     * {@code super.onTermination} at the end of the overridden method.
+     *
+     * @param exception the exception causing this thread to abort due
+     * to an unrecoverable error, or {@code null} if completed normally
      */
-    private boolean tryInactivate() {
-        if (active) {
-            if (!pool.tryDecrementActiveCount())
-                return false;
-            active = false;
+    protected void onTermination(Throwable exception) {
+        try {
+            cancelTasks();
+            setTerminated();
+            pool.workerTerminated(this);
+        } catch (Throwable ex) {        // Shouldn't ever happen
+            if (exception == null)      // but if so, at least rethrown
+                exception = ex;
+        } finally {
+            if (exception != null)
+                UNSAFE.throwException(exception);
         }
-        return true;
     }
 
     /**
-     * Computes next value for random victim probe. Scans don't
-     * require a very high quality generator, but also not a crummy
-     * one. Marsaglia xor-shift is cheap and works well.
-     */
-    private static int xorShift(int r) {
-        r ^= (r << 13);
-        r ^= (r >>> 17);
-        return r ^ (r << 5);
-    }
-
-    // Lifecycle methods
-
-    /**
      * This method is required to be public, but should never be
      * called explicitly. It performs the main run loop to execute
      * ForkJoinTasks.
@@ -337,7 +344,6 @@ public class ForkJoinWorkerThread extend
         Throwable exception = null;
         try {
             onStart();
-            pool.sync(this); // await first pool event
             mainLoop();
         } catch (Throwable ex) {
             exception = ex;
@@ -346,138 +352,126 @@
         }
     }
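The onStart/onTermination contract above is what subclass writers see. A sketch of a conforming subclass, wired in through the pool's existing thread-factory API; the timing field and the printing are invented for the example:

```java
import jsr166y.ForkJoinPool;
import jsr166y.ForkJoinWorkerThread;

// A worker that attaches per-thread context, honoring the override
// contract: super.onStart() first, super.onTermination(...) last.
class TracingWorkerThread extends ForkJoinWorkerThread {
    long startNanos; // example field; not part of the real class

    protected TracingWorkerThread(ForkJoinPool pool) {
        super(pool);
    }

    @Override
    protected void onStart() {
        super.onStart();                    // must come first
        startNanos = System.nanoTime();
    }

    @Override
    protected void onTermination(Throwable exception) {
        try {
            System.out.println(getName() + " ran for "
                + (System.nanoTime() - startNanos) + "ns");
        } finally {
            super.onTermination(exception); // must come last
        }
    }
}

class TracingWorkerFactory
        implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new TracingWorkerThread(pool);
    }
}
```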
+    // helpers for run()
+
     /**
-     * Executes tasks until shut down.
+     * Finds and executes tasks and checks status while running.
      */
     private void mainLoop() {
-        while (!isShutdown()) {
-            ForkJoinTask<?> t = pollTask();
-            if (t != null || (t = pollSubmission()) != null)
-                t.quietlyExec();
-            else if (tryInactivate())
-                pool.sync(this);
+        boolean ran = false; // true if ran task on previous step
+        ForkJoinPool p = pool;
+        for (;;) {
+            p.preStep(this, ran);
+            if (runState != 0)
+                return;
+            ForkJoinTask<?> t; // try to get and run stolen or submitted task
+            if (ran = (t = scan()) != null || (t = pollSubmission()) != null) {
+                t.tryExec();
+                if (base != sp)
+                    runLocalTasks();
+            }
         }
     }
 
     /**
-     * Initializes internal state after construction but before
-     * processing any tasks. If you override this method, you must
-     * invoke super.onStart() at the beginning of the method.
-     * Initialization requires care: Most fields must have legal
-     * default values, to ensure that attempted accesses from other
-     * threads work correctly even before this thread starts
-     * processing tasks.
+     * Runs local tasks until queue is empty or shut down. Call only
+     * while active.
      */
-    protected void onStart() {
-        // Allocate while starting to improve chances of thread-local
-        // isolation
-        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
-        // Initial value of seed need not be especially random but
-        // should differ across workers and must be nonzero
-        int p = poolIndex + 1;
-        seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
+    private void runLocalTasks() {
+        while (runState == 0) {
+            ForkJoinTask<?> t = locallyFifo? locallyDeqTask() : popTask();
+            if (t != null)
+                t.tryExec();
+            else if (base == sp)
+                break;
+        }
     }
 
     /**
-     * Performs cleanup associated with termination of this worker
-     * thread. If you override this method, you must invoke
-     * {@code super.onTermination} at the end of the overridden method.
+     * If a submission exists, tries to activate and take it.
      *
-     * @param exception the exception causing this thread to abort due
-     * to an unrecoverable error, or {@code null} if completed normally
+     * @return a task, if available
      */
-    protected void onTermination(Throwable exception) {
-        // Execute remaining local tasks unless aborting or terminating
-        while (exception == null && pool.isProcessingTasks() && base != sp) {
-            try {
-                ForkJoinTask<?> t = popTask();
-                if (t != null)
-                    t.quietlyExec();
-            } catch (Throwable ex) {
-                exception = ex;
+    private ForkJoinTask<?> pollSubmission() {
+        ForkJoinPool p = pool;
+        while (p.hasQueuedSubmissions()) {
+            if (active || (active = p.tryIncrementActiveCount())) {
+                ForkJoinTask<?> t = p.pollSubmission();
+                return t != null ? t : scan(); // if missed, rescan
             }
         }
-        // Cancel other tasks, transition status, notify pool, and
-        // propagate exception to uncaught exception handler
-        try {
-            do {} while (!tryInactivate()); // ensure inactive
-            cancelTasks();
-            runState = TERMINATED;
-            pool.workerTerminated(this);
-        } catch (Throwable ex) { // Shouldn't ever happen
-            if (exception == null) // but if so, at least rethrown
-                exception = ex;
-        } finally {
-            if (exception != null)
-                ForkJoinTask.rethrowException(exception);
-        }
-    }
-
-    // Intrinsics-based support for queue operations.
-
-    private static long slotOffset(int i) {
-        return ((long) i << qShift) + qBase;
+        return null;
     }
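pollSubmission is the worker-side half of external submission; the client-side half is ordinary pool usage, for example (routine API usage, not code from this patch):

```java
import jsr166y.ForkJoinPool;
import jsr166y.RecursiveAction;

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        // submit() enqueues externally; an idle worker finds the task
        // via hasQueuedSubmissions()/pollSubmission() in its mainLoop.
        pool.submit(new RecursiveAction() {
            protected void compute() {
                System.out.println("run by " +
                    Thread.currentThread().getName());
            }
        }).get();
        pool.shutdown();
    }
}
```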
-    /**
-     * Adds in store-order the given task at given slot of q to null.
-     * Caller must ensure q is non-null and index is in range.
+    /*
+     * Intrinsics-based atomic writes for queue slots. These are
+     * basically the same as methods in AtomicReferenceArray, but
+     * specialized for (1) ForkJoinTask elements (2) requirement that
+     * nullness and bounds checks have already been performed by
+     * callers and (3) effective offsets are known not to overflow
+     * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
+     * need a corresponding version for reads: plain array reads are OK
+     * because they are protected by other volatile reads and are
+     * confirmed by CASes.
+     *
+     * Most uses don't actually call these methods, but instead contain
+     * inlined forms that enable more predictable optimization. We
+     * don't define the version of write used in pushTask at all, but
+     * instead inline there a store-fenced array slot write.
+     */
+
+    /**
+     * CASes slot i of array q from t to null. Caller must ensure q is
+     * non-null and index is in range.
      */
-    private static void setSlot(ForkJoinTask<?>[] q, int i,
-                                ForkJoinTask<?> t) {
-        UNSAFE.putOrderedObject(q, slotOffset(i), t);
+    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
+                                             ForkJoinTask<?> t) {
+        return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
     }
 
     /**
-     * CAS given slot of q to null. Caller must ensure q is non-null
-     * and index is in range.
+     * Performs a volatile write of the given task at given slot of
+     * array q. Caller must ensure q is non-null and index is in
+     * range. This method is used only during resets and backouts.
      */
-    private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
-                                       ForkJoinTask<?> t) {
-        return UNSAFE.compareAndSwapObject(q, slotOffset(i), t, null);
-    }
-
-    /**
-     * Sets sp in store-order.
-     */
-    private void storeSp(int s) {
-        UNSAFE.putOrderedInt(this, spOffset, s);
+    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
+                                        ForkJoinTask<?> t) {
+        UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
     }
 
-    // Main queue methods
+    // queue methods
 
     /**
-     * Pushes a task. Called only by current thread.
+     * Pushes a task. Call only from this thread.
      *
      * @param t the task. Caller must ensure non-null.
      */
     final void pushTask(ForkJoinTask<?> t) {
+        int s;
         ForkJoinTask<?>[] q = queue;
-        int mask = q.length - 1;
-        int s = sp;
-        setSlot(q, s & mask, t);
-        storeSp(++s);
-        if ((s -= base) == 1)
+        int mask = q.length - 1; // implicit assert q != null
+        UNSAFE.putOrderedObject(q, (((s = sp++) & mask) << qShift) + qBase, t);
+        if ((s -= base) <= 0)
             pool.signalWork();
-        else if (s >= mask)
+        else if (s + 1 >= mask)
             growQueue();
     }
 
     /**
      * Tries to take a task from the base of the queue, failing if
-     * either empty or contended.
+     * empty or contended. Note: Specializations of this code appear
+     * in scan and scanWhileJoining.
      *
      * @return a task, or null if none or contended
      */
     final ForkJoinTask<?> deqTask() {
         ForkJoinTask<?> t;
         ForkJoinTask<?>[] q;
-        int i;
-        int b;
-        if (sp != (b = base) &&
+        int b, i;
+        if ((b = base) != sp &&
             (q = queue) != null && // must read q after b
             (t = q[i = (q.length - 1) & b]) != null &&
-            casSlotNull(q, i, t)) {
+            UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
             base = b + 1;
             return t;
         }
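The read/CAS/commit order in deqTask is load-bearing. A restatement with an AtomicReferenceArray stand-in, annotating why each step is safe (illustrative only, not the patch's code):

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class StealCommitSketch {
    volatile int base;
    volatile int sp;
    final AtomicReferenceArray<Runnable> queue = new AtomicReferenceArray<>(64);

    Runnable deq() {
        int b, i;
        Runnable t;
        if ((b = base) != sp &&                     // 1. volatile read of base first
            (t = queue.get(i = 63 & b)) != null &&  // 2. the slot read is only a hint...
            queue.compareAndSet(i, t, null)) {      // 3. ...which the CAS validates;
            base = b + 1;                           // 4. commit point: until this write
            return t;                               //    the take can still be undone
        }
        return null;                                // empty or lost the race
    }
}
```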
@@ -485,19 +479,20 @@
     }
 
     /**
-     * Tries to take a task from the base of own queue, activating if
-     * necessary, failing only if empty. Called only by current thread.
+     * Tries to take a task from the base of own queue. Assumes active
+     * status. Called only by current thread.
      *
      * @return a task, or null if none
      */
     final ForkJoinTask<?> locallyDeqTask() {
-        int b;
-        while (sp != (b = base)) {
-            if (tryActivate()) {
-                ForkJoinTask<?>[] q = queue;
-                int i = (q.length - 1) & b;
-                ForkJoinTask<?> t = q[i];
-                if (t != null && casSlotNull(q, i, t)) {
+        ForkJoinTask<?>[] q = queue;
+        if (q != null) {
+            ForkJoinTask<?> t;
+            int b, i;
+            while (sp != (b = base)) {
+                if ((t = q[i = (q.length - 1) & b]) != null &&
+                    UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
+                                                t, null)) {
                     base = b + 1;
                     return t;
                 }
@@ -507,20 +502,19 @@
     }
 
     /**
-     * Returns a popped task, or null if empty. Ensures active status
-     * if non-null. Called only by current thread.
+     * Returns a popped task, or null if empty. Assumes active status.
+     * Called only by current thread. (Note: a specialization of this
+     * code appears in scanWhileJoining.)
      */
     final ForkJoinTask<?> popTask() {
-        int s = sp;
-        while (s != base) {
-            if (tryActivate()) {
-                ForkJoinTask<?>[] q = queue;
-                int mask = q.length - 1;
-                int i = (s - 1) & mask;
-                ForkJoinTask<?> t = q[i];
-                if (t == null || !casSlotNull(q, i, t))
-                    break;
-                storeSp(s - 1);
+        int s;
+        ForkJoinTask<?>[] q = queue;
+        if (q != null && (s = sp) != base) {
+            int i = (q.length - 1) & --s;
+            ForkJoinTask<?> t = q[i];
+            if (t != null && UNSAFE.compareAndSwapObject
+                (q, (i << qShift) + qBase, t, null)) {
+                sp = s;
                 return t;
             }
         }
@@ -535,11 +529,11 @@
      * @param t the task. Caller must ensure non-null.
      */
     final boolean unpushTask(ForkJoinTask<?> t) {
+        int s;
         ForkJoinTask<?>[] q = queue;
-        int mask = q.length - 1;
-        int s = sp - 1;
-        if (casSlotNull(q, s & mask, t)) {
-            storeSp(s);
+        if (q != null && UNSAFE.compareAndSwapObject
+            (q, (((q.length - 1) & (s = sp - 1)) << qShift) + qBase, t, null)){
+            sp = s;
             return true;
         }
         return false;
@@ -579,104 +573,214 @@
             ForkJoinTask<?> t = oldQ[oldIndex];
             if (t != null && !casSlotNull(oldQ, oldIndex, t))
                 t = null;
-            setSlot(newQ, b & newMask, t);
+            writeSlot(newQ, b & newMask, t);
         } while (++b != bf);
         pool.signalWork();
     }
 
     /**
+     * Computes next value for random victim probe in scan(). Scans
+     * don't require a very high quality generator, but also not a
+     * crummy one. Marsaglia xor-shift is cheap and works well enough.
+     * Note: This is manually inlined in scan().
+     */
+    private static final int xorShift(int r) {
+        r ^= r << 13;
+        r ^= r >>> 17;
+        return r ^ (r << 5);
+    }
+
+    /**
      * Tries to steal a task from another worker. Starts at a random
      * index of workers array, and probes workers until finding one
      * with non-empty queue or finding that all are empty. It
      * randomly selects the first n probes. If these are empty, it
-     * resorts to a full circular traversal, which is necessary to
-     * accurately set active status by caller. Also restarts if pool
-     * events occurred since last scan, which forces refresh of
-     * workers array, in case barrier was associated with resize.
+     * resorts to a circular sweep, which is necessary to accurately
+     * set active status. (The circular sweep uses steps of
+     * approximately half the array size plus 1, to avoid bias
+     * stemming from leftmost packing of the array in ForkJoinPool.)
      *
      * This method must be both fast and quiet -- usually avoiding
      * memory accesses that could disrupt cache sharing etc other than
-     * those needed to check for and take tasks. This accounts for,
-     * among other things, updating random seed in place without
-     * storing it until exit.
+     * those needed to check for and take tasks (or to activate if not
+     * already active). This accounts for, among other things,
+     * updating random seed in place without storing it until exit.
      *
      * @return a task, or null if none found
      */
     private ForkJoinTask<?> scan() {
-        ForkJoinTask<?> t = null;
-        int r = seed;              // extract once to keep scan quiet
-        ForkJoinWorkerThread[] ws; // refreshed on outer loop
-        int mask;                  // must be power 2 minus 1 and > 0
-        outer:do {
-            if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
-                int idx = r;
-                int probes = ~mask; // use random index while negative
-                for (;;) {
-                    r = xorShift(r); // update random seed
-                    ForkJoinWorkerThread v = ws[mask & idx];
-                    if (v == null || v.sp == v.base) {
-                        if (probes <= mask)
-                            idx = (probes++ < 0) ? r : (idx + 1);
-                        else
-                            break;
+        ForkJoinPool p = pool;
+        ForkJoinWorkerThread[] ws = p.workers;
+        int n = ws.length;            // upper bound of #workers
+        boolean canSteal = active;    // shadow active status
+        int r = seed;                 // extract seed once
+        int k = r;                    // index: random if j<0 else step
+        for (int j = -n; j < n; ++j) {
+            ForkJoinWorkerThread v = ws[k & (n - 1)];
+            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
+            if (v != null && v.base != v.sp) {
+                if (canSteal ||       // ensure active status
+                    (canSteal = active = p.tryIncrementActiveCount())) {
+                    int b, i;         // inlined specialization of deqTask
+                    ForkJoinTask<?> t;
+                    ForkJoinTask<?>[] q;
+                    if ((b = v.base) != v.sp && // recheck
+                        (q = v.queue) != null &&
+                        (t = q[i = (q.length - 1) & b]) != null &&
+                        UNSAFE.compareAndSwapObject
+                        (q, (i << qShift) + qBase, t, null)) {
+                        v.base = b + 1;
+                        seed = r;
+                        ++stealCount;
+                        return t;
                     }
-                    else if (!tryActivate() || (t = v.deqTask()) == null)
-                        continue outer; // restart on contention
-                    else
-                        break outer;
                 }
+                j = -n; // reset on contention
             }
-        } while (pool.hasNewSyncEvent(this)); // retry on pool events
-        seed = r;
-        return t;
+            k = j >= 0? k + ((n >>> 1) | 1) : r;
+        }
+        return null;
     }
 
+    // Run State management
+
+    // status check methods used mainly by ForkJoinPool
+    final boolean isTerminating() { return (runState & TERMINATING) != 0; }
+    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
+    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
+    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
+
     /**
-     * Gets and removes a local or stolen task.
-     *
-     * @return a task, if available
+     * Sets state to TERMINATING, also resuming if suspended.
      */
-    final ForkJoinTask<?> pollTask() {
-        ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
-        if (t == null && (t = scan()) != null)
-            ++stealCount;
-        return t;
+    final void shutdown() {
+        for (;;) {
+            int s = runState;
+            if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
+                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+                                             (s & ~SUSPENDED) |
+                                             (TRIMMED|TERMINATING))) {
+                    LockSupport.unpark(this);
+                    break;
+                }
+            }
+            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+                                              s | TERMINATING))
+                break;
+        }
+    }
+
+    /**
+     * Sets state to TERMINATED. Called only by this thread.
+     */
+    private void setTerminated() {
+        int s;
+        do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
+                                               s = runState,
+                                               s | (TERMINATING|TERMINATED)));
     }
 
     /**
-     * Gets a local task.
-     *
-     * @return a task, if available
+     * Instrumented version of park. Also used by ForkJoinPool.awaitEvent.
+     */
+    final void doPark() {
+        ++parkCount;
+        LockSupport.park(this);
+    }
+
+    /**
+     * If suspended, tries to set status to unsuspended.
+     * Caller must unpark to actually resume.
      *
-     * @return a task, if available
+     * @return true if successful
      */
-    final ForkJoinTask<?> pollLocalTask() {
-        return locallyFifo ? locallyDeqTask() : popTask();
+    final boolean tryUnsuspend() {
+        int s;
+        return (((s = runState) & SUSPENDED) != 0 &&
+                UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+                                         s & ~SUSPENDED));
     }
 
     /**
-     * Returns a pool submission, if one exists, activating first.
+     * Sets suspended status and blocks as spare until resumed,
+     * shutdown, or timed out.
      *
-     * @return a submission, if available
+     * @return false if trimmed
      */
-    private ForkJoinTask<?> pollSubmission() {
+    final boolean suspendAsSpare() {
+        for (;;) {               // set suspended unless terminating
+            int s = runState;
+            if ((s & TERMINATING) != 0) { // must kill
+                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+                                             s | (TRIMMED | TERMINATING)))
+                    return false;
+            }
+            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+                                              s | SUSPENDED))
+                break;
+        }
+        lastEventCount = 0;      // reset upon resume
         ForkJoinPool p = pool;
-        while (p.hasQueuedSubmissions()) {
-            ForkJoinTask<?> t;
-            if (tryActivate() && (t = p.pollSubmission()) != null)
-                return t;
+        p.releaseWaiters();      // help others progress
+        p.accumulateStealCount(this);
+        interrupted();           // clear/ignore interrupts
+        if (poolIndex < p.getParallelism()) { // untimed wait
+            while ((runState & SUSPENDED) != 0)
+                doPark();
+            return true;
         }
-        return null;
+        return timedSuspend();   // timed wait if apparently non-core
     }
 
-    // Methods accessed only by Pool
-
     /**
+     * Blocks as spare until resumed or timed out.
+     * @return false if trimmed
+     */
+    private boolean timedSuspend() {
+        long nanos = SPARE_KEEPALIVE_NANOS;
+        long startTime = System.nanoTime();
+        while ((runState & SUSPENDED) != 0) {
+            ++parkCount;
+            if ((nanos -= (System.nanoTime() - startTime)) > 0)
+                LockSupport.parkNanos(this, nanos);
+            else { // try to trim on timeout
+                int s = runState;
+                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+                                             (s & ~SUSPENDED) |
+                                             (TRIMMED|TERMINATING)))
+                    return false;
+            }
+        }
+        return true;
+    }
+
+    // Misc support methods for ForkJoinPool
+
+    /**
+     * Returns an estimate of the number of tasks in the queue. Also
+     * used by ForkJoinTask.
+     */
+    final int getQueueSize() {
+        return -base + sp;
+    }
+
+    /**
+     * Sets locallyFifo mode. Called only by ForkJoinPool.
+     */
+    final void setAsyncMode(boolean async) {
+        locallyFifo = async;
+    }
+
+    /**
      * Removes and cancels all tasks in queue. Can be called from any
      * thread.
      */
     final void cancelTasks() {
-        ForkJoinTask<?> t;
-        while (base != sp && (t = deqTask()) != null)
-            t.cancelIgnoringExceptions();
+        while (base != sp) {
+            ForkJoinTask<?> t = deqTask();
+            if (t != null)
+                t.cancelIgnoringExceptions();
+        }
     }
 
     /**
@@ -686,73 +790,157 @@
      */
     final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
         int n = 0;
-        ForkJoinTask<?> t;
-        while (base != sp && (t = deqTask()) != null) {
-            c.add(t);
-            ++n;
+        while (base != sp) {
+            ForkJoinTask<?> t = deqTask();
+            if (t != null) {
+                c.add(t);
+                ++n;
+            }
         }
         return n;
     }
 
-    /**
-     * Gets and clears steal count for accumulation by pool. Called
-     * only when known to be idle (in pool.sync and termination).
-     */
-    final int getAndClearStealCount() {
-        int sc = stealCount;
-        stealCount = 0;
-        return sc;
-    }
+    // Support methods for ForkJoinTask
 
     /**
-     * Returns {@code true} if at least one worker in the given array
-     * appears to have at least one queued task.
+     * Returns an estimate of the number of tasks, offset by a
+     * function of number of idle workers.
      *
-     * @param ws array of workers
+     * This method provides a cheap heuristic guide for task
+     * partitioning when programmers, frameworks, tools, or languages
+     * have little or no idea about task granularity. In essence by
+     * offering this method, we ask users only about tradeoffs in
+     * overhead vs expected throughput and its variance, rather than
+     * how finely to partition tasks.
+     *
+     * In a steady state strict (tree-structured) computation, each
+     * thread makes available for stealing enough tasks for other
+     * threads to remain active. Inductively, if all threads play by
+     * the same rules, each thread should make available only a
+     * constant number of tasks.
+     *
+     * The minimum useful constant is just 1. But using a value of 1
+     * would require immediate replenishment upon each steal to
+     * maintain enough tasks, which is infeasible. Further,
+     * partitionings/granularities of offered tasks should minimize
+     * steal rates, which in general means that threads nearer the top
+     * of computation tree should generate more than those nearer the
+     * bottom. In perfect steady state, each thread is at
+     * approximately the same level of computation tree. However,
+     * producing extra tasks amortizes the uncertainty of progress and
+     * diffusion assumptions.
+     *
+     * So, users will want to use values larger, but not much larger
+     * than 1 to both smooth over transient shortages and hedge
+     * against uneven progress; as traded off against the cost of
+     * extra task overhead. We leave the user to pick a threshold
+     * value to compare with the results of this call to guide
+     * decisions, but recommend values such as 3.
+     *
+     * When all threads are active, it is on average OK to estimate
+     * surplus strictly locally. In steady-state, if one thread is
+     * maintaining say 2 surplus tasks, then so are others. So we can
+     * just use estimated queue length (although note that (sp - base)
+     * can be an overestimate because of stealers lagging increments
+     * of base). However, this strategy alone leads to serious
+     * mis-estimates in some non-steady-state conditions (ramp-up,
+     * ramp-down, other stalls). We can detect many of these by
+     * further considering the number of "idle" threads, that are
+     * known to have zero queued tasks, so compensate by a factor of
+     * (#idle/#active) threads.
      */
-    static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
-        if (ws != null) {
-            int len = ws.length;
-            for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
-                for (int i = 0; i < len; ++i) {
-                    ForkJoinWorkerThread w = ws[i];
-                    if (w != null && w.sp != w.base)
-                        return true;
-                }
-            }
-        }
-        return false;
+    final int getEstimatedSurplusTaskCount() {
+        return sp - base - pool.idlePerActive();
     }
 
-    // Support methods for ForkJoinTask
-
     /**
-     * Returns an estimate of the number of tasks in the queue.
+     * Gets and removes a local task.
+     *
+     * @return a task, if available
      */
-    final int getQueueSize() {
-        // suppress momentarily negative values
-        return Math.max(0, sp - base);
+    final ForkJoinTask<?> pollLocalTask() {
+        while (base != sp) {
+            if (active || (active = pool.tryIncrementActiveCount()))
+                return locallyFifo? locallyDeqTask() : popTask();
+        }
+        return null;
     }
 
     /**
-     * Returns an estimate of the number of tasks, offset by a
-     * function of number of idle workers.
+     * Gets and removes a local or stolen task.
+     *
+     * @return a task, if available
      */
-    final int getEstimatedSurplusTaskCount() {
-        // The halving approximates weighting idle vs non-idle workers
-        return (sp - base) - (pool.getIdleThreadCount() >>> 1);
+    final ForkJoinTask<?> pollTask() {
+        ForkJoinTask<?> t;
+        return (t = pollLocalTask()) != null ? t : scan();
     }
 
     /**
-     * Scans, returning early if joinMe done.
+     * Returns a stolen task, if available, unless joinMe is done.
+     *
+     * This method is intrinsically nonmodular. To maintain the
+     * property that tasks are never stolen if the awaited task is
+     * ready, we must interleave mechanics of scan with status
+     * checks. We rely here on the commit points of deq that allow us
+     * to cancel a steal even after CASing slot to null, but before
+     * adjusting base index: If, after the CAS, we see that joinMe is
+     * ready, we can back out by placing the task back into the slot,
+     * without adjusting index. The scan loop is otherwise the same as
+     * in scan.
+     *
+     * The outer loop cannot be allowed to run forever, because it
+     * could lead to a form of deadlock if all threads are executing
+     * this method. However, we must also be patient before giving up,
+     * to cope with GC stalls, transient high loads, etc. The loop
+     * terminates (causing caller to possibly block this thread and
+     * create a replacement) only after #workers clean sweeps during
+     * which all running threads are active.
      */
     final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
-        ForkJoinTask<?> t = pollTask();
-        if (t != null && joinMe.status < 0 && sp == base) {
-            pushTask(t); // unsteal if done and this task would be stealable
-            t = null;
+        int sweeps = 0;
+        int r = seed;
+        ForkJoinPool p = pool;
+        p.releaseWaiters(); // help other threads progress
+        while (joinMe.status >= 0) {
+            ForkJoinWorkerThread[] ws = p.workers;
+            int n = ws.length;
+            int k = r;
+            for (int j = -n; j < n; ++j) {
+                ForkJoinWorkerThread v = ws[k & (n - 1)];
+                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
+                if (v != null) {
+                    int b = v.base;
+                    ForkJoinTask<?>[] q;
+                    if (b != v.sp && (q = v.queue) != null) {
+                        int i = (q.length - 1) & b;
+                        ForkJoinTask<?> t = q[i];
+                        if (t != null) {
+                            if (joinMe.status < 0)
+                                return null;
+                            if (UNSAFE.compareAndSwapObject
+                                (q, (i << qShift) + qBase, t, null)) {
+                                if (joinMe.status < 0) {
+                                    writeSlot(q, i, t); // back out
+                                    return null;
+                                }
+                                v.base = b + 1;
+                                seed = r;
+                                ++stealCount;
+                                return t;
+                            }
+                        }
+                        sweeps = 0; // ensure rescan on contention
+                    }
+                }
+                k = j >= 0? k + ((n >>> 1) | 1) : r;
+                if ((j & 7) == 0 && joinMe.status < 0) // periodically recheck
+                    return null;
+            }
+            if ((sweeps = p.inactiveCount() == 0 ? sweeps + 1 : 0) > n)
+                return null;
         }
-        return t;
+        return null;
     }
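The back-out above relies on the commit point noted at deqTask: between winning the slot CAS and incrementing base, the thief exclusively owns the slot, so writing the task back undoes the steal. Continuing the earlier StealCommitSketch stand-in (illustrative only; joinDone approximates the joinMe.status check):

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class BackoutSketch {
    // A steal that is abandoned after winning the slot but before the
    // base increment; joinDone stands in for "joinMe.status < 0".
    static Runnable stealUnlessDone(StealCommitSketch v,
                                    AtomicBoolean joinDone) {
        int b, i;
        Runnable t;
        if ((b = v.base) != v.sp &&
            (t = v.queue.get(i = 63 & b)) != null &&
            v.queue.compareAndSet(i, t, null)) { // won the slot...
            if (joinDone.get()) {                // ...but the join completed:
                v.queue.set(i, t);               // restore it in place (a
                return null;                     // volatile write, like writeSlot)
            }
            v.base = b + 1;                      // otherwise commit as usual
            return t;
        }
        return null;
    }
}
```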
 
     /**
@@ -760,27 +948,34 @@
      */
     final void helpQuiescePool() {
         for (;;) {
-            ForkJoinTask<?> t = pollTask();
-            if (t != null)
-                t.quietlyExec();
-            else if (tryInactivate() && pool.isQuiescent())
-                break;
+            ForkJoinTask<?> t = pollLocalTask();
+            if (t != null || (t = scan()) != null)
+                t.tryExec();
+            else {
+                ForkJoinPool p = pool;
+                if (active) {
+                    active = false; // inactivate
+                    do {} while (!p.tryDecrementActiveCount());
+                }
+                if (p.isQuiescent()) {
+                    active = true; // re-activate
+                    do {} while (!p.tryIncrementActiveCount());
+                    return;
+                }
+            }
         }
-        do {} while (!tryActivate()); // re-activate on exit
     }
 
     // Unsafe mechanics
 
     private static final sun.misc.Unsafe UNSAFE = getUnsafe();
-    private static final long spOffset =
-        objectFieldOffset("sp", ForkJoinWorkerThread.class);
     private static final long runStateOffset =
         objectFieldOffset("runState", ForkJoinWorkerThread.class);
-    private static final long qBase;
+    private static final long qBase =
+        UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
     private static final int qShift;
 
     static {
-        qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
         int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
         if ((s & (s-1)) != 0)
             throw new Error("data type scale not a power of two");