--- jsr166/src/jsr166y/ForkJoinPool.java 2010/09/01 20:12:39 1.69
+++ jsr166/src/jsr166y/ForkJoinPool.java 2010/10/24 19:37:26 1.83
@@ -6,17 +6,22 @@
package jsr166y;
-import java.util.concurrent.*;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.CountDownLatch;
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
@@ -301,7 +306,7 @@ public class ForkJoinPool extends Abstra
* about the same time as another is needlessly being created. We
* counteract this and related slop in part by requiring resumed
* spares to immediately recheck (in preStep) to see whether they
- * they should re-suspend.
+ * should re-suspend.
*
* 6. Killing off unneeded workers. A timeout mechanism is used to
* shed unused workers: The oldest (first) event queue waiter uses
@@ -430,10 +435,11 @@ public class ForkJoinPool extends Abstra
/**
* The wakeup interval (in nanoseconds) for the oldest worker
- * worker waiting for an event invokes tryShutdownUnusedWorker to shrink
- * the number of workers. The exact value does not matter too
- * much, but should be long enough to slowly release resources
- * during long periods without use without disrupting normal use.
+ * waiting for an event to invoke tryShutdownUnusedWorker to
+ * shrink the number of workers. The exact value does not matter
+ * too much. It must be short enough to release resources during
+ * sustained periods of idleness, but not so short that threads
+ * are continually re-created.
*/
private static final long SHRINK_RATE_NANOS =
30L * 1000L * 1000L * 1000L; // 2 per minute
@@ -516,7 +522,7 @@ public class ForkJoinPool extends Abstra
* Lifecycle control. The low word contains the number of workers
* that are (probably) executing tasks. This value is atomically
* incremented before a worker gets a task to run, and decremented
- * when worker has no tasks and cannot find any. Bits 16-18
+ * when a worker has no tasks and cannot find any. Bits 16-18
* contain runLevel value. When all are zero, the pool is
* running. Level transitions are monotonic (running -> shutdown
* -> terminating -> terminated) so each transition adds a bit.
@@ -605,7 +611,7 @@ public class ForkJoinPool extends Abstra
* (rarely) necessary when other count updates lag.
*
* @param dr -- either zero or ONE_RUNNING
- * @param dt == either zero or ONE_TOTAL
+ * @param dt -- either zero or ONE_TOTAL
*/
private void decrementWorkerCounts(int dr, int dt) {
for (;;) {
@@ -674,7 +680,7 @@ public class ForkJoinPool extends Abstra
}
/**
- * Nulls out record of worker in workers array
+ * Nulls out record of worker in workers array.
*/
private void forgetWorker(ForkJoinWorkerThread w) {
int idx = w.poolIndex;
@@ -699,7 +705,7 @@ public class ForkJoinPool extends Abstra
*/
final void workerTerminated(ForkJoinWorkerThread w) {
forgetWorker(w);
- decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
+ decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
while (w.stealCount != 0) // collect final count
tryAccumulateStealCount(w);
tryTerminate(false);
@@ -785,17 +791,15 @@ public class ForkJoinPool extends Abstra
if (tryAccumulateStealCount(w)) { // transfer while idle
boolean untimed = (w.nextWaiter != 0L ||
(workerCounts & RUNNING_COUNT_MASK) <= 1);
- long startTime = untimed? 0 : System.nanoTime();
+ long startTime = untimed ? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt
- if (eventCount != ec || w.runState != 0 ||
- runState >= TERMINATING) // recheck after clear
- break;
+ if (eventCount != ec || w.isTerminating())
+ break; // recheck after clear
if (untimed)
LockSupport.park(w);
else {
LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
- if (eventCount != ec || w.runState != 0 ||
- runState >= TERMINATING)
+ if (eventCount != ec || w.isTerminating())
break;
if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
tryShutdownUnusedWorker(ec);
@@ -807,7 +811,7 @@ public class ForkJoinPool extends Abstra
// Maintaining parallelism
/**
- * Pushes worker onto the spare stack
+ * Pushes worker onto the spare stack.
*/
final void pushSpare(ForkJoinWorkerThread w) {
int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
@@ -863,16 +867,23 @@ public class ForkJoinPool extends Abstra
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
wc + (ONE_RUNNING|ONE_TOTAL))) {
ForkJoinWorkerThread w = null;
+ Throwable fail = null;
try {
w = factory.newThread(this);
- } finally { // adjust on null or exceptional factory return
- if (w == null) {
- decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
- tryTerminate(false); // handle failure during shutdown
- }
+ } catch (Throwable ex) {
+ fail = ex;
}
- if (w == null)
+ if (w == null) { // null or exceptional factory return
+ decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
+ tryTerminate(false); // handle failure during shutdown
+ // If originating from an external caller,
+ // propagate exception, else ignore
+ if (fail != null && runState < TERMINATING &&
+ !(Thread.currentThread() instanceof
+ ForkJoinWorkerThread))
+ UNSAFE.throwException(fail);
break;
+ }
w.start(recordWorker(w), ueh);
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
int c; // advance event count
@@ -961,8 +972,12 @@ public class ForkJoinPool extends Abstra
boolean active = w.active;
boolean inactivate = false;
int pc = parallelism;
- int rs;
- while (w.runState == 0 && (rs = runState) < TERMINATING) {
+ while (w.runState == 0) {
+ int rs = runState;
+ if (rs >= TERMINATING) { // propagate shutdown
+ w.shutdown();
+ break;
+ }
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
inactivate = active = w.active = false;
@@ -999,16 +1014,28 @@ public class ForkJoinPool extends Abstra
*
* @param joinMe the task to join
* @param worker the current worker thread
+ * @param timed true if wait should time out
+ * @param nanos timeout value if timed
*/
- final void awaitJoin(ForkJoinTask> joinMe, ForkJoinWorkerThread worker) {
+ final void awaitJoin(ForkJoinTask> joinMe, ForkJoinWorkerThread worker,
+ boolean timed, long nanos) {
+ long startTime = timed? System.nanoTime() : 0L;
int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
while (joinMe.status >= 0) {
int wc;
+ long nt = 0L;
+ if (runState >= TERMINATING) {
+ joinMe.cancelIgnoringExceptions();
+ break;
+ }
worker.helpJoinTask(joinMe);
if (joinMe.status < 0)
break;
else if (retries > 0)
--retries;
+ else if (timed &&
+ (nt = nanos - (System.nanoTime() - startTime)) <= 0L)
+ break;
else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING)) {
@@ -1017,11 +1044,27 @@ public class ForkJoinPool extends Abstra
(h = eventWaiters) != 0L && // help release others
(int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
releaseEventWaiters();
- if (stat >= 0 &&
- ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
- (stat =
- joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
- helpMaintainParallelism(); // timeout or no running workers
+ if (stat >= 0) {
+ if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
+ long ms; int ns;
+ if (!timed) {
+ ms = JOIN_TIMEOUT_MILLIS;
+ ns = 0;
+ }
+ else { // at most JOIN_TIMEOUT_MILLIS per wait
+ ms = nt / 1000000;
+ if (ms > JOIN_TIMEOUT_MILLIS) {
+ ms = JOIN_TIMEOUT_MILLIS;
+ ns = 0;
+ }
+ else
+ ns = (int) (nt % 1000000);
+ }
+ stat = joinMe.internalAwaitDone(ms, ns);
+ }
+ if (stat >= 0) // timeout or no running workers
+ helpMaintainParallelism();
+ }
do {} while (!UNSAFE.compareAndSwapInt
(this, workerCountsOffset,
c = workerCounts, c + ONE_RUNNING));
@@ -1090,6 +1133,7 @@ public class ForkJoinPool extends Abstra
return true;
}
+
/**
* Actions on transition to TERMINATING
*
@@ -1107,16 +1151,13 @@ public class ForkJoinPool extends Abstra
c = eventCount, c+1);
eventWaiters = 0L; // clobber lists
spareWaiters = 0;
- ForkJoinWorkerThread[] ws = workers;
- int n = ws.length;
- for (int i = 0; i < n; ++i) {
- ForkJoinWorkerThread w = ws[i];
+ for (ForkJoinWorkerThread w : workers) {
if (w != null) {
w.shutdown();
if (passes > 0 && !w.isTerminated()) {
w.cancelTasks();
LockSupport.unpark(w);
- if (passes > 1) {
+ if (passes > 1 && !w.isInterrupted()) {
try {
w.interrupt();
} catch (SecurityException ignore) {
@@ -1129,7 +1170,7 @@ public class ForkJoinPool extends Abstra
}
/**
- * Clear out and cancel submissions, ignoring exceptions
+ * Clears out and cancels submissions, ignoring exceptions.
*/
private void cancelSubmissions() {
ForkJoinTask> task;
@@ -1144,15 +1185,15 @@ public class ForkJoinPool extends Abstra
// misc support for ForkJoinWorkerThread
/**
- * Returns pool number
+ * Returns pool number.
*/
final int getPoolNumber() {
return poolNumber;
}
/**
- * Tries to accumulates steal count from a worker, clearing
- * the worker's value.
+ * Tries to accumulate steal count from a worker, clearing
+ * the worker's value if successful.
*
* @return true if worker steal count now zero
*/
@@ -1176,7 +1217,10 @@ public class ForkJoinPool extends Abstra
int pc = parallelism; // use parallelism, not rc
int ac = runState; // no mask -- artificially boosts during shutdown
// Use exact results for small values, saturate past 4
- return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
+ return ((pc <= ac) ? 0 :
+ (pc >>> 1 <= ac) ? 1 :
+ (pc >>> 2 <= ac) ? 3 :
+ pc >>> 3);
}
// Public and protected methods
@@ -1226,13 +1270,13 @@ public class ForkJoinPool extends Abstra
* use {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that
* terminate due to unrecoverable errors encountered while executing
- * tasks. For default value, use null
.
+ * tasks. For default value, use {@code null}.
* @param asyncMode if true,
* establishes local first-in-first-out scheduling mode for forked
* tasks that are never joined. This mode may be more appropriate
* than default locally stack-based mode in applications in which
* worker threads only process event-style asynchronous tasks.
- * For default value, use false
.
+ * For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
@@ -1280,17 +1324,13 @@ public class ForkJoinPool extends Abstra
// Execution methods
/**
- * Common code for execute, invoke and submit
+ * Submits task and creates, starts, or resumes some workers if necessary
*/
private void doSubmit(ForkJoinTask task) {
- if (task == null)
- throw new NullPointerException();
- if (runState >= SHUTDOWN)
- throw new RejectedExecutionException();
submissionQueue.offer(task);
int c; // try to increment event count -- CAS failure OK
UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
- helpMaintainParallelism(); // create, start, or resume some workers
+ helpMaintainParallelism();
}
/**
@@ -1303,8 +1343,33 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public T invoke(ForkJoinTask task) {
- doSubmit(task);
- return task.join();
+ if (task == null)
+ throw new NullPointerException();
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException();
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ return task.invoke(); // bypass submit if in same pool
+ else {
+ doSubmit(task);
+ return task.join();
+ }
+ }
+
+ /**
+ * Unless terminating, forks task if within an ongoing FJ
+ * computation in the current pool, else submits as external task.
+ */
+ private void forkOrSubmit(ForkJoinTask task) {
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException();
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ task.fork();
+ else
+ doSubmit(task);
}
/**
@@ -1316,7 +1381,9 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public void execute(ForkJoinTask> task) {
- doSubmit(task);
+ if (task == null)
+ throw new NullPointerException();
+ forkOrSubmit(task);
}
// AbstractExecutorService methods
@@ -1327,12 +1394,14 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public void execute(Runnable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask> job;
if (task instanceof ForkJoinTask>) // avoid re-wrap
job = (ForkJoinTask>) task;
else
job = ForkJoinTask.adapt(task, null);
- doSubmit(job);
+ forkOrSubmit(job);
}
/**
@@ -1345,7 +1414,9 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public ForkJoinTask submit(ForkJoinTask task) {
- doSubmit(task);
+ if (task == null)
+ throw new NullPointerException();
+ forkOrSubmit(task);
return task;
}
@@ -1355,8 +1426,10 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public ForkJoinTask submit(Callable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask job = ForkJoinTask.adapt(task);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1366,8 +1439,10 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public ForkJoinTask submit(Runnable task, T result) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask job = ForkJoinTask.adapt(task, result);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1377,12 +1452,14 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public ForkJoinTask> submit(Runnable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask> job;
if (task instanceof ForkJoinTask>) // avoid re-wrap
job = (ForkJoinTask>) task;
else
job = ForkJoinTask.adapt(task, null);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1442,7 +1519,7 @@ public class ForkJoinPool extends Abstra
/**
* Returns the number of worker threads that have started but not
- * yet terminated. This result returned by this method may differ
+ * yet terminated. The result returned by this method may differ
* from {@link #getParallelism} when threads are created to
* maintain parallelism when others are cooperatively blocked.
*
@@ -1527,13 +1604,9 @@ public class ForkJoinPool extends Abstra
*/
public long getQueuedTaskCount() {
long count = 0;
- ForkJoinWorkerThread[] ws = workers;
- int n = ws.length;
- for (int i = 0; i < n; ++i) {
- ForkJoinWorkerThread w = ws[i];
+ for (ForkJoinWorkerThread w : workers)
if (w != null)
count += w.getQueueSize();
- }
return count;
}
@@ -1588,13 +1661,9 @@ public class ForkJoinPool extends Abstra
*/
protected int drainTasksTo(Collection super ForkJoinTask>> c) {
int count = submissionQueue.drainTo(c);
- ForkJoinWorkerThread[] ws = workers;
- int n = ws.length;
- for (int i = 0; i < n; ++i) {
- ForkJoinWorkerThread w = ws[i];
+ for (ForkJoinWorkerThread w : workers)
if (w != null)
count += w.drainTasksTo(c);
- }
return count;
}
@@ -1698,6 +1767,13 @@ public class ForkJoinPool extends Abstra
}
/**
+ * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
+ */
+ final boolean isAtLeastTerminating() {
+ return runState >= TERMINATING;
+ }
+
+ /**
* Returns {@code true} if this pool has been shut down.
*
* @return {@code true} if this pool has been shut down
@@ -1851,11 +1927,11 @@ public class ForkJoinPool extends Abstra
private static final long eventCountOffset =
objectFieldOffset("eventCount", ForkJoinPool.class);
private static final long eventWaitersOffset =
- objectFieldOffset("eventWaiters",ForkJoinPool.class);
+ objectFieldOffset("eventWaiters", ForkJoinPool.class);
private static final long stealCountOffset =
- objectFieldOffset("stealCount",ForkJoinPool.class);
+ objectFieldOffset("stealCount", ForkJoinPool.class);
private static final long spareWaitersOffset =
- objectFieldOffset("spareWaiters",ForkJoinPool.class);
+ objectFieldOffset("spareWaiters", ForkJoinPool.class);
private static long objectFieldOffset(String field, Class> klazz) {
try {