--- jsr166/src/jsr166y/ForkJoinTask.java 2010/09/04 00:21:31 1.56
+++ jsr166/src/jsr166y/ForkJoinTask.java 2010/11/28 21:21:03 1.73
@@ -6,8 +6,6 @@
package jsr166y;
-import java.util.concurrent.*;
-
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
@@ -15,6 +13,16 @@ import java.util.List;
import java.util.RandomAccess;
import java.util.Map;
import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
@@ -28,10 +36,10 @@ import java.util.WeakHashMap;
* start other subtasks. As indicated by the name of this class,
* many programs using {@code ForkJoinTask} employ only methods
* {@link #fork} and {@link #join}, or derivatives such as {@link
- * #invokeAll}. However, this class also provides a number of other
- * methods that can come into play in advanced usages, as well as
- * extension mechanics that allow support of new forms of fork/join
- * processing.
+ * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
+ * provides a number of other methods that can come into play in
+ * advanced usages, as well as extension mechanics that allow
+ * support of new forms of fork/join processing.
*
*
A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
@@ -100,7 +108,17 @@ import java.util.WeakHashMap;
* ForkJoinTasks (as may be determined using method {@link
* #inForkJoinPool}). Attempts to invoke them in other contexts
* result in exceptions or errors, possibly including
- * ClassCastException.
+ * {@code ClassCastException}.
+ *
+ *
Method {@link #join} and its variants are appropriate for use
+ * only when completion dependencies are acyclic; that is, the
+ * parallel computation can be described as a directed acyclic graph
+ * (DAG). Otherwise, executions may encounter a form of deadlock as
+ * tasks cyclically wait for each other. However, this framework
+ * supports other methods and techniques (for example the use of
+ * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
+ * may be of use in constructing custom subclasses for problems that
+ * are not statically structured as DAGs.
*
*
Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
@@ -116,9 +134,10 @@ import java.util.WeakHashMap;
* computation. Large tasks should be split into smaller subtasks,
* usually via recursive decomposition. As a very rough rule of thumb,
* a task should perform more than 100 and less than 10000 basic
- * computational steps. If tasks are too big, then parallelism cannot
- * improve throughput. If too small, then memory and internal task
- * maintenance overhead may overwhelm processing.
+ * computational steps, and should avoid indefinite looping. If tasks
+ * are too big, then parallelism cannot improve throughput. If too
+ * small, then memory and internal task maintenance overhead may
+ * overwhelm processing.
*
*
This class provides {@code adapt} methods for {@link Runnable}
* and {@link Callable}, that may be of use when mixing execution of
@@ -153,7 +172,7 @@ public abstract class ForkJoinTask im
* single int to minimize footprint and to ensure atomicity (via
* CAS). Status is initially zero, and takes on nonnegative
* values until completed, upon which status holds value
- * COMPLETED. CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
+ * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
* waits by other threads have the SIGNAL bit set. Completion of
* a stolen task with SIGNAL set awakens any waiters via
* notifyAll. Even though suboptimal for some purposes, we use
@@ -206,7 +225,7 @@ public abstract class ForkJoinTask im
/**
* Records exception and sets exceptional completion.
- *
+ *
* @return status on exit
*/
private void setExceptionalCompletion(Throwable rex) {
@@ -215,65 +234,83 @@ public abstract class ForkJoinTask im
}
/**
- * Blocks a worker thread until completion. Called only by
- * pool. Currently unused -- pool-based waits use timeout
- * version below.
- */
- final void internalAwaitDone() {
- int s; // the odd construction reduces lock bias effects
- while ((s = status) >= 0) {
- try {
- synchronized(this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
- wait();
- }
- } catch (InterruptedException ie) {
- cancelIfTerminating();
- }
- }
- }
-
- /**
* Blocks a worker thread until completed or timed out. Called
* only by pool.
- *
- * @return status on exit
*/
- final int internalAwaitDone(long millis) {
- int s;
- if ((s = status) >= 0) {
- try {
- synchronized(this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
- wait(millis, 0);
+ final void internalAwaitDone(long millis, int nanos) {
+ int s = status;
+ if ((s == 0 &&
+ UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL)) ||
+ s > 0) {
+ try { // the odd construction reduces lock bias effects
+ synchronized (this) {
+ if (status > 0)
+ wait(millis, nanos);
+ else
+ notifyAll();
}
} catch (InterruptedException ie) {
cancelIfTerminating();
}
- s = status;
}
- return s;
}
/**
* Blocks a non-worker-thread until completion.
*/
private void externalAwaitDone() {
- int s;
- while ((s = status) >= 0) {
- synchronized(this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
- boolean interrupted = false;
- while (status >= 0) {
+ if (status >= 0) {
+ boolean interrupted = false;
+ synchronized (this) {
+ for (;;) {
+ int s = status;
+ if (s == 0)
+ UNSAFE.compareAndSwapInt(this, statusOffset,
+ 0, SIGNAL);
+ else if (s < 0) {
+ notifyAll();
+ break;
+ }
+ else {
try {
wait();
} catch (InterruptedException ie) {
interrupted = true;
}
}
- if (interrupted)
- Thread.currentThread().interrupt();
- break;
+ }
+ }
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Blocks a non-worker-thread until completion or interruption or timeout.
+ */
+ private void externalInterruptibleAwaitDone(boolean timed, long nanos)
+ throws InterruptedException {
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ if (status >= 0) {
+ long startTime = timed ? System.nanoTime() : 0L;
+ synchronized (this) {
+ for (;;) {
+ long nt;
+ int s = status;
+ if (s == 0)
+ UNSAFE.compareAndSwapInt(this, statusOffset,
+ 0, SIGNAL);
+ else if (s < 0) {
+ notifyAll();
+ break;
+ }
+ else if (!timed)
+ wait();
+ else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L)
+ wait(nt / 1000000, (int)(nt % 1000000));
+ else
+ break;
}
}
}
@@ -308,7 +345,7 @@ public abstract class ForkJoinTask im
* #isDone} returning {@code true}.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -322,10 +359,13 @@ public abstract class ForkJoinTask im
}
/**
- * Returns the result of the computation when it {@link #isDone is done}.
- * This method differs from {@link #get()} in that
+ * Returns the result of the computation when it {@link #isDone is
+ * done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
- * {@code Error}, not {@code ExecutionException}.
+ * {@code Error}, not {@code ExecutionException}, and that
+ * interrupts of the calling thread do not cause the
+ * method to abruptly return by throwing {@code
+ * InterruptedException}.
*
* @return the computed result
*/
@@ -340,7 +380,8 @@ public abstract class ForkJoinTask im
/**
* Commences performing this task, awaits its completion if
* necessary, and returns its result, or throws an (unchecked)
- * exception if the underlying computation did so.
+ * {@code RuntimeException} or {@code Error} if the underlying
+ * computation did so.
*
* @return the computed result
*/
@@ -355,14 +396,18 @@ public abstract class ForkJoinTask im
/**
* Forks the given tasks, returning when {@code isDone} holds for
* each task or an (unchecked) exception is encountered, in which
- * case the exception is rethrown. If either task encounters an
- * exception, the other one may be, but is not guaranteed to be,
- * cancelled. If both tasks throw an exception, then this method
- * throws one of them. The individual status of each task may be
- * checked using {@link #getException()} and related methods.
+ * case the exception is rethrown. If more than one task
+ * encounters an exception, then this method throws any one of
+ * these exceptions. If any task encounters an exception, the
+ * other may be cancelled. However, the execution status of
+ * individual tasks is not guaranteed upon exceptional return. The
+ * status of each task may be obtained using {@link
+ * #getException()} and related methods to check if they have been
+ * cancelled, completed normally or exceptionally, or left
+ * unprocessed.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -380,15 +425,17 @@ public abstract class ForkJoinTask im
/**
* Forks the given tasks, returning when {@code isDone} holds for
* each task or an (unchecked) exception is encountered, in which
- * case the exception is rethrown. If any task encounters an
- * exception, others may be, but are not guaranteed to be,
- * cancelled. If more than one task encounters an exception, then
- * this method throws any one of these exceptions. The individual
- * status of each task may be checked using {@link #getException()}
- * and related methods.
+ * case the exception is rethrown. If more than one task
+ * encounters an exception, then this method throws any one of
+ * these exceptions. If any task encounters an exception, others
+ * may be cancelled. However, the execution status of individual
+ * tasks is not guaranteed upon exceptional return. The status of
+ * each task may be obtained using {@link #getException()} and
+ * related methods to check if they have been cancelled, completed
+ * normally or exceptionally, or left unprocessed.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -432,17 +479,18 @@ public abstract class ForkJoinTask im
/**
* Forks all tasks in the specified collection, returning when
* {@code isDone} holds for each task or an (unchecked) exception
- * is encountered. If any task encounters an exception, others
- * may be, but are not guaranteed to be, cancelled. If more than
- * one task encounters an exception, then this method throws any
- * one of these exceptions. The individual status of each task
- * may be checked using {@link #getException()} and related
- * methods. The behavior of this operation is undefined if the
- * specified collection is modified while the operation is in
- * progress.
+ * is encountered, in which case the exception is rethrown. If
+ * more than one task encounters an exception, then this method
+ * throws any one of these exceptions. If any task encounters an
+ * exception, others may be cancelled. However, the execution
+ * status of individual tasks is not guaranteed upon exceptional
+ * return. The status of each task may be obtained using {@link
+ * #getException()} and related methods to check if they have been
+ * cancelled, completed normally or exceptionally, or left
+ * unprocessed.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -494,25 +542,28 @@ public abstract class ForkJoinTask im
/**
* Attempts to cancel execution of this task. This attempt will
- * fail if the task has already completed, has already been
- * cancelled, or could not be cancelled for some other reason. If
- * successful, and this task has not started when cancel is
- * called, execution of this task is suppressed, {@link
- * #isCancelled} will report true, and {@link #join} will result
- * in a {@code CancellationException} being thrown.
+ * fail if the task has already completed or could not be
+ * cancelled for some other reason. If successful, and this task
+ * has not started when {@code cancel} is called, execution of
+ * this task is suppressed. After this method returns
+ * successfully, unless there is an intervening call to {@link
+ * #reinitialize}, subsequent calls to {@link #isCancelled},
+ * {@link #isDone}, and {@code cancel} will return {@code true}
+ * and calls to {@link #join} and related methods will result in
+ * {@code CancellationException}.
*
* This method may be overridden in subclasses, but if so, must
- * still ensure that these minimal properties hold. In particular,
- * the {@code cancel} method itself must not throw exceptions.
+ * still ensure that these properties hold. In particular, the
+ * {@code cancel} method itself must not throw exceptions.
*
*
This method is designed to be invoked by other
* tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or
* invoke {@link #completeExceptionally}.
*
- * @param mayInterruptIfRunning this value is ignored in the
- * default implementation because tasks are not
- * cancelled via interruption
+ * @param mayInterruptIfRunning this value has no effect in the
+ * default implementation because interrupts are not used to
+ * control cancellation.
*
* @return {@code true} if this task is now cancelled
*/
@@ -613,13 +664,14 @@ public abstract class ForkJoinTask im
/**
* Completes this task, and if not already aborted or cancelled,
- * returning a {@code null} result upon {@code join} and related
- * operations. This method may be used to provide results for
- * asynchronous tasks, or to provide alternative handling for
- * tasks that would not otherwise complete normally. Its use in
- * other situations is discouraged. This method is
- * overridable, but overridden versions must invoke {@code super}
- * implementation to maintain guarantees.
+ * returning the given value as the result of subsequent
+ * invocations of {@code join} and related operations. This method
+ * may be used to provide results for asynchronous tasks, or to
+ * provide alternative handling for tasks that would not otherwise
+ * complete normally. Its use in other situations is
+ * discouraged. This method is overridable, but overridden
+ * versions must invoke {@code super} implementation to maintain
+ * guarantees.
*
* @param value the result value for this task
*/
@@ -633,12 +685,25 @@ public abstract class ForkJoinTask im
setCompletion(NORMAL);
}
+ /**
+ * Waits if necessary for the computation to complete, and then
+ * retrieves its result.
+ *
+ * @return the computed result
+ * @throws CancellationException if the computation was cancelled
+ * @throws ExecutionException if the computation threw an
+ * exception
+ * @throws InterruptedException if the current thread is not a
+ * member of a ForkJoinPool and was interrupted while waiting
+ */
public final V get() throws InterruptedException, ExecutionException {
- quietlyJoin();
- if (Thread.interrupted())
- throw new InterruptedException();
+ Thread t = Thread.currentThread();
+ if (t instanceof ForkJoinWorkerThread)
+ quietlyJoin();
+ else
+ externalInterruptibleAwaitDone(false, 0L);
int s = status;
- if (s < NORMAL) {
+ if (s != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
@@ -648,74 +713,34 @@ public abstract class ForkJoinTask im
return getRawResult();
}
+ /**
+ * Waits if necessary for at most the given time for the computation
+ * to complete, and then retrieves its result, if available.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return the computed result
+ * @throws CancellationException if the computation was cancelled
+ * @throws ExecutionException if the computation threw an
+ * exception
+ * @throws InterruptedException if the current thread is not a
+ * member of a ForkJoinPool and was interrupted while waiting
+ * @throws TimeoutException if the wait timed out
+ */
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
+ long nanos = unit.toNanos(timeout);
Thread t = Thread.currentThread();
- ForkJoinPool pool;
- if (t instanceof ForkJoinWorkerThread) {
- ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
- if (status >= 0 && w.unpushTask(this))
- quietlyExec();
- pool = w.pool;
- }
+ if (t instanceof ForkJoinWorkerThread)
+ ((ForkJoinWorkerThread)t).joinTask(this, true, nanos);
else
- pool = null;
- /*
- * Timed wait loop intermixes cases for FJ (pool != null) and
- * non FJ threads. For FJ, decrement pool count but don't try
- * for replacement; increment count on completion. For non-FJ,
- * deal with interrupts. This is messy, but a little less so
- * than is splitting the FJ and nonFJ cases.
- */
- boolean interrupted = false;
- boolean dec = false; // true if pool count decremented
- long nanos = unit.toNanos(timeout);
- for (;;) {
- if (Thread.interrupted() && pool == null) {
- interrupted = true;
- break;
- }
- int s = status;
- if (s < 0)
- break;
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
- long startTime = System.nanoTime();
- long nt; // wait time
- while (status >= 0 &&
- (nt = nanos - (System.nanoTime() - startTime)) > 0) {
- if (pool != null && !dec)
- dec = pool.tryDecrementRunningCount();
- else {
- long ms = nt / 1000000;
- int ns = (int) (nt % 1000000);
- try {
- synchronized(this) {
- if (status >= 0)
- wait(ms, ns);
- }
- } catch (InterruptedException ie) {
- if (pool != null)
- cancelIfTerminating();
- else {
- interrupted = true;
- break;
- }
- }
- }
- }
- break;
- }
- }
- if (pool != null && dec)
- pool.incrementRunningCount();
- if (interrupted)
- throw new InterruptedException();
- int es = status;
- if (es != NORMAL) {
+ externalInterruptibleAwaitDone(true, nanos);
+ int s = status;
+ if (s != NORMAL) {
Throwable ex;
- if (es == CANCELLED)
+ if (s == CANCELLED)
throw new CancellationException();
- if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
+ if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
throw new ExecutionException(ex);
throw new TimeoutException();
}
@@ -746,7 +771,7 @@ public abstract class ForkJoinTask im
return;
}
}
- w.joinTask(this);
+ w.joinTask(this, false, 0L);
}
}
else
@@ -756,9 +781,7 @@ public abstract class ForkJoinTask im
/**
* Commences performing this task and awaits its completion if
* necessary, without returning its result or throwing its
- * exception. This method may be useful when processing
- * collections of tasks when some have been cancelled or otherwise
- * known to have aborted.
+ * exception.
*/
public final void quietlyInvoke() {
if (status >= 0) {
@@ -784,7 +807,7 @@ public abstract class ForkJoinTask im
* processed.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -803,6 +826,12 @@ public abstract class ForkJoinTask im
* under any other usage conditions are not guaranteed.
* This method may be useful when executing
* pre-constructed trees of subtasks in loops.
+ *
+ * Upon completion of this method, {@code isDone()} reports
+ * {@code false}, and {@code getException()} reports {@code
+ * null}. However, the value returned by {@code getRawResult} is
+ * unaffected. To clear this value, you can invoke {@code
+ * setRawResult(null)}.
*/
public void reinitialize() {
if (status == EXCEPTIONAL)
@@ -824,11 +853,12 @@ public abstract class ForkJoinTask im
}
/**
- * Returns {@code true} if the current thread is executing as a
- * ForkJoinPool computation.
+ * Returns {@code true} if the current thread is a {@link
+ * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
*
- * @return {@code true} if the current thread is executing as a
- * ForkJoinPool computation, or false otherwise
+ * @return {@code true} if the current thread is a {@link
+ * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
+ * or {@code false} otherwise
*/
public static boolean inForkJoinPool() {
return Thread.currentThread() instanceof ForkJoinWorkerThread;
@@ -843,7 +873,7 @@ public abstract class ForkJoinTask im
* were not, stolen.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -862,7 +892,7 @@ public abstract class ForkJoinTask im
* fork other tasks.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -885,7 +915,7 @@ public abstract class ForkJoinTask im
* exceeded.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -943,7 +973,7 @@ public abstract class ForkJoinTask im
* otherwise.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -962,7 +992,7 @@ public abstract class ForkJoinTask im
* be useful otherwise.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -985,7 +1015,7 @@ public abstract class ForkJoinTask im
* otherwise.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.