--- jsr166/src/jsr166y/ForkJoinWorkerThread.java 2009/07/20 21:45:06 1.8 +++ jsr166/src/jsr166y/ForkJoinWorkerThread.java 2010/09/20 20:42:37 1.51 @@ -5,220 +5,328 @@ */ package jsr166y; -import java.util.*; + import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; + +import java.util.Random; +import java.util.Collection; +import java.util.concurrent.locks.LockSupport; /** * A thread managed by a {@link ForkJoinPool}. This class is * subclassable solely for the sake of adding functionality -- there - * are no overridable methods dealing with scheduling or - * execution. However, you can override initialization and termination - * methods surrounding the main task processing loop. If you do - * create such a subclass, you will also need to supply a custom - * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. + * are no overridable methods dealing with scheduling or execution. + * However, you can override initialization and termination methods + * surrounding the main task processing loop. If you do create such a + * subclass, you will also need to supply a custom {@link + * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code + * ForkJoinPool}. * + * @since 1.7 + * @author Doug Lea */ public class ForkJoinWorkerThread extends Thread { /* - * Algorithm overview: + * Overview: + * + * ForkJoinWorkerThreads are managed by ForkJoinPools and perform + * ForkJoinTasks. This class includes bookkeeping in support of + * worker activation, suspension, and lifecycle control described + * in more detail in the internal documentation of class + * ForkJoinPool. And as described further below, this class also + * includes special-cased support for some ForkJoinTask + * methods. But the main mechanics involve work-stealing: * - * 1. Work-Stealing: Work-stealing queues are special forms of - * Deques that support only three of the four possible - * end-operations -- push, pop, and deq (aka steal), and only do - * so under the constraints that push and pop are called only from - * the owning thread, while deq may be called from other threads. - * (If you are unfamiliar with them, you probably want to read - * Herlihy and Shavit's book "The Art of Multiprocessor - * programming", chapter 16 describing these in more detail before - * proceeding.) The main work-stealing queue design is roughly - * similar to "Dynamic Circular Work-Stealing Deque" by David - * Chase and Yossi Lev, SPAA 2005 - * (http://research.sun.com/scalable/pubs/index.html). The main - * difference ultimately stems from gc requirements that we null - * out taken slots as soon as we can, to maintain as small a - * footprint as possible even in programs generating huge numbers - * of tasks. To accomplish this, we shift the CAS arbitrating pop - * vs deq (steal) from being on the indices ("base" and "sp") to - * the slots themselves (mainly via method "casSlotNull()"). So, - * both a successful pop and deq mainly entail CAS'ing a nonnull - * slot to null. Because we rely on CASes of references, we do - * not need tag bits on base or sp. They are simple ints as used - * in any circular array-based queue (see for example ArrayDeque). - * Updates to the indices must still be ordered in a way that - * guarantees that (sp - base) > 0 means the queue is empty, but - * otherwise may err on the side of possibly making the queue - * appear nonempty when a push, pop, or deq have not fully - * committed. 
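To make the slot-CAS discipline just described concrete, here is a minimal, self-contained sketch (a hypothetical class SketchWorkQueue; the patch itself uses raw Unsafe intrinsics rather than AtomicReferenceArray): push and pop run only on the owning thread, steal may run on any thread, and both pop and steal commit by CASing a non-null slot to null rather than by CASing the indices.

import java.util.concurrent.atomic.AtomicReferenceArray;

// Minimal work-stealing deque sketch: the owner calls push/pop, thieves
// call steal. The pop-vs-steal race is settled by CASing the slot
// itself from non-null to null, the arbitration described above.
final class SketchWorkQueue<T> {
    private final AtomicReferenceArray<T> slots =
        new AtomicReferenceArray<T>(1 << 13);     // power of two
    private final int mask = slots.length() - 1;
    private volatile int base;                    // next slot to steal from
    private volatile int sp;                      // next slot to push to (owner)

    void push(T t) {                              // owner thread only
        int s = sp;
        slots.lazySet(s & mask, t);               // publish the slot, then the index
        sp = s + 1;
    }

    T pop() {                                     // owner thread only, LIFO end
        int s;
        while ((s = sp) != base) {
            int i = (s - 1) & mask;
            T t = slots.get(i);
            if (t == null)
                break;                            // lost the top slot to a thief
            if (slots.compareAndSet(i, t, null)) {
                sp = s - 1;
                return t;
            }
        }
        return null;
    }

    T steal() {                                   // any thread, FIFO end
        int b = base;
        if (b != sp) {
            T t = slots.get(b & mask);
            if (t != null && base == b && slots.compareAndSet(b & mask, t, null)) {
                base = b + 1;
                return t;
            }
        }
        return null;      // empty or contended: caller should pick another victim
    }
}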
Note that this means that the deq operation, - * considered individually, is not wait-free. One thief cannot - * successfully continue until another in-progress one (or, if - * previously empty, a push) completes. However, in the - * aggregate, we ensure at least probablistic non-blockingness. If - * an attempted steal fails, a thief always chooses a different + * Work-stealing queues are special forms of Deques that support + * only three of the four possible end-operations -- push, pop, + * and deq (aka steal), under the further constraints that push + * and pop are called only from the owning thread, while deq may + * be called from other threads. (If you are unfamiliar with + * them, you probably want to read Herlihy and Shavit's book "The + * Art of Multiprocessor programming", chapter 16 describing these + * in more detail before proceeding.) The main work-stealing + * queue design is roughly similar to those in the papers "Dynamic + * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 + * (http://research.sun.com/scalable/pubs/index.html) and + * "Idempotent work stealing" by Michael, Saraswat, and Vechev, + * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). + * The main differences ultimately stem from gc requirements that + * we null out taken slots as soon as we can, to maintain as small + * a footprint as possible even in programs generating huge + * numbers of tasks. To accomplish this, we shift the CAS + * arbitrating pop vs deq (steal) from being on the indices + * ("base" and "sp") to the slots themselves (mainly via method + * "casSlotNull()"). So, both a successful pop and deq mainly + * entail a CAS of a slot from non-null to null. Because we rely + * on CASes of references, we do not need tag bits on base or sp. + * They are simple ints as used in any circular array-based queue + * (see for example ArrayDeque). Updates to the indices must + * still be ordered in a way that guarantees that sp == base means + * the queue is empty, but otherwise may err on the side of + * possibly making the queue appear nonempty when a push, pop, or + * deq have not fully committed. Note that this means that the deq + * operation, considered individually, is not wait-free. One thief + * cannot successfully continue until another in-progress one (or, + * if previously empty, a push) completes. However, in the + * aggregate, we ensure at least probabilistic non-blockingness. + * If an attempted steal fails, a thief always chooses a different * random victim target to try next. So, in order for one thief to * progress, it suffices for any in-progress deq or new push on * any empty queue to complete. One reason this works well here is * that apparently-nonempty often means soon-to-be-stealable, - * which gives threads a chance to activate if necessary before - * stealing (see below). + * which gives threads a chance to set activation status if + * necessary before stealing. + * + * This approach also enables support for "async mode" where local + * task processing is in FIFO, not LIFO order; simply by using a + * version of deq rather than pop when locallyFifo is true (as set + * by the ForkJoinPool). This allows use in message-passing + * frameworks in which tasks are never joined. + * + * When a worker would otherwise be blocked waiting to join a + * task, it first tries a form of linear helping: Each worker + * records (in field currentSteal) the most recent task it stole + * from some other worker. 
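Reusing the hypothetical SketchWorkQueue above, the whole of the async-mode switch described a paragraph earlier is one branch over which end of the owner's own queue local polling takes from:

// Inside the hypothetical SketchWorkQueue sketch above: "async mode"
// merely swaps which end of the owner's own queue local polling uses.
T nextLocalTask(boolean locallyFifo) {
    return locallyFifo
        ? steal()    // FIFO: oldest first, for tasks that are never joined
        : pop();     // LIFO: newest first, the divide-and-conquer default
}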
Plus, it records (in field currentJoin) + * the task it is currently actively joining. Method joinTask uses + * these markers to try to find a worker to help (i.e., steal back + * a task from and execute it) that could hasten completion of the + * actively joined task. In essence, the joiner executes a task + * that would be on its own local deque had the to-be-joined task + * not been stolen. This may be seen as a conservative variant of + * the approach in Wagner & Calder "Leapfrogging: a portable + * technique for implementing efficient futures" SIGPLAN Notices, + * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs + * in that: (1) We only maintain dependency links across workers + * upon steals, rather than use per-task bookkeeping. This may + * require a linear scan of workers array to locate stealers, but + * usually doesn't because stealers leave hints (that may become + * stale/wrong) of where to locate them. This isolates cost to + * when it is needed, rather than adding to per-task overhead. + * (2) It is "shallow", ignoring nesting and potentially cyclic + * mutual steals. (3) It is intentionally racy: field currentJoin + * is updated only while actively joining, which means that we + * miss links in the chain during long-lived tasks, GC stalls etc + * (which is OK since blocking in such cases is usually a good + * idea). (4) We bound the number of attempts to find work (see + * MAX_HELP_DEPTH) and fall back to suspending the worker and if + * necessary replacing it with a spare (see + * ForkJoinPool.awaitJoin). * - * Efficient implementation of this approach currently relies on - * an uncomfortable amount of "Unsafe" mechanics. To maintain + * Efficient implementation of these algorithms currently relies + * on an uncomfortable amount of "Unsafe" mechanics. To maintain * correct orderings, reads and writes of variable base require - * volatile ordering. Variable sp does not require volatile write - * but needs cheaper store-ordering on writes. Because they are - * protected by volatile base reads, reads of the queue array and - * its slots do not need volatile load semantics, but writes (in - * push) require store order and CASes (in pop and deq) require - * (volatile) CAS semantics. Since these combinations aren't - * supported using ordinary volatiles, the only way to accomplish - * these effciently is to use direct Unsafe calls. (Using external - * AtomicIntegers and AtomicReferenceArrays for the indices and - * array is significantly slower because of memory locality and - * indirection effects.) Further, performance on most platforms is - * very sensitive to placement and sizing of the (resizable) queue - * array. Even though these queues don't usually become all that - * big, the initial size must be large enough to counteract cache + * volatile ordering. Variable sp does not require volatile + * writes but still needs store-ordering, which we accomplish by + * pre-incrementing sp before filling the slot with an ordered + * store. (Pre-incrementing also enables backouts used in + * joinTask.) Because they are protected by volatile base reads, + * reads of the queue array and its slots by other threads do not + * need volatile load semantics, but writes (in push) require + * store order and CASes (in pop and deq) require (volatile) CAS + * semantics. (Michael, Saraswat, and Vechev's algorithm has + * similar properties, but without support for nulling slots.) 
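A compact sketch of the ordering rules just listed, using standard atomics as stand-ins (all names hypothetical; AtomicReferenceArray.lazySet approximates UNSAFE.putOrderedObject and omits the store-load barrier a full volatile write would need):

import java.util.concurrent.atomic.AtomicReferenceArray;

// Ordering sketch for push under the new scheme: bump sp first with a
// plain write (other threads read it only after reading volatile base),
// then publish the task with an ordered store.
final class OrderedPushSketch<T> {
    final AtomicReferenceArray<T> q = new AtomicReferenceArray<T>(1 << 13);
    volatile int base;     // volatile: thieves anchor their reads on it
    int sp;                // plain: owner-written; pre-increment enables backouts

    void push(T t) {
        int s = sp++;                            // pre-increment, as described above
        q.lazySet(s & (q.length() - 1), t);      // ordered slot store
    }
}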
+ * Since these combinations aren't supported using ordinary + * volatiles, the only way to accomplish these efficiently is to + * use direct Unsafe calls. (Using external AtomicIntegers and + * AtomicReferenceArrays for the indices and array is + * significantly slower because of memory locality and indirection + * effects.) + * + * Further, performance on most platforms is very sensitive to + * placement and sizing of the (resizable) queue array. Even + * though these queues don't usually become all that big, the + * initial size must be large enough to counteract cache * contention effects across multiple queues (especially in the * presence of GC cardmarking). Also, to improve thread-locality, - * queues are currently initialized immediately after the thread - * gets the initial signal to start processing tasks. However, - * all queue-related methods except pushTask are written in a way - * that allows them to instead be lazily allocated and/or disposed - * of when empty. All together, these low-level implementation - * choices produce as much as a factor of 4 performance - * improvement compared to naive implementations, and enable the - * processing of billions of tasks per second, sometimes at the - * expense of ugliness. - * - * 2. Run control: The primary run control is based on a global - * counter (activeCount) held by the pool. It uses an algorithm - * similar to that in Herlihy and Shavit section 17.6 to cause - * threads to eventually block when all threads declare they are - * inactive. (See variable "scans".) For this to work, threads - * must be declared active when executing tasks, and before - * stealing a task. They must be inactive before blocking on the - * Pool Barrier (awaiting a new submission or other Pool - * event). In between, there is some free play which we take - * advantage of to avoid contention and rapid flickering of the - * global activeCount: If inactive, we activate only if a victim - * queue appears to be nonempty (see above). Similarly, a thread - * tries to inactivate only after a full scan of other threads. - * The net effect is that contention on activeCount is rarely a - * measurable performance issue. (There are also a few other cases - * where we scan for work rather than retry/block upon - * contention.) - * - * 3. Selection control. We maintain policy of always choosing to - * run local tasks rather than stealing, and always trying to - * steal tasks before trying to run a new submission. All steals - * are currently performed in randomly-chosen deq-order. It may be - * worthwhile to bias these with locality / anti-locality - * information, but doing this well probably requires more - * lower-level information from JVMs than currently provided. + * queues are initialized after starting. All together, these + * low-level implementation choices produce as much as a factor of + * 4 performance improvement compared to naive implementations, + * and enable the processing of billions of tasks per second, + * sometimes at the expense of ugliness. */ /** + * Generator for initial random seeds for random victim + * selection. This is used only to create initial seeds. Random + * steals use a cheaper xorshift generator per steal attempt. We + * expect only rare contention on seedGenerator, so just use a + * plain Random. + */ + private static final Random seedGenerator = new Random(); + + /** + * The maximum stolen->joining link depth allowed in helpJoinTask. 
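The two-level seeding scheme behind the seedGenerator comment above can also be shown runnably (a hypothetical VictimSeedSketch; the 13/17/5 xorshift constants are the ones inlined later in scan()):

import java.util.Random;

// Two-level victim-selection randomness: a shared Random hands out
// nonzero initial seeds; each steal attempt advances a cheap
// per-worker Marsaglia xorshift step.
final class VictimSeedSketch {
    static final Random seedGenerator = new Random();
    int seed;

    VictimSeedSketch() {
        int rs = seedGenerator.nextInt();
        seed = (rs == 0) ? 1 : rs;      // xorshift state must be nonzero
    }

    int nextProbe() {                   // Marsaglia xorshift, period 2^32 - 1
        int r = seed;
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return seed = r;
    }

    public static void main(String[] args) {
        VictimSeedSketch s = new VictimSeedSketch();
        for (int i = 0; i < 3; ++i)
            System.out.println(Integer.toHexString(s.nextProbe()));
    }
}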
+ * Depths for legitimate chains are unbounded, but we use a fixed + * constant to avoid (otherwise unchecked) cycles and bound + * staleness of traversal parameters at the expense of sometimes + * blocking when we could be helping. + */ + private static final int MAX_HELP_DEPTH = 8; + + /** * Capacity of work-stealing queue array upon initialization. - * Must be a power of two. Initial size must be at least 2, but is + * Must be a power of two. Initial size must be at least 4, but is * padded to minimize cache effects. */ private static final int INITIAL_QUEUE_CAPACITY = 1 << 13; /** * Maximum work-stealing queue array size. Must be less than or - * equal to 1 << 28 to ensure lack of index wraparound. (This - * is less than usual bounds, because we need leftshift by 3 - * to be in int range). + * equal to 1 << (31 - width of array entry) to ensure lack of + * index wraparound. The value is set in the static block + * at the end of this file after obtaining width. */ - private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28; + private static final int MAXIMUM_QUEUE_CAPACITY; /** - * The pool this thread works in. Accessed directly by ForkJoinTask + * The pool this thread works in. Accessed directly by ForkJoinTask. */ final ForkJoinPool pool; /** * The work-stealing queue array. Size must be a power of two. - * Initialized when thread starts, to improve memory locality. + * Initialized in onStart, to improve memory locality. */ private ForkJoinTask[] queue; /** + * Index (mod queue.length) of least valid queue slot, which is + * always the next position to steal from if nonempty. + */ + private volatile int base; + + /** * Index (mod queue.length) of next queue slot to push to or pop - * from. It is written only by owner thread, via ordered store. - * Both sp and base are allowed to wrap around on overflow, but - * (sp - base) still estimates size. + * from. It is written only by owner thread, and accessed by other + * threads only after reading (volatile) base. Both sp and base + * are allowed to wrap around on overflow, but (sp - base) still + * estimates size. */ - private volatile int sp; + private int sp; /** - * Index (mod queue.length) of least valid queue slot, which is - * always the next position to steal from if nonempty. + * The index of most recent stealer, used as a hint to avoid + * traversal in method helpJoinTask. This is only a hint because a + * worker might have had multiple steals and this only holds one + * of them (usually the most current). Declared non-volatile, + * relying on other prevailing sync to keep reasonably current. */ - private volatile int base; + private int stealHint; /** - * Activity status. When true, this worker is considered active. - * Must be false upon construction. It must be true when executing - * tasks, and BEFORE stealing a task. It must be false before - * calling pool.sync + * Run state of this worker. In addition to the usual run levels, + * tracks if this worker is suspended as a spare, and if it was + * killed (trimmed) while suspended. However, "active" status is + * maintained separately and modified only in conjunction with + * CASes of the pool's runState (which are currently sadly + * manually inlined for performance.) Accessed directly by pool + * to simplify checks for normal (zero) status. 
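The wraparound claim in the base/sp javadocs above is easy to check concretely (hypothetical demo values):

// Demonstrates that int overflow is harmless for (sp - base) size
// estimates and for masked slot indexing with a power-of-two length.
public class WrapAroundDemo {
    public static void main(String[] args) {
        int mask = (1 << 13) - 1;            // queue.length - 1, power of two
        int base = Integer.MAX_VALUE - 1;    // about to overflow
        int sp   = base + 3;                 // wraps to a negative int
        System.out.println(sp);              // -2147483647
        System.out.println(sp - base);       // 3: the size estimate survives
        System.out.println(sp & mask);       // still a valid slot index
    }
}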
*/ - private boolean active; + volatile int runState; + + private static final int TERMINATING = 0x01; + private static final int TERMINATED = 0x02; + private static final int SUSPENDED = 0x04; // inactive spare + private static final int TRIMMED = 0x08; // killed while suspended /** - * Run state of this worker. Supports simple versions of the usual - * shutdown/shutdownNow control. + * Number of steals. Directly accessed (and reset) by + * pool.tryAccumulateStealCount when idle. */ - private volatile int runState; + int stealCount; /** * Seed for random number generator for choosing steal victims. - * Uses Marsaglia xorshift. Must be nonzero upon initialization. + * Uses Marsaglia xorshift. Must be initialized as nonzero. */ private int seed; /** - * Number of steals, transferred to pool when idle + * Activity status. When true, this worker is considered active. + * Accessed directly by pool. Must be false upon construction. */ - private int stealCount; + boolean active; + + /** + * True if use local fifo, not default lifo, for local polling. + * Shadows value from ForkJoinPool. + */ + private final boolean locallyFifo; /** * Index of this worker in pool array. Set once by pool before - * running, and accessed directly by pool during cleanup etc + * running, and accessed directly by pool to locate this worker in + * its workers array. */ int poolIndex; /** - * The last barrier event waited for. Accessed in pool callback - * methods, but only by current thread. + * The last pool event waited for. Accessed only by pool in + * callback methods invoked within this thread. + */ + int lastEventCount; + + /** + * Encoded index and event count of next event waiter. Accessed + * only by ForkJoinPool for managing event waiters. + */ + volatile long nextWaiter; + + /** + * Number of times this thread suspended as spare. Accessed only + * by pool. + */ + int spareCount; + + /** + * Encoded index and count of next spare waiter. Accessed only + * by ForkJoinPool for managing spares. + */ + volatile int nextSpare; + + /** + * The task currently being joined, set only when actively trying + * to help other stealers in helpJoinTask. Written only by this + * thread, but read by others. */ - long lastEventCount; + private volatile ForkJoinTask currentJoin; /** - * True if use local fifo, not default lifo, for local polling + * The task most recently stolen from another worker (or + * submission queue). Written only by this thread, but read by + * others. */ - private boolean locallyFifo; + private volatile ForkJoinTask currentSteal; /** * Creates a ForkJoinWorkerThread operating in the given pool. + * * @param pool the pool this thread works in * @throws NullPointerException if pool is null */ protected ForkJoinWorkerThread(ForkJoinPool pool) { - if (pool == null) throw new NullPointerException(); this.pool = pool; - // Note: poolIndex is set by pool during construction - // Remaining initialization is deferred to onStart + this.locallyFifo = pool.locallyFifo; + setDaemon(true); + // To avoid exposing construction details to subclasses, + // remaining initialization is in start() and onStart() + } + + /** + * Performs additional initialization and starts this thread. + */ + final void start(int poolIndex, UncaughtExceptionHandler ueh) { + this.poolIndex = poolIndex; + if (ueh != null) + setUncaughtExceptionHandler(ueh); + start(); } - // Public access methods + // Public/protected methods /** - * Returns the pool hosting this thread + * Returns the pool hosting this thread. 
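Throughout this file, currentJoin and currentSteal are written with ordered rather than fully volatile stores; the closest standard-API analogue is AtomicReferenceFieldUpdater.lazySet, sketched here with hypothetical names:

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Standard-API analogue of UNSAFE.putOrderedObject(this, currentStealOffset, t):
// an ordered (lazySet) write to a volatile field the owner updates often
// but other threads only read opportunistically.
class CurrentStealSketch {
    volatile Runnable currentSteal;
    static final AtomicReferenceFieldUpdater<CurrentStealSketch, Runnable> CS =
        AtomicReferenceFieldUpdater.newUpdater(
            CurrentStealSketch.class, Runnable.class, "currentSteal");

    void noteSteal(Runnable t) { CS.lazySet(this, t); }   // cheap ordered store
    void clearSteal()          { CS.lazySet(this, null); }
}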
+ * + * @return the pool */ public ForkJoinPool getPool() { return pool; } @@ -231,88 +339,64 @@ public class ForkJoinWorkerThread extend * threads (minus one) that have ever been created in the pool. * This method may be useful for applications that track status or * collect results per-worker rather than per-task. - * @return the index number. + * + * @return the index number */ public int getPoolIndex() { return poolIndex; } /** - * Establishes local first-in-first-out scheduling mode for forked - * tasks that are never joined. - * @param async if true, use locally FIFO scheduling + * Initializes internal state after construction but before + * processing any tasks. If you override this method, you must + * invoke {@code super.onStart()} at the beginning of the method. + * Initialization requires care: Most fields must have legal + * default values, to ensure that attempted accesses from other + * threads work correctly even before this thread starts + * processing tasks. */ - void setAsyncMode(boolean async) { - locallyFifo = async; - } - - // Runstate management - - // Runstate values. Order matters - private static final int RUNNING = 0; - private static final int SHUTDOWN = 1; - private static final int TERMINATING = 2; - private static final int TERMINATED = 3; - - final boolean isShutdown() { return runState >= SHUTDOWN; } - final boolean isTerminating() { return runState >= TERMINATING; } - final boolean isTerminated() { return runState == TERMINATED; } - final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); } - final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); } + protected void onStart() { + int rs = seedGenerator.nextInt(); + seed = rs == 0? 1 : rs; // seed must be nonzero - /** - * Transition to at least the given state. Return true if not - * already at least given state. - */ - private boolean transitionRunStateTo(int state) { - for (;;) { - int s = runState; - if (s >= state) - return false; - if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state)) - return true; - } - } + // Allocate name string and arrays in this thread + String pid = Integer.toString(pool.getPoolNumber()); + String wid = Integer.toString(poolIndex); + setName("ForkJoinPool-" + pid + "-worker-" + wid); - /** - * Try to set status to active; fail on contention - */ - private boolean tryActivate() { - if (!active) { - if (!pool.tryIncrementActiveCount()) - return false; - active = true; - } - return true; + queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; } /** - * Try to set status to active; fail on contention + * Performs cleanup associated with termination of this worker + * thread. If you override this method, you must invoke + * {@code super.onTermination} at the end of the overridden method.
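Because these hooks exist precisely so that subclasses can add per-thread state, a sketch of the intended extension pattern may help (LoggingWorkerThread and LoggingWorkerFactory are invented names): call super.onStart() first, super.onTermination() last, and supply a matching factory.

import jsr166y.ForkJoinPool;
import jsr166y.ForkJoinWorkerThread;

// Hypothetical subclass using the documented extension points, plus the
// custom ForkJoinPool.ForkJoinWorkerThreadFactory needed to install it.
class LoggingWorkerThread extends ForkJoinWorkerThread {
    LoggingWorkerThread(ForkJoinPool pool) {
        super(pool);
    }

    @Override protected void onStart() {
        super.onStart();                      // must come first
        System.out.println(getName() + " starting");
    }

    @Override protected void onTermination(Throwable exception) {
        System.out.println(getName() + " exiting: " + exception);
        super.onTermination(exception);       // must come last
    }
}

class LoggingWorkerFactory
        implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new LoggingWorkerThread(pool);
    }
}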
+ * + * @param exception the exception causing this thread to abort due + * to an unrecoverable error, or {@code null} if completed normally */ - private boolean tryInactivate() { - if (active) { - if (!pool.tryDecrementActiveCount()) - return false; - active = false; + protected void onTermination(Throwable exception) { + try { + ForkJoinPool p = pool; + if (active) { + int a; // inline p.tryDecrementActiveCount + active = false; + do {} while (!UNSAFE.compareAndSwapInt + (p, poolRunStateOffset, a = p.runState, a - 1)); + } + cancelTasks(); + setTerminated(); + p.workerTerminated(this); + } catch (Throwable ex) { // Shouldn't ever happen + if (exception == null) // but if so, at least rethrown + exception = ex; + } finally { + if (exception != null) + UNSAFE.throwException(exception); } - return true; } /** - * Computes next value for random victim probe. Scans don't - * require a very high quality generator, but also not a crummy - * one. Marsaglia xor-shift is cheap and works well. - */ - private static int xorShift(int r) { - r ^= r << 1; - r ^= r >>> 3; - r ^= r << 10; - return r; - } - - // Lifecycle methods - - /** * This method is required to be public, but should never be * called explicitly. It performs the main run loop to execute * ForkJoinTasks. @@ -321,7 +405,6 @@ public class ForkJoinWorkerThread extend Throwable exception = null; try { onStart(); - pool.sync(this); // await first pool event mainLoop(); } catch (Throwable ex) { exception = ex; @@ -330,132 +413,150 @@ public class ForkJoinWorkerThread extend } } + // helpers for run() + /** - * Execute tasks until shut down. + * Finds and executes tasks, and checks status while running. */ private void mainLoop() { - while (!isShutdown()) { - ForkJoinTask t = pollTask(); - if (t != null || (t = pollSubmission()) != null) - t.quietlyExec(); - else if (tryInactivate()) - pool.sync(this); + boolean ran = false; // true if ran a task on last step + ForkJoinPool p = pool; + for (;;) { + p.preStep(this, ran); + if (runState != 0) + break; + ran = tryExecSteal() || tryExecSubmission(); } } /** - * Initializes internal state after construction but before - * processing any tasks. If you override this method, you must - * invoke super.onStart() at the beginning of the method. - * Initialization requires care: Most fields must have legal - * default values, to ensure that attempted accesses from other - * threads work correctly even before this thread starts - * processing tasks. + * Tries to steal a task and execute it. + * + * @return true if ran a task */ - protected void onStart() { - // Allocate while starting to improve chances of thread-local - // isolation - queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; - // Initial value of seed need not be especially random but - // should differ across workers and must be nonzero - int p = poolIndex + 1; - seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits + private boolean tryExecSteal() { + ForkJoinTask t; + if ((t = scan()) != null) { + t.quietlyExec(); + UNSAFE.putOrderedObject(this, currentStealOffset, null); + if (sp != base) + execLocalTasks(); + return true; + } + return false; } /** - * Perform cleanup associated with termination of this worker - * thread. If you override this method, you must invoke - * super.onTermination at the end of the overridden method. + * If a submission exists, try to activate and run it. * - * @param exception the exception causing this thread to abort due - * to an unrecoverable error, or null if completed normally. 
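Stripped of the inlined CASes, the control flow of run()/mainLoop()/tryExecSteal() above reduces to the policy below (a hypothetical WorkerLoopSketch; preStep is simplified to one argument):

// Condensed scheduling policy of the worker main loop: after every
// successful steal or submission, local work is run to exhaustion,
// and ran/idle status is reported to the pool each round.
abstract class WorkerLoopSketch {
    volatile int runState;                   // nonzero means terminating

    final void mainLoop() {
        boolean ran = false;
        for (;;) {
            preStep(ran);                    // pool bookkeeping; may suspend us
            if (runState != 0)
                break;
            ran = tryExecSteal() || tryExecSubmission();
        }
    }

    final boolean tryExecSteal() {
        Runnable t = scan();                 // probe random victims
        if (t == null)
            return false;
        t.run();
        runLocalTasks();                     // drain own queue before rescanning
        return true;
    }

    abstract void preStep(boolean ran);
    abstract boolean tryExecSubmission();
    abstract Runnable scan();
    abstract void runLocalTasks();
}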
+ * @return true if ran a task */ - protected void onTermination(Throwable exception) { - // Execute remaining local tasks unless aborting or terminating - while (exception == null && !pool.isTerminating() && base != sp) { - try { - ForkJoinTask t = popTask(); - if (t != null) + private boolean tryExecSubmission() { + ForkJoinPool p = pool; + // This loop is needed in case attempt to activate fails, in + // which case we only retry if there still appears to be a + // submission. + while (p.hasQueuedSubmissions()) { + ForkJoinTask t; int a; + if (active || // inline p.tryIncrementActiveCount + (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset, + a = p.runState, a + 1))) { + if ((t = p.pollSubmission()) != null) { + UNSAFE.putOrderedObject(this, currentStealOffset, t); t.quietlyExec(); - } catch(Throwable ex) { - exception = ex; + UNSAFE.putOrderedObject(this, currentStealOffset, null); + if (sp != base) + execLocalTasks(); + return true; + } } } - // Cancel other tasks, transition status, notify pool, and - // propagate exception to uncaught exception handler - try { - do;while (!tryInactivate()); // ensure inactive - cancelTasks(); - runState = TERMINATED; - pool.workerTerminated(this); - } catch (Throwable ex) { // Shouldn't ever happen - if (exception == null) // but if so, at least rethrown - exception = ex; - } finally { - if (exception != null) - ForkJoinTask.rethrowException(exception); - } + return false; } - // Intrinsics-based support for queue operations. - /** - * Add in store-order the given task at given slot of q to - * null. Caller must ensure q is nonnull and index is in range. + * Runs local tasks until queue is empty or shut down. Call only + * while active. */ - private static void setSlot(ForkJoinTask[] q, int i, - ForkJoinTask t){ - _unsafe.putOrderedObject(q, (i << qShift) + qBase, t); + private void execLocalTasks() { + while (runState == 0) { + ForkJoinTask t = locallyFifo ? locallyDeqTask() : popTask(); + if (t != null) + t.quietlyExec(); + else if (sp == base) + break; + } } + /* + * Intrinsics-based atomic writes for queue slots. These are + * basically the same as methods in AtomicReferenceArray, but + * specialized for (1) ForkJoinTask elements (2) requirement that + * nullness and bounds checks have already been performed by + * callers and (3) effective offsets are known not to overflow + * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't + * need corresponding version for reads: plain array reads are OK + * because they are protected by other volatile reads and are + * confirmed by CASes. + * + * Most uses don't actually call these methods, but instead contain + * inlined forms that enable more predictable optimization. We + * don't define the version of write used in pushTask at all, but + * instead inline there a store-fenced array slot write. + */ + /** - * CAS given slot of q to null. Caller must ensure q is nonnull - * and index is in range. + * CASes slot i of array q from t to null. Caller must ensure q is + * non-null and index is in range. */ - private static boolean casSlotNull(ForkJoinTask[] q, int i, - ForkJoinTask t) { - return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null); + private static final boolean casSlotNull(ForkJoinTask[] q, int i, + ForkJoinTask t) { + return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null); } /** - * Sets sp in store-order. + * Performs a volatile write of the given task at given slot of + * array q. Caller must ensure q is non-null and index is in + * range. 
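The raw offsets these intrinsics use, (i << qShift) + qBase, come from Unsafe's array geometry queries; here is a runnable sketch of the computation the static block at the end of this file performs (Object[].class standing in for ForkJoinTask[].class, Unsafe obtained reflectively as in getUnsafe() below):

import java.lang.reflect.Field;

// Derives qBase and qShift from the array element size, then computes
// the raw byte offset of a slot, as the intrinsic slot ops above do.
public class OffsetDemo {
    public static void main(String[] args) throws Exception {
        Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        sun.misc.Unsafe u = (sun.misc.Unsafe) f.get(null);
        long qBase = u.arrayBaseOffset(Object[].class);
        int scale  = u.arrayIndexScale(Object[].class);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        int qShift = 31 - Integer.numberOfLeadingZeros(scale);
        int i = 5;                                         // some slot index
        System.out.println(((long) i << qShift) + qBase);  // its raw offset
    }
}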
This method is used only during resets and backouts. */ - private void storeSp(int s) { - _unsafe.putOrderedInt(this, spOffset, s); + private static final void writeSlot(ForkJoinTask[] q, int i, + ForkJoinTask t) { + UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t); } - // Main queue methods + // queue methods /** - * Pushes a task. Called only by current thread. - * @param t the task. Caller must ensure nonnull + * Pushes a task. Call only from this thread. + * + * @param t the task. Caller must ensure non-null. */ final void pushTask(ForkJoinTask t) { ForkJoinTask[] q = queue; - int mask = q.length - 1; - int s = sp; - setSlot(q, s & mask, t); - storeSp(++s); - if ((s -= base) == 1) - pool.signalWork(); - else if (s >= mask) - growQueue(); + int mask = q.length - 1; // implicit assert q != null + int s = sp++; // ok to increment sp before slot write + UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t); + if ((s -= base) == 0) + pool.signalWork(); // was empty + else if (s == mask) + growQueue(); // is full } /** * Tries to take a task from the base of the queue, failing if - * either empty or contended. - * @return a task, or null if none or contended. + * empty or contended. Note: Specializations of this code appear + * in locallyDeqTask and elsewhere. + * + * @return a task, or null if none or contended */ final ForkJoinTask deqTask() { ForkJoinTask t; ForkJoinTask[] q; - int i; - int b; + int b, i; if (sp != (b = base) && (q = queue) != null && // must read q after b - (t = q[i = (q.length - 1) & b]) != null && - casSlotNull(q, i, t)) { + (t = q[i = (q.length - 1) & b]) != null && base == b && + UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) { base = b + 1; return t; } @@ -463,52 +564,80 @@ public class ForkJoinWorkerThread extend } /** - * Returns a popped task, or null if empty. Ensures active status - * if nonnull. Called only by current thread. + * Tries to take a task from the base of own queue. Assumes active + * status. Called only by this thread. + * + * @return a task, or null if none + */ + final ForkJoinTask locallyDeqTask() { + ForkJoinTask[] q = queue; + if (q != null) { + ForkJoinTask t; + int b, i; + while (sp != (b = base)) { + if ((t = q[i = (q.length - 1) & b]) != null && base == b && + UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, + t, null)) { + base = b + 1; + return t; + } + } + } + return null; + } + + /** + * Returns a popped task, or null if empty. Assumes active status. + * Called only by this thread. */ - final ForkJoinTask popTask() { - int s = sp; - while (s != base) { - if (tryActivate()) { - ForkJoinTask[] q = queue; - int mask = q.length - 1; - int i = (s - 1) & mask; + private ForkJoinTask popTask() { + ForkJoinTask[] q = queue; + if (q != null) { + int s; + while ((s = sp) != base) { + int i = (q.length - 1) & --s; + long u = (i << qShift) + qBase; // raw offset ForkJoinTask t = q[i]; - if (t == null || !casSlotNull(q, i, t)) + if (t == null) // lost to stealer break; - storeSp(s - 1); - return t; + if (UNSAFE.compareAndSwapObject(q, u, t, null)) { + sp = s; // putOrderedInt may encourage more timely write + // UNSAFE.putOrderedInt(this, spOffset, s); + return t; + } } } return null; } /** - * Specialized version of popTask to pop only if - * topmost element is the given task. Called only - * by current thread while active. - * @param t the task. Caller must ensure nonnull + * Specialized version of popTask to pop only if topmost element + * is the given task. Called only by this thread while active. 
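unpushTask is what backs ForkJoinTask.tryUnfork(): a forked task can be retracted and run inline only if it is still the topmost element of the forker's own queue. A simplified conditional-pop sketch over standard atomics (hypothetical names):

import java.util.concurrent.atomic.AtomicReferenceArray;

// Conditional pop, as in unpushTask: succeed only if the topmost slot
// still holds the expected task (a thief may have taken it already).
final class UnpushSketch<T> {
    final AtomicReferenceArray<T> q = new AtomicReferenceArray<T>(1 << 13);
    volatile int base;
    int sp;

    boolean unpush(T expected) {
        int s = sp;
        if (s != base &&
            q.compareAndSet((s - 1) & (q.length() - 1), expected, null)) {
            sp = s - 1;                  // commit the pop only after the CAS
            return true;
        }
        return false;                    // not on top, or already taken
    }
}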
+ * + * @param t the task. Caller must ensure non-null. */ final boolean unpushTask(ForkJoinTask t) { + int s; ForkJoinTask[] q = queue; - int mask = q.length - 1; - int s = sp - 1; - if (casSlotNull(q, s & mask, t)) { - storeSp(s); + if ((s = sp) != base && q != null && + UNSAFE.compareAndSwapObject + (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) { + sp = s; // putOrderedInt may encourage more timely write + // UNSAFE.putOrderedInt(this, spOffset, s); return true; } return false; } /** - * Returns next task. + * Returns next task, or null if empty or contended. */ final ForkJoinTask peekTask() { ForkJoinTask[] q = queue; if (q == null) return null; int mask = q.length - 1; - int i = locallyFifo? base : (sp - 1); + int i = locallyFifo ? base : (sp - 1); return q[i & mask]; } @@ -534,240 +663,551 @@ public class ForkJoinWorkerThread extend ForkJoinTask t = oldQ[oldIndex]; if (t != null && !casSlotNull(oldQ, oldIndex, t)) t = null; - setSlot(newQ, b & newMask, t); + writeSlot(newQ, b & newMask, t); } while (++b != bf); pool.signalWork(); } /** + * Computes next value for random victim probe in scan(). Scans + * don't require a very high quality generator, but also not a + * crummy one. Marsaglia xor-shift is cheap and works well enough. + * Note: This is manually inlined in scan(). + */ + private static final int xorShift(int r) { + r ^= r << 13; + r ^= r >>> 17; + return r ^ (r << 5); + } + + /** * Tries to steal a task from another worker. Starts at a random * index of workers array, and probes workers until finding one * with non-empty queue or finding that all are empty. It * randomly selects the first n probes. If these are empty, it - * resorts to a full circular traversal, which is necessary to - * accurately set active status by caller. Also restarts if pool - * events occurred since last scan, which forces refresh of - * workers array, in case barrier was associated with resize. + * resorts to a circular sweep, which is necessary to accurately + * set active status. (The circular sweep uses steps of + * approximately half the array size plus 1, to avoid bias + * stemming from leftmost packing of the array in ForkJoinPool.) * * This method must be both fast and quiet -- usually avoiding * memory accesses that could disrupt cache sharing etc other than - * those needed to check for and take tasks. This accounts for, - * among other things, updating random seed in place without - * storing it until exit. + * those needed to check for and take tasks (or to activate if not + * already active). This accounts for, among other things, + * updating random seed in place without storing it until exit. * * @return a task, or null if none found */ private ForkJoinTask scan() { - ForkJoinTask t = null; - int r = seed; // extract once to keep scan quiet - ForkJoinWorkerThread[] ws; // refreshed on outer loop - int mask; // must be power 2 minus 1 and > 0 - outer:do { - if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) { - int idx = r; - int probes = ~mask; // use random index while negative - for (;;) { - r = xorShift(r); // update random seed - ForkJoinWorkerThread v = ws[mask & idx]; - if (v == null || v.sp == v.base) { - if (probes <= mask) - idx = (probes++ < 0)? 
r : (idx + 1); - else - break; + ForkJoinPool p = pool; + ForkJoinWorkerThread[] ws; // worker array + int n; // upper bound of #workers + if ((ws = p.workers) != null && (n = ws.length) > 1) { + boolean canSteal = active; // shadow active status + int r = seed; // extract seed once + int mask = n - 1; + int j = -n; // loop counter + int k = r; // worker index, random if j < 0 + for (;;) { + ForkJoinWorkerThread v = ws[k & mask]; + r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift + ForkJoinTask[] q; ForkJoinTask t; int b, a; + if (v != null && (b = v.base) != v.sp && + (q = v.queue) != null) { + int i = (q.length - 1) & b; + long u = (i << qShift) + qBase; // raw offset + int pid = poolIndex; + if ((t = q[i]) != null) { + if (!canSteal && // inline p.tryIncrementActiveCount + UNSAFE.compareAndSwapInt(p, poolRunStateOffset, + a = p.runState, a + 1)) + canSteal = active = true; + if (canSteal && v.base == b++ && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + v.base = b; + v.stealHint = pid; + UNSAFE.putOrderedObject(this, + currentStealOffset, t); + seed = r; + ++stealCount; + return t; + } } - else if (!tryActivate() || (t = v.deqTask()) == null) - continue outer; // restart on contention - else - break outer; + j = -n; + k = r; // restart on contention } + else if (++j <= 0) + k = r; + else if (j <= n) + k += (n >>> 1) | 1; + else + break; } - } while (pool.hasNewSyncEvent(this)); // retry on pool events - seed = r; - return t; + } + return null; + } + + // Run State management + + // status check methods used mainly by ForkJoinPool + final boolean isRunning() { return runState == 0; } + final boolean isTerminated() { return (runState & TERMINATED) != 0; } + final boolean isSuspended() { return (runState & SUSPENDED) != 0; } + final boolean isTrimmed() { return (runState & TRIMMED) != 0; } + + final boolean isTerminating() { + if ((runState & TERMINATING) != 0) + return true; + if (pool.isAtLeastTerminating()) { // propagate pool state + shutdown(); + return true; + } + return false; } /** - * gets and removes a local or stolen a task - * @return a task, if available + * Sets state to TERMINATING. Does NOT unpark or interrupt + * to wake up if currently blocked. Callers must do so if desired. */ - final ForkJoinTask pollTask() { - ForkJoinTask t = locallyFifo? deqTask() : popTask(); - if (t == null && (t = scan()) != null) - ++stealCount; - return t; + final void shutdown() { + for (;;) { + int s = runState; + if ((s & (TERMINATING|TERMINATED)) != 0) + break; + if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + (s & ~SUSPENDED) | + (TRIMMED|TERMINATING))) + break; + } + else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s | TERMINATING)) + break; + } } /** - * gets a local task - * @return a task, if available + * Sets state to TERMINATED. Called only by onTermination(). */ - final ForkJoinTask pollLocalTask() { - return locallyFifo? deqTask() : popTask(); + private void setTerminated() { + int s; + do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset, + s = runState, + s | (TERMINATING|TERMINATED))); + } + + /** + * If suspended, tries to set status to unsuspended. + * Does NOT wake up if blocked. 
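Each runState transition above follows the same read-modify-CAS retry loop; with standard APIs (AtomicIntegerFieldUpdater standing in for the raw Unsafe field CAS) the shutdown() transition looks like this sketch:

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// The read-modify-CAS retry loop behind shutdown()/setTerminated()/
// tryUnsuspend(), using a field updater instead of raw Unsafe.
class RunStateSketch {
    static final int TERMINATING = 0x01, TERMINATED = 0x02,
                     SUSPENDED   = 0x04, TRIMMED    = 0x08;

    volatile int runState;
    static final AtomicIntegerFieldUpdater<RunStateSketch> STATE =
        AtomicIntegerFieldUpdater.newUpdater(RunStateSketch.class, "runState");

    void shutdown() {
        for (;;) {
            int s = runState;
            if ((s & (TERMINATING | TERMINATED)) != 0)
                return;                            // already shutting down
            int next = ((s & SUSPENDED) != 0)      // kill a suspended spare
                ? (s & ~SUSPENDED) | TRIMMED | TERMINATING
                : s | TERMINATING;
            if (STATE.compareAndSet(this, s, next))
                return;                            // CAS lost: reread and retry
        }
    }
}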
+ * + * @return true if successful + */ + final boolean tryUnsuspend() { + int s; + while (((s = runState) & SUSPENDED) != 0) { + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s & ~SUSPENDED)) + return true; + } + return false; } /** - * Returns a pool submission, if one exists, activating first. - * @return a submission, if available + * Sets suspended status and blocks as spare until resumed + * or shutdown. */ - private ForkJoinTask pollSubmission() { + final void suspendAsSpare() { + for (;;) { // set suspended unless terminating + int s = runState; + if ((s & TERMINATING) != 0) { // must kill + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s | (TRIMMED | TERMINATING))) + return; + } + else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s | SUSPENDED)) + break; + } ForkJoinPool p = pool; - while (p.hasQueuedSubmissions()) { - ForkJoinTask t; - if (tryActivate() && (t = p.pollSubmission()) != null) - return t; + p.pushSpare(this); + while ((runState & SUSPENDED) != 0) { + if (p.tryAccumulateStealCount(this)) { + interrupted(); // clear/ignore interrupts + if ((runState & SUSPENDED) == 0) + break; + LockSupport.park(this); + } } - return null; } - // Methods accessed only by Pool + // Misc support methods for ForkJoinPool + + /** + * Returns an estimate of the number of tasks in the queue. Also + * used by ForkJoinTask. + */ + final int getQueueSize() { + int n; // external calls must read base first + return (n = -base + sp) <= 0 ? 0 : n; + } /** * Removes and cancels all tasks in queue. Can be called from any * thread. */ final void cancelTasks() { - ForkJoinTask t; - while (base != sp && (t = deqTask()) != null) - t.cancelIgnoringExceptions(); + ForkJoinTask cj = currentJoin; // try to cancel ongoing tasks + if (cj != null) { + currentJoin = null; + cj.cancelIgnoringExceptions(); + try { + this.interrupt(); // awaken wait + } catch (SecurityException ignore) { + } + } + ForkJoinTask cs = currentSteal; + if (cs != null) { + currentSteal = null; + cs.cancelIgnoringExceptions(); + } + while (base != sp) { + ForkJoinTask t = deqTask(); + if (t != null) + t.cancelIgnoringExceptions(); + } } /** - * Drains tasks to given collection c + * Drains tasks to given collection c. + * * @return the number of tasks drained */ - final int drainTasksTo(Collection> c) { + final int drainTasksTo(Collection> c) { int n = 0; - ForkJoinTask t; - while (base != sp && (t = deqTask()) != null) { - c.add(t); - ++n; + while (base != sp) { + ForkJoinTask t = deqTask(); + if (t != null) { + c.add(t); + ++n; + } } return n; } + // Support methods for ForkJoinTask + /** - * Get and clear steal count for accumulation by pool. Called - * only when known to be idle (in pool.sync and termination). + * Gets and removes a local task. + * + * @return a task, if available */ - final int getAndClearStealCount() { - int sc = stealCount; - stealCount = 0; - return sc; + final ForkJoinTask pollLocalTask() { + ForkJoinPool p = pool; + while (sp != base) { + int a; // inline p.tryIncrementActiveCount + if (active || + (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset, + a = p.runState, a + 1))) + return locallyFifo ? locallyDeqTask() : popTask(); + } + return null; } /** - * Returns true if at least one worker in the given array appears - * to have at least one queued task. 
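suspendAsSpare's blocking discipline above is the standard park idiom: publish the SUSPENDED bit, then park in a loop that re-checks the bit, tolerating spurious wakeups and interrupts. Distilled into a hypothetical sketch:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Park/recheck loop: park can return spuriously or on interrupt, so
// the SUSPENDED bit is the single source of truth, re-read every time.
class SpareParkSketch {
    static final int SUSPENDED = 0x04;
    final AtomicInteger runState = new AtomicInteger();

    void suspendUntilResumed() {
        for (;;) {                               // set SUSPENDED
            int s = runState.get();
            if (runState.compareAndSet(s, s | SUSPENDED))
                break;
        }
        while ((runState.get() & SUSPENDED) != 0) {
            Thread.interrupted();                // clear and ignore interrupts
            if ((runState.get() & SUSPENDED) == 0)
                break;
            LockSupport.park(this);
        }
    }

    void resume(Thread spare) {                  // clear the bit, then unpark
        for (;;) {
            int s = runState.get();
            if ((s & SUSPENDED) == 0 ||
                runState.compareAndSet(s, s & ~SUSPENDED)) {
                LockSupport.unpark(spare);
                return;
            }
        }
    }
}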
- * @param ws array of workers - */ - static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) { - if (ws != null) { - int len = ws.length; - for (int j = 0; j < 2; ++j) { // need two passes for clean sweep - for (int i = 0; i < len; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null && w.sp != w.base) - return true; - } - } + * Gets and removes a local or stolen task. + * + * @return a task, if available + */ + final ForkJoinTask pollTask() { + ForkJoinTask t = pollLocalTask(); + if (t == null) { + t = scan(); + // cannot retain/track/help steal + UNSAFE.putOrderedObject(this, currentStealOffset, null); } - return false; + return t; } - // Support methods for ForkJoinTask - /** - * Returns an estimate of the number of tasks in the queue. + * Possibly runs some tasks and/or blocks, until task is done. + * + * @param joinMe the task to join */ - final int getQueueSize() { - int n = sp - base; - return n < 0? 0 : n; // suppress momentarily negative values + final void joinTask(ForkJoinTask joinMe) { + // currentJoin only written by this thread; only need ordered store + ForkJoinTask prevJoin = currentJoin; + UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe); + if (sp != base) + localHelpJoinTask(joinMe); + if (joinMe.status >= 0) + pool.awaitJoin(joinMe, this); + UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin); } /** - * Returns an estimate of the number of tasks, offset by a - * function of number of idle workers. + * Run tasks in local queue until given task is done. + * + * @param joinMe the task to join */ - final int getEstimatedSurplusTaskCount() { - // The halving approximates weighting idle vs non-idle workers - return (sp - base) - (pool.getIdleThreadCount() >>> 1); + private void localHelpJoinTask(ForkJoinTask joinMe) { + int s; + ForkJoinTask[] q; + while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) { + int i = (q.length - 1) & --s; + long u = (i << qShift) + qBase; // raw offset + ForkJoinTask t = q[i]; + if (t == null) // lost to a stealer + break; + if (UNSAFE.compareAndSwapObject(q, u, t, null)) { + /* + * This recheck (and similarly in helpJoinTask) + * handles cases where joinMe is independently + * cancelled or forced even though there is other work + * available. Back out of the pop by putting t back + * into slot before we commit by writing sp. + */ + if (joinMe.status < 0) { + UNSAFE.putObjectVolatile(q, u, t); + break; + } + sp = s; + // UNSAFE.putOrderedInt(this, spOffset, s); + t.quietlyExec(); + } + } } /** - * Scan, returning early if joinMe done + * Unless terminating, tries to locate and help perform tasks for + * a stealer of the given task, or in turn one of its stealers. + * Traces currentSteal->currentJoin links looking for a thread + * working on a descendant of the given task and with a non-empty + * queue to steal back and execute tasks from. + * + * The implementation is very branchy to cope with potential + * inconsistencies or loops encountering chains that are stale, + * unknown, or of length greater than MAX_HELP_DEPTH links. All + * of these cases are dealt with by just returning back to the + * caller, who is expected to retry if other join mechanisms also + * don't work out. 
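The backout in localHelpJoinTask above is the subtle step: the slot is taken by CAS first, the join's status is rechecked, and on failure the task is put back with a volatile write before sp is ever touched, so thieves never see a corrupted queue. A distilled sketch (hypothetical names):

import java.util.concurrent.atomic.AtomicReferenceArray;

// Pop-with-backout: take the slot by CAS, recheck the join, and if it
// completed meanwhile restore the slot instead of committing via sp.
final class BackoutPopSketch<T> {
    interface Join { boolean stillPending(); }   // stand-in for status >= 0

    final AtomicReferenceArray<T> q = new AtomicReferenceArray<T>(1 << 13);
    volatile int base;
    int sp;

    T popUnlessDone(Join joinMe) {
        int s = sp;
        if (s != base) {
            int i = (s - 1) & (q.length() - 1);
            T t = q.get(i);
            if (t != null && q.compareAndSet(i, t, null)) {
                if (!joinMe.stillPending()) {
                    q.set(i, t);                 // back out: restore the slot
                    return null;
                }
                sp = s - 1;                      // commit only after the recheck
                return t;
            }
        }
        return null;
    }
}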
+ * + * @param joinMe the task to join */ - final ForkJoinTask scanWhileJoining(ForkJoinTask joinMe) { - ForkJoinTask t = pollTask(); - if (t != null && joinMe.status < 0 && sp == base) { - pushTask(t); // unsteal if done and this task would be stealable - t = null; + final void helpJoinTask(ForkJoinTask joinMe) { + ForkJoinWorkerThread[] ws; + int n; + if (joinMe.status < 0) // already done + return; + if ((runState & TERMINATING) != 0) { // cancel if shutting down + joinMe.cancelIgnoringExceptions(); + return; + } + if ((ws = pool.workers) == null || (n = ws.length) <= 1) + return; // need at least 2 workers + + ForkJoinTask task = joinMe; // base of chain + ForkJoinWorkerThread thread = this; // thread with stolen task + for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length + // Try to find v, the stealer of task, by first using hint + ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)]; + if (v == null || v.currentSteal != task) { + for (int j = 0; ; ++j) { // search array + if (j < n) { + ForkJoinTask vs; + if ((v = ws[j]) != null && + (vs = v.currentSteal) != null) { + if (joinMe.status < 0 || task.status < 0) + return; // stale or done + if (vs == task) { + thread.stealHint = j; + break; // save hint for next time + } + } + } + else + return; // no stealer + } + } + for (;;) { // Try to help v, using specialized form of deqTask + if (joinMe.status < 0) + return; + int b = v.base; + ForkJoinTask[] q = v.queue; + if (b == v.sp || q == null) + break; + int i = (q.length - 1) & b; + long u = (i << qShift) + qBase; + ForkJoinTask t = q[i]; + int pid = poolIndex; + ForkJoinTask ps = currentSteal; + if (task.status < 0) + return; // stale or done + if (t != null && v.base == b++ && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + if (joinMe.status < 0) { + UNSAFE.putObjectVolatile(q, u, t); + return; // back out on cancel + } + v.base = b; + v.stealHint = pid; + UNSAFE.putOrderedObject(this, currentStealOffset, t); + t.quietlyExec(); + UNSAFE.putOrderedObject(this, currentStealOffset, ps); + } + } + // Try to descend to find v's stealer + ForkJoinTask next = v.currentJoin; + if (task.status < 0 || next == null || next == task || + joinMe.status < 0) + return; + task = next; + thread = v; } - return t; } /** - * Runs tasks until pool isQuiescent + * Implements ForkJoinTask.getSurplusQueuedTaskCount(). + * Returns an estimate of the number of tasks, offset by a + * function of number of idle workers. + * + * This method provides a cheap heuristic guide for task + * partitioning when programmers, frameworks, tools, or languages + * have little or no idea about task granularity. In essence by + * offering this method, we ask users only about tradeoffs in + * overhead vs expected throughput and its variance, rather than + * how finely to partition tasks. + * + * In a steady state strict (tree-structured) computation, each + * thread makes available for stealing enough tasks for other + * threads to remain active. Inductively, if all threads play by + * the same rules, each thread should make available only a + * constant number of tasks. + * + * The minimum useful constant is just 1. But using a value of 1 + * would require immediate replenishment upon each steal to + * maintain enough tasks, which is infeasible. Further, + * partitionings/granularities of offered tasks should minimize + * steal rates, which in general means that threads nearer the top + * of computation tree should generate more than those nearer the + * bottom. 
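In user code this heuristic is consumed through ForkJoinTask.getSurplusQueuedTaskCount(), typically compared against a small threshold such as the 3 recommended in this discussion. A hypothetical divide-and-conquer sketch:

import jsr166y.ForkJoinTask;
import jsr166y.RecursiveAction;

// Hypothetical summation task: fork only while the local surplus is
// small; otherwise compute directly, as the heuristic above suggests.
class SumSketch extends RecursiveAction {
    final long[] a; final int lo, hi;
    long result;

    SumSketch(long[] a, int lo, int hi) { this.a = a; this.lo = lo; this.hi = hi; }

    protected void compute() {
        if (hi - lo < 1024 || ForkJoinTask.getSurplusQueuedTaskCount() >= 3) {
            long s = 0;
            for (int i = lo; i < hi; ++i) s += a[i];
            result = s;
        } else {
            int mid = (lo + hi) >>> 1;
            SumSketch left  = new SumSketch(a, lo, mid);
            SumSketch right = new SumSketch(a, mid, hi);
            left.fork();                  // make work available to thieves
            right.compute();              // do the other half ourselves
            left.join();
            result = left.result + right.result;
        }
    }
}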
In perfect steady state, each thread is at + * approximately the same level of computation tree. However, + * producing extra tasks amortizes the uncertainty of progress and + * diffusion assumptions. + * + * So, users will want to use values larger, but not much larger + * than 1 to both smooth over transient shortages and hedge + * against uneven progress; as traded off against the cost of + * extra task overhead. We leave the user to pick a threshold + * value to compare with the results of this call to guide + * decisions, but recommend values such as 3. + * + * When all threads are active, it is on average OK to estimate + * surplus strictly locally. In steady-state, if one thread is + * maintaining say 2 surplus tasks, then so are others. So we can + * just use estimated queue length (although note that (sp - base) + * can be an overestimate because of stealers lagging increments + * of base). However, this strategy alone leads to serious + * mis-estimates in some non-steady-state conditions (ramp-up, + * ramp-down, other stalls). We can detect many of these by + * further considering the number of "idle" threads, that are + * known to have zero queued tasks, so compensate by a factor of + * (#idle/#active) threads. + */ + final int getEstimatedSurplusTaskCount() { + return sp - base - pool.idlePerActive(); + } + + /** + * Runs tasks until {@code pool.isQuiescent()}. */ final void helpQuiescePool() { + ForkJoinTask ps = currentSteal; // to restore below for (;;) { - ForkJoinTask t = pollTask(); - if (t != null) + ForkJoinTask t = pollLocalTask(); + if (t != null || (t = scan()) != null) t.quietlyExec(); - else if (tryInactivate() && pool.isQuiescent()) - break; + else { + ForkJoinPool p = pool; + int a; // to inline CASes + if (active) { + if (!UNSAFE.compareAndSwapInt + (p, poolRunStateOffset, a = p.runState, a - 1)) + continue; // retry later + active = false; // inactivate + UNSAFE.putOrderedObject(this, currentStealOffset, ps); + } + if (p.isQuiescent()) { + active = true; // re-activate + do {} while (!UNSAFE.compareAndSwapInt + (p, poolRunStateOffset, a = p.runState, a+1)); + return; + } + } + } + } + + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long spOffset = + objectFieldOffset("sp", ForkJoinWorkerThread.class); + private static final long runStateOffset = + objectFieldOffset("runState", ForkJoinWorkerThread.class); + private static final long currentJoinOffset = + objectFieldOffset("currentJoin", ForkJoinWorkerThread.class); + private static final long currentStealOffset = + objectFieldOffset("currentSteal", ForkJoinWorkerThread.class); + private static final long qBase = + UNSAFE.arrayBaseOffset(ForkJoinTask[].class); + private static final long poolRunStateOffset = // to inline CAS + objectFieldOffset("runState", ForkJoinPool.class); + + private static final int qShift; + + static { + int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class); + if ((s & (s-1)) != 0) + throw new Error("data type scale not a power of two"); + qShift = 31 - Integer.numberOfLeadingZeros(s); + MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift); + } + + private static long objectFieldOffset(String field, Class klazz) { + try { + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; } - do;while (!tryActivate()); // re-activate on exit } - // Temporary Unsafe 
mechanics for preliminary release - private static Unsafe getUnsafe() throws Throwable { + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { try { - return Unsafe.getUnsafe(); + return sun.misc.Unsafe.getUnsafe(); } catch (SecurityException se) { try { return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - return getUnsafePrivileged(); + (new java.security + .PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + java.lang.reflect.Field f = sun.misc + .Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); }}); } catch (java.security.PrivilegedActionException e) { - throw e.getCause(); + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } } - - private static Unsafe getUnsafePrivileged() - throws NoSuchFieldException, IllegalAccessException { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } - - private static long fieldOffset(String fieldName) - throws NoSuchFieldException { - return _unsafe.objectFieldOffset - (ForkJoinWorkerThread.class.getDeclaredField(fieldName)); - } - - static final Unsafe _unsafe; - static final long baseOffset; - static final long spOffset; - static final long runStateOffset; - static final long qBase; - static final int qShift; - static { - try { - _unsafe = getUnsafe(); - baseOffset = fieldOffset("base"); - spOffset = fieldOffset("sp"); - runStateOffset = fieldOffset("runState"); - qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class); - int s = _unsafe.arrayIndexScale(ForkJoinTask[].class); - if ((s & (s-1)) != 0) - throw new Error("data type scale not a power of two"); - qShift = 31 - Integer.numberOfLeadingZeros(s); - } catch (Throwable e) { - throw new RuntimeException("Could not initialize intrinsics", e); - } - } }
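Finally, wiring a pool to a custom worker class ties the earlier sketches together. This assumes the four-argument ForkJoinPool constructor of the same jsr166y vintage and reuses the hypothetical LoggingWorkerFactory and SumSketch from above:

import jsr166y.ForkJoinPool;

// Hypothetical wiring: parallelism, custom factory, uncaught-exception
// handler, and async (FIFO) mode flag.
public class PoolWiringSketch {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(
            Runtime.getRuntime().availableProcessors(),
            new LoggingWorkerFactory(),
            null,      // default uncaught exception handling
            false);    // LIFO local queues, since these tasks join
        pool.invoke(new SumSketch(new long[1 << 20], 0, 1 << 20));
        pool.shutdown();
    }
}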