--- jsr166/src/jsr166y/ForkJoinWorkerThread.java 2010/07/23 14:09:17 1.37 +++ jsr166/src/jsr166y/ForkJoinWorkerThread.java 2010/09/04 11:33:53 1.46 @@ -97,19 +97,20 @@ public class ForkJoinWorkerThread extend * technique for implementing efficient futures" SIGPLAN Notices, * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs * in that: (1) We only maintain dependency links across workers - * upon steals, rather than maintain per-task bookkeeping. This - * may require a linear scan of workers array to locate stealers, - * but usually doesn't because stealers leave hints (that may - * become stale/wrong) of where to locate the kathem. This - * isolates cost to when it is needed, rather than adding to - * per-task overhead. (2) It is "shallow", ignoring nesting and - * potentially cyclic mutual steals. (3) It is intentionally - * racy: field currentJoin is updated only while actively joining, - * which means that we could miss links in the chain during - * long-lived tasks, GC stalls etc. (4) We bound the number of - * attempts to find work (see MAX_HELP_DEPTH) and fall back to - * suspending the worker and if necessary replacing it with a - * spare (see ForkJoinPool.tryAwaitJoin). + * upon steals, rather than use per-task bookkeeping. This may + * require a linear scan of workers array to locate stealers, but + * usually doesn't because stealers leave hints (that may become + * stale/wrong) of where to locate them. This isolates cost to + * when it is needed, rather than adding to per-task overhead. + * (2) It is "shallow", ignoring nesting and potentially cyclic + * mutual steals. (3) It is intentionally racy: field currentJoin + * is updated only while actively joining, which means that we + * miss links in the chain during long-lived tasks, GC stalls etc + * (which is OK since blocking in such cases is usually a good + * idea). (4) We bound the number of attempts to find work (see + * MAX_HELP_DEPTH) and fall back to suspending the worker and if + * necessary replacing it with a spare (see + * ForkJoinPool.awaitJoin). * * Efficient implementation of these algorithms currently relies * on an uncomfortable amount of "Unsafe" mechanics. To maintain @@ -154,17 +155,6 @@ public class ForkJoinWorkerThread extend private static final Random seedGenerator = new Random(); /** - * The timeout value for suspending spares. Spare workers that - * remain unsignalled for more than this time may be trimmed - * (killed and removed from pool). Since our goal is to avoid - * long-term thread buildup, the exact value of timeout does not - * matter too much so long as it avoids most false-alarm timeouts - * under GC stalls or momentarily high system load. - */ - private static final long SPARE_KEEPALIVE_NANOS = - 5L * 1000L * 1000L * 1000L; // 5 secs - - /** * The maximum stolen->joining link depth allowed in helpJoinTask. * Depths for legitimate chains are unbounded, but we use a fixed * constant to avoid (otherwise unchecked) cycles and bound @@ -182,11 +172,11 @@ public class ForkJoinWorkerThread extend /** * Maximum work-stealing queue array size. Must be less than or - * equal to 1 << 28 to ensure lack of index wraparound. (This - * is less than usual bounds, because we need leftshift by 3 - * to be in int range). + * equal to 1 << (31 - width of array entry) to ensure lack of + * index wraparound. The value is set in the static block + * at the end of this file after obtaining width. */ - private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28; + private static final int MAXIMUM_QUEUE_CAPACITY; /** * The pool this thread works in. Accessed directly by ForkJoinTask. @@ -227,9 +217,12 @@ public class ForkJoinWorkerThread extend * Run state of this worker. In addition to the usual run levels, * tracks if this worker is suspended as a spare, and if it was * killed (trimmed) while suspended. However, "active" status is - * maintained separately. + * maintained separately and modified only in conjunction with + * CASes of the pool's runState (which are currently sadly + * manually inlined for performance.) Accessed directly by pool + * to simplify checks for normal (zero) status. */ - private volatile int runState; + volatile int runState; private static final int TERMINATING = 0x01; private static final int TERMINATED = 0x02; @@ -237,16 +230,8 @@ public class ForkJoinWorkerThread extend private static final int TRIMMED = 0x08; // killed while suspended /** - * Number of LockSupport.park calls to block this thread for - * suspension or event waits. Used for internal instrumention; - * currently not exported but included because volatile write upon - * park also provides a workaround for a JVM bug. - */ - volatile int parkCount; - - /** - * Number of steals, transferred and reset in pool callbacks pool - * when idle Accessed directly by pool. + * Number of steals. Directly accessed (and reset) by + * pool.tryAccumulateStealCount when idle. */ int stealCount; @@ -256,7 +241,6 @@ public class ForkJoinWorkerThread extend */ private int seed; - /** * Activity status. When true, this worker is considered active. * Accessed directly by pool. Must be false upon construction. @@ -265,8 +249,7 @@ public class ForkJoinWorkerThread extend /** * True if use local fifo, not default lifo, for local polling. - * Shadows value from ForkJoinPool, which resets it if changed - * pool-wide. + * Shadows value from ForkJoinPool. */ private final boolean locallyFifo; @@ -284,24 +267,36 @@ public class ForkJoinWorkerThread extend int lastEventCount; /** - * Encoded index and event count of next event waiter. Used only - * by ForkJoinPool for managing event waiters. + * Encoded index and event count of next event waiter. Accessed + * only by ForkJoinPool for managing event waiters. */ volatile long nextWaiter; /** + * Number of times this thread suspended as spare. Accessed only + * by pool. + */ + int spareCount; + + /** + * Encoded index and count of next spare waiter. Accessed only + * by ForkJoinPool for managing spares. + */ + volatile int nextSpare; + + /** * The task currently being joined, set only when actively trying - * to helpStealer. Written only by current thread, but read by - * others. + * to help other stealers in helpJoinTask. Written only by this + * thread, but read by others. */ private volatile ForkJoinTask currentJoin; /** * The task most recently stolen from another worker (or - * submission queue). Not volatile because always read/written in - * presence of related volatiles in those cases where it matters. + * submission queue). Written only by this thread, but read by + * others. */ - private ForkJoinTask currentSteal; + private volatile ForkJoinTask currentSteal; /** * Creates a ForkJoinWorkerThread operating in the given pool. @@ -312,18 +307,18 @@ public class ForkJoinWorkerThread extend protected ForkJoinWorkerThread(ForkJoinPool pool) { this.pool = pool; this.locallyFifo = pool.locallyFifo; + setDaemon(true); // To avoid exposing construction details to subclasses, // remaining initialization is in start() and onStart() } /** - * Performs additional initialization and starts this thread + * Performs additional initialization and starts this thread. */ final void start(int poolIndex, UncaughtExceptionHandler ueh) { this.poolIndex = poolIndex; if (ueh != null) setUncaughtExceptionHandler(ueh); - setDaemon(true); start(); } @@ -354,7 +349,7 @@ public class ForkJoinWorkerThread extend /** * Initializes internal state after construction but before * processing any tasks. If you override this method, you must - * invoke super.onStart() at the beginning of the method. + * invoke @code{super.onStart()} at the beginning of the method. * Initialization requires care: Most fields must have legal * default values, to ensure that attempted accesses from other * threads work correctly even before this thread starts @@ -382,9 +377,16 @@ public class ForkJoinWorkerThread extend */ protected void onTermination(Throwable exception) { try { + ForkJoinPool p = pool; + if (active) { + int a; // inline p.tryDecrementActiveCount + active = false; + do {} while (!UNSAFE.compareAndSwapInt + (p, poolRunStateOffset, a = p.runState, a - 1)); + } cancelTasks(); setTerminated(); - pool.workerTerminated(this); + p.workerTerminated(this); } catch (Throwable ex) { // Shouldn't ever happen if (exception == null) // but if so, at least rethrown exception = ex; @@ -414,60 +416,76 @@ public class ForkJoinWorkerThread extend // helpers for run() /** - * Find and execute tasks and check status while running + * Finds and executes tasks, and checks status while running. */ private void mainLoop() { - int emptyScans = 0; // consecutive times failed to find work + boolean ran = false; // true if ran a task on last step ForkJoinPool p = pool; for (;;) { - p.preStep(this, emptyScans); + p.preStep(this, ran); if (runState != 0) - return; - ForkJoinTask t; // try to get and run stolen or submitted task - if ((t = scan()) != null || (t = pollSubmission()) != null) { - t.tryExec(); - if (base != sp) - runLocalTasks(); - currentSteal = null; - emptyScans = 0; - } - else - ++emptyScans; + break; + ran = tryExecSteal() || tryExecSubmission(); } } /** - * Runs local tasks until queue is empty or shut down. Call only - * while active. + * Tries to steal a task and execute it. + * + * @return true if ran a task */ - private void runLocalTasks() { - while (runState == 0) { - ForkJoinTask t = locallyFifo? locallyDeqTask() : popTask(); - if (t != null) - t.tryExec(); - else if (base == sp) - break; + private boolean tryExecSteal() { + ForkJoinTask t; + if ((t = scan()) != null) { + t.quietlyExec(); + UNSAFE.putOrderedObject(this, currentStealOffset, null); + if (sp != base) + execLocalTasks(); + return true; } + return false; } /** - * If a submission exists, try to activate and take it + * If a submission exists, try to activate and run it. * - * @return a task, if available + * @return true if ran a task */ - private ForkJoinTask pollSubmission() { + private boolean tryExecSubmission() { ForkJoinPool p = pool; + // This loop is needed in case attempt to activate fails, in + // which case we only retry if there still appears to be a + // submission. while (p.hasQueuedSubmissions()) { - if (active || (active = p.tryIncrementActiveCount())) { - ForkJoinTask t = p.pollSubmission(); - if (t != null) { - currentSteal = t; - return t; + ForkJoinTask t; int a; + if (active || // inline p.tryIncrementActiveCount + (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset, + a = p.runState, a + 1))) { + if ((t = p.pollSubmission()) != null) { + UNSAFE.putOrderedObject(this, currentStealOffset, t); + t.quietlyExec(); + UNSAFE.putOrderedObject(this, currentStealOffset, null); + if (sp != base) + execLocalTasks(); + return true; } - return scan(); // if missed, rescan } } - return null; + return false; + } + + /** + * Runs local tasks until queue is empty or shut down. Call only + * while active. + */ + private void execLocalTasks() { + while (runState == 0) { + ForkJoinTask t = locallyFifo ? locallyDeqTask() : popTask(); + if (t != null) + t.quietlyExec(); + else if (sp == base) + break; + } } /* @@ -502,7 +520,7 @@ public class ForkJoinWorkerThread extend * range. This method is used only during resets and backouts. */ private static final void writeSlot(ForkJoinTask[] q, int i, - ForkJoinTask t) { + ForkJoinTask t) { UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t); } @@ -535,7 +553,7 @@ public class ForkJoinWorkerThread extend ForkJoinTask t; ForkJoinTask[] q; int b, i; - if ((b = base) != sp && + if (sp != (b = base) && (q = queue) != null && // must read q after b (t = q[i = (q.length - 1) & b]) != null && base == b && UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) { @@ -547,7 +565,7 @@ public class ForkJoinWorkerThread extend /** * Tries to take a task from the base of own queue. Assumes active - * status. Called only by current thread. + * status. Called only by this thread. * * @return a task, or null if none */ @@ -570,18 +588,23 @@ public class ForkJoinWorkerThread extend /** * Returns a popped task, or null if empty. Assumes active status. - * Called only by current thread. + * Called only by this thread. */ - final ForkJoinTask popTask() { - int s; - ForkJoinTask[] q; - if (base != (s = sp) && (q = queue) != null) { - int i = (q.length - 1) & --s; - ForkJoinTask t = q[i]; - if (t != null && UNSAFE.compareAndSwapObject - (q, (i << qShift) + qBase, t, null)) { - sp = s; - return t; + private ForkJoinTask popTask() { + ForkJoinTask[] q = queue; + if (q != null) { + int s; + while ((s = sp) != base) { + int i = (q.length - 1) & --s; + long u = (i << qShift) + qBase; // raw offset + ForkJoinTask t = q[i]; + if (t == null) // lost to stealer + break; + if (UNSAFE.compareAndSwapObject(q, u, t, null)) { + sp = s; // putOrderedInt may encourage more timely write + // UNSAFE.putOrderedInt(this, spOffset, s); + return t; + } } } return null; @@ -589,25 +612,25 @@ public class ForkJoinWorkerThread extend /** * Specialized version of popTask to pop only if topmost element - * is the given task. Called only by current thread while - * active. + * is the given task. Called only by this thread while active. * * @param t the task. Caller must ensure non-null. */ final boolean unpushTask(ForkJoinTask t) { int s; - ForkJoinTask[] q; - if (base != (s = sp) && (q = queue) != null && + ForkJoinTask[] q = queue; + if ((s = sp) != base && q != null && UNSAFE.compareAndSwapObject (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) { - sp = s; + sp = s; // putOrderedInt may encourage more timely write + // UNSAFE.putOrderedInt(this, spOffset, s); return true; } return false; } /** - * Returns next task or null if empty or contended + * Returns next task, or null if empty or contended. */ final ForkJoinTask peekTask() { ForkJoinTask[] q = queue; @@ -649,7 +672,7 @@ public class ForkJoinWorkerThread extend * Computes next value for random victim probe in scan(). Scans * don't require a very high quality generator, but also not a * crummy one. Marsaglia xor-shift is cheap and works well enough. - * Note: This is manually inlined in scan() + * Note: This is manually inlined in scan(). */ private static final int xorShift(int r) { r ^= r << 13; @@ -688,24 +711,26 @@ public class ForkJoinWorkerThread extend for (;;) { ForkJoinWorkerThread v = ws[k & mask]; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift - if (v != null && v.base != v.sp) { - if (canSteal || // ensure active status - (canSteal = active = p.tryIncrementActiveCount())) { - int b = v.base; // inline specialized deqTask - ForkJoinTask[] q; - if (b != v.sp && (q = v.queue) != null) { - ForkJoinTask t; - int i = (q.length - 1) & b; - long u = (i << qShift) + qBase; // raw offset - if ((t = q[i]) != null && v.base == b && - UNSAFE.compareAndSwapObject(q, u, t, null)) { - currentSteal = t; - v.stealHint = poolIndex; - v.base = b + 1; - seed = r; - ++stealCount; - return t; - } + ForkJoinTask[] q; ForkJoinTask t; int b, a; + if (v != null && (b = v.base) != v.sp && + (q = v.queue) != null) { + int i = (q.length - 1) & b; + long u = (i << qShift) + qBase; // raw offset + int pid = poolIndex; + if ((t = q[i]) != null) { + if (!canSteal && // inline p.tryIncrementActiveCount + UNSAFE.compareAndSwapInt(p, poolRunStateOffset, + a = p.runState, a + 1)) + canSteal = active = true; + if (canSteal && v.base == b++ && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + v.base = b; + v.stealHint = pid; + UNSAFE.putOrderedObject(this, + currentStealOffset, t); + seed = r; + ++stealCount; + return t; } } j = -n; @@ -725,24 +750,26 @@ public class ForkJoinWorkerThread extend // Run State management // status check methods used mainly by ForkJoinPool + final boolean isRunning() { return runState == 0; } final boolean isTerminating() { return (runState & TERMINATING) != 0; } final boolean isTerminated() { return (runState & TERMINATED) != 0; } final boolean isSuspended() { return (runState & SUSPENDED) != 0; } final boolean isTrimmed() { return (runState & TRIMMED) != 0; } /** - * Sets state to TERMINATING, also resuming if suspended. + * Sets state to TERMINATING. Does NOT unpark or interrupt + * to wake up if currently blocked. Callers must do so if desired. */ final void shutdown() { for (;;) { int s = runState; + if ((s & (TERMINATING|TERMINATED)) != 0) + break; if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, (s & ~SUSPENDED) | - (TRIMMED|TERMINATING))) { - LockSupport.unpark(this); + (TRIMMED|TERMINATING))) break; - } } else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | TERMINATING)) @@ -751,7 +778,7 @@ public class ForkJoinWorkerThread extend } /** - * Sets state to TERMINATED. Called only by this thread. + * Sets state to TERMINATED. Called only by onTermination(). */ private void setTerminated() { int s; @@ -761,78 +788,47 @@ public class ForkJoinWorkerThread extend } /** - * Instrumented version of park used by ForkJoinPool.awaitEvent - */ - final void doPark() { - ++parkCount; - LockSupport.park(this); - } - - /** - * If suspended, tries to set status to unsuspended and unparks. + * If suspended, tries to set status to unsuspended. + * Does NOT wake up if blocked. * * @return true if successful */ - final boolean tryResumeSpare() { - int s = runState; - if ((s & SUSPENDED) != 0 && - UNSAFE.compareAndSwapInt(this, runStateOffset, s, - s & ~SUSPENDED)) { - LockSupport.unpark(this); - return true; + final boolean tryUnsuspend() { + int s; + while (((s = runState) & SUSPENDED) != 0) { + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s & ~SUSPENDED)) + return true; } return false; } /** - * Sets suspended status and blocks as spare until resumed, - * shutdown, or timed out. - * - * @return false if trimmed + * Sets suspended status and blocks as spare until resumed + * or shutdown. */ - final boolean suspendAsSpare() { - for (;;) { // set suspended unless terminating + final void suspendAsSpare() { + for (;;) { // set suspended unless terminating int s = runState; if ((s & TERMINATING) != 0) { // must kill if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | (TRIMMED | TERMINATING))) - return false; + return; } else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | SUSPENDED)) break; } - boolean timed; - long nanos; - long startTime; - if (poolIndex < pool.parallelism) { - timed = false; - nanos = 0L; - startTime = 0L; - } - else { - timed = true; - nanos = SPARE_KEEPALIVE_NANOS; - startTime = System.nanoTime(); - } - pool.accumulateStealCount(this); - lastEventCount = 0; // reset upon resume - interrupted(); // clear/ignore interrupts + ForkJoinPool p = pool; + p.pushSpare(this); while ((runState & SUSPENDED) != 0) { - ++parkCount; - if (!timed) + if (p.tryAccumulateStealCount(this)) { + interrupted(); // clear/ignore interrupts + if ((runState & SUSPENDED) == 0) + break; LockSupport.park(this); - else if ((nanos -= (System.nanoTime() - startTime)) > 0) - LockSupport.parkNanos(this, nanos); - else { // try to trim on timeout - int s = runState; - if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, - (s & ~SUSPENDED) | - (TRIMMED|TERMINATING))) - return false; } } - return true; } // Misc support methods for ForkJoinPool @@ -842,7 +838,8 @@ public class ForkJoinWorkerThread extend * used by ForkJoinTask. */ final int getQueueSize() { - return -base + sp; + int n; // external calls must read base first + return (n = -base + sp) <= 0 ? 0 : n; } /** @@ -850,10 +847,14 @@ public class ForkJoinWorkerThread extend * thread. */ final void cancelTasks() { - ForkJoinTask cj = currentJoin; // try to kill live tasks + ForkJoinTask cj = currentJoin; // try to cancel ongoing tasks if (cj != null) { currentJoin = null; cj.cancelIgnoringExceptions(); + try { + this.interrupt(); // awaken wait + } catch (SecurityException ignore) { + } } ForkJoinTask cs = currentSteal; if (cs != null) { @@ -892,9 +893,13 @@ public class ForkJoinWorkerThread extend * @return a task, if available */ final ForkJoinTask pollLocalTask() { + ForkJoinPool p = pool; while (sp != base) { - if (active || (active = pool.tryIncrementActiveCount())) - return locallyFifo? locallyDeqTask() : popTask(); + int a; // inline p.tryIncrementActiveCount + if (active || + (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset, + a = p.runState, a + 1))) + return locallyFifo ? locallyDeqTask() : popTask(); } return null; } @@ -905,66 +910,46 @@ public class ForkJoinWorkerThread extend * @return a task, if available */ final ForkJoinTask pollTask() { - ForkJoinTask t; - return (t = pollLocalTask()) != null ? t : scan(); + ForkJoinTask t = pollLocalTask(); + if (t == null) { + t = scan(); + // cannot retain/track/help steal + UNSAFE.putOrderedObject(this, currentStealOffset, null); + } + return t; } /** * Possibly runs some tasks and/or blocks, until task is done. - * The main body is basically a big spinloop, alternating between - * calls to helpJoinTask and pool.tryAwaitJoin with increased - * patience parameters until either the task is done without - * waiting, or we have, if necessary, created or resumed a - * replacement for this thread while it blocks. * * @param joinMe the task to join - * @return task status on exit */ - final int joinTask(ForkJoinTask joinMe) { - int stat; + final void joinTask(ForkJoinTask joinMe) { + // currentJoin only written by this thread; only need ordered store ForkJoinTask prevJoin = currentJoin; - currentJoin = joinMe; - if ((stat = joinMe.status) >= 0 && - (sp == base || (stat = localHelpJoinTask(joinMe)) >= 0)) { - ForkJoinPool p = pool; - int helpRetries = 2; // initial patience values - int awaitRetries = -1; // -1 is sentinel for replace-check only - do { - helpJoinTask(joinMe, helpRetries); - if ((stat = joinMe.status) < 0) - break; - boolean busy = p.tryAwaitJoin(joinMe, awaitRetries); - if ((stat = joinMe.status) < 0) - break; - if (awaitRetries == -1) - awaitRetries = 0; - else if (busy) - ++awaitRetries; - if (helpRetries < p.parallelism) - helpRetries <<= 1; - Thread.yield(); // tame unbounded loop - } while (joinMe.status >= 0); - } - currentJoin = prevJoin; - return stat; + UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe); + if (sp != base) + localHelpJoinTask(joinMe); + if (joinMe.status >= 0) + pool.awaitJoin(joinMe, this); + UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin); } /** * Run tasks in local queue until given task is done. * * @param joinMe the task to join - * @return task status on exit */ - private int localHelpJoinTask(ForkJoinTask joinMe) { - int stat, s; + private void localHelpJoinTask(ForkJoinTask joinMe) { + int s; ForkJoinTask[] q; - while ((stat = joinMe.status) >= 0 && - base != (s = sp) && (q = queue) != null) { - ForkJoinTask t; + while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) { int i = (q.length - 1) & --s; long u = (i << qShift) + qBase; // raw offset - if ((t = q[i]) != null && - UNSAFE.compareAndSwapObject(q, u, t, null)) { + ForkJoinTask t = q[i]; + if (t == null) // lost to a stealer + break; + if (UNSAFE.compareAndSwapObject(q, u, t, null)) { /* * This recheck (and similarly in helpJoinTask) * handles cases where joinMe is independently @@ -972,95 +957,107 @@ public class ForkJoinWorkerThread extend * available. Back out of the pop by putting t back * into slot before we commit by writing sp. */ - if ((stat = joinMe.status) < 0) { + if (joinMe.status < 0) { UNSAFE.putObjectVolatile(q, u, t); break; } sp = s; - t.tryExec(); + // UNSAFE.putOrderedInt(this, spOffset, s); + t.quietlyExec(); } } - return stat; } /** - * Tries to locate and help perform tasks for a stealer of the - * given task, or in turn one of its stealers. Traces - * currentSteal->currentJoin links looking for a thread working on - * a descendant of the given task and with a non-empty queue to - * steal back and execute tasks from. Restarts search upon - * encountering chains that are stale, unknown, or of length - * greater than MAX_HELP_DEPTH links, to avoid unbounded cycles. - * - * The implementation is very branchy to cope with the restart - * cases. Returns void, not task status (which must be reread by - * caller anyway) to slightly simplify control paths. + * Unless terminating, tries to locate and help perform tasks for + * a stealer of the given task, or in turn one of its stealers. + * Traces currentSteal->currentJoin links looking for a thread + * working on a descendant of the given task and with a non-empty + * queue to steal back and execute tasks from. + * + * The implementation is very branchy to cope with potential + * inconsistencies or loops encountering chains that are stale, + * unknown, or of length greater than MAX_HELP_DEPTH links. All + * of these cases are dealt with by just returning back to the + * caller, who is expected to retry if other join mechanisms also + * don't work out. * * @param joinMe the task to join */ - final void helpJoinTask(ForkJoinTask joinMe, int retries) { - ForkJoinWorkerThread[] ws = pool.workers; + final void helpJoinTask(ForkJoinTask joinMe) { + ForkJoinWorkerThread[] ws; int n; - if (ws == null || (n = ws.length) <= 1) - return; // need at least 2 workers - - restart:while (joinMe.status >= 0 && --retries >= 0) { - ForkJoinTask task = joinMe; // base of chain - ForkJoinWorkerThread thread = this; // thread with stolen task - for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) { - // Try to find v, the stealer of task, by first using hint - ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)]; - if (v == null || v.currentSteal != task) { - for (int j = 0; ; ++j) { // search array - if (task.status < 0 || j == n) - continue restart; // stale or no stealer - if ((v = ws[j]) != null && v.currentSteal == task) { - thread.stealHint = j; // save for next time - break; - } - } - } - // Try to help v, using specialized form of deqTask - int b; - ForkJoinTask[] q; - while ((b = v.base) != v.sp && (q = v.queue) != null) { - int i = (q.length - 1) & b; - long u = (i << qShift) + qBase; - ForkJoinTask t = q[i]; - if (task.status < 0) // stale - continue restart; - if (v.base == b) { // recheck after reading t - if (t == null) // producer stalled - continue restart; // retry via restart - if (UNSAFE.compareAndSwapObject(q, u, t, null)) { - if (joinMe.status < 0) { - UNSAFE.putObjectVolatile(q, u, t); - return; // back out on cancel + if (joinMe.status < 0) // already done + return; + if ((runState & TERMINATING) != 0) { // cancel if shutting down + joinMe.cancelIgnoringExceptions(); + return; + } + if ((ws = pool.workers) == null || (n = ws.length) <= 1) + return; // need at least 2 workers + + ForkJoinTask task = joinMe; // base of chain + ForkJoinWorkerThread thread = this; // thread with stolen task + for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length + // Try to find v, the stealer of task, by first using hint + ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)]; + if (v == null || v.currentSteal != task) { + for (int j = 0; ; ++j) { // search array + if (j < n) { + ForkJoinTask vs; + if ((v = ws[j]) != null && + (vs = v.currentSteal) != null) { + if (joinMe.status < 0 || task.status < 0) + return; // stale or done + if (vs == task) { + thread.stealHint = j; + break; // save hint for next time } - ForkJoinTask prevSteal = currentSteal; - currentSteal = t; - v.stealHint = poolIndex; - v.base = b + 1; - t.tryExec(); - currentSteal = prevSteal; } } - if (joinMe.status < 0) - return; + else + return; // no stealer } - // Try to descend to find v's stealer - ForkJoinTask next = v.currentJoin; - if (next == null || task.status < 0) - continue restart; // no descendent or stale + } + for (;;) { // Try to help v, using specialized form of deqTask if (joinMe.status < 0) return; - task = next; - thread = v; + int b = v.base; + ForkJoinTask[] q = v.queue; + if (b == v.sp || q == null) + break; + int i = (q.length - 1) & b; + long u = (i << qShift) + qBase; + ForkJoinTask t = q[i]; + int pid = poolIndex; + ForkJoinTask ps = currentSteal; + if (task.status < 0) + return; // stale or done + if (t != null && v.base == b++ && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + if (joinMe.status < 0) { + UNSAFE.putObjectVolatile(q, u, t); + return; // back out on cancel + } + v.base = b; + v.stealHint = pid; + UNSAFE.putOrderedObject(this, currentStealOffset, t); + t.quietlyExec(); + UNSAFE.putOrderedObject(this, currentStealOffset, ps); + } } + // Try to descend to find v's stealer + ForkJoinTask next = v.currentJoin; + if (task.status < 0 || next == null || next == task || + joinMe.status < 0) + return; + task = next; + thread = v; } } /** + * Implements ForJoinTask.getSurplusQueuedTaskCount(). * Returns an estimate of the number of tasks, offset by a * function of number of idle workers. * @@ -1115,21 +1112,25 @@ public class ForkJoinWorkerThread extend * Runs tasks until {@code pool.isQuiescent()}. */ final void helpQuiescePool() { + ForkJoinTask ps = currentSteal; // to restore below for (;;) { ForkJoinTask t = pollLocalTask(); - if (t != null || (t = scan()) != null) { - t.tryExec(); - currentSteal = null; - } + if (t != null || (t = scan()) != null) + t.quietlyExec(); else { ForkJoinPool p = pool; + int a; // to inline CASes if (active) { + if (!UNSAFE.compareAndSwapInt + (p, poolRunStateOffset, a = p.runState, a - 1)) + continue; // retry later active = false; // inactivate - do {} while (!p.tryDecrementActiveCount()); + UNSAFE.putOrderedObject(this, currentStealOffset, ps); } if (p.isQuiescent()) { active = true; // re-activate - do {} while (!p.tryIncrementActiveCount()); + do {} while (!UNSAFE.compareAndSwapInt + (p, poolRunStateOffset, a = p.runState, a+1)); return; } } @@ -1139,12 +1140,19 @@ public class ForkJoinWorkerThread extend // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long spOffset = + objectFieldOffset("sp", ForkJoinWorkerThread.class); private static final long runStateOffset = objectFieldOffset("runState", ForkJoinWorkerThread.class); + private static final long currentJoinOffset = + objectFieldOffset("currentJoin", ForkJoinWorkerThread.class); + private static final long currentStealOffset = + objectFieldOffset("currentSteal", ForkJoinWorkerThread.class); private static final long qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class); - private static final long threadStatusOffset = - objectFieldOffset("threadStatus", Thread.class); + private static final long poolRunStateOffset = // to inline CAS + objectFieldOffset("runState", ForkJoinPool.class); + private static final int qShift; static { @@ -1152,6 +1160,7 @@ public class ForkJoinWorkerThread extend if ((s & (s-1)) != 0) throw new Error("data type scale not a power of two"); qShift = 31 - Integer.numberOfLeadingZeros(s); + MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift); } private static long objectFieldOffset(String field, Class klazz) {