--- jsr166/src/jsr166e/ForkJoinPool.java 2012/12/16 19:57:00 1.32 +++ jsr166/src/jsr166e/ForkJoinPool.java 2012/12/20 17:14:32 1.37 @@ -1114,7 +1114,7 @@ public class ForkJoinPool extends Abstra /** * Tolerance for idle timeouts, to cope with timer undershoots */ - private static final long TIMEOUT_SLOP = 2000000L; // 20ms + private static final long TIMEOUT_SLOP = 2000000L; /** * The maximum stolen->joining link depth allowed in method @@ -1303,39 +1303,6 @@ public class ForkJoinPool extends Abstra } /** - * Performs secondary initialization, called when plock is zero. - * Creates workQueue array and sets plock to a valid value. The - * lock body must be exception-free (so no try/finally) so we - * optimistically allocate new array outside the lock and throw - * away if (very rarely) not needed. (A similar tactic is used in - * fullExternalPush.) Because the plock seq value can eventually - * wrap around zero, this method harmlessly fails to reinitialize - * if workQueues exists, while still advancing plock. - * - * Additionally tries to create the first worker. - */ - private void initWorkers() { - WorkQueue[] ws, nws; int ps; - int p = config & SMASK; // find power of two table size - int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots - n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; - n = (n + 1) << 1; - if ((ws = workQueues) == null || ws.length == 0) - nws = new WorkQueue[n]; - else - nws = null; - if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - if (((ws = workQueues) == null || ws.length == 0) && nws != null) - workQueues = nws; - int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); - tryAddWorker(); - } - - /** * Tries to create and start one worker if fewer than target * parallelism level exist. Adjusts counts etc on failure. */ @@ -1460,7 +1427,7 @@ public class ForkJoinPool extends Abstra if (e > 0) { // activate or create replacement if ((ws = workQueues) == null || (i = e & SMASK) >= ws.length || - (v = ws[i]) != null) + (v = ws[i]) == null) break; long nc = (((long)(v.nextWait & E_MASK)) | ((long)(u + UAC_UNIT) << 32)); @@ -1520,13 +1487,19 @@ public class ForkJoinPool extends Abstra /** * Full version of externalPush. This method is called, among * other times, upon the first submission of the first task to the - * pool, so must perform secondary initialization (via - * initWorkers). It also detects first submission by an external - * thread by looking up its ThreadLocal, and creates a new shared - * queue if the one at index if empty or contended. The plock lock - * body must be exception-free (so no try/finally) so we - * optimistically allocate new queues outside the lock and throw - * them away if (very rarely) not needed. + * pool, so must perform secondary initialization. It also + * detects first submission by an external thread by looking up + * its ThreadLocal, and creates a new shared queue if the one at + * index if empty or contended. The plock lock body must be + * exception-free (so no try/finally) so we optimistically + * allocate new queues outside the lock and throw them away if + * (very rarely) not needed. + * + * Secondary initialization occurs when plock is zero, to create + * workQueue array and set plock to a valid value. This lock body + * must also be exception-free. Because the plock seq value can + * eventually wrap around zero, this method harmlessly fails to + * reinitialize if workQueues exists, while still advancing plock. */ private void fullExternalPush(ForkJoinTask task) { int r = 0; // random index seed @@ -1537,17 +1510,31 @@ public class ForkJoinPool extends Abstra r += SEED_INCREMENT) && r != 0) submitters.set(z = new Submitter(r)); } - else if (r == 0) { // move to a different index + else if (r == 0) { // move to a different index r = z.seed; - r ^= r << 13; // same xorshift as WorkQueues + r ^= r << 13; // same xorshift as WorkQueues r ^= r >>> 17; z.seed = r ^ (r << 5); } else if ((ps = plock) < 0) throw new RejectedExecutionException(); else if (ps == 0 || (ws = workQueues) == null || - (m = ws.length - 1) < 0) - initWorkers(); + (m = ws.length - 1) < 0) { // initialize workQueues + int p = config & SMASK; // find power of two table size + int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots + n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; + n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; + WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ? + new WorkQueue[n] : null); + if (((ps = plock) & PL_LOCK) != 0 || + !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + ps = acquirePlock(); + if (((ws = workQueues) == null || ws.length == 0) && nws != null) + workQueues = nws; + int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); + if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + releasePlock(nps); + } else if ((q = ws[k = r & m & SQMASK]) != null) { if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask[] a = q.array; @@ -1729,14 +1716,13 @@ public class ForkJoinPool extends Abstra else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK)) idleAwaitWork(w, nc, c); } - else if (w.eventCount < 0 && !tryTerminate(false, false) && - ctl == c) { // block + else if (w.eventCount < 0 && ctl == c) { Thread wt = Thread.currentThread(); Thread.interrupted(); // clear status U.putObject(wt, PARKBLOCKER, this); w.parker = wt; // emulate LockSupport.park if (w.eventCount < 0) // recheck - U.park(false, 0L); + U.park(false, 0L); // block w.parker = null; U.putObject(wt, PARKBLOCKER, null); } @@ -1745,7 +1731,7 @@ public class ForkJoinPool extends Abstra (ws = workQueues) != null && h < ws.length && (q = ws[h]) != null) { // signal others before retry WorkQueue v; Thread p; int u, i, s; - for (int n = (config & SMASK) >>> 1;;) { + for (int n = (config & SMASK) - 1;;) { int idleCount = (w.eventCount < 0) ? 0 : -1; if (((s = idleCount - q.base + q.top) <= n && (n = s) <= 0) || @@ -1785,7 +1771,8 @@ public class ForkJoinPool extends Abstra */ private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { if (w != null && w.eventCount < 0 && - !tryTerminate(false, false) && (int)prevCtl != 0) { + !tryTerminate(false, false) && (int)prevCtl != 0 && + ctl == currentCtl) { int dc = -(short)(currentCtl >>> TC_SHIFT); long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT; long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; @@ -1803,6 +1790,7 @@ public class ForkJoinPool extends Abstra if (deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { w.eventCount = (w.eventCount + E_SEQ) | E_MASK; + w.hint = -1; w.qlock = -1; // shrink break; } @@ -2096,29 +2084,22 @@ public class ForkJoinPool extends Abstra /** * Returns a (probably) non-empty steal queue, if one is found - * during a random, then cyclic scan, else null. This method must - * be retried by caller if, by the time it tries to use the queue, - * it is empty. + * during a scan, else null. This method must be retried by + * caller if, by the time it tries to use the queue, it is empty. * @param r a (random) seed for scanning */ private WorkQueue findNonEmptyStealQueue(int r) { - for (WorkQueue[] ws;;) { - int ps = plock, m, n; - if ((ws = workQueues) == null || (m = ws.length - 1) < 1) - return null; - for (int j = (m + 1) << 2; ;) { - WorkQueue q = ws[(((r + j) << 1) | 1) & m]; - if (q != null && (n = q.base - q.top) < 0) { - if (n < -1) - signalWork(q); - return q; - } - else if (--j < 0) { - if (plock == ps) - return null; - break; + for (;;) { + int ps = plock, m; WorkQueue[] ws; WorkQueue q; + if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { + for (int j = (m + 1) << 2; j >= 0; --j) { + if ((q = ws[(((r + j) << 1) | 1) & m]) != null && + q.base - q.top < 0) + return q; } } + if (plock == ps) + return null; } } @@ -2130,37 +2111,34 @@ public class ForkJoinPool extends Abstra */ final void helpQuiescePool(WorkQueue w) { for (boolean active = true;;) { - ForkJoinTask localTask; // exhaust local queue - while ((localTask = w.nextLocalTask()) != null) - localTask.doExec(); - // Similar to loop in scan(), but ignoring submissions - WorkQueue q = findNonEmptyStealQueue(w.nextSeed()); - if (q != null) { - ForkJoinTask t; int b; + long c; WorkQueue q; ForkJoinTask t; int b; + while ((t = w.nextLocalTask()) != null) { + if (w.base - w.top < 0) + signalWork(w); + t.doExec(); + } + if ((q = findNonEmptyStealQueue(w.nextSeed())) != null) { if (!active) { // re-establish active count - long c; active = true; do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, c + AC_UNIT)); } - if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) + if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { + if (q.base - q.top < 0) + signalWork(q); w.runSubtask(t); + } } - else { - long c; - if (active) { // decrement active count without queuing + else if (active) { // decrement active count without queuing + long nc = (c = ctl) - AC_UNIT; + if ((int)(nc >> AC_SHIFT) + (config & SMASK) == 0) + return; // bypass decrement-then-increment + if (U.compareAndSwapLong(this, CTL, c, nc)) active = false; - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, c -= AC_UNIT)); - } - else - c = ctl; // re-increment on exit - if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) { - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, c + AC_UNIT)); - break; - } } + else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) == 0 && + U.compareAndSwapLong(this, CTL, c, c + AC_UNIT)) + return; } } @@ -2176,8 +2154,11 @@ public class ForkJoinPool extends Abstra return t; if ((q = findNonEmptyStealQueue(w.nextSeed())) == null) return null; - if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) + if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { + if (q.base - q.top < 0) + signalWork(q); return t; + } } } @@ -2259,45 +2240,49 @@ public class ForkJoinPool extends Abstra * @return true if now terminating or terminated */ private boolean tryTerminate(boolean now, boolean enable) { - if (this == commonPool) // cannot shut down + int ps; + if (this == commonPool) // cannot shut down return false; + if ((ps = plock) >= 0) { // enable by setting plock + if (!enable) + return false; + if ((ps & PL_LOCK) != 0 || + !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + ps = acquirePlock(); + int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN; + if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + releasePlock(nps); + } for (long c;;) { - if (((c = ctl) & STOP_BIT) != 0) { // already terminating + if (((c = ctl) & STOP_BIT) != 0) { // already terminating if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) { synchronized (this) { - notifyAll(); // signal when 0 workers + notifyAll(); // signal when 0 workers } } return true; } - if (plock >= 0) { // not yet enabled - int ps; - if (!enable) - return false; - if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN)) - releasePlock(SHUTDOWN); - } - if (!now) { // check if idle & no tasks - if ((int)(c >> AC_SHIFT) != -(config & SMASK) || - hasQueuedSubmissions()) + if (!now) { // check if idle & no tasks + WorkQueue[] ws; WorkQueue w; + if ((int)(c >> AC_SHIFT) != -(config & SMASK)) return false; - // Check for unqueued inactive workers. One pass suffices. - WorkQueue[] ws = workQueues; WorkQueue w; - if (ws != null) { - for (int i = 1; i < ws.length; i += 2) { - if ((w = ws[i]) != null && w.eventCount >= 0) - return false; + if ((ws = workQueues) != null) { + for (int i = 0; i < ws.length; ++i) { + if ((w = ws[i]) != null) { + if (!w.isEmpty()) { // signal unprocessed tasks + signalWork(w); + return false; + } + if ((i & 1) != 0 && w.eventCount >= 0) + return false; // unqueued inactive worker + } } } } if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) { for (int pass = 0; pass < 3; ++pass) { - WorkQueue[] ws = workQueues; - if (ws != null) { - WorkQueue w; Thread wt; + WorkQueue[] ws; WorkQueue w; Thread wt; + if ((ws = workQueues) != null) { int n = ws.length; for (int i = 0; i < n; ++i) { if ((w = ws[i]) != null) { @@ -2308,7 +2293,7 @@ public class ForkJoinPool extends Abstra if (!wt.isInterrupted()) { try { wt.interrupt(); - } catch (SecurityException ignore) { + } catch (Throwable ignore) { } } U.unpark(wt); @@ -2319,7 +2304,7 @@ public class ForkJoinPool extends Abstra // Wake up workers parked on event queue int i, e; long cc; Thread p; while ((e = (int)(cc = ctl) & E_MASK) != 0 && - (i = e & SMASK) < n && + (i = e & SMASK) < n && i >= 0 && (w = ws[i]) != null) { long nc = ((long)(w.nextWait & E_MASK) | ((cc + AC_UNIT) & AC_MASK) | @@ -2475,8 +2460,11 @@ public class ForkJoinPool extends Abstra if ((p = commonPool) != null && (q = p.findNonEmptyStealQueue(1)) != null && (b = q.base) - q.top < 0 && - (t = q.pollAt(b)) != null) + (t = q.pollAt(b)) != null) { + if (q.base - q.top < 0) + p.signalWork(q); t.doExec(); + } } // Exported methods @@ -2577,7 +2565,9 @@ public class ForkJoinPool extends Abstra } /** - * Returns the common pool instance. + * Returns the common pool instance. This pool is statically + * constructed; its run state is unaffected by attempts to + * {@link #shutdown} or {@link #shutdownNow}. * * @return the common pool instance */ @@ -3356,6 +3346,7 @@ public class ForkJoinPool extends Abstra commonPool = new ForkJoinPool(par, ct, fac, handler); } + /** * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. * Replace with a simple call to Unsafe.getUnsafe when integrating