--- jsr166/src/jsr166y/ForkJoinPool.java 2010/09/20 20:42:36 1.81 +++ jsr166/src/jsr166y/ForkJoinPool.java 2010/11/13 13:11:51 1.84 @@ -705,7 +705,7 @@ public class ForkJoinPool extends Abstra */ final void workerTerminated(ForkJoinWorkerThread w) { forgetWorker(w); - decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL); + decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL); while (w.stealCount != 0) // collect final count tryAccumulateStealCount(w); tryTerminate(false); @@ -791,7 +791,7 @@ public class ForkJoinPool extends Abstra if (tryAccumulateStealCount(w)) { // transfer while idle boolean untimed = (w.nextWaiter != 0L || (workerCounts & RUNNING_COUNT_MASK) <= 1); - long startTime = untimed? 0 : System.nanoTime(); + long startTime = untimed ? 0 : System.nanoTime(); Thread.interrupted(); // clear/ignore interrupt if (eventCount != ec || w.isTerminating()) break; // recheck after clear @@ -1014,16 +1014,28 @@ public class ForkJoinPool extends Abstra * * @param joinMe the task to join * @param worker the current worker thread + * @param timed true if wait should time out + * @param nanos timeout value if timed */ - final void awaitJoin(ForkJoinTask joinMe, ForkJoinWorkerThread worker) { + final void awaitJoin(ForkJoinTask joinMe, ForkJoinWorkerThread worker, + boolean timed, long nanos) { + long startTime = timed? System.nanoTime() : 0L; int retries = 2 + (parallelism >> 2); // #helpJoins before blocking while (joinMe.status >= 0) { int wc; + long nt = 0L; + if (runState >= TERMINATING) { + joinMe.cancelIgnoringExceptions(); + break; + } worker.helpJoinTask(joinMe); if (joinMe.status < 0) break; else if (retries > 0) --retries; + else if (timed && + (nt = nanos - (System.nanoTime() - startTime)) <= 0L) + break; else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 && UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc - ONE_RUNNING)) { @@ -1032,11 +1044,27 @@ public class ForkJoinPool extends Abstra (h = eventWaiters) != 0L && // help release others (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) releaseEventWaiters(); - if (stat >= 0 && - ((workerCounts & RUNNING_COUNT_MASK) == 0 || - (stat = - joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0)) - helpMaintainParallelism(); // timeout or no running workers + if (stat >= 0) { + if ((workerCounts & RUNNING_COUNT_MASK) != 0) { + long ms; int ns; + if (!timed) { + ms = JOIN_TIMEOUT_MILLIS; + ns = 0; + } + else { // at most JOIN_TIMEOUT_MILLIS per wait + ms = nt / 1000000; + if (ms > JOIN_TIMEOUT_MILLIS) { + ms = JOIN_TIMEOUT_MILLIS; + ns = 0; + } + else + ns = (int) (nt % 1000000); + } + stat = joinMe.internalAwaitDone(ms, ns); + } + if (stat >= 0) // timeout or no running workers + helpMaintainParallelism(); + } do {} while (!UNSAFE.compareAndSwapInt (this, workerCountsOffset, c = workerCounts, c + ONE_RUNNING)); @@ -1100,7 +1128,7 @@ public class ForkJoinPool extends Abstra // Finish now if all threads terminated; else in some subsequent call if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) { advanceRunLevel(TERMINATED); - termination.arrive(); + termination.forceTermination(); } return true; } @@ -1296,17 +1324,13 @@ public class ForkJoinPool extends Abstra // Execution methods /** - * Common code for execute, invoke and submit + * Submits task and creates, starts, or resumes some workers if necessary */ private void doSubmit(ForkJoinTask task) { - if (task == null) - throw new NullPointerException(); - if (runState >= SHUTDOWN) - throw new RejectedExecutionException(); submissionQueue.offer(task); int c; // try to increment event count -- CAS failure OK UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1); - helpMaintainParallelism(); // create, start, or resume some workers + helpMaintainParallelism(); } /** @@ -1319,8 +1343,33 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public T invoke(ForkJoinTask task) { - doSubmit(task); - return task.join(); + if (task == null) + throw new NullPointerException(); + if (runState >= SHUTDOWN) + throw new RejectedExecutionException(); + Thread t = Thread.currentThread(); + if ((t instanceof ForkJoinWorkerThread) && + ((ForkJoinWorkerThread)t).pool == this) + return task.invoke(); // bypass submit if in same pool + else { + doSubmit(task); + return task.join(); + } + } + + /** + * Unless terminating, forks task if within an ongoing FJ + * computation in the current pool, else submits as external task. + */ + private void forkOrSubmit(ForkJoinTask task) { + if (runState >= SHUTDOWN) + throw new RejectedExecutionException(); + Thread t = Thread.currentThread(); + if ((t instanceof ForkJoinWorkerThread) && + ((ForkJoinWorkerThread)t).pool == this) + task.fork(); + else + doSubmit(task); } /** @@ -1332,7 +1381,9 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public void execute(ForkJoinTask task) { - doSubmit(task); + if (task == null) + throw new NullPointerException(); + forkOrSubmit(task); } // AbstractExecutorService methods @@ -1343,12 +1394,14 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public void execute(Runnable task) { + if (task == null) + throw new NullPointerException(); ForkJoinTask job; if (task instanceof ForkJoinTask) // avoid re-wrap job = (ForkJoinTask) task; else job = ForkJoinTask.adapt(task, null); - doSubmit(job); + forkOrSubmit(job); } /** @@ -1361,7 +1414,9 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public ForkJoinTask submit(ForkJoinTask task) { - doSubmit(task); + if (task == null) + throw new NullPointerException(); + forkOrSubmit(task); return task; } @@ -1371,8 +1426,10 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public ForkJoinTask submit(Callable task) { + if (task == null) + throw new NullPointerException(); ForkJoinTask job = ForkJoinTask.adapt(task); - doSubmit(job); + forkOrSubmit(job); return job; } @@ -1382,8 +1439,10 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public ForkJoinTask submit(Runnable task, T result) { + if (task == null) + throw new NullPointerException(); ForkJoinTask job = ForkJoinTask.adapt(task, result); - doSubmit(job); + forkOrSubmit(job); return job; } @@ -1393,12 +1452,14 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public ForkJoinTask submit(Runnable task) { + if (task == null) + throw new NullPointerException(); ForkJoinTask job; if (task instanceof ForkJoinTask) // avoid re-wrap job = (ForkJoinTask) task; else job = ForkJoinTask.adapt(task, null); - doSubmit(job); + forkOrSubmit(job); return job; } @@ -1735,10 +1796,11 @@ public class ForkJoinPool extends Abstra public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { try { - return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0; + termination.awaitAdvanceInterruptibly(0, timeout, unit); } catch (TimeoutException ex) { return false; } + return true; } /**