--- jsr166/src/jsr166y/ForkJoinPool.java 2010/09/04 11:33:53 1.70 +++ jsr166/src/jsr166y/ForkJoinPool.java 2010/10/24 19:37:26 1.83 @@ -6,16 +6,22 @@ package jsr166y; -import java.util.concurrent.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.CountDownLatch; /** * An {@link ExecutorService} for running {@link ForkJoinTask}s. @@ -300,7 +306,7 @@ public class ForkJoinPool extends Abstra * about the same time as another is needlessly being created. We * counteract this and related slop in part by requiring resumed * spares to immediately recheck (in preStep) to see whether they - * they should re-suspend. + * should re-suspend. * * 6. Killing off unneeded workers. A timeout mechanism is used to * shed unused workers: The oldest (first) event queue waiter uses @@ -429,10 +435,11 @@ public class ForkJoinPool extends Abstra /** * The wakeup interval (in nanoseconds) for the oldest worker - * worker waiting for an event invokes tryShutdownUnusedWorker to shrink - * the number of workers. The exact value does not matter too - * much, but should be long enough to slowly release resources - * during long periods without use without disrupting normal use. + * waiting for an event to invoke tryShutdownUnusedWorker to + * shrink the number of workers. The exact value does not matter + * too much. It must be short enough to release resources during + * sustained periods of idleness, but not so short that threads + * are continually re-created. */ private static final long SHRINK_RATE_NANOS = 30L * 1000L * 1000L * 1000L; // 2 per minute @@ -515,7 +522,7 @@ public class ForkJoinPool extends Abstra * Lifecycle control. The low word contains the number of workers * that are (probably) executing tasks. This value is atomically * incremented before a worker gets a task to run, and decremented - * when worker has no tasks and cannot find any. Bits 16-18 + * when a worker has no tasks and cannot find any. Bits 16-18 * contain runLevel value. When all are zero, the pool is * running. Level transitions are monotonic (running -> shutdown * -> terminating -> terminated) so each transition adds a bit. @@ -604,7 +611,7 @@ public class ForkJoinPool extends Abstra * (rarely) necessary when other count updates lag. * * @param dr -- either zero or ONE_RUNNING - * @param dt == either zero or ONE_TOTAL + * @param dt -- either zero or ONE_TOTAL */ private void decrementWorkerCounts(int dr, int dt) { for (;;) { @@ -673,7 +680,7 @@ public class ForkJoinPool extends Abstra } /** - * Nulls out record of worker in workers array + * Nulls out record of worker in workers array. */ private void forgetWorker(ForkJoinWorkerThread w) { int idx = w.poolIndex; @@ -698,7 +705,7 @@ public class ForkJoinPool extends Abstra */ final void workerTerminated(ForkJoinWorkerThread w) { forgetWorker(w); - decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL); + decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL); while (w.stealCount != 0) // collect final count tryAccumulateStealCount(w); tryTerminate(false); @@ -784,17 +791,15 @@ public class ForkJoinPool extends Abstra if (tryAccumulateStealCount(w)) { // transfer while idle boolean untimed = (w.nextWaiter != 0L || (workerCounts & RUNNING_COUNT_MASK) <= 1); - long startTime = untimed? 0 : System.nanoTime(); + long startTime = untimed ? 0 : System.nanoTime(); Thread.interrupted(); // clear/ignore interrupt - if (eventCount != ec || w.runState != 0 || - runState >= TERMINATING) // recheck after clear - break; + if (eventCount != ec || w.isTerminating()) + break; // recheck after clear if (untimed) LockSupport.park(w); else { LockSupport.parkNanos(w, SHRINK_RATE_NANOS); - if (eventCount != ec || w.runState != 0 || - runState >= TERMINATING) + if (eventCount != ec || w.isTerminating()) break; if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS) tryShutdownUnusedWorker(ec); @@ -806,7 +811,7 @@ public class ForkJoinPool extends Abstra // Maintaining parallelism /** - * Pushes worker onto the spare stack + * Pushes worker onto the spare stack. */ final void pushSpare(ForkJoinWorkerThread w) { int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1); @@ -862,16 +867,23 @@ public class ForkJoinPool extends Abstra UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc + (ONE_RUNNING|ONE_TOTAL))) { ForkJoinWorkerThread w = null; + Throwable fail = null; try { w = factory.newThread(this); - } finally { // adjust on null or exceptional factory return - if (w == null) { - decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL); - tryTerminate(false); // handle failure during shutdown - } + } catch (Throwable ex) { + fail = ex; } - if (w == null) + if (w == null) { // null or exceptional factory return + decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL); + tryTerminate(false); // handle failure during shutdown + // If originating from an external caller, + // propagate exception, else ignore + if (fail != null && runState < TERMINATING && + !(Thread.currentThread() instanceof + ForkJoinWorkerThread)) + UNSAFE.throwException(fail); break; + } w.start(recordWorker(w), ueh); if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) { int c; // advance event count @@ -960,8 +972,12 @@ public class ForkJoinPool extends Abstra boolean active = w.active; boolean inactivate = false; int pc = parallelism; - int rs; - while (w.runState == 0 && (rs = runState) < TERMINATING) { + while (w.runState == 0) { + int rs = runState; + if (rs >= TERMINATING) { // propagate shutdown + w.shutdown(); + break; + } if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) && UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1)) inactivate = active = w.active = false; @@ -998,16 +1014,28 @@ public class ForkJoinPool extends Abstra * * @param joinMe the task to join * @param worker the current worker thread + * @param timed true if wait should time out + * @param nanos timeout value if timed */ - final void awaitJoin(ForkJoinTask joinMe, ForkJoinWorkerThread worker) { + final void awaitJoin(ForkJoinTask joinMe, ForkJoinWorkerThread worker, + boolean timed, long nanos) { + long startTime = timed? System.nanoTime() : 0L; int retries = 2 + (parallelism >> 2); // #helpJoins before blocking while (joinMe.status >= 0) { int wc; + long nt = 0L; + if (runState >= TERMINATING) { + joinMe.cancelIgnoringExceptions(); + break; + } worker.helpJoinTask(joinMe); if (joinMe.status < 0) break; else if (retries > 0) --retries; + else if (timed && + (nt = nanos - (System.nanoTime() - startTime)) <= 0L) + break; else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 && UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc - ONE_RUNNING)) { @@ -1016,11 +1044,27 @@ public class ForkJoinPool extends Abstra (h = eventWaiters) != 0L && // help release others (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) releaseEventWaiters(); - if (stat >= 0 && - ((workerCounts & RUNNING_COUNT_MASK) == 0 || - (stat = - joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0)) - helpMaintainParallelism(); // timeout or no running workers + if (stat >= 0) { + if ((workerCounts & RUNNING_COUNT_MASK) != 0) { + long ms; int ns; + if (!timed) { + ms = JOIN_TIMEOUT_MILLIS; + ns = 0; + } + else { // at most JOIN_TIMEOUT_MILLIS per wait + ms = nt / 1000000; + if (ms > JOIN_TIMEOUT_MILLIS) { + ms = JOIN_TIMEOUT_MILLIS; + ns = 0; + } + else + ns = (int) (nt % 1000000); + } + stat = joinMe.internalAwaitDone(ms, ns); + } + if (stat >= 0) // timeout or no running workers + helpMaintainParallelism(); + } do {} while (!UNSAFE.compareAndSwapInt (this, workerCountsOffset, c = workerCounts, c + ONE_RUNNING)); @@ -1089,6 +1133,7 @@ public class ForkJoinPool extends Abstra return true; } + /** * Actions on transition to TERMINATING * @@ -1106,16 +1151,13 @@ public class ForkJoinPool extends Abstra c = eventCount, c+1); eventWaiters = 0L; // clobber lists spareWaiters = 0; - ForkJoinWorkerThread[] ws = workers; - int n = ws.length; - for (int i = 0; i < n; ++i) { - ForkJoinWorkerThread w = ws[i]; + for (ForkJoinWorkerThread w : workers) { if (w != null) { w.shutdown(); if (passes > 0 && !w.isTerminated()) { w.cancelTasks(); LockSupport.unpark(w); - if (passes > 1) { + if (passes > 1 && !w.isInterrupted()) { try { w.interrupt(); } catch (SecurityException ignore) { @@ -1128,7 +1170,7 @@ public class ForkJoinPool extends Abstra } /** - * Clear out and cancel submissions, ignoring exceptions + * Clears out and cancels submissions, ignoring exceptions. */ private void cancelSubmissions() { ForkJoinTask task; @@ -1143,15 +1185,15 @@ public class ForkJoinPool extends Abstra // misc support for ForkJoinWorkerThread /** - * Returns pool number + * Returns pool number. */ final int getPoolNumber() { return poolNumber; } /** - * Tries to accumulates steal count from a worker, clearing - * the worker's value. + * Tries to accumulate steal count from a worker, clearing + * the worker's value if successful. * * @return true if worker steal count now zero */ @@ -1175,7 +1217,10 @@ public class ForkJoinPool extends Abstra int pc = parallelism; // use parallelism, not rc int ac = runState; // no mask -- artificially boosts during shutdown // Use exact results for small values, saturate past 4 - return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3; + return ((pc <= ac) ? 0 : + (pc >>> 1 <= ac) ? 1 : + (pc >>> 2 <= ac) ? 3 : + pc >>> 3); } // Public and protected methods @@ -1225,13 +1270,13 @@ public class ForkJoinPool extends Abstra * use {@link #defaultForkJoinWorkerThreadFactory}. * @param handler the handler for internal worker threads that * terminate due to unrecoverable errors encountered while executing - * tasks. For default value, use null. + * tasks. For default value, use {@code null}. * @param asyncMode if true, * establishes local first-in-first-out scheduling mode for forked * tasks that are never joined. This mode may be more appropriate * than default locally stack-based mode in applications in which * worker threads only process event-style asynchronous tasks. - * For default value, use false. + * For default value, use {@code false}. * @throws IllegalArgumentException if parallelism less than or * equal to zero, or greater than implementation limit * @throws NullPointerException if the factory is null @@ -1279,17 +1324,13 @@ public class ForkJoinPool extends Abstra // Execution methods /** - * Common code for execute, invoke and submit + * Submits task and creates, starts, or resumes some workers if necessary */ private void doSubmit(ForkJoinTask task) { - if (task == null) - throw new NullPointerException(); - if (runState >= SHUTDOWN) - throw new RejectedExecutionException(); submissionQueue.offer(task); int c; // try to increment event count -- CAS failure OK UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1); - helpMaintainParallelism(); // create, start, or resume some workers + helpMaintainParallelism(); } /** @@ -1302,8 +1343,33 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public T invoke(ForkJoinTask task) { - doSubmit(task); - return task.join(); + if (task == null) + throw new NullPointerException(); + if (runState >= SHUTDOWN) + throw new RejectedExecutionException(); + Thread t = Thread.currentThread(); + if ((t instanceof ForkJoinWorkerThread) && + ((ForkJoinWorkerThread)t).pool == this) + return task.invoke(); // bypass submit if in same pool + else { + doSubmit(task); + return task.join(); + } + } + + /** + * Unless terminating, forks task if within an ongoing FJ + * computation in the current pool, else submits as external task. + */ + private void forkOrSubmit(ForkJoinTask task) { + if (runState >= SHUTDOWN) + throw new RejectedExecutionException(); + Thread t = Thread.currentThread(); + if ((t instanceof ForkJoinWorkerThread) && + ((ForkJoinWorkerThread)t).pool == this) + task.fork(); + else + doSubmit(task); } /** @@ -1315,7 +1381,9 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public void execute(ForkJoinTask task) { - doSubmit(task); + if (task == null) + throw new NullPointerException(); + forkOrSubmit(task); } // AbstractExecutorService methods @@ -1326,12 +1394,14 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public void execute(Runnable task) { + if (task == null) + throw new NullPointerException(); ForkJoinTask job; if (task instanceof ForkJoinTask) // avoid re-wrap job = (ForkJoinTask) task; else job = ForkJoinTask.adapt(task, null); - doSubmit(job); + forkOrSubmit(job); } /** @@ -1344,7 +1414,9 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public ForkJoinTask submit(ForkJoinTask task) { - doSubmit(task); + if (task == null) + throw new NullPointerException(); + forkOrSubmit(task); return task; } @@ -1354,8 +1426,10 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public ForkJoinTask submit(Callable task) { + if (task == null) + throw new NullPointerException(); ForkJoinTask job = ForkJoinTask.adapt(task); - doSubmit(job); + forkOrSubmit(job); return job; } @@ -1365,8 +1439,10 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public ForkJoinTask submit(Runnable task, T result) { + if (task == null) + throw new NullPointerException(); ForkJoinTask job = ForkJoinTask.adapt(task, result); - doSubmit(job); + forkOrSubmit(job); return job; } @@ -1376,12 +1452,14 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public ForkJoinTask submit(Runnable task) { + if (task == null) + throw new NullPointerException(); ForkJoinTask job; if (task instanceof ForkJoinTask) // avoid re-wrap job = (ForkJoinTask) task; else job = ForkJoinTask.adapt(task, null); - doSubmit(job); + forkOrSubmit(job); return job; } @@ -1441,7 +1519,7 @@ public class ForkJoinPool extends Abstra /** * Returns the number of worker threads that have started but not - * yet terminated. This result returned by this method may differ + * yet terminated. The result returned by this method may differ * from {@link #getParallelism} when threads are created to * maintain parallelism when others are cooperatively blocked. * @@ -1526,13 +1604,9 @@ public class ForkJoinPool extends Abstra */ public long getQueuedTaskCount() { long count = 0; - ForkJoinWorkerThread[] ws = workers; - int n = ws.length; - for (int i = 0; i < n; ++i) { - ForkJoinWorkerThread w = ws[i]; + for (ForkJoinWorkerThread w : workers) if (w != null) count += w.getQueueSize(); - } return count; } @@ -1587,13 +1661,9 @@ public class ForkJoinPool extends Abstra */ protected int drainTasksTo(Collection> c) { int count = submissionQueue.drainTo(c); - ForkJoinWorkerThread[] ws = workers; - int n = ws.length; - for (int i = 0; i < n; ++i) { - ForkJoinWorkerThread w = ws[i]; + for (ForkJoinWorkerThread w : workers) if (w != null) count += w.drainTasksTo(c); - } return count; } @@ -1697,6 +1767,13 @@ public class ForkJoinPool extends Abstra } /** + * Returns true if terminating or terminated. Used by ForkJoinWorkerThread. + */ + final boolean isAtLeastTerminating() { + return runState >= TERMINATING; + } + + /** * Returns {@code true} if this pool has been shut down. * * @return {@code true} if this pool has been shut down @@ -1850,11 +1927,11 @@ public class ForkJoinPool extends Abstra private static final long eventCountOffset = objectFieldOffset("eventCount", ForkJoinPool.class); private static final long eventWaitersOffset = - objectFieldOffset("eventWaiters",ForkJoinPool.class); + objectFieldOffset("eventWaiters", ForkJoinPool.class); private static final long stealCountOffset = - objectFieldOffset("stealCount",ForkJoinPool.class); + objectFieldOffset("stealCount", ForkJoinPool.class); private static final long spareWaitersOffset = - objectFieldOffset("spareWaiters",ForkJoinPool.class); + objectFieldOffset("spareWaiters", ForkJoinPool.class); private static long objectFieldOffset(String field, Class klazz) { try {