--- jsr166/src/jsr166y/ForkJoinWorkerThread.java 2010/07/24 20:28:18 1.39 +++ jsr166/src/jsr166y/ForkJoinWorkerThread.java 2010/08/11 18:45:12 1.40 @@ -155,17 +155,6 @@ public class ForkJoinWorkerThread extend private static final Random seedGenerator = new Random(); /** - * The timeout value for suspending spares. Spare workers that - * remain unsignalled for more than this time may be trimmed - * (killed and removed from pool). Since our goal is to avoid - * long-term thread buildup, the exact value of timeout does not - * matter too much so long as it avoids most false-alarm timeouts - * under GC stalls or momentarily high system load. - */ - private static final long SPARE_KEEPALIVE_NANOS = - 5L * 1000L * 1000L * 1000L; // 5 secs - - /** * The maximum stolen->joining link depth allowed in helpJoinTask. * Depths for legitimate chains are unbounded, but we use a fixed * constant to avoid (otherwise unchecked) cycles and bound @@ -175,6 +164,13 @@ public class ForkJoinWorkerThread extend private static final int MAX_HELP_DEPTH = 8; /** + * The wakeup interval (in nanoseconds) for the first worker + * suspended as spare. On each wakeup not signalled by a + * resumption, it may ask the pool to reduce the number of spares. + */ + private static final long TRIM_RATE_NANOS = 200L * 1000L * 1000L; + + /** * Capacity of work-stealing queue array upon initialization. * Must be a power of two. Initial size must be at least 4, but is * padded to minimize cache effects. @@ -238,14 +234,6 @@ public class ForkJoinWorkerThread extend private static final int TRIMMED = 0x08; // killed while suspended /** - * Number of LockSupport.park calls to block this thread for - * suspension or event waits. Used for internal instrumention; - * currently not exported but included because volatile write upon - * park also provides a workaround for a JVM bug. - */ - volatile int parkCount; - - /** * Number of steals, transferred and reset in pool callbacks pool * when idle Accessed directly by pool. */ @@ -289,6 +277,17 @@ public class ForkJoinWorkerThread extend volatile long nextWaiter; /** + * Number of times this thread suspended as spare + */ + int spareCount; + + /** + * Encoded index and count of next spare waiter. Used only + * by ForkJoinPool for managing spares. + */ + volatile int nextSpare; + + /** * The task currently being joined, set only when actively trying * to helpStealer. Written only by current thread, but read by * others. @@ -311,6 +310,7 @@ public class ForkJoinWorkerThread extend protected ForkJoinWorkerThread(ForkJoinPool pool) { this.pool = pool; this.locallyFifo = pool.locallyFifo; + setDaemon(true); // To avoid exposing construction details to subclasses, // remaining initialization is in start() and onStart() } @@ -320,7 +320,6 @@ public class ForkJoinWorkerThread extend */ final void start(int poolIndex, UncaughtExceptionHandler ueh) { this.poolIndex = poolIndex; - setDaemon(true); if (ueh != null) setUncaughtExceptionHandler(ueh); start(); @@ -382,6 +381,8 @@ public class ForkJoinWorkerThread extend protected void onTermination(Throwable exception) { try { cancelTasks(); + while (active) // force inactive + active = !pool.tryDecrementActiveCount(); setTerminated(); pool.workerTerminated(this); } catch (Throwable ex) { // Shouldn't ever happen @@ -416,57 +417,69 @@ public class ForkJoinWorkerThread extend * Find and execute tasks and check status while running */ private void mainLoop() { - int emptyScans = 0; // consecutive times failed to find work + int misses = 0; // track consecutive times failed to find work; max 2 ForkJoinPool p = pool; for (;;) { - p.preStep(this, emptyScans); + p.preStep(this, misses); if (runState != 0) - return; - ForkJoinTask t; // try to get and run stolen or submitted task - if ((t = scan()) != null || (t = pollSubmission()) != null) { - t.tryExec(); - if (base != sp) - runLocalTasks(); - currentSteal = null; - emptyScans = 0; - } - else - ++emptyScans; + break; + misses = ((tryExecSteal() || tryExecSubmission()) ? 0 : + (misses < 2 ? misses + 1 : 2)); } } /** - * Runs local tasks until queue is empty or shut down. Call only - * while active. + * Try to steal a task and execute it + * + * @return true if ran a task */ - private void runLocalTasks() { - while (runState == 0) { - ForkJoinTask t = locallyFifo? locallyDeqTask() : popTask(); - if (t != null) - t.tryExec(); - else if (base == sp) - break; + private boolean tryExecSteal() { + ForkJoinTask t; + if ((t = scan()) != null) { + t.quietlyExec(); + currentSteal = null; + if (sp != base) + execLocalTasks(); + return true; } + return false; } /** - * If a submission exists, try to activate and take it + * If a submission exists, try to activate and run it; * - * @return a task, if available + * @return true if ran a task */ - private ForkJoinTask pollSubmission() { + private boolean tryExecSubmission() { ForkJoinPool p = pool; while (p.hasQueuedSubmissions()) { + ForkJoinTask t; if (active || (active = p.tryIncrementActiveCount())) { - ForkJoinTask t = p.pollSubmission(); - if (t != null) { + if ((t = p.pollSubmission()) != null) { currentSteal = t; - return t; + t.quietlyExec(); + currentSteal = null; + if (sp != base) + execLocalTasks(); + return true; } - return scan(); // if missed, rescan } } - return null; + return false; + } + + /** + * Runs local tasks until queue is empty or shut down. Call only + * while active. + */ + private void execLocalTasks() { + while (runState == 0) { + ForkJoinTask t = locallyFifo? locallyDeqTask() : popTask(); + if (t != null) + t.quietlyExec(); + else if (sp == base) + break; + } } /* @@ -534,7 +547,7 @@ public class ForkJoinWorkerThread extend ForkJoinTask t; ForkJoinTask[] q; int b, i; - if ((b = base) != sp && + if (sp != (b = base) && (q = queue) != null && // must read q after b (t = q[i = (q.length - 1) & b]) != null && base == b && UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) { @@ -571,16 +584,21 @@ public class ForkJoinWorkerThread extend * Returns a popped task, or null if empty. Assumes active status. * Called only by current thread. */ - final ForkJoinTask popTask() { - int s; - ForkJoinTask[] q; - if (base != (s = sp) && (q = queue) != null) { - int i = (q.length - 1) & --s; - ForkJoinTask t = q[i]; - if (t != null && UNSAFE.compareAndSwapObject - (q, (i << qShift) + qBase, t, null)) { - sp = s; - return t; + private ForkJoinTask popTask() { + ForkJoinTask[] q = queue; + if (q != null) { + int s; + while ((s = sp) != base) { + int i = (q.length - 1) & --s; + long u = (i << qShift) + qBase; // raw offset + ForkJoinTask t = q[i]; + if (t == null) // lost to stealer + break; + if (UNSAFE.compareAndSwapObject(q, u, t, null)) { + sp = s; // putOrderedInt may encourage more timely write + // UNSAFE.putOrderedInt(this, spOffset, s); + return t; + } } } return null; @@ -595,11 +613,12 @@ public class ForkJoinWorkerThread extend */ final boolean unpushTask(ForkJoinTask t) { int s; - ForkJoinTask[] q; - if (base != (s = sp) && (q = queue) != null && + ForkJoinTask[] q = queue; + if ((s = sp) != base && q != null && UNSAFE.compareAndSwapObject (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) { sp = s; + // UNSAFE.putOrderedInt(this, spOffset, s); return true; } return false; @@ -688,23 +707,22 @@ public class ForkJoinWorkerThread extend ForkJoinWorkerThread v = ws[k & mask]; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift if (v != null && v.base != v.sp) { - if (canSteal || // ensure active status - (canSteal = active = p.tryIncrementActiveCount())) { - int b = v.base; // inline specialized deqTask - ForkJoinTask[] q; - if (b != v.sp && (q = v.queue) != null) { - ForkJoinTask t; - int i = (q.length - 1) & b; - long u = (i << qShift) + qBase; // raw offset - if ((t = q[i]) != null && v.base == b && - UNSAFE.compareAndSwapObject(q, u, t, null)) { - currentSteal = t; - v.stealHint = poolIndex; - v.base = b + 1; - seed = r; - ++stealCount; - return t; - } + ForkJoinTask[] q; int b; + if ((canSteal || // ensure active status + (canSteal = active = p.tryIncrementActiveCount())) && + (q = v.queue) != null && (b = v.base) != v.sp) { + int i = (q.length - 1) & b; + long u = (i << qShift) + qBase; // raw offset + ForkJoinTask t = q[i]; + if (v.base == b && t != null && + UNSAFE.compareAndSwapObject(q, u, t, null)) { + int pid = poolIndex; + currentSteal = t; + v.stealHint = pid; + v.base = b + 1; + seed = r; + ++stealCount; + return t; } } j = -n; @@ -724,33 +742,40 @@ public class ForkJoinWorkerThread extend // Run State management // status check methods used mainly by ForkJoinPool + final boolean isRunning() { return runState == 0; } final boolean isTerminating() { return (runState & TERMINATING) != 0; } final boolean isTerminated() { return (runState & TERMINATED) != 0; } final boolean isSuspended() { return (runState & SUSPENDED) != 0; } final boolean isTrimmed() { return (runState & TRIMMED) != 0; } /** - * Sets state to TERMINATING, also resuming if suspended. + * Sets state to TERMINATING, also, unless "quiet", unparking if + * not already terminated + * + * @param quiet don't unpark (used for faster status updates on + * pool termination) */ - final void shutdown() { + final void shutdown(boolean quiet) { for (;;) { int s = runState; + if ((s & (TERMINATING|TERMINATED)) != 0) + break; if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, (s & ~SUSPENDED) | - (TRIMMED|TERMINATING))) { - LockSupport.unpark(this); + (TRIMMED|TERMINATING))) break; - } } else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | TERMINATING)) break; } + if (!quiet && (runState & TERMINATED) != 0) + LockSupport.unpark(this); } /** - * Sets state to TERMINATED. Called only by this thread. + * Sets state to TERMINATED. Called only by onTermination() */ private void setTerminated() { int s; @@ -760,37 +785,28 @@ public class ForkJoinWorkerThread extend } /** - * Instrumented version of park used by ForkJoinPool.eventSync - */ - final void doPark() { - ++parkCount; - LockSupport.park(this); - } - - /** * If suspended, tries to set status to unsuspended and unparks. * * @return true if successful */ - final boolean tryResumeSpare() { - int s = runState; - if ((s & SUSPENDED) != 0 && - UNSAFE.compareAndSwapInt(this, runStateOffset, s, - s & ~SUSPENDED)) { - LockSupport.unpark(this); - return true; + final boolean tryUnsuspend() { + int s; + while (((s = runState) & SUSPENDED) != 0) { + if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, + s & ~SUSPENDED)) + return true; } return false; } /** - * Sets suspended status and blocks as spare until resumed, - * shutdown, or timed out. - * - * @return false if trimmed + * Sets suspended status and blocks as spare until resumed + * or shutdown. + * @returns true if still running on exit */ final boolean suspendAsSpare() { - for (;;) { // set suspended unless terminating + lastEventCount = 0; // reset upon resume + for (;;) { // set suspended unless terminating int s = runState; if ((s & TERMINATING) != 0) { // must kill if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, @@ -801,38 +817,27 @@ public class ForkJoinWorkerThread extend s | SUSPENDED)) break; } - int pc = pool.parallelism; - pool.accumulateStealCount(this); - boolean timed; - long nanos; - long startTime; - if (poolIndex < pc) { // untimed wait for core threads - timed = false; - nanos = 0L; - startTime = 0L; - } - else { // timed wait for added threads - timed = true; - nanos = SPARE_KEEPALIVE_NANOS; - startTime = System.nanoTime(); - } - lastEventCount = 0; // reset upon resume - interrupted(); // clear/ignore interrupts + ForkJoinPool p = pool; + p.pushSpare(this); while ((runState & SUSPENDED) != 0) { - ++parkCount; - if (!timed) + if (!p.tryAccumulateStealCount(this)) + continue; + interrupted(); // clear/ignore interrupts + if ((runState & SUSPENDED) == 0) + break; + if (nextSpare != 0) // untimed LockSupport.park(this); - else if ((nanos -= (System.nanoTime() - startTime)) > 0) - LockSupport.parkNanos(this, nanos); - else { // try to trim on timeout - int s = runState; - if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, - (s & ~SUSPENDED) | - (TRIMMED|TERMINATING))) - return false; + else { + long startTime = System.nanoTime(); + LockSupport.parkNanos(this, TRIM_RATE_NANOS); + if ((runState & SUSPENDED) == 0) + break; + long now = System.nanoTime(); + if (now - startTime >= TRIM_RATE_NANOS) + pool.tryTrimSpare(now); } } - return true; + return runState == 0; } // Misc support methods for ForkJoinPool @@ -842,7 +847,8 @@ public class ForkJoinWorkerThread extend * used by ForkJoinTask. */ final int getQueueSize() { - return -base + sp; + int n; // external calls must read base first + return (n = -base + sp) <= 0 ? 0 : n; } /** @@ -850,10 +856,14 @@ public class ForkJoinWorkerThread extend * thread. */ final void cancelTasks() { - ForkJoinTask cj = currentJoin; // try to kill live tasks + ForkJoinTask cj = currentJoin; // try to cancel ongoing tasks if (cj != null) { currentJoin = null; cj.cancelIgnoringExceptions(); + try { + this.interrupt(); // awaken wait + } catch (SecurityException ignore) { + } } ForkJoinTask cs = currentSteal; if (cs != null) { @@ -908,59 +918,42 @@ public class ForkJoinWorkerThread extend ForkJoinTask t = pollLocalTask(); if (t == null) { t = scan(); - currentSteal = null; // cannot retain/track + currentSteal = null; // cannot retain/track/help } return t; } /** * Possibly runs some tasks and/or blocks, until task is done. - * The main body is basically a big spinloop, alternating between - * calls to helpJoinTask and pool.tryAwaitJoin with increased - * patience parameters until either the task is done without - * waiting, or we have, if necessary, created or resumed a - * replacement for this thread while it blocks. * * @param joinMe the task to join - * @return task status on exit */ - final int joinTask(ForkJoinTask joinMe) { - int stat; + final void joinTask(ForkJoinTask joinMe) { + // currentJoin only written by this thread; only need ordered store ForkJoinTask prevJoin = currentJoin; - // Only written by this thread; only need ordered store UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe); - if ((stat = joinMe.status) >= 0 && - (sp == base || (stat = localHelpJoinTask(joinMe)) >= 0)) { - for (int retries = 0; ; ++retries) { - helpJoinTask(joinMe, retries); - if ((stat = joinMe.status) < 0) - break; - pool.tryAwaitJoin(joinMe, retries); - if ((stat = joinMe.status) < 0) - break; - Thread.yield(); // tame unbounded loop - } - } + if (sp != base) + localHelpJoinTask(joinMe); + if (joinMe.status >= 0) + pool.awaitJoin(joinMe, this); UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin); - return stat; } /** * Run tasks in local queue until given task is done. * * @param joinMe the task to join - * @return task status on exit */ - private int localHelpJoinTask(ForkJoinTask joinMe) { - int stat, s; + private void localHelpJoinTask(ForkJoinTask joinMe) { + int s; ForkJoinTask[] q; - while ((stat = joinMe.status) >= 0 && - base != (s = sp) && (q = queue) != null) { - ForkJoinTask t; + while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) { int i = (q.length - 1) & --s; long u = (i << qShift) + qBase; // raw offset - if ((t = q[i]) != null && - UNSAFE.compareAndSwapObject(q, u, t, null)) { + ForkJoinTask t = q[i]; + if (t == null) // lost to a stealer + break; + if (UNSAFE.compareAndSwapObject(q, u, t, null)) { /* * This recheck (and similarly in helpJoinTask) * handles cases where joinMe is independently @@ -968,15 +961,15 @@ public class ForkJoinWorkerThread extend * available. Back out of the pop by putting t back * into slot before we commit by writing sp. */ - if ((stat = joinMe.status) < 0) { + if (joinMe.status < 0) { UNSAFE.putObjectVolatile(q, u, t); break; } sp = s; - t.tryExec(); + // UNSAFE.putOrderedInt(this, spOffset, s); + t.quietlyExec(); } } - return stat; } /** @@ -984,36 +977,40 @@ public class ForkJoinWorkerThread extend * given task, or in turn one of its stealers. Traces * currentSteal->currentJoin links looking for a thread working on * a descendant of the given task and with a non-empty queue to - * steal back and execute tasks from. Restarts search upon - * encountering chains that are stale, unknown, or of length - * greater than MAX_HELP_DEPTH links, to avoid unbounded cycles. - * - * The implementation is very branchy to cope with the restart - * cases. Returns void, not task status (which must be reread by - * caller anyway) to slightly simplify control paths. + * steal back and execute tasks from. + * + * The implementation is very branchy to cope with the potential + * inconsistencies or loops encountering chains that are stale, + * unknown, or of length greater than MAX_HELP_DEPTH links. All + * of these cases are dealt with by just returning back to the + * caller, who is expected to retry if other join mechanisms also + * don't work out. * * @param joinMe the task to join - * @param rescans the number of times to recheck for work */ - private void helpJoinTask(ForkJoinTask joinMe, int rescans) { + final void helpJoinTask(ForkJoinTask joinMe) { ForkJoinWorkerThread[] ws = pool.workers; - int n; - if (ws == null || (n = ws.length) <= 1) - return; // need at least 2 workers - restart:while (rescans-- >= 0 && joinMe.status >= 0) { + int n; // need at least 2 workers + if (ws != null && (n = ws.length) > 1 && joinMe.status >= 0) { ForkJoinTask task = joinMe; // base of chain ForkJoinWorkerThread thread = this; // thread with stolen task - for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) { + for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length // Try to find v, the stealer of task, by first using hint ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)]; if (v == null || v.currentSteal != task) { for (int j = 0; ; ++j) { // search array - if (task.status < 0 || j == n) - continue restart; // stale or no stealer - if ((v = ws[j]) != null && v.currentSteal == task) { - thread.stealHint = j; // save for next time - break; + if (j < n) { + if ((v = ws[j]) != null) { + if (task.status < 0) + return; // stale or done + if (v.currentSteal == task) { + thread.stealHint = j; + break; // save hint for next time + } + } } + else + return; // no stealer } } // Try to help v, using specialized form of deqTask @@ -1023,33 +1020,32 @@ public class ForkJoinWorkerThread extend int i = (q.length - 1) & b; long u = (i << qShift) + qBase; ForkJoinTask t = q[i]; - if (task.status < 0) // stale - continue restart; - if (t != null) { - if (v.base == b && - UNSAFE.compareAndSwapObject(q, u, t, null)) { + if (task.status < 0) + return; // stale or done + if (v.base == b) { + if (t == null) + return; // producer stalled + if (UNSAFE.compareAndSwapObject(q, u, t, null)) { if (joinMe.status < 0) { UNSAFE.putObjectVolatile(q, u, t); return; // back out on cancel } + int pid = poolIndex; ForkJoinTask prevSteal = currentSteal; currentSteal = t; - v.stealHint = poolIndex; + v.stealHint = pid; v.base = b + 1; - t.tryExec(); + t.quietlyExec(); currentSteal = prevSteal; } } - else if (v.base == b) // producer stalled - continue restart; // retry via restart if (joinMe.status < 0) return; } // Try to descend to find v's stealer ForkJoinTask next = v.currentJoin; - if (next == null || next == task || task.status < 0) - continue restart; // no descendent or stale - if (joinMe.status < 0) + if (task.status < 0 || next == null || next == task || + joinMe.status < 0) return; task = next; thread = v; @@ -1115,14 +1111,15 @@ public class ForkJoinWorkerThread extend for (;;) { ForkJoinTask t = pollLocalTask(); if (t != null || (t = scan()) != null) { - t.tryExec(); + t.quietlyExec(); currentSteal = null; } else { ForkJoinPool p = pool; if (active) { + if (!p.tryDecrementActiveCount()) + continue; // retry later active = false; // inactivate - do {} while (!p.tryDecrementActiveCount()); } if (p.isQuiescent()) { active = true; // re-activate @@ -1136,12 +1133,17 @@ public class ForkJoinWorkerThread extend // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE = getUnsafe(); + private static final long spOffset = + objectFieldOffset("sp", ForkJoinWorkerThread.class); private static final long runStateOffset = objectFieldOffset("runState", ForkJoinWorkerThread.class); private static final long currentJoinOffset = objectFieldOffset("currentJoin", ForkJoinWorkerThread.class); + private static final long currentStealOffset = + objectFieldOffset("currentSteal", ForkJoinWorkerThread.class); private static final long qBase = UNSAFE.arrayBaseOffset(ForkJoinTask[].class); + private static final int qShift; static {