--- jsr166/src/jsr166e/ForkJoinTask.java 2012/10/28 22:35:45 1.2 +++ jsr166/src/jsr166e/ForkJoinTask.java 2013/07/22 16:52:31 1.15 @@ -33,7 +33,7 @@ import java.lang.reflect.Constructor; *

A "main" {@code ForkJoinTask} begins execution when it is * explicitly submitted to a {@link ForkJoinPool}, or, if not already * engaged in a ForkJoin computation, commenced in the {@link - * ForkJoinPool#commonPool} via {@link #fork}, {@link #invoke}, or + * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or * related methods. Once started, it will usually in turn start other * subtasks. As indicated by the name of this class, many programs * using {@code ForkJoinTask} employ only methods {@link #fork} and @@ -55,7 +55,7 @@ import java.lang.reflect.Constructor; * minimize other blocking synchronization apart from joining other * tasks or using synchronizers such as Phasers that are advertised to * cooperate with fork/join scheduling. Subdividable tasks should also - * not perform blocking IO, and should ideally access variables that + * not perform blocking I/O, and should ideally access variables that * are completely independent of those accessed by other running * tasks. These guidelines are loosely enforced by not permitting * checked exceptions such as {@code IOExceptions} to be @@ -73,7 +73,7 @@ import java.lang.reflect.Constructor; *

It is possible to define and use ForkJoinTasks that may block, * but doing do requires three further considerations: (1) Completion * of few if any other tasks should be dependent on a task - * that blocks on external synchronization or IO. Event-style async + * that blocks on external synchronization or I/O. Event-style async * tasks that are never joined (for example, those subclassing {@link * CountedCompleter}) often fall into this category. (2) To minimize * resource impact, tasks should be small; ideally performing only the @@ -134,9 +134,9 @@ import java.lang.reflect.Constructor; * (DAG). Otherwise, executions may encounter a form of deadlock as * tasks cyclically wait for each other. However, this framework * supports other methods and techniques (for example the use of - * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that + * {@link java.util.concurrent.Phaser Phaser}, {@link #helpQuiesce}, and {@link #complete}) that * may be of use in constructing custom subclasses for problems that - * are not statically structured as DAGs. To support such usages a + * are not statically structured as DAGs. To support such usages, a * ForkJoinTask may be atomically tagged with a {@code short} * value using {@link #setForkJoinTaskTag} or {@link * #compareAndSetForkJoinTaskTag} and checked using {@link @@ -285,27 +285,35 @@ public abstract class ForkJoinTask im */ private int externalAwaitDone() { int s; - boolean interrupted = false; - if ((s = status) >= 0 && ForkJoinPool.tryUnsubmitFromCommonPool(this)) - s = doExec(); - while (s >= 0) { - if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { - synchronized (this) { - if (status >= 0) { - try { - wait(); - } catch (InterruptedException ie) { - interrupted = true; + ForkJoinPool cp = ForkJoinPool.common; + if ((s = status) >= 0) { + if (cp != null) { + if (this instanceof CountedCompleter) + s = cp.externalHelpComplete((CountedCompleter)this); + else if (cp.tryExternalUnpush(this)) + s = doExec(); + } + if (s >= 0 && (s = status) >= 0) { + boolean interrupted = false; + do { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) { + try { + wait(); + } catch (InterruptedException ie) { + interrupted = true; + } + } + else + notifyAll(); } } - else - notifyAll(); - } + } while ((s = status) >= 0); + if (interrupted) + Thread.currentThread().interrupt(); } - s = status; } - if (interrupted) - Thread.currentThread().interrupt(); return s; } @@ -313,12 +321,17 @@ public abstract class ForkJoinTask im * Blocks a non-worker-thread until completion or interruption. */ private int externalInterruptibleAwaitDone() throws InterruptedException { + int s; + ForkJoinPool cp = ForkJoinPool.common; if (Thread.interrupted()) throw new InterruptedException(); - int s; - if ((s = status) >= 0 && ForkJoinPool.tryUnsubmitFromCommonPool(this)) - s = doExec(); - while (s >= 0) { + if ((s = status) >= 0 && cp != null) { + if (this instanceof CountedCompleter) + cp.externalHelpComplete((CountedCompleter)this); + else if (cp.tryExternalUnpush(this)) + doExec(); + } + while ((s = status) >= 0) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) @@ -327,11 +340,11 @@ public abstract class ForkJoinTask im notifyAll(); } } - s = status; } return s; } + /** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and @@ -438,7 +451,7 @@ public abstract class ForkJoinTask im } /** - * Records exception and possibly propagates + * Records exception and possibly propagates. * * @return status on exit */ @@ -471,7 +484,7 @@ public abstract class ForkJoinTask im } /** - * Removes exception node and clears status + * Removes exception node and clears status. */ private void clearExceptionalCompletion() { int h = System.identityHashCode(this); @@ -601,14 +614,31 @@ public abstract class ForkJoinTask im } /** + * A version of "sneaky throw" to relay exceptions + */ + static void rethrow(Throwable ex) { + if (ex != null) + ForkJoinTask.uncheckedThrow(ex); + } + + /** + * The sneaky part of sneaky throw, relying on generics + * limitations to evade compiler complaints about rethrowing + * unchecked exceptions + */ + @SuppressWarnings("unchecked") static + void uncheckedThrow(Throwable t) throws T { + throw (T)t; // rely on vacuous cast + } + + /** * Throws exception, if any, associated with the given status. */ private void reportException(int s) { - Throwable ex = ((s == CANCELLED) ? new CancellationException() : - (s == EXCEPTIONAL) ? getThrowableException() : - null); - if (ex != null) - U.throwException(ex); + if (s == CANCELLED) + throw new CancellationException(); + if (s == EXCEPTIONAL) + rethrow(getThrowableException()); } // public methods @@ -616,7 +646,7 @@ public abstract class ForkJoinTask im /** * Arranges to asynchronously execute this task in the pool the * current task is running in, if applicable, or using the {@link - * ForkJoinPool#commonPool} if not {@link #inForkJoinPool}. While + * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While * it is not necessarily enforced, it is a usage error to fork a * task more than once unless it has completed and been * reinitialized. Subsequent modifications to the state of this @@ -633,7 +663,7 @@ public abstract class ForkJoinTask im if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else - ForkJoinPool.submitToCommonPool(this); + ForkJoinPool.common.externalPush(this); return this; } @@ -735,7 +765,7 @@ public abstract class ForkJoinTask im } } if (ex != null) - U.throwException(ex); + rethrow(ex); } /** @@ -752,6 +782,7 @@ public abstract class ForkJoinTask im * unprocessed. * * @param tasks the collection of tasks + * @param the type of the values returned from the tasks * @return the tasks argument, to simplify usage * @throws NullPointerException if tasks or any element are null */ @@ -786,7 +817,7 @@ public abstract class ForkJoinTask im } } if (ex != null) - U.throwException(ex); + rethrow(ex); return tasks; } @@ -809,7 +840,7 @@ public abstract class ForkJoinTask im *

This method is designed to be invoked by other * tasks. To terminate the current task, you can just return or * throw an unchecked exception from its computation method, or - * invoke {@link #completeExceptionally}. + * invoke {@link #completeExceptionally(Throwable)}. * * @param mayInterruptIfRunning this value has no effect in the * default implementation because interrupts are not used to @@ -959,8 +990,10 @@ public abstract class ForkJoinTask im if (Thread.interrupted()) throw new InterruptedException(); // Messy in part because we measure in nanosecs, but wait in millisecs - int s; long ns, ms; - if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) { + int s; long ms; + long ns = unit.toNanos(timeout); + ForkJoinPool cp; + if ((s = status) >= 0 && ns > 0L) { long deadline = System.nanoTime() + ns; ForkJoinPool p = null; ForkJoinPool.WorkQueue w = null; @@ -969,16 +1002,22 @@ public abstract class ForkJoinTask im ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; p = wt.pool; w = wt.workQueue; - s = p.helpJoinOnce(w, this); // no retries on failure + p.helpJoinOnce(w, this); // no retries on failure + } + else if ((cp = ForkJoinPool.common) != null) { + if (this instanceof CountedCompleter) + cp.externalHelpComplete((CountedCompleter)this); + else if (cp.tryExternalUnpush(this)) + doExec(); } boolean canBlock = false; boolean interrupted = false; try { while ((s = status) >= 0) { - if (w != null && w.runState < 0) + if (w != null && w.qlock < 0) cancelIgnoringExceptions(this); else if (!canBlock) { - if (p == null || p.tryCompensate(this, null)) + if (p == null || p.tryCompensate(p.ctl)) canBlock = true; } else { @@ -1054,7 +1093,7 @@ public abstract class ForkJoinTask im wt.pool.helpQuiescePool(wt.workQueue); } else - ForkJoinPool.externalHelpQuiescePool(); + ForkJoinPool.quiesceCommonPool(); } /** @@ -1117,9 +1156,9 @@ public abstract class ForkJoinTask im */ public boolean tryUnfork() { Thread t; - return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : - ForkJoinPool.tryUnsubmitFromCommonPool(this); + return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? + ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : + ForkJoinPool.common.tryExternalUnpush(this)); } /** @@ -1131,10 +1170,12 @@ public abstract class ForkJoinTask im * @return the number of tasks */ public static int getQueuedTaskCount() { - Thread t; - return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread)t).workQueue.queueSize() : - ForkJoinPool.getEstimatedSubmitterQueueLength(); + Thread t; ForkJoinPool.WorkQueue q; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + q = ((ForkJoinWorkerThread)t).workQueue; + else + q = ForkJoinPool.commonSubmitterQueue(); + return (q == null) ? 0 : q.queueSize(); } /** @@ -1151,53 +1192,7 @@ public abstract class ForkJoinTask im * @return the surplus number of tasks, which may be negative */ public static int getSurplusQueuedTaskCount() { - /* - * The aim of this method is to return a cheap heuristic guide - * for task partitioning when programmers, frameworks, tools, - * or languages have little or no idea about task granularity. - * In essence by offering this method, we ask users only about - * tradeoffs in overhead vs expected throughput and its - * variance, rather than how finely to partition tasks. - * - * In a steady state strict (tree-structured) computation, - * each thread makes available for stealing enough tasks for - * other threads to remain active. Inductively, if all threads - * play by the same rules, each thread should make available - * only a constant number of tasks. - * - * The minimum useful constant is just 1. But using a value of - * 1 would require immediate replenishment upon each steal to - * maintain enough tasks, which is infeasible. Further, - * partitionings/granularities of offered tasks should - * minimize steal rates, which in general means that threads - * nearer the top of computation tree should generate more - * than those nearer the bottom. In perfect steady state, each - * thread is at approximately the same level of computation - * tree. However, producing extra tasks amortizes the - * uncertainty of progress and diffusion assumptions. - * - * So, users will want to use values larger, but not much - * larger than 1 to both smooth over transient shortages and - * hedge against uneven progress; as traded off against the - * cost of extra task overhead. We leave the user to pick a - * threshold value to compare with the results of this call to - * guide decisions, but recommend values such as 3. - * - * When all threads are active, it is on average OK to - * estimate surplus strictly locally. In steady-state, if one - * thread is maintaining say 2 surplus tasks, then so are - * others. So we can just use estimated queue length. - * However, this strategy alone leads to serious mis-estimates - * in some non-steady-state conditions (ramp-up, ramp-down, - * other stalls). We can detect many of these by further - * considering the number of "idle" threads, that are known to - * have zero queued tasks, so compensate by a factor of - * (#idle/#active) threads. - */ - Thread t; ForkJoinWorkerThread wt; - return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - (wt = (ForkJoinWorkerThread)t).workQueue.queueSize() - wt.pool.idlePerActive() : - 0; + return ForkJoinPool.getSurplusQueuedTaskCount(); } // Extension methods @@ -1241,21 +1236,22 @@ public abstract class ForkJoinTask im /** * Returns, but does not unschedule or execute, a task queued by * the current thread but not yet executed, if one is immediately - * available and the current thread is operating in a - * ForkJoinPool. There is no guarantee that this task will - * actually be polled or executed next. Conversely, this method - * may return null even if a task exists but cannot be accessed - * without contention with other threads. This method is designed + * available. There is no guarantee that this task will actually + * be polled or executed next. Conversely, this method may return + * null even if a task exists but cannot be accessed without + * contention with other threads. This method is designed * primarily to support extensions, and is unlikely to be useful * otherwise. * * @return the next task, or {@code null} if none are available */ protected static ForkJoinTask peekNextLocalTask() { - Thread t; - return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread)t).workQueue.peek() : - null; + Thread t; ForkJoinPool.WorkQueue q; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + q = ((ForkJoinWorkerThread)t).workQueue; + else + q = ForkJoinPool.commonSubmitterQueue(); + return (q == null) ? null : q.peek(); } /** @@ -1331,7 +1327,7 @@ public abstract class ForkJoinTask im * * @param e the expected tag value * @param tag the new tag value - * @return true if successful; i.e., the current value was + * @return {@code true} if successful; i.e., the current value was * equal to e and is now tag. * @since 1.8 */ @@ -1384,6 +1380,24 @@ public abstract class ForkJoinTask im } /** + * Adaptor for Runnables in which failure forces worker exception + */ + static final class RunnableExecuteAction extends ForkJoinTask { + final Runnable runnable; + RunnableExecuteAction(Runnable runnable) { + if (runnable == null) throw new NullPointerException(); + this.runnable = runnable; + } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) { } + public final boolean exec() { runnable.run(); return true; } + void internalPropagateException(Throwable ex) { + rethrow(ex); // rethrow outside exec() catches. + } + private static final long serialVersionUID = 5232453952276885070L; + } + + /** * Adaptor for Callables */ static final class AdaptedCallable extends ForkJoinTask @@ -1431,6 +1445,7 @@ public abstract class ForkJoinTask im * * @param runnable the runnable action * @param result the result upon completion + * @param the type of the result * @return the task */ public static ForkJoinTask adapt(Runnable runnable, T result) { @@ -1444,6 +1459,7 @@ public abstract class ForkJoinTask im * encountered into {@code RuntimeException}. * * @param callable the callable action + * @param the type of the callable's result * @return the task */ public static ForkJoinTask adapt(Callable callable) { @@ -1457,6 +1473,8 @@ public abstract class ForkJoinTask im /** * Saves this task to a stream (that is, serializes it). * + * @param s the stream + * @throws java.io.IOException if an I/O error occurs * @serialData the current run status and the exception thrown * during execution, or {@code null} if none */ @@ -1468,6 +1486,10 @@ public abstract class ForkJoinTask im /** * Reconstitutes this task from a stream (that is, deserializes it). + * @param s the stream + * @throws ClassNotFoundException if the class of a serialized object + * could not be found + * @throws java.io.IOException if an I/O error occurs */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { @@ -1480,14 +1502,16 @@ public abstract class ForkJoinTask im // Unsafe mechanics private static final sun.misc.Unsafe U; private static final long STATUS; + static { exceptionTableLock = new ReentrantLock(); exceptionTableRefQueue = new ReferenceQueue(); exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY]; try { U = getUnsafe(); + Class k = ForkJoinTask.class; STATUS = U.objectFieldOffset - (ForkJoinTask.class.getDeclaredField("status")); + (k.getDeclaredField("status")); } catch (Exception e) { throw new Error(e); } @@ -1503,21 +1527,23 @@ public abstract class ForkJoinTask im private static sun.misc.Unsafe getUnsafe() { try { return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security - .PrivilegedExceptionAction() { - public sun.misc.Unsafe run() throws Exception { - java.lang.reflect.Field f = sun.misc - .Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } + } catch (SecurityException tryReflectionInstead) {} + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + Class k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) + return k.cast(x); + } + throw new NoSuchFieldError("the Unsafe"); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } }