--- jsr166/src/jsr166y/ForkJoinTask.java 2009/01/07 16:07:37 1.2
+++ jsr166/src/jsr166y/ForkJoinTask.java 2009/07/20 21:45:06 1.7
@@ -18,7 +18,7 @@ import java.lang.reflect.*;
* lighter weight than a normal thread. Huge numbers of tasks and
* subtasks may be hosted by a small number of actual threads in a
* ForkJoinPool, at the price of some usage limitations.
- *
+ *
*
A "main" ForkJoinTask begins execution when submitted to a
* {@link ForkJoinPool}. Once started, it will usually in turn start
* other subtasks. As indicated by the name of this class, many
@@ -28,7 +28,7 @@ import java.lang.reflect.*;
* of other methods that can come into play in advanced usages, as
* well as extension mechanics that allow support of new forms of
* fork/join processing.
- *
+ *
*
A ForkJoinTask is a lightweight form of {@link Future}. The
* efficiency of ForkJoinTasks stems from a set of restrictions (that
* are only partially statically enforceable) reflecting their
@@ -82,7 +82,7 @@ import java.lang.reflect.*;
* instances of different task subclasses to call each others
* methods), some of them may only be called from within other
* ForkJoinTasks. Attempts to invoke them in other contexts result in
- * exceptions or errors including ClassCastException.
+ * exceptions or errors possibly including ClassCastException.
*
*
Most base support methods are final
because their
* implementations are intrinsically tied to the underlying
@@ -257,7 +257,7 @@ public abstract class ForkJoinTask im
* surrounded with pool notifications.
* @return status upon exit
*/
- final int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
+ private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
ForkJoinPool pool = w == null? null : w.pool;
int s;
while ((s = status) >= 0) {
@@ -276,7 +276,7 @@ public abstract class ForkJoinTask im
* Timed version of awaitDone
* @return status upon exit
*/
- final int awaitDone(ForkJoinWorkerThread w, long nanos) {
+ private int awaitDone(ForkJoinWorkerThread w, long nanos) {
ForkJoinPool pool = w == null? null : w.pool;
int s;
while ((s = status) >= 0) {
@@ -330,7 +330,7 @@ public abstract class ForkJoinTask im
if (w == null)
Thread.currentThread().interrupt(); // re-interrupt
else if (w.isTerminating())
- cancelIgnoreExceptions();
+ cancelIgnoringExceptions();
// else if FJworker, ignore interrupt
}
@@ -449,13 +449,24 @@ public abstract class ForkJoinTask im
/**
* Cancel, ignoring any exceptions it throws
*/
- final void cancelIgnoreExceptions() {
+ final void cancelIgnoringExceptions() {
try {
cancel(false);
} catch(Throwable ignore) {
}
}
+ /**
+ * Main implementation of helpJoin
+ */
+ private int busyJoin(ForkJoinWorkerThread w) {
+ int s;
+ ForkJoinTask> t;
+ while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
+ t.quietlyExec();
+ return (s >= 0)? awaitDone(w, false) : s; // block if no work
+ }
+
// public methods
/**
@@ -464,7 +475,7 @@ public abstract class ForkJoinTask im
* than once unless it has completed and been reinitialized. This
* method may be invoked only from within ForkJoinTask
* computations. Attempts to invoke in other contexts result in
- * exceptions or errors including ClassCastException.
+ * exceptions or errors possibly including ClassCastException.
*/
public final void fork() {
((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
@@ -485,21 +496,6 @@ public abstract class ForkJoinTask im
return getRawResult();
}
- public final V get() throws InterruptedException, ExecutionException {
- ForkJoinWorkerThread w = getWorker();
- if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
- awaitDone(w, true);
- return reportFutureResult();
- }
-
- public final V get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- ForkJoinWorkerThread w = getWorker();
- if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
- awaitDone(w, unit.toNanos(timeout));
- return reportTimedFutureResult();
- }
-
/**
* Commences performing this task, awaits its completion if
* necessary, and return its result.
@@ -519,7 +515,7 @@ public abstract class ForkJoinTask im
* both of them or an exception is encountered. This method may be
* invoked only from within ForkJoinTask computations. Attempts to
* invoke in other contexts result in exceptions or errors
- * including ClassCastException.
+ * possibly including ClassCastException.
* @param t1 one task
* @param t2 the other task
* @throws NullPointerException if t1 or t2 are null
@@ -536,7 +532,7 @@ public abstract class ForkJoinTask im
* for all of them. If any task encounters an exception, others
* may be cancelled. This method may be invoked only from within
* ForkJoinTask computations. Attempts to invoke in other contexts
- * result in exceptions or errors including ClassCastException.
+ * result in exceptions or errors possibly including ClassCastException.
* @param tasks the array of tasks
* @throws NullPointerException if tasks or any element are null.
* @throws RuntimeException or Error if any task did so.
@@ -580,7 +576,7 @@ public abstract class ForkJoinTask im
* encounters an exception, others may be cancelled. This method
* may be invoked only from within ForkJoinTask
* computations. Attempts to invoke in other contexts resul!t in
- * exceptions or errors including ClassCastException.
+ * exceptions or errors possibly including ClassCastException.
* @param tasks the collection of tasks
* @throws NullPointerException if tasks or any element are null.
* @throws RuntimeException or Error if any task did so.
@@ -642,29 +638,6 @@ public abstract class ForkJoinTask im
}
/**
- * Returns true if this task threw an exception or was cancelled
- * @return true if this task threw an exception or was cancelled
- */
- public final boolean isCompletedAbnormally() {
- return (status & COMPLETION_MASK) < NORMAL;
- }
-
- /**
- * Returns the exception thrown by the base computation, or a
- * CancellationException if cancelled, or null if none or if the
- * method has not yet completed.
- * @return the exception, or null if none
- */
- public final Throwable getException() {
- int s = status & COMPLETION_MASK;
- if (s >= NORMAL)
- return null;
- if (s == CANCELLED)
- return new CancellationException();
- return exceptionMap.get(this);
- }
-
- /**
* Asserts that the results of this task's computation will not be
* used. If a cancellation occurs before atempting to execute this
* task, then execution will be suppressed, isCancelled
@@ -697,6 +670,29 @@ public abstract class ForkJoinTask im
}
/**
+ * Returns true if this task threw an exception or was cancelled
+ * @return true if this task threw an exception or was cancelled
+ */
+ public final boolean isCompletedAbnormally() {
+ return (status & COMPLETION_MASK) < NORMAL;
+ }
+
+ /**
+ * Returns the exception thrown by the base computation, or a
+ * CancellationException if cancelled, or null if none or if the
+ * method has not yet completed.
+ * @return the exception, or null if none
+ */
+ public final Throwable getException() {
+ int s = status & COMPLETION_MASK;
+ if (s >= NORMAL)
+ return null;
+ if (s == CANCELLED)
+ return new CancellationException();
+ return exceptionMap.get(this);
+ }
+
+ /**
* Completes this task abnormally, and if not already aborted or
* cancelled, causes it to throw the given exception upon
* join
and related operations. This method may be used
@@ -738,6 +734,21 @@ public abstract class ForkJoinTask im
setNormalCompletion();
}
+ public final V get() throws InterruptedException, ExecutionException {
+ ForkJoinWorkerThread w = getWorker();
+ if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
+ awaitDone(w, true);
+ return reportFutureResult();
+ }
+
+ public final V get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ ForkJoinWorkerThread w = getWorker();
+ if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
+ awaitDone(w, unit.toNanos(timeout));
+ return reportTimedFutureResult();
+ }
+
/**
* Possibly executes other tasks until this task is ready, then
* returns the result of the computation. This method may be more
@@ -747,13 +758,13 @@ public abstract class ForkJoinTask im
* while helping. (This usually holds for pure divide-and-conquer
* tasks). This method may be invoked only from within
* ForkJoinTask computations. Attempts to invoke in other contexts
- * resul!t in exceptions or errors including ClassCastException.
+ * resul!t in exceptions or errors possibly including ClassCastException.
* @return the computed result
*/
public final V helpJoin() {
ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
if (status < 0 || !w.unpushTask(this) || !tryExec())
- reportException(w.helpJoinTask(this));
+ reportException(busyJoin(w));
return getRawResult();
}
@@ -761,14 +772,14 @@ public abstract class ForkJoinTask im
* Possibly executes other tasks until this task is ready. This
* method may be invoked only from within ForkJoinTask
* computations. Attempts to invoke in other contexts resul!t in
- * exceptions or errors including ClassCastException.
+ * exceptions or errors possibly including ClassCastException.
*/
public final void quietlyHelpJoin() {
if (status >= 0) {
ForkJoinWorkerThread w =
(ForkJoinWorkerThread)(Thread.currentThread());
if (!w.unpushTask(this) || !tryQuietlyInvoke())
- w.helpJoinTask(this);
+ busyJoin(w);
}
}
@@ -799,6 +810,17 @@ public abstract class ForkJoinTask im
}
/**
+ * Possibly executes tasks until the pool hosting the current task
+ * {@link ForkJoinPool#isQuiescent}. This method may be of use in
+ * designs in which many tasks are forked, but none are explicitly
+ * joined, instead executing them until all are processed.
+ */
+ public static void helpQuiesce() {
+ ((ForkJoinWorkerThread)(Thread.currentThread())).
+ helpQuiescePool();
+ }
+
+ /**
* Resets the internal bookkeeping state of this task, allowing a
* subsequent fork
. This method allows repeated reuse of
* this task, but only if reuse occurs when this task has either
@@ -833,7 +855,7 @@ public abstract class ForkJoinTask im
* alternative local processing of tasks that could have been, but
* were not, stolen. This method may be invoked only from within
* ForkJoinTask computations. Attempts to invoke in other contexts
- * result in exceptions or errors including ClassCastException.
+ * result in exceptions or errors possibly including ClassCastException.
* @return true if unforked
*/
public boolean tryUnfork() {
@@ -841,17 +863,6 @@ public abstract class ForkJoinTask im
}
/**
- * Possibly executes tasks until the pool hosting the current task
- * {@link ForkJoinPool#isQuiescent}. This method may be of use in
- * designs in which many tasks are forked, but none are explicitly
- * joined, instead executing them until all are processed.
- */
- public static void helpQuiesce() {
- ((ForkJoinWorkerThread)(Thread.currentThread())).
- helpQuiescePool();
- }
-
- /**
* Returns an estimate of the number of tasks that have been
* forked by the current worker thread but not yet executed. This
* value may be useful for heuristic decisions about whether to
@@ -915,12 +926,14 @@ public abstract class ForkJoinTask im
protected abstract boolean exec();
/**
- * Returns, but does not unschedule or execute, the task most
- * recently forked by the current thread but not yet executed, if
- * one is available. There is no guarantee that this task will
- * actually be polled or executed next.
- * This method is designed primarily to support extensions,
- * and is unlikely to be useful otherwise.
+ * Returns, but does not unschedule or execute, the task queued by
+ * the current thread but not yet executed, if one is
+ * available. There is no guarantee that this task will actually
+ * be polled or executed next. This method is designed primarily
+ * to support extensions, and is unlikely to be useful otherwise.
+ * This method may be invoked only from within ForkJoinTask
+ * computations. Attempts to invoke in other contexts result in
+ * exceptions or errors possibly including ClassCastException.
*
* @return the next task, or null if none are available
*/
@@ -929,32 +942,38 @@ public abstract class ForkJoinTask im
}
/**
- * Unschedules and returns, without executing, the task most
- * recently forked by the current thread but not yet executed.
- * This method is designed primarily to support extensions,
- * and is unlikely to be useful otherwise.
+ * Unschedules and returns, without executing, the next task
+ * queued by the current thread but not yet executed. This method
+ * is designed primarily to support extensions, and is unlikely to
+ * be useful otherwise. This method may be invoked only from
+ * within ForkJoinTask computations. Attempts to invoke in other
+ * contexts result in exceptions or errors possibly including
+ * ClassCastException.
*
* @return the next task, or null if none are available
*/
protected static ForkJoinTask> pollNextLocalTask() {
- return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask();
+ return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask();
}
/**
- * Unschedules and returns, without executing, the task most
- * recently forked by the current thread but not yet executed, if
- * one is available, or if not available, a task that was forked
- * by some other thread, if available. Availability may be
- * transient, so a null
result does not necessarily
- * imply quiecence of the pool this task is operating in.
- * This method is designed primarily to support extensions,
- * and is unlikely to be useful otherwise.
- *
+ * Unschedules and returns, without executing, the next task
+ * queued by the current thread but not yet executed, if one is
+ * available, or if not available, a task that was forked by some
+ * other thread, if available. Availability may be transient, so a
+ * null
result does not necessarily imply quiecence
+ * of the pool this task is operating in. This method is designed
+ * primarily to support extensions, and is unlikely to be useful
+ * otherwise. This method may be invoked only from within
+ * ForkJoinTask computations. Attempts to invoke in other contexts
+ * result in exceptions or errors possibly including
+ * ClassCastException.
+ *
* @return a task, or null if none are available
*/
protected static ForkJoinTask> pollTask() {
return ((ForkJoinWorkerThread)(Thread.currentThread())).
- getLocalOrStolenTask();
+ pollTask();
}
// Serialization support
@@ -989,22 +1008,45 @@ public abstract class ForkJoinTask im
}
// Temporary Unsafe mechanics for preliminary release
+ private static Unsafe getUnsafe() throws Throwable {
+ try {
+ return Unsafe.getUnsafe();
+ } catch (SecurityException se) {
+ try {
+ return java.security.AccessController.doPrivileged
+ (new java.security.PrivilegedExceptionAction() {
+ public Unsafe run() throws Exception {
+ return getUnsafePrivileged();
+ }});
+ } catch (java.security.PrivilegedActionException e) {
+ throw e.getCause();
+ }
+ }
+ }
+
+ private static Unsafe getUnsafePrivileged()
+ throws NoSuchFieldException, IllegalAccessException {
+ Field f = Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (Unsafe) f.get(null);
+ }
+
+ private static long fieldOffset(String fieldName)
+ throws NoSuchFieldException {
+ return _unsafe.objectFieldOffset
+ (ForkJoinTask.class.getDeclaredField(fieldName));
+ }
static final Unsafe _unsafe;
static final long statusOffset;
static {
try {
- if (ForkJoinTask.class.getClassLoader() != null) {
- Field f = Unsafe.class.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- _unsafe = (Unsafe)f.get(null);
- }
- else
- _unsafe = Unsafe.getUnsafe();
- statusOffset = _unsafe.objectFieldOffset
- (ForkJoinTask.class.getDeclaredField("status"));
- } catch (Exception ex) { throw new Error(ex); }
+ _unsafe = getUnsafe();
+ statusOffset = fieldOffset("status");
+ } catch (Throwable e) {
+ throw new RuntimeException("Could not initialize intrinsics", e);
+ }
}
}