--- jsr166/src/jsr166y/ForkJoinPool.java 2012/12/30 02:05:53 1.167 +++ jsr166/src/jsr166y/ForkJoinPool.java 2013/01/01 15:10:39 1.168 @@ -440,7 +440,7 @@ public class ForkJoinPool extends Abstra * Common Pool * =========== * - * The static commonPool always exists after static + * The static common Pool always exists after static * initialization. Since it (or any other created pool) need * never be used, we minimize initial construction overhead and * footprint to the setup of about a dozen fields, with no nested @@ -1074,12 +1074,12 @@ public class ForkJoinPool extends Abstra * to paranoically avoid potential initialization circularities * as well as to simplify generated code. */ - static final ForkJoinPool commonPool; + static final ForkJoinPool common; /** - * Common pool parallelism. Must equal commonPool.parallelism. + * Common pool parallelism. Must equal common.parallelism. */ - static final int commonPoolParallelism; + static final int commonParallelism; /** * Sequence number for creating workerNamePrefix. @@ -2241,7 +2241,7 @@ public class ForkJoinPool extends Abstra */ private boolean tryTerminate(boolean now, boolean enable) { int ps; - if (this == commonPool) // cannot shut down + if (this == common) // cannot shut down return false; if ((ps = plock) >= 0) { // enable by setting plock if (!enable) @@ -2332,7 +2332,7 @@ public class ForkJoinPool extends Abstra static WorkQueue commonSubmitterQueue() { ForkJoinPool p; WorkQueue[] ws; int m; Submitter z; return ((z = submitters.get()) != null && - (p = commonPool) != null && + (p = common) != null && (ws = p.workQueues) != null && (m = ws.length - 1) >= 0) ? ws[m & z.seed & SQMASK] : null; @@ -2346,7 +2346,7 @@ public class ForkJoinPool extends Abstra ForkJoinTask[] a; int m, s; if (t != null && (z = submitters.get()) != null && - (p = commonPool) != null && + (p = common) != null && (ws = p.workQueues) != null && (m = ws.length - 1) >= 0 && (q = ws[m & z.seed & SQMASK]) != null && @@ -2423,7 +2423,7 @@ public class ForkJoinPool extends Abstra ForkJoinTask[] a; int m, s, n; if (t != null && (z = submitters.get()) != null && - (p = commonPool) != null && + (p = common) != null && (ws = p.workQueues) != null && (m = ws.length - 1) >= 0 && (q = ws[m & z.seed & SQMASK]) != null && @@ -2452,21 +2452,6 @@ public class ForkJoinPool extends Abstra } } - /** - * Restricted version of helpQuiescePool for external callers - */ - static void externalHelpQuiescePool() { - ForkJoinPool p; ForkJoinTask t; WorkQueue q; int b; - if ((p = commonPool) != null && - (q = p.findNonEmptyStealQueue(1)) != null && - (b = q.base) - q.top < 0 && - (t = q.pollAt(b)) != null) { - if (q.base - q.top < 0) - p.signalWork(q); - t.doExec(); - } - } - // Exported methods // Constructors @@ -2566,14 +2551,19 @@ public class ForkJoinPool extends Abstra /** * Returns the common pool instance. This pool is statically - * constructed; its run state is unaffected by attempts to - * {@link #shutdown} or {@link #shutdownNow}. + * constructed; its run state is unaffected by attempts to {@link + * #shutdown} or {@link #shutdownNow}. However this pool and any + * ongoing processing are automatically terminated upon program + * {@link System#exit}. Any program that relies on asynchronous + * task processing to complete before program termination should + * invoke {@link #quiesceCommonPool}, or the timeout-based {@code + * commonPool().}{@link #awaitQuiescence}, before exit. * * @return the common pool instance */ public static ForkJoinPool commonPool() { - // assert commonPool != null : "static init error"; - return commonPool; + // assert common != null : "static init error"; + return common; } // Execution methods @@ -2754,7 +2744,7 @@ public class ForkJoinPool extends Abstra * @return the targeted parallelism level of the common pool */ public static int getCommonPoolParallelism() { - return commonPoolParallelism; + return commonParallelism; } /** @@ -3093,9 +3083,10 @@ public class ForkJoinPool extends Abstra /** * Blocks until all tasks have completed execution after a * shutdown request, or the timeout occurs, or the current thread - * is interrupted, whichever happens first. Note that the {@link - * #commonPool()} never terminates until program shutdown so - * this method will always time out. + * is interrupted, whichever happens first. Because the {@link + * #commonPool()} never terminates until program shutdown, when + * applied to the common pool, this method is equivalent to {@link + * #awaitQuiescence} but always returns {@code false}. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument @@ -3105,6 +3096,12 @@ public class ForkJoinPool extends Abstra */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + if (this == common) { + awaitQuiescence(timeout, unit); + return false; + } long nanos = unit.toNanos(timeout); if (isTerminated()) return true; @@ -3124,6 +3121,62 @@ public class ForkJoinPool extends Abstra } /** + * If called by a ForkJoinTask operating in this pool, equivalent + * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, + * waits and/or attempts to assist performing tasks until this + * pool {@link #isQuiescent} or the indicated timeout elapses. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return {@code true} if quiescent; {@code false} if the + * timeout elapsed. + */ + public boolean awaitQuiescence(long timeout, TimeUnit unit) { + long nanos = unit.toNanos(timeout); + ForkJoinWorkerThread wt; + Thread thread = Thread.currentThread(); + if ((thread instanceof ForkJoinWorkerThread) && + (wt = (ForkJoinWorkerThread)thread).pool == this) { + helpQuiescePool(wt.workQueue); + return true; + } + long startTime = System.nanoTime(); + WorkQueue[] ws; + int r = 0, m; + boolean found = true; + while (!isQuiescent() && (ws = workQueues) != null && + (m = ws.length - 1) >= 0) { + if (!found) { + if ((System.nanoTime() - startTime) > nanos) + return false; + Thread.yield(); // cannot block + } + found = false; + for (int j = (m + 1) << 2; j >= 0; --j) { + ForkJoinTask t; WorkQueue q; int b; + if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) { + found = true; + if ((t = q.pollAt(b)) != null) { + if (q.base - q.top < 0) + signalWork(q); + t.doExec(); + } + break; + } + } + } + return true; + } + + /** + * Waits and/or attempts to assist performing tasks indefinitely + * until the {@link #commonPool()} {@link #isQuiescent} + */ + public static void quiesceCommonPool() { + common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } + + /** * Interface for extending managed parallelism for tasks running * in {@link ForkJoinPool}s. * @@ -3339,14 +3392,13 @@ public class ForkJoinPool extends Abstra par = Runtime.getRuntime().availableProcessors(); if (par > MAX_CAP) par = MAX_CAP; - commonPoolParallelism = par; + commonParallelism = par; long np = (long)(-par); // precompute initial ctl value long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); - commonPool = new ForkJoinPool(par, ct, fac, handler); + common = new ForkJoinPool(par, ct, fac, handler); } - /** * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. * Replace with a simple call to Unsafe.getUnsafe when integrating