--- jsr166/src/jsr166y/ForkJoinPool.java 2012/11/26 14:11:54 1.153 +++ jsr166/src/jsr166y/ForkJoinPool.java 2012/12/08 14:08:51 1.154 @@ -1129,6 +1129,11 @@ public class ForkJoinPool extends Abstra private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L; /** + * Tolerance for idle timeouts, to cope with timer undershoots + */ + private static final long TIMEOUT_SLOP = 2000000L; // 20ms + + /** * The maximum stolen->joining link depth allowed in method * tryHelpStealer. Must be a power of two. Depths for legitimate * chains are unbounded, but we use a fixed constant to avoid @@ -1350,8 +1355,8 @@ public class ForkJoinPool extends Abstra } /** - * Tries to create and start one worker. Adjusts counts etc on - * failure. + * Tries to create and start one worker if fewer than target + * parallelism level exist. Adjusts counts etc on failure. */ private void tryAddWorker() { long c; int u; @@ -1467,15 +1472,36 @@ public class ForkJoinPool extends Abstra ((c - TC_UNIT) & TC_MASK) | (c & ~(AC_MASK|TC_MASK))))); - if (!tryTerminate(false, false) && w != null) { + if (!tryTerminate(false, false) && w != null && w.array != null) { w.cancelAll(); // cancel remaining tasks - if (w.array != null) // suppress signal if never ran - tryAddWorker(); // create replacement - if (ex == null) // help clean refs on way out - ForkJoinTask.helpExpungeStaleExceptions(); + int e, u, i, n; WorkQueue[] ws; WorkQueue v; Thread p; + while ((u = (int)((c = ctl) >>> 32)) < 0) { + if ((e = (int)c) > 0) { // activate or create replacement + if ((ws = workQueues) != null && + ws.length > (i = e & SMASK) && + (v = ws[i]) != null && v.eventCount == (e | INT_SIGN)) { + long nc = (((long)(v.nextWait & E_MASK)) | + ((long)(u + UAC_UNIT) << 32)); + if (U.compareAndSwapLong(this, CTL, c, nc)) { + v.eventCount = (e + E_SEQ) & E_MASK; + if ((p = v.parker) != null) + U.unpark(p); + break; + } + } + else + break; + } + else { + if ((short)u < 0) + tryAddWorker(); + break; + } + } } - - if (ex != null) // rethrow + if (ex == null) // help clean refs on way out + ForkJoinTask.helpExpungeStaleExceptions(); + else // rethrow ForkJoinTask.rethrow(ex); } @@ -1728,10 +1754,8 @@ public class ForkJoinPool extends Abstra U.putObject(wt, PARKBLOCKER, null); } } - if (h >= 0 || (h = w.hint) >= 0) { // signal others before retry - w.hint = -1; // reset - helpSignal(null, h, true); - } + if (h >= 0 || w.hint >= 0) // signal others before retry + helpSignalHint(w); } return null; } @@ -1753,7 +1777,7 @@ public class ForkJoinPool extends Abstra !tryTerminate(false, false) && (int)prevCtl != 0) { int dc = -(short)(currentCtl >>> TC_SHIFT); long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT; - long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop + long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; Thread wt = Thread.currentThread(); while (ctl == currentCtl) { Thread.interrupted(); // timed variant of version in scan() @@ -1776,26 +1800,25 @@ public class ForkJoinPool extends Abstra } /** - * Scans through queues looking for work (optionally, while - * joining a task); if any present, signals. May return early if - * more signalling is detectably unneeded. + * Scans through queues looking for work while joining a task; if + * any present, signals. May return early if more signalling is + * detectably unneeded. * - * @param task if non-null, return early if done + * @param task return early if done * @param origin an index to start scan - * @param once if only the origin should be checked */ - private void helpSignal(ForkJoinTask task, int origin, boolean once) { + private void helpSignal(ForkJoinTask task, int origin) { WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s; - if ((u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 && + if (task != null && task.status >= 0 && + (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { - outer: for (int k = origin, j = once ? 0 : m; j >= 0; --j) { + outer: for (int k = origin, j = m; j >= 0; --j) { WorkQueue q = ws[k++ & m]; for (int n = m;;) { // limit to at most m signals - if (task != null && task.status < 0) + if (task.status < 0) break outer; if (q == null || - ((s = (task == null ? -1 : 0) - q.base + q.top) <= n && - (n = s) <= 0)) + ((s = -q.base + q.top) <= n && (n = s) <= 0)) break; if ((u = (int)((c = ctl) >>> 32)) >= 0 || (e = (int)c) <= 0 || m < (i = e & SMASK) || @@ -1817,6 +1840,40 @@ public class ForkJoinPool extends Abstra } /** + * Signals other workers if tasks are present in hinted queue. + * + * @param caller the worker with the hint + */ + private void helpSignalHint(WorkQueue caller) { + WorkQueue[] ws; WorkQueue q, w; Thread p; long c; int h, m, u, e, i, s; + if (caller != null && (h = caller.hint) >= 0 && + (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 && + (ws = workQueues) != null && (m = ws.length - 1) >= 0 && + (q = ws[h & m]) != null) { + caller.hint = -1; + for (int n = 2;;) { // limit to at most 2 signals + int idleCount = (caller.eventCount < 0)? 0 : -1; + if (((s = idleCount - q.base + q.top) <= n && (n = s) <= 0) || + (u = (int)((c = ctl) >>> 32)) >= 0 || + (e = (int)c) <= 0 || m < (i = e & SMASK) || + (w = ws[i]) == null) + break; + long nc = (((long)(w.nextWait & E_MASK)) | + ((long)(u + UAC_UNIT) << 32)); + if (w.eventCount == (e | INT_SIGN) && + U.compareAndSwapLong(this, CTL, c, nc)) { + w.hint = h; + w.eventCount = (e + E_SEQ) & E_MASK; + if ((p = w.parker) != null) + U.unpark(p); + if (--n <= 0) + break; + } + } + } + } + + /** * Tries to locate and execute tasks for a stealer of the given * task, or in turn one of its stealers, Traces currentSteal -> * currentJoin links looking for a thread working on a descendant @@ -1996,7 +2053,7 @@ public class ForkJoinPool extends Abstra do {} while ((s = task.status) >= 0 && !joiner.isEmpty() && joiner.tryRemoveAndExec(task)); // process local tasks if (s >= 0 && (s = task.status) >= 0) { - helpSignal(task, joiner.poolIndex, false); + helpSignal(task, joiner.poolIndex); if ((s = task.status) >= 0 && (task instanceof CountedCompleter)) s = helpComplete(task, LIFO_QUEUE); @@ -2005,7 +2062,7 @@ public class ForkJoinPool extends Abstra if ((!joiner.isEmpty() || // try helping (s = tryHelpStealer(joiner, task)) == 0) && (s = task.status) >= 0) { - helpSignal(task, joiner.poolIndex, false); + helpSignal(task, joiner.poolIndex); if ((s = task.status) >= 0 && tryCompensate()) { if (task.trySetSignal() && (s = task.status) >= 0) { synchronized (task) { @@ -2046,7 +2103,7 @@ public class ForkJoinPool extends Abstra do {} while ((s = task.status) >= 0 && !joiner.isEmpty() && joiner.tryRemoveAndExec(task)); if (s >= 0 && (s = task.status) >= 0) { - helpSignal(task, joiner.poolIndex, false); + helpSignal(task, joiner.poolIndex); if ((s = task.status) >= 0 && (task instanceof CountedCompleter)) s = helpComplete(task, LIFO_QUEUE); @@ -2378,7 +2435,7 @@ public class ForkJoinPool extends Abstra (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0) break; if (task == null) { - helpSignal(root, q.poolIndex, false); + helpSignal(root, q.poolIndex); if (root.status >= 0) helpComplete(root, SHARED_QUEUE); break; @@ -2421,7 +2478,7 @@ public class ForkJoinPool extends Abstra if (t instanceof CountedCompleter) p.externalHelpComplete(q, t); else - p.helpSignal(t, q.poolIndex, false); + p.helpSignal(t, q.poolIndex); } } }