--- jsr166/src/jsr166e/ForkJoinPool.java 2012/11/22 18:14:57 1.23 +++ jsr166/src/jsr166e/ForkJoinPool.java 2012/12/17 16:31:08 1.34 @@ -51,7 +51,7 @@ import java.util.concurrent.TimeUnit; * dynamically adding, suspending, or resuming internal worker * threads, even if some tasks are stalled waiting to join * others. However, no such adjustments are guaranteed in the face of - * blocked IO or other unmanaged synchronization. The nested {@link + * blocked I/O or other unmanaged synchronization. The nested {@link * ManagedBlocker} interface enables extension of the kinds of * synchronization accommodated. * @@ -987,14 +987,13 @@ public class ForkJoinPool extends Abstra if (t != null) { (currentSteal = t).doExec(); currentSteal = null; + ++nsteals; if (base - top < 0) { // process remaining local tasks if (mode == 0) popAndExecAll(); else pollAndExecAll(); } - ++nsteals; - hint = -1; } } @@ -1021,22 +1020,6 @@ public class ForkJoinPool extends Abstra s != Thread.State.TIMED_WAITING); } - /** - * If this owned and is not already interrupted, try to - * interrupt and/or unpark, ignoring exceptions. - */ - final void interruptOwner() { - Thread wt, p; - if ((wt = owner) != null && !wt.isInterrupted()) { - try { - wt.interrupt(); - } catch (SecurityException ignore) { - } - } - if ((p = parker) != null) - U.unpark(p); - } - // Unsafe mechanics private static final sun.misc.Unsafe U; private static final long QLOCK; @@ -1129,6 +1112,11 @@ public class ForkJoinPool extends Abstra private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L; /** + * Tolerance for idle timeouts, to cope with timer undershoots + */ + private static final long TIMEOUT_SLOP = 2000000L; + + /** * The maximum stolen->joining link depth allowed in method * tryHelpStealer. Must be a power of two. Depths for legitimate * chains are unbounded, but we use a fixed constant to avoid @@ -1263,9 +1251,7 @@ public class ForkJoinPool extends Abstra * fails. This acts as a spinLock for normal cases, but falls back * to builtin monitor to block when (rarely) needed. This would be * a terrible idea for a highly contended lock, but works fine as - * a more conservative alternative to a pure spinlock. See - * internal ConcurrentHashMap documentation for further - * explanation of nearly the same construction. + * a more conservative alternative to a pure spinlock. */ private int acquirePlock() { int spins = PL_SPINS, r = 0, ps, nps; @@ -1326,7 +1312,7 @@ public class ForkJoinPool extends Abstra * wrap around zero, this method harmlessly fails to reinitialize * if workQueues exists, while still advancing plock. * - * Additonally tries to create the first worker. + * Additionally tries to create the first worker. */ private void initWorkers() { WorkQueue[] ws, nws; int ps; @@ -1350,8 +1336,8 @@ public class ForkJoinPool extends Abstra } /** - * Tries to create and start one worker. Adjusts counts etc on - * failure. + * Tries to create and start one worker if fewer than target + * parallelism level exist. Adjusts counts etc on failure. */ private void tryAddWorker() { long c; int u; @@ -1461,21 +1447,42 @@ public class ForkJoinPool extends Abstra } } - long c; // adjust ctl counts + long c; // adjust ctl counts do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) | ((c - TC_UNIT) & TC_MASK) | (c & ~(AC_MASK|TC_MASK))))); - if (!tryTerminate(false, false) && w != null) { - w.cancelAll(); // cancel remaining tasks - if (w.array != null) // suppress signal if never ran - tryAddWorker(); // create replacement - if (ex == null) // help clean refs on way out - ForkJoinTask.helpExpungeStaleExceptions(); + if (!tryTerminate(false, false) && w != null && w.array != null) { + w.cancelAll(); // cancel remaining tasks + WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e; + while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) { + if (e > 0) { // activate or create replacement + if ((ws = workQueues) == null || + (i = e & SMASK) >= ws.length || + (v = ws[i]) != null) + break; + long nc = (((long)(v.nextWait & E_MASK)) | + ((long)(u + UAC_UNIT) << 32)); + if (v.eventCount != (e | INT_SIGN)) + break; + if (U.compareAndSwapLong(this, CTL, c, nc)) { + v.eventCount = (e + E_SEQ) & E_MASK; + if ((p = v.parker) != null) + U.unpark(p); + break; + } + } + else { + if ((short)u < 0) + tryAddWorker(); + break; + } + } } - - if (ex != null) // rethrow + if (ex == null) // help clean refs on way out + ForkJoinTask.helpExpungeStaleExceptions(); + else // rethrow ForkJoinTask.rethrow(ex); } @@ -1660,12 +1667,14 @@ public class ForkJoinPool extends Abstra * * * If not already enqueued, try to inactivate and enqueue the * worker on wait queue. Or, if inactivating has caused the pool - * to be quiescent, relay to idleAwaitWork to check for - * termination and possibly shrink pool. + * to be quiescent, relay to idleAwaitWork to possibly shrink + * pool. * * * If already enqueued and none of the above apply, possibly - * (with 1/2 probability) park awaiting signal, else lingering to - * help scan and signal. + * park awaiting signal, else lingering to help scan and signal. + * + * * If a non-empty queue discovered or left as a hint, + * help wake up other workers before return * * @param w the worker (via its WorkQueue) * @return a task or null if none found @@ -1676,6 +1685,7 @@ public class ForkJoinPool extends Abstra if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { int ec = w.eventCount; // ec is negative if inactive int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; + w.hint = -1; // update seed and clear hint int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN; do { WorkQueue q; ForkJoinTask[] a; int b; @@ -1697,40 +1707,65 @@ public class ForkJoinPool extends Abstra } } while (--j >= 0); - long c, sc; int e, ns, h; - if ((h = w.hint) < 0) { - if ((ns = w.nsteals) != 0) { - if (U.compareAndSwapLong(this, STEALCOUNT, - sc = stealCount, sc + ns)) - w.nsteals = 0; // collect steals - } - else if (plock != ps) // consistency check - ; // skip - else if ((e = (int)(c = ctl)) < 0) - w.qlock = -1; // pool is terminating - else if (ec >= 0) { // try to enqueue/inactivate - long nc = ((long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK))); - w.nextWait = e; // link and mark inactive - w.eventCount = ec | INT_SIGN; - if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc)) - w.eventCount = ec; // unmark on CAS failure - else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK)) - idleAwaitWork(w, nc, c); - } - else if (w.eventCount < 0) { // block - Thread wt = Thread.currentThread(); - Thread.interrupted(); // clear status - U.putObject(wt, PARKBLOCKER, this); - w.parker = wt; // emulate LockSupport.park - if (w.eventCount < 0) // recheck - U.park(false, 0L); - w.parker = null; - U.putObject(wt, PARKBLOCKER, null); - } - } - if (h >= 0 || (h = w.hint) >= 0) { // signal others before retry - w.hint = -1; // reset - helpSignal(null, h, true); + int h, e, ns; long c, sc; WorkQueue q; + if ((ns = w.nsteals) != 0) { + if (U.compareAndSwapLong(this, STEALCOUNT, + sc = stealCount, sc + ns)) + w.nsteals = 0; // collect steals and rescan + } + else if (plock != ps) // consistency check + ; // skip + else if ((e = (int)(c = ctl)) < 0) + w.qlock = -1; // pool is terminating + else { + if ((h = w.hint) < 0) { + if (ec >= 0) { // try to enqueue/inactivate + long nc = (((long)ec | + ((c - AC_UNIT) & (AC_MASK|TC_MASK)))); + w.nextWait = e; // link and mark inactive + w.eventCount = ec | INT_SIGN; + if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc)) + w.eventCount = ec; // unmark on CAS failure + else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK)) + idleAwaitWork(w, nc, c); + } + else if (w.eventCount < 0 && !tryTerminate(false, false) && + ctl == c) { // block + Thread wt = Thread.currentThread(); + Thread.interrupted(); // clear status + U.putObject(wt, PARKBLOCKER, this); + w.parker = wt; // emulate LockSupport.park + if (w.eventCount < 0) // recheck + U.park(false, 0L); + w.parker = null; + U.putObject(wt, PARKBLOCKER, null); + } + } + if ((h >= 0 || (h = w.hint) >= 0) && + (ws = workQueues) != null && h < ws.length && + (q = ws[h]) != null) { // signal others before retry + WorkQueue v; Thread p; int u, i, s; + for (int n = (config & SMASK) >>> 1;;) { + int idleCount = (w.eventCount < 0) ? 0 : -1; + if (((s = idleCount - q.base + q.top) <= n && + (n = s) <= 0) || + (u = (int)((c = ctl) >>> 32)) >= 0 || + (e = (int)c) <= 0 || m < (i = e & SMASK) || + (v = ws[i]) == null) + break; + long nc = (((long)(v.nextWait & E_MASK)) | + ((long)(u + UAC_UNIT) << 32)); + if (v.eventCount != (e | INT_SIGN) || + !U.compareAndSwapLong(this, CTL, c, nc)) + break; + v.hint = h; + v.eventCount = (e + E_SEQ) & E_MASK; + if ((p = v.parker) != null) + U.unpark(p); + if (--n <= 0) + break; + } + } } } return null; @@ -1753,7 +1788,7 @@ public class ForkJoinPool extends Abstra !tryTerminate(false, false) && (int)prevCtl != 0) { int dc = -(short)(currentCtl >>> TC_SHIFT); long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT; - long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop + long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; Thread wt = Thread.currentThread(); while (ctl == currentCtl) { Thread.interrupted(); // timed variant of version in scan() @@ -1776,26 +1811,25 @@ public class ForkJoinPool extends Abstra } /** - * Scans through queues looking for work (optionally, while - * joining a task); if any present, signals. May return early if - * more signalling is detectably unneeded. + * Scans through queues looking for work while joining a task; if + * any present, signals. May return early if more signalling is + * detectably unneeded. * - * @param task if non-null, return early if done + * @param task return early if done * @param origin an index to start scan - * @param once if only the origin should be checked */ - private void helpSignal(ForkJoinTask task, int origin, boolean once) { + private void helpSignal(ForkJoinTask task, int origin) { WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s; - if ((u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 && + if (task != null && task.status >= 0 && + (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { - outer: for (int k = origin, j = once ? 0 : m; j >= 0; --j) { + outer: for (int k = origin, j = m; j >= 0; --j) { WorkQueue q = ws[k++ & m]; for (int n = m;;) { // limit to at most m signals - if (task != null && task.status < 0) + if (task.status < 0) break outer; if (q == null || - ((s = (task == null ? -1 : 0) - q.base + q.top) <= n && - (n = s) <= 0)) + ((s = -q.base + q.top) <= n && (n = s) <= 0)) break; if ((u = (int)((c = ctl) >>> 32)) >= 0 || (e = (int)c) <= 0 || m < (i = e & SMASK) || @@ -1803,8 +1837,9 @@ public class ForkJoinPool extends Abstra break outer; long nc = (((long)(w.nextWait & E_MASK)) | ((long)(u + UAC_UNIT) << 32)); - if (w.eventCount == (e | INT_SIGN) && - U.compareAndSwapLong(this, CTL, c, nc)) { + if (w.eventCount != (e | INT_SIGN)) + break outer; + if (U.compareAndSwapLong(this, CTL, c, nc)) { w.eventCount = (e + E_SEQ) & E_MASK; if ((p = w.parker) != null) U.unpark(p); @@ -1996,7 +2031,7 @@ public class ForkJoinPool extends Abstra do {} while ((s = task.status) >= 0 && !joiner.isEmpty() && joiner.tryRemoveAndExec(task)); // process local tasks if (s >= 0 && (s = task.status) >= 0) { - helpSignal(task, joiner.poolIndex, false); + helpSignal(task, joiner.poolIndex); if ((s = task.status) >= 0 && (task instanceof CountedCompleter)) s = helpComplete(task, LIFO_QUEUE); @@ -2005,7 +2040,7 @@ public class ForkJoinPool extends Abstra if ((!joiner.isEmpty() || // try helping (s = tryHelpStealer(joiner, task)) == 0) && (s = task.status) >= 0) { - helpSignal(task, joiner.poolIndex, false); + helpSignal(task, joiner.poolIndex); if ((s = task.status) >= 0 && tryCompensate()) { if (task.trySetSignal() && (s = task.status) >= 0) { synchronized (task) { @@ -2046,7 +2081,7 @@ public class ForkJoinPool extends Abstra do {} while ((s = task.status) >= 0 && !joiner.isEmpty() && joiner.tryRemoveAndExec(task)); if (s >= 0 && (s = task.status) >= 0) { - helpSignal(task, joiner.poolIndex, false); + helpSignal(task, joiner.poolIndex); if ((s = task.status) >= 0 && (task instanceof CountedCompleter)) s = helpComplete(task, LIFO_QUEUE); @@ -2242,9 +2277,8 @@ public class ForkJoinPool extends Abstra if (((ps = plock) & PL_LOCK) != 0 || !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) ps = acquirePlock(); - int nps = SHUTDOWN; - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); + if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN)) + releasePlock(SHUTDOWN); } if (!now) { // check if idle & no tasks if ((int)(c >> AC_SHIFT) != -(config & SMASK) || @@ -2263,15 +2297,22 @@ public class ForkJoinPool extends Abstra for (int pass = 0; pass < 3; ++pass) { WorkQueue[] ws = workQueues; if (ws != null) { - WorkQueue w; + WorkQueue w; Thread wt; int n = ws.length; for (int i = 0; i < n; ++i) { if ((w = ws[i]) != null) { w.qlock = -1; if (pass > 0) { w.cancelAll(); - if (pass > 1) - w.interruptOwner(); + if (pass > 1 && (wt = w.owner) != null) { + if (!wt.isInterrupted()) { + try { + wt.interrupt(); + } catch (SecurityException ignore) { + } + } + U.unpark(wt); + } } } } @@ -2378,7 +2419,7 @@ public class ForkJoinPool extends Abstra (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0) break; if (task == null) { - helpSignal(root, q.poolIndex, false); + helpSignal(root, q.poolIndex); if (root.status >= 0) helpComplete(root, SHARED_QUEUE); break; @@ -2421,7 +2462,7 @@ public class ForkJoinPool extends Abstra if (t instanceof CountedCompleter) p.externalHelpComplete(q, t); else - p.helpSignal(t, q.poolIndex, false); + p.helpSignal(t, q.poolIndex); } } } @@ -2536,7 +2577,9 @@ public class ForkJoinPool extends Abstra } /** - * Returns the common pool instance. + * Returns the common pool instance. This pool is statically + * constructed; its run state is unaffected by attempts to + * {@link @shutdown} or {@link #shutdownNow}. * * @return the common pool instance */ @@ -3036,7 +3079,7 @@ public class ForkJoinPool extends Abstra * commenced but not yet completed. This method may be useful for * debugging. A return of {@code true} reported a sufficient * period after shutdown may indicate that submitted tasks have - * ignored or suppressed interruption, or are waiting for IO, + * ignored or suppressed interruption, or are waiting for I/O, * causing this executor not to properly terminate. (See the * advisory notes for class {@link ForkJoinTask} stating that * tasks should not normally entail blocking operations. But if