--- jsr166/src/jsr166y/ForkJoinPool.java 2010/09/07 06:19:05 1.72
+++ jsr166/src/jsr166y/ForkJoinPool.java 2010/09/20 20:42:36 1.81
@@ -6,16 +6,22 @@
package jsr166y;
-import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.CountDownLatch;
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
@@ -429,10 +435,11 @@ public class ForkJoinPool extends Abstra
/**
* The wakeup interval (in nanoseconds) for the oldest worker
- * waiting for an event invokes tryShutdownUnusedWorker to shrink
- * the number of workers. The exact value does not matter too
- * much, but should be long enough to slowly release resources
- * during long periods without use without disrupting normal use.
+ * waiting for an event to invoke tryShutdownUnusedWorker to
+ * shrink the number of workers. The exact value does not matter
+ * too much. It must be short enough to release resources during
+ * sustained periods of idleness, but not so short that threads
+ * are continually re-created.
*/
private static final long SHRINK_RATE_NANOS =
30L * 1000L * 1000L * 1000L; // 2 per minute
@@ -515,7 +522,7 @@ public class ForkJoinPool extends Abstra
* Lifecycle control. The low word contains the number of workers
* that are (probably) executing tasks. This value is atomically
* incremented before a worker gets a task to run, and decremented
- * when worker has no tasks and cannot find any. Bits 16-18
+ * when a worker has no tasks and cannot find any. Bits 16-18
* contain runLevel value. When all are zero, the pool is
* running. Level transitions are monotonic (running -> shutdown
* -> terminating -> terminated) so each transition adds a bit.
@@ -604,7 +611,7 @@ public class ForkJoinPool extends Abstra
* (rarely) necessary when other count updates lag.
*
* @param dr -- either zero or ONE_RUNNING
- * @param dt == either zero or ONE_TOTAL
+ * @param dt -- either zero or ONE_TOTAL
*/
private void decrementWorkerCounts(int dr, int dt) {
for (;;) {
@@ -786,15 +793,13 @@ public class ForkJoinPool extends Abstra
(workerCounts & RUNNING_COUNT_MASK) <= 1);
long startTime = untimed? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt
- if (eventCount != ec || w.runState != 0 ||
- runState >= TERMINATING) // recheck after clear
- break;
+ if (eventCount != ec || w.isTerminating())
+ break; // recheck after clear
if (untimed)
LockSupport.park(w);
else {
LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
- if (eventCount != ec || w.runState != 0 ||
- runState >= TERMINATING)
+ if (eventCount != ec || w.isTerminating())
break;
if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
tryShutdownUnusedWorker(ec);
@@ -806,7 +811,7 @@ public class ForkJoinPool extends Abstra
// Maintaining parallelism
/**
- * Pushes worker onto the spare stack
+ * Pushes worker onto the spare stack.
*/
final void pushSpare(ForkJoinWorkerThread w) {
int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
@@ -862,16 +867,23 @@ public class ForkJoinPool extends Abstra
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
wc + (ONE_RUNNING|ONE_TOTAL))) {
ForkJoinWorkerThread w = null;
+ Throwable fail = null;
try {
w = factory.newThread(this);
- } finally { // adjust on null or exceptional factory return
- if (w == null) {
- decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
- tryTerminate(false); // handle failure during shutdown
- }
+ } catch (Throwable ex) {
+ fail = ex;
}
- if (w == null)
+ if (w == null) { // null or exceptional factory return
+ decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
+ tryTerminate(false); // handle failure during shutdown
+ // If originating from an external caller,
+ // propagate exception, else ignore
+ if (fail != null && runState < TERMINATING &&
+ !(Thread.currentThread() instanceof
+ ForkJoinWorkerThread))
+ UNSAFE.throwException(fail);
break;
+ }
w.start(recordWorker(w), ueh);
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
int c; // advance event count
@@ -960,8 +972,12 @@ public class ForkJoinPool extends Abstra
boolean active = w.active;
boolean inactivate = false;
int pc = parallelism;
- int rs;
- while (w.runState == 0 && (rs = runState) < TERMINATING) {
+ while (w.runState == 0) {
+ int rs = runState;
+ if (rs >= TERMINATING) { // propagate shutdown
+ w.shutdown();
+ break;
+ }
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
inactivate = active = w.active = false;
@@ -1089,6 +1105,7 @@ public class ForkJoinPool extends Abstra
return true;
}
+
/**
* Actions on transition to TERMINATING
*
@@ -1112,7 +1129,7 @@ public class ForkJoinPool extends Abstra
if (passes > 0 && !w.isTerminated()) {
w.cancelTasks();
LockSupport.unpark(w);
- if (passes > 1) {
+ if (passes > 1 && !w.isInterrupted()) {
try {
w.interrupt();
} catch (SecurityException ignore) {
@@ -1225,13 +1242,13 @@ public class ForkJoinPool extends Abstra
* use {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that
* terminate due to unrecoverable errors encountered while executing
- * tasks. For default value, use null
.
+ * tasks. For default value, use {@code null}.
* @param asyncMode if true,
* establishes local first-in-first-out scheduling mode for forked
* tasks that are never joined. This mode may be more appropriate
* than default locally stack-based mode in applications in which
* worker threads only process event-style asynchronous tasks.
- * For default value, use false
.
+ * For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
@@ -1441,7 +1458,7 @@ public class ForkJoinPool extends Abstra
/**
* Returns the number of worker threads that have started but not
- * yet terminated. This result returned by this method may differ
+ * yet terminated. The result returned by this method may differ
* from {@link #getParallelism} when threads are created to
* maintain parallelism when others are cooperatively blocked.
*
@@ -1689,6 +1706,13 @@ public class ForkJoinPool extends Abstra
}
/**
+ * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
+ */
+ final boolean isAtLeastTerminating() {
+ return runState >= TERMINATING;
+ }
+
+ /**
* Returns {@code true} if this pool has been shut down.
*
* @return {@code true} if this pool has been shut down
@@ -1842,11 +1866,11 @@ public class ForkJoinPool extends Abstra
private static final long eventCountOffset =
objectFieldOffset("eventCount", ForkJoinPool.class);
private static final long eventWaitersOffset =
- objectFieldOffset("eventWaiters",ForkJoinPool.class);
+ objectFieldOffset("eventWaiters", ForkJoinPool.class);
private static final long stealCountOffset =
- objectFieldOffset("stealCount",ForkJoinPool.class);
+ objectFieldOffset("stealCount", ForkJoinPool.class);
private static final long spareWaitersOffset =
- objectFieldOffset("spareWaiters",ForkJoinPool.class);
+ objectFieldOffset("spareWaiters", ForkJoinPool.class);
private static long objectFieldOffset(String field, Class> klazz) {
try {