--- jsr166/src/jsr166y/ForkJoinTask.java 2009/01/07 16:07:37 1.2 +++ jsr166/src/jsr166y/ForkJoinTask.java 2009/07/16 15:32:34 1.6 @@ -18,7 +18,7 @@ import java.lang.reflect.*; * lighter weight than a normal thread. Huge numbers of tasks and * subtasks may be hosted by a small number of actual threads in a * ForkJoinPool, at the price of some usage limitations. - * + * *

A "main" ForkJoinTask begins execution when submitted to a * {@link ForkJoinPool}. Once started, it will usually in turn start * other subtasks. As indicated by the name of this class, many @@ -28,7 +28,7 @@ import java.lang.reflect.*; * of other methods that can come into play in advanced usages, as * well as extension mechanics that allow support of new forms of * fork/join processing. - * + * *

A ForkJoinTask is a lightweight form of {@link Future}. The * efficiency of ForkJoinTasks stems from a set of restrictions (that * are only partially statically enforceable) reflecting their @@ -82,7 +82,7 @@ import java.lang.reflect.*; * instances of different task subclasses to call each others * methods), some of them may only be called from within other * ForkJoinTasks. Attempts to invoke them in other contexts result in - * exceptions or errors including ClassCastException. + * exceptions or errors possibly including ClassCastException. * *

Most base support methods are final because their * implementations are intrinsically tied to the underlying @@ -257,7 +257,7 @@ public abstract class ForkJoinTask im * surrounded with pool notifications. * @return status upon exit */ - final int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) { + private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) { ForkJoinPool pool = w == null? null : w.pool; int s; while ((s = status) >= 0) { @@ -276,7 +276,7 @@ public abstract class ForkJoinTask im * Timed version of awaitDone * @return status upon exit */ - final int awaitDone(ForkJoinWorkerThread w, long nanos) { + private int awaitDone(ForkJoinWorkerThread w, long nanos) { ForkJoinPool pool = w == null? null : w.pool; int s; while ((s = status) >= 0) { @@ -330,7 +330,7 @@ public abstract class ForkJoinTask im if (w == null) Thread.currentThread().interrupt(); // re-interrupt else if (w.isTerminating()) - cancelIgnoreExceptions(); + cancelIgnoringExceptions(); // else if FJworker, ignore interrupt } @@ -449,13 +449,24 @@ public abstract class ForkJoinTask im /** * Cancel, ignoring any exceptions it throws */ - final void cancelIgnoreExceptions() { + final void cancelIgnoringExceptions() { try { cancel(false); } catch(Throwable ignore) { } } + /** + * Main implementation of helpJoin + */ + private int busyJoin(ForkJoinWorkerThread w) { + int s; + ForkJoinTask t; + while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null) + t.quietlyExec(); + return (s >= 0)? awaitDone(w, false) : s; // block if no work + } + // public methods /** @@ -464,7 +475,7 @@ public abstract class ForkJoinTask im * than once unless it has completed and been reinitialized. This * method may be invoked only from within ForkJoinTask * computations. Attempts to invoke in other contexts result in - * exceptions or errors including ClassCastException. + * exceptions or errors possibly including ClassCastException. */ public final void fork() { ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this); @@ -485,21 +496,6 @@ public abstract class ForkJoinTask im return getRawResult(); } - public final V get() throws InterruptedException, ExecutionException { - ForkJoinWorkerThread w = getWorker(); - if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) - awaitDone(w, true); - return reportFutureResult(); - } - - public final V get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - ForkJoinWorkerThread w = getWorker(); - if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) - awaitDone(w, unit.toNanos(timeout)); - return reportTimedFutureResult(); - } - /** * Commences performing this task, awaits its completion if * necessary, and return its result. @@ -519,7 +515,7 @@ public abstract class ForkJoinTask im * both of them or an exception is encountered. This method may be * invoked only from within ForkJoinTask computations. Attempts to * invoke in other contexts result in exceptions or errors - * including ClassCastException. + * possibly including ClassCastException. * @param t1 one task * @param t2 the other task * @throws NullPointerException if t1 or t2 are null @@ -536,7 +532,7 @@ public abstract class ForkJoinTask im * for all of them. If any task encounters an exception, others * may be cancelled. This method may be invoked only from within * ForkJoinTask computations. Attempts to invoke in other contexts - * result in exceptions or errors including ClassCastException. + * result in exceptions or errors possibly including ClassCastException. * @param tasks the array of tasks * @throws NullPointerException if tasks or any element are null. * @throws RuntimeException or Error if any task did so. @@ -580,7 +576,7 @@ public abstract class ForkJoinTask im * encounters an exception, others may be cancelled. This method * may be invoked only from within ForkJoinTask * computations. Attempts to invoke in other contexts resul!t in - * exceptions or errors including ClassCastException. + * exceptions or errors possibly including ClassCastException. * @param tasks the collection of tasks * @throws NullPointerException if tasks or any element are null. * @throws RuntimeException or Error if any task did so. @@ -642,29 +638,6 @@ public abstract class ForkJoinTask im } /** - * Returns true if this task threw an exception or was cancelled - * @return true if this task threw an exception or was cancelled - */ - public final boolean isCompletedAbnormally() { - return (status & COMPLETION_MASK) < NORMAL; - } - - /** - * Returns the exception thrown by the base computation, or a - * CancellationException if cancelled, or null if none or if the - * method has not yet completed. - * @return the exception, or null if none - */ - public final Throwable getException() { - int s = status & COMPLETION_MASK; - if (s >= NORMAL) - return null; - if (s == CANCELLED) - return new CancellationException(); - return exceptionMap.get(this); - } - - /** * Asserts that the results of this task's computation will not be * used. If a cancellation occurs before atempting to execute this * task, then execution will be suppressed, isCancelled @@ -697,6 +670,29 @@ public abstract class ForkJoinTask im } /** + * Returns true if this task threw an exception or was cancelled + * @return true if this task threw an exception or was cancelled + */ + public final boolean isCompletedAbnormally() { + return (status & COMPLETION_MASK) < NORMAL; + } + + /** + * Returns the exception thrown by the base computation, or a + * CancellationException if cancelled, or null if none or if the + * method has not yet completed. + * @return the exception, or null if none + */ + public final Throwable getException() { + int s = status & COMPLETION_MASK; + if (s >= NORMAL) + return null; + if (s == CANCELLED) + return new CancellationException(); + return exceptionMap.get(this); + } + + /** * Completes this task abnormally, and if not already aborted or * cancelled, causes it to throw the given exception upon * join and related operations. This method may be used @@ -738,6 +734,21 @@ public abstract class ForkJoinTask im setNormalCompletion(); } + public final V get() throws InterruptedException, ExecutionException { + ForkJoinWorkerThread w = getWorker(); + if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) + awaitDone(w, true); + return reportFutureResult(); + } + + public final V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + ForkJoinWorkerThread w = getWorker(); + if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) + awaitDone(w, unit.toNanos(timeout)); + return reportTimedFutureResult(); + } + /** * Possibly executes other tasks until this task is ready, then * returns the result of the computation. This method may be more @@ -747,13 +758,13 @@ public abstract class ForkJoinTask im * while helping. (This usually holds for pure divide-and-conquer * tasks). This method may be invoked only from within * ForkJoinTask computations. Attempts to invoke in other contexts - * resul!t in exceptions or errors including ClassCastException. + * resul!t in exceptions or errors possibly including ClassCastException. * @return the computed result */ public final V helpJoin() { ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread()); if (status < 0 || !w.unpushTask(this) || !tryExec()) - reportException(w.helpJoinTask(this)); + reportException(busyJoin(w)); return getRawResult(); } @@ -761,14 +772,14 @@ public abstract class ForkJoinTask im * Possibly executes other tasks until this task is ready. This * method may be invoked only from within ForkJoinTask * computations. Attempts to invoke in other contexts resul!t in - * exceptions or errors including ClassCastException. + * exceptions or errors possibly including ClassCastException. */ public final void quietlyHelpJoin() { if (status >= 0) { ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread()); if (!w.unpushTask(this) || !tryQuietlyInvoke()) - w.helpJoinTask(this); + busyJoin(w); } } @@ -799,6 +810,17 @@ public abstract class ForkJoinTask im } /** + * Possibly executes tasks until the pool hosting the current task + * {@link ForkJoinPool#isQuiescent}. This method may be of use in + * designs in which many tasks are forked, but none are explicitly + * joined, instead executing them until all are processed. + */ + public static void helpQuiesce() { + ((ForkJoinWorkerThread)(Thread.currentThread())). + helpQuiescePool(); + } + + /** * Resets the internal bookkeeping state of this task, allowing a * subsequent fork. This method allows repeated reuse of * this task, but only if reuse occurs when this task has either @@ -833,7 +855,7 @@ public abstract class ForkJoinTask im * alternative local processing of tasks that could have been, but * were not, stolen. This method may be invoked only from within * ForkJoinTask computations. Attempts to invoke in other contexts - * result in exceptions or errors including ClassCastException. + * result in exceptions or errors possibly including ClassCastException. * @return true if unforked */ public boolean tryUnfork() { @@ -841,17 +863,6 @@ public abstract class ForkJoinTask im } /** - * Possibly executes tasks until the pool hosting the current task - * {@link ForkJoinPool#isQuiescent}. This method may be of use in - * designs in which many tasks are forked, but none are explicitly - * joined, instead executing them until all are processed. - */ - public static void helpQuiesce() { - ((ForkJoinWorkerThread)(Thread.currentThread())). - helpQuiescePool(); - } - - /** * Returns an estimate of the number of tasks that have been * forked by the current worker thread but not yet executed. This * value may be useful for heuristic decisions about whether to @@ -915,12 +926,14 @@ public abstract class ForkJoinTask im protected abstract boolean exec(); /** - * Returns, but does not unschedule or execute, the task most - * recently forked by the current thread but not yet executed, if - * one is available. There is no guarantee that this task will - * actually be polled or executed next. - * This method is designed primarily to support extensions, - * and is unlikely to be useful otherwise. + * Returns, but does not unschedule or execute, the task queued by + * the current thread but not yet executed, if one is + * available. There is no guarantee that this task will actually + * be polled or executed next. This method is designed primarily + * to support extensions, and is unlikely to be useful otherwise. + * This method may be invoked only from within ForkJoinTask + * computations. Attempts to invoke in other contexts result in + * exceptions or errors possibly including ClassCastException. * * @return the next task, or null if none are available */ @@ -929,32 +942,38 @@ public abstract class ForkJoinTask im } /** - * Unschedules and returns, without executing, the task most - * recently forked by the current thread but not yet executed. - * This method is designed primarily to support extensions, - * and is unlikely to be useful otherwise. + * Unschedules and returns, without executing, the next task + * queued by the current thread but not yet executed. This method + * is designed primarily to support extensions, and is unlikely to + * be useful otherwise. This method may be invoked only from + * within ForkJoinTask computations. Attempts to invoke in other + * contexts result in exceptions or errors possibly including + * ClassCastException. * * @return the next task, or null if none are available */ protected static ForkJoinTask pollNextLocalTask() { - return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask(); + return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask(); } - + /** - * Unschedules and returns, without executing, the task most - * recently forked by the current thread but not yet executed, if - * one is available, or if not available, a task that was forked - * by some other thread, if available. Availability may be - * transient, so a null result does not necessarily - * imply quiecence of the pool this task is operating in. - * This method is designed primarily to support extensions, - * and is unlikely to be useful otherwise. - * + * Unschedules and returns, without executing, the next task + * queued by the current thread but not yet executed, if one is + * available, or if not available, a task that was forked by some + * other thread, if available. Availability may be transient, so a + * null result does not necessarily imply quiecence + * of the pool this task is operating in. This method is designed + * primarily to support extensions, and is unlikely to be useful + * otherwise. This method may be invoked only from within + * ForkJoinTask computations. Attempts to invoke in other contexts + * result in exceptions or errors possibly including + * ClassCastException. + * * @return a task, or null if none are available */ protected static ForkJoinTask pollTask() { return ((ForkJoinWorkerThread)(Thread.currentThread())). - getLocalOrStolenTask(); + pollTask(); } // Serialization support @@ -989,22 +1008,45 @@ public abstract class ForkJoinTask im } // Temporary Unsafe mechanics for preliminary release + private static Unsafe getUnsafe() throws Throwable { + try { + return Unsafe.getUnsafe(); + } catch (SecurityException se) { + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public Unsafe run() throws Exception { + return getUnsafePrivileged(); + }}); + } catch (java.security.PrivilegedActionException e) { + throw e.getCause(); + } + } + } + + private static Unsafe getUnsafePrivileged() + throws NoSuchFieldException, IllegalAccessException { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } + + private static long fieldOffset(String fieldName) + throws NoSuchFieldException { + return _unsafe.objectFieldOffset + (ForkJoinTask.class.getDeclaredField(fieldName)); + } static final Unsafe _unsafe; static final long statusOffset; static { try { - if (ForkJoinTask.class.getClassLoader() != null) { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - _unsafe = (Unsafe)f.get(null); - } - else - _unsafe = Unsafe.getUnsafe(); - statusOffset = _unsafe.objectFieldOffset - (ForkJoinTask.class.getDeclaredField("status")); - } catch (Exception ex) { throw new Error(ex); } + _unsafe = getUnsafe(); + statusOffset = fieldOffset("status"); + } catch (Throwable e) { + throw new RuntimeException("Could not initialize intrinsics", e); + } } }