--- jsr166/src/jsr166y/ForkJoinPool.java 2010/10/10 11:56:11 1.82 +++ jsr166/src/jsr166y/ForkJoinPool.java 2010/10/24 19:37:26 1.83 @@ -705,7 +705,7 @@ public class ForkJoinPool extends Abstra */ final void workerTerminated(ForkJoinWorkerThread w) { forgetWorker(w); - decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL); + decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL); while (w.stealCount != 0) // collect final count tryAccumulateStealCount(w); tryTerminate(false); @@ -791,7 +791,7 @@ public class ForkJoinPool extends Abstra if (tryAccumulateStealCount(w)) { // transfer while idle boolean untimed = (w.nextWaiter != 0L || (workerCounts & RUNNING_COUNT_MASK) <= 1); - long startTime = untimed? 0 : System.nanoTime(); + long startTime = untimed ? 0 : System.nanoTime(); Thread.interrupted(); // clear/ignore interrupt if (eventCount != ec || w.isTerminating()) break; // recheck after clear @@ -1014,11 +1014,16 @@ public class ForkJoinPool extends Abstra * * @param joinMe the task to join * @param worker the current worker thread + * @param timed true if wait should time out + * @param nanos timeout value if timed */ - final void awaitJoin(ForkJoinTask joinMe, ForkJoinWorkerThread worker) { + final void awaitJoin(ForkJoinTask joinMe, ForkJoinWorkerThread worker, + boolean timed, long nanos) { + long startTime = timed? System.nanoTime() : 0L; int retries = 2 + (parallelism >> 2); // #helpJoins before blocking while (joinMe.status >= 0) { int wc; + long nt = 0L; if (runState >= TERMINATING) { joinMe.cancelIgnoringExceptions(); break; @@ -1028,6 +1033,9 @@ public class ForkJoinPool extends Abstra break; else if (retries > 0) --retries; + else if (timed && + (nt = nanos - (System.nanoTime() - startTime)) <= 0L) + break; else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 && UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc - ONE_RUNNING)) { @@ -1036,11 +1044,27 @@ public class ForkJoinPool extends Abstra (h = eventWaiters) != 0L && // help release others (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) releaseEventWaiters(); - if (stat >= 0 && - ((workerCounts & RUNNING_COUNT_MASK) == 0 || - (stat = - joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0)) - helpMaintainParallelism(); // timeout or no running workers + if (stat >= 0) { + if ((workerCounts & RUNNING_COUNT_MASK) != 0) { + long ms; int ns; + if (!timed) { + ms = JOIN_TIMEOUT_MILLIS; + ns = 0; + } + else { // at most JOIN_TIMEOUT_MILLIS per wait + ms = nt / 1000000; + if (ms > JOIN_TIMEOUT_MILLIS) { + ms = JOIN_TIMEOUT_MILLIS; + ns = 0; + } + else + ns = (int) (nt % 1000000); + } + stat = joinMe.internalAwaitDone(ms, ns); + } + if (stat >= 0) // timeout or no running workers + helpMaintainParallelism(); + } do {} while (!UNSAFE.compareAndSwapInt (this, workerCountsOffset, c = workerCounts, c + ONE_RUNNING));