--- jsr166/src/jsr166y/ForkJoinWorkerThread.java 2009/01/07 19:12:36 1.3 +++ jsr166/src/jsr166y/ForkJoinWorkerThread.java 2010/09/20 20:42:37 1.51 @@ -5,328 +5,558 @@ */ package jsr166y; -import java.util.*; + import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; -import sun.misc.Unsafe; -import java.lang.reflect.*; + +import java.util.Random; +import java.util.Collection; +import java.util.concurrent.locks.LockSupport; /** * A thread managed by a {@link ForkJoinPool}. This class is * subclassable solely for the sake of adding functionality -- there - * are no overridable methods dealing with scheduling or - * execution. However, you can override initialization and termination - * cleanup methods surrounding the main task processing loop. If you - * do create such a subclass, you will also need to supply a custom - * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. - * - *
This class also provides methods for generating per-thread
- * random numbers, with the same properties as {@link
- * java.util.Random} but with each generator isolated from those of
- * other threads.
+ * are no overridable methods dealing with scheduling or execution.
+ * However, you can override initialization and termination methods
+ * surrounding the main task processing loop. If you do create such a
+ * subclass, you will also need to supply a custom {@link
+ * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
+ * ForkJoinPool}.
+ *
+ * @since 1.7
+ * @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
/*
- * Algorithm overview:
+ * Overview:
+ *
+ * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
+ * ForkJoinTasks. This class includes bookkeeping in support of
+ * worker activation, suspension, and lifecycle control described
+ * in more detail in the internal documentation of class
+ * ForkJoinPool. And as described further below, this class also
+ * includes special-cased support for some ForkJoinTask
+ * methods. But the main mechanics involve work-stealing:
*
- * 1. Work-Stealing: Work-stealing queues are special forms of
- * Deques that support only three of the four possible
- * end-operations -- push, pop, and deq (aka steal), and only do
- * so under the constraints that push and pop are called only from
- * the owning thread, while deq may be called from other threads.
- * (If you are unfamiliar with them, you probably want to read
- * Herlihy and Shavit's book "The Art of Multiprocessor
- * programming", chapter 16 describing these in more detail before
- * proceeding.) The main work-stealing queue design is roughly
- * similar to "Dynamic Circular Work-Stealing Deque" by David
- * Chase and Yossi Lev, SPAA 2005
- * (http://research.sun.com/scalable/pubs/index.html). The main
- * difference ultimately stems from gc requirements that we null
- * out taken slots as soon as we can, to maintain as small a
- * footprint as possible even in programs generating huge numbers
- * of tasks. To accomplish this, we shift the CAS arbitrating pop
- * vs deq (steal) from being on the indices ("base" and "sp") to
- * the slots themselves (mainly via method "casSlotNull()"). So,
- * both a successful pop and deq mainly entail CAS'ing a nonnull
- * slot to null. Because we rely on CASes of references, we do
- * not need tag bits on base or sp. They are simple ints as used
- * in any circular array-based queue (see for example ArrayDeque).
- * Updates to the indices must still be ordered in a way that
- * guarantees that (sp - base) > 0 means the queue is empty, but
- * otherwise may err on the side of possibly making the queue
- * appear nonempty when a push, pop, or deq have not fully
- * committed. Note that this means that the deq operation,
- * considered individually, is not wait-free. One thief cannot
- * successfully continue until another in-progress one (or, if
- * previously empty, a push) completes. However, in the
- * aggregate, we ensure at least probablistic non-blockingness. If
- * an attempted steal fails, a thief always chooses a different
+ * Work-stealing queues are special forms of Deques that support
+ * only three of the four possible end-operations -- push, pop,
+ * and deq (aka steal), under the further constraints that push
+ * and pop are called only from the owning thread, while deq may
+ * be called from other threads. (If you are unfamiliar with
+ * them, you probably want to read Herlihy and Shavit's book "The
+ * Art of Multiprocessor programming", chapter 16 describing these
+ * in more detail before proceeding.) The main work-stealing
+ * queue design is roughly similar to those in the papers "Dynamic
+ * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
+ * (http://research.sun.com/scalable/pubs/index.html) and
+ * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
+ * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
+ * The main differences ultimately stem from gc requirements that
+ * we null out taken slots as soon as we can, to maintain as small
+ * a footprint as possible even in programs generating huge
+ * numbers of tasks. To accomplish this, we shift the CAS
+ * arbitrating pop vs deq (steal) from being on the indices
+ * ("base" and "sp") to the slots themselves (mainly via method
+ * "casSlotNull()"). So, both a successful pop and deq mainly
+ * entail a CAS of a slot from non-null to null. Because we rely
+ * on CASes of references, we do not need tag bits on base or sp.
+ * They are simple ints as used in any circular array-based queue
+ * (see for example ArrayDeque). Updates to the indices must
+ * still be ordered in a way that guarantees that sp == base means
+ * the queue is empty, but otherwise may err on the side of
+ * possibly making the queue appear nonempty when a push, pop, or
+ * deq have not fully committed. Note that this means that the deq
+ * operation, considered individually, is not wait-free. One thief
+ * cannot successfully continue until another in-progress one (or,
+ * if previously empty, a push) completes. However, in the
+ * aggregate, we ensure at least probabilistic non-blockingness.
+ * If an attempted steal fails, a thief always chooses a different
* random victim target to try next. So, in order for one thief to
* progress, it suffices for any in-progress deq or new push on
* any empty queue to complete. One reason this works well here is
* that apparently-nonempty often means soon-to-be-stealable,
- * which gives threads a chance to activate if necessary before
- * stealing (see below).
+ * which gives threads a chance to set activation status if
+ * necessary before stealing.
*
- * Efficient implementation of this approach currently relies on
- * an uncomfortable amount of "Unsafe" mechanics. To maintain
+ * This approach also enables support for "async mode" where local
+ * task processing is in FIFO, not LIFO order; simply by using a
+ * version of deq rather than pop when locallyFifo is true (as set
+ * by the ForkJoinPool). This allows use in message-passing
+ * frameworks in which tasks are never joined.
+ *
+ * When a worker would otherwise be blocked waiting to join a
+ * task, it first tries a form of linear helping: Each worker
+ * records (in field currentSteal) the most recent task it stole
+ * from some other worker. Plus, it records (in field currentJoin)
+ * the task it is currently actively joining. Method joinTask uses
+ * these markers to try to find a worker to help (i.e., steal back
+ * a task from and execute it) that could hasten completion of the
+ * actively joined task. In essence, the joiner executes a task
+ * that would be on its own local deque had the to-be-joined task
+ * not been stolen. This may be seen as a conservative variant of
+ * the approach in Wagner & Calder "Leapfrogging: a portable
+ * technique for implementing efficient futures" SIGPLAN Notices,
+ * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
+ * in that: (1) We only maintain dependency links across workers
+ * upon steals, rather than use per-task bookkeeping. This may
+ * require a linear scan of workers array to locate stealers, but
+ * usually doesn't because stealers leave hints (that may become
+ * stale/wrong) of where to locate them. This isolates cost to
+ * when it is needed, rather than adding to per-task overhead.
+ * (2) It is "shallow", ignoring nesting and potentially cyclic
+ * mutual steals. (3) It is intentionally racy: field currentJoin
+ * is updated only while actively joining, which means that we
+ * miss links in the chain during long-lived tasks, GC stalls etc
+ * (which is OK since blocking in such cases is usually a good
+ * idea). (4) We bound the number of attempts to find work (see
+ * MAX_HELP_DEPTH) and fall back to suspending the worker and if
+ * necessary replacing it with a spare (see
+ * ForkJoinPool.awaitJoin).
+ *
+ * Efficient implementation of these algorithms currently relies
+ * on an uncomfortable amount of "Unsafe" mechanics. To maintain
* correct orderings, reads and writes of variable base require
- * volatile ordering. Variable sp does not require volatile write
- * but needs cheaper store-ordering on writes. Because they are
- * protected by volatile base reads, reads of the queue array and
- * its slots do not need volatile load semantics, but writes (in
- * push) require store order and CASes (in pop and deq) require
- * (volatile) CAS semantics. Since these combinations aren't
- * supported using ordinary volatiles, the only way to accomplish
- * these effciently is to use direct Unsafe calls. (Using external
- * AtomicIntegers and AtomicReferenceArrays for the indices and
- * array is significantly slower because of memory locality and
- * indirection effects.) Further, performance on most platforms is
- * very sensitive to placement and sizing of the (resizable) queue
- * array. Even though these queues don't usually become all that
- * big, the initial size must be large enough to counteract cache
+ * volatile ordering. Variable sp does not require volatile
+ * writes but still needs store-ordering, which we accomplish by
+ * pre-incrementing sp before filling the slot with an ordered
+ * store. (Pre-incrementing also enables backouts used in
+ * joinTask.) Because they are protected by volatile base reads,
+ * reads of the queue array and its slots by other threads do not
+ * need volatile load semantics, but writes (in push) require
+ * store order and CASes (in pop and deq) require (volatile) CAS
+ * semantics. (Michael, Saraswat, and Vechev's algorithm has
+ * similar properties, but without support for nulling slots.)
+ * Since these combinations aren't supported using ordinary
+ * volatiles, the only way to accomplish these efficiently is to
+ * use direct Unsafe calls. (Using external AtomicIntegers and
+ * AtomicReferenceArrays for the indices and array is
+ * significantly slower because of memory locality and indirection
+ * effects.)
+ *
+ * Further, performance on most platforms is very sensitive to
+ * placement and sizing of the (resizable) queue array. Even
+ * though these queues don't usually become all that big, the
+ * initial size must be large enough to counteract cache
* contention effects across multiple queues (especially in the
* presence of GC cardmarking). Also, to improve thread-locality,
- * queues are currently initialized immediately after the thread
- * gets the initial signal to start processing tasks. However,
- * all queue-related methods except pushTask are written in a way
- * that allows them to instead be lazily allocated and/or disposed
- * of when empty. All together, these low-level implementation
- * choices produce as much as a factor of 4 performance
- * improvement compared to naive implementations, and enable the
- * processing of billions of tasks per second, sometimes at the
- * expense of ugliness.
- *
- * 2. Run control: The primary run control is based on a global
- * counter (activeCount) held by the pool. It uses an algorithm
- * similar to that in Herlihy and Shavit section 17.6 to cause
- * threads to eventually block when all threads declare they are
- * inactive. (See variable "scans".) For this to work, threads
- * must be declared active when executing tasks, and before
- * stealing a task. They must be inactive before blocking on the
- * Pool Barrier (awaiting a new submission or other Pool
- * event). In between, there is some free play which we take
- * advantage of to avoid contention and rapid flickering of the
- * global activeCount: If inactive, we activate only if a victim
- * queue appears to be nonempty (see above). Similarly, a thread
- * tries to inactivate only after a full scan of other threads.
- * The net effect is that contention on activeCount is rarely a
- * measurable performance issue. (There are also a few other cases
- * where we scan for work rather than retry/block upon
- * contention.)
- *
- * 3. Selection control. We maintain policy of always choosing to
- * run local tasks rather than stealing, and always trying to
- * steal tasks before trying to run a new submission. All steals
- * are currently performed in randomly-chosen deq-order. It may be
- * worthwhile to bias these with locality / anti-locality
- * information, but doing this well probably requires more
- * lower-level information from JVMs than currently provided.
+ * queues are initialized after starting. All together, these
+ * low-level implementation choices produce as much as a factor of
+ * 4 performance improvement compared to naive implementations,
+ * and enable the processing of billions of tasks per second,
+ * sometimes at the expense of ugliness.
*/
/**
+ * Generator for initial random seeds for random victim
+ * selection. This is used only to create initial seeds. Random
+ * steals use a cheaper xorshift generator per steal attempt. We
+ * expect only rare contention on seedGenerator, so just use a
+ * plain Random.
+ */
+ private static final Random seedGenerator = new Random();
+
+ /**
+ * The maximum stolen->joining link depth allowed in helpJoinTask.
+ * Depths for legitimate chains are unbounded, but we use a fixed
+ * constant to avoid (otherwise unchecked) cycles and bound
+ * staleness of traversal parameters at the expense of sometimes
+ * blocking when we could be helping.
+ */
+ private static final int MAX_HELP_DEPTH = 8;
+
+ /**
* Capacity of work-stealing queue array upon initialization.
- * Must be a power of two. Initial size must be at least 2, but is
+ * Must be a power of two. Initial size must be at least 4, but is
* padded to minimize cache effects.
*/
private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
/**
* Maximum work-stealing queue array size. Must be less than or
- * equal to 1 << 30 to ensure lack of index wraparound.
+ * equal to 1 << (31 - width of array entry) to ensure lack of
+ * index wraparound. The value is set in the static block
+ * at the end of this file after obtaining width.
*/
- private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 30;
+ private static final int MAXIMUM_QUEUE_CAPACITY;
/**
- * Generator of seeds for per-thread random numbers.
+ * The pool this thread works in. Accessed directly by ForkJoinTask.
*/
- private static final Random randomSeedGenerator = new Random();
+ final ForkJoinPool pool;
/**
* The work-stealing queue array. Size must be a power of two.
+ * Initialized in onStart, to improve memory locality.
*/
private ForkJoinTask<?>[] queue;
/**
+ * Index (mod queue.length) of least valid queue slot, which is
+ * always the next position to steal from if nonempty.
+ */
+ private volatile int base;
+
+ /**
* Index (mod queue.length) of next queue slot to push to or pop
- * from. It is written only by owner thread, via ordered store.
- * Both sp and base are allowed to wrap around on overflow, but
- * (sp - base) still estimates size.
+ * from. It is written only by owner thread, and accessed by other
+ * threads only after reading (volatile) base. Both sp and base
+ * are allowed to wrap around on overflow, but (sp - base) still
+ * estimates size.
*/
- private volatile int sp;
+ private int sp;
/**
- * Index (mod queue.length) of least valid queue slot, which is
- * always the next position to steal from if nonempty.
+ * The index of most recent stealer, used as a hint to avoid
+ * traversal in method helpJoinTask. This is only a hint because a
+ * worker might have had multiple steals and this only holds one
+ * of them (usually the most current). Declared non-volatile,
+ * relying on other prevailing sync to keep reasonably current.
*/
- private volatile int base;
+ private int stealHint;
/**
- * The pool this thread works in.
+ * Run state of this worker. In addition to the usual run levels,
+ * tracks if this worker is suspended as a spare, and if it was
+ * killed (trimmed) while suspended. However, "active" status is
+ * maintained separately and modified only in conjunction with
+ * CASes of the pool's runState (which are currently sadly
+ * manually inlined for performance.) Accessed directly by pool
+ * to simplify checks for normal (zero) status.
*/
- final ForkJoinPool pool;
+ volatile int runState;
+
+ private static final int TERMINATING = 0x01;
+ private static final int TERMINATED = 0x02;
+ private static final int SUSPENDED = 0x04; // inactive spare
+ private static final int TRIMMED = 0x08; // killed while suspended
+
+ /**
+ * Number of steals. Directly accessed (and reset) by
+ * pool.tryAccumulateStealCount when idle.
+ */
+ int stealCount;
+
+ /**
+ * Seed for random number generator for choosing steal victims.
+ * Uses Marsaglia xorshift. Must be initialized as nonzero.
+ */
+ private int seed;
+
+ /**
+ * Activity status. When true, this worker is considered active.
+ * Accessed directly by pool. Must be false upon construction.
+ */
+ boolean active;
+
+ /**
+ * True if use local fifo, not default lifo, for local polling.
+ * Shadows value from ForkJoinPool.
+ */
+ private final boolean locallyFifo;
/**
* Index of this worker in pool array. Set once by pool before
- * running, and accessed directly by pool during cleanup etc
+ * running, and accessed directly by pool to locate this worker in
+ * its workers array.
*/
int poolIndex;
/**
- * Run state of this worker. Supports simple versions of the usual
- * shutdown/shutdownNow control.
+ * The last pool event waited for. Accessed only by pool in
+ * callback methods invoked within this thread.
*/
- private volatile int runState;
-
- // Runstate values. Order matters
- private static final int RUNNING = 0;
- private static final int SHUTDOWN = 1;
- private static final int TERMINATING = 2;
- private static final int TERMINATED = 3;
+ int lastEventCount;
/**
- * Activity status. When true, this worker is considered active.
- * Must be false upon construction. It must be true when executing
- * tasks, and BEFORE stealing a task. It must be false before
- * blocking on the Pool Barrier.
+ * Encoded index and event count of next event waiter. Accessed
+ * only by ForkJoinPool for managing event waiters.
*/
- private boolean active;
+ volatile long nextWaiter;
/**
- * Number of steals, transferred to pool when idle
+ * Number of times this thread suspended as spare. Accessed only
+ * by pool.
*/
- private int stealCount;
+ int spareCount;
/**
- * Seed for random number generator for choosing steal victims
+ * Encoded index and count of next spare waiter. Accessed only
+ * by ForkJoinPool for managing spares.
*/
- private int randomVictimSeed;
+ volatile int nextSpare;
/**
- * Seed for embedded Jurandom
+ * The task currently being joined, set only when actively trying
+ * to help other stealers in helpJoinTask. Written only by this
+ * thread, but read by others.
*/
- private long juRandomSeed;
+ private volatile ForkJoinTask<?> currentJoin;
/**
- * The last barrier event waited for
+ * The task most recently stolen from another worker (or
+ * submission queue). Written only by this thread, but read by
+ * others.
*/
- private long eventCount;
+ private volatile ForkJoinTask<?> currentSteal;
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
+ *
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
- if (pool == null) throw new NullPointerException();
this.pool = pool;
- // remaining initialization deferred to onStart
+ this.locallyFifo = pool.locallyFifo;
+ setDaemon(true);
+ // To avoid exposing construction details to subclasses,
+ // remaining initialization is in start() and onStart()
}
- // public access methods
+ /**
+ * Performs additional initialization and starts this thread.
+ */
+ final void start(int poolIndex, UncaughtExceptionHandler ueh) {
+ this.poolIndex = poolIndex;
+ if (ueh != null)
+ setUncaughtExceptionHandler(ueh);
+ start();
+ }
+
+ // Public/protected methods
/**
- * Returns the pool hosting the current task execution.
+ * Returns the pool hosting this thread.
+ *
* @return the pool
*/
- public static ForkJoinPool getPool() {
- return ((ForkJoinWorkerThread)(Thread.currentThread())).pool;
+ public ForkJoinPool getPool() {
+ return pool;
+ }
+
+ /**
+ * Returns the index number of this thread in its pool. The
+ * returned value ranges from zero to the maximum number of
+ * threads (minus one) that have ever been created in the pool.
+ * This method may be useful for applications that track status or
+ * collect results per-worker rather than per-task.
+ *
+ * @return the index number
+ */
+ public int getPoolIndex() {
+ return poolIndex;
+ }
+
+ /**
+ * Initializes internal state after construction but before
+ * processing any tasks. If you override this method, you must
+ * invoke {@code super.onStart()} at the beginning of the method.
+ * Initialization requires care: Most fields must have legal
+ * default values, to ensure that attempted accesses from other
+ * threads work correctly even before this thread starts
+ * processing tasks.
+ */
+ protected void onStart() {
+ int rs = seedGenerator.nextInt();
+ seed = rs == 0? 1 : rs; // seed must be nonzero
+
+ // Allocate name string and arrays in this thread
+ String pid = Integer.toString(pool.getPoolNumber());
+ String wid = Integer.toString(poolIndex);
+ setName("ForkJoinPool-" + pid + "-worker-" + wid);
+
+ queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
+ }
+
+ /**
+ * Performs cleanup associated with termination of this worker
+ * thread. If you override this method, you must invoke
+ * {@code super.onTermination} at the end of the overridden method.
+ *
+ * @param exception the exception causing this thread to abort due
+ * to an unrecoverable error, or {@code null} if completed normally
+ */
+ protected void onTermination(Throwable exception) {
+ try {
+ ForkJoinPool p = pool;
+ if (active) {
+ int a; // inline p.tryDecrementActiveCount
+ active = false;
+ do {} while (!UNSAFE.compareAndSwapInt
+ (p, poolRunStateOffset, a = p.runState, a - 1));
+ }
+ cancelTasks();
+ setTerminated();
+ p.workerTerminated(this);
+ } catch (Throwable ex) { // Shouldn't ever happen
+ if (exception == null) // but if so, at least rethrown
+ exception = ex;
+ } finally {
+ if (exception != null)
+ UNSAFE.throwException(exception);
+ }
}
/**
- * Returns the index number of the current worker thread in its
- * pool. The returned value ranges from zero to the maximum
- * number of threads (minus one) that have ever been created in
- * the pool. This method may be useful for applications that
- * track status or collect results on a per-worker basis.
- * @return the index number.
+ * This method is required to be public, but should never be
+ * called explicitly. It performs the main run loop to execute
+ * ForkJoinTasks.
*/
- public static int getPoolIndex() {
- return ((ForkJoinWorkerThread)(Thread.currentThread())).poolIndex;
+ public void run() {
+ Throwable exception = null;
+ try {
+ onStart();
+ mainLoop();
+ } catch (Throwable ex) {
+ exception = ex;
+ } finally {
+ onTermination(exception);
+ }
}
- // Access methods used by Pool
+ // helpers for run()
/**
- * Get and clear steal count for accumulation by pool. Called
- * only when known to be idle (in pool.sync and termination).
+ * Finds and executes tasks, and checks status while running.
*/
- final int getAndClearStealCount() {
- int sc = stealCount;
- stealCount = 0;
- return sc;
+ private void mainLoop() {
+ boolean ran = false; // true if ran a task on last step
+ ForkJoinPool p = pool;
+ for (;;) {
+ p.preStep(this, ran);
+ if (runState != 0)
+ break;
+ ran = tryExecSteal() || tryExecSubmission();
+ }
}
/**
- * Returns estimate of the number of tasks in the queue, without
- * correcting for transient negative values
+ * Tries to steal a task and execute it.
+ *
+ * @return true if ran a task
*/
- final int getRawQueueSize() {
- return sp - base;
+ private boolean tryExecSteal() {
+ ForkJoinTask<?> t;
+ if ((t = scan()) != null) {
+ t.quietlyExec();
+ UNSAFE.putOrderedObject(this, currentStealOffset, null);
+ if (sp != base)
+ execLocalTasks();
+ return true;
+ }
+ return false;
}
- // Intrinsics-based support for queue operations.
- // Currently these three (setSp, setSlot, casSlotNull) are
- // usually manually inlined to improve performance
+ /**
+ * If a submission exists, try to activate and run it.
+ *
+ * @return true if ran a task
+ */
+ private boolean tryExecSubmission() {
+ ForkJoinPool p = pool;
+ // This loop is needed in case attempt to activate fails, in
+ // which case we only retry if there still appears to be a
+ // submission.
+ while (p.hasQueuedSubmissions()) {
+ ForkJoinTask<?> t; int a;
+ if (active || // inline p.tryIncrementActiveCount
+ (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
+ a = p.runState, a + 1))) {
+ if ((t = p.pollSubmission()) != null) {
+ UNSAFE.putOrderedObject(this, currentStealOffset, t);
+ t.quietlyExec();
+ UNSAFE.putOrderedObject(this, currentStealOffset, null);
+ if (sp != base)
+ execLocalTasks();
+ return true;
+ }
+ }
+ }
+ return false;
+ }
/**
- * Sets sp in store-order.
+ * Runs local tasks until queue is empty or shut down. Call only
+ * while active.
*/
- private void setSp(int s) {
- _unsafe.putOrderedInt(this, spOffset, s);
+ private void execLocalTasks() {
+ while (runState == 0) {
+ ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
+ if (t != null)
+ t.quietlyExec();
+ else if (sp == base)
+ break;
+ }
}
+ /*
+ * Intrinsics-based atomic writes for queue slots. These are
+ * basically the same as methods in AtomicReferenceArray, but
+ * specialized for (1) ForkJoinTask elements (2) requirement that
+ * nullness and bounds checks have already been performed by
+ * callers and (3) effective offsets are known not to overflow
+ * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
+ * need corresponding version for reads: plain array reads are OK
+ * because they are protected by other volatile reads and are
+ * confirmed by CASes.
+ *
+ * Most uses don't actually call these methods, but instead contain
+ * inlined forms that enable more predictable optimization. We
+ * don't define the version of write used in pushTask at all, but
+ * instead inline there a store-fenced array slot write.
+ */
+
/**
- * Add in store-order the given task at given slot of q to
- * null. Caller must ensure q is nonnull and index is in range.
+ * CASes slot i of array q from t to null. Caller must ensure q is
+ * non-null and index is in range.
*/
- private static void setSlot(ForkJoinTask<?>[] q, int i,
- ForkJoinTask<?> t){
- _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
+ private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
+ ForkJoinTask<?> t) {
+ return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
}
/**
- * CAS given slot of q to null. Caller must ensure q is nonnull
- * and index is in range.
+ * Performs a volatile write of the given task at given slot of
+ * array q. Caller must ensure q is non-null and index is in
+ * range. This method is used only during resets and backouts.
*/
- private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
- ForkJoinTask<?> t) {
- return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
+ private static final void writeSlot(ForkJoinTask<?>[] q, int i,
+ ForkJoinTask<?> t) {
+ UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
}
- // Main queue methods
+ // queue methods
/**
- * Pushes a task. Called only by current thread.
- * @param t the task. Caller must ensure nonnull
+ * Pushes a task. Call only from this thread.
+ *
+ * @param t the task. Caller must ensure non-null.
*/
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q = queue;
- int mask = q.length - 1;
- int s = sp;
- _unsafe.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
- _unsafe.putOrderedInt(this, spOffset, ++s);
- if ((s -= base) == 1)
- pool.signalNonEmptyWorkerQueue();
- else if (s >= mask)
- growQueue();
+ int mask = q.length - 1; // implicit assert q != null
+ int s = sp++; // ok to increment sp before slot write
+ UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
+ if ((s -= base) == 0)
+ pool.signalWork(); // was empty
+ else if (s == mask)
+ growQueue(); // is full
}
/**
* Tries to take a task from the base of the queue, failing if
- * either empty or contended.
- * @return a task, or null if none or contended.
+ * empty or contended. Note: Specializations of this code appear
+ * in locallyDeqTask and elsewhere.
+ *
+ * @return a task, or null if none or contended
*/
- private ForkJoinTask<?> deqTask() {
- ForkJoinTask<?>[] q;
+ final ForkJoinTask<?> deqTask() {
ForkJoinTask<?> t;
- int i;
- int b;
+ ForkJoinTask<?>[] q;
+ int b, i;
if (sp != (b = base) &&
(q = queue) != null && // must read q after b
- (t = q[i = (q.length - 1) & b]) != null &&
- _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
+ (t = q[i = (q.length - 1) & b]) != null && base == b &&
+ UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
base = b + 1;
return t;
}
@@ -334,48 +564,81 @@ public class ForkJoinWorkerThread extend
}
/**
- * Returns a popped task, or null if empty. Called only by
- * current thread.
+ * Tries to take a task from the base of own queue. Assumes active
+ * status. Called only by this thread.
+ *
+ * @return a task, or null if none
*/
- final ForkJoinTask<?> popTask() {
- ForkJoinTask<?> t;
- int i;
+ final ForkJoinTask<?> locallyDeqTask() {
ForkJoinTask<?>[] q = queue;
- int mask = q.length - 1;
- int s = sp;
- if (s != base &&
- (t = q[i = (s - 1) & mask]) != null &&
- _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
- _unsafe.putOrderedInt(this, spOffset, s - 1);
- return t;
+ if (q != null) {
+ ForkJoinTask<?> t;
+ int b, i;
+ while (sp != (b = base)) {
+ if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
+ UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
+ t, null)) {
+ base = b + 1;
+ return t;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns a popped task, or null if empty. Assumes active status.
+ * Called only by this thread.
+ */
+ private ForkJoinTask<?> popTask() {
+ ForkJoinTask<?>[] q = queue;
+ if (q != null) {
+ int s;
+ while ((s = sp) != base) {
+ int i = (q.length - 1) & --s;
+ long u = (i << qShift) + qBase; // raw offset
+ ForkJoinTask<?> t = q[i];
+ if (t == null) // lost to stealer
+ break;
+ if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ sp = s; // putOrderedInt may encourage more timely write
+ // UNSAFE.putOrderedInt(this, spOffset, s);
+ return t;
+ }
+ }
}
return null;
}
/**
- * Specialized version of popTask to pop only if
- * topmost element is the given task. Called only
- * by current thread.
- * @param t the task. Caller must ensure nonnull
+ * Specialized version of popTask to pop only if topmost element
+ * is the given task. Called only by this thread while active.
+ *
+ * @param t the task. Caller must ensure non-null.
*/
final boolean unpushTask(ForkJoinTask<?> t) {
+ int s;
ForkJoinTask<?>[] q = queue;
- int mask = q.length - 1;
- int s = sp - 1;
- if (_unsafe.compareAndSwapObject(q, ((s & mask) << qShift) + qBase,
- t, null)) {
- _unsafe.putOrderedInt(this, spOffset, s);
+ if ((s = sp) != base && q != null &&
+ UNSAFE.compareAndSwapObject
+ (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
+ sp = s; // putOrderedInt may encourage more timely write
+ // UNSAFE.putOrderedInt(this, spOffset, s);
return true;
}
return false;
}
/**
- * Returns next task to pop.
+ * Returns next task, or null if empty or contended.
*/
final ForkJoinTask<?> peekTask() {
ForkJoinTask<?>[] q = queue;
- return q == null? null : q[(sp - 1) & (q.length - 1)];
+ if (q == null)
+ return null;
+ int mask = q.length - 1;
+ int i = locallyFifo ? base : (sp - 1);
+ return q[i & mask];
}
/**
@@ -400,120 +663,192 @@ public class ForkJoinWorkerThread extend
ForkJoinTask> t = oldQ[oldIndex];
if (t != null && !casSlotNull(oldQ, oldIndex, t))
t = null;
- setSlot(newQ, b & newMask, t);
+ writeSlot(newQ, b & newMask, t);
} while (++b != bf);
- pool.signalIdleWorkers(false);
+ pool.signalWork();
}
- // Runstate management
-
- final boolean isShutdown() { return runState >= SHUTDOWN; }
- final boolean isTerminating() { return runState >= TERMINATING; }
- final boolean isTerminated() { return runState == TERMINATED; }
- final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); }
- final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); }
-
/**
- * Transition to at least the given state. Return true if not
- * already at least given state.
+ * Computes next value for random victim probe in scan(). Scans
+ * don't require a very high quality generator, but also not a
+ * crummy one. Marsaglia xor-shift is cheap and works well enough.
+ * Note: This is manually inlined in scan().
*/
- private boolean transitionRunStateTo(int state) {
- for (;;) {
- int s = runState;
- if (s >= state)
- return false;
- if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state))
- return true;
- }
+ private static final int xorShift(int r) {
+ r ^= r << 13;
+ r ^= r >>> 17;
+ return r ^ (r << 5);
}
/**
- * Ensure status is active and if necessary adjust pool active count
+ * Tries to steal a task from another worker. Starts at a random
+ * index of workers array, and probes workers until finding one
+ * with non-empty queue or finding that all are empty. It
+ * randomly selects the first n probes. If these are empty, it
+ * resorts to a circular sweep, which is necessary to accurately
+ * set active status. (The circular sweep uses steps of
+ * approximately half the array size plus 1, to avoid bias
+ * stemming from leftmost packing of the array in ForkJoinPool.)
+ *
+ * This method must be both fast and quiet -- usually avoiding
+ * memory accesses that could disrupt cache sharing etc other than
+ * those needed to check for and take tasks (or to activate if not
+ * already active). This accounts for, among other things,
+ * updating random seed in place without storing it until exit.
+ *
+ * @return a task, or null if none found
*/
- final void activate() {
- if (!active) {
- active = true;
- pool.incrementActiveCount();
+ private ForkJoinTask> scan() {
+ ForkJoinPool p = pool;
+ ForkJoinWorkerThread[] ws; // worker array
+ int n; // upper bound of #workers
+ if ((ws = p.workers) != null && (n = ws.length) > 1) {
+ boolean canSteal = active; // shadow active status
+ int r = seed; // extract seed once
+ int mask = n - 1;
+ int j = -n; // loop counter
+ int k = r; // worker index, random if j < 0
+ for (;;) {
+ ForkJoinWorkerThread v = ws[k & mask];
+ r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
+ ForkJoinTask>[] q; ForkJoinTask> t; int b, a;
+ if (v != null && (b = v.base) != v.sp &&
+ (q = v.queue) != null) {
+ int i = (q.length - 1) & b;
+ long u = (i << qShift) + qBase; // raw offset
+ int pid = poolIndex;
+ if ((t = q[i]) != null) {
+ if (!canSteal && // inline p.tryIncrementActiveCount
+ UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
+ a = p.runState, a + 1))
+ canSteal = active = true;
+ if (canSteal && v.base == b++ &&
+ UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ v.base = b;
+ v.stealHint = pid;
+ UNSAFE.putOrderedObject(this,
+ currentStealOffset, t);
+ seed = r;
+ ++stealCount;
+ return t;
+ }
+ }
+ j = -n;
+ k = r; // restart on contention
+ }
+ else if (++j <= 0)
+ k = r;
+ else if (j <= n)
+ k += (n >>> 1) | 1;
+ else
+ break;
+ }
}
+ return null;
+ }
+
+ // Run State management
+
+ // status check methods used mainly by ForkJoinPool
+ final boolean isRunning() { return runState == 0; }
+ final boolean isTerminated() { return (runState & TERMINATED) != 0; }
+ final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
+ final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
+
+ final boolean isTerminating() {
+ if ((runState & TERMINATING) != 0)
+ return true;
+ if (pool.isAtLeastTerminating()) { // propagate pool state
+ shutdown();
+ return true;
+ }
+ return false;
}
/**
- * Ensure status is inactive and if necessary adjust pool active count
+ * Sets state to TERMINATING. Does NOT unpark or interrupt
+ * to wake up if currently blocked. Callers must do so if desired.
*/
- final void inactivate() {
- if (active) {
- active = false;
- pool.decrementActiveCount();
+ final void shutdown() {
+ for (;;) {
+ int s = runState;
+ if ((s & (TERMINATING|TERMINATED)) != 0)
+ break;
+ if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
+ if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+ (s & ~SUSPENDED) |
+ (TRIMMED|TERMINATING)))
+ break;
+ }
+ else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+ s | TERMINATING))
+ break;
}
}
- // Lifecycle methods
-
/**
- * Initializes internal state after construction but before
- * processing any tasks. If you override this method, you must
- * invoke super.onStart() at the beginning of the method.
- * Initialization requires care: Most fields must have legal
- * default values, to ensure that attempted accesses from other
- * threads work correctly even before this thread starts
- * processing tasks.
+ * Sets state to TERMINATED. Called only by onTermination().
*/
- protected void onStart() {
- juRandomSeed = randomSeedGenerator.nextLong();
- do;while((randomVictimSeed = nextRandomInt()) == 0); // must be nonzero
- if (queue == null)
- queue = new ForkJoinTask>[INITIAL_QUEUE_CAPACITY];
-
- // Heuristically allow one initial thread to warm up; others wait
- if (poolIndex < pool.getParallelism() - 1) {
- eventCount = pool.sync(this, 0);
- activate();
- }
+ private void setTerminated() {
+ int s;
+ do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
+ s = runState,
+ s | (TERMINATING|TERMINATED)));
}
/**
- * Perform cleanup associated with termination of this worker
- * thread. If you override this method, you must invoke
- * super.onTermination at the end of the overridden method.
+ * If suspended, tries to set status to unsuspended.
+ * Does NOT wake up if blocked.
*
- * @param exception the exception causing this thread to abort due
- * to an unrecoverable error, or null if completed normally.
+ * @return true if successful
*/
- protected void onTermination(Throwable exception) {
- try {
- clearLocalTasks();
- inactivate();
- cancelTasks();
- } finally {
- terminate(exception);
+ final boolean tryUnsuspend() {
+ int s;
+ while (((s = runState) & SUSPENDED) != 0) {
+ if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+ s & ~SUSPENDED))
+ return true;
}
+ return false;
}
/**
- * Notify pool of termination and, if exception is nonnull,
- * rethrow it to trigger this thread's uncaughtExceptionHandler
+ * Sets suspended status and blocks as spare until resumed
+ * or shutdown.
*/
- private void terminate(Throwable exception) {
- transitionRunStateTo(TERMINATED);
- try {
- pool.workerTerminated(this);
- } finally {
- if (exception != null)
- ForkJoinTask.rethrowException(exception);
+ final void suspendAsSpare() {
+ for (;;) { // set suspended unless terminating
+ int s = runState;
+ if ((s & TERMINATING) != 0) { // must kill
+ if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+ s | (TRIMMED | TERMINATING)))
+ return;
+ }
+ else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
+ s | SUSPENDED))
+ break;
+ }
+ ForkJoinPool p = pool;
+ p.pushSpare(this);
+ while ((runState & SUSPENDED) != 0) {
+ if (p.tryAccumulateStealCount(this)) {
+ interrupted(); // clear/ignore interrupts
+ if ((runState & SUSPENDED) == 0)
+ break;
+ LockSupport.park(this);
+ }
}
}
+ // Misc support methods for ForkJoinPool
+
/**
- * Run local tasks on exit from main.
+ * Returns an estimate of the number of tasks in the queue. Also
+ * used by ForkJoinTask.
*/
- private void clearLocalTasks() {
- while (base != sp && !pool.isTerminating()) {
- ForkJoinTask> t = popTask();
- if (t != null) {
- activate(); // ensure active status
- t.quietlyExec();
- }
- }
+ final int getQueueSize() {
+ int n; // external calls must read base first
+ return (n = -base + sp) <= 0 ? 0 : n;
}
/**
@@ -521,6 +856,20 @@ public class ForkJoinWorkerThread extend
* thread.
*/
final void cancelTasks() {
+ ForkJoinTask> cj = currentJoin; // try to cancel ongoing tasks
+ if (cj != null) {
+ currentJoin = null;
+ cj.cancelIgnoringExceptions();
+ try {
+ this.interrupt(); // awaken wait
+ } catch (SecurityException ignore) {
+ }
+ }
+ ForkJoinTask> cs = currentSteal;
+ if (cs != null) {
+ currentSteal = null;
+ cs.cancelIgnoringExceptions();
+ }
while (base != sp) {
ForkJoinTask> t = deqTask();
if (t != null)
@@ -529,355 +878,336 @@ public class ForkJoinWorkerThread extend
}
/**
- * This method is required to be public, but should never be
- * called explicitly. It performs the main run loop to execute
- * ForkJoinTasks.
+ * Drains tasks to given collection c.
+ *
+ * @return the number of tasks drained
*/
- public void run() {
- Throwable exception = null;
- try {
- onStart();
- while (!isShutdown())
- step();
- } catch (Throwable ex) {
- exception = ex;
- } finally {
- onTermination(exception);
+ final int drainTasksTo(Collection super ForkJoinTask>> c) {
+ int n = 0;
+ while (base != sp) {
+ ForkJoinTask> t = deqTask();
+ if (t != null) {
+ c.add(t);
+ ++n;
+ }
}
+ return n;
}
+ // Support methods for ForkJoinTask
+
/**
- * Main top-level action.
+ * Gets and removes a local task.
+ *
+ * @return a task, if available
*/
- private void step() {
- ForkJoinTask> t = sp != base? popTask() : null;
- if (t != null || (t = scan(null, true)) != null) {
- activate();
- t.quietlyExec();
+ final ForkJoinTask> pollLocalTask() {
+ ForkJoinPool p = pool;
+ while (sp != base) {
+ int a; // inline p.tryIncrementActiveCount
+ if (active ||
+ (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
+ a = p.runState, a + 1)))
+ return locallyFifo ? locallyDeqTask() : popTask();
}
- else {
- inactivate();
- eventCount = pool.sync(this, eventCount);
+ return null;
+ }
+
+ /**
+ * Gets and removes a local or stolen task.
+ *
+ * @return a task, if available
+ */
+ final ForkJoinTask> pollTask() {
+ ForkJoinTask> t = pollLocalTask();
+ if (t == null) {
+ t = scan();
+ // cannot retain/track/help steal
+ UNSAFE.putOrderedObject(this, currentStealOffset, null);
}
+ return t;
}
- // scanning for and stealing tasks
+ /**
+ * Possibly runs some tasks and/or blocks, until task is done.
+ *
+ * @param joinMe the task to join
+ */
+ final void joinTask(ForkJoinTask> joinMe) {
+ // currentJoin only written by this thread; only need ordered store
+ ForkJoinTask> prevJoin = currentJoin;
+ UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
+ if (sp != base)
+ localHelpJoinTask(joinMe);
+ if (joinMe.status >= 0)
+ pool.awaitJoin(joinMe, this);
+ UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
+ }
/**
- * Computes next value for random victim probe. Scans don't
- * require a very high quality generator, but also not a crummy
- * one. Marsaglia xor-shift is cheap and works well.
+ * Run tasks in local queue until given task is done.
*
- * This is currently unused, and manually inlined
+ * @param joinMe the task to join
*/
- private static int xorShift(int r) {
- r ^= r << 1;
- r ^= r >>> 3;
- r ^= r << 10;
- return r;
+ private void localHelpJoinTask(ForkJoinTask> joinMe) {
+ int s;
+ ForkJoinTask>[] q;
+ while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
+ int i = (q.length - 1) & --s;
+ long u = (i << qShift) + qBase; // raw offset
+ ForkJoinTask> t = q[i];
+ if (t == null) // lost to a stealer
+ break;
+ if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ /*
+ * This recheck (and similarly in helpJoinTask)
+ * handles cases where joinMe is independently
+ * cancelled or forced even though there is other work
+ * available. Back out of the pop by putting t back
+ * into slot before we commit by writing sp.
+ */
+ if (joinMe.status < 0) {
+ UNSAFE.putObjectVolatile(q, u, t);
+ break;
+ }
+ sp = s;
+ // UNSAFE.putOrderedInt(this, spOffset, s);
+ t.quietlyExec();
+ }
+ }
}
/**
- * Tries to steal a task from another worker and/or, if enabled,
- * submission queue. Starts at a random index of workers array,
- * and probes workers until finding one with non-empty queue or
- * finding that all are empty. It randomly selects the first n-1
- * probes. If these are empty, it resorts to full circular
- * traversal, which is necessary to accurately set active status
- * by caller. Also restarts if pool barrier has tripped since last
- * scan, which forces refresh of workers array, in case barrier
- * was associated with resize.
+ * Unless terminating, tries to locate and help perform tasks for
+ * a stealer of the given task, or in turn one of its stealers.
+ * Traces currentSteal->currentJoin links looking for a thread
+ * working on a descendant of the given task and with a non-empty
+ * queue to steal back and execute tasks from.
*
- * This method must be both fast and quiet -- usually avoiding
- * memory accesses that could disrupt cache sharing etc other than
- * those needed to check for and take tasks. This accounts for,
- * among other things, updating random seed in place without
- * storing it until exit. (Note that we only need to store it if
- * we found a task; otherwise it doesn't matter if we start at the
- * same place next time.)
+ * The implementation is very branchy to cope with potential
+ * inconsistencies or loops encountering chains that are stale,
+ * unknown, or of length greater than MAX_HELP_DEPTH links. All
+ * of these cases are dealt with by just returning back to the
+ * caller, who is expected to retry if other join mechanisms also
+ * don't work out.
*
- * @param joinMe if non null; exit early if done
- * @param checkSubmissions true if OK to take submissions
- * @return a task, or null if none found
+ * @param joinMe the task to join
*/
- private ForkJoinTask> scan(ForkJoinTask> joinMe,
- boolean checkSubmissions) {
- ForkJoinPool p = pool;
- if (p == null) // Never null, but avoids
- return null; // implicit nullchecks below
- int r = randomVictimSeed; // extract once to keep scan quiet
- restart: // outer loop refreshes ws array
- while (joinMe == null || joinMe.status >= 0) {
- int mask;
- ForkJoinWorkerThread[] ws = p.workers;
- if (ws != null && (mask = ws.length - 1) > 0) {
- int probes = -mask; // use random index while negative
- int idx = r;
- for (;;) {
- ForkJoinWorkerThread v;
- // inlined xorshift to update seed
- r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
- if ((v = ws[mask & idx]) != null && v.sp != v.base) {
- ForkJoinTask> t;
- activate();
- if ((joinMe == null || joinMe.status >= 0) &&
- (t = v.deqTask()) != null) {
- randomVictimSeed = r;
- ++stealCount;
- return t;
+ final void helpJoinTask(ForkJoinTask> joinMe) {
+ ForkJoinWorkerThread[] ws;
+ int n;
+ if (joinMe.status < 0) // already done
+ return;
+ if ((runState & TERMINATING) != 0) { // cancel if shutting down
+ joinMe.cancelIgnoringExceptions();
+ return;
+ }
+ if ((ws = pool.workers) == null || (n = ws.length) <= 1)
+ return; // need at least 2 workers
+
+ ForkJoinTask> task = joinMe; // base of chain
+ ForkJoinWorkerThread thread = this; // thread with stolen task
+ for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
+ // Try to find v, the stealer of task, by first using hint
+ ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
+ if (v == null || v.currentSteal != task) {
+ for (int j = 0; ; ++j) { // search array
+ if (j < n) {
+ ForkJoinTask> vs;
+ if ((v = ws[j]) != null &&
+ (vs = v.currentSteal) != null) {
+ if (joinMe.status < 0 || task.status < 0)
+ return; // stale or done
+ if (vs == task) {
+ thread.stealHint = j;
+ break; // save hint for next time
+ }
}
- continue restart; // restart on contention
}
- if ((probes >> 1) <= mask) // n-1 random then circular
- idx = (probes++ < 0)? r : (idx + 1);
else
- break;
+ return; // no stealer
}
}
- if (checkSubmissions && p.hasQueuedSubmissions()) {
- activate();
- ForkJoinTask> t = p.pollSubmission();
- if (t != null)
- return t;
- }
- else {
- long ec = eventCount; // restart on pool event
- if ((eventCount = p.getEventCount()) == ec)
+ for (;;) { // Try to help v, using specialized form of deqTask
+ if (joinMe.status < 0)
+ return;
+ int b = v.base;
+ ForkJoinTask>[] q = v.queue;
+ if (b == v.sp || q == null)
break;
+ int i = (q.length - 1) & b;
+ long u = (i << qShift) + qBase;
+ ForkJoinTask> t = q[i];
+ int pid = poolIndex;
+ ForkJoinTask> ps = currentSteal;
+ if (task.status < 0)
+ return; // stale or done
+ if (t != null && v.base == b++ &&
+ UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ if (joinMe.status < 0) {
+ UNSAFE.putObjectVolatile(q, u, t);
+ return; // back out on cancel
+ }
+ v.base = b;
+ v.stealHint = pid;
+ UNSAFE.putOrderedObject(this, currentStealOffset, t);
+ t.quietlyExec();
+ UNSAFE.putOrderedObject(this, currentStealOffset, ps);
+ }
}
- }
- return null;
- }
-
- /**
- * Callback from pool.sync to rescan before blocking. If a
- * task is found, it is pushed so it can be executed upon return.
- * @return true if found and pushed a task
- */
- final boolean prescan() {
- ForkJoinTask> t = scan(null, true);
- if (t != null) {
- pushTask(t);
- return true;
- }
- else {
- inactivate();
- return false;
+ // Try to descend to find v's stealer
+ ForkJoinTask> next = v.currentJoin;
+ if (task.status < 0 || next == null || next == task ||
+ joinMe.status < 0)
+ return;
+ task = next;
+ thread = v;
}
}
- // Support for ForkJoinTask methods
-
- /**
- * Scan, returning early if joinMe done
- */
- final ForkJoinTask> scanWhileJoining(ForkJoinTask> joinMe) {
- ForkJoinTask> t = scan(joinMe, false);
- if (t != null && joinMe.status < 0 && sp == base) {
- pushTask(t); // unsteal if done and this task would be stealable
- t = null;
- }
- return t;
- }
-
/**
- * Pops or steals a task
- * @return task, or null if none available
+ * Implements ForkJoinTask.getSurplusQueuedTaskCount().
+ * Returns an estimate of the number of tasks, offset by a
+ * function of number of idle workers.
+ *
+ * This method provides a cheap heuristic guide for task
+ * partitioning when programmers, frameworks, tools, or languages
+ * have little or no idea about task granularity. In essence by
+ * offering this method, we ask users only about tradeoffs in
+ * overhead vs expected throughput and its variance, rather than
+ * how finely to partition tasks.
+ *
+ * In a steady state strict (tree-structured) computation, each
+ * thread makes available for stealing enough tasks for other
+ * threads to remain active. Inductively, if all threads play by
+ * the same rules, each thread should make available only a
+ * constant number of tasks.
+ *
+ * The minimum useful constant is just 1. But using a value of 1
+ * would require immediate replenishment upon each steal to
+ * maintain enough tasks, which is infeasible. Further,
+ * partitionings/granularities of offered tasks should minimize
+ * steal rates, which in general means that threads nearer the top
+ * of computation tree should generate more than those nearer the
+ * bottom. In perfect steady state, each thread is at
+ * approximately the same level of computation tree. However,
+ * producing extra tasks amortizes the uncertainty of progress and
+ * diffusion assumptions.
+ *
+ * So, users will want to use values larger, but not much larger
+ * than 1 to both smooth over transient shortages and hedge
+ * against uneven progress; as traded off against the cost of
+ * extra task overhead. We leave the user to pick a threshold
+ * value to compare with the results of this call to guide
+ * decisions, but recommend values such as 3.
+ *
+ * When all threads are active, it is on average OK to estimate
+ * surplus strictly locally. In steady-state, if one thread is
+ * maintaining say 2 surplus tasks, then so are others. So we can
+ * just use estimated queue length (although note that (sp - base)
+ * can be an overestimate because of stealers lagging increments
+ * of base). However, this strategy alone leads to serious
+ * mis-estimates in some non-steady-state conditions (ramp-up,
+ * ramp-down, other stalls). We can detect many of these by
+ * further considering the number of "idle" threads, that are
+ * known to have zero queued tasks, so compensate by a factor of
+ * (#idle/#active) threads.
*/
- final ForkJoinTask> pollLocalOrStolenTask() {
- ForkJoinTask> t;
- return (t = popTask()) == null? scan(null, false) : t;
+ final int getEstimatedSurplusTaskCount() {
+ return sp - base - pool.idlePerActive();
}
/**
- * Runs tasks until pool isQuiescent
+ * Runs tasks until {@code pool.isQuiescent()}.
*/
final void helpQuiescePool() {
+ ForkJoinTask> ps = currentSteal; // to restore below
for (;;) {
- ForkJoinTask> t = pollLocalOrStolenTask();
- if (t != null) {
- activate();
+ ForkJoinTask> t = pollLocalTask();
+ if (t != null || (t = scan()) != null)
t.quietlyExec();
- }
else {
- inactivate();
- if (pool.isQuiescent()) {
- activate(); // re-activate on exit
- break;
+ ForkJoinPool p = pool;
+ int a; // to inline CASes
+ if (active) {
+ if (!UNSAFE.compareAndSwapInt
+ (p, poolRunStateOffset, a = p.runState, a - 1))
+ continue; // retry later
+ active = false; // inactivate
+ UNSAFE.putOrderedObject(this, currentStealOffset, ps);
+ }
+ if (p.isQuiescent()) {
+ active = true; // re-activate
+ do {} while (!UNSAFE.compareAndSwapInt
+ (p, poolRunStateOffset, a = p.runState, a+1));
+ return;
}
}
}
}
- /**
- * Returns an estimate of the number of tasks in the queue.
- */
- final int getQueueSize() {
- int n = sp - base;
- return n <= 0? 0 : n; // suppress momentarily negative values
- }
-
- /**
- * Returns an estimate of the number of tasks, offset by a
- * function of number of idle workers.
- */
- final int getEstimatedSurplusTaskCount() {
- // The halving approximates weighting idle vs non-idle workers
- return (sp - base) - (pool.getIdleThreadCount() >>> 1);
- }
-
- // Per-worker exported random numbers
+ // Unsafe mechanics
- // Same constants as java.util.Random
- final static long JURandomMultiplier = 0x5DEECE66DL;
- final static long JURandomAddend = 0xBL;
- final static long JURandomMask = (1L << 48) - 1;
-
- private final int nextJURandom(int bits) {
- long next = (juRandomSeed * JURandomMultiplier + JURandomAddend) &
- JURandomMask;
- juRandomSeed = next;
- return (int)(next >>> (48 - bits));
- }
-
- private final int nextJURandomInt(int n) {
- if (n <= 0)
- throw new IllegalArgumentException("n must be positive");
- int bits = nextJURandom(31);
- if ((n & -n) == n)
- return (int)((n * (long)bits) >> 31);
+ private static final sun.misc.Unsafe UNSAFE = getUnsafe();
+ private static final long spOffset =
+ objectFieldOffset("sp", ForkJoinWorkerThread.class);
+ private static final long runStateOffset =
+ objectFieldOffset("runState", ForkJoinWorkerThread.class);
+ private static final long currentJoinOffset =
+ objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
+ private static final long currentStealOffset =
+ objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
+ private static final long qBase =
+ UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
+ private static final long poolRunStateOffset = // to inline CAS
+ objectFieldOffset("runState", ForkJoinPool.class);
- for (;;) {
- int val = bits % n;
- if (bits - val + (n-1) >= 0)
- return val;
- bits = nextJURandom(31);
- }
- }
+ private static final int qShift;
- private final long nextJURandomLong() {
- return ((long)(nextJURandom(32)) << 32) + nextJURandom(32);
+ static {
+ int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
+ if ((s & (s-1)) != 0)
+ throw new Error("data type scale not a power of two");
+ qShift = 31 - Integer.numberOfLeadingZeros(s);
+ MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
}
- private final long nextJURandomLong(long n) {
- if (n <= 0)
- throw new IllegalArgumentException("n must be positive");
- long offset = 0;
- while (n >= Integer.MAX_VALUE) { // randomly pick half range
- int bits = nextJURandom(2); // 2nd bit for odd vs even split
- long half = n >>> 1;
- long nextn = ((bits & 2) == 0)? half : n - half;
- if ((bits & 1) == 0)
- offset += n - nextn;
- n = nextn;
+ private static long objectFieldOffset(String field, Class> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
}
- return offset + nextJURandomInt((int)n);
- }
-
- private final double nextJURandomDouble() {
- return (((long)(nextJURandom(26)) << 27) + nextJURandom(27))
- / (double)(1L << 53);
- }
-
- /**
- * Returns a random integer using a per-worker random
- * number generator with the same properties as
- * {@link java.util.Random#nextInt}
- * @return the next pseudorandom, uniformly distributed {@code int}
- * value from this worker's random number generator's sequence
- */
- public static int nextRandomInt() {
- return ((ForkJoinWorkerThread)(Thread.currentThread())).
- nextJURandom(32);
- }
-
- /**
- * Returns a random integer using a per-worker random
- * number generator with the same properties as
- * {@link java.util.Random#nextInt(int)}
- * @param n the bound on the random number to be returned. Must be
- * positive.
- * @return the next pseudorandom, uniformly distributed {@code int}
- * value between {@code 0} (inclusive) and {@code n} (exclusive)
- * from this worker's random number generator's sequence
- * @throws IllegalArgumentException if n is not positive
- */
- public static int nextRandomInt(int n) {
- return ((ForkJoinWorkerThread)(Thread.currentThread())).
- nextJURandomInt(n);
- }
-
- /**
- * Returns a random long using a per-worker random
- * number generator with the same properties as
- * {@link java.util.Random#nextLong}
- * @return the next pseudorandom, uniformly distributed {@code long}
- * value from this worker's random number generator's sequence
- */
- public static long nextRandomLong() {
- return ((ForkJoinWorkerThread)(Thread.currentThread())).
- nextJURandomLong();
}
/**
- * Returns a random integer using a per-worker random
- * number generator with the same properties as
- * {@link java.util.Random#nextInt(int)}
- * @param n the bound on the random number to be returned. Must be
- * positive.
- * @return the next pseudorandom, uniformly distributed {@code int}
- * value between {@code 0} (inclusive) and {@code n} (exclusive)
- * from this worker's random number generator's sequence
- * @throws IllegalArgumentException if n is not positive
- */
- public static long nextRandomLong(long n) {
- return ((ForkJoinWorkerThread)(Thread.currentThread())).
- nextJURandomLong(n);
- }
-
- /**
- * Returns a random double using a per-worker random
- * number generator with the same properties as
- * {@link java.util.Random#nextDouble}
- * @return the next pseudorandom, uniformly distributed {@code double}
- * value between {@code 0.0} and {@code 1.0} from this
- * worker's random number generator's sequence
+ * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
+ * Replace with a simple call to Unsafe.getUnsafe when integrating
+ * into a jdk.
+ *
+ * @return a sun.misc.Unsafe
*/
- public static double nextRandomDouble() {
- return ((ForkJoinWorkerThread)(Thread.currentThread())).
- nextJURandomDouble();
- }
-
- // Temporary Unsafe mechanics for preliminary release
-
- static final Unsafe _unsafe;
- static final long baseOffset;
- static final long spOffset;
- static final long qBase;
- static final int qShift;
- static final long runStateOffset;
- static {
+ private static sun.misc.Unsafe getUnsafe() {
try {
- if (ForkJoinWorkerThread.class.getClassLoader() != null) {
- Field f = Unsafe.class.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- _unsafe = (Unsafe)f.get(null);
- }
- else
- _unsafe = Unsafe.getUnsafe();
- baseOffset = _unsafe.objectFieldOffset
- (ForkJoinWorkerThread.class.getDeclaredField("base"));
- spOffset = _unsafe.objectFieldOffset
- (ForkJoinWorkerThread.class.getDeclaredField("sp"));
- runStateOffset = _unsafe.objectFieldOffset
- (ForkJoinWorkerThread.class.getDeclaredField("runState"));
- qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
- int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
- if ((s & (s-1)) != 0)
- throw new Error("data type scale not a power of two");
- qShift = 31 - Integer.numberOfLeadingZeros(s);
- } catch (Exception e) {
- throw new RuntimeException("Could not initialize intrinsics", e);
+ return sun.misc.Unsafe.getUnsafe();
+ } catch (SecurityException se) {
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security
+ .PrivilegedExceptionAction