--- jsr166/src/jsr166y/ForkJoinPool.java 2010/09/01 06:40:12 1.68
+++ jsr166/src/jsr166y/ForkJoinPool.java 2010/09/07 23:49:30 1.79
@@ -6,17 +6,23 @@
package jsr166y;
-import java.util.concurrent.*;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.CountDownLatch;
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
@@ -301,7 +307,7 @@ public class ForkJoinPool extends Abstra
* about the same time as another is needlessly being created. We
* counteract this and related slop in part by requiring resumed
* spares to immediately recheck (in preStep) to see whether they
- * they should re-suspend.
+ * should re-suspend.
*
* 6. Killing off unneeded workers. A timeout mechanism is used to
* shed unused workers: The oldest (first) event queue waiter uses
@@ -346,7 +352,7 @@ public class ForkJoinPool extends Abstra
* "while ((local = field) != 0)") which are usually the simplest
* way to ensure the required read orderings (which are sometimes
* critical). Also several occurrences of the unusual "do {}
- * while(!cas...)" which is the simplest way to force an update of
+ * while (!cas...)" which is the simplest way to force an update of
* a CAS'ed variable. There are also other coding oddities that
* help some methods perform reasonably even when interpreted (not
* compiled), at the expense of some messy constructions that
@@ -430,10 +436,11 @@ public class ForkJoinPool extends Abstra
/**
* The wakeup interval (in nanoseconds) for the oldest worker
- * worker waiting for an event invokes tryShutdownUnusedWorker to shrink
- * the number of workers. The exact value does not matter too
- * much, but should be long enough to slowly release resources
- * during long periods without use without disrupting normal use.
+ * waiting for an event to invoke tryShutdownUnusedWorker to
+ * shrink the number of workers. The exact value does not matter
+ * too much. It must be short enough to release resources during
+ * sustained periods of idleness, but not so short that threads
+ * are continually re-created.
*/
private static final long SHRINK_RATE_NANOS =
30L * 1000L * 1000L * 1000L; // 2 per minute
@@ -516,7 +523,7 @@ public class ForkJoinPool extends Abstra
* Lifecycle control. The low word contains the number of workers
* that are (probably) executing tasks. This value is atomically
* incremented before a worker gets a task to run, and decremented
- * when worker has no tasks and cannot find any. Bits 16-18
+ * when a worker has no tasks and cannot find any. Bits 16-18
* contain runLevel value. When all are zero, the pool is
* running. Level transitions are monotonic (running -> shutdown
* -> terminating -> terminated) so each transition adds a bit.
@@ -605,7 +612,7 @@ public class ForkJoinPool extends Abstra
* (rarely) necessary when other count updates lag.
*
* @param dr -- either zero or ONE_RUNNING
- * @param dt == either zero or ONE_TOTAL
+ * @param dt -- either zero or ONE_TOTAL
*/
private void decrementWorkerCounts(int dr, int dt) {
for (;;) {
@@ -674,7 +681,7 @@ public class ForkJoinPool extends Abstra
}
/**
- * Nulls out record of worker in workers array
+ * Nulls out record of worker in workers array.
*/
private void forgetWorker(ForkJoinWorkerThread w) {
int idx = w.poolIndex;
@@ -807,7 +814,7 @@ public class ForkJoinPool extends Abstra
// Maintaining parallelism
/**
- * Pushes worker onto the spare stack
+ * Pushes worker onto the spare stack.
*/
final void pushSpare(ForkJoinWorkerThread w) {
int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
@@ -832,9 +839,9 @@ public class ForkJoinPool extends Abstra
UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
sw, w.nextSpare)) {
int c; // increment running count before resume
- do {} while(!UNSAFE.compareAndSwapInt
- (this, workerCountsOffset,
- c = workerCounts, c + ONE_RUNNING));
+ do {} while (!UNSAFE.compareAndSwapInt
+ (this, workerCountsOffset,
+ c = workerCounts, c + ONE_RUNNING));
if (w.tryUnsuspend())
LockSupport.unpark(w);
else // back out if w was shutdown
@@ -1107,10 +1114,7 @@ public class ForkJoinPool extends Abstra
c = eventCount, c+1);
eventWaiters = 0L; // clobber lists
spareWaiters = 0;
- ForkJoinWorkerThread[] ws = workers;
- int n = ws.length;
- for (int i = 0; i < n; ++i) {
- ForkJoinWorkerThread w = ws[i];
+ for (ForkJoinWorkerThread w : workers) {
if (w != null) {
w.shutdown();
if (passes > 0 && !w.isTerminated()) {
@@ -1129,7 +1133,7 @@ public class ForkJoinPool extends Abstra
}
/**
- * Clear out and cancel submissions, ignoring exceptions
+ * Clears out and cancels submissions, ignoring exceptions.
*/
private void cancelSubmissions() {
ForkJoinTask> task;
@@ -1144,15 +1148,15 @@ public class ForkJoinPool extends Abstra
// misc support for ForkJoinWorkerThread
/**
- * Returns pool number
+ * Returns pool number.
*/
final int getPoolNumber() {
return poolNumber;
}
/**
- * Tries to accumulates steal count from a worker, clearing
- * the worker's value.
+ * Tries to accumulate steal count from a worker, clearing
+ * the worker's value if successful.
*
* @return true if worker steal count now zero
*/
@@ -1176,7 +1180,10 @@ public class ForkJoinPool extends Abstra
int pc = parallelism; // use parallelism, not rc
int ac = runState; // no mask -- artificially boosts during shutdown
// Use exact results for small values, saturate past 4
- return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
+ return ((pc <= ac) ? 0 :
+ (pc >>> 1 <= ac) ? 1 :
+ (pc >>> 2 <= ac) ? 3 :
+ pc >>> 3);
}
// Public and protected methods
@@ -1226,13 +1233,13 @@ public class ForkJoinPool extends Abstra
* use {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that
* terminate due to unrecoverable errors encountered while executing
- * tasks. For default value, use null
.
+ * tasks. For default value, use {@code null}.
* @param asyncMode if true,
* establishes local first-in-first-out scheduling mode for forked
* tasks that are never joined. This mode may be more appropriate
* than default locally stack-based mode in applications in which
* worker threads only process event-style asynchronous tasks.
- * For default value, use false
.
+ * For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
@@ -1442,7 +1449,7 @@ public class ForkJoinPool extends Abstra
/**
* Returns the number of worker threads that have started but not
- * yet terminated. This result returned by this method may differ
+ * yet terminated. The result returned by this method may differ
* from {@link #getParallelism} when threads are created to
* maintain parallelism when others are cooperatively blocked.
*
@@ -1527,13 +1534,9 @@ public class ForkJoinPool extends Abstra
*/
public long getQueuedTaskCount() {
long count = 0;
- ForkJoinWorkerThread[] ws = workers;
- int n = ws.length;
- for (int i = 0; i < n; ++i) {
- ForkJoinWorkerThread w = ws[i];
+ for (ForkJoinWorkerThread w : workers)
if (w != null)
count += w.getQueueSize();
- }
return count;
}
@@ -1588,13 +1591,9 @@ public class ForkJoinPool extends Abstra
*/
protected int drainTasksTo(Collection super ForkJoinTask>> c) {
int count = submissionQueue.drainTo(c);
- ForkJoinWorkerThread[] ws = workers;
- int n = ws.length;
- for (int i = 0; i < n; ++i) {
- ForkJoinWorkerThread w = ws[i];
+ for (ForkJoinWorkerThread w : workers)
if (w != null)
count += w.drainTasksTo(c);
- }
return count;
}
@@ -1721,7 +1720,7 @@ public class ForkJoinPool extends Abstra
throws InterruptedException {
try {
return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
- } catch(TimeoutException ex) {
+ } catch (TimeoutException ex) {
return false;
}
}
@@ -1851,11 +1850,11 @@ public class ForkJoinPool extends Abstra
private static final long eventCountOffset =
objectFieldOffset("eventCount", ForkJoinPool.class);
private static final long eventWaitersOffset =
- objectFieldOffset("eventWaiters",ForkJoinPool.class);
+ objectFieldOffset("eventWaiters", ForkJoinPool.class);
private static final long stealCountOffset =
- objectFieldOffset("stealCount",ForkJoinPool.class);
+ objectFieldOffset("stealCount", ForkJoinPool.class);
private static final long spareWaitersOffset =
- objectFieldOffset("spareWaiters",ForkJoinPool.class);
+ objectFieldOffset("spareWaiters", ForkJoinPool.class);
private static long objectFieldOffset(String field, Class> klazz) {
try {