--- jsr166/src/jsr166y/ForkJoinWorkerThread.java 2009/01/07 19:12:36 1.3 +++ jsr166/src/jsr166y/ForkJoinWorkerThread.java 2010/09/20 20:42:37 1.51 @@ -5,328 +5,558 @@ */ package jsr166y; -import java.util.*; + import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; + +import java.util.Random; +import java.util.Collection; +import java.util.concurrent.locks.LockSupport; /** * A thread managed by a {@link ForkJoinPool}. This class is * subclassable solely for the sake of adding functionality -- there - * are no overridable methods dealing with scheduling or - * execution. However, you can override initialization and termination - * cleanup methods surrounding the main task processing loop. If you - * do create such a subclass, you will also need to supply a custom - * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. - * - *

This class also provides methods for generating per-thread - * random numbers, with the same properties as {@link - * java.util.Random} but with each generator isolated from those of - * other threads. + * are no overridable methods dealing with scheduling or execution. + * However, you can override initialization and termination methods + * surrounding the main task processing loop. If you do create such a + * subclass, you will also need to supply a custom {@link + * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code + * ForkJoinPool}. + * + * @since 1.7 + * @author Doug Lea */ public class ForkJoinWorkerThread extends Thread { /* - * Algorithm overview: + * Overview: + * + * ForkJoinWorkerThreads are managed by ForkJoinPools and perform + * ForkJoinTasks. This class includes bookkeeping in support of + * worker activation, suspension, and lifecycle control described + * in more detail in the internal documentation of class + * ForkJoinPool. And as described further below, this class also + * includes special-cased support for some ForkJoinTask + * methods. But the main mechanics involve work-stealing: * - * 1. Work-Stealing: Work-stealing queues are special forms of - * Deques that support only three of the four possible - * end-operations -- push, pop, and deq (aka steal), and only do - * so under the constraints that push and pop are called only from - * the owning thread, while deq may be called from other threads. - * (If you are unfamiliar with them, you probably want to read - * Herlihy and Shavit's book "The Art of Multiprocessor - * programming", chapter 16 describing these in more detail before - * proceeding.) The main work-stealing queue design is roughly - * similar to "Dynamic Circular Work-Stealing Deque" by David - * Chase and Yossi Lev, SPAA 2005 - * (http://research.sun.com/scalable/pubs/index.html). The main - * difference ultimately stems from gc requirements that we null - * out taken slots as soon as we can, to maintain as small a - * footprint as possible even in programs generating huge numbers - * of tasks. To accomplish this, we shift the CAS arbitrating pop - * vs deq (steal) from being on the indices ("base" and "sp") to - * the slots themselves (mainly via method "casSlotNull()"). So, - * both a successful pop and deq mainly entail CAS'ing a nonnull - * slot to null. Because we rely on CASes of references, we do - * not need tag bits on base or sp. They are simple ints as used - * in any circular array-based queue (see for example ArrayDeque). - * Updates to the indices must still be ordered in a way that - * guarantees that (sp - base) > 0 means the queue is empty, but - * otherwise may err on the side of possibly making the queue - * appear nonempty when a push, pop, or deq have not fully - * committed. Note that this means that the deq operation, - * considered individually, is not wait-free. One thief cannot - * successfully continue until another in-progress one (or, if - * previously empty, a push) completes. However, in the - * aggregate, we ensure at least probablistic non-blockingness. If - * an attempted steal fails, a thief always chooses a different + * Work-stealing queues are special forms of Deques that support + * only three of the four possible end-operations -- push, pop, + * and deq (aka steal), under the further constraints that push + * and pop are called only from the owning thread, while deq may + * be called from other threads. (If you are unfamiliar with + * them, you probably want to read Herlihy and Shavit's book "The + * Art of Multiprocessor programming", chapter 16 describing these + * in more detail before proceeding.) The main work-stealing + * queue design is roughly similar to those in the papers "Dynamic + * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 + * (http://research.sun.com/scalable/pubs/index.html) and + * "Idempotent work stealing" by Michael, Saraswat, and Vechev, + * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). + * The main differences ultimately stem from gc requirements that + * we null out taken slots as soon as we can, to maintain as small + * a footprint as possible even in programs generating huge + * numbers of tasks. To accomplish this, we shift the CAS + * arbitrating pop vs deq (steal) from being on the indices + * ("base" and "sp") to the slots themselves (mainly via method + * "casSlotNull()"). So, both a successful pop and deq mainly + * entail a CAS of a slot from non-null to null. Because we rely + * on CASes of references, we do not need tag bits on base or sp. + * They are simple ints as used in any circular array-based queue + * (see for example ArrayDeque). Updates to the indices must + * still be ordered in a way that guarantees that sp == base means + * the queue is empty, but otherwise may err on the side of + * possibly making the queue appear nonempty when a push, pop, or + * deq have not fully committed. Note that this means that the deq + * operation, considered individually, is not wait-free. One thief + * cannot successfully continue until another in-progress one (or, + * if previously empty, a push) completes. However, in the + * aggregate, we ensure at least probabilistic non-blockingness. + * If an attempted steal fails, a thief always chooses a different * random victim target to try next. So, in order for one thief to * progress, it suffices for any in-progress deq or new push on * any empty queue to complete. One reason this works well here is * that apparently-nonempty often means soon-to-be-stealable, - * which gives threads a chance to activate if necessary before - * stealing (see below). + * which gives threads a chance to set activation status if + * necessary before stealing. * - * Efficient implementation of this approach currently relies on - * an uncomfortable amount of "Unsafe" mechanics. To maintain + * This approach also enables support for "async mode" where local + * task processing is in FIFO, not LIFO order; simply by using a + * version of deq rather than pop when locallyFifo is true (as set + * by the ForkJoinPool). This allows use in message-passing + * frameworks in which tasks are never joined. + * + * When a worker would otherwise be blocked waiting to join a + * task, it first tries a form of linear helping: Each worker + * records (in field currentSteal) the most recent task it stole + * from some other worker. Plus, it records (in field currentJoin) + * the task it is currently actively joining. Method joinTask uses + * these markers to try to find a worker to help (i.e., steal back + * a task from and execute it) that could hasten completion of the + * actively joined task. In essence, the joiner executes a task + * that would be on its own local deque had the to-be-joined task + * not been stolen. This may be seen as a conservative variant of + * the approach in Wagner & Calder "Leapfrogging: a portable + * technique for implementing efficient futures" SIGPLAN Notices, + * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs + * in that: (1) We only maintain dependency links across workers + * upon steals, rather than use per-task bookkeeping. This may + * require a linear scan of workers array to locate stealers, but + * usually doesn't because stealers leave hints (that may become + * stale/wrong) of where to locate them. This isolates cost to + * when it is needed, rather than adding to per-task overhead. + * (2) It is "shallow", ignoring nesting and potentially cyclic + * mutual steals. (3) It is intentionally racy: field currentJoin + * is updated only while actively joining, which means that we + * miss links in the chain during long-lived tasks, GC stalls etc + * (which is OK since blocking in such cases is usually a good + * idea). (4) We bound the number of attempts to find work (see + * MAX_HELP_DEPTH) and fall back to suspending the worker and if + * necessary replacing it with a spare (see + * ForkJoinPool.awaitJoin). + * + * Efficient implementation of these algorithms currently relies + * on an uncomfortable amount of "Unsafe" mechanics. To maintain * correct orderings, reads and writes of variable base require - * volatile ordering. Variable sp does not require volatile write - * but needs cheaper store-ordering on writes. Because they are - * protected by volatile base reads, reads of the queue array and - * its slots do not need volatile load semantics, but writes (in - * push) require store order and CASes (in pop and deq) require - * (volatile) CAS semantics. Since these combinations aren't - * supported using ordinary volatiles, the only way to accomplish - * these effciently is to use direct Unsafe calls. (Using external - * AtomicIntegers and AtomicReferenceArrays for the indices and - * array is significantly slower because of memory locality and - * indirection effects.) Further, performance on most platforms is - * very sensitive to placement and sizing of the (resizable) queue - * array. Even though these queues don't usually become all that - * big, the initial size must be large enough to counteract cache + * volatile ordering. Variable sp does not require volatile + * writes but still needs store-ordering, which we accomplish by + * pre-incrementing sp before filling the slot with an ordered + * store. (Pre-incrementing also enables backouts used in + * joinTask.) Because they are protected by volatile base reads, + * reads of the queue array and its slots by other threads do not + * need volatile load semantics, but writes (in push) require + * store order and CASes (in pop and deq) require (volatile) CAS + * semantics. (Michael, Saraswat, and Vechev's algorithm has + * similar properties, but without support for nulling slots.) + * Since these combinations aren't supported using ordinary + * volatiles, the only way to accomplish these efficiently is to + * use direct Unsafe calls. (Using external AtomicIntegers and + * AtomicReferenceArrays for the indices and array is + * significantly slower because of memory locality and indirection + * effects.) + * + * Further, performance on most platforms is very sensitive to + * placement and sizing of the (resizable) queue array. Even + * though these queues don't usually become all that big, the + * initial size must be large enough to counteract cache * contention effects across multiple queues (especially in the * presence of GC cardmarking). Also, to improve thread-locality, - * queues are currently initialized immediately after the thread - * gets the initial signal to start processing tasks. However, - * all queue-related methods except pushTask are written in a way - * that allows them to instead be lazily allocated and/or disposed - * of when empty. All together, these low-level implementation - * choices produce as much as a factor of 4 performance - * improvement compared to naive implementations, and enable the - * processing of billions of tasks per second, sometimes at the - * expense of ugliness. - * - * 2. Run control: The primary run control is based on a global - * counter (activeCount) held by the pool. It uses an algorithm - * similar to that in Herlihy and Shavit section 17.6 to cause - * threads to eventually block when all threads declare they are - * inactive. (See variable "scans".) For this to work, threads - * must be declared active when executing tasks, and before - * stealing a task. They must be inactive before blocking on the - * Pool Barrier (awaiting a new submission or other Pool - * event). In between, there is some free play which we take - * advantage of to avoid contention and rapid flickering of the - * global activeCount: If inactive, we activate only if a victim - * queue appears to be nonempty (see above). Similarly, a thread - * tries to inactivate only after a full scan of other threads. - * The net effect is that contention on activeCount is rarely a - * measurable performance issue. (There are also a few other cases - * where we scan for work rather than retry/block upon - * contention.) - * - * 3. Selection control. We maintain policy of always choosing to - * run local tasks rather than stealing, and always trying to - * steal tasks before trying to run a new submission. All steals - * are currently performed in randomly-chosen deq-order. It may be - * worthwhile to bias these with locality / anti-locality - * information, but doing this well probably requires more - * lower-level information from JVMs than currently provided. + * queues are initialized after starting. All together, these + * low-level implementation choices produce as much as a factor of + * 4 performance improvement compared to naive implementations, + * and enable the processing of billions of tasks per second, + * sometimes at the expense of ugliness. */ /** + * Generator for initial random seeds for random victim + * selection. This is used only to create initial seeds. Random + * steals use a cheaper xorshift generator per steal attempt. We + * expect only rare contention on seedGenerator, so just use a + * plain Random. + */ + private static final Random seedGenerator = new Random(); + + /** + * The maximum stolen->joining link depth allowed in helpJoinTask. + * Depths for legitimate chains are unbounded, but we use a fixed + * constant to avoid (otherwise unchecked) cycles and bound + * staleness of traversal parameters at the expense of sometimes + * blocking when we could be helping. + */ + private static final int MAX_HELP_DEPTH = 8; + + /** * Capacity of work-stealing queue array upon initialization. - * Must be a power of two. Initial size must be at least 2, but is + * Must be a power of two. Initial size must be at least 4, but is * padded to minimize cache effects. */ private static final int INITIAL_QUEUE_CAPACITY = 1 << 13; /** * Maximum work-stealing queue array size. Must be less than or - * equal to 1 << 30 to ensure lack of index wraparound. + * equal to 1 << (31 - width of array entry) to ensure lack of + * index wraparound. The value is set in the static block + * at the end of this file after obtaining width. */ - private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 30; + private static final int MAXIMUM_QUEUE_CAPACITY; /** - * Generator of seeds for per-thread random numbers. + * The pool this thread works in. Accessed directly by ForkJoinTask. */ - private static final Random randomSeedGenerator = new Random(); + final ForkJoinPool pool; /** * The work-stealing queue array. Size must be a power of two. + * Initialized in onStart, to improve memory locality. */ private ForkJoinTask[] queue; /** + * Index (mod queue.length) of least valid queue slot, which is + * always the next position to steal from if nonempty. + */ + private volatile int base; + + /** * Index (mod queue.length) of next queue slot to push to or pop - * from. It is written only by owner thread, via ordered store. - * Both sp and base are allowed to wrap around on overflow, but - * (sp - base) still estimates size. + * from. It is written only by owner thread, and accessed by other + * threads only after reading (volatile) base. Both sp and base + * are allowed to wrap around on overflow, but (sp - base) still + * estimates size. */ - private volatile int sp; + private int sp; /** - * Index (mod queue.length) of least valid queue slot, which is - * always the next position to steal from if nonempty. + * The index of most recent stealer, used as a hint to avoid + * traversal in method helpJoinTask. This is only a hint because a + * worker might have had multiple steals and this only holds one + * of them (usually the most current). Declared non-volatile, + * relying on other prevailing sync to keep reasonably current. */ - private volatile int base; + private int stealHint; /** - * The pool this thread works in. + * Run state of this worker. In addition to the usual run levels, + * tracks if this worker is suspended as a spare, and if it was + * killed (trimmed) while suspended. However, "active" status is + * maintained separately and modified only in conjunction with + * CASes of the pool's runState (which are currently sadly + * manually inlined for performance.) Accessed directly by pool + * to simplify checks for normal (zero) status. */ - final ForkJoinPool pool; + volatile int runState; + + private static final int TERMINATING = 0x01; + private static final int TERMINATED = 0x02; + private static final int SUSPENDED = 0x04; // inactive spare + private static final int TRIMMED = 0x08; // killed while suspended + + /** + * Number of steals. Directly accessed (and reset) by + * pool.tryAccumulateStealCount when idle. + */ + int stealCount; + + /** + * Seed for random number generator for choosing steal victims. + * Uses Marsaglia xorshift. Must be initialized as nonzero. + */ + private int seed; + + /** + * Activity status. When true, this worker is considered active. + * Accessed directly by pool. Must be false upon construction. + */ + boolean active; + + /** + * True if use local fifo, not default lifo, for local polling. + * Shadows value from ForkJoinPool. + */ + private final boolean locallyFifo; /** * Index of this worker in pool array. Set once by pool before - * running, and accessed directly by pool during cleanup etc + * running, and accessed directly by pool to locate this worker in + * its workers array. */ int poolIndex; /** - * Run state of this worker. Supports simple versions of the usual - * shutdown/shutdownNow control. + * The last pool event waited for. Accessed only by pool in + * callback methods invoked within this thread. */ - private volatile int runState; - - // Runstate values. Order matters - private static final int RUNNING = 0; - private static final int SHUTDOWN = 1; - private static final int TERMINATING = 2; - private static final int TERMINATED = 3; + int lastEventCount; /** - * Activity status. When true, this worker is considered active. - * Must be false upon construction. It must be true when executing - * tasks, and BEFORE stealing a task. It must be false before - * blocking on the Pool Barrier. + * Encoded index and event count of next event waiter. Accessed + * only by ForkJoinPool for managing event waiters. */ - private boolean active; + volatile long nextWaiter; /** - * Number of steals, transferred to pool when idle + * Number of times this thread suspended as spare. Accessed only + * by pool. */ - private int stealCount; + int spareCount; /** - * Seed for random number generator for choosing steal victims + * Encoded index and count of next spare waiter. Accessed only + * by ForkJoinPool for managing spares. */ - private int randomVictimSeed; + volatile int nextSpare; /** - * Seed for embedded Jurandom + * The task currently being joined, set only when actively trying + * to help other stealers in helpJoinTask. Written only by this + * thread, but read by others. */ - private long juRandomSeed; + private volatile ForkJoinTask currentJoin; /** - * The last barrier event waited for + * The task most recently stolen from another worker (or + * submission queue). Written only by this thread, but read by + * others. */ - private long eventCount; + private volatile ForkJoinTask currentSteal; /** * Creates a ForkJoinWorkerThread operating in the given pool. + * * @param pool the pool this thread works in * @throws NullPointerException if pool is null */ protected ForkJoinWorkerThread(ForkJoinPool pool) { - if (pool == null) throw new NullPointerException(); this.pool = pool; - // remaining initialization deferred to onStart + this.locallyFifo = pool.locallyFifo; + setDaemon(true); + // To avoid exposing construction details to subclasses, + // remaining initialization is in start() and onStart() } - // public access methods + /** + * Performs additional initialization and starts this thread. + */ + final void start(int poolIndex, UncaughtExceptionHandler ueh) { + this.poolIndex = poolIndex; + if (ueh != null) + setUncaughtExceptionHandler(ueh); + start(); + } + + // Public/protected methods /** - * Returns the pool hosting the current task execution. + * Returns the pool hosting this thread. + * * @return the pool */ - public static ForkJoinPool getPool() { - return ((ForkJoinWorkerThread)(Thread.currentThread())).pool; + public ForkJoinPool getPool() { + return pool; + } + + /** + * Returns the index number of this thread in its pool. The + * returned value ranges from zero to the maximum number of + * threads (minus one) that have ever been created in the pool. + * This method may be useful for applications that track status or + * collect results per-worker rather than per-task. + * + * @return the index number + */ + public int getPoolIndex() { + return poolIndex; + } + + /** + * Initializes internal state after construction but before + * processing any tasks. If you override this method, you must + * invoke @code{super.onStart()} at the beginning of the method. + * Initialization requires care: Most fields must have legal + * default values, to ensure that attempted accesses from other + * threads work correctly even before this thread starts + * processing tasks. + */ + protected void onStart() { + int rs = seedGenerator.nextInt(); + seed = rs == 0? 1 : rs; // seed must be nonzero + + // Allocate name string and arrays in this thread + String pid = Integer.toString(pool.getPoolNumber()); + String wid = Integer.toString(poolIndex); + setName("ForkJoinPool-" + pid + "-worker-" + wid); + + queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; + } + + /** + * Performs cleanup associated with termination of this worker + * thread. If you override this method, you must invoke + * {@code super.onTermination} at the end of the overridden method. + * + * @param exception the exception causing this thread to abort due + * to an unrecoverable error, or {@code null} if completed normally + */ + protected void onTermination(Throwable exception) { + try { + ForkJoinPool p = pool; + if (active) { + int a; // inline p.tryDecrementActiveCount + active = false; + do {} while (!UNSAFE.compareAndSwapInt + (p, poolRunStateOffset, a = p.runState, a - 1)); + } + cancelTasks(); + setTerminated(); + p.workerTerminated(this); + } catch (Throwable ex) { // Shouldn't ever happen + if (exception == null) // but if so, at least rethrown + exception = ex; + } finally { + if (exception != null) + UNSAFE.throwException(exception); + } } /** - * Returns the index number of the current worker thread in its - * pool. The returned value ranges from zero to the maximum - * number of threads (minus one) that have ever been created in - * the pool. This method may be useful for applications that - * track status or collect results on a per-worker basis. - * @return the index number. + * This method is required to be public, but should never be + * called explicitly. It performs the main run loop to execute + * ForkJoinTasks. */ - public static int getPoolIndex() { - return ((ForkJoinWorkerThread)(Thread.currentThread())).poolIndex; + public void run() { + Throwable exception = null; + try { + onStart(); + mainLoop(); + } catch (Throwable ex) { + exception = ex; + } finally { + onTermination(exception); + } } - // Access methods used by Pool + // helpers for run() /** - * Get and clear steal count for accumulation by pool. Called - * only when known to be idle (in pool.sync and termination). + * Finds and executes tasks, and checks status while running. */ - final int getAndClearStealCount() { - int sc = stealCount; - stealCount = 0; - return sc; + private void mainLoop() { + boolean ran = false; // true if ran a task on last step + ForkJoinPool p = pool; + for (;;) { + p.preStep(this, ran); + if (runState != 0) + break; + ran = tryExecSteal() || tryExecSubmission(); + } } /** - * Returns estimate of the number of tasks in the queue, without - * correcting for transient negative values + * Tries to steal a task and execute it. + * + * @return true if ran a task */ - final int getRawQueueSize() { - return sp - base; + private boolean tryExecSteal() { + ForkJoinTask t; + if ((t = scan()) != null) { + t.quietlyExec(); + UNSAFE.putOrderedObject(this, currentStealOffset, null); + if (sp != base) + execLocalTasks(); + return true; + } + return false; } - // Intrinsics-based support for queue operations. - // Currently these three (setSp, setSlot, casSlotNull) are - // usually manually inlined to improve performance + /** + * If a submission exists, try to activate and run it. + * + * @return true if ran a task + */ + private boolean tryExecSubmission() { + ForkJoinPool p = pool; + // This loop is needed in case attempt to activate fails, in + // which case we only retry if there still appears to be a + // submission. + while (p.hasQueuedSubmissions()) { + ForkJoinTask t; int a; + if (active || // inline p.tryIncrementActiveCount + (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset, + a = p.runState, a + 1))) { + if ((t = p.pollSubmission()) != null) { + UNSAFE.putOrderedObject(this, currentStealOffset, t); + t.quietlyExec(); + UNSAFE.putOrderedObject(this, currentStealOffset, null); + if (sp != base) + execLocalTasks(); + return true; + } + } + } + return false; + } /** - * Sets sp in store-order. + * Runs local tasks until queue is empty or shut down. Call only + * while active. */ - private void setSp(int s) { - _unsafe.putOrderedInt(this, spOffset, s); + private void execLocalTasks() { + while (runState == 0) { + ForkJoinTask t = locallyFifo ? locallyDeqTask() : popTask(); + if (t != null) + t.quietlyExec(); + else if (sp == base) + break; + } } + /* + * Intrinsics-based atomic writes for queue slots. These are + * basically the same as methods in AtomicReferenceArray, but + * specialized for (1) ForkJoinTask elements (2) requirement that + * nullness and bounds checks have already been performed by + * callers and (3) effective offsets are known not to overflow + * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't + * need corresponding version for reads: plain array reads are OK + * because they are protected by other volatile reads and are + * confirmed by CASes. + * + * Most uses don't actually call these methods, but instead contain + * inlined forms that enable more predictable optimization. We + * don't define the version of write used in pushTask at all, but + * instead inline there a store-fenced array slot write. + */ + /** - * Add in store-order the given task at given slot of q to - * null. Caller must ensure q is nonnull and index is in range. + * CASes slot i of array q from t to null. Caller must ensure q is + * non-null and index is in range. */ - private static void setSlot(ForkJoinTask[] q, int i, - ForkJoinTask t){ - _unsafe.putOrderedObject(q, (i << qShift) + qBase, t); + private static final boolean casSlotNull(ForkJoinTask[] q, int i, + ForkJoinTask t) { + return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null); } /** - * CAS given slot of q to null. Caller must ensure q is nonnull - * and index is in range. + * Performs a volatile write of the given task at given slot of + * array q. Caller must ensure q is non-null and index is in + * range. This method is used only during resets and backouts. */ - private static boolean casSlotNull(ForkJoinTask[] q, int i, - ForkJoinTask t) { - return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null); + private static final void writeSlot(ForkJoinTask[] q, int i, + ForkJoinTask t) { + UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t); } - // Main queue methods + // queue methods /** - * Pushes a task. Called only by current thread. - * @param t the task. Caller must ensure nonnull + * Pushes a task. Call only from this thread. + * + * @param t the task. Caller must ensure non-null. */ final void pushTask(ForkJoinTask t) { ForkJoinTask[] q = queue; - int mask = q.length - 1; - int s = sp; - _unsafe.putOrderedObject(q, ((s & mask) << qShift) + qBase, t); - _unsafe.putOrderedInt(this, spOffset, ++s); - if ((s -= base) == 1) - pool.signalNonEmptyWorkerQueue(); - else if (s >= mask) - growQueue(); + int mask = q.length - 1; // implicit assert q != null + int s = sp++; // ok to increment sp before slot write + UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t); + if ((s -= base) == 0) + pool.signalWork(); // was empty + else if (s == mask) + growQueue(); // is full } /** * Tries to take a task from the base of the queue, failing if - * either empty or contended. - * @return a task, or null if none or contended. + * empty or contended. Note: Specializations of this code appear + * in locallyDeqTask and elsewhere. + * + * @return a task, or null if none or contended */ - private ForkJoinTask deqTask() { - ForkJoinTask[] q; + final ForkJoinTask deqTask() { ForkJoinTask t; - int i; - int b; + ForkJoinTask[] q; + int b, i; if (sp != (b = base) && (q = queue) != null && // must read q after b - (t = q[i = (q.length - 1) & b]) != null && - _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) { + (t = q[i = (q.length - 1) & b]) != null && base == b && + UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) { base = b + 1; return t; } @@ -334,48 +564,81 @@ public class ForkJoinWorkerThread extend } /** - * Returns a popped task, or null if empty. Called only by - * current thread. + * Tries to take a task from the base of own queue. Assumes active + * status. Called only by this thread. + * + * @return a task, or null if none */ - final ForkJoinTask popTask() { - ForkJoinTask t; - int i; + final ForkJoinTask locallyDeqTask() { ForkJoinTask[] q = queue; - int mask = q.length - 1; - int s = sp; - if (s != base && - (t = q[i = (s - 1) & mask]) != null && - _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) { - _unsafe.putOrderedInt(this, spOffset, s - 1); - return t; + if (q != null) { + ForkJoinTask t; + int b, i; + while (sp != (b = base)) { + if ((t = q[i = (q.length - 1) & b]) != null && base == b && + UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, + t, null)) { + base = b + 1; + return t; + } + } + } + return null; + } + + /** + * Returns a popped task, or null if empty. Assumes active status. + * Called only by this thread. + */ + private ForkJoinTask popTask() { + ForkJoinTask[] q = queue; + if (q != null) { + int s; + while ((s = sp) != base) { + int i = (q.length - 1) & --s; + long u = (i << qShift) + qBase; // raw offset + ForkJoinTask t = q[i]; + if (t == null) // lost to stealer + break; + if (UNSAFE.compareAndSwapObject(q, u, t, null)) { + sp = s; // putOrderedInt may encourage more timely write + // UNSAFE.putOrderedInt(this, spOffset, s); + return t; + } + } } return null; } /** - * Specialized version of popTask to pop only if - * topmost element is the given task. Called only - * by current thread. - * @param t the task. Caller must ensure nonnull + * Specialized version of popTask to pop only if topmost element + * is the given task. Called only by this thread while active. + * + * @param t the task. Caller must ensure non-null. */ final boolean unpushTask(ForkJoinTask t) { + int s; ForkJoinTask[] q = queue; - int mask = q.length - 1; - int s = sp - 1; - if (_unsafe.compareAndSwapObject(q, ((s & mask) << qShift) + qBase, - t, null)) { - _unsafe.putOrderedInt(this, spOffset, s); + if ((s = sp) != base && q != null && + UNSAFE.compareAndSwapObject + (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) { + sp = s; // putOrderedInt may encourage more timely write + // UNSAFE.putOrderedInt(this, spOffset, s); return true; } return false; } /** - * Returns next task to pop. + * Returns next task, or null if empty or contended. */ final ForkJoinTask peekTask() { ForkJoinTask[] q = queue; - return q == null? null : q[(sp - 1) & (q.length - 1)]; + if (q == null) + return null; + int mask = q.length - 1; + int i = locallyFifo ? base : (sp - 1); + return q[i & mask]; } /** @@ -400,120 +663,192 @@ public class ForkJoinWorkerThread extend ForkJoinTask t = oldQ[oldIndex]; if (t != null && !casSlotNull(oldQ, oldIndex, t)) t = null; - setSlot(newQ, b & newMask, t); + writeSlot(newQ, b & newMask, t); } while (++b != bf); - pool.signalIdleWorkers(false); + pool.signalWork(); } - // Runstate management - - final boolean isShutdown() { return runState >= SHUTDOWN; } - final boolean isTerminating() { return runState >= TERMINATING; } - final boolean isTerminated() { return runState == TERMINATED; } - final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); } - final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); } - /** - * Transition to at least the given state. Return true if not - * already at least given state. + * Computes next value for random victim probe in scan(). Scans + * don't require a very high quality generator, but also not a + * crummy one. Marsaglia xor-shift is cheap and works well enough. + * Note: This is manually inlined in scan(). */ - private boolean transitionRunStateTo(int state) { - for (;;) { - int s = runState; - if (s >= state) - return false; - if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state)) - return true; - } + private static final int xorShift(int r) { + r ^= r << 13; + r ^= r >>> 17; + return r ^ (r << 5); } /** - * Ensure status is active and if necessary adjust pool active count + * Tries to steal a task from another worker. Starts at a random + * index of workers array, and probes workers until finding one + * with non-empty queue or finding that all are empty. It + * randomly selects the first n probes. If these are empty, it + * resorts to a circular sweep, which is necessary to accurately + * set active status. (The circular sweep uses steps of + * approximately half the array size plus 1, to avoid bias + * stemming from leftmost packing of the array in ForkJoinPool.) + * + * This method must be both fast and quiet -- usually avoiding + * memory accesses that could disrupt cache sharing etc other than + * those needed to check for and take tasks (or to activate if not + * already active). This accounts for, among other things, + * updating random seed in place without storing it until exit. + * + * @return a task, or null if none found */ - final void activate() { - if (!active) { - active = true; - pool.incrementActiveCount(); + private ForkJoinTask scan() { + ForkJoinPool p = pool; + ForkJoinWorkerThread[] ws; // worker array + int n; // upper bound of #workers + if ((ws = p.workers) != null && (n = ws.length) > 1) { + boolean canSteal = active; // shadow active status + int r = seed; // extract seed once + int mask = n - 1; + int j = -n; // loop counter + int k = r; // worker index, random if j < 0 + for (;;) { + ForkJoinWorkerThread v = ws[k & mask]; + r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift + ForkJoinTask[] q; ForkJoinTask t; int b, a; + if (v != null && (b = v.base) != v.sp && + (q = v.queue) != null) { + int i = (q.length - 1) & b; + long u = (i << qShift) + qBase; // raw offset + int pid = poolIndex; + if ((t = q[i]) != null) { + if (!canSteal && // inline p.tryIncrementActiveCount + UNSAFE.compareAndSwapInt(p, poolRunStateOffset, + a = p.runState, a + 1)) + canSteal = active = true; + if (canSteal && v.base == b++ && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + v.base = b; + v.stealHint = pid; + UNSAFE.putOrderedObject(this, + currentStealOffset, t); + seed = r; + ++stealCount; + return t; + } + } + j = -n; + k = r; // restart on contention + } + else if (++j <= 0) + k = r; + else if (j <= n) + k += (n >>> 1) | 1; + else + break; + } } + return null; + } + + // Run State management + + // status check methods used mainly by ForkJoinPool + final boolean isRunning() { return runState == 0; } + final boolean isTerminated() { return (runState & TERMINATED) != 0; } + final boolean isSuspended() { return (runState & SUSPENDED) != 0; } + final boolean isTrimmed() { return (runState & TRIMMED) != 0; } + + final boolean isTerminating() { + if ((runState & TERMINATING) != 0) + return true; + if (pool.isAtLeastTerminating()) { // propagate pool state + shutdown(); + return true; + } + return false; } /** - * Ensure status is inactive and if necessary adjust pool active count + * Sets state to TERMINATING. Does NOT unpark or interrupt + * to wake up if currently blocked. Callers must do so if desired. */ - final void inactivate() { - if (active) { - active = false; - pool.decrementActiveCount(); + final void shutdown() { + for (;;) { + int s = runState; + if ((s & (TERMINATING|TERMINATED)) != 0) + break; + if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + (s & ~SUSPENDED) | + (TRIMMED|TERMINATING))) + break; + } + else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s | TERMINATING)) + break; } } - // Lifecycle methods - /** - * Initializes internal state after construction but before - * processing any tasks. If you override this method, you must - * invoke super.onStart() at the beginning of the method. - * Initialization requires care: Most fields must have legal - * default values, to ensure that attempted accesses from other - * threads work correctly even before this thread starts - * processing tasks. + * Sets state to TERMINATED. Called only by onTermination(). */ - protected void onStart() { - juRandomSeed = randomSeedGenerator.nextLong(); - do;while((randomVictimSeed = nextRandomInt()) == 0); // must be nonzero - if (queue == null) - queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; - - // Heuristically allow one initial thread to warm up; others wait - if (poolIndex < pool.getParallelism() - 1) { - eventCount = pool.sync(this, 0); - activate(); - } + private void setTerminated() { + int s; + do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset, + s = runState, + s | (TERMINATING|TERMINATED))); } /** - * Perform cleanup associated with termination of this worker - * thread. If you override this method, you must invoke - * super.onTermination at the end of the overridden method. + * If suspended, tries to set status to unsuspended. + * Does NOT wake up if blocked. * - * @param exception the exception causing this thread to abort due - * to an unrecoverable error, or null if completed normally. + * @return true if successful */ - protected void onTermination(Throwable exception) { - try { - clearLocalTasks(); - inactivate(); - cancelTasks(); - } finally { - terminate(exception); + final boolean tryUnsuspend() { + int s; + while (((s = runState) & SUSPENDED) != 0) { + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s & ~SUSPENDED)) + return true; } + return false; } /** - * Notify pool of termination and, if exception is nonnull, - * rethrow it to trigger this thread's uncaughtExceptionHandler + * Sets suspended status and blocks as spare until resumed + * or shutdown. */ - private void terminate(Throwable exception) { - transitionRunStateTo(TERMINATED); - try { - pool.workerTerminated(this); - } finally { - if (exception != null) - ForkJoinTask.rethrowException(exception); + final void suspendAsSpare() { + for (;;) { // set suspended unless terminating + int s = runState; + if ((s & TERMINATING) != 0) { // must kill + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s | (TRIMMED | TERMINATING))) + return; + } + else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s | SUSPENDED)) + break; + } + ForkJoinPool p = pool; + p.pushSpare(this); + while ((runState & SUSPENDED) != 0) { + if (p.tryAccumulateStealCount(this)) { + interrupted(); // clear/ignore interrupts + if ((runState & SUSPENDED) == 0) + break; + LockSupport.park(this); + } } } + // Misc support methods for ForkJoinPool + /** - * Run local tasks on exit from main. + * Returns an estimate of the number of tasks in the queue. Also + * used by ForkJoinTask. */ - private void clearLocalTasks() { - while (base != sp && !pool.isTerminating()) { - ForkJoinTask t = popTask(); - if (t != null) { - activate(); // ensure active status - t.quietlyExec(); - } - } + final int getQueueSize() { + int n; // external calls must read base first + return (n = -base + sp) <= 0 ? 0 : n; } /** @@ -521,6 +856,20 @@ public class ForkJoinWorkerThread extend * thread. */ final void cancelTasks() { + ForkJoinTask cj = currentJoin; // try to cancel ongoing tasks + if (cj != null) { + currentJoin = null; + cj.cancelIgnoringExceptions(); + try { + this.interrupt(); // awaken wait + } catch (SecurityException ignore) { + } + } + ForkJoinTask cs = currentSteal; + if (cs != null) { + currentSteal = null; + cs.cancelIgnoringExceptions(); + } while (base != sp) { ForkJoinTask t = deqTask(); if (t != null) @@ -529,355 +878,336 @@ public class ForkJoinWorkerThread extend } /** - * This method is required to be public, but should never be - * called explicitly. It performs the main run loop to execute - * ForkJoinTasks. + * Drains tasks to given collection c. + * + * @return the number of tasks drained */ - public void run() { - Throwable exception = null; - try { - onStart(); - while (!isShutdown()) - step(); - } catch (Throwable ex) { - exception = ex; - } finally { - onTermination(exception); + final int drainTasksTo(Collection> c) { + int n = 0; + while (base != sp) { + ForkJoinTask t = deqTask(); + if (t != null) { + c.add(t); + ++n; + } } + return n; } + // Support methods for ForkJoinTask + /** - * Main top-level action. + * Gets and removes a local task. + * + * @return a task, if available */ - private void step() { - ForkJoinTask t = sp != base? popTask() : null; - if (t != null || (t = scan(null, true)) != null) { - activate(); - t.quietlyExec(); + final ForkJoinTask pollLocalTask() { + ForkJoinPool p = pool; + while (sp != base) { + int a; // inline p.tryIncrementActiveCount + if (active || + (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset, + a = p.runState, a + 1))) + return locallyFifo ? locallyDeqTask() : popTask(); } - else { - inactivate(); - eventCount = pool.sync(this, eventCount); + return null; + } + + /** + * Gets and removes a local or stolen task. + * + * @return a task, if available + */ + final ForkJoinTask pollTask() { + ForkJoinTask t = pollLocalTask(); + if (t == null) { + t = scan(); + // cannot retain/track/help steal + UNSAFE.putOrderedObject(this, currentStealOffset, null); } + return t; } - // scanning for and stealing tasks + /** + * Possibly runs some tasks and/or blocks, until task is done. + * + * @param joinMe the task to join + */ + final void joinTask(ForkJoinTask joinMe) { + // currentJoin only written by this thread; only need ordered store + ForkJoinTask prevJoin = currentJoin; + UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe); + if (sp != base) + localHelpJoinTask(joinMe); + if (joinMe.status >= 0) + pool.awaitJoin(joinMe, this); + UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin); + } /** - * Computes next value for random victim probe. Scans don't - * require a very high quality generator, but also not a crummy - * one. Marsaglia xor-shift is cheap and works well. + * Run tasks in local queue until given task is done. * - * This is currently unused, and manually inlined + * @param joinMe the task to join */ - private static int xorShift(int r) { - r ^= r << 1; - r ^= r >>> 3; - r ^= r << 10; - return r; + private void localHelpJoinTask(ForkJoinTask joinMe) { + int s; + ForkJoinTask[] q; + while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) { + int i = (q.length - 1) & --s; + long u = (i << qShift) + qBase; // raw offset + ForkJoinTask t = q[i]; + if (t == null) // lost to a stealer + break; + if (UNSAFE.compareAndSwapObject(q, u, t, null)) { + /* + * This recheck (and similarly in helpJoinTask) + * handles cases where joinMe is independently + * cancelled or forced even though there is other work + * available. Back out of the pop by putting t back + * into slot before we commit by writing sp. + */ + if (joinMe.status < 0) { + UNSAFE.putObjectVolatile(q, u, t); + break; + } + sp = s; + // UNSAFE.putOrderedInt(this, spOffset, s); + t.quietlyExec(); + } + } } /** - * Tries to steal a task from another worker and/or, if enabled, - * submission queue. Starts at a random index of workers array, - * and probes workers until finding one with non-empty queue or - * finding that all are empty. It randomly selects the first n-1 - * probes. If these are empty, it resorts to full circular - * traversal, which is necessary to accurately set active status - * by caller. Also restarts if pool barrier has tripped since last - * scan, which forces refresh of workers array, in case barrier - * was associated with resize. + * Unless terminating, tries to locate and help perform tasks for + * a stealer of the given task, or in turn one of its stealers. + * Traces currentSteal->currentJoin links looking for a thread + * working on a descendant of the given task and with a non-empty + * queue to steal back and execute tasks from. * - * This method must be both fast and quiet -- usually avoiding - * memory accesses that could disrupt cache sharing etc other than - * those needed to check for and take tasks. This accounts for, - * among other things, updating random seed in place without - * storing it until exit. (Note that we only need to store it if - * we found a task; otherwise it doesn't matter if we start at the - * same place next time.) + * The implementation is very branchy to cope with potential + * inconsistencies or loops encountering chains that are stale, + * unknown, or of length greater than MAX_HELP_DEPTH links. All + * of these cases are dealt with by just returning back to the + * caller, who is expected to retry if other join mechanisms also + * don't work out. * - * @param joinMe if non null; exit early if done - * @param checkSubmissions true if OK to take submissions - * @return a task, or null if none found + * @param joinMe the task to join */ - private ForkJoinTask scan(ForkJoinTask joinMe, - boolean checkSubmissions) { - ForkJoinPool p = pool; - if (p == null) // Never null, but avoids - return null; // implicit nullchecks below - int r = randomVictimSeed; // extract once to keep scan quiet - restart: // outer loop refreshes ws array - while (joinMe == null || joinMe.status >= 0) { - int mask; - ForkJoinWorkerThread[] ws = p.workers; - if (ws != null && (mask = ws.length - 1) > 0) { - int probes = -mask; // use random index while negative - int idx = r; - for (;;) { - ForkJoinWorkerThread v; - // inlined xorshift to update seed - r ^= r << 1; r ^= r >>> 3; r ^= r << 10; - if ((v = ws[mask & idx]) != null && v.sp != v.base) { - ForkJoinTask t; - activate(); - if ((joinMe == null || joinMe.status >= 0) && - (t = v.deqTask()) != null) { - randomVictimSeed = r; - ++stealCount; - return t; + final void helpJoinTask(ForkJoinTask joinMe) { + ForkJoinWorkerThread[] ws; + int n; + if (joinMe.status < 0) // already done + return; + if ((runState & TERMINATING) != 0) { // cancel if shutting down + joinMe.cancelIgnoringExceptions(); + return; + } + if ((ws = pool.workers) == null || (n = ws.length) <= 1) + return; // need at least 2 workers + + ForkJoinTask task = joinMe; // base of chain + ForkJoinWorkerThread thread = this; // thread with stolen task + for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length + // Try to find v, the stealer of task, by first using hint + ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)]; + if (v == null || v.currentSteal != task) { + for (int j = 0; ; ++j) { // search array + if (j < n) { + ForkJoinTask vs; + if ((v = ws[j]) != null && + (vs = v.currentSteal) != null) { + if (joinMe.status < 0 || task.status < 0) + return; // stale or done + if (vs == task) { + thread.stealHint = j; + break; // save hint for next time + } } - continue restart; // restart on contention } - if ((probes >> 1) <= mask) // n-1 random then circular - idx = (probes++ < 0)? r : (idx + 1); else - break; + return; // no stealer } } - if (checkSubmissions && p.hasQueuedSubmissions()) { - activate(); - ForkJoinTask t = p.pollSubmission(); - if (t != null) - return t; - } - else { - long ec = eventCount; // restart on pool event - if ((eventCount = p.getEventCount()) == ec) + for (;;) { // Try to help v, using specialized form of deqTask + if (joinMe.status < 0) + return; + int b = v.base; + ForkJoinTask[] q = v.queue; + if (b == v.sp || q == null) break; + int i = (q.length - 1) & b; + long u = (i << qShift) + qBase; + ForkJoinTask t = q[i]; + int pid = poolIndex; + ForkJoinTask ps = currentSteal; + if (task.status < 0) + return; // stale or done + if (t != null && v.base == b++ && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + if (joinMe.status < 0) { + UNSAFE.putObjectVolatile(q, u, t); + return; // back out on cancel + } + v.base = b; + v.stealHint = pid; + UNSAFE.putOrderedObject(this, currentStealOffset, t); + t.quietlyExec(); + UNSAFE.putOrderedObject(this, currentStealOffset, ps); + } } - } - return null; - } - - /** - * Callback from pool.sync to rescan before blocking. If a - * task is found, it is pushed so it can be executed upon return. - * @return true if found and pushed a task - */ - final boolean prescan() { - ForkJoinTask t = scan(null, true); - if (t != null) { - pushTask(t); - return true; - } - else { - inactivate(); - return false; + // Try to descend to find v's stealer + ForkJoinTask next = v.currentJoin; + if (task.status < 0 || next == null || next == task || + joinMe.status < 0) + return; + task = next; + thread = v; } } - // Support for ForkJoinTask methods - - /** - * Scan, returning early if joinMe done - */ - final ForkJoinTask scanWhileJoining(ForkJoinTask joinMe) { - ForkJoinTask t = scan(joinMe, false); - if (t != null && joinMe.status < 0 && sp == base) { - pushTask(t); // unsteal if done and this task would be stealable - t = null; - } - return t; - } - /** - * Pops or steals a task - * @return task, or null if none available + * Implements ForkJoinTask.getSurplusQueuedTaskCount(). + * Returns an estimate of the number of tasks, offset by a + * function of number of idle workers. + * + * This method provides a cheap heuristic guide for task + * partitioning when programmers, frameworks, tools, or languages + * have little or no idea about task granularity. In essence by + * offering this method, we ask users only about tradeoffs in + * overhead vs expected throughput and its variance, rather than + * how finely to partition tasks. + * + * In a steady state strict (tree-structured) computation, each + * thread makes available for stealing enough tasks for other + * threads to remain active. Inductively, if all threads play by + * the same rules, each thread should make available only a + * constant number of tasks. + * + * The minimum useful constant is just 1. But using a value of 1 + * would require immediate replenishment upon each steal to + * maintain enough tasks, which is infeasible. Further, + * partitionings/granularities of offered tasks should minimize + * steal rates, which in general means that threads nearer the top + * of computation tree should generate more than those nearer the + * bottom. In perfect steady state, each thread is at + * approximately the same level of computation tree. However, + * producing extra tasks amortizes the uncertainty of progress and + * diffusion assumptions. + * + * So, users will want to use values larger, but not much larger + * than 1 to both smooth over transient shortages and hedge + * against uneven progress; as traded off against the cost of + * extra task overhead. We leave the user to pick a threshold + * value to compare with the results of this call to guide + * decisions, but recommend values such as 3. + * + * When all threads are active, it is on average OK to estimate + * surplus strictly locally. In steady-state, if one thread is + * maintaining say 2 surplus tasks, then so are others. So we can + * just use estimated queue length (although note that (sp - base) + * can be an overestimate because of stealers lagging increments + * of base). However, this strategy alone leads to serious + * mis-estimates in some non-steady-state conditions (ramp-up, + * ramp-down, other stalls). We can detect many of these by + * further considering the number of "idle" threads, that are + * known to have zero queued tasks, so compensate by a factor of + * (#idle/#active) threads. */ - final ForkJoinTask pollLocalOrStolenTask() { - ForkJoinTask t; - return (t = popTask()) == null? scan(null, false) : t; + final int getEstimatedSurplusTaskCount() { + return sp - base - pool.idlePerActive(); } /** - * Runs tasks until pool isQuiescent + * Runs tasks until {@code pool.isQuiescent()}. */ final void helpQuiescePool() { + ForkJoinTask ps = currentSteal; // to restore below for (;;) { - ForkJoinTask t = pollLocalOrStolenTask(); - if (t != null) { - activate(); + ForkJoinTask t = pollLocalTask(); + if (t != null || (t = scan()) != null) t.quietlyExec(); - } else { - inactivate(); - if (pool.isQuiescent()) { - activate(); // re-activate on exit - break; + ForkJoinPool p = pool; + int a; // to inline CASes + if (active) { + if (!UNSAFE.compareAndSwapInt + (p, poolRunStateOffset, a = p.runState, a - 1)) + continue; // retry later + active = false; // inactivate + UNSAFE.putOrderedObject(this, currentStealOffset, ps); + } + if (p.isQuiescent()) { + active = true; // re-activate + do {} while (!UNSAFE.compareAndSwapInt + (p, poolRunStateOffset, a = p.runState, a+1)); + return; } } } } - /** - * Returns an estimate of the number of tasks in the queue. - */ - final int getQueueSize() { - int n = sp - base; - return n <= 0? 0 : n; // suppress momentarily negative values - } - - /** - * Returns an estimate of the number of tasks, offset by a - * function of number of idle workers. - */ - final int getEstimatedSurplusTaskCount() { - // The halving approximates weighting idle vs non-idle workers - return (sp - base) - (pool.getIdleThreadCount() >>> 1); - } - - // Per-worker exported random numbers + // Unsafe mechanics - // Same constants as java.util.Random - final static long JURandomMultiplier = 0x5DEECE66DL; - final static long JURandomAddend = 0xBL; - final static long JURandomMask = (1L << 48) - 1; - - private final int nextJURandom(int bits) { - long next = (juRandomSeed * JURandomMultiplier + JURandomAddend) & - JURandomMask; - juRandomSeed = next; - return (int)(next >>> (48 - bits)); - } - - private final int nextJURandomInt(int n) { - if (n <= 0) - throw new IllegalArgumentException("n must be positive"); - int bits = nextJURandom(31); - if ((n & -n) == n) - return (int)((n * (long)bits) >> 31); + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long spOffset = + objectFieldOffset("sp", ForkJoinWorkerThread.class); + private static final long runStateOffset = + objectFieldOffset("runState", ForkJoinWorkerThread.class); + private static final long currentJoinOffset = + objectFieldOffset("currentJoin", ForkJoinWorkerThread.class); + private static final long currentStealOffset = + objectFieldOffset("currentSteal", ForkJoinWorkerThread.class); + private static final long qBase = + UNSAFE.arrayBaseOffset(ForkJoinTask[].class); + private static final long poolRunStateOffset = // to inline CAS + objectFieldOffset("runState", ForkJoinPool.class); - for (;;) { - int val = bits % n; - if (bits - val + (n-1) >= 0) - return val; - bits = nextJURandom(31); - } - } + private static final int qShift; - private final long nextJURandomLong() { - return ((long)(nextJURandom(32)) << 32) + nextJURandom(32); + static { + int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class); + if ((s & (s-1)) != 0) + throw new Error("data type scale not a power of two"); + qShift = 31 - Integer.numberOfLeadingZeros(s); + MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift); } - private final long nextJURandomLong(long n) { - if (n <= 0) - throw new IllegalArgumentException("n must be positive"); - long offset = 0; - while (n >= Integer.MAX_VALUE) { // randomly pick half range - int bits = nextJURandom(2); // 2nd bit for odd vs even split - long half = n >>> 1; - long nextn = ((bits & 2) == 0)? half : n - half; - if ((bits & 1) == 0) - offset += n - nextn; - n = nextn; + private static long objectFieldOffset(String field, Class klazz) { + try { + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; } - return offset + nextJURandomInt((int)n); - } - - private final double nextJURandomDouble() { - return (((long)(nextJURandom(26)) << 27) + nextJURandom(27)) - / (double)(1L << 53); - } - - /** - * Returns a random integer using a per-worker random - * number generator with the same properties as - * {@link java.util.Random#nextInt} - * @return the next pseudorandom, uniformly distributed {@code int} - * value from this worker's random number generator's sequence - */ - public static int nextRandomInt() { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - nextJURandom(32); - } - - /** - * Returns a random integer using a per-worker random - * number generator with the same properties as - * {@link java.util.Random#nextInt(int)} - * @param n the bound on the random number to be returned. Must be - * positive. - * @return the next pseudorandom, uniformly distributed {@code int} - * value between {@code 0} (inclusive) and {@code n} (exclusive) - * from this worker's random number generator's sequence - * @throws IllegalArgumentException if n is not positive - */ - public static int nextRandomInt(int n) { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - nextJURandomInt(n); - } - - /** - * Returns a random long using a per-worker random - * number generator with the same properties as - * {@link java.util.Random#nextLong} - * @return the next pseudorandom, uniformly distributed {@code long} - * value from this worker's random number generator's sequence - */ - public static long nextRandomLong() { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - nextJURandomLong(); } /** - * Returns a random integer using a per-worker random - * number generator with the same properties as - * {@link java.util.Random#nextInt(int)} - * @param n the bound on the random number to be returned. Must be - * positive. - * @return the next pseudorandom, uniformly distributed {@code int} - * value between {@code 0} (inclusive) and {@code n} (exclusive) - * from this worker's random number generator's sequence - * @throws IllegalArgumentException if n is not positive - */ - public static long nextRandomLong(long n) { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - nextJURandomLong(n); - } - - /** - * Returns a random double using a per-worker random - * number generator with the same properties as - * {@link java.util.Random#nextDouble} - * @return the next pseudorandom, uniformly distributed {@code double} - * value between {@code 0.0} and {@code 1.0} from this - * worker's random number generator's sequence + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe */ - public static double nextRandomDouble() { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - nextJURandomDouble(); - } - - // Temporary Unsafe mechanics for preliminary release - - static final Unsafe _unsafe; - static final long baseOffset; - static final long spOffset; - static final long qBase; - static final int qShift; - static final long runStateOffset; - static { + private static sun.misc.Unsafe getUnsafe() { try { - if (ForkJoinWorkerThread.class.getClassLoader() != null) { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - _unsafe = (Unsafe)f.get(null); - } - else - _unsafe = Unsafe.getUnsafe(); - baseOffset = _unsafe.objectFieldOffset - (ForkJoinWorkerThread.class.getDeclaredField("base")); - spOffset = _unsafe.objectFieldOffset - (ForkJoinWorkerThread.class.getDeclaredField("sp")); - runStateOffset = _unsafe.objectFieldOffset - (ForkJoinWorkerThread.class.getDeclaredField("runState")); - qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class); - int s = _unsafe.arrayIndexScale(ForkJoinTask[].class); - if ((s & (s-1)) != 0) - throw new Error("data type scale not a power of two"); - qShift = 31 - Integer.numberOfLeadingZeros(s); - } catch (Exception e) { - throw new RuntimeException("Could not initialize intrinsics", e); + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } } } }