--- jsr166/src/jsr166y/ForkJoinWorkerThread.java 2009/07/31 19:52:39 1.24 +++ jsr166/src/jsr166y/ForkJoinWorkerThread.java 2010/07/24 20:28:18 1.39 @@ -8,62 +8,74 @@ package jsr166y; import java.util.concurrent.*; +import java.util.Random; import java.util.Collection; +import java.util.concurrent.locks.LockSupport; /** * A thread managed by a {@link ForkJoinPool}. This class is * subclassable solely for the sake of adding functionality -- there - * are no overridable methods dealing with scheduling or - * execution. However, you can override initialization and termination - * methods surrounding the main task processing loop. If you do - * create such a subclass, you will also need to supply a custom - * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. + * are no overridable methods dealing with scheduling or execution. + * However, you can override initialization and termination methods + * surrounding the main task processing loop. If you do create such a + * subclass, you will also need to supply a custom {@link + * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code + * ForkJoinPool}. * * @since 1.7 * @author Doug Lea */ public class ForkJoinWorkerThread extends Thread { /* - * Algorithm overview: + * Overview: * - * 1. Work-Stealing: Work-stealing queues are special forms of - * Deques that support only three of the four possible - * end-operations -- push, pop, and deq (aka steal), and only do - * so under the constraints that push and pop are called only from - * the owning thread, while deq may be called from other threads. - * (If you are unfamiliar with them, you probably want to read - * Herlihy and Shavit's book "The Art of Multiprocessor - * programming", chapter 16 describing these in more detail before - * proceeding.) The main work-stealing queue design is roughly - * similar to "Dynamic Circular Work-Stealing Deque" by David - * Chase and Yossi Lev, SPAA 2005 - * (http://research.sun.com/scalable/pubs/index.html). The main - * difference ultimately stems from gc requirements that we null - * out taken slots as soon as we can, to maintain as small a - * footprint as possible even in programs generating huge numbers - * of tasks. To accomplish this, we shift the CAS arbitrating pop - * vs deq (steal) from being on the indices ("base" and "sp") to - * the slots themselves (mainly via method "casSlotNull()"). So, - * both a successful pop and deq mainly entail CAS'ing a non-null - * slot to null. Because we rely on CASes of references, we do - * not need tag bits on base or sp. They are simple ints as used - * in any circular array-based queue (see for example ArrayDeque). - * Updates to the indices must still be ordered in a way that - * guarantees that (sp - base) > 0 means the queue is empty, but - * otherwise may err on the side of possibly making the queue - * appear nonempty when a push, pop, or deq have not fully - * committed. Note that this means that the deq operation, - * considered individually, is not wait-free. One thief cannot - * successfully continue until another in-progress one (or, if - * previously empty, a push) completes. However, in the - * aggregate, we ensure at least probabilistic non-blockingness. If - * an attempted steal fails, a thief always chooses a different + * ForkJoinWorkerThreads are managed by ForkJoinPools and perform + * ForkJoinTasks. 
+     * This class includes bookkeeping in support of
-     * To maintain
+     * (Michael, Saraswat, and Vechev's algorithm has
+     * Spare workers that
+ */ + volatile int parkCount; + + /** + * Number of steals, transferred and reset in pool callbacks pool + * when idle Accessed directly by pool. + */ + int stealCount; + /** * Seed for random number generator for choosing steal victims. - * Uses Marsaglia xorshift. Must be nonzero upon initialization. + * Uses Marsaglia xorshift. Must be initialized as nonzero. */ private int seed; /** - * Number of steals, transferred to pool when idle + * Activity status. When true, this worker is considered active. + * Accessed directly by pool. Must be false upon construction. + */ + boolean active; + + /** + * True if use local fifo, not default lifo, for local polling. + * Shadows value from ForkJoinPool. */ - private int stealCount; + private final boolean locallyFifo; /** * Index of this worker in pool array. Set once by pool before - * running, and accessed directly by pool during cleanup etc. + * running, and accessed directly by pool to locate this worker in + * its workers array. */ int poolIndex; /** - * The last barrier event waited for. Accessed in pool callback - * methods, but only by current thread. + * The last pool event waited for. Accessed only by pool in + * callback methods invoked within this thread. */ - long lastEventCount; + int lastEventCount; /** - * True if use local fifo, not default lifo, for local polling + * Encoded index and event count of next event waiter. Used only + * by ForkJoinPool for managing event waiters. */ - private boolean locallyFifo; + volatile long nextWaiter; + + /** + * The task currently being joined, set only when actively trying + * to helpStealer. Written only by current thread, but read by + * others. + */ + private volatile ForkJoinTask currentJoin; + + /** + * The task most recently stolen from another worker (or + * submission queue). Not volatile because always read/written in + * presence of related volatiles in those cases where it matters. + */ + private ForkJoinTask currentSteal; /** * Creates a ForkJoinWorkerThread operating in the given pool. @@ -216,13 +309,24 @@ public class ForkJoinWorkerThread extend * @throws NullPointerException if pool is null */ protected ForkJoinWorkerThread(ForkJoinPool pool) { - if (pool == null) throw new NullPointerException(); this.pool = pool; - // Note: poolIndex is set by pool during construction - // Remaining initialization is deferred to onStart + this.locallyFifo = pool.locallyFifo; + // To avoid exposing construction details to subclasses, + // remaining initialization is in start() and onStart() + } + + /** + * Performs additional initialization and starts this thread + */ + final void start(int poolIndex, UncaughtExceptionHandler ueh) { + this.poolIndex = poolIndex; + setDaemon(true); + if (ueh != null) + setUncaughtExceptionHandler(ueh); + start(); } - // Public access methods + // Public/protected methods /** * Returns the pool hosting this thread. @@ -247,81 +351,48 @@ public class ForkJoinWorkerThread extend } /** - * Establishes local first-in-first-out scheduling mode for forked - * tasks that are never joined. - * - * @param async if true, use locally FIFO scheduling + * Initializes internal state after construction but before + * processing any tasks. If you override this method, you must + * invoke super.onStart() at the beginning of the method. + * Initialization requires care: Most fields must have legal + * default values, to ensure that attempted accesses from other + * threads work correctly even before this thread starts + * processing tasks. 
*/ - void setAsyncMode(boolean async) { - locallyFifo = async; - } - - // Runstate management - - // Runstate values. Order matters - private static final int RUNNING = 0; - private static final int SHUTDOWN = 1; - private static final int TERMINATING = 2; - private static final int TERMINATED = 3; + protected void onStart() { + int rs = seedGenerator.nextInt(); + seed = rs == 0? 1 : rs; // seed must be nonzero - final boolean isShutdown() { return runState >= SHUTDOWN; } - final boolean isTerminating() { return runState >= TERMINATING; } - final boolean isTerminated() { return runState == TERMINATED; } - final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); } - final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); } + // Allocate name string and arrays in this thread + String pid = Integer.toString(pool.getPoolNumber()); + String wid = Integer.toString(poolIndex); + setName("ForkJoinPool-" + pid + "-worker-" + wid); - /** - * Transitions to at least the given state. - * - * @return {@code true} if not already at least at given state - */ - private boolean transitionRunStateTo(int state) { - for (;;) { - int s = runState; - if (s >= state) - return false; - if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, state)) - return true; - } - } - - /** - * Tries to set status to active; fails on contention. - */ - private boolean tryActivate() { - if (!active) { - if (!pool.tryIncrementActiveCount()) - return false; - active = true; - } - return true; + queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; } /** - * Tries to set status to inactive; fails on contention. + * Performs cleanup associated with termination of this worker + * thread. If you override this method, you must invoke + * {@code super.onTermination} at the end of the overridden method. + * + * @param exception the exception causing this thread to abort due + * to an unrecoverable error, or {@code null} if completed normally */ - private boolean tryInactivate() { - if (active) { - if (!pool.tryDecrementActiveCount()) - return false; - active = false; + protected void onTermination(Throwable exception) { + try { + cancelTasks(); + setTerminated(); + pool.workerTerminated(this); + } catch (Throwable ex) { // Shouldn't ever happen + if (exception == null) // but if so, at least rethrown + exception = ex; + } finally { + if (exception != null) + UNSAFE.throwException(exception); } - return true; - } - - /** - * Computes next value for random victim probe. Scans don't - * require a very high quality generator, but also not a crummy - * one. Marsaglia xor-shift is cheap and works well. - */ - private static int xorShift(int r) { - r ^= (r << 13); - r ^= (r >>> 17); - return r ^ (r << 5); } - // Lifecycle methods - /** * This method is required to be public, but should never be * called explicitly. It performs the main run loop to execute @@ -331,7 +402,6 @@ public class ForkJoinWorkerThread extend Throwable exception = null; try { onStart(); - pool.sync(this); // await first pool event mainLoop(); } catch (Throwable ex) { exception = ex; @@ -340,134 +410,134 @@ public class ForkJoinWorkerThread extend } } + // helpers for run() + /** - * Executes tasks until shut down. 
+ * Find and execute tasks and check status while running */ private void mainLoop() { - while (!isShutdown()) { - ForkJoinTask t = pollTask(); - if (t != null || (t = pollSubmission()) != null) - t.quietlyExec(); - else if (tryInactivate()) - pool.sync(this); + int emptyScans = 0; // consecutive times failed to find work + ForkJoinPool p = pool; + for (;;) { + p.preStep(this, emptyScans); + if (runState != 0) + return; + ForkJoinTask t; // try to get and run stolen or submitted task + if ((t = scan()) != null || (t = pollSubmission()) != null) { + t.tryExec(); + if (base != sp) + runLocalTasks(); + currentSteal = null; + emptyScans = 0; + } + else + ++emptyScans; } } /** - * Initializes internal state after construction but before - * processing any tasks. If you override this method, you must - * invoke super.onStart() at the beginning of the method. - * Initialization requires care: Most fields must have legal - * default values, to ensure that attempted accesses from other - * threads work correctly even before this thread starts - * processing tasks. + * Runs local tasks until queue is empty or shut down. Call only + * while active. */ - protected void onStart() { - // Allocate while starting to improve chances of thread-local - // isolation - queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; - // Initial value of seed need not be especially random but - // should differ across workers and must be nonzero - int p = poolIndex + 1; - seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits + private void runLocalTasks() { + while (runState == 0) { + ForkJoinTask t = locallyFifo? locallyDeqTask() : popTask(); + if (t != null) + t.tryExec(); + else if (base == sp) + break; + } } /** - * Performs cleanup associated with termination of this worker - * thread. If you override this method, you must invoke - * {@code super.onTermination} at the end of the overridden method. + * If a submission exists, try to activate and take it * - * @param exception the exception causing this thread to abort due - * to an unrecoverable error, or {@code null} if completed normally + * @return a task, if available */ - protected void onTermination(Throwable exception) { - // Execute remaining local tasks unless aborting or terminating - while (exception == null && !pool.isTerminating() && base != sp) { - try { - ForkJoinTask t = popTask(); - if (t != null) - t.quietlyExec(); - } catch (Throwable ex) { - exception = ex; + private ForkJoinTask pollSubmission() { + ForkJoinPool p = pool; + while (p.hasQueuedSubmissions()) { + if (active || (active = p.tryIncrementActiveCount())) { + ForkJoinTask t = p.pollSubmission(); + if (t != null) { + currentSteal = t; + return t; + } + return scan(); // if missed, rescan } } - // Cancel other tasks, transition status, notify pool, and - // propagate exception to uncaught exception handler - try { - do {} while (!tryInactivate()); // ensure inactive - cancelTasks(); - runState = TERMINATED; - pool.workerTerminated(this); - } catch (Throwable ex) { // Shouldn't ever happen - if (exception == null) // but if so, at least rethrown - exception = ex; - } finally { - if (exception != null) - ForkJoinTask.rethrowException(exception); - } + return null; } - // Intrinsics-based support for queue operations. - - /** - * Adds in store-order the given task at given slot of q to null. - * Caller must ensure q is non-null and index is in range. + /* + * Intrinsics-based atomic writes for queue slots. 
+     * These are
* * @return a task, or null if none */ final ForkJoinTask locallyDeqTask() { - int b; - while (sp != (b = base)) { - if (tryActivate()) { - ForkJoinTask[] q = queue; - int i = (q.length - 1) & b; - ForkJoinTask t = q[i]; - if (t != null && casSlotNull(q, i, t)) { + ForkJoinTask[] q = queue; + if (q != null) { + ForkJoinTask t; + int b, i; + while (sp != (b = base)) { + if ((t = q[i = (q.length - 1) & b]) != null && base == b && + UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, + t, null)) { base = b + 1; return t; } @@ -497,20 +568,18 @@ public class ForkJoinWorkerThread extend } /** - * Returns a popped task, or null if empty. Ensures active status - * if non-null. Called only by current thread. + * Returns a popped task, or null if empty. Assumes active status. + * Called only by current thread. */ final ForkJoinTask popTask() { - int s = sp; - while (s != base) { - if (tryActivate()) { - ForkJoinTask[] q = queue; - int mask = q.length - 1; - int i = (s - 1) & mask; - ForkJoinTask t = q[i]; - if (t == null || !casSlotNull(q, i, t)) - break; - storeSp(s - 1); + int s; + ForkJoinTask[] q; + if (base != (s = sp) && (q = queue) != null) { + int i = (q.length - 1) & --s; + ForkJoinTask t = q[i]; + if (t != null && UNSAFE.compareAndSwapObject + (q, (i << qShift) + qBase, t, null)) { + sp = s; return t; } } @@ -518,18 +587,19 @@ public class ForkJoinWorkerThread extend } /** - * Specialized version of popTask to pop only if - * topmost element is the given task. Called only - * by current thread while active. + * Specialized version of popTask to pop only if topmost element + * is the given task. Called only by current thread while + * active. * * @param t the task. Caller must ensure non-null. */ final boolean unpushTask(ForkJoinTask t) { - ForkJoinTask[] q = queue; - int mask = q.length - 1; - int s = sp - 1; - if (casSlotNull(q, s & mask, t)) { - storeSp(s); + int s; + ForkJoinTask[] q; + if (base != (s = sp) && (q = queue) != null && + UNSAFE.compareAndSwapObject + (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) { + sp = s; return true; } return false; @@ -569,104 +639,232 @@ public class ForkJoinWorkerThread extend ForkJoinTask t = oldQ[oldIndex]; if (t != null && !casSlotNull(oldQ, oldIndex, t)) t = null; - setSlot(newQ, b & newMask, t); + writeSlot(newQ, b & newMask, t); } while (++b != bf); pool.signalWork(); } /** + * Computes next value for random victim probe in scan(). Scans + * don't require a very high quality generator, but also not a + * crummy one. Marsaglia xor-shift is cheap and works well enough. + * Note: This is manually inlined in scan() + */ + private static final int xorShift(int r) { + r ^= r << 13; + r ^= r >>> 17; + return r ^ (r << 5); + } + + /** * Tries to steal a task from another worker. Starts at a random * index of workers array, and probes workers until finding one * with non-empty queue or finding that all are empty. It * randomly selects the first n probes. If these are empty, it - * resorts to a full circular traversal, which is necessary to - * accurately set active status by caller. Also restarts if pool - * events occurred since last scan, which forces refresh of - * workers array, in case barrier was associated with resize. + * resorts to a circular sweep, which is necessary to accurately + * set active status. (The circular sweep uses steps of + * approximately half the array size plus 1, to avoid bias + * stemming from leftmost packing of the array in ForkJoinPool.) 
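+     * For orientation, an illustrative summary (not itself part of
+     * the implementation): in the default LIFO mode the owner and
+     * thieves work opposite ends of the queue, while in async (FIFO)
+     * mode the owner consumes from the same end as thieves:
+     *
+     *   owner, LIFO mode:  popTask()        takes slot (sp - 1)
+     *   owner, FIFO mode:  locallyDeqTask() takes slot base
+     *   thief, any mode:   deqTask()        takes slot base
+     *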
* * This method must be both fast and quiet -- usually avoiding * memory accesses that could disrupt cache sharing etc other than - * those needed to check for and take tasks. This accounts for, - * among other things, updating random seed in place without - * storing it until exit. + * those needed to check for and take tasks (or to activate if not + * already active). This accounts for, among other things, + * updating random seed in place without storing it until exit. * * @return a task, or null if none found */ private ForkJoinTask scan() { - ForkJoinTask t = null; - int r = seed; // extract once to keep scan quiet - ForkJoinWorkerThread[] ws; // refreshed on outer loop - int mask; // must be power 2 minus 1 and > 0 - outer:do { - if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) { - int idx = r; - int probes = ~mask; // use random index while negative - for (;;) { - r = xorShift(r); // update random seed - ForkJoinWorkerThread v = ws[mask & idx]; - if (v == null || v.sp == v.base) { - if (probes <= mask) - idx = (probes++ < 0) ? r : (idx + 1); - else - break; + ForkJoinPool p = pool; + ForkJoinWorkerThread[] ws; // worker array + int n; // upper bound of #workers + if ((ws = p.workers) != null && (n = ws.length) > 1) { + boolean canSteal = active; // shadow active status + int r = seed; // extract seed once + int mask = n - 1; + int j = -n; // loop counter + int k = r; // worker index, random if j < 0 + for (;;) { + ForkJoinWorkerThread v = ws[k & mask]; + r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift + if (v != null && v.base != v.sp) { + if (canSteal || // ensure active status + (canSteal = active = p.tryIncrementActiveCount())) { + int b = v.base; // inline specialized deqTask + ForkJoinTask[] q; + if (b != v.sp && (q = v.queue) != null) { + ForkJoinTask t; + int i = (q.length - 1) & b; + long u = (i << qShift) + qBase; // raw offset + if ((t = q[i]) != null && v.base == b && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + currentSteal = t; + v.stealHint = poolIndex; + v.base = b + 1; + seed = r; + ++stealCount; + return t; + } + } } - else if (!tryActivate() || (t = v.deqTask()) == null) - continue outer; // restart on contention - else - break outer; + j = -n; + k = r; // restart on contention } + else if (++j <= 0) + k = r; + else if (j <= n) + k += (n >>> 1) | 1; + else + break; } - } while (pool.hasNewSyncEvent(this)); // retry on pool events - seed = r; - return t; + } + return null; } + // Run State management + + // status check methods used mainly by ForkJoinPool + final boolean isTerminating() { return (runState & TERMINATING) != 0; } + final boolean isTerminated() { return (runState & TERMINATED) != 0; } + final boolean isSuspended() { return (runState & SUSPENDED) != 0; } + final boolean isTrimmed() { return (runState & TRIMMED) != 0; } + /** - * Gets and removes a local or stolen task. - * - * @return a task, if available + * Sets state to TERMINATING, also resuming if suspended. */ - final ForkJoinTask pollTask() { - ForkJoinTask t = locallyFifo ? 
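+     * (Worked example of the sweep step: the workers array length n
+     * is a power of two here (it is indexed via mask n - 1), and the
+     * step (n >>> 1) | 1 is odd, hence relatively prime to n, so
+     * repeatedly adding it visits every index mod n. For n = 8 the
+     * step is 5, and starting at k = 0 the sweep visits slots
+     * 0, 5, 2, 7, 4, 1, 6, 3.)
+     *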
-            locallyDeqTask() : popTask();
*/ final void cancelTasks() { - ForkJoinTask t; - while (base != sp && (t = deqTask()) != null) - t.cancelIgnoringExceptions(); + ForkJoinTask cj = currentJoin; // try to kill live tasks + if (cj != null) { + currentJoin = null; + cj.cancelIgnoringExceptions(); + } + ForkJoinTask cs = currentSteal; + if (cs != null) { + currentSteal = null; + cs.cancelIgnoringExceptions(); + } + while (base != sp) { + ForkJoinTask t = deqTask(); + if (t != null) + t.cancelIgnoringExceptions(); + } } /** @@ -676,73 +874,238 @@ public class ForkJoinWorkerThread extend */ final int drainTasksTo(Collection> c) { int n = 0; - ForkJoinTask t; - while (base != sp && (t = deqTask()) != null) { - c.add(t); - ++n; + while (base != sp) { + ForkJoinTask t = deqTask(); + if (t != null) { + c.add(t); + ++n; + } } return n; } + // Support methods for ForkJoinTask + /** - * Gets and clears steal count for accumulation by pool. Called - * only when known to be idle (in pool.sync and termination). + * Gets and removes a local task. + * + * @return a task, if available */ - final int getAndClearStealCount() { - int sc = stealCount; - stealCount = 0; - return sc; + final ForkJoinTask pollLocalTask() { + while (sp != base) { + if (active || (active = pool.tryIncrementActiveCount())) + return locallyFifo? locallyDeqTask() : popTask(); + } + return null; } /** - * Returns {@code true} if at least one worker in the given array - * appears to have at least one queued task. + * Gets and removes a local or stolen task. * - * @param ws array of workers + * @return a task, if available */ - static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) { - if (ws != null) { - int len = ws.length; - for (int j = 0; j < 2; ++j) { // need two passes for clean sweep - for (int i = 0; i < len; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null && w.sp != w.base) - return true; - } - } + final ForkJoinTask pollTask() { + ForkJoinTask t = pollLocalTask(); + if (t == null) { + t = scan(); + currentSteal = null; // cannot retain/track } - return false; + return t; } - // Support methods for ForkJoinTask + /** + * Possibly runs some tasks and/or blocks, until task is done. + * The main body is basically a big spinloop, alternating between + * calls to helpJoinTask and pool.tryAwaitJoin with increased + * patience parameters until either the task is done without + * waiting, or we have, if necessary, created or resumed a + * replacement for this thread while it blocks. + * + * @param joinMe the task to join + * @return task status on exit + */ + final int joinTask(ForkJoinTask joinMe) { + int stat; + ForkJoinTask prevJoin = currentJoin; + // Only written by this thread; only need ordered store + UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe); + if ((stat = joinMe.status) >= 0 && + (sp == base || (stat = localHelpJoinTask(joinMe)) >= 0)) { + for (int retries = 0; ; ++retries) { + helpJoinTask(joinMe, retries); + if ((stat = joinMe.status) < 0) + break; + pool.tryAwaitJoin(joinMe, retries); + if ((stat = joinMe.status) < 0) + break; + Thread.yield(); // tame unbounded loop + } + } + UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin); + return stat; + } /** - * Returns an estimate of the number of tasks in the queue. + * Run tasks in local queue until given task is done. 
+ * + * @param joinMe the task to join + * @return task status on exit */ - final int getQueueSize() { - // suppress momentarily negative values - return Math.max(0, sp - base); + private int localHelpJoinTask(ForkJoinTask joinMe) { + int stat, s; + ForkJoinTask[] q; + while ((stat = joinMe.status) >= 0 && + base != (s = sp) && (q = queue) != null) { + ForkJoinTask t; + int i = (q.length - 1) & --s; + long u = (i << qShift) + qBase; // raw offset + if ((t = q[i]) != null && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + /* + * This recheck (and similarly in helpJoinTask) + * handles cases where joinMe is independently + * cancelled or forced even though there is other work + * available. Back out of the pop by putting t back + * into slot before we commit by writing sp. + */ + if ((stat = joinMe.status) < 0) { + UNSAFE.putObjectVolatile(q, u, t); + break; + } + sp = s; + t.tryExec(); + } + } + return stat; } /** - * Returns an estimate of the number of tasks, offset by a - * function of number of idle workers. - */ - final int getEstimatedSurplusTaskCount() { - // The halving approximates weighting idle vs non-idle workers - return (sp - base) - (pool.getIdleThreadCount() >>> 1); + * Tries to locate and help perform tasks for a stealer of the + * given task, or in turn one of its stealers. Traces + * currentSteal->currentJoin links looking for a thread working on + * a descendant of the given task and with a non-empty queue to + * steal back and execute tasks from. Restarts search upon + * encountering chains that are stale, unknown, or of length + * greater than MAX_HELP_DEPTH links, to avoid unbounded cycles. + * + * The implementation is very branchy to cope with the restart + * cases. Returns void, not task status (which must be reread by + * caller anyway) to slightly simplify control paths. 
+ * + * @param joinMe the task to join + * @param rescans the number of times to recheck for work + */ + private void helpJoinTask(ForkJoinTask joinMe, int rescans) { + ForkJoinWorkerThread[] ws = pool.workers; + int n; + if (ws == null || (n = ws.length) <= 1) + return; // need at least 2 workers + restart:while (rescans-- >= 0 && joinMe.status >= 0) { + ForkJoinTask task = joinMe; // base of chain + ForkJoinWorkerThread thread = this; // thread with stolen task + for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) { + // Try to find v, the stealer of task, by first using hint + ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)]; + if (v == null || v.currentSteal != task) { + for (int j = 0; ; ++j) { // search array + if (task.status < 0 || j == n) + continue restart; // stale or no stealer + if ((v = ws[j]) != null && v.currentSteal == task) { + thread.stealHint = j; // save for next time + break; + } + } + } + // Try to help v, using specialized form of deqTask + int b; + ForkJoinTask[] q; + while ((b = v.base) != v.sp && (q = v.queue) != null) { + int i = (q.length - 1) & b; + long u = (i << qShift) + qBase; + ForkJoinTask t = q[i]; + if (task.status < 0) // stale + continue restart; + if (t != null) { + if (v.base == b && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + if (joinMe.status < 0) { + UNSAFE.putObjectVolatile(q, u, t); + return; // back out on cancel + } + ForkJoinTask prevSteal = currentSteal; + currentSteal = t; + v.stealHint = poolIndex; + v.base = b + 1; + t.tryExec(); + currentSteal = prevSteal; + } + } + else if (v.base == b) // producer stalled + continue restart; // retry via restart + if (joinMe.status < 0) + return; + } + // Try to descend to find v's stealer + ForkJoinTask next = v.currentJoin; + if (next == null || next == task || task.status < 0) + continue restart; // no descendent or stale + if (joinMe.status < 0) + return; + task = next; + thread = v; + } + } } /** - * Scans, returning early if joinMe done. + * Returns an estimate of the number of tasks, offset by a + * function of number of idle workers. + * + * This method provides a cheap heuristic guide for task + * partitioning when programmers, frameworks, tools, or languages + * have little or no idea about task granularity. In essence by + * offering this method, we ask users only about tradeoffs in + * overhead vs expected throughput and its variance, rather than + * how finely to partition tasks. + * + * In a steady state strict (tree-structured) computation, each + * thread makes available for stealing enough tasks for other + * threads to remain active. Inductively, if all threads play by + * the same rules, each thread should make available only a + * constant number of tasks. + * + * The minimum useful constant is just 1. But using a value of 1 + * would require immediate replenishment upon each steal to + * maintain enough tasks, which is infeasible. Further, + * partitionings/granularities of offered tasks should minimize + * steal rates, which in general means that threads nearer the top + * of computation tree should generate more than those nearer the + * bottom. In perfect steady state, each thread is at + * approximately the same level of computation tree. However, + * producing extra tasks amortizes the uncertainty of progress and + * diffusion assumptions. + * + * So, users will want to use values larger, but not much larger + * than 1 to both smooth over transient shortages and hedge + * against uneven progress; as traded off against the cost of + * extra task overhead. 
+     * We leave the user to pick a threshold