--- jsr166/src/jsr166y/ForkJoinWorkerThread.java 2009/07/16 15:32:34 1.7 +++ jsr166/src/jsr166y/ForkJoinWorkerThread.java 2010/07/07 19:52:32 1.35 @@ -5,125 +5,165 @@ */ package jsr166y; -import java.util.*; + import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; + +import java.util.Random; +import java.util.Collection; +import java.util.concurrent.locks.LockSupport; /** * A thread managed by a {@link ForkJoinPool}. This class is * subclassable solely for the sake of adding functionality -- there - * are no overridable methods dealing with scheduling or - * execution. However, you can override initialization and termination - * methods surrounding the main task processing loop. If you do - * create such a subclass, you will also need to supply a custom - * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. + * are no overridable methods dealing with scheduling or execution. + * However, you can override initialization and termination methods + * surrounding the main task processing loop. If you do create such a + * subclass, you will also need to supply a custom {@link + * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code + * ForkJoinPool}. * + * @since 1.7 + * @author Doug Lea */ public class ForkJoinWorkerThread extends Thread { /* - * Algorithm overview: + * Overview: + * + * ForkJoinWorkerThreads are managed by ForkJoinPools and perform + * ForkJoinTasks. This class includes bookkeeping in support of + * worker activation, suspension, and lifecycle control described + * in more detail in the internal documentation of class + * ForkJoinPool. And as described further below, this class also + * includes special-cased support for some ForkJoinTask + * methods. But the main mechanics involve work-stealing: * - * 1. Work-Stealing: Work-stealing queues are special forms of - * Deques that support only three of the four possible - * end-operations -- push, pop, and deq (aka steal), and only do - * so under the constraints that push and pop are called only from - * the owning thread, while deq may be called from other threads. - * (If you are unfamiliar with them, you probably want to read - * Herlihy and Shavit's book "The Art of Multiprocessor - * programming", chapter 16 describing these in more detail before - * proceeding.) The main work-stealing queue design is roughly - * similar to "Dynamic Circular Work-Stealing Deque" by David - * Chase and Yossi Lev, SPAA 2005 - * (http://research.sun.com/scalable/pubs/index.html). The main - * difference ultimately stems from gc requirements that we null - * out taken slots as soon as we can, to maintain as small a - * footprint as possible even in programs generating huge numbers - * of tasks. To accomplish this, we shift the CAS arbitrating pop - * vs deq (steal) from being on the indices ("base" and "sp") to - * the slots themselves (mainly via method "casSlotNull()"). So, - * both a successful pop and deq mainly entail CAS'ing a nonnull - * slot to null. Because we rely on CASes of references, we do - * not need tag bits on base or sp. They are simple ints as used - * in any circular array-based queue (see for example ArrayDeque). - * Updates to the indices must still be ordered in a way that - * guarantees that (sp - base) > 0 means the queue is empty, but - * otherwise may err on the side of possibly making the queue - * appear nonempty when a push, pop, or deq have not fully - * committed. 
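For illustration, the following is a minimal, self-contained sketch of such a deque, substituting AtomicReferenceArray and a volatile sp for the Unsafe intrinsics the real implementation uses for speed; names are illustrative, and resizing, signalling, and activation control are omitted.

    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Sketch only: the CAS arbitrating pop vs deq (steal) is on the slot,
    // not on the indices, so taken slots are nulled immediately and base
    // and sp can remain plain ints in the real code.
    final class WorkStealingDequeSketch<T> {
        private final AtomicReferenceArray<T> slots =
            new AtomicReferenceArray<T>(1 << 13); // power of two
        private volatile int base; // next slot to steal (deq) from
        private volatile int sp;   // next slot to push to; written only by
                                   // the owner (volatile here for simplicity)

        private int mask() { return slots.length() - 1; }

        void push(T t) {                  // owner thread only
            int s = sp;
            slots.lazySet(s & mask(), t); // store-ordered slot write
            sp = s + 1;
        }

        T pop() {                         // owner thread only
            int s = sp;
            if (s != base) {
                int i = (s - 1) & mask();
                T t = slots.get(i);
                if (t != null && slots.compareAndSet(i, t, null)) {
                    sp = s - 1;
                    return t;
                }
            }
            return null;                  // empty, or lost the race to a thief
        }

        T steal() {                       // any thread; the "deq" operation
            int b = base;
            if (b != sp) {
                int i = b & mask();
                T t = slots.get(i);
                if (t != null && slots.compareAndSet(i, t, null)) {
                    base = b + 1;
                    return t;
                }
            }
            return null;                  // empty or contended; a real thief
                                          // then retries on another victim
        }
    }

The essential property mirrors the comment above: a successful pop or steal CASes a non-null slot to null, so taken slots never retain stale references and no tag bits are needed on the indices.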
Note that this means that the deq operation, - * considered individually, is not wait-free. One thief cannot - * successfully continue until another in-progress one (or, if - * previously empty, a push) completes. However, in the - * aggregate, we ensure at least probablistic non-blockingness. If - * an attempted steal fails, a thief always chooses a different + * Work-stealing queues are special forms of Deques that support + * only three of the four possible end-operations -- push, pop, + * and deq (aka steal), under the further constraints that push + * and pop are called only from the owning thread, while deq may + * be called from other threads. (If you are unfamiliar with + * them, you probably want to read Herlihy and Shavit's book "The + * Art of Multiprocessor programming", chapter 16 describing these + * in more detail before proceeding.) The main work-stealing + * queue design is roughly similar to those in the papers "Dynamic + * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 + * (http://research.sun.com/scalable/pubs/index.html) and + * "Idempotent work stealing" by Michael, Saraswat, and Vechev, + * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). + * The main differences ultimately stem from gc requirements that + * we null out taken slots as soon as we can, to maintain as small + * a footprint as possible even in programs generating huge + * numbers of tasks. To accomplish this, we shift the CAS + * arbitrating pop vs deq (steal) from being on the indices + * ("base" and "sp") to the slots themselves (mainly via method + * "casSlotNull()"). So, both a successful pop and deq mainly + * entail a CAS of a slot from non-null to null. Because we rely + * on CASes of references, we do not need tag bits on base or sp. + * They are simple ints as used in any circular array-based queue + * (see for example ArrayDeque). Updates to the indices must + * still be ordered in a way that guarantees that sp == base means + * the queue is empty, but otherwise may err on the side of + * possibly making the queue appear nonempty when a push, pop, or + * deq have not fully committed. Note that this means that the deq + * operation, considered individually, is not wait-free. One thief + * cannot successfully continue until another in-progress one (or, + * if previously empty, a push) completes. However, in the + * aggregate, we ensure at least probabilistic non-blockingness. + * If an attempted steal fails, a thief always chooses a different * random victim target to try next. So, in order for one thief to * progress, it suffices for any in-progress deq or new push on * any empty queue to complete. One reason this works well here is * that apparently-nonempty often means soon-to-be-stealable, - * which gives threads a chance to activate if necessary before - * stealing (see below). + * which gives threads a chance to set activation status if + * necessary before stealing. + * + * This approach also enables support for "async mode" where local + * task processing is in FIFO, not LIFO order; simply by using a + * version of deq rather than pop when locallyFifo is true (as set + * by the ForkJoinPool). This allows use in message-passing + * frameworks in which tasks are never joined. + * + * When a worker would otherwise be blocked waiting to join a + * task, it first tries a form of linear helping: Each worker + * records (in field stolen) the most recent task it stole + * from some other worker. 
Plus, it records (in field joining) the + * task it is currently actively joining. Method joinTask uses + * these markers to try to find a worker to help (i.e., steal back + * a task from and execute it) that could hasten completion of the + * actively joined task. In essence, the joiner executes a task + * that would be on its own local deque had the to-be-joined task + * not been stolen. This may be seen as a conservative variant of + * the approach in Wagner & Calder "Leapfrogging: a portable + * technique for implementing efficient futures" SIGPLAN Notices, + * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs + * in that: (1) We only maintain dependency links across workers + * upon steals, rather than maintain per-task bookkeeping. This + * requires a linear scan of workers array to locate stealers, + * which isolates cost to when it is needed, rather than adding to + * per-task overhead. (2) It is "shallow", ignoring nesting and + * potentially cyclic mutual steals. (3) It is intentionally + * racy: field joining is updated only while actively joining, + * which means that we could miss links in the chain during + * long-lived tasks, GC stalls etc. (4) We fall back to + * suspending the worker and if necessary replacing it with a + * spare (see ForkJoinPool.tryAwaitJoin). * - * Efficient implementation of this approach currently relies on + * Efficient implementation of these algorithms currently relies on * an uncomfortable amount of "Unsafe" mechanics. To maintain * correct orderings, reads and writes of variable base require - * volatile ordering. Variable sp does not require volatile write - * but needs cheaper store-ordering on writes. Because they are - * protected by volatile base reads, reads of the queue array and - * its slots do not need volatile load semantics, but writes (in - * push) require store order and CASes (in pop and deq) require - * (volatile) CAS semantics. Since these combinations aren't - * supported using ordinary volatiles, the only way to accomplish - * these effciently is to use direct Unsafe calls. (Using external + * volatile ordering. Variable sp does not require volatile + * writes but still needs store-ordering, which we accomplish by + * pre-incrementing sp before filling the slot with an ordered + * store. (Pre-incrementing also enables backouts used in + * scanWhileJoining.) Because they are protected by volatile base + * reads, reads of the queue array and its slots by other threads + * do not need volatile load semantics, but writes (in push) + * require store order and CASes (in pop and deq) require + * (volatile) CAS semantics. (Michael, Saraswat, and Vechev's + * algorithm has similar properties, but without support for + * nulling slots.) Since these combinations aren't supported + * using ordinary volatiles, the only way to accomplish these + * efficiently is to use direct Unsafe calls. (Using external * AtomicIntegers and AtomicReferenceArrays for the indices and * array is significantly slower because of memory locality and - * indirection effects.) Further, performance on most platforms is - * very sensitive to placement and sizing of the (resizable) queue - * array. Even though these queues don't usually become all that - * big, the initial size must be large enough to counteract cache + * indirection effects.) + * + * Further, performance on most platforms is very sensitive to + * placement and sizing of the (resizable) queue array. 
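As a rough guide, the access modes just described map onto AtomicReferenceArray operations as sketched below. This is an illustrative correspondence only; as noted above, the real code uses raw Unsafe calls because the Atomic classes add indirection and memory-locality costs.

    import java.util.concurrent.atomic.AtomicReferenceArray;

    class SlotAccessModes {
        final AtomicReferenceArray<Object> q =
            new AtomicReferenceArray<Object>(1 << 13);

        // ~ Unsafe.putOrderedObject: store-ordered write, cheaper than a
        // full volatile write (the flavor push uses for slots).
        void orderedWrite(int i, Object t) { q.lazySet(i, t); }

        // ~ Unsafe.putObjectVolatile: full volatile write (used only for
        // resets and backouts).
        void volatileWrite(int i, Object t) { q.set(i, t); }

        // ~ Unsafe.compareAndSwapObject: (volatile) CAS semantics; this is
        // the arbiter between pop and deq (steal).
        boolean casToNull(int i, Object t) {
            return q.compareAndSet(i, t, null);
        }

        // get is a volatile read; the real code can use plain array reads
        // because a volatile read of base precedes them and the subsequent
        // CAS confirms the value read.
        Object read(int i) { return q.get(i); }
    }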
Even + * though these queues don't usually become all that big, the + * initial size must be large enough to counteract cache * contention effects across multiple queues (especially in the * presence of GC cardmarking). Also, to improve thread-locality, - * queues are currently initialized immediately after the thread - * gets the initial signal to start processing tasks. However, - * all queue-related methods except pushTask are written in a way - * that allows them to instead be lazily allocated and/or disposed - * of when empty. All together, these low-level implementation - * choices produce as much as a factor of 4 performance - * improvement compared to naive implementations, and enable the - * processing of billions of tasks per second, sometimes at the - * expense of ugliness. - * - * 2. Run control: The primary run control is based on a global - * counter (activeCount) held by the pool. It uses an algorithm - * similar to that in Herlihy and Shavit section 17.6 to cause - * threads to eventually block when all threads declare they are - * inactive. (See variable "scans".) For this to work, threads - * must be declared active when executing tasks, and before - * stealing a task. They must be inactive before blocking on the - * Pool Barrier (awaiting a new submission or other Pool - * event). In between, there is some free play which we take - * advantage of to avoid contention and rapid flickering of the - * global activeCount: If inactive, we activate only if a victim - * queue appears to be nonempty (see above). Similarly, a thread - * tries to inactivate only after a full scan of other threads. - * The net effect is that contention on activeCount is rarely a - * measurable performance issue. (There are also a few other cases - * where we scan for work rather than retry/block upon - * contention.) - * - * 3. Selection control. We maintain policy of always choosing to - * run local tasks rather than stealing, and always trying to - * steal tasks before trying to run a new submission. All steals - * are currently performed in randomly-chosen deq-order. It may be - * worthwhile to bias these with locality / anti-locality - * information, but doing this well probably requires more - * lower-level information from JVMs than currently provided. + * queues are initialized after starting. All together, these + * low-level implementation choices produce as much as a factor of + * 4 performance improvement compared to naive implementations, + * and enable the processing of billions of tasks per second, + * sometimes at the expense of ugliness. + */ + + /** + * Generator for initial random seeds for random victim + * selection. This is used only to create initial seeds. Random + * steals use a cheaper xorshift generator per steal attempt. We + * expect only rare contention on seedGenerator, so just use a + * plain Random. */ + private static final Random seedGenerator = new Random(); + + /** + * The timeout value for suspending spares. Spare workers that + * remain unsignalled for more than this time may be trimmed + * (killed and removed from pool). Since our goal is to avoid + * long-term thread buildup, the exact value of timeout does not + * matter too much so long as it avoids most false-alarm timeouts + * under GC stalls or momentarily high system load. + */ + private static final long SPARE_KEEPALIVE_NANOS = + 5L * 1000L * 1000L * 1000L; // 5 secs /** * Capacity of work-stealing queue array upon initialization. - * Must be a power of two. 
Initial size must be at least 4, but is
+     * padded to minimize cache effects.
      */
     private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

@@ -137,23 +177,26 @@ public class ForkJoinWorkerThread extend
     private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;

     /**
-     * The pool this thread works in. Accessed directly by ForkJoinTask
+     * The pool this thread works in. Accessed directly by ForkJoinTask.
      */
     final ForkJoinPool pool;

     /**
-     * The work-stealing queue array. Size must be a power of two.
-     * Initialized when thread starts, to improve memory locality.
+     * The task most recently stolen from another worker.
      */
-    private ForkJoinTask<?>[] queue;
+    private volatile ForkJoinTask<?> stolen;

     /**
-     * Index (mod queue.length) of next queue slot to push to or pop
-     * from. It is written only by owner thread, via ordered store.
-     * Both sp and base are allowed to wrap around on overflow, but
-     * (sp - base) still estimates size.
+     * The task currently being joined, set only when actively
+     * trying to helpStealer.
      */
-    private volatile int sp;
+    private volatile ForkJoinTask<?> joining;
+
+    /**
+     * The work-stealing queue array. Size must be a power of two.
+     * Initialized in onStart, to improve memory locality.
+     */
+    private ForkJoinTask<?>[] queue;

     /**
      * Index (mod queue.length) of least valid queue slot, which is
@@ -162,63 +205,108 @@ public class ForkJoinWorkerThread extend
     private volatile int base;

     /**
-     * Activity status. When true, this worker is considered active.
-     * Must be false upon construction. It must be true when executing
-     * tasks, and BEFORE stealing a task. It must be false before
-     * calling pool.sync
-     */
-    private boolean active;
+     * Index (mod queue.length) of next queue slot to push to or pop
+     * from. It is written only by owner thread, and accessed by other
+     * threads only after reading (volatile) base. Both sp and base
+     * are allowed to wrap around on overflow, but (sp - base) still
+     * estimates size.
+     */
+    private int sp;

     /**
-     * Run state of this worker. Supports simple versions of the usual
-     * shutdown/shutdownNow control.
+     * Run state of this worker. In addition to the usual run levels,
+     * tracks if this worker is suspended as a spare, and if it was
+     * killed (trimmed) while suspended. However, "active" status is
+     * maintained separately.
      */
     private volatile int runState;
+
+    private static final int TERMINATING = 0x01;
+    private static final int TERMINATED  = 0x02;
+    private static final int SUSPENDED   = 0x04; // inactive spare
+    private static final int TRIMMED     = 0x08; // killed while suspended
+
+    /**
+     * Number of LockSupport.park calls to block this thread for
+     * suspension or event waits. Used for internal instrumentation;
+     * currently not exported but included because volatile write upon
+     * park also provides a workaround for a JVM bug.
+     */
+    volatile int parkCount;
+
+    /**
+     * Number of steals, transferred and reset in pool callbacks
+     * when idle. Accessed directly by pool.
+     */
+    int stealCount;
+
     /**
      * Seed for random number generator for choosing steal victims.
-     * Uses Marsaglia xorshift. Must be nonzero upon initialization.
+     * Uses Marsaglia xorshift. Must be initialized as nonzero.
      */
     private int seed;

     /**
-     * Number of steals, transferred to pool when idle
+     * Activity status. When true, this worker is considered active.
+     * Accessed directly by pool. Must be false upon construction.
*/ - private int stealCount; + boolean active; /** + * True if use local fifo, not default lifo, for local polling. + * Shadows value from ForkJoinPool, which resets it if changed + * pool-wide. + */ + private final boolean locallyFifo; + + /** * Index of this worker in pool array. Set once by pool before - * running, and accessed directly by pool during cleanup etc + * running, and accessed directly by pool to locate this worker in + * its workers array. */ int poolIndex; /** - * The last barrier event waited for. Accessed in pool callback - * methods, but only by current thread. + * The last pool event waited for. Accessed only by pool in + * callback methods invoked within this thread. */ - long lastEventCount; + int lastEventCount; /** - * True if use local fifo, not default lifo, for local polling + * Encoded index and event count of next event waiter. Used only + * by ForkJoinPool for managing event waiters. */ - private boolean locallyFifo; + volatile long nextWaiter; /** * Creates a ForkJoinWorkerThread operating in the given pool. + * * @param pool the pool this thread works in * @throws NullPointerException if pool is null */ protected ForkJoinWorkerThread(ForkJoinPool pool) { - if (pool == null) throw new NullPointerException(); this.pool = pool; - // Note: poolIndex is set by pool during construction - // Remaining initialization is deferred to onStart + this.locallyFifo = pool.locallyFifo; + // To avoid exposing construction details to subclasses, + // remaining initialization is in start() and onStart() + } + + /** + * Performs additional initialization and starts this thread + */ + final void start(int poolIndex, UncaughtExceptionHandler ueh) { + this.poolIndex = poolIndex; + if (ueh != null) + setUncaughtExceptionHandler(ueh); + setDaemon(true); + start(); } - // Public access methods + // Public/protected methods /** - * Returns the pool hosting this thread + * Returns the pool hosting this thread. + * * @return the pool */ public ForkJoinPool getPool() { @@ -231,87 +319,58 @@ public class ForkJoinWorkerThread extend * threads (minus one) that have ever been created in the pool. * This method may be useful for applications that track status or * collect results per-worker rather than per-task. - * @return the index number. + * + * @return the index number */ public int getPoolIndex() { return poolIndex; } /** - * Establishes local first-in-first-out scheduling mode for forked - * tasks that are never joined. - * @param async if true, use locally FIFO scheduling + * Initializes internal state after construction but before + * processing any tasks. If you override this method, you must + * invoke super.onStart() at the beginning of the method. + * Initialization requires care: Most fields must have legal + * default values, to ensure that attempted accesses from other + * threads work correctly even before this thread starts + * processing tasks. */ - void setAsyncMode(boolean async) { - locallyFifo = async; - } - - // Runstate management - - // Runstate values. 
Order matters - private static final int RUNNING = 0; - private static final int SHUTDOWN = 1; - private static final int TERMINATING = 2; - private static final int TERMINATED = 3; - - final boolean isShutdown() { return runState >= SHUTDOWN; } - final boolean isTerminating() { return runState >= TERMINATING; } - final boolean isTerminated() { return runState == TERMINATED; } - final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); } - final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); } + protected void onStart() { + int rs = seedGenerator.nextInt(); + seed = rs == 0? 1 : rs; // seed must be nonzero - /** - * Transition to at least the given state. Return true if not - * already at least given state. - */ - private boolean transitionRunStateTo(int state) { - for (;;) { - int s = runState; - if (s >= state) - return false; - if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state)) - return true; - } - } + // Allocate name string and arrays in this thread + String pid = Integer.toString(pool.getPoolNumber()); + String wid = Integer.toString(poolIndex); + setName("ForkJoinPool-" + pid + "-worker-" + wid); - /** - * Try to set status to active; fail on contention - */ - private boolean tryActivate() { - if (!active) { - if (!pool.tryIncrementActiveCount()) - return false; - active = true; - } - return true; + queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; } /** - * Try to set status to active; fail on contention + * Performs cleanup associated with termination of this worker + * thread. If you override this method, you must invoke + * {@code super.onTermination} at the end of the overridden method. + * + * @param exception the exception causing this thread to abort due + * to an unrecoverable error, or {@code null} if completed normally */ - private boolean tryInactivate() { - if (active) { - if (!pool.tryDecrementActiveCount()) - return false; - active = false; + protected void onTermination(Throwable exception) { + try { + stolen = null; + joining = null; + cancelTasks(); + setTerminated(); + pool.workerTerminated(this); + } catch (Throwable ex) { // Shouldn't ever happen + if (exception == null) // but if so, at least rethrown + exception = ex; + } finally { + if (exception != null) + UNSAFE.throwException(exception); } - return true; - } - - /** - * Computes next value for random victim probe. Scans don't - * require a very high quality generator, but also not a crummy - * one. Marsaglia xor-shift is cheap and works well. - */ - private static int xorShift(int r) { - r ^= r << 1; - r ^= r >>> 3; - r ^= r << 10; - return r; } - // Lifecycle methods - /** * This method is required to be public, but should never be * called explicitly. It performs the main run loop to execute @@ -321,7 +380,6 @@ public class ForkJoinWorkerThread extend Throwable exception = null; try { onStart(); - pool.sync(this); // await first pool event mainLoop(); } catch (Throwable ex) { exception = ex; @@ -330,132 +388,133 @@ public class ForkJoinWorkerThread extend } } + // helpers for run() + /** - * Execute tasks until shut down. 
+ * Find and execute tasks and check status while running */ private void mainLoop() { - while (!isShutdown()) { - ForkJoinTask t = pollTask(); - if (t != null || (t = pollSubmission()) != null) - t.quietlyExec(); - else if (tryInactivate()) - pool.sync(this); + boolean ran = false; // true if ran task in last loop iter + boolean prevRan = false; // true if ran on last or previous step + ForkJoinPool p = pool; + for (;;) { + p.preStep(this, prevRan); + if (runState != 0) + return; + ForkJoinTask t; // try to get and run stolen or submitted task + if ((t = scan()) != null || (t = pollSubmission()) != null) { + t.tryExec(); + if (base != sp) + runLocalTasks(); + stolen = null; + prevRan = ran = true; + } + else { + prevRan = ran; + ran = false; + } } } /** - * Initializes internal state after construction but before - * processing any tasks. If you override this method, you must - * invoke super.onStart() at the beginning of the method. - * Initialization requires care: Most fields must have legal - * default values, to ensure that attempted accesses from other - * threads work correctly even before this thread starts - * processing tasks. + * Runs local tasks until queue is empty or shut down. Call only + * while active. */ - protected void onStart() { - // Allocate while starting to improve chances of thread-local - // isolation - queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; - // Initial value of seed need not be especially random but - // should differ across workers and must be nonzero - int p = poolIndex + 1; - seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits + private void runLocalTasks() { + while (runState == 0) { + ForkJoinTask t = locallyFifo? locallyDeqTask() : popTask(); + if (t != null) + t.tryExec(); + else if (base == sp) + break; + } } /** - * Perform cleanup associated with termination of this worker - * thread. If you override this method, you must invoke - * super.onTermination at the end of the overridden method. + * If a submission exists, try to activate and take it * - * @param exception the exception causing this thread to abort due - * to an unrecoverable error, or null if completed normally. + * @return a task, if available */ - protected void onTermination(Throwable exception) { - // Execute remaining local tasks unless aborting or terminating - while (exception == null && !pool.isTerminating() && base != sp) { - try { - ForkJoinTask t = popTask(); - if (t != null) - t.quietlyExec(); - } catch(Throwable ex) { - exception = ex; + private ForkJoinTask pollSubmission() { + ForkJoinPool p = pool; + while (p.hasQueuedSubmissions()) { + if (active || (active = p.tryIncrementActiveCount())) { + ForkJoinTask t = p.pollSubmission(); + return t != null ? t : scan(); // if missed, rescan } } - // Cancel other tasks, transition status, notify pool, and - // propagate exception to uncaught exception handler - try { - do;while (!tryInactivate()); // ensure inactive - cancelTasks(); - runState = TERMINATED; - pool.workerTerminated(this); - } catch (Throwable ex) { // Shouldn't ever happen - if (exception == null) // but if so, at least rethrown - exception = ex; - } finally { - if (exception != null) - ForkJoinTask.rethrowException(exception); - } + return null; } - // Intrinsics-based support for queue operations. - - /** - * Add in store-order the given task at given slot of q to - * null. Caller must ensure q is nonnull and index is in range. + /* + * Intrinsics-based atomic writes for queue slots. 
These are
     * basically the same as methods in AtomicReferenceArray, but
     * specialized for (1) ForkJoinTask elements, (2) the requirement that
     * nullness and bounds checks have already been performed by
     * callers, and (3) effective offsets are known not to overflow
     * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
     * need a corresponding version for reads: plain array reads are OK
     * because they are protected by other volatile reads and are
     * confirmed by CASes.
     *
     * Most uses don't actually call these methods, but instead contain
     * inlined forms that enable more predictable optimization. We
     * don't define the version of write used in pushTask at all, but
     * instead inline there a store-fenced array slot write.
     */
-    private static void setSlot(ForkJoinTask<?>[] q, int i,
-                                ForkJoinTask<?> t) {
-        _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
-    }

     /**
-     * CAS given slot of q to null. Caller must ensure q is nonnull
-     * and index is in range.
+     * CASes slot i of array q from t to null. Caller must ensure q is
+     * non-null and index is in range.
      */
-    private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
-                                       ForkJoinTask<?> t) {
-        return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
+    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
+                                             ForkJoinTask<?> t) {
+        return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
     }

     /**
-     * Sets sp in store-order.
+     * Performs a volatile write of the given task at given slot of
+     * array q. Caller must ensure q is non-null and index is in
+     * range. This method is used only during resets and backouts.
      */
-    private void storeSp(int s) {
-        _unsafe.putOrderedInt(this, spOffset, s);
+    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
+                                        ForkJoinTask<?> t) {
+        UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
     }

-    // Main queue methods
+    // queue methods

     /**
-     * Pushes a task. Called only by current thread.
-     * @param t the task. Caller must ensure nonnull
+     * Pushes a task. Call only from this thread.
+     *
+     * @param t the task. Caller must ensure non-null.
      */
     final void pushTask(ForkJoinTask<?> t) {
         ForkJoinTask<?>[] q = queue;
-        int mask = q.length - 1;
-        int s = sp;
-        setSlot(q, s & mask, t);
-        storeSp(++s);
-        if ((s -= base) == 1)
-            pool.signalWork();
-        else if (s >= mask)
-            growQueue();
+        int mask = q.length - 1; // implicit assert q != null
+        int s = sp++;            // ok to increment sp before slot write
+        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
+        if ((s -= base) == 0)
+            pool.signalWork();   // was empty
+        else if (s == mask)
+            growQueue();         // is full
     }

     /**
      * Tries to take a task from the base of the queue, failing if
-     * either empty or contended.
-     * @return a task, or null if none or contended.
+     * empty or contended. Note: Specializations of this code appear
+     * in locallyDeqTask and elsewhere.
+     *
+     * @return a task, or null if none or contended
      */
     final ForkJoinTask<?> deqTask() {
         ForkJoinTask<?> t;
         ForkJoinTask<?>[] q;
-        int i;
-        int b;
-        if (sp != (b = base) &&
+        int b, i;
+        if ((b = base) != sp &&
            (q = queue) != null && // must read q after b
-           (t = q[i = (q.length - 1) & b]) != null &&
-           casSlotNull(q, i, t)) {
+           (t = q[i = (q.length - 1) & b]) != null && base == b &&
+           UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
            base = b + 1;
            return t;
        }
@@ -463,20 +522,42 @@ public class ForkJoinWorkerThread extend
     }

     /**
-     * Returns a popped task, or null if empty. Ensures active status
-     * if nonnull. Called only by current thread.
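For reference, the raw slot offsets these methods compute as (i << qShift) + qBase can be demonstrated stand-alone. The demo class below is hypothetical; it obtains Unsafe reflectively, the same way getUnsafe() at the end of this file does.

    import java.lang.reflect.Field;
    import sun.misc.Unsafe;

    public class SlotOffsetDemo {
        public static void main(String[] args) throws Exception {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            Unsafe u = (Unsafe) f.get(null);

            long qBase = u.arrayBaseOffset(Object[].class); // bytes to slot 0
            int scale  = u.arrayIndexScale(Object[].class); // bytes per slot
            int qShift = 31 - Integer.numberOfLeadingZeros(scale);

            Object[] q = new Object[16];
            int i = 5;
            q[i] = "task";
            // Effective offset of slot i; stays within range for any
            // capacity up to MAXIMUM_QUEUE_CAPACITY, as noted above.
            long offset = (i << qShift) + qBase;
            System.out.println(u.getObject(q, offset));        // prints: task
            System.out.println(u.compareAndSwapObject(q, offset, "task", null)); // true
        }
    }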
+ * Tries to take a task from the base of own queue. Assumes active + * status. Called only by current thread. + * + * @return a task, or null if none + */ + final ForkJoinTask locallyDeqTask() { + ForkJoinTask[] q = queue; + if (q != null) { + ForkJoinTask t; + int b, i; + while (sp != (b = base)) { + if ((t = q[i = (q.length - 1) & b]) != null && base == b && + UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, + t, null)) { + base = b + 1; + return t; + } + } + } + return null; + } + + /** + * Returns a popped task, or null if empty. Assumes active status. + * Called only by current thread. (Note: a specialization of this + * code appears in popWhileJoining.) */ final ForkJoinTask popTask() { - int s = sp; - while (s != base) { - if (tryActivate()) { - ForkJoinTask[] q = queue; - int mask = q.length - 1; - int i = (s - 1) & mask; - ForkJoinTask t = q[i]; - if (t == null || !casSlotNull(q, i, t)) - break; - storeSp(s - 1); + int s; + ForkJoinTask[] q; + if (base != (s = sp) && (q = queue) != null) { + int i = (q.length - 1) & --s; + ForkJoinTask t = q[i]; + if (t != null && UNSAFE.compareAndSwapObject + (q, (i << qShift) + qBase, t, null)) { + sp = s; return t; } } @@ -484,31 +565,33 @@ public class ForkJoinWorkerThread extend } /** - * Specialized version of popTask to pop only if - * topmost element is the given task. Called only - * by current thread while active. - * @param t the task. Caller must ensure nonnull + * Specialized version of popTask to pop only if topmost element + * is the given task. Called only by current thread while + * active. + * + * @param t the task. Caller must ensure non-null. */ final boolean unpushTask(ForkJoinTask t) { - ForkJoinTask[] q = queue; - int mask = q.length - 1; - int s = sp - 1; - if (casSlotNull(q, s & mask, t)) { - storeSp(s); + int s; + ForkJoinTask[] q; + if (base != (s = sp) && (q = queue) != null && + UNSAFE.compareAndSwapObject + (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) { + sp = s; return true; } return false; } /** - * Returns next task. + * Returns next task or null if empty or contended */ final ForkJoinTask peekTask() { ForkJoinTask[] q = queue; if (q == null) return null; int mask = q.length - 1; - int i = locallyFifo? base : (sp - 1); + int i = locallyFifo ? base : (sp - 1); return q[i & mask]; } @@ -534,240 +617,504 @@ public class ForkJoinWorkerThread extend ForkJoinTask t = oldQ[oldIndex]; if (t != null && !casSlotNull(oldQ, oldIndex, t)) t = null; - setSlot(newQ, b & newMask, t); + writeSlot(newQ, b & newMask, t); } while (++b != bf); pool.signalWork(); } /** + * Computes next value for random victim probe in scan(). Scans + * don't require a very high quality generator, but also not a + * crummy one. Marsaglia xor-shift is cheap and works well enough. + * Note: This is manually inlined in scan() + */ + private static final int xorShift(int r) { + r ^= r << 13; + r ^= r >>> 17; + return r ^ (r << 5); + } + + /** * Tries to steal a task from another worker. Starts at a random * index of workers array, and probes workers until finding one * with non-empty queue or finding that all are empty. It * randomly selects the first n probes. If these are empty, it - * resorts to a full circular traversal, which is necessary to - * accurately set active status by caller. Also restarts if pool - * events occurred since last scan, which forces refresh of - * workers array, in case barrier was associated with resize. + * resorts to a circular sweep, which is necessary to accurately + * set active status. 
(The circular sweep uses steps of + * approximately half the array size plus 1, to avoid bias + * stemming from leftmost packing of the array in ForkJoinPool.) * * This method must be both fast and quiet -- usually avoiding * memory accesses that could disrupt cache sharing etc other than - * those needed to check for and take tasks. This accounts for, - * among other things, updating random seed in place without - * storing it until exit. + * those needed to check for and take tasks (or to activate if not + * already active). This accounts for, among other things, + * updating random seed in place without storing it until exit. * * @return a task, or null if none found */ private ForkJoinTask scan() { - ForkJoinTask t = null; - int r = seed; // extract once to keep scan quiet - ForkJoinWorkerThread[] ws; // refreshed on outer loop - int mask; // must be power 2 minus 1 and > 0 - outer:do { - if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) { - int idx = r; - int probes = ~mask; // use random index while negative - for (;;) { - r = xorShift(r); // update random seed - ForkJoinWorkerThread v = ws[mask & idx]; - if (v == null || v.sp == v.base) { - if (probes <= mask) - idx = (probes++ < 0)? r : (idx + 1); - else - break; + ForkJoinPool p = pool; + ForkJoinWorkerThread[] ws; // worker array + int n; // upper bound of #workers + if ((ws = p.workers) != null && (n = ws.length) > 1) { + boolean canSteal = active; // shadow active status + int r = seed; // extract seed once + int mask = n - 1; + int j = -n; // loop counter + int k = r; // worker index, random if j < 0 + for (;;) { + ForkJoinWorkerThread v = ws[k & mask]; + r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift + if (v != null && v.base != v.sp) { + if (canSteal || // ensure active status + (canSteal = active = p.tryIncrementActiveCount())) { + int b = v.base; // inline specialized deqTask + ForkJoinTask[] q; + if (b != v.sp && (q = v.queue) != null) { + ForkJoinTask t; + int i = (q.length - 1) & b; + long u = (i << qShift) + qBase; // raw offset + if ((t = q[i]) != null && v.base == b && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + stolen = t; + v.base = b + 1; + seed = r; + ++stealCount; + return t; + } + } } - else if (!tryActivate() || (t = v.deqTask()) == null) - continue outer; // restart on contention - else - break outer; + j = -n; + k = r; // restart on contention } + else if (++j <= 0) + k = r; + else if (j <= n) + k += (n >>> 1) | 1; + else + break; } - } while (pool.hasNewSyncEvent(this)); // retry on pool events - seed = r; - return t; + } + return null; } + // Run State management + + // status check methods used mainly by ForkJoinPool + final boolean isTerminating() { return (runState & TERMINATING) != 0; } + final boolean isTerminated() { return (runState & TERMINATED) != 0; } + final boolean isSuspended() { return (runState & SUSPENDED) != 0; } + final boolean isTrimmed() { return (runState & TRIMMED) != 0; } + /** - * gets and removes a local or stolen a task - * @return a task, if available + * Sets state to TERMINATING, also resuming if suspended. */ - final ForkJoinTask pollTask() { - ForkJoinTask t = locallyFifo? 
deqTask() : popTask(); - if (t == null && (t = scan()) != null) - ++stealCount; - return t; + final void shutdown() { + for (;;) { + int s = runState; + if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + (s & ~SUSPENDED) | + (TRIMMED|TERMINATING))) { + LockSupport.unpark(this); + break; + } + } + else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s | TERMINATING)) + break; + } } /** - * gets a local task - * @return a task, if available + * Sets state to TERMINATED. Called only by this thread. */ - final ForkJoinTask pollLocalTask() { - return locallyFifo? deqTask() : popTask(); + private void setTerminated() { + int s; + do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset, + s = runState, + s | (TERMINATING|TERMINATED))); } /** - * Returns a pool submission, if one exists, activating first. - * @return a submission, if available + * Instrumented version of park used by ForkJoinPool.awaitEvent */ - private ForkJoinTask pollSubmission() { - ForkJoinPool p = pool; - while (p.hasQueuedSubmissions()) { - ForkJoinTask t; - if (tryActivate() && (t = p.pollSubmission()) != null) - return t; + final void doPark() { + ++parkCount; + LockSupport.park(this); + } + + /** + * If suspended, tries to set status to unsuspended. + * Caller must unpark to actually resume + * + * @return true if successful + */ + final boolean tryUnsuspend() { + int s = runState; + if ((s & SUSPENDED) != 0) + return UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s & ~SUSPENDED); + return false; + } + + /** + * Sets suspended status and blocks as spare until resumed, + * shutdown, or timed out. + * + * @return false if trimmed + */ + final boolean suspendAsSpare() { + for (;;) { // set suspended unless terminating + int s = runState; + if ((s & TERMINATING) != 0) { // must kill + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s | (TRIMMED | TERMINATING))) + return false; + } + else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s | SUSPENDED)) + break; } - return null; + boolean timed; + long nanos; + long startTime; + if (poolIndex < pool.parallelism) { + timed = false; + nanos = 0L; + startTime = 0L; + } + else { + timed = true; + nanos = SPARE_KEEPALIVE_NANOS; + startTime = System.nanoTime(); + } + pool.accumulateStealCount(this); + lastEventCount = 0; // reset upon resume + interrupted(); // clear/ignore interrupts + while ((runState & SUSPENDED) != 0) { + ++parkCount; + if (!timed) + LockSupport.park(this); + else if ((nanos -= (System.nanoTime() - startTime)) > 0) + LockSupport.parkNanos(this, nanos); + else { // try to trim on timeout + int s = runState; + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + (s & ~SUSPENDED) | + (TRIMMED|TERMINATING))) + return false; + } + } + return true; } - // Methods accessed only by Pool + // Misc support methods for ForkJoinPool + + /** + * Returns an estimate of the number of tasks in the queue. Also + * used by ForkJoinTask. + */ + final int getQueueSize() { + return -base + sp; + } /** * Removes and cancels all tasks in queue. Can be called from any * thread. */ final void cancelTasks() { - ForkJoinTask t; - while (base != sp && (t = deqTask()) != null) - t.cancelIgnoringExceptions(); + while (base != sp) { + ForkJoinTask t = deqTask(); + if (t != null) + t.cancelIgnoringExceptions(); + } } /** - * Drains tasks to given collection c + * Drains tasks to given collection c. 
+     *
      * @return the number of tasks drained
      */
-    final int drainTasksTo(Collection<ForkJoinTask<?>> c) {
         int n = 0;
-        ForkJoinTask<?> t;
-        while (base != sp && (t = deqTask()) != null) {
-            c.add(t);
-            ++n;
+        while (base != sp) {
+            ForkJoinTask<?> t = deqTask();
+            if (t != null) {
+                c.add(t);
+                ++n;
+            }
         }
         return n;
     }

-    /**
-     * Get and clear steal count for accumulation by pool. Called
-     * only when known to be idle (in pool.sync and termination).
-     */
-    final int getAndClearStealCount() {
-        int sc = stealCount;
-        stealCount = 0;
-        return sc;
-    }
+    // Support methods for ForkJoinTask

     /**
-     * Returns true if at least one worker in the given array appears
-     * to have at least one queued task.
-     * @param ws array of workers
-     */
-    static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
-        if (ws != null) {
-            int len = ws.length;
-            for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
-                for (int i = 0; i < len; ++i) {
-                    ForkJoinWorkerThread w = ws[i];
-                    if (w != null && w.sp != w.base)
-                        return true;
+     * Possibly runs some tasks and/or blocks, until task is done.
+     *
+     * @param joinMe the task to join
+     */
+    final void joinTask(ForkJoinTask<?> joinMe) {
+        ForkJoinTask<?> prevJoining = joining;
+        joining = joinMe;
+        while (joinMe.status >= 0) {
+            int s = sp;
+            if (s == base) {
+                nonlocalJoinTask(joinMe);
+                break;
+            }
+            // process local task
+            ForkJoinTask<?> t;
+            ForkJoinTask<?>[] q = queue;
+            int i = (q.length - 1) & --s;
+            long u = (i << qShift) + qBase; // raw offset
+            if ((t = q[i]) != null &&
+                UNSAFE.compareAndSwapObject(q, u, t, null)) {
+                /*
+                 * This recheck (and similarly in nonlocalJoinTask)
+                 * handles cases where joinMe is independently
+                 * cancelled or forced even though there is other work
+                 * available. Back out of the pop by putting t back
+                 * into slot before we commit by setting sp.
+                 */
+                if (joinMe.status < 0) {
+                    UNSAFE.putObjectVolatile(q, u, t);
+                    break;
                 }
+                sp = s;
+                t.tryExec();
             }
         }
-        return false;
+        joining = prevJoining;
     }

-    // Support methods for ForkJoinTask
-
     /**
-     * Returns an estimate of the number of tasks in the queue.
+     * Tries to locate and help perform tasks for a stealer of the
+     * given task (or in turn one of its stealers), blocking (via
+     * pool.tryAwaitJoin) upon failure to find work. Traces
+     * stolen->joining links looking for a thread working on
+     * a descendant of the given task and with a non-empty queue to
+     * steal back and execute tasks from. Inhibits mutual steal chains
+     * and scans on outer joins upon nesting to avoid unbounded
+     * growth. Restarts search upon encountering inconsistencies.
+     * Tries to block if two passes agree that there are no remaining
+     * targets.
+     *
+     * @param joinMe the task to join
      */
-    final int getQueueSize() {
-        int n = sp - base;
        return n < 0?
0 : n; // suppress momentarily negative values + private void nonlocalJoinTask(ForkJoinTask joinMe) { + ForkJoinPool p = pool; + int scans = p.parallelism; // give up if too many retries + ForkJoinTask bottom = null; // target seen when can't descend + restart: while (joinMe.status >= 0) { + ForkJoinTask target = null; + ForkJoinTask next = joinMe; + while (scans >= 0 && next != null) { + --scans; + target = next; + next = null; + ForkJoinWorkerThread v = null; + ForkJoinWorkerThread[] ws = p.workers; + int n = ws.length; + for (int j = 0; j < n; ++j) { + ForkJoinWorkerThread w = ws[j]; + if (w != null && w.stolen == target) { + v = w; + break; + } + } + if (v != null && v != this) { + ForkJoinTask prevStolen = stolen; + int b; + ForkJoinTask[] q; + while ((b = v.base) != v.sp && (q = v.queue) != null) { + int i = (q.length - 1) & b; + long u = (i << qShift) + qBase; + ForkJoinTask t = q[i]; + if (target.status < 0) + continue restart; + if (t != null && v.base == b && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + if (joinMe.status < 0) { + UNSAFE.putObjectVolatile(q, u, t); + return; // back out + } + stolen = t; + v.base = b + 1; + t.tryExec(); + stolen = prevStolen; + } + if (joinMe.status < 0) + return; + } + next = v.joining; + } + if (target.status < 0) + continue restart; // inconsistent + if (joinMe.status < 0) + return; + } + + if (bottom != target) + bottom = target; // recheck landing spot + else if (p.tryAwaitJoin(joinMe) < 0) + return; // successfully blocked + Thread.yield(); // tame spin in case too many active + } } /** * Returns an estimate of the number of tasks, offset by a * function of number of idle workers. + * + * This method provides a cheap heuristic guide for task + * partitioning when programmers, frameworks, tools, or languages + * have little or no idea about task granularity. In essence by + * offering this method, we ask users only about tradeoffs in + * overhead vs expected throughput and its variance, rather than + * how finely to partition tasks. + * + * In a steady state strict (tree-structured) computation, each + * thread makes available for stealing enough tasks for other + * threads to remain active. Inductively, if all threads play by + * the same rules, each thread should make available only a + * constant number of tasks. + * + * The minimum useful constant is just 1. But using a value of 1 + * would require immediate replenishment upon each steal to + * maintain enough tasks, which is infeasible. Further, + * partitionings/granularities of offered tasks should minimize + * steal rates, which in general means that threads nearer the top + * of computation tree should generate more than those nearer the + * bottom. In perfect steady state, each thread is at + * approximately the same level of computation tree. However, + * producing extra tasks amortizes the uncertainty of progress and + * diffusion assumptions. + * + * So, users will want to use values larger, but not much larger + * than 1 to both smooth over transient shortages and hedge + * against uneven progress; as traded off against the cost of + * extra task overhead. We leave the user to pick a threshold + * value to compare with the results of this call to guide + * decisions, but recommend values such as 3. + * + * When all threads are active, it is on average OK to estimate + * surplus strictly locally. In steady-state, if one thread is + * maintaining say 2 surplus tasks, then so are others. 
So we can + * just use estimated queue length (although note that (sp - base) + * can be an overestimate because of stealers lagging increments + * of base). However, this strategy alone leads to serious + * mis-estimates in some non-steady-state conditions (ramp-up, + * ramp-down, other stalls). We can detect many of these by + * further considering the number of "idle" threads, that are + * known to have zero queued tasks, so compensate by a factor of + * (#idle/#active) threads. */ final int getEstimatedSurplusTaskCount() { - // The halving approximates weighting idle vs non-idle workers - return (sp - base) - (pool.getIdleThreadCount() >>> 1); + return sp - base - pool.idlePerActive(); } /** - * Scan, returning early if joinMe done + * Gets and removes a local task. + * + * @return a task, if available */ - final ForkJoinTask scanWhileJoining(ForkJoinTask joinMe) { - ForkJoinTask t = pollTask(); - if (t != null && joinMe.status < 0 && sp == base) { - pushTask(t); // unsteal if done and this task would be stealable - t = null; + final ForkJoinTask pollLocalTask() { + while (sp != base) { + if (active || (active = pool.tryIncrementActiveCount())) + return locallyFifo? locallyDeqTask() : popTask(); } - return t; + return null; + } + + /** + * Gets and removes a local or stolen task. + * + * @return a task, if available + */ + final ForkJoinTask pollTask() { + ForkJoinTask t; + return (t = pollLocalTask()) != null ? t : scan(); } /** - * Runs tasks until pool isQuiescent + * Runs tasks until {@code pool.isQuiescent()}. */ final void helpQuiescePool() { for (;;) { - ForkJoinTask t = pollTask(); - if (t != null) - t.quietlyExec(); - else if (tryInactivate() && pool.isQuiescent()) - break; + ForkJoinTask t = pollLocalTask(); + if (t != null || (t = scan()) != null) { + t.tryExec(); + stolen = null; + } + else { + ForkJoinPool p = pool; + if (active) { + active = false; // inactivate + do {} while (!p.tryDecrementActiveCount()); + } + if (p.isQuiescent()) { + active = true; // re-activate + do {} while (!p.tryIncrementActiveCount()); + return; + } + } } - do;while (!tryActivate()); // re-activate on exit } - // Temporary Unsafe mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long runStateOffset = + objectFieldOffset("runState", ForkJoinWorkerThread.class); + private static final long qBase = + UNSAFE.arrayBaseOffset(ForkJoinTask[].class); + private static final int qShift; + + static { + int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class); + if ((s & (s-1)) != 0) + throw new Error("data type scale not a power of two"); + qShift = 31 - Integer.numberOfLeadingZeros(s); + } + + private static long objectFieldOffset(String field, Class klazz) { try { - return Unsafe.getUnsafe(); + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. 
+ * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); } catch (SecurityException se) { try { return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); }}); } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } } - - private static Unsafe getUnsafePrivileged() - throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } - - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return _unsafe.objectFieldOffset - (ForkJoinWorkerThread.class.getDeclaredField(fieldName)); - } - - static final Unsafe _unsafe; - static final long baseOffset; - static final long spOffset; - static final long runStateOffset; - static final long qBase; - static final int qShift; - static { - try { - _unsafe = getUnsafe(); - baseOffset = fieldOffset("base"); - spOffset = fieldOffset("sp"); - runStateOffset = fieldOffset("runState"); - qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class); - int s = _unsafe.arrayIndexScale(ForkJoinTask[].class); - if ((s & (s-1)) != 0) - throw new Error("data type scale not a power of two"); - qShift = 31 - Integer.numberOfLeadingZeros(s); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); - } - } }
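To connect back to the class-level javadoc: below is a hedged sketch of the subclassing pattern it describes, a worker that overrides only the initialization and termination hooks, together with the custom factory needed to install it. The sketch uses the java.util.concurrent names; under the jsr166y package only the imports differ.

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinWorkerThread;

    // Hypothetical subclass: adds logging around the worker lifecycle.
    class LoggingWorkerThread extends ForkJoinWorkerThread {
        LoggingWorkerThread(ForkJoinPool pool) {
            super(pool);
        }
        @Override protected void onStart() {
            super.onStart();                // invoke first, per onStart's javadoc
            System.out.println(getName() + " started");
        }
        @Override protected void onTermination(Throwable exception) {
            System.out.println(getName() + " terminating: " + exception);
            super.onTermination(exception); // invoke last, per onTermination's javadoc
        }
    }

    // The factory a ForkJoinPool needs in order to create such workers.
    class LoggingWorkerThreadFactory
            implements ForkJoinPool.ForkJoinWorkerThreadFactory {
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new LoggingWorkerThread(pool);
        }
    }

On JDK 7 and later the factory can be installed with, for example, new ForkJoinPool(4, new LoggingWorkerThreadFactory(), null, false).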