--- jsr166/src/jsr166y/ForkJoinTask.java 2011/07/01 18:30:14 1.80 +++ jsr166/src/jsr166y/ForkJoinTask.java 2015/09/03 22:54:46 1.102 @@ -30,46 +30,59 @@ import java.lang.reflect.Constructor; * subtasks may be hosted by a small number of actual threads in a * ForkJoinPool, at the price of some usage limitations. * - *

A "main" {@code ForkJoinTask} begins execution when submitted - * to a {@link ForkJoinPool}. Once started, it will usually in turn - * start other subtasks. As indicated by the name of this class, - * many programs using {@code ForkJoinTask} employ only methods - * {@link #fork} and {@link #join}, or derivatives such as {@link + *

A "main" {@code ForkJoinTask} begins execution when it is + * explicitly submitted to a {@link ForkJoinPool}, or, if not already + * engaged in a ForkJoin computation, commenced in the {@link + * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or + * related methods. Once started, it will usually in turn start other + * subtasks. As indicated by the name of this class, many programs + * using {@code ForkJoinTask} employ only methods {@link #fork} and + * {@link #join}, or derivatives such as {@link * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also * provides a number of other methods that can come into play in - * advanced usages, as well as extension mechanics that allow - * support of new forms of fork/join processing. + * advanced usages, as well as extension mechanics that allow support + * of new forms of fork/join processing. * *

A {@code ForkJoinTask} is a lightweight form of {@link Future}. * The efficiency of {@code ForkJoinTask}s stems from a set of * restrictions (that are only partially statically enforceable) - * reflecting their intended use as computational tasks calculating - * pure functions or operating on purely isolated objects. The - * primary coordination mechanisms are {@link #fork}, that arranges + * reflecting their main use as computational tasks calculating pure + * functions or operating on purely isolated objects. The primary + * coordination mechanisms are {@link #fork}, that arranges * asynchronous execution, and {@link #join}, that doesn't proceed * until the task's result has been computed. Computations should - * avoid {@code synchronized} methods or blocks, and should minimize - * other blocking synchronization apart from joining other tasks or - * using synchronizers such as Phasers that are advertised to - * cooperate with fork/join scheduling. Tasks should also not perform - * blocking IO, and should ideally access variables that are - * completely independent of those accessed by other running - * tasks. Minor breaches of these restrictions, for example using - * shared output streams, may be tolerable in practice, but frequent - * use may result in poor performance, and the potential to - * indefinitely stall if the number of threads not waiting for IO or - * other external synchronization becomes exhausted. This usage - * restriction is in part enforced by not permitting checked - * exceptions such as {@code IOExceptions} to be thrown. However, - * computations may still encounter unchecked exceptions, that are - * rethrown to callers attempting to join them. These exceptions may - * additionally include {@link RejectedExecutionException} stemming - * from internal resource exhaustion, such as failure to allocate - * internal task queues. Rethrown exceptions behave in the same way as - * regular exceptions, but, when possible, contain stack traces (as - * displayed for example using {@code ex.printStackTrace()}) of both - * the thread that initiated the computation as well as the thread - * actually encountering the exception; minimally only the latter. + * ideally avoid {@code synchronized} methods or blocks, and should + * minimize other blocking synchronization apart from joining other + * tasks or using synchronizers such as Phasers that are advertised to + * cooperate with fork/join scheduling. Subdividable tasks should also + * not perform blocking I/O, and should ideally access variables that + * are completely independent of those accessed by other running + * tasks. These guidelines are loosely enforced by not permitting + * checked exceptions such as {@code IOExceptions} to be + * thrown. However, computations may still encounter unchecked + * exceptions, that are rethrown to callers attempting to join + * them. These exceptions may additionally include {@link + * RejectedExecutionException} stemming from internal resource + * exhaustion, such as failure to allocate internal task + * queues. Rethrown exceptions behave in the same way as regular + * exceptions, but, when possible, contain stack traces (as displayed + * for example using {@code ex.printStackTrace()}) of both the thread + * that initiated the computation as well as the thread actually + * encountering the exception; minimally only the latter. + * + *

It is possible to define and use ForkJoinTasks that may block, + * but doing do requires three further considerations: (1) Completion + * of few if any other tasks should be dependent on a task + * that blocks on external synchronization or I/O. Event-style async + * tasks that are never joined (for example, those subclassing {@link + * CountedCompleter}) often fall into this category. (2) To minimize + * resource impact, tasks should be small; ideally performing only the + * (possibly) blocking action. (3) Unless the {@link + * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly + * blocked tasks is known to be less than the pool's {@link + * ForkJoinPool#getParallelism} level, the pool cannot guarantee that + * enough threads will be available to ensure progress or good + * performance. * *

The primary method for awaiting completion and extracting * results of a task is {@link #join}, but there are several variants: @@ -85,6 +98,13 @@ import java.lang.reflect.Constructor; * performs the most common form of parallel invocation: forking a set * of tasks and joining them all. * + *

In the most typical usages, a fork-join pair act like a call + * (fork) and return (join) from a parallel recursive function. As is + * the case with other forms of recursive calls, returns (joins) + * should be performed innermost-first. For example, {@code a.fork(); + * b.fork(); b.join(); a.join();} is likely to be substantially more + * efficient than joining {@code a} before {@code b}. + * *

The execution status of tasks may be queried at several levels * of detail: {@link #isDone} is true if a task completed in any way * (including the case where a task was cancelled without executing); @@ -100,18 +120,13 @@ import java.lang.reflect.Constructor; *

The ForkJoinTask class is not usually directly subclassed. * Instead, you subclass one of the abstract classes that support a * particular style of fork/join processing, typically {@link - * RecursiveAction} for computations that do not return results, or - * {@link RecursiveTask} for those that do. Normally, a concrete - * ForkJoinTask subclass declares fields comprising its parameters, - * established in a constructor, and then defines a {@code compute} - * method that somehow uses the control methods supplied by this base - * class. While these methods have {@code public} access (to allow - * instances of different task subclasses to call each other's - * methods), some of them may only be called from within other - * ForkJoinTasks (as may be determined using method {@link - * #inForkJoinPool}). Attempts to invoke them in other contexts - * result in exceptions or errors, possibly including - * {@code ClassCastException}. + * RecursiveAction} for most computations that do not return results, + * {@link RecursiveTask} for those that do, and {@link + * CountedCompleter} for those in which completed actions trigger + * other actions. Normally, a concrete ForkJoinTask subclass declares + * fields comprising its parameters, established in a constructor, and + * then defines a {@code compute} method that somehow uses the control + * methods supplied by this base class. * *

Method {@link #join} and its variants are appropriate for use * only when completion dependencies are acyclic; that is, the @@ -121,7 +136,17 @@ import java.lang.reflect.Constructor; * supports other methods and techniques (for example the use of * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that * may be of use in constructing custom subclasses for problems that - * are not statically structured as DAGs. + * are not statically structured as DAGs. To support such usages a + * ForkJoinTask may be atomically tagged with a {@code short} + * value using {@link #setForkJoinTaskTag} or {@link + * #compareAndSetForkJoinTaskTag} and checked using {@link + * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use + * these {@code protected} methods or tags for any purpose, but they + * may be of use in the construction of specialized subclasses. For + * example, parallel graph traversals can use the supplied methods to + * avoid revisiting nodes/tasks that have already been processed. + * (Method names for tagging are bulky in part to encourage definition + * of methods that reflect their usage patterns.) * *

Most base support methods are {@code final}, to prevent * overriding of implementations that are intrinsically tied to the @@ -175,29 +200,36 @@ public abstract class ForkJoinTask im * The status field holds run control status bits packed into a * single int to minimize footprint and to ensure atomicity (via * CAS). Status is initially zero, and takes on nonnegative - * values until completed, upon which status holds value - * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking - * waits by other threads have the SIGNAL bit set. Completion of - * a stolen task with SIGNAL set awakens any waiters via - * notifyAll. Even though suboptimal for some purposes, we use - * basic builtin wait/notify to take advantage of "monitor - * inflation" in JVMs that we would otherwise need to emulate to - * avoid adding further per-task bookkeeping overhead. We want - * these monitors to be "fat", i.e., not use biasing or thin-lock - * techniques, so use some odd coding idioms that tend to avoid - * them. + * values until completed, upon which status (anded with + * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks + * undergoing blocking waits by other threads have the SIGNAL bit + * set. Completion of a stolen task with SIGNAL set awakens any + * waiters via notifyAll. Even though suboptimal for some + * purposes, we use basic builtin wait/notify to take advantage of + * "monitor inflation" in JVMs that we would otherwise need to + * emulate to avoid adding further per-task bookkeeping overhead. + * We want these monitors to be "fat", i.e., not use biasing or + * thin-lock techniques, so use some odd coding idioms that tend + * to avoid them, mainly by arranging that every synchronized + * block performs a wait, notifyAll or both. + * + * These control bits occupy only (some of) the upper half (16 + * bits) of status field. The lower bits are used for user-defined + * tags. */ /** The run status of this task */ volatile int status; // accessed directly by pool and workers - private static final int NORMAL = -1; - private static final int CANCELLED = -2; - private static final int EXCEPTIONAL = -3; - private static final int SIGNAL = 1; + static final int DONE_MASK = 0xf0000000; // mask out non-completion bits + static final int NORMAL = 0xf0000000; // must be negative + static final int CANCELLED = 0xc0000000; // must be < NORMAL + static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED + static final int SIGNAL = 0x00010000; // must be >= 1 << 16 + static final int SMASK = 0x0000ffff; // short bits for tags /** - * Marks completion and wakes up threads waiting to join this task, - * also clearing signal request bits. + * Marks completion and wakes up threads waiting to join this + * task. * * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL * @return completion status on exit @@ -206,8 +238,8 @@ public abstract class ForkJoinTask im for (int s;;) { if ((s = status) < 0) return s; - if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) { - if (s != 0) + if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { + if ((s >>> 16) != 0) synchronized (this) { notifyAll(); } return completion; } @@ -215,27 +247,36 @@ public abstract class ForkJoinTask im } /** - * Tries to block a worker thread until completed or timed out. - * Uses Object.wait time argument conventions. - * May fail on contention or interrupt. + * Primary execution method for stolen tasks. Unless done, calls + * exec and records status if completed, but doesn't wait for + * completion otherwise. * - * @param millis if > 0, wait time. + * @return status on exit from this method */ - final void tryAwaitDone(long millis) { - int s; - try { - if (((s = status) > 0 || - (s == 0 && - UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) && - status > 0) { - synchronized (this) { - if (status > 0) - wait(millis); - } + final int doExec() { + int s; boolean completed; + if ((s = status) >= 0) { + try { + completed = exec(); + } catch (Throwable rex) { + return setExceptionalCompletion(rex); } - } catch (InterruptedException ie) { - // caller must check termination + if (completed) + s = setCompletion(NORMAL); } + return s; + } + + /** + * Tries to set SIGNAL status unless already completed. Used by + * ForkJoinPool. Other variants are directly incorporated into + * externalAwaitDone etc. + * + * @return true if successful + */ + final boolean trySetSignal() { + int s = status; + return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL); } /** @@ -244,113 +285,78 @@ public abstract class ForkJoinTask im */ private int externalAwaitDone() { int s; - if ((s = status) >= 0) { - boolean interrupted = false; - synchronized (this) { - while ((s = status) >= 0) { - if (s == 0) - UNSAFE.compareAndSwapInt(this, statusOffset, - 0, SIGNAL); - else { + ForkJoinPool.externalHelpJoin(this); + boolean interrupted = false; + while ((s = status) >= 0) { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) { try { wait(); } catch (InterruptedException ie) { interrupted = true; } } + else + notifyAll(); } } - if (interrupted) - Thread.currentThread().interrupt(); } + if (interrupted) + Thread.currentThread().interrupt(); return s; } /** - * Blocks a non-worker-thread until completion or interruption or timeout. + * Blocks a non-worker-thread until completion or interruption. */ - private int externalInterruptibleAwaitDone(long millis) - throws InterruptedException { + private int externalInterruptibleAwaitDone() throws InterruptedException { int s; if (Thread.interrupted()) throw new InterruptedException(); - if ((s = status) >= 0) { - synchronized (this) { - while ((s = status) >= 0) { - if (s == 0) - UNSAFE.compareAndSwapInt(this, statusOffset, - 0, SIGNAL); - else { - wait(millis); - if (millis > 0L) - break; - } + ForkJoinPool.externalHelpJoin(this); + while ((s = status) >= 0) { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) + wait(); + else + notifyAll(); } } } return s; } - /** - * Primary execution method for stolen tasks. Unless done, calls - * exec and records status if completed, but doesn't wait for - * completion otherwise. - */ - final void doExec() { - if (status >= 0) { - boolean completed; - try { - completed = exec(); - } catch (Throwable rex) { - setExceptionalCompletion(rex); - return; - } - if (completed) - setCompletion(NORMAL); // must be outside try block - } - } /** - * Primary mechanics for join, get, quietlyJoin. + * Implementation for join, get, quietlyJoin. Directly handles + * only cases of already-completed, external wait, and + * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. + * * @return status upon completion */ private int doJoin() { - Thread t; ForkJoinWorkerThread w; int s; boolean completed; - if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { - if ((s = status) < 0) - return s; - if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) { - try { - completed = exec(); - } catch (Throwable rex) { - return setExceptionalCompletion(rex); - } - if (completed) - return setCompletion(NORMAL); - } - return w.joinTask(this); - } - else - return externalAwaitDone(); + int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; + return (s = status) < 0 ? s : + ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? + (w = (wt = (ForkJoinWorkerThread)t).workQueue). + tryUnpush(this) && (s = doExec()) < 0 ? s : + wt.pool.awaitJoin(w, this) : + externalAwaitDone(); } /** - * Primary mechanics for invoke, quietlyInvoke. + * Implementation for invoke, quietlyInvoke. + * * @return status upon completion */ private int doInvoke() { - int s; boolean completed; - if ((s = status) < 0) - return s; - try { - completed = exec(); - } catch (Throwable rex) { - return setExceptionalCompletion(rex); - } - if (completed) - return setCompletion(NORMAL); - else - return doJoin(); + int s; Thread t; ForkJoinWorkerThread wt; + return (s = doExec()) < 0 ? s : + ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? + (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) : + externalAwaitDone(); } // Exception table support @@ -385,47 +391,86 @@ public abstract class ForkJoinTask im * any ForkJoinPool will call helpExpungeStaleExceptions when its * pool becomes isQuiescent. */ - static final class ExceptionNode extends WeakReference>{ + static final class ExceptionNode extends WeakReference> { final Throwable ex; ExceptionNode next; final long thrower; // use id not ref to avoid weak cycles + final int hashCode; // store task hashCode before weak ref disappears ExceptionNode(ForkJoinTask task, Throwable ex, ExceptionNode next) { super(task, exceptionTableRefQueue); this.ex = ex; this.next = next; this.thrower = Thread.currentThread().getId(); + this.hashCode = System.identityHashCode(task); } } /** - * Records exception and sets exceptional completion. + * Records exception and sets status. * * @return status on exit */ - private int setExceptionalCompletion(Throwable ex) { - int h = System.identityHashCode(this); - final ReentrantLock lock = exceptionTableLock; - lock.lock(); - try { - expungeStaleExceptions(); - ExceptionNode[] t = exceptionTable; - int i = h & (t.length - 1); - for (ExceptionNode e = t[i]; ; e = e.next) { - if (e == null) { - t[i] = new ExceptionNode(this, ex, t[i]); - break; + final int recordExceptionalCompletion(Throwable ex) { + int s; + if ((s = status) >= 0) { + int h = System.identityHashCode(this); + final ReentrantLock lock = exceptionTableLock; + lock.lock(); + try { + expungeStaleExceptions(); + ExceptionNode[] t = exceptionTable; + int i = h & (t.length - 1); + for (ExceptionNode e = t[i]; ; e = e.next) { + if (e == null) { + t[i] = new ExceptionNode(this, ex, t[i]); + break; + } + if (e.get() == this) // already present + break; } - if (e.get() == this) // already present - break; + } finally { + lock.unlock(); } - } finally { - lock.unlock(); + s = setCompletion(EXCEPTIONAL); } - return setCompletion(EXCEPTIONAL); + return s; + } + + /** + * Records exception and possibly propagates. + * + * @return status on exit + */ + private int setExceptionalCompletion(Throwable ex) { + int s = recordExceptionalCompletion(ex); + if ((s & DONE_MASK) == EXCEPTIONAL) + internalPropagateException(ex); + return s; } /** - * Removes exception node and clears status + * Hook for exception propagation support for tasks with completers. + */ + void internalPropagateException(Throwable ex) { + } + + /** + * Cancels, ignoring any exceptions thrown by cancel. Used during + * worker and pool shutdown. Cancel is spec'ed not to throw any + * exceptions, but if it does anyway, we have no recourse during + * shutdown, so guard against this case. + */ + static final void cancelIgnoringExceptions(ForkJoinTask t) { + if (t != null && t.status >= 0) { + try { + t.cancel(false); + } catch (Throwable ignore) { + } + } + } + + /** + * Removes exception node and clears status. */ private void clearExceptionalCompletion() { int h = System.identityHashCode(this); @@ -470,7 +515,7 @@ public abstract class ForkJoinTask im * @return the exception, or null if none */ private Throwable getThrowableException() { - if (status != EXCEPTIONAL) + if ((status & DONE_MASK) != EXCEPTIONAL) return null; int h = System.identityHashCode(this); ExceptionNode e; @@ -488,7 +533,7 @@ public abstract class ForkJoinTask im Throwable ex; if (e == null || (ex = e.ex) == null) return null; - if (e.thrower != Thread.currentThread().getId()) { + if (false && e.thrower != Thread.currentThread().getId()) { Class ec = ex.getClass(); try { Constructor noArgCtor = null; @@ -518,9 +563,9 @@ public abstract class ForkJoinTask im private static void expungeStaleExceptions() { for (Object x; (x = exceptionTableRefQueue.poll()) != null;) { if (x instanceof ExceptionNode) { - ForkJoinTask key = ((ExceptionNode)x).get(); + int hashCode = ((ExceptionNode)x).hashCode; ExceptionNode[] t = exceptionTable; - int i = System.identityHashCode(key) & (t.length - 1); + int i = hashCode & (t.length - 1); ExceptionNode e = t[i]; ExceptionNode pred = null; while (e != null) { @@ -555,41 +600,62 @@ public abstract class ForkJoinTask im } /** - * Report the result of invoke or join; called only upon - * non-normal return of internal versions. + * A version of "sneaky throw" to relay exceptions + */ + static void rethrow(final Throwable ex) { + if (ex != null) { + if (ex instanceof Error) + throw (Error)ex; + if (ex instanceof RuntimeException) + throw (RuntimeException)ex; + ForkJoinTask.uncheckedThrow(ex); + } + } + + /** + * The sneaky part of sneaky throw, relying on generics + * limitations to evade compiler complaints about rethrowing + * unchecked exceptions */ - private V reportResult() { - int s; Throwable ex; - if ((s = status) == CANCELLED) + @SuppressWarnings("unchecked") static + void uncheckedThrow(Throwable t) throws T { + if (t != null) + throw (T)t; // rely on vacuous cast + } + + /** + * Throws exception, if any, associated with the given status. + */ + private void reportException(int s) { + if (s == CANCELLED) throw new CancellationException(); - if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) - UNSAFE.throwException(ex); - return getRawResult(); + if (s == EXCEPTIONAL) + rethrow(getThrowableException()); } // public methods /** - * Arranges to asynchronously execute this task. While it is not - * necessarily enforced, it is a usage error to fork a task more - * than once unless it has completed and been reinitialized. - * Subsequent modifications to the state of this task or any data - * it operates on are not necessarily consistently observable by - * any thread other than the one executing it unless preceded by a - * call to {@link #join} or related methods, or a call to {@link - * #isDone} returning {@code true}. - * - *

This method may be invoked only from within {@code - * ForkJoinPool} computations (as may be determined using method - * {@link #inForkJoinPool}). Attempts to invoke in other contexts - * result in exceptions or errors, possibly including {@code - * ClassCastException}. + * Arranges to asynchronously execute this task in the pool the + * current task is running in, if applicable, or using the {@link + * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While + * it is not necessarily enforced, it is a usage error to fork a + * task more than once unless it has completed and been + * reinitialized. Subsequent modifications to the state of this + * task or any data it operates on are not necessarily + * consistently observable by any thread other than the one + * executing it unless preceded by a call to {@link #join} or + * related methods, or a call to {@link #isDone} returning {@code + * true}. * * @return {@code this}, to simplify usage */ public final ForkJoinTask fork() { - ((ForkJoinWorkerThread) Thread.currentThread()) - .pushTask(this); + Thread t; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + ((ForkJoinWorkerThread)t).workQueue.push(this); + else + ForkJoinPool.common.externalPush(this); return this; } @@ -605,10 +671,10 @@ public abstract class ForkJoinTask im * @return the computed result */ public final V join() { - if (doJoin() != NORMAL) - return reportResult(); - else - return getRawResult(); + int s; + if ((s = doJoin() & DONE_MASK) != NORMAL) + reportException(s); + return getRawResult(); } /** @@ -620,10 +686,10 @@ public abstract class ForkJoinTask im * @return the computed result */ public final V invoke() { - if (doInvoke() != NORMAL) - return reportResult(); - else - return getRawResult(); + int s; + if ((s = doInvoke() & DONE_MASK) != NORMAL) + reportException(s); + return getRawResult(); } /** @@ -639,20 +705,17 @@ public abstract class ForkJoinTask im * cancelled, completed normally or exceptionally, or left * unprocessed. * - *

This method may be invoked only from within {@code - * ForkJoinPool} computations (as may be determined using method - * {@link #inForkJoinPool}). Attempts to invoke in other contexts - * result in exceptions or errors, possibly including {@code - * ClassCastException}. - * * @param t1 the first task * @param t2 the second task * @throws NullPointerException if any task is null */ public static void invokeAll(ForkJoinTask t1, ForkJoinTask t2) { + int s1, s2; t2.fork(); - t1.invoke(); - t2.join(); + if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) + t1.reportException(s1); + if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) + t2.reportException(s2); } /** @@ -667,12 +730,6 @@ public abstract class ForkJoinTask im * related methods to check if they have been cancelled, completed * normally or exceptionally, or left unprocessed. * - *

This method may be invoked only from within {@code - * ForkJoinPool} computations (as may be determined using method - * {@link #inForkJoinPool}). Attempts to invoke in other contexts - * result in exceptions or errors, possibly including {@code - * ClassCastException}. - * * @param tasks the tasks * @throws NullPointerException if any task is null */ @@ -700,7 +757,7 @@ public abstract class ForkJoinTask im } } if (ex != null) - UNSAFE.throwException(ex); + rethrow(ex); } /** @@ -716,12 +773,6 @@ public abstract class ForkJoinTask im * cancelled, completed normally or exceptionally, or left * unprocessed. * - *

This method may be invoked only from within {@code - * ForkJoinPool} computations (as may be determined using method - * {@link #inForkJoinPool}). Attempts to invoke in other contexts - * result in exceptions or errors, possibly including {@code - * ClassCastException}. - * * @param tasks the collection of tasks * @return the tasks argument, to simplify usage * @throws NullPointerException if tasks or any element are null @@ -757,7 +808,7 @@ public abstract class ForkJoinTask im } } if (ex != null) - UNSAFE.throwException(ex); + rethrow(ex); return tasks; } @@ -789,20 +840,7 @@ public abstract class ForkJoinTask im * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { - return setCompletion(CANCELLED) == CANCELLED; - } - - /** - * Cancels, ignoring any exceptions thrown by cancel. Used during - * worker and pool shutdown. Cancel is spec'ed not to throw any - * exceptions, but if it does anyway, we have no recourse during - * shutdown, so guard against this case. - */ - final void cancelIgnoringExceptions() { - try { - cancel(false); - } catch (Throwable ignore) { - } + return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; } public final boolean isDone() { @@ -810,7 +848,7 @@ public abstract class ForkJoinTask im } public final boolean isCancelled() { - return status == CANCELLED; + return (status & DONE_MASK) == CANCELLED; } /** @@ -830,7 +868,7 @@ public abstract class ForkJoinTask im * exception and was not cancelled */ public final boolean isCompletedNormally() { - return status == NORMAL; + return (status & DONE_MASK) == NORMAL; } /** @@ -841,7 +879,7 @@ public abstract class ForkJoinTask im * @return the exception, or {@code null} if none */ public final Throwable getException() { - int s = status; + int s = status & DONE_MASK; return ((s >= NORMAL) ? null : (s == CANCELLED) ? new CancellationException() : getThrowableException()); @@ -891,6 +929,18 @@ public abstract class ForkJoinTask im } /** + * Completes this task normally without setting a value. The most + * recent value established by {@link #setRawResult} (or {@code + * null} by default) will be returned as the result of subsequent + * invocations of {@code join} and related operations. + * + * @since 1.8 + */ + public final void quietlyComplete() { + setCompletion(NORMAL); + } + + /** * Waits if necessary for the computation to complete, and then * retrieves its result. * @@ -903,9 +953,9 @@ public abstract class ForkJoinTask im */ public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? - doJoin() : externalInterruptibleAwaitDone(0L); + doJoin() : externalInterruptibleAwaitDone(); Throwable ex; - if (s == CANCELLED) + if ((s &= DONE_MASK) == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) throw new ExecutionException(ex); @@ -928,32 +978,63 @@ public abstract class ForkJoinTask im */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - Thread t = Thread.currentThread(); - if (t instanceof ForkJoinWorkerThread) { - ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; - long nanos = unit.toNanos(timeout); - if (status >= 0) { - boolean completed = false; - if (w.unpushTask(this)) { - try { - completed = exec(); - } catch (Throwable rex) { - setExceptionalCompletion(rex); + if (Thread.interrupted()) + throw new InterruptedException(); + // Messy in part because we measure in nanosecs, but wait in millisecs + int s; long ms; + long ns = unit.toNanos(timeout); + if ((s = status) >= 0 && ns > 0L) { + long deadline = System.nanoTime() + ns; + ForkJoinPool p = null; + ForkJoinPool.WorkQueue w = null; + Thread t = Thread.currentThread(); + if (t instanceof ForkJoinWorkerThread) { + ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; + p = wt.pool; + w = wt.workQueue; + p.helpJoinOnce(w, this); // no retries on failure + } + else + ForkJoinPool.externalHelpJoin(this); + boolean canBlock = false; + boolean interrupted = false; + try { + while ((s = status) >= 0) { + if (w != null && w.qlock < 0) + cancelIgnoringExceptions(this); + else if (!canBlock) { + if (p == null || p.tryCompensate()) + canBlock = true; + } + else { + if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && + U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) { + try { + wait(ms); + } catch (InterruptedException ie) { + if (p == null) + interrupted = true; + } + } + else + notifyAll(); + } + } + if ((s = status) < 0 || interrupted || + (ns = deadline - System.nanoTime()) <= 0L) + break; } } - if (completed) - setCompletion(NORMAL); - else if (status >= 0 && nanos > 0) - w.pool.timedAwaitJoin(this, nanos); + } finally { + if (p != null && canBlock) + p.incrementActiveCount(); } + if (interrupted) + throw new InterruptedException(); } - else { - long millis = unit.toMillis(timeout); - if (millis > 0) - externalInterruptibleAwaitDone(millis); - } - int s = status; - if (s != NORMAL) { + if ((s &= DONE_MASK) != NORMAL) { Throwable ex; if (s == CANCELLED) throw new CancellationException(); @@ -990,16 +1071,15 @@ public abstract class ForkJoinTask im * be of use in designs in which many tasks are forked, but none * are explicitly joined, instead executing them until all are * processed. - * - *

This method may be invoked only from within {@code - * ForkJoinPool} computations (as may be determined using method - * {@link #inForkJoinPool}). Attempts to invoke in other contexts - * result in exceptions or errors, possibly including {@code - * ClassCastException}. */ public static void helpQuiesce() { - ((ForkJoinWorkerThread) Thread.currentThread()) - .helpQuiescePool(); + Thread t; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { + ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; + wt.pool.helpQuiescePool(wt.workQueue); + } + else + ForkJoinPool.quiesceCommonPool(); } /** @@ -1019,7 +1099,7 @@ public abstract class ForkJoinTask im * setRawResult(null)}. */ public void reinitialize() { - if (status == EXCEPTIONAL) + if ((status & DONE_MASK) == EXCEPTIONAL) clearExceptionalCompletion(); else status = 0; @@ -1052,23 +1132,19 @@ public abstract class ForkJoinTask im /** * Tries to unschedule this task for execution. This method will - * typically succeed if this task is the most recently forked task - * by the current thread, and has not commenced executing in - * another thread. This method may be useful when arranging - * alternative local processing of tasks that could have been, but - * were not, stolen. - * - *

This method may be invoked only from within {@code - * ForkJoinPool} computations (as may be determined using method - * {@link #inForkJoinPool}). Attempts to invoke in other contexts - * result in exceptions or errors, possibly including {@code - * ClassCastException}. + * typically (but is not guaranteed to) succeed if this task is + * the most recently forked task by the current thread, and has + * not commenced executing in another thread. This method may be + * useful when arranging alternative local processing of tasks + * that could have been, but were not, stolen. * * @return {@code true} if unforked */ public boolean tryUnfork() { - return ((ForkJoinWorkerThread) Thread.currentThread()) - .unpushTask(this); + Thread t; + return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? + ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : + ForkJoinPool.tryExternalUnpush(this)); } /** @@ -1077,40 +1153,32 @@ public abstract class ForkJoinTask im * value may be useful for heuristic decisions about whether to * fork other tasks. * - *

This method may be invoked only from within {@code - * ForkJoinPool} computations (as may be determined using method - * {@link #inForkJoinPool}). Attempts to invoke in other contexts - * result in exceptions or errors, possibly including {@code - * ClassCastException}. - * * @return the number of tasks */ public static int getQueuedTaskCount() { - return ((ForkJoinWorkerThread) Thread.currentThread()) - .getQueueSize(); + Thread t; ForkJoinPool.WorkQueue q; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + q = ((ForkJoinWorkerThread)t).workQueue; + else + q = ForkJoinPool.commonSubmitterQueue(); + return (q == null) ? 0 : q.queueSize(); } /** * Returns an estimate of how many more locally queued tasks are * held by the current worker thread than there are other worker - * threads that might steal them. This value may be useful for + * threads that might steal them, or zero if this thread is not + * operating in a ForkJoinPool. This value may be useful for * heuristic decisions about whether to fork other tasks. In many * usages of ForkJoinTasks, at steady state, each worker should * aim to maintain a small constant surplus (for example, 3) of * tasks, and to process computations locally if this threshold is * exceeded. * - *

This method may be invoked only from within {@code - * ForkJoinPool} computations (as may be determined using method - * {@link #inForkJoinPool}). Attempts to invoke in other contexts - * result in exceptions or errors, possibly including {@code - * ClassCastException}. - * * @return the surplus number of tasks, which may be negative */ public static int getSurplusQueuedTaskCount() { - return ((ForkJoinWorkerThread) Thread.currentThread()) - .getEstimatedSurplusTaskCount(); + return ForkJoinPool.getSurplusQueuedTaskCount(); } // Extension methods @@ -1136,15 +1204,18 @@ public abstract class ForkJoinTask im protected abstract void setRawResult(V value); /** - * Immediately performs the base action of this task. This method - * is designed to support extensions, and should not in general be - * called otherwise. The return value controls whether this task - * is considered to be done normally. It may return false in + * Immediately performs the base action of this task and returns + * true if, upon return from this method, this task is guaranteed + * to have completed normally. This method may return false + * otherwise, to indicate that this task is not necessarily + * complete (or is not known to be complete), for example in * asynchronous actions that require explicit invocations of - * {@link #complete} to become joinable. It may also throw an - * (unchecked) exception to indicate abnormal exit. + * completion methods. This method may also throw an (unchecked) + * exception to indicate abnormal exit. This method is designed to + * support extensions, and should not in general be called + * otherwise. * - * @return {@code true} if completed normally + * @return {@code true} if this task is known to have completed normally */ protected abstract boolean exec(); @@ -1158,89 +1229,144 @@ public abstract class ForkJoinTask im * primarily to support extensions, and is unlikely to be useful * otherwise. * - *

This method may be invoked only from within {@code - * ForkJoinPool} computations (as may be determined using method - * {@link #inForkJoinPool}). Attempts to invoke in other contexts - * result in exceptions or errors, possibly including {@code - * ClassCastException}. - * * @return the next task, or {@code null} if none are available */ protected static ForkJoinTask peekNextLocalTask() { - return ((ForkJoinWorkerThread) Thread.currentThread()) - .peekTask(); + Thread t; ForkJoinPool.WorkQueue q; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + q = ((ForkJoinWorkerThread)t).workQueue; + else + q = ForkJoinPool.commonSubmitterQueue(); + return (q == null) ? null : q.peek(); } /** * Unschedules and returns, without executing, the next task - * queued by the current thread but not yet executed. This method - * is designed primarily to support extensions, and is unlikely to - * be useful otherwise. - * - *

This method may be invoked only from within {@code - * ForkJoinPool} computations (as may be determined using method - * {@link #inForkJoinPool}). Attempts to invoke in other contexts - * result in exceptions or errors, possibly including {@code - * ClassCastException}. + * queued by the current thread but not yet executed, if the + * current thread is operating in a ForkJoinPool. This method is + * designed primarily to support extensions, and is unlikely to be + * useful otherwise. * * @return the next task, or {@code null} if none are available */ protected static ForkJoinTask pollNextLocalTask() { - return ((ForkJoinWorkerThread) Thread.currentThread()) - .pollLocalTask(); + Thread t; + return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? + ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : + null; } /** - * Unschedules and returns, without executing, the next task + * If the current thread is operating in a ForkJoinPool, + * unschedules and returns, without executing, the next task * queued by the current thread but not yet executed, if one is * available, or if not available, a task that was forked by some * other thread, if available. Availability may be transient, so a - * {@code null} result does not necessarily imply quiescence - * of the pool this task is operating in. This method is designed + * {@code null} result does not necessarily imply quiescence of + * the pool this task is operating in. This method is designed * primarily to support extensions, and is unlikely to be useful * otherwise. * - *

This method may be invoked only from within {@code - * ForkJoinPool} computations (as may be determined using method - * {@link #inForkJoinPool}). Attempts to invoke in other contexts - * result in exceptions or errors, possibly including {@code - * ClassCastException}. - * * @return a task, or {@code null} if none are available */ protected static ForkJoinTask pollTask() { - return ((ForkJoinWorkerThread) Thread.currentThread()) - .pollTask(); + Thread t; ForkJoinWorkerThread wt; + return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? + (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) : + null; + } + + // tag operations + + /** + * Returns the tag for this task. + * + * @return the tag for this task + * @since 1.8 + */ + public final short getForkJoinTaskTag() { + return (short)status; + } + + /** + * Atomically sets the tag value for this task. + * + * @param tag the tag value + * @return the previous value of the tag + * @since 1.8 + */ + public final short setForkJoinTaskTag(short tag) { + for (int s;;) { + if (U.compareAndSwapInt(this, STATUS, s = status, + (s & ~SMASK) | (tag & SMASK))) + return (short)s; + } } /** - * Adaptor for Runnables. This implements RunnableFuture + * Atomically conditionally sets the tag value for this task. + * Among other applications, tags can be used as visit markers + * in tasks operating on graphs, as in methods that check: {@code + * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))} + * before processing, otherwise exiting because the node has + * already been visited. + * + * @param e the expected tag value + * @param tag the new tag value + * @return true if successful; i.e., the current value was + * equal to e and is now tag. + * @since 1.8 + */ + public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { + for (int s;;) { + if ((short)(s = status) != e) + return false; + if (U.compareAndSwapInt(this, STATUS, s, + (s & ~SMASK) | (tag & SMASK))) + return true; + } + } + + /** + * Adapter for Runnables. This implements RunnableFuture * to be compliant with AbstractExecutorService constraints * when used in ForkJoinPool. */ static final class AdaptedRunnable extends ForkJoinTask implements RunnableFuture { final Runnable runnable; - final T resultOnCompletion; T result; AdaptedRunnable(Runnable runnable, T result) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; - this.resultOnCompletion = result; + this.result = result; // OK to set this even before completion } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { - runnable.run(); - result = resultOnCompletion; - return true; + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { runnable.run(); return true; } + public final void run() { invoke(); } + private static final long serialVersionUID = 5232453952276885070L; + } + + /** + * Adapter for Runnables without results + */ + static final class AdaptedRunnableAction extends ForkJoinTask + implements RunnableFuture { + final Runnable runnable; + AdaptedRunnableAction(Runnable runnable) { + if (runnable == null) throw new NullPointerException(); + this.runnable = runnable; } - public void run() { invoke(); } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) { } + public final boolean exec() { runnable.run(); return true; } + public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L; } /** - * Adaptor for Callables + * Adapter for Callables */ static final class AdaptedCallable extends ForkJoinTask implements RunnableFuture { @@ -1250,9 +1376,9 @@ public abstract class ForkJoinTask im if (callable == null) throw new NullPointerException(); this.callable = callable; } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { try { result = callable.call(); return true; @@ -1264,7 +1390,7 @@ public abstract class ForkJoinTask im throw new RuntimeException(ex); } } - public void run() { invoke(); } + public final void run() { invoke(); } private static final long serialVersionUID = 2838392045355241008L; } @@ -1277,7 +1403,7 @@ public abstract class ForkJoinTask im * @return the task */ public static ForkJoinTask adapt(Runnable runnable) { - return new AdaptedRunnable(runnable, null); + return new AdaptedRunnableAction(runnable); } /** @@ -1311,11 +1437,10 @@ public abstract class ForkJoinTask im private static final long serialVersionUID = -7721805057305804111L; /** - * Saves the state to a stream (that is, serializes it). + * Saves this task to a stream (that is, serializes it). * * @serialData the current run status and the exception thrown * during execution, or {@code null} if none - * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { @@ -1324,9 +1449,7 @@ public abstract class ForkJoinTask im } /** - * Reconstitutes the instance from a stream (that is, deserializes it). - * - * @param s the stream + * Reconstitutes this task from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { @@ -1337,16 +1460,18 @@ public abstract class ForkJoinTask im } // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long statusOffset; + private static final sun.misc.Unsafe U; + private static final long STATUS; + static { exceptionTableLock = new ReentrantLock(); exceptionTableRefQueue = new ReferenceQueue(); exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY]; try { - UNSAFE = getUnsafe(); - statusOffset = UNSAFE.objectFieldOffset - (ForkJoinTask.class.getDeclaredField("status")); + U = getUnsafe(); + Class k = ForkJoinTask.class; + STATUS = U.objectFieldOffset + (k.getDeclaredField("status")); } catch (Exception e) { throw new Error(e); } @@ -1362,21 +1487,23 @@ public abstract class ForkJoinTask im private static sun.misc.Unsafe getUnsafe() { try { return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security - .PrivilegedExceptionAction() { - public sun.misc.Unsafe run() throws Exception { - java.lang.reflect.Field f = sun.misc - .Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } + } catch (SecurityException tryReflectionInstead) {} + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + Class k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) + return k.cast(x); + } + throw new NoSuchFieldError("the Unsafe"); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } }