--- jsr166/src/jsr166y/ForkJoinPool.java 2010/09/06 21:36:43 1.71 +++ jsr166/src/jsr166y/ForkJoinPool.java 2010/09/20 20:42:36 1.81 @@ -6,16 +6,22 @@ package jsr166y; -import java.util.concurrent.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.CountDownLatch; /** * An {@link ExecutorService} for running {@link ForkJoinTask}s. @@ -300,7 +306,7 @@ public class ForkJoinPool extends Abstra * about the same time as another is needlessly being created. We * counteract this and related slop in part by requiring resumed * spares to immediately recheck (in preStep) to see whether they - * they should re-suspend. + * should re-suspend. * * 6. Killing off unneeded workers. A timeout mechanism is used to * shed unused workers: The oldest (first) event queue waiter uses @@ -429,10 +435,11 @@ public class ForkJoinPool extends Abstra /** * The wakeup interval (in nanoseconds) for the oldest worker - * worker waiting for an event invokes tryShutdownUnusedWorker to shrink - * the number of workers. The exact value does not matter too - * much, but should be long enough to slowly release resources - * during long periods without use without disrupting normal use. + * waiting for an event to invoke tryShutdownUnusedWorker to + * shrink the number of workers. The exact value does not matter + * too much. It must be short enough to release resources during + * sustained periods of idleness, but not so short that threads + * are continually re-created. */ private static final long SHRINK_RATE_NANOS = 30L * 1000L * 1000L * 1000L; // 2 per minute @@ -515,7 +522,7 @@ public class ForkJoinPool extends Abstra * Lifecycle control. The low word contains the number of workers * that are (probably) executing tasks. This value is atomically * incremented before a worker gets a task to run, and decremented - * when worker has no tasks and cannot find any. Bits 16-18 + * when a worker has no tasks and cannot find any. Bits 16-18 * contain runLevel value. When all are zero, the pool is * running. Level transitions are monotonic (running -> shutdown * -> terminating -> terminated) so each transition adds a bit. @@ -604,7 +611,7 @@ public class ForkJoinPool extends Abstra * (rarely) necessary when other count updates lag. * * @param dr -- either zero or ONE_RUNNING - * @param dt == either zero or ONE_TOTAL + * @param dt -- either zero or ONE_TOTAL */ private void decrementWorkerCounts(int dr, int dt) { for (;;) { @@ -786,15 +793,13 @@ public class ForkJoinPool extends Abstra (workerCounts & RUNNING_COUNT_MASK) <= 1); long startTime = untimed? 0 : System.nanoTime(); Thread.interrupted(); // clear/ignore interrupt - if (eventCount != ec || w.runState != 0 || - runState >= TERMINATING) // recheck after clear - break; + if (eventCount != ec || w.isTerminating()) + break; // recheck after clear if (untimed) LockSupport.park(w); else { LockSupport.parkNanos(w, SHRINK_RATE_NANOS); - if (eventCount != ec || w.runState != 0 || - runState >= TERMINATING) + if (eventCount != ec || w.isTerminating()) break; if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS) tryShutdownUnusedWorker(ec); @@ -806,7 +811,7 @@ public class ForkJoinPool extends Abstra // Maintaining parallelism /** - * Pushes worker onto the spare stack + * Pushes worker onto the spare stack. */ final void pushSpare(ForkJoinWorkerThread w) { int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1); @@ -862,16 +867,23 @@ public class ForkJoinPool extends Abstra UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc + (ONE_RUNNING|ONE_TOTAL))) { ForkJoinWorkerThread w = null; + Throwable fail = null; try { w = factory.newThread(this); - } finally { // adjust on null or exceptional factory return - if (w == null) { - decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL); - tryTerminate(false); // handle failure during shutdown - } + } catch (Throwable ex) { + fail = ex; } - if (w == null) + if (w == null) { // null or exceptional factory return + decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL); + tryTerminate(false); // handle failure during shutdown + // If originating from an external caller, + // propagate exception, else ignore + if (fail != null && runState < TERMINATING && + !(Thread.currentThread() instanceof + ForkJoinWorkerThread)) + UNSAFE.throwException(fail); break; + } w.start(recordWorker(w), ueh); if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) { int c; // advance event count @@ -960,8 +972,12 @@ public class ForkJoinPool extends Abstra boolean active = w.active; boolean inactivate = false; int pc = parallelism; - int rs; - while (w.runState == 0 && (rs = runState) < TERMINATING) { + while (w.runState == 0) { + int rs = runState; + if (rs >= TERMINATING) { // propagate shutdown + w.shutdown(); + break; + } if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) && UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1)) inactivate = active = w.active = false; @@ -1089,6 +1105,7 @@ public class ForkJoinPool extends Abstra return true; } + /** * Actions on transition to TERMINATING * @@ -1112,7 +1129,7 @@ public class ForkJoinPool extends Abstra if (passes > 0 && !w.isTerminated()) { w.cancelTasks(); LockSupport.unpark(w); - if (passes > 1) { + if (passes > 1 && !w.isInterrupted()) { try { w.interrupt(); } catch (SecurityException ignore) { @@ -1125,7 +1142,7 @@ public class ForkJoinPool extends Abstra } /** - * Clear out and cancel submissions, ignoring exceptions + * Clears out and cancels submissions, ignoring exceptions. */ private void cancelSubmissions() { ForkJoinTask task; @@ -1140,15 +1157,15 @@ public class ForkJoinPool extends Abstra // misc support for ForkJoinWorkerThread /** - * Returns pool number + * Returns pool number. */ final int getPoolNumber() { return poolNumber; } /** - * Tries to accumulates steal count from a worker, clearing - * the worker's value. + * Tries to accumulate steal count from a worker, clearing + * the worker's value if successful. * * @return true if worker steal count now zero */ @@ -1172,7 +1189,10 @@ public class ForkJoinPool extends Abstra int pc = parallelism; // use parallelism, not rc int ac = runState; // no mask -- artificially boosts during shutdown // Use exact results for small values, saturate past 4 - return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3; + return ((pc <= ac) ? 0 : + (pc >>> 1 <= ac) ? 1 : + (pc >>> 2 <= ac) ? 3 : + pc >>> 3); } // Public and protected methods @@ -1222,13 +1242,13 @@ public class ForkJoinPool extends Abstra * use {@link #defaultForkJoinWorkerThreadFactory}. * @param handler the handler for internal worker threads that * terminate due to unrecoverable errors encountered while executing - * tasks. For default value, use null. + * tasks. For default value, use {@code null}. * @param asyncMode if true, * establishes local first-in-first-out scheduling mode for forked * tasks that are never joined. This mode may be more appropriate * than default locally stack-based mode in applications in which * worker threads only process event-style asynchronous tasks. - * For default value, use false. + * For default value, use {@code false}. * @throws IllegalArgumentException if parallelism less than or * equal to zero, or greater than implementation limit * @throws NullPointerException if the factory is null @@ -1438,7 +1458,7 @@ public class ForkJoinPool extends Abstra /** * Returns the number of worker threads that have started but not - * yet terminated. This result returned by this method may differ + * yet terminated. The result returned by this method may differ * from {@link #getParallelism} when threads are created to * maintain parallelism when others are cooperatively blocked. * @@ -1686,6 +1706,13 @@ public class ForkJoinPool extends Abstra } /** + * Returns true if terminating or terminated. Used by ForkJoinWorkerThread. + */ + final boolean isAtLeastTerminating() { + return runState >= TERMINATING; + } + + /** * Returns {@code true} if this pool has been shut down. * * @return {@code true} if this pool has been shut down @@ -1839,11 +1866,11 @@ public class ForkJoinPool extends Abstra private static final long eventCountOffset = objectFieldOffset("eventCount", ForkJoinPool.class); private static final long eventWaitersOffset = - objectFieldOffset("eventWaiters",ForkJoinPool.class); + objectFieldOffset("eventWaiters", ForkJoinPool.class); private static final long stealCountOffset = - objectFieldOffset("stealCount",ForkJoinPool.class); + objectFieldOffset("stealCount", ForkJoinPool.class); private static final long spareWaitersOffset = - objectFieldOffset("spareWaiters",ForkJoinPool.class); + objectFieldOffset("spareWaiters", ForkJoinPool.class); private static long objectFieldOffset(String field, Class klazz) { try {