--- jsr166/src/jsr166y/ForkJoinPool.java 2010/07/23 13:07:43 1.58 +++ jsr166/src/jsr166y/ForkJoinPool.java 2010/07/23 14:09:17 1.59 @@ -60,7 +60,7 @@ import java.util.concurrent.CountDownLat * Runnable}- or {@code Callable}- based activities as well. However, * tasks that are already executing in a pool should normally * NOT use these pool execution methods, but instead use the - * within-computation forms listed in the table. + * within-computation forms listed in the table. * * * @@ -84,7 +84,7 @@ import java.util.concurrent.CountDownLat * * *
{@link ForkJoinTask#fork} (ForkJoinTasks are Futures)
- * + * *

Sample Usage. Normally a single {@code ForkJoinPool} is * used for all parallel task execution in a program or subsystem. * Otherwise, use would not usually outweigh the construction and @@ -171,7 +171,7 @@ public class ForkJoinPool extends Abstra * ForkJoinWorkerThread.joinTask) interleave these options until * successful. Creating a new spare always succeeds, but also * increases application footprint, so we try to avoid it, within - * reason. + * reason. * * The ManagedBlocker extension API can't use option (1) so uses a * special version of (2) in method awaitBlocker. @@ -539,7 +539,7 @@ public class ForkJoinPool extends Abstra final void incrementRunningCount() { int c; do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset, - c = workerCounts, + c = workerCounts, c + ONE_RUNNING)); } @@ -689,29 +689,27 @@ public class ForkJoinPool extends Abstra * parallelism maintenance */ private void ensureEnoughWorkers() { - for (;;) { + while ((runState & TERMINATING) == 0) { int pc = parallelism; int wc = workerCounts; int rc = wc & RUNNING_COUNT_MASK; int tc = wc >>> TOTAL_COUNT_SHIFT; if (tc < pc) { - if (runState == TERMINATING || - (UNSAFE.compareAndSwapInt - (this, workerCountsOffset, - wc, wc + (ONE_RUNNING|ONE_TOTAL)) && - addWorker() == null)) + if (UNSAFE.compareAndSwapInt + (this, workerCountsOffset, + wc, wc + (ONE_RUNNING|ONE_TOTAL)) && + addWorker() == null) break; } - else if (tc > pc && rc < pc && + else if (tc > pc && rc < pc && tc > (runState & ACTIVE_COUNT_MASK)) { ForkJoinWorkerThread spare = null; ForkJoinWorkerThread[] ws = workers; int nws = ws.length; - for (int i = 0; i < nws; ++i) { + for (int i = 0; i < nws; ++i) { ForkJoinWorkerThread w = ws[i]; if (w != null && w.isSuspended()) { - if ((workerCounts & RUNNING_COUNT_MASK) > pc || - runState == TERMINATING) + if ((workerCounts & RUNNING_COUNT_MASK) > pc) return; if (w.tryResumeSpare()) incrementRunningCount(); @@ -792,7 +790,7 @@ public class ForkJoinPool extends Abstra */ private void signalEvent() { int c; - do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset, + do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1)); releaseWaiters(); } @@ -919,7 +917,7 @@ public class ForkJoinPool extends Abstra * * We allow blocking if: * - * 1. There would still be at least as many running threads as + * 1. There would still be at least as many running threads as * parallelism level if this thread blocks. * * 2. A spare is resumed to replace this worker. We tolerate @@ -929,13 +927,13 @@ public class ForkJoinPool extends Abstra * preStep(). * * 3. After #spares repeated checks, there are no fewer than #spare - * threads not running. We allow this slack to avoid hysteresis - * and as a hedge against lag/uncertainty of running count + * threads not running. We allow this slack to avoid hysteresis + * and as a hedge against lag/uncertainty of running count * estimates when signalling or unblocking stalls. * * 4. All existing workers are busy (as rechecked via repeated * retries by caller) and a new spare is created. - * + * * If none of the above hold, we try to escape out by * re-incrementing count and returning to caller, which can retry * later. @@ -948,8 +946,13 @@ public class ForkJoinPool extends Abstra * none of the blocking checks hold */ final boolean tryAwaitJoin(ForkJoinTask joinMe, int retries) { - if (joinMe.status < 0) // precheck to prime loop + if (joinMe.status < 0) // precheck for cancellation + return false; + if ((runState & TERMINATING) != 0) { // shutting down + joinMe.cancelIgnoringExceptions(); return false; + } + int pc = parallelism; boolean running = true; // false when running count decremented outer:for (;;) { @@ -957,7 +960,7 @@ public class ForkJoinPool extends Abstra int rc = wc & RUNNING_COUNT_MASK; int tc = wc >>> TOTAL_COUNT_SHIFT; if (running) { // replace with spare or decrement count - if (rc <= pc && tc > pc && + if (rc <= pc && tc > pc && (retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) { ForkJoinWorkerThread[] ws = workers; int nws = ws.length; @@ -979,7 +982,7 @@ public class ForkJoinPool extends Abstra } if (retries < 0 || // < 0 means replacement check only rc == 0 || joinMe.status < 0 || workerCounts != wc || - !UNSAFE.compareAndSwapInt(this, workerCountsOffset, + !UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc - ONE_RUNNING)) return false; // done or inconsistent or contended running = false; @@ -993,8 +996,8 @@ public class ForkJoinPool extends Abstra if (retries > sc) { if (rc > 0 && rc >= pc - sc) // allow slack break; - if (tc < MAX_THREADS && - tc == (runState & ACTIVE_COUNT_MASK) && + if (tc < MAX_THREADS && + tc == (runState & ACTIVE_COUNT_MASK) && workerCounts == wc && UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc+(ONE_RUNNING|ONE_TOTAL))) { @@ -1036,12 +1039,12 @@ public class ForkJoinPool extends Abstra int wc = workerCounts; int rc = wc & RUNNING_COUNT_MASK; int tc = wc >>> TOTAL_COUNT_SHIFT; - if (running) { - if (rc <= pc && tc > pc && + if (running) { + if (rc <= pc && tc > pc && (retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) { ForkJoinWorkerThread[] ws = workers; int nws = ws.length; - for (int i = 0; i < nws; ++i) { + for (int i = 0; i < nws; ++i) { ForkJoinWorkerThread w = ws[i]; if (w != null) { if (done = blocker.isReleasable()) @@ -1060,22 +1063,22 @@ public class ForkJoinPool extends Abstra if (done = blocker.isReleasable()) return; if (rc == 0 || workerCounts != wc || - !UNSAFE.compareAndSwapInt(this, workerCountsOffset, + !UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc - ONE_RUNNING)) continue; running = false; if (rc > pc) break; } - else { + else { if (rc >= pc || (done = blocker.isReleasable())) break; int sc = tc - pc + 1; if (retries++ > sc) { if (rc > 0 && rc >= pc - sc) break; - if (tc < MAX_THREADS && - tc == (runState & ACTIVE_COUNT_MASK) && + if (tc < MAX_THREADS && + tc == (runState & ACTIVE_COUNT_MASK) && workerCounts == wc && UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc+(ONE_RUNNING|ONE_TOTAL))) { @@ -1086,7 +1089,7 @@ public class ForkJoinPool extends Abstra Thread.yield(); } } - + try { if (!done) do {} while (!blocker.isReleasable() && !blocker.block()); @@ -1098,7 +1101,7 @@ public class ForkJoinPool extends Abstra c = workerCounts, c + ONE_RUNNING)); } } - } + } /** * Possibly initiates and/or completes termination. @@ -1275,10 +1278,10 @@ public class ForkJoinPool extends Abstra * use {@link java.lang.Runtime#availableProcessors}. * @param factory the factory for creating new threads. For default value, * use {@link #defaultForkJoinWorkerThreadFactory}. - * @param handler the handler for internal worker threads that - * terminate due to unrecoverable errors encountered while executing + * @param handler the handler for internal worker threads that + * terminate due to unrecoverable errors encountered while executing * tasks. For default value, use null. - * @param asyncMode if true, + * @param asyncMode if true, * establishes local first-in-first-out scheduling mode for forked * tasks that are never joined. This mode may be more appropriate * than default locally stack-based mode in applications in which @@ -1292,7 +1295,7 @@ public class ForkJoinPool extends Abstra * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")} */ - public ForkJoinPool(int parallelism, + public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode) { @@ -1345,7 +1348,7 @@ public class ForkJoinPool extends Abstra /** * Performs the given task, returning its result upon completion. * If the caller is already engaged in a fork/join computation in - * the current pool, this method is equivalent in effect to + * the current pool, this method is equivalent in effect to * {@link ForkJoinTask#invoke}. * * @param task the task @@ -1362,7 +1365,7 @@ public class ForkJoinPool extends Abstra /** * Arranges for (asynchronous) execution of the given task. * If the caller is already engaged in a fork/join computation in - * the current pool, this method is equivalent in effect to + * the current pool, this method is equivalent in effect to * {@link ForkJoinTask#fork}. * * @param task the task @@ -1393,7 +1396,7 @@ public class ForkJoinPool extends Abstra /** * Submits a ForkJoinTask for execution. * If the caller is already engaged in a fork/join computation in - * the current pool, this method is equivalent in effect to + * the current pool, this method is equivalent in effect to * {@link ForkJoinTask#fork}. * * @param task the task to submit