--- jsr166/src/jsr166y/ForkJoinPool.java 2010/07/23 13:07:43 1.58 +++ jsr166/src/jsr166y/ForkJoinPool.java 2010/07/23 14:09:17 1.59 @@ -60,7 +60,7 @@ import java.util.concurrent.CountDownLat * Runnable}- or {@code Callable}- based activities as well. However, * tasks that are already executing in a pool should normally * NOT use these pool execution methods, but instead use the - * within-computation forms listed in the table. + * within-computation forms listed in the table. * *
{@link ForkJoinTask#fork} (ForkJoinTasks are Futures) | *
Sample Usage. Normally a single {@code ForkJoinPool} is
* used for all parallel task execution in a program or subsystem.
* Otherwise, use would not usually outweigh the construction and
@@ -171,7 +171,7 @@ public class ForkJoinPool extends Abstra
* ForkJoinWorkerThread.joinTask) interleave these options until
* successful. Creating a new spare always succeeds, but also
* increases application footprint, so we try to avoid it, within
- * reason.
+ * reason.
*
* The ManagedBlocker extension API can't use option (1) so uses a
* special version of (2) in method awaitBlocker.
@@ -539,7 +539,7 @@ public class ForkJoinPool extends Abstra
final void incrementRunningCount() {
int c;
do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- c = workerCounts,
+ c = workerCounts,
c + ONE_RUNNING));
}
@@ -689,29 +689,27 @@ public class ForkJoinPool extends Abstra
* parallelism maintenance
*/
private void ensureEnoughWorkers() {
- for (;;) {
+ while ((runState & TERMINATING) == 0) {
int pc = parallelism;
int wc = workerCounts;
int rc = wc & RUNNING_COUNT_MASK;
int tc = wc >>> TOTAL_COUNT_SHIFT;
if (tc < pc) {
- if (runState == TERMINATING ||
- (UNSAFE.compareAndSwapInt
- (this, workerCountsOffset,
- wc, wc + (ONE_RUNNING|ONE_TOTAL)) &&
- addWorker() == null))
+ if (UNSAFE.compareAndSwapInt
+ (this, workerCountsOffset,
+ wc, wc + (ONE_RUNNING|ONE_TOTAL)) &&
+ addWorker() == null)
break;
}
- else if (tc > pc && rc < pc &&
+ else if (tc > pc && rc < pc &&
tc > (runState & ACTIVE_COUNT_MASK)) {
ForkJoinWorkerThread spare = null;
ForkJoinWorkerThread[] ws = workers;
int nws = ws.length;
- for (int i = 0; i < nws; ++i) {
+ for (int i = 0; i < nws; ++i) {
ForkJoinWorkerThread w = ws[i];
if (w != null && w.isSuspended()) {
- if ((workerCounts & RUNNING_COUNT_MASK) > pc ||
- runState == TERMINATING)
+ if ((workerCounts & RUNNING_COUNT_MASK) > pc)
return;
if (w.tryResumeSpare())
incrementRunningCount();
@@ -792,7 +790,7 @@ public class ForkJoinPool extends Abstra
*/
private void signalEvent() {
int c;
- do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset,
+ do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset,
c = eventCount, c+1));
releaseWaiters();
}
@@ -919,7 +917,7 @@ public class ForkJoinPool extends Abstra
*
* We allow blocking if:
*
- * 1. There would still be at least as many running threads as
+ * 1. There would still be at least as many running threads as
* parallelism level if this thread blocks.
*
* 2. A spare is resumed to replace this worker. We tolerate
@@ -929,13 +927,13 @@ public class ForkJoinPool extends Abstra
* preStep().
*
* 3. After #spares repeated checks, there are no fewer than #spare
- * threads not running. We allow this slack to avoid hysteresis
- * and as a hedge against lag/uncertainty of running count
+ * threads not running. We allow this slack to avoid hysteresis
+ * and as a hedge against lag/uncertainty of running count
* estimates when signalling or unblocking stalls.
*
* 4. All existing workers are busy (as rechecked via repeated
* retries by caller) and a new spare is created.
- *
+ *
* If none of the above hold, we try to escape out by
* re-incrementing count and returning to caller, which can retry
* later.
@@ -948,8 +946,13 @@ public class ForkJoinPool extends Abstra
* none of the blocking checks hold
*/
final boolean tryAwaitJoin(ForkJoinTask> joinMe, int retries) {
- if (joinMe.status < 0) // precheck to prime loop
+ if (joinMe.status < 0) // precheck for cancellation
+ return false;
+ if ((runState & TERMINATING) != 0) { // shutting down
+ joinMe.cancelIgnoringExceptions();
return false;
+ }
+
int pc = parallelism;
boolean running = true; // false when running count decremented
outer:for (;;) {
@@ -957,7 +960,7 @@ public class ForkJoinPool extends Abstra
int rc = wc & RUNNING_COUNT_MASK;
int tc = wc >>> TOTAL_COUNT_SHIFT;
if (running) { // replace with spare or decrement count
- if (rc <= pc && tc > pc &&
+ if (rc <= pc && tc > pc &&
(retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) {
ForkJoinWorkerThread[] ws = workers;
int nws = ws.length;
@@ -979,7 +982,7 @@ public class ForkJoinPool extends Abstra
}
if (retries < 0 || // < 0 means replacement check only
rc == 0 || joinMe.status < 0 || workerCounts != wc ||
- !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING))
return false; // done or inconsistent or contended
running = false;
@@ -993,8 +996,8 @@ public class ForkJoinPool extends Abstra
if (retries > sc) {
if (rc > 0 && rc >= pc - sc) // allow slack
break;
- if (tc < MAX_THREADS &&
- tc == (runState & ACTIVE_COUNT_MASK) &&
+ if (tc < MAX_THREADS &&
+ tc == (runState & ACTIVE_COUNT_MASK) &&
workerCounts == wc &&
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
wc+(ONE_RUNNING|ONE_TOTAL))) {
@@ -1036,12 +1039,12 @@ public class ForkJoinPool extends Abstra
int wc = workerCounts;
int rc = wc & RUNNING_COUNT_MASK;
int tc = wc >>> TOTAL_COUNT_SHIFT;
- if (running) {
- if (rc <= pc && tc > pc &&
+ if (running) {
+ if (rc <= pc && tc > pc &&
(retries > 0 || tc > (runState & ACTIVE_COUNT_MASK))) {
ForkJoinWorkerThread[] ws = workers;
int nws = ws.length;
- for (int i = 0; i < nws; ++i) {
+ for (int i = 0; i < nws; ++i) {
ForkJoinWorkerThread w = ws[i];
if (w != null) {
if (done = blocker.isReleasable())
@@ -1060,22 +1063,22 @@ public class ForkJoinPool extends Abstra
if (done = blocker.isReleasable())
return;
if (rc == 0 || workerCounts != wc ||
- !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ !UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING))
continue;
running = false;
if (rc > pc)
break;
}
- else {
+ else {
if (rc >= pc || (done = blocker.isReleasable()))
break;
int sc = tc - pc + 1;
if (retries++ > sc) {
if (rc > 0 && rc >= pc - sc)
break;
- if (tc < MAX_THREADS &&
- tc == (runState & ACTIVE_COUNT_MASK) &&
+ if (tc < MAX_THREADS &&
+ tc == (runState & ACTIVE_COUNT_MASK) &&
workerCounts == wc &&
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
wc+(ONE_RUNNING|ONE_TOTAL))) {
@@ -1086,7 +1089,7 @@ public class ForkJoinPool extends Abstra
Thread.yield();
}
}
-
+
try {
if (!done)
do {} while (!blocker.isReleasable() && !blocker.block());
@@ -1098,7 +1101,7 @@ public class ForkJoinPool extends Abstra
c = workerCounts, c + ONE_RUNNING));
}
}
- }
+ }
/**
* Possibly initiates and/or completes termination.
@@ -1275,10 +1278,10 @@ public class ForkJoinPool extends Abstra
* use {@link java.lang.Runtime#availableProcessors}.
* @param factory the factory for creating new threads. For default value,
* use {@link #defaultForkJoinWorkerThreadFactory}.
- * @param handler the handler for internal worker threads that
- * terminate due to unrecoverable errors encountered while executing
+ * @param handler the handler for internal worker threads that
+ * terminate due to unrecoverable errors encountered while executing
* tasks. For default value, use null
.
- * @param asyncMode if true,
+ * @param asyncMode if true,
* establishes local first-in-first-out scheduling mode for forked
* tasks that are never joined. This mode may be more appropriate
* than default locally stack-based mode in applications in which
@@ -1292,7 +1295,7 @@ public class ForkJoinPool extends Abstra
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
- public ForkJoinPool(int parallelism,
+ public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
Thread.UncaughtExceptionHandler handler,
boolean asyncMode) {
@@ -1345,7 +1348,7 @@ public class ForkJoinPool extends Abstra
/**
* Performs the given task, returning its result upon completion.
* If the caller is already engaged in a fork/join computation in
- * the current pool, this method is equivalent in effect to
+ * the current pool, this method is equivalent in effect to
* {@link ForkJoinTask#invoke}.
*
* @param task the task
@@ -1362,7 +1365,7 @@ public class ForkJoinPool extends Abstra
/**
* Arranges for (asynchronous) execution of the given task.
* If the caller is already engaged in a fork/join computation in
- * the current pool, this method is equivalent in effect to
+ * the current pool, this method is equivalent in effect to
* {@link ForkJoinTask#fork}.
*
* @param task the task
@@ -1393,7 +1396,7 @@ public class ForkJoinPool extends Abstra
/**
* Submits a ForkJoinTask for execution.
* If the caller is already engaged in a fork/join computation in
- * the current pool, this method is equivalent in effect to
+ * the current pool, this method is equivalent in effect to
* {@link ForkJoinTask#fork}.
*
* @param task the task to submit