--- jsr166/src/jsr166y/ForkJoinPool.java 2010/04/05 15:52:26 1.53 +++ jsr166/src/jsr166y/ForkJoinPool.java 2010/04/18 12:51:18 1.54 @@ -978,6 +978,64 @@ public class ForkJoinPool extends Abstra } /** + * Unless there are not enough other running threads, adjusts + * counts for a a worker in performing helpJoin that cannot find + * any work, so that this worker can now block. + * + * @return true if worker may block + */ + final boolean preBlockHelpingJoin(ForkJoinTask joinMe) { + while (joinMe.status >= 0) { + releaseWaiters(); // help other threads progress + + // if a spare exists, resume it to maintain parallelism level + if ((workerCounts & RUNNING_COUNT_MASK) <= parallelism) { + ForkJoinWorkerThread spare = null; + for (ForkJoinWorkerThread w : workers) { + if (w != null && w.isSuspended()) { + spare = w; + break; + } + } + if (joinMe.status < 0) + break; + if (spare != null) { + if (spare.tryUnsuspend()) { + boolean canBlock = true; + if (joinMe.requestSignal() < 0) { + canBlock = false; // already done + int c; + do {} while (!UNSAFE.compareAndSwapInt + (this, workerCountsOffset, + c = workerCounts, c + ONE_RUNNING)); + } + LockSupport.unpark(spare); + return canBlock; + } + continue; // recheck -- another spare may exist + } + } + + int wc = workerCounts; // reread to shorten CAS window + int rc = wc & RUNNING_COUNT_MASK; + if (rc <= 2) // keep this and at most one other thread alive + break; + + if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, + wc, wc - ONE_RUNNING)) { + if (joinMe.requestSignal() >= 0) + return true; + int c; // back out + do {} while (!UNSAFE.compareAndSwapInt + (this, workerCountsOffset, + c = workerCounts, c + ONE_RUNNING)); + break; + } + } + return false; + } + + /** * Possibly initiates and/or completes termination. * * @param now if true, unconditionally terminate, else only @@ -1073,15 +1131,6 @@ public class ForkJoinPool extends Abstra return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3; } - /** - * Returns the approximate (non-atomic) difference between running - * and active counts. - */ - final int inactiveCount() { - return (workerCounts & RUNNING_COUNT_MASK) - - (runState & ACTIVE_COUNT_MASK); - } - // Public and protected methods // Constructors @@ -1305,7 +1354,7 @@ public class ForkJoinPool extends Abstra invoke(new InvokeAll(forkJoinTasks)); @SuppressWarnings({"unchecked", "rawtypes"}) - List> futures = (List>) (List) forkJoinTasks; + List> futures = (List>) (List) forkJoinTasks; return futures; } @@ -1442,7 +1491,11 @@ public class ForkJoinPool extends Abstra * pool. The given value should normally be greater than or equal * to the {@link #getParallelism parallelism} level. Setting this * value has no effect on current pool size. It controls - * construction of new threads. + * construction of new threads. The use of this method may cause + * tasks that intrinsically require extra threads for dependent + * computations to indefinitely stall. If you are instead trying + * to minimize internal thread creation, consider setting {link + * #setMaintainsParallelism} as false. * * @throws IllegalArgumentException if negative or greater than * internal implementation limit