--- jsr166/src/jsr166y/ForkJoinTask.java 2010/08/11 18:45:12 1.53
+++ jsr166/src/jsr166y/ForkJoinTask.java 2010/10/16 16:37:30 1.65
@@ -7,7 +7,6 @@
package jsr166y;
import java.util.concurrent.*;
-
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
@@ -28,10 +27,10 @@ import java.util.WeakHashMap;
* start other subtasks. As indicated by the name of this class,
* many programs using {@code ForkJoinTask} employ only methods
* {@link #fork} and {@link #join}, or derivatives such as {@link
- * #invokeAll}. However, this class also provides a number of other
- * methods that can come into play in advanced usages, as well as
- * extension mechanics that allow support of new forms of fork/join
- * processing.
+ * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
+ * provides a number of other methods that can come into play in
+ * advanced usages, as well as extension mechanics that allow
+ * support of new forms of fork/join processing.
*
*
A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
@@ -100,7 +99,7 @@ import java.util.WeakHashMap;
* ForkJoinTasks (as may be determined using method {@link
* #inForkJoinPool}). Attempts to invoke them in other contexts
* result in exceptions or errors, possibly including
- * ClassCastException.
+ * {@code ClassCastException}.
*
*
Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
@@ -153,7 +152,7 @@ public abstract class ForkJoinTask im
* single int to minimize footprint and to ensure atomicity (via
* CAS). Status is initially zero, and takes on nonnegative
* values until completed, upon which status holds value
- * COMPLETED. CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
+ * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
* waits by other threads have the SIGNAL bit set. Completion of
* a stolen task with SIGNAL set awakens any waiters via
* notifyAll. Even though suboptimal for some purposes, we use
@@ -205,7 +204,8 @@ public abstract class ForkJoinTask im
}
/**
- * Record exception and set exceptional completion
+ * Records exception and sets exceptional completion.
+ *
* @return status on exit
*/
private void setExceptionalCompletion(Throwable rex) {
@@ -214,13 +214,15 @@ public abstract class ForkJoinTask im
}
/**
- * Blocks a worker thread until completion. Called only by pool.
+ * Blocks a worker thread until completion. Called only by
+ * pool. Currently unused -- pool-based waits use timeout
+ * version below.
*/
final void internalAwaitDone() {
int s; // the odd construction reduces lock bias effects
while ((s = status) >= 0) {
try {
- synchronized(this) {
+ synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
wait();
}
@@ -231,13 +233,35 @@ public abstract class ForkJoinTask im
}
/**
+ * Blocks a worker thread until completed or timed out. Called
+ * only by pool.
+ *
+ * @return status on exit
+ */
+ final int internalAwaitDone(long millis) {
+ int s;
+ if ((s = status) >= 0) {
+ try {
+ synchronized (this) {
+ if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
+ wait(millis, 0);
+ }
+ } catch (InterruptedException ie) {
+ cancelIfTerminating();
+ }
+ s = status;
+ }
+ return s;
+ }
+
+ /**
* Blocks a non-worker-thread until completion.
*/
private void externalAwaitDone() {
int s;
while ((s = status) >= 0) {
- synchronized(this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
+ synchronized (this) {
+ if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
boolean interrupted = false;
while (status >= 0) {
try {
@@ -314,8 +338,9 @@ public abstract class ForkJoinTask im
/**
* Commences performing this task, awaits its completion if
- * necessary, and return its result, or throws an (unchecked)
- * exception if the underlying computation did so.
+ * necessary, and returns its result, or throws an (unchecked)
+ * {@code RuntimeException} or {@code Error} if the underlying
+ * computation did so.
*
* @return the computed result
*/
@@ -330,11 +355,15 @@ public abstract class ForkJoinTask im
/**
* Forks the given tasks, returning when {@code isDone} holds for
* each task or an (unchecked) exception is encountered, in which
- * case the exception is rethrown. If either task encounters an
- * exception, the other one may be, but is not guaranteed to be,
- * cancelled. If both tasks throw an exception, then this method
- * throws one of them. The individual status of each task may be
- * checked using {@link #getException()} and related methods.
+ * case the exception is rethrown. If more than one task
+ * encounters an exception, then this method throws any one of
+ * these exceptions. If any task encounters an exception, the
+ * other may be cancelled. However, the execution status of
+ * individual tasks is not guaranteed upon exceptional return. The
+ * status of each task may be obtained using {@link
+ * #getException()} and related methods to check if they have been
+ * cancelled, completed normally or exceptionally, or left
+ * unprocessed.
*
* This method may be invoked only from within {@code
* ForkJoinTask} computations (as may be determined using method
@@ -355,12 +384,14 @@ public abstract class ForkJoinTask im
/**
* Forks the given tasks, returning when {@code isDone} holds for
* each task or an (unchecked) exception is encountered, in which
- * case the exception is rethrown. If any task encounters an
- * exception, others may be, but are not guaranteed to be,
- * cancelled. If more than one task encounters an exception, then
- * this method throws any one of these exceptions. The individual
- * status of each task may be checked using {@link #getException()}
- * and related methods.
+ * case the exception is rethrown. If more than one task
+ * encounters an exception, then this method throws any one of
+ * these exceptions. If any task encounters an exception, others
+ * may be cancelled. However, the execution status of individual
+ * tasks is not guaranteed upon exceptional return. The status of
+ * each task may be obtained using {@link #getException()} and
+ * related methods to check if they have been cancelled, completed
+ * normally or exceptionally, or left unprocessed.
*
* This method may be invoked only from within {@code
* ForkJoinTask} computations (as may be determined using method
@@ -407,14 +438,15 @@ public abstract class ForkJoinTask im
/**
* Forks all tasks in the specified collection, returning when
* {@code isDone} holds for each task or an (unchecked) exception
- * is encountered. If any task encounters an exception, others
- * may be, but are not guaranteed to be, cancelled. If more than
- * one task encounters an exception, then this method throws any
- * one of these exceptions. The individual status of each task
- * may be checked using {@link #getException()} and related
- * methods. The behavior of this operation is undefined if the
- * specified collection is modified while the operation is in
- * progress.
+ * is encountered, in which case the exception is rethrown. If
+ * more than one task encounters an exception, then this method
+ * throws any one of these exceptions. If any task encounters an
+ * exception, others may be cancelled. However, the execution
+ * status of individual tasks is not guaranteed upon exceptional
+ * return. The status of each task may be obtained using {@link
+ * #getException()} and related methods to check if they have been
+ * cancelled, completed normally or exceptionally, or left
+ * unprocessed.
*
* This method may be invoked only from within {@code
* ForkJoinTask} computations (as may be determined using method
@@ -510,7 +542,8 @@ public abstract class ForkJoinTask im
}
/**
- * Cancels ignoring exceptions if worker is terminating
+ * Cancels if current thread is a terminating worker thread,
+ * ignoring any exceptions thrown by cancel.
*/
final void cancelIfTerminating() {
Thread t = Thread.currentThread();
@@ -587,13 +620,14 @@ public abstract class ForkJoinTask im
/**
* Completes this task, and if not already aborted or cancelled,
- * returning a {@code null} result upon {@code join} and related
- * operations. This method may be used to provide results for
- * asynchronous tasks, or to provide alternative handling for
- * tasks that would not otherwise complete normally. Its use in
- * other situations is discouraged. This method is
- * overridable, but overridden versions must invoke {@code super}
- * implementation to maintain guarantees.
+ * returning the given value as the result of subsequent
+ * invocations of {@code join} and related operations. This method
+ * may be used to provide results for asynchronous tasks, or to
+ * provide alternative handling for tasks that would not otherwise
+ * complete normally. Its use in other situations is
+ * discouraged. This method is overridable, but overridden
+ * versions must invoke {@code super} implementation to maintain
+ * guarantees.
*
* @param value the result value for this task
*/
@@ -607,11 +641,34 @@ public abstract class ForkJoinTask im
setCompletion(NORMAL);
}
+ /**
+ * Waits if necessary for the computation to complete, and then
+ * retrieves its result.
+ *
+ * @return the computed result
+ * @throws CancellationException if the computation was cancelled
+ * @throws ExecutionException if the computation threw an
+ * exception
+ * @throws InterruptedException if the current thread is not a
+ * member of a ForkJoinPool and was interrupted while waiting
+ */
public final V get() throws InterruptedException, ExecutionException {
- quietlyJoin();
- if (Thread.interrupted())
- throw new InterruptedException();
- int s = status;
+ int s;
+ if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
+ quietlyJoin();
+ s = status;
+ }
+ else {
+ while ((s = status) >= 0) {
+ synchronized (this) { // interruptible form of awaitDone
+ if (UNSAFE.compareAndSwapInt(this, statusOffset,
+ s, SIGNAL)) {
+ while (status >= 0)
+ wait();
+ }
+ }
+ }
+ }
if (s < NORMAL) {
Throwable ex;
if (s == CANCELLED)
@@ -622,6 +679,20 @@ public abstract class ForkJoinTask im
return getRawResult();
}
+ /**
+ * Waits if necessary for at most the given time for the computation
+ * to complete, and then retrieves its result, if available.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return the computed result
+ * @throws CancellationException if the computation was cancelled
+ * @throws ExecutionException if the computation threw an
+ * exception
+ * @throws InterruptedException if the current thread is not a
+ * member of a ForkJoinPool and was interrupted while waiting
+ * @throws TimeoutException if the wait timed out
+ */
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Thread t = Thread.currentThread();
@@ -643,8 +714,9 @@ public abstract class ForkJoinTask im
*/
boolean interrupted = false;
boolean dec = false; // true if pool count decremented
+ long nanos = unit.toNanos(timeout);
for (;;) {
- if (Thread.interrupted() && pool == null) {
+ if (pool == null && Thread.interrupted()) {
interrupted = true;
break;
}
@@ -653,7 +725,6 @@ public abstract class ForkJoinTask im
break;
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
long startTime = System.nanoTime();
- long nanos = unit.toNanos(timeout);
long nt; // wait time
while (status >= 0 &&
(nt = nanos - (System.nanoTime() - startTime)) > 0) {
@@ -663,7 +734,7 @@ public abstract class ForkJoinTask im
long ms = nt / 1000000;
int ns = (int) (nt % 1000000);
try {
- synchronized(this) {
+ synchronized (this) {
if (status >= 0)
wait(ms, ns);
}
@@ -730,9 +801,7 @@ public abstract class ForkJoinTask im
/**
* Commences performing this task and awaits its completion if
* necessary, without returning its result or throwing its
- * exception. This method may be useful when processing
- * collections of tasks when some have been cancelled or otherwise
- * known to have aborted.
+ * exception.
*/
public final void quietlyInvoke() {
if (status >= 0) {
@@ -1069,7 +1138,7 @@ public abstract class ForkJoinTask im
private static final long serialVersionUID = -7721805057305804111L;
/**
- * Saves the state to a stream.
+ * Saves the state to a stream (that is, serializes it).
*
* @serialData the current run status and the exception thrown
* during execution, or {@code null} if none
@@ -1082,7 +1151,7 @@ public abstract class ForkJoinTask im
}
/**
- * Reconstitutes the instance from a stream.
+ * Reconstitutes the instance from a stream (that is, deserializes it).
*
* @param s the stream
*/