--- jsr166/src/jsr166e/ForkJoinPool.java 2012/12/20 17:14:32 1.37 +++ jsr166/src/jsr166e/ForkJoinPool.java 2013/01/28 19:01:38 1.51 @@ -440,7 +440,7 @@ public class ForkJoinPool extends Abstra * Common Pool * =========== * - * The static commonPool always exists after static + * The static common Pool always exists after static * initialization. Since it (or any other created pool) need * never be used, we minimize initial construction overhead and * footprint to the setup of about a dozen fields, with no nested @@ -708,7 +708,7 @@ public class ForkJoinPool extends Abstra * shared-queue version is embedded in method externalPush.) * * @param task the task. Caller must ensure non-null. - * @throw RejectedExecutionException if array cannot be resized + * @throws RejectedExecutionException if array cannot be resized */ final void push(ForkJoinTask task) { ForkJoinTask[] a; ForkJoinPool p; @@ -1026,7 +1026,6 @@ public class ForkJoinPool extends Abstra private static final int ABASE; private static final int ASHIFT; static { - int s; try { U = getUnsafe(); Class k = WorkQueue.class; @@ -1034,13 +1033,13 @@ public class ForkJoinPool extends Abstra QLOCK = U.objectFieldOffset (k.getDeclaredField("qlock")); ABASE = U.arrayBaseOffset(ak); - s = U.arrayIndexScale(ak); + int scale = U.arrayIndexScale(ak); + if ((scale & (scale - 1)) != 0) + throw new Error("data type scale not a power of two"); + ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } - if ((s & (s-1)) != 0) - throw new Error("data type scale not a power of two"); - ASHIFT = 31 - Integer.numberOfLeadingZeros(s); } } @@ -1074,12 +1073,12 @@ public class ForkJoinPool extends Abstra * to paranoically avoid potential initialization circularities * as well as to simplify generated code. */ - static final ForkJoinPool commonPool; + static final ForkJoinPool common; /** - * Common pool parallelism. Must equal commonPool.parallelism. + * Common pool parallelism. Must equal common.parallelism. */ - static final int commonPoolParallelism; + static final int commonParallelism; /** * Sequence number for creating workerNamePrefix. @@ -1087,8 +1086,8 @@ public class ForkJoinPool extends Abstra private static int poolNumberSequence; /** - * Return the next sequence number. We don't expect this to - * ever contend so use simple builtin sync. + * Returns the next sequence number. We don't expect this to + * ever contend, so use simple builtin sync. */ private static final synchronized int nextPoolId() { return ++poolNumberSequence; @@ -1245,10 +1244,10 @@ public class ForkJoinPool extends Abstra volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17; volatile Object pad18, pad19, pad1a, pad1b; - /* + /** * Acquires the plock lock to protect worker array and related * updates. This method is called only if an initial CAS on plock - * fails. This acts as a spinLock for normal cases, but falls back + * fails. This acts as a spinlock for normal cases, but falls back * to builtin monitor to block when (rarely) needed. This would be * a terrible idea for a highly contended lock, but works fine as * a more conservative alternative to a pure spinlock. @@ -1661,7 +1660,7 @@ public class ForkJoinPool extends Abstra * park awaiting signal, else lingering to help scan and signal. * * * If a non-empty queue discovered or left as a hint, - * help wake up other workers before return + * help wake up other workers before return. * * @param w the worker (via its WorkQueue) * @return a task or null if none found @@ -1932,7 +1931,6 @@ public class ForkJoinPool extends Abstra * @param task the task to join * @param mode if shared, exit upon completing any task * if all workers are active - * */ private int helpComplete(ForkJoinTask task, int mode) { WorkQueue[] ws; WorkQueue q; int m, n, s, u; @@ -2241,7 +2239,7 @@ public class ForkJoinPool extends Abstra */ private boolean tryTerminate(boolean now, boolean enable) { int ps; - if (this == commonPool) // cannot shut down + if (this == common) // cannot shut down return false; if ((ps = plock) >= 0) { // enable by setting plock if (!enable) @@ -2332,7 +2330,7 @@ public class ForkJoinPool extends Abstra static WorkQueue commonSubmitterQueue() { ForkJoinPool p; WorkQueue[] ws; int m; Submitter z; return ((z = submitters.get()) != null && - (p = commonPool) != null && + (p = common) != null && (ws = p.workQueues) != null && (m = ws.length - 1) >= 0) ? ws[m & z.seed & SQMASK] : null; @@ -2346,7 +2344,7 @@ public class ForkJoinPool extends Abstra ForkJoinTask[] a; int m, s; if (t != null && (z = submitters.get()) != null && - (p = commonPool) != null && + (p = common) != null && (ws = p.workQueues) != null && (m = ws.length - 1) >= 0 && (q = ws[m & z.seed & SQMASK]) != null && @@ -2423,7 +2421,7 @@ public class ForkJoinPool extends Abstra ForkJoinTask[] a; int m, s, n; if (t != null && (z = submitters.get()) != null && - (p = commonPool) != null && + (p = common) != null && (ws = p.workQueues) != null && (m = ws.length - 1) >= 0 && (q = ws[m & z.seed & SQMASK]) != null && @@ -2452,21 +2450,6 @@ public class ForkJoinPool extends Abstra } } - /** - * Restricted version of helpQuiescePool for external callers - */ - static void externalHelpQuiescePool() { - ForkJoinPool p; ForkJoinTask t; WorkQueue q; int b; - if ((p = commonPool) != null && - (q = p.findNonEmptyStealQueue(1)) != null && - (b = q.base) - q.top < 0 && - (t = q.pollAt(b)) != null) { - if (q.base - q.top < 0) - p.signalWork(q); - t.doExec(); - } - } - // Exported methods // Constructors @@ -2566,14 +2549,20 @@ public class ForkJoinPool extends Abstra /** * Returns the common pool instance. This pool is statically - * constructed; its run state is unaffected by attempts to - * {@link #shutdown} or {@link #shutdownNow}. + * constructed; its run state is unaffected by attempts to {@link + * #shutdown} or {@link #shutdownNow}. However this pool and any + * ongoing processing are automatically terminated upon program + * {@link System#exit}. Any program that relies on asynchronous + * task processing to complete before program termination should + * invoke {@code commonPool().}{@link #awaitQuiescence}, before + * exit. * * @return the common pool instance + * @since 1.8 */ public static ForkJoinPool commonPool() { - // assert commonPool != null : "static init error"; - return commonPool; + // assert common != null : "static init error"; + return common; } // Execution methods @@ -2696,27 +2685,23 @@ public class ForkJoinPool extends Abstra // In previous versions of this class, this method constructed // a task to run ForkJoinTask.invokeAll, but now external // invocation of multiple tasks is at least as efficient. - List> fs = new ArrayList>(tasks.size()); - // Workaround needed because method wasn't declared with - // wildcards in return type but should have been. - @SuppressWarnings({"unchecked", "rawtypes"}) - List> futures = (List>) (List) fs; + ArrayList> futures = new ArrayList>(tasks.size()); boolean done = false; try { for (Callable t : tasks) { ForkJoinTask f = new ForkJoinTask.AdaptedCallable(t); + futures.add(f); externalPush(f); - fs.add(f); } - for (ForkJoinTask f : fs) - f.quietlyJoin(); + for (int i = 0, size = futures.size(); i < size; i++) + ((ForkJoinTask)futures.get(i)).quietlyJoin(); done = true; return futures; } finally { if (!done) - for (ForkJoinTask f : fs) - f.cancel(false); + for (int i = 0, size = futures.size(); i < size; i++) + futures.get(i).cancel(false); } } @@ -2752,9 +2737,10 @@ public class ForkJoinPool extends Abstra * Returns the targeted parallelism level of the common pool. * * @return the targeted parallelism level of the common pool + * @since 1.8 */ public static int getCommonPoolParallelism() { - return commonPoolParallelism; + return commonParallelism; } /** @@ -3012,7 +2998,7 @@ public class ForkJoinPool extends Abstra * Possibly initiates an orderly shutdown in which previously * submitted tasks are executed, but no new tasks will be * accepted. Invocation has no effect on execution state if this - * is the {@link #commonPool}, and no additional effect if + * is the {@link #commonPool()}, and no additional effect if * already shut down. Tasks that are in the process of being * submitted concurrently during the course of this method may or * may not be rejected. @@ -3030,7 +3016,7 @@ public class ForkJoinPool extends Abstra /** * Possibly attempts to cancel and/or stop all tasks, and reject * all subsequently submitted tasks. Invocation has no effect on - * execution state if this is the {@link #commonPool}, and no + * execution state if this is the {@link #commonPool()}, and no * additional effect if already shut down. Otherwise, tasks that * are in the process of being submitted or executed concurrently * during the course of this method may or may not be @@ -3093,9 +3079,10 @@ public class ForkJoinPool extends Abstra /** * Blocks until all tasks have completed execution after a * shutdown request, or the timeout occurs, or the current thread - * is interrupted, whichever happens first. Note that the {@link - * #commonPool()} never terminates until program shutdown so - * this method will always time out. + * is interrupted, whichever happens first. Because the {@link + * #commonPool()} never terminates until program shutdown, when + * applied to the common pool, this method is equivalent to {@link + * #awaitQuiescence} but always returns {@code false}. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument @@ -3105,6 +3092,12 @@ public class ForkJoinPool extends Abstra */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + if (this == common) { + awaitQuiescence(timeout, unit); + return false; + } long nanos = unit.toNanos(timeout); if (isTerminated()) return true; @@ -3124,6 +3117,62 @@ public class ForkJoinPool extends Abstra } /** + * If called by a ForkJoinTask operating in this pool, equivalent + * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, + * waits and/or attempts to assist performing tasks until this + * pool {@link #isQuiescent} or the indicated timeout elapses. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return {@code true} if quiescent; {@code false} if the + * timeout elapsed. + */ + public boolean awaitQuiescence(long timeout, TimeUnit unit) { + long nanos = unit.toNanos(timeout); + ForkJoinWorkerThread wt; + Thread thread = Thread.currentThread(); + if ((thread instanceof ForkJoinWorkerThread) && + (wt = (ForkJoinWorkerThread)thread).pool == this) { + helpQuiescePool(wt.workQueue); + return true; + } + long startTime = System.nanoTime(); + WorkQueue[] ws; + int r = 0, m; + boolean found = true; + while (!isQuiescent() && (ws = workQueues) != null && + (m = ws.length - 1) >= 0) { + if (!found) { + if ((System.nanoTime() - startTime) > nanos) + return false; + Thread.yield(); // cannot block + } + found = false; + for (int j = (m + 1) << 2; j >= 0; --j) { + ForkJoinTask t; WorkQueue q; int b; + if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) { + found = true; + if ((t = q.pollAt(b)) != null) { + if (q.base - q.top < 0) + signalWork(q); + t.doExec(); + } + break; + } + } + } + return true; + } + + /** + * Waits and/or attempts to assist performing tasks indefinitely + * until the {@link #commonPool()} {@link #isQuiescent} + */ + static void quiesceCommonPool() { + common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } + + /** * Interface for extending managed parallelism for tasks running * in {@link ForkJoinPool}s. * @@ -3276,7 +3325,7 @@ public class ForkJoinPool extends Abstra private static final long QLOCK; static { - int s; // initialize field offsets for CAS etc + // initialize field offsets for CAS etc try { U = getUnsafe(); Class k = ForkJoinPool.class; @@ -3296,13 +3345,13 @@ public class ForkJoinPool extends Abstra (wk.getDeclaredField("qlock")); Class ak = ForkJoinTask[].class; ABASE = U.arrayBaseOffset(ak); - s = U.arrayIndexScale(ak); - ASHIFT = 31 - Integer.numberOfLeadingZeros(s); + int scale = U.arrayIndexScale(ak); + if ((scale & (scale - 1)) != 0) + throw new Error("data type scale not a power of two"); + ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } - if ((s & (s-1)) != 0) - throw new Error("data type scale not a power of two"); submitters = new ThreadLocal(); ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory = @@ -3339,14 +3388,13 @@ public class ForkJoinPool extends Abstra par = Runtime.getRuntime().availableProcessors(); if (par > MAX_CAP) par = MAX_CAP; - commonPoolParallelism = par; + commonParallelism = par; long np = (long)(-par); // precompute initial ctl value long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); - commonPool = new ForkJoinPool(par, ct, fac, handler); + common = new ForkJoinPool(par, ct, fac, handler); } - /** * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. * Replace with a simple call to Unsafe.getUnsafe when integrating @@ -3357,22 +3405,23 @@ public class ForkJoinPool extends Abstra private static sun.misc.Unsafe getUnsafe() { try { return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security - .PrivilegedExceptionAction() { - public sun.misc.Unsafe run() throws Exception { - java.lang.reflect.Field f = sun.misc - .Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } + } catch (SecurityException tryReflectionInstead) {} + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + Class k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) + return k.cast(x); + } + throw new NoSuchFieldError("the Unsafe"); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } - }