--- jsr166/src/jsr166e/ForkJoinPool.java 2012/11/26 14:11:53 1.25 +++ jsr166/src/jsr166e/ForkJoinPool.java 2012/12/20 17:14:32 1.37 @@ -987,14 +987,13 @@ public class ForkJoinPool extends Abstra if (t != null) { (currentSteal = t).doExec(); currentSteal = null; + ++nsteals; if (base - top < 0) { // process remaining local tasks if (mode == 0) popAndExecAll(); else pollAndExecAll(); } - ++nsteals; - hint = -1; } } @@ -1021,22 +1020,6 @@ public class ForkJoinPool extends Abstra s != Thread.State.TIMED_WAITING); } - /** - * If this owned and is not already interrupted, try to - * interrupt and/or unpark, ignoring exceptions. - */ - final void interruptOwner() { - Thread wt, p; - if ((wt = owner) != null && !wt.isInterrupted()) { - try { - wt.interrupt(); - } catch (SecurityException ignore) { - } - } - if ((p = parker) != null) - U.unpark(p); - } - // Unsafe mechanics private static final sun.misc.Unsafe U; private static final long QLOCK; @@ -1129,6 +1112,11 @@ public class ForkJoinPool extends Abstra private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L; /** + * Tolerance for idle timeouts, to cope with timer undershoots + */ + private static final long TIMEOUT_SLOP = 2000000L; + + /** * The maximum stolen->joining link depth allowed in method * tryHelpStealer. Must be a power of two. Depths for legitimate * chains are unbounded, but we use a fixed constant to avoid @@ -1263,9 +1251,7 @@ public class ForkJoinPool extends Abstra * fails. This acts as a spinLock for normal cases, but falls back * to builtin monitor to block when (rarely) needed. This would be * a terrible idea for a highly contended lock, but works fine as - * a more conservative alternative to a pure spinlock. See - * internal ConcurrentHashMap documentation for further - * explanation of nearly the same construction. + * a more conservative alternative to a pure spinlock. */ private int acquirePlock() { int spins = PL_SPINS, r = 0, ps, nps; @@ -1317,41 +1303,8 @@ public class ForkJoinPool extends Abstra } /** - * Performs secondary initialization, called when plock is zero. - * Creates workQueue array and sets plock to a valid value. The - * lock body must be exception-free (so no try/finally) so we - * optimistically allocate new array outside the lock and throw - * away if (very rarely) not needed. (A similar tactic is used in - * fullExternalPush.) Because the plock seq value can eventually - * wrap around zero, this method harmlessly fails to reinitialize - * if workQueues exists, while still advancing plock. - * - * Additionally tries to create the first worker. - */ - private void initWorkers() { - WorkQueue[] ws, nws; int ps; - int p = config & SMASK; // find power of two table size - int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots - n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; - n = (n + 1) << 1; - if ((ws = workQueues) == null || ws.length == 0) - nws = new WorkQueue[n]; - else - nws = null; - if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - if (((ws = workQueues) == null || ws.length == 0) && nws != null) - workQueues = nws; - int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); - tryAddWorker(); - } - - /** - * Tries to create and start one worker. Adjusts counts etc on - * failure. + * Tries to create and start one worker if fewer than target + * parallelism level exist. Adjusts counts etc on failure. */ private void tryAddWorker() { long c; int u; @@ -1461,21 +1414,42 @@ public class ForkJoinPool extends Abstra } } - long c; // adjust ctl counts + long c; // adjust ctl counts do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) | ((c - TC_UNIT) & TC_MASK) | (c & ~(AC_MASK|TC_MASK))))); - if (!tryTerminate(false, false) && w != null) { - w.cancelAll(); // cancel remaining tasks - if (w.array != null) // suppress signal if never ran - tryAddWorker(); // create replacement - if (ex == null) // help clean refs on way out - ForkJoinTask.helpExpungeStaleExceptions(); + if (!tryTerminate(false, false) && w != null && w.array != null) { + w.cancelAll(); // cancel remaining tasks + WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e; + while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) { + if (e > 0) { // activate or create replacement + if ((ws = workQueues) == null || + (i = e & SMASK) >= ws.length || + (v = ws[i]) == null) + break; + long nc = (((long)(v.nextWait & E_MASK)) | + ((long)(u + UAC_UNIT) << 32)); + if (v.eventCount != (e | INT_SIGN)) + break; + if (U.compareAndSwapLong(this, CTL, c, nc)) { + v.eventCount = (e + E_SEQ) & E_MASK; + if ((p = v.parker) != null) + U.unpark(p); + break; + } + } + else { + if ((short)u < 0) + tryAddWorker(); + break; + } + } } - - if (ex != null) // rethrow + if (ex == null) // help clean refs on way out + ForkJoinTask.helpExpungeStaleExceptions(); + else // rethrow ForkJoinTask.rethrow(ex); } @@ -1513,13 +1487,19 @@ public class ForkJoinPool extends Abstra /** * Full version of externalPush. This method is called, among * other times, upon the first submission of the first task to the - * pool, so must perform secondary initialization (via - * initWorkers). It also detects first submission by an external - * thread by looking up its ThreadLocal, and creates a new shared - * queue if the one at index if empty or contended. The plock lock - * body must be exception-free (so no try/finally) so we - * optimistically allocate new queues outside the lock and throw - * them away if (very rarely) not needed. + * pool, so must perform secondary initialization. It also + * detects first submission by an external thread by looking up + * its ThreadLocal, and creates a new shared queue if the one at + * index if empty or contended. The plock lock body must be + * exception-free (so no try/finally) so we optimistically + * allocate new queues outside the lock and throw them away if + * (very rarely) not needed. + * + * Secondary initialization occurs when plock is zero, to create + * workQueue array and set plock to a valid value. This lock body + * must also be exception-free. Because the plock seq value can + * eventually wrap around zero, this method harmlessly fails to + * reinitialize if workQueues exists, while still advancing plock. */ private void fullExternalPush(ForkJoinTask task) { int r = 0; // random index seed @@ -1530,17 +1510,31 @@ public class ForkJoinPool extends Abstra r += SEED_INCREMENT) && r != 0) submitters.set(z = new Submitter(r)); } - else if (r == 0) { // move to a different index + else if (r == 0) { // move to a different index r = z.seed; - r ^= r << 13; // same xorshift as WorkQueues + r ^= r << 13; // same xorshift as WorkQueues r ^= r >>> 17; z.seed = r ^ (r << 5); } else if ((ps = plock) < 0) throw new RejectedExecutionException(); else if (ps == 0 || (ws = workQueues) == null || - (m = ws.length - 1) < 0) - initWorkers(); + (m = ws.length - 1) < 0) { // initialize workQueues + int p = config & SMASK; // find power of two table size + int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots + n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; + n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; + WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ? + new WorkQueue[n] : null); + if (((ps = plock) & PL_LOCK) != 0 || + !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + ps = acquirePlock(); + if (((ws = workQueues) == null || ws.length == 0) && nws != null) + workQueues = nws; + int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); + if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + releasePlock(nps); + } else if ((q = ws[k = r & m & SQMASK]) != null) { if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask[] a = q.array; @@ -1660,12 +1654,14 @@ public class ForkJoinPool extends Abstra * * * If not already enqueued, try to inactivate and enqueue the * worker on wait queue. Or, if inactivating has caused the pool - * to be quiescent, relay to idleAwaitWork to check for - * termination and possibly shrink pool. + * to be quiescent, relay to idleAwaitWork to possibly shrink + * pool. * * * If already enqueued and none of the above apply, possibly - * (with 1/2 probability) park awaiting signal, else lingering to - * help scan and signal. + * park awaiting signal, else lingering to help scan and signal. + * + * * If a non-empty queue discovered or left as a hint, + * help wake up other workers before return * * @param w the worker (via its WorkQueue) * @return a task or null if none found @@ -1676,6 +1672,7 @@ public class ForkJoinPool extends Abstra if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { int ec = w.eventCount; // ec is negative if inactive int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; + w.hint = -1; // update seed and clear hint int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN; do { WorkQueue q; ForkJoinTask[] a; int b; @@ -1697,40 +1694,64 @@ public class ForkJoinPool extends Abstra } } while (--j >= 0); - long c, sc; int e, ns, h; - if ((h = w.hint) < 0) { - if ((ns = w.nsteals) != 0) { - if (U.compareAndSwapLong(this, STEALCOUNT, - sc = stealCount, sc + ns)) - w.nsteals = 0; // collect steals - } - else if (plock != ps) // consistency check - ; // skip - else if ((e = (int)(c = ctl)) < 0) - w.qlock = -1; // pool is terminating - else if (ec >= 0) { // try to enqueue/inactivate - long nc = ((long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK))); - w.nextWait = e; // link and mark inactive - w.eventCount = ec | INT_SIGN; - if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc)) - w.eventCount = ec; // unmark on CAS failure - else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK)) - idleAwaitWork(w, nc, c); - } - else if (w.eventCount < 0) { // block - Thread wt = Thread.currentThread(); - Thread.interrupted(); // clear status - U.putObject(wt, PARKBLOCKER, this); - w.parker = wt; // emulate LockSupport.park - if (w.eventCount < 0) // recheck - U.park(false, 0L); - w.parker = null; - U.putObject(wt, PARKBLOCKER, null); - } - } - if (h >= 0 || (h = w.hint) >= 0) { // signal others before retry - w.hint = -1; // reset - helpSignal(null, h, true); + int h, e, ns; long c, sc; WorkQueue q; + if ((ns = w.nsteals) != 0) { + if (U.compareAndSwapLong(this, STEALCOUNT, + sc = stealCount, sc + ns)) + w.nsteals = 0; // collect steals and rescan + } + else if (plock != ps) // consistency check + ; // skip + else if ((e = (int)(c = ctl)) < 0) + w.qlock = -1; // pool is terminating + else { + if ((h = w.hint) < 0) { + if (ec >= 0) { // try to enqueue/inactivate + long nc = (((long)ec | + ((c - AC_UNIT) & (AC_MASK|TC_MASK)))); + w.nextWait = e; // link and mark inactive + w.eventCount = ec | INT_SIGN; + if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc)) + w.eventCount = ec; // unmark on CAS failure + else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK)) + idleAwaitWork(w, nc, c); + } + else if (w.eventCount < 0 && ctl == c) { + Thread wt = Thread.currentThread(); + Thread.interrupted(); // clear status + U.putObject(wt, PARKBLOCKER, this); + w.parker = wt; // emulate LockSupport.park + if (w.eventCount < 0) // recheck + U.park(false, 0L); // block + w.parker = null; + U.putObject(wt, PARKBLOCKER, null); + } + } + if ((h >= 0 || (h = w.hint) >= 0) && + (ws = workQueues) != null && h < ws.length && + (q = ws[h]) != null) { // signal others before retry + WorkQueue v; Thread p; int u, i, s; + for (int n = (config & SMASK) - 1;;) { + int idleCount = (w.eventCount < 0) ? 0 : -1; + if (((s = idleCount - q.base + q.top) <= n && + (n = s) <= 0) || + (u = (int)((c = ctl) >>> 32)) >= 0 || + (e = (int)c) <= 0 || m < (i = e & SMASK) || + (v = ws[i]) == null) + break; + long nc = (((long)(v.nextWait & E_MASK)) | + ((long)(u + UAC_UNIT) << 32)); + if (v.eventCount != (e | INT_SIGN) || + !U.compareAndSwapLong(this, CTL, c, nc)) + break; + v.hint = h; + v.eventCount = (e + E_SEQ) & E_MASK; + if ((p = v.parker) != null) + U.unpark(p); + if (--n <= 0) + break; + } + } } } return null; @@ -1750,10 +1771,11 @@ public class ForkJoinPool extends Abstra */ private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { if (w != null && w.eventCount < 0 && - !tryTerminate(false, false) && (int)prevCtl != 0) { + !tryTerminate(false, false) && (int)prevCtl != 0 && + ctl == currentCtl) { int dc = -(short)(currentCtl >>> TC_SHIFT); long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT; - long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop + long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; Thread wt = Thread.currentThread(); while (ctl == currentCtl) { Thread.interrupted(); // timed variant of version in scan() @@ -1768,6 +1790,7 @@ public class ForkJoinPool extends Abstra if (deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { w.eventCount = (w.eventCount + E_SEQ) | E_MASK; + w.hint = -1; w.qlock = -1; // shrink break; } @@ -1776,26 +1799,25 @@ public class ForkJoinPool extends Abstra } /** - * Scans through queues looking for work (optionally, while - * joining a task); if any present, signals. May return early if - * more signalling is detectably unneeded. + * Scans through queues looking for work while joining a task; if + * any present, signals. May return early if more signalling is + * detectably unneeded. * - * @param task if non-null, return early if done + * @param task return early if done * @param origin an index to start scan - * @param once if only the origin should be checked */ - private void helpSignal(ForkJoinTask task, int origin, boolean once) { + private void helpSignal(ForkJoinTask task, int origin) { WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s; - if ((u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 && + if (task != null && task.status >= 0 && + (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { - outer: for (int k = origin, j = once ? 0 : m; j >= 0; --j) { + outer: for (int k = origin, j = m; j >= 0; --j) { WorkQueue q = ws[k++ & m]; for (int n = m;;) { // limit to at most m signals - if (task != null && task.status < 0) + if (task.status < 0) break outer; if (q == null || - ((s = (task == null ? -1 : 0) - q.base + q.top) <= n && - (n = s) <= 0)) + ((s = -q.base + q.top) <= n && (n = s) <= 0)) break; if ((u = (int)((c = ctl) >>> 32)) >= 0 || (e = (int)c) <= 0 || m < (i = e & SMASK) || @@ -1803,8 +1825,9 @@ public class ForkJoinPool extends Abstra break outer; long nc = (((long)(w.nextWait & E_MASK)) | ((long)(u + UAC_UNIT) << 32)); - if (w.eventCount == (e | INT_SIGN) && - U.compareAndSwapLong(this, CTL, c, nc)) { + if (w.eventCount != (e | INT_SIGN)) + break outer; + if (U.compareAndSwapLong(this, CTL, c, nc)) { w.eventCount = (e + E_SEQ) & E_MASK; if ((p = w.parker) != null) U.unpark(p); @@ -1996,7 +2019,7 @@ public class ForkJoinPool extends Abstra do {} while ((s = task.status) >= 0 && !joiner.isEmpty() && joiner.tryRemoveAndExec(task)); // process local tasks if (s >= 0 && (s = task.status) >= 0) { - helpSignal(task, joiner.poolIndex, false); + helpSignal(task, joiner.poolIndex); if ((s = task.status) >= 0 && (task instanceof CountedCompleter)) s = helpComplete(task, LIFO_QUEUE); @@ -2005,7 +2028,7 @@ public class ForkJoinPool extends Abstra if ((!joiner.isEmpty() || // try helping (s = tryHelpStealer(joiner, task)) == 0) && (s = task.status) >= 0) { - helpSignal(task, joiner.poolIndex, false); + helpSignal(task, joiner.poolIndex); if ((s = task.status) >= 0 && tryCompensate()) { if (task.trySetSignal() && (s = task.status) >= 0) { synchronized (task) { @@ -2046,7 +2069,7 @@ public class ForkJoinPool extends Abstra do {} while ((s = task.status) >= 0 && !joiner.isEmpty() && joiner.tryRemoveAndExec(task)); if (s >= 0 && (s = task.status) >= 0) { - helpSignal(task, joiner.poolIndex, false); + helpSignal(task, joiner.poolIndex); if ((s = task.status) >= 0 && (task instanceof CountedCompleter)) s = helpComplete(task, LIFO_QUEUE); @@ -2061,29 +2084,22 @@ public class ForkJoinPool extends Abstra /** * Returns a (probably) non-empty steal queue, if one is found - * during a random, then cyclic scan, else null. This method must - * be retried by caller if, by the time it tries to use the queue, - * it is empty. + * during a scan, else null. This method must be retried by + * caller if, by the time it tries to use the queue, it is empty. * @param r a (random) seed for scanning */ private WorkQueue findNonEmptyStealQueue(int r) { - for (WorkQueue[] ws;;) { - int ps = plock, m, n; - if ((ws = workQueues) == null || (m = ws.length - 1) < 1) - return null; - for (int j = (m + 1) << 2; ;) { - WorkQueue q = ws[(((r + j) << 1) | 1) & m]; - if (q != null && (n = q.base - q.top) < 0) { - if (n < -1) - signalWork(q); - return q; - } - else if (--j < 0) { - if (plock == ps) - return null; - break; + for (;;) { + int ps = plock, m; WorkQueue[] ws; WorkQueue q; + if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { + for (int j = (m + 1) << 2; j >= 0; --j) { + if ((q = ws[(((r + j) << 1) | 1) & m]) != null && + q.base - q.top < 0) + return q; } } + if (plock == ps) + return null; } } @@ -2095,37 +2111,34 @@ public class ForkJoinPool extends Abstra */ final void helpQuiescePool(WorkQueue w) { for (boolean active = true;;) { - ForkJoinTask localTask; // exhaust local queue - while ((localTask = w.nextLocalTask()) != null) - localTask.doExec(); - // Similar to loop in scan(), but ignoring submissions - WorkQueue q = findNonEmptyStealQueue(w.nextSeed()); - if (q != null) { - ForkJoinTask t; int b; + long c; WorkQueue q; ForkJoinTask t; int b; + while ((t = w.nextLocalTask()) != null) { + if (w.base - w.top < 0) + signalWork(w); + t.doExec(); + } + if ((q = findNonEmptyStealQueue(w.nextSeed())) != null) { if (!active) { // re-establish active count - long c; active = true; do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, c + AC_UNIT)); } - if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) + if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { + if (q.base - q.top < 0) + signalWork(q); w.runSubtask(t); + } } - else { - long c; - if (active) { // decrement active count without queuing + else if (active) { // decrement active count without queuing + long nc = (c = ctl) - AC_UNIT; + if ((int)(nc >> AC_SHIFT) + (config & SMASK) == 0) + return; // bypass decrement-then-increment + if (U.compareAndSwapLong(this, CTL, c, nc)) active = false; - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, c -= AC_UNIT)); - } - else - c = ctl; // re-increment on exit - if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) { - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, c + AC_UNIT)); - break; - } } + else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) == 0 && + U.compareAndSwapLong(this, CTL, c, c + AC_UNIT)) + return; } } @@ -2141,8 +2154,11 @@ public class ForkJoinPool extends Abstra return t; if ((q = findNonEmptyStealQueue(w.nextSeed())) == null) return null; - if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) + if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { + if (q.base - q.top < 0) + signalWork(q); return t; + } } } @@ -2224,61 +2240,71 @@ public class ForkJoinPool extends Abstra * @return true if now terminating or terminated */ private boolean tryTerminate(boolean now, boolean enable) { - if (this == commonPool) // cannot shut down + int ps; + if (this == commonPool) // cannot shut down return false; + if ((ps = plock) >= 0) { // enable by setting plock + if (!enable) + return false; + if ((ps & PL_LOCK) != 0 || + !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + ps = acquirePlock(); + int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN; + if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + releasePlock(nps); + } for (long c;;) { - if (((c = ctl) & STOP_BIT) != 0) { // already terminating + if (((c = ctl) & STOP_BIT) != 0) { // already terminating if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) { synchronized (this) { - notifyAll(); // signal when 0 workers + notifyAll(); // signal when 0 workers } } return true; } - if (plock >= 0) { // not yet enabled - int ps; - if (!enable) - return false; - if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - int nps = SHUTDOWN; - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); - } - if (!now) { // check if idle & no tasks - if ((int)(c >> AC_SHIFT) != -(config & SMASK) || - hasQueuedSubmissions()) + if (!now) { // check if idle & no tasks + WorkQueue[] ws; WorkQueue w; + if ((int)(c >> AC_SHIFT) != -(config & SMASK)) return false; - // Check for unqueued inactive workers. One pass suffices. - WorkQueue[] ws = workQueues; WorkQueue w; - if (ws != null) { - for (int i = 1; i < ws.length; i += 2) { - if ((w = ws[i]) != null && w.eventCount >= 0) - return false; + if ((ws = workQueues) != null) { + for (int i = 0; i < ws.length; ++i) { + if ((w = ws[i]) != null) { + if (!w.isEmpty()) { // signal unprocessed tasks + signalWork(w); + return false; + } + if ((i & 1) != 0 && w.eventCount >= 0) + return false; // unqueued inactive worker + } } } } if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) { for (int pass = 0; pass < 3; ++pass) { - WorkQueue[] ws = workQueues; - if (ws != null) { - WorkQueue w; + WorkQueue[] ws; WorkQueue w; Thread wt; + if ((ws = workQueues) != null) { int n = ws.length; for (int i = 0; i < n; ++i) { if ((w = ws[i]) != null) { w.qlock = -1; if (pass > 0) { w.cancelAll(); - if (pass > 1) - w.interruptOwner(); + if (pass > 1 && (wt = w.owner) != null) { + if (!wt.isInterrupted()) { + try { + wt.interrupt(); + } catch (Throwable ignore) { + } + } + U.unpark(wt); + } } } } // Wake up workers parked on event queue int i, e; long cc; Thread p; while ((e = (int)(cc = ctl) & E_MASK) != 0 && - (i = e & SMASK) < n && + (i = e & SMASK) < n && i >= 0 && (w = ws[i]) != null) { long nc = ((long)(w.nextWait & E_MASK) | ((cc + AC_UNIT) & AC_MASK) | @@ -2378,7 +2404,7 @@ public class ForkJoinPool extends Abstra (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0) break; if (task == null) { - helpSignal(root, q.poolIndex, false); + helpSignal(root, q.poolIndex); if (root.status >= 0) helpComplete(root, SHARED_QUEUE); break; @@ -2421,7 +2447,7 @@ public class ForkJoinPool extends Abstra if (t instanceof CountedCompleter) p.externalHelpComplete(q, t); else - p.helpSignal(t, q.poolIndex, false); + p.helpSignal(t, q.poolIndex); } } } @@ -2434,8 +2460,11 @@ public class ForkJoinPool extends Abstra if ((p = commonPool) != null && (q = p.findNonEmptyStealQueue(1)) != null && (b = q.base) - q.top < 0 && - (t = q.pollAt(b)) != null) + (t = q.pollAt(b)) != null) { + if (q.base - q.top < 0) + p.signalWork(q); t.doExec(); + } } // Exported methods @@ -2536,7 +2565,9 @@ public class ForkJoinPool extends Abstra } /** - * Returns the common pool instance. + * Returns the common pool instance. This pool is statically + * constructed; its run state is unaffected by attempts to + * {@link #shutdown} or {@link #shutdownNow}. * * @return the common pool instance */ @@ -3315,6 +3346,7 @@ public class ForkJoinPool extends Abstra commonPool = new ForkJoinPool(par, ct, fac, handler); } + /** * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. * Replace with a simple call to Unsafe.getUnsafe when integrating