--- jsr166/src/jsr166y/ForkJoinPool.java 2010/11/13 13:11:51 1.84 +++ jsr166/src/jsr166y/ForkJoinPool.java 2010/11/21 13:55:04 1.85 @@ -596,6 +596,16 @@ public class ForkJoinPool extends Abstra } /** + * Tries to increment running count part of workerCounts + */ + final boolean tryIncrementRunningCount() { + int c; + return UNSAFE.compareAndSwapInt(this, workerCountsOffset, + c = workerCounts, + c + ONE_RUNNING); + } + + /** * Tries to decrement running count unless already zero */ final boolean tryDecrementRunningCount() { @@ -669,10 +679,11 @@ public class ForkJoinPool extends Abstra for (k = 0; k < n && ws[k] != null; ++k) ; if (k == n) - ws = Arrays.copyOf(ws, n << 1); + ws = workers = Arrays.copyOf(ws, n << 1); } ws[k] = w; - workers = ws; // volatile array write ensures slot visibility + int c = eventCount; // advance event count to ensure visibility + UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1); } finally { lock.unlock(); } @@ -793,7 +804,7 @@ public class ForkJoinPool extends Abstra (workerCounts & RUNNING_COUNT_MASK) <= 1); long startTime = untimed ? 0 : System.nanoTime(); Thread.interrupted(); // clear/ignore interrupt - if (eventCount != ec || w.isTerminating()) + if (w.isTerminating() || eventCount != ec) break; // recheck after clear if (untimed) LockSupport.park(w); @@ -831,7 +842,8 @@ public class ForkJoinPool extends Abstra if ((sw = spareWaiters) != 0 && (id = (sw & SPARE_ID_MASK) - 1) >= 0 && id < n && (w = ws[id]) != null && - (workerCounts & RUNNING_COUNT_MASK) < parallelism && + (runState >= TERMINATING || + (workerCounts & RUNNING_COUNT_MASK) < parallelism) && spareWaiters == sw && UNSAFE.compareAndSwapInt(this, spareWaitersOffset, sw, w.nextSpare)) { @@ -885,12 +897,8 @@ public class ForkJoinPool extends Abstra break; } w.start(recordWorker(w), ueh); - if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) { - int c; // advance event count - UNSAFE.compareAndSwapInt(this, eventCountOffset, - c = eventCount, c+1); + if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) break; // add at most one unless total below target - } } } if (eventWaiters != 0L) @@ -974,24 +982,31 @@ public class ForkJoinPool extends Abstra int pc = parallelism; while (w.runState == 0) { int rs = runState; - if (rs >= TERMINATING) { // propagate shutdown + if (rs >= TERMINATING) { // propagate shutdown w.shutdown(); break; } if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) && - UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1)) + UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) { inactivate = active = w.active = false; - int wc = workerCounts; + if (rs == SHUTDOWN) { // all inactive and shut down + tryTerminate(false); + continue; + } + } + int wc = workerCounts; // try to suspend as spare if ((wc & RUNNING_COUNT_MASK) > pc) { if (!(inactivate |= active) && // must inactivate to suspend - workerCounts == wc && // try to suspend as spare + workerCounts == wc && UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc - ONE_RUNNING)) w.suspendAsSpare(); } else if ((wc >>> TOTAL_COUNT_SHIFT) < pc) helpMaintainParallelism(); // not enough workers - else if (!ran) { + else if (ran) + break; + else { long h = eventWaiters; int ec = eventCount; if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec) @@ -1003,8 +1018,6 @@ public class ForkJoinPool extends Abstra else if (!(inactivate |= active)) eventSync(w, wec); // must inactivate before sync } - else - break; } } @@ -1021,56 +1034,61 @@ public class ForkJoinPool extends Abstra boolean timed, long nanos) { long startTime = timed? System.nanoTime() : 0L; int retries = 2 + (parallelism >> 2); // #helpJoins before blocking + boolean running = true; // false when count decremented while (joinMe.status >= 0) { - int wc; - long nt = 0L; if (runState >= TERMINATING) { joinMe.cancelIgnoringExceptions(); break; } - worker.helpJoinTask(joinMe); + running = worker.helpJoinTask(joinMe, running); if (joinMe.status < 0) break; - else if (retries > 0) + if (retries > 0) { --retries; - else if (timed && - (nt = nanos - (System.nanoTime() - startTime)) <= 0L) - break; - else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 && - UNSAFE.compareAndSwapInt(this, workerCountsOffset, - wc, wc - ONE_RUNNING)) { - int stat, c; long h; - while ((stat = joinMe.status) >= 0 && - (h = eventWaiters) != 0L && // help release others - (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) + continue; + } + int wc = workerCounts; + if ((wc & RUNNING_COUNT_MASK) != 0) { + if (running) { + if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset, + wc, wc - ONE_RUNNING)) + continue; + running = false; + } + long h = eventWaiters; + if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) releaseEventWaiters(); - if (stat >= 0) { - if ((workerCounts & RUNNING_COUNT_MASK) != 0) { - long ms; int ns; - if (!timed) { + if (joinMe.status < 0) + break; + if ((workerCounts & RUNNING_COUNT_MASK) != 0) { + long ms; int ns; + if (!timed) { + ms = JOIN_TIMEOUT_MILLIS; + ns = 0; + } + else { // at most JOIN_TIMEOUT_MILLIS per wait + long nt = nanos - (System.nanoTime() - startTime); + if (nt <= 0L) + break; + ms = nt / 1000000; + if (ms > JOIN_TIMEOUT_MILLIS) { ms = JOIN_TIMEOUT_MILLIS; ns = 0; } - else { // at most JOIN_TIMEOUT_MILLIS per wait - ms = nt / 1000000; - if (ms > JOIN_TIMEOUT_MILLIS) { - ms = JOIN_TIMEOUT_MILLIS; - ns = 0; - } - else - ns = (int) (nt % 1000000); - } - stat = joinMe.internalAwaitDone(ms, ns); + else + ns = (int) (nt % 1000000); } - if (stat >= 0) // timeout or no running workers - helpMaintainParallelism(); + if (joinMe.internalAwaitDone(ms, ns) < 0) + break; } - do {} while (!UNSAFE.compareAndSwapInt - (this, workerCountsOffset, - c = workerCounts, c + ONE_RUNNING)); - if (stat < 0) - break; // else restart } + helpMaintainParallelism(); + } + if (!running) { + int c; + do {} while (!UNSAFE.compareAndSwapInt + (this, workerCountsOffset, + c = workerCounts, c + ONE_RUNNING)); } } @@ -1081,9 +1099,10 @@ public class ForkJoinPool extends Abstra throws InterruptedException { while (!blocker.isReleasable()) { int wc = workerCounts; - if ((wc & RUNNING_COUNT_MASK) != 0 && - UNSAFE.compareAndSwapInt(this, workerCountsOffset, - wc, wc - ONE_RUNNING)) { + if ((wc & RUNNING_COUNT_MASK) == 0) + helpMaintainParallelism(); + else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, + wc, wc - ONE_RUNNING)) { try { while (!blocker.isReleasable()) { long h = eventWaiters; @@ -1133,7 +1152,6 @@ public class ForkJoinPool extends Abstra return true; } - /** * Actions on transition to TERMINATING *