--- jsr166/src/jsr166y/ForkJoinTask.java 2009/01/07 16:07:37 1.2 +++ jsr166/src/jsr166y/ForkJoinTask.java 2009/07/20 22:26:03 1.9 @@ -18,17 +18,17 @@ import java.lang.reflect.*; * lighter weight than a normal thread. Huge numbers of tasks and * subtasks may be hosted by a small number of actual threads in a * ForkJoinPool, at the price of some usage limitations. - * + * *

A "main" ForkJoinTask begins execution when submitted to a * {@link ForkJoinPool}. Once started, it will usually in turn start * other subtasks. As indicated by the name of this class, many - * programs using ForkJoinTasks employ only methods fork - * and join, or derivatives such as - * invokeAll. However, this class also provides a number + * programs using ForkJoinTasks employ only methods {@code fork} + * and {@code join}, or derivatives such as + * {@code invokeAll}. However, this class also provides a number * of other methods that can come into play in advanced usages, as * well as extension mechanics that allow support of new forms of * fork/join processing. - * + * *

A ForkJoinTask is a lightweight form of {@link Future}. The * efficiency of ForkJoinTasks stems from a set of restrictions (that * are only partially statically enforceable) reflecting their @@ -36,7 +36,7 @@ import java.lang.reflect.*; * operating on purely isolated objects. The primary coordination * mechanisms are {@link #fork}, that arranges asynchronous execution, * and {@link #join}, that doesn't proceed until the task's result has - * been computed. Computations should avoid synchronized + * been computed. Computations should avoid {@code synchronized} * methods or blocks, and should minimize other blocking * synchronization apart from joining other tasks or using * synchronizers such as Phasers that are advertised to cooperate with @@ -48,7 +48,7 @@ import java.lang.reflect.*; * performance, and the potential to indefinitely stall if the number * of threads not waiting for IO or other external synchronization * becomes exhausted. This usage restriction is in part enforced by - * not permitting checked exceptions such as IOExceptions + * not permitting checked exceptions such as {@code IOExceptions} * to be thrown. However, computations may still encounter unchecked * exceptions, that are rethrown to callers attempting join * them. These exceptions may additionally include @@ -58,17 +58,17 @@ import java.lang.reflect.*; *

The primary method for awaiting completion and extracting * results of a task is {@link #join}, but there are several variants: * The {@link Future#get} methods support interruptible and/or timed - * waits for completion and report results using Future + * waits for completion and report results using {@code Future} * conventions. Method {@link #helpJoin} enables callers to actively * execute other tasks while awaiting joins, which is sometimes more * efficient but only applies when all subtasks are known to be * strictly tree-structured. Method {@link #invoke} is semantically - * equivalent to fork(); join() but always attempts to + * equivalent to {@code fork(); join()} but always attempts to * begin execution in the current thread. The "quiet" forms * of these methods do not extract results or report exceptions. These * may be useful when a set of tasks are being executed, and you need * to delay processing of results or exceptions until all complete. - * Method invokeAll (available in multiple versions) + * Method {@code invokeAll} (available in multiple versions) * performs the most common form of parallel invocation: forking a set * of tasks and joining them all. * @@ -76,33 +76,33 @@ import java.lang.reflect.*; * Instead, you subclass one of the abstract classes that support a * particular style of fork/join processing. Normally, a concrete * ForkJoinTask subclass declares fields comprising its parameters, - * established in a constructor, and then defines a compute + * established in a constructor, and then defines a {@code compute} * method that somehow uses the control methods supplied by this base - * class. While these methods have public access (to allow + * class. While these methods have {@code public} access (to allow * instances of different task subclasses to call each others * methods), some of them may only be called from within other * ForkJoinTasks. Attempts to invoke them in other contexts result in - * exceptions or errors including ClassCastException. + * exceptions or errors possibly including ClassCastException. * - *

Most base support methods are final because their + *

Most base support methods are {@code final} because their * implementations are intrinsically tied to the underlying * lightweight task scheduling framework, and so cannot be overridden. * Developers creating new basic styles of fork/join processing should - * minimally implement protected methods - * exec, setRawResult, and - * getRawResult, while also introducing an abstract + * minimally implement {@code protected} methods + * {@code exec}, {@code setRawResult}, and + * {@code getRawResult}, while also introducing an abstract * computational method that can be implemented in its subclasses, - * possibly relying on other protected methods provided + * possibly relying on other {@code protected} methods provided * by this class. * *

ForkJoinTasks should perform relatively small amounts of - * computations, othewise splitting into smaller tasks. As a very + * computations, otherwise splitting into smaller tasks. As a very * rough rule of thumb, a task should perform more than 100 and less * than 10000 basic computational steps. If tasks are too big, then - * parellelism cannot improve throughput. If too small, then memory + * parallelism cannot improve throughput. If too small, then memory * and internal task maintenance overhead may overwhelm processing. * - *

ForkJoinTasks are Serializable, which enables them + *

ForkJoinTasks are {@code Serializable}, which enables them * to be used in extensions such as remote execution frameworks. It is * in general sensible to serialize tasks only before or after, but * not during execution. Serialization is not relied on during @@ -128,7 +128,7 @@ public abstract class ForkJoinTask im * currently unused. Also value 0x80000000 is available as spare * completion value. */ - volatile int status; // accessed directy by pool and workers + volatile int status; // accessed directly by pool and workers static final int COMPLETION_MASK = 0xe0000000; static final int NORMAL = 0xe0000000; // == mask @@ -257,7 +257,7 @@ public abstract class ForkJoinTask im * surrounded with pool notifications. * @return status upon exit */ - final int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) { + private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) { ForkJoinPool pool = w == null? null : w.pool; int s; while ((s = status) >= 0) { @@ -276,7 +276,7 @@ public abstract class ForkJoinTask im * Timed version of awaitDone * @return status upon exit */ - final int awaitDone(ForkJoinWorkerThread w, long nanos) { + private int awaitDone(ForkJoinWorkerThread w, long nanos) { ForkJoinPool pool = w == null? null : w.pool; int s; while ((s = status) >= 0) { @@ -330,7 +330,7 @@ public abstract class ForkJoinTask im if (w == null) Thread.currentThread().interrupt(); // re-interrupt else if (w.isTerminating()) - cancelIgnoreExceptions(); + cancelIgnoringExceptions(); // else if FJworker, ignore interrupt } @@ -449,13 +449,24 @@ public abstract class ForkJoinTask im /** * Cancel, ignoring any exceptions it throws */ - final void cancelIgnoreExceptions() { + final void cancelIgnoringExceptions() { try { cancel(false); } catch(Throwable ignore) { } } + /** + * Main implementation of helpJoin + */ + private int busyJoin(ForkJoinWorkerThread w) { + int s; + ForkJoinTask t; + while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null) + t.quietlyExec(); + return (s >= 0)? awaitDone(w, false) : s; // block if no work + } + // public methods /** @@ -464,7 +475,7 @@ public abstract class ForkJoinTask im * than once unless it has completed and been reinitialized. This * method may be invoked only from within ForkJoinTask * computations. Attempts to invoke in other contexts result in - * exceptions or errors including ClassCastException. + * exceptions or errors possibly including ClassCastException. */ public final void fork() { ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this); @@ -472,7 +483,7 @@ public abstract class ForkJoinTask im /** * Returns the result of the computation when it is ready. - * This method differs from get in that abnormal + * This method differs from {@code get} in that abnormal * completion results in RuntimeExceptions or Errors, not * ExecutionExceptions. * @@ -485,21 +496,6 @@ public abstract class ForkJoinTask im return getRawResult(); } - public final V get() throws InterruptedException, ExecutionException { - ForkJoinWorkerThread w = getWorker(); - if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) - awaitDone(w, true); - return reportFutureResult(); - } - - public final V get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - ForkJoinWorkerThread w = getWorker(); - if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) - awaitDone(w, unit.toNanos(timeout)); - return reportTimedFutureResult(); - } - /** * Commences performing this task, awaits its completion if * necessary, and return its result. @@ -515,11 +511,11 @@ public abstract class ForkJoinTask im } /** - * Forks both tasks, returning when isDone holds for + * Forks both tasks, returning when {@code isDone} holds for * both of them or an exception is encountered. This method may be * invoked only from within ForkJoinTask computations. Attempts to * invoke in other contexts result in exceptions or errors - * including ClassCastException. + * possibly including ClassCastException. * @param t1 one task * @param t2 the other task * @throws NullPointerException if t1 or t2 are null @@ -532,11 +528,11 @@ public abstract class ForkJoinTask im } /** - * Forks the given tasks, returning when isDone holds + * Forks the given tasks, returning when {@code isDone} holds * for all of them. If any task encounters an exception, others * may be cancelled. This method may be invoked only from within * ForkJoinTask computations. Attempts to invoke in other contexts - * result in exceptions or errors including ClassCastException. + * result in exceptions or errors possibly including ClassCastException. * @param tasks the array of tasks * @throws NullPointerException if tasks or any element are null. * @throws RuntimeException or Error if any task did so. @@ -576,11 +572,11 @@ public abstract class ForkJoinTask im /** * Forks all tasks in the collection, returning when - * isDone holds for all of them. If any task + * {@code isDone} holds for all of them. If any task * encounters an exception, others may be cancelled. This method * may be invoked only from within ForkJoinTask - * computations. Attempts to invoke in other contexts resul!t in - * exceptions or errors including ClassCastException. + * computations. Attempts to invoke in other contexts result in + * exceptions or errors possibly including ClassCastException. * @param tasks the collection of tasks * @throws NullPointerException if tasks or any element are null. * @throws RuntimeException or Error if any task did so. @@ -642,37 +638,14 @@ public abstract class ForkJoinTask im } /** - * Returns true if this task threw an exception or was cancelled - * @return true if this task threw an exception or was cancelled - */ - public final boolean isCompletedAbnormally() { - return (status & COMPLETION_MASK) < NORMAL; - } - - /** - * Returns the exception thrown by the base computation, or a - * CancellationException if cancelled, or null if none or if the - * method has not yet completed. - * @return the exception, or null if none - */ - public final Throwable getException() { - int s = status & COMPLETION_MASK; - if (s >= NORMAL) - return null; - if (s == CANCELLED) - return new CancellationException(); - return exceptionMap.get(this); - } - - /** * Asserts that the results of this task's computation will not be - * used. If a cancellation occurs before atempting to execute this - * task, then execution will be suppressed, isCancelled - * will report true, and join will result in a - * CancellationException being thrown. Otherwise, when + * used. If a cancellation occurs before attempting to execute this + * task, then execution will be suppressed, {@code isCancelled} + * will report true, and {@code join} will result in a + * {@code CancellationException} being thrown. Otherwise, when * cancellation races with completion, there are no guarantees - * about whether isCancelled will report true, whether - * join will return normally or via an exception, or + * about whether {@code isCancelled} will report true, whether + * {@code join} will return normally or via an exception, or * whether these behaviors will remain consistent upon repeated * invocation. * @@ -683,7 +656,7 @@ public abstract class ForkJoinTask im *

This method is designed to be invoked by other * tasks. To terminate the current task, you can just return or * throw an unchecked exception from its computation method, or - * invoke completeExceptionally. + * invoke {@code completeExceptionally}. * * @param mayInterruptIfRunning this value is ignored in the * default implementation because tasks are not in general @@ -697,13 +670,36 @@ public abstract class ForkJoinTask im } /** + * Returns true if this task threw an exception or was cancelled + * @return true if this task threw an exception or was cancelled + */ + public final boolean isCompletedAbnormally() { + return (status & COMPLETION_MASK) < NORMAL; + } + + /** + * Returns the exception thrown by the base computation, or a + * CancellationException if cancelled, or null if none or if the + * method has not yet completed. + * @return the exception, or null if none + */ + public final Throwable getException() { + int s = status & COMPLETION_MASK; + if (s >= NORMAL) + return null; + if (s == CANCELLED) + return new CancellationException(); + return exceptionMap.get(this); + } + + /** * Completes this task abnormally, and if not already aborted or * cancelled, causes it to throw the given exception upon - * join and related operations. This method may be used + * {@code join} and related operations. This method may be used * to induce exceptions in asynchronous tasks, or to force * completion of tasks that would not otherwise complete. Its use * in other situations is likely to be wrong. This method is - * overridable, but overridden versions must invoke super + * overridable, but overridden versions must invoke {@code super} * implementation to maintain guarantees. * * @param ex the exception to throw. If this exception is @@ -718,12 +714,12 @@ public abstract class ForkJoinTask im /** * Completes this task, and if not already aborted or cancelled, - * returning a null result upon join and related + * returning a {@code null} result upon {@code join} and related * operations. This method may be used to provide results for * asynchronous tasks, or to provide alternative handling for * tasks that would not otherwise complete normally. Its use in * other situations is likely to be wrong. This method is - * overridable, but overridden versions must invoke super + * overridable, but overridden versions must invoke {@code super} * implementation to maintain guarantees. * * @param value the result value for this task. @@ -738,37 +734,52 @@ public abstract class ForkJoinTask im setNormalCompletion(); } + public final V get() throws InterruptedException, ExecutionException { + ForkJoinWorkerThread w = getWorker(); + if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) + awaitDone(w, true); + return reportFutureResult(); + } + + public final V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + ForkJoinWorkerThread w = getWorker(); + if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) + awaitDone(w, unit.toNanos(timeout)); + return reportTimedFutureResult(); + } + /** * Possibly executes other tasks until this task is ready, then * returns the result of the computation. This method may be more - * efficient than join, but is only applicable when - * there are no potemtial dependencies between continuation of the + * efficient than {@code join}, but is only applicable when + * there are no potential dependencies between continuation of the * current task and that of any other task that might be executed * while helping. (This usually holds for pure divide-and-conquer * tasks). This method may be invoked only from within * ForkJoinTask computations. Attempts to invoke in other contexts - * resul!t in exceptions or errors including ClassCastException. + * result in exceptions or errors possibly including ClassCastException. * @return the computed result */ public final V helpJoin() { ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread()); if (status < 0 || !w.unpushTask(this) || !tryExec()) - reportException(w.helpJoinTask(this)); + reportException(busyJoin(w)); return getRawResult(); } /** * Possibly executes other tasks until this task is ready. This * method may be invoked only from within ForkJoinTask - * computations. Attempts to invoke in other contexts resul!t in - * exceptions or errors including ClassCastException. + * computations. Attempts to invoke in other contexts result in + * exceptions or errors possibly including ClassCastException. */ public final void quietlyHelpJoin() { if (status >= 0) { ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread()); if (!w.unpushTask(this) || !tryQuietlyInvoke()) - w.helpJoinTask(this); + busyJoin(w); } } @@ -799,8 +810,19 @@ public abstract class ForkJoinTask im } /** + * Possibly executes tasks until the pool hosting the current task + * {@link ForkJoinPool#isQuiescent}. This method may be of use in + * designs in which many tasks are forked, but none are explicitly + * joined, instead executing them until all are processed. + */ + public static void helpQuiesce() { + ((ForkJoinWorkerThread)(Thread.currentThread())). + helpQuiescePool(); + } + + /** * Resets the internal bookkeeping state of this task, allowing a - * subsequent fork. This method allows repeated reuse of + * subsequent {@code fork}. This method allows repeated reuse of * this task, but only if reuse occurs when this task has either * never been forked, or has been forked, then completed and all * outstanding joins of this task have also completed. Effects @@ -833,7 +855,7 @@ public abstract class ForkJoinTask im * alternative local processing of tasks that could have been, but * were not, stolen. This method may be invoked only from within * ForkJoinTask computations. Attempts to invoke in other contexts - * result in exceptions or errors including ClassCastException. + * result in exceptions or errors possibly including ClassCastException. * @return true if unforked */ public boolean tryUnfork() { @@ -841,17 +863,6 @@ public abstract class ForkJoinTask im } /** - * Possibly executes tasks until the pool hosting the current task - * {@link ForkJoinPool#isQuiescent}. This method may be of use in - * designs in which many tasks are forked, but none are explicitly - * joined, instead executing them until all are processed. - */ - public static void helpQuiesce() { - ((ForkJoinWorkerThread)(Thread.currentThread())). - helpQuiescePool(); - } - - /** * Returns an estimate of the number of tasks that have been * forked by the current worker thread but not yet executed. This * value may be useful for heuristic decisions about whether to @@ -882,7 +893,7 @@ public abstract class ForkJoinTask im // Extension methods /** - * Returns the result that would be returned by join, + * Returns the result that would be returned by {@code join}, * even if this task completed abnormally, or null if this task is * not known to have been completed. This method is designed to * aid debugging, as well as to support extensions. Its use in any @@ -907,7 +918,7 @@ public abstract class ForkJoinTask im * called otherwise. The return value controls whether this task * is considered to be done normally. It may return false in * asynchronous actions that require explicit invocations of - * complete to become joinable. It may throw exceptions + * {@code complete} to become joinable. It may throw exceptions * to indicate abnormal exit. * @return true if completed normally * @throws Error or RuntimeException if encountered during computation @@ -915,12 +926,14 @@ public abstract class ForkJoinTask im protected abstract boolean exec(); /** - * Returns, but does not unschedule or execute, the task most - * recently forked by the current thread but not yet executed, if - * one is available. There is no guarantee that this task will - * actually be polled or executed next. - * This method is designed primarily to support extensions, - * and is unlikely to be useful otherwise. + * Returns, but does not unschedule or execute, the task queued by + * the current thread but not yet executed, if one is + * available. There is no guarantee that this task will actually + * be polled or executed next. This method is designed primarily + * to support extensions, and is unlikely to be useful otherwise. + * This method may be invoked only from within ForkJoinTask + * computations. Attempts to invoke in other contexts result in + * exceptions or errors possibly including ClassCastException. * * @return the next task, or null if none are available */ @@ -929,32 +942,38 @@ public abstract class ForkJoinTask im } /** - * Unschedules and returns, without executing, the task most - * recently forked by the current thread but not yet executed. - * This method is designed primarily to support extensions, - * and is unlikely to be useful otherwise. + * Unschedules and returns, without executing, the next task + * queued by the current thread but not yet executed. This method + * is designed primarily to support extensions, and is unlikely to + * be useful otherwise. This method may be invoked only from + * within ForkJoinTask computations. Attempts to invoke in other + * contexts result in exceptions or errors possibly including + * ClassCastException. * * @return the next task, or null if none are available */ protected static ForkJoinTask pollNextLocalTask() { - return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask(); + return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask(); } /** - * Unschedules and returns, without executing, the task most - * recently forked by the current thread but not yet executed, if - * one is available, or if not available, a task that was forked - * by some other thread, if available. Availability may be - * transient, so a null result does not necessarily - * imply quiecence of the pool this task is operating in. - * This method is designed primarily to support extensions, - * and is unlikely to be useful otherwise. - * + * Unschedules and returns, without executing, the next task + * queued by the current thread but not yet executed, if one is + * available, or if not available, a task that was forked by some + * other thread, if available. Availability may be transient, so a + * {@code null} result does not necessarily imply quiescence + * of the pool this task is operating in. This method is designed + * primarily to support extensions, and is unlikely to be useful + * otherwise. This method may be invoked only from within + * ForkJoinTask computations. Attempts to invoke in other contexts + * result in exceptions or errors possibly including + * ClassCastException. + * * @return a task, or null if none are available */ protected static ForkJoinTask pollTask() { return ((ForkJoinWorkerThread)(Thread.currentThread())). - getLocalOrStolenTask(); + pollTask(); } // Serialization support @@ -989,22 +1008,45 @@ public abstract class ForkJoinTask im } // Temporary Unsafe mechanics for preliminary release + private static Unsafe getUnsafe() throws Throwable { + try { + return Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public Unsafe run() throws Exception { + return getUnsafePrivileged(); + }}); + } catch (java.security.PrivilegedActionException e) { + throw e.getCause(); + } + } + } + + private static Unsafe getUnsafePrivileged() + throws NoSuchFieldException, IllegalAccessException { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } + + private static long fieldOffset(String fieldName) + throws NoSuchFieldException { + return _unsafe.objectFieldOffset + (ForkJoinTask.class.getDeclaredField(fieldName)); + } static final Unsafe _unsafe; static final long statusOffset; static { try { - if (ForkJoinTask.class.getClassLoader() != null) { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - _unsafe = (Unsafe)f.get(null); - } - else - _unsafe = Unsafe.getUnsafe(); - statusOffset = _unsafe.objectFieldOffset - (ForkJoinTask.class.getDeclaredField("status")); - } catch (Exception ex) { throw new Error(ex); } + _unsafe = getUnsafe(); + statusOffset = fieldOffset("status"); + } catch (Throwable e) { + throw new RuntimeException("Could not initialize intrinsics", e); + } } }