--- jsr166/src/jsr166y/ForkJoinTask.java 2010/09/07 23:17:10 1.60
+++ jsr166/src/jsr166y/ForkJoinTask.java 2010/10/24 19:37:26 1.66
@@ -6,8 +6,6 @@
package jsr166y;
-import java.util.concurrent.*;
-
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
@@ -15,6 +13,16 @@ import java.util.List;
import java.util.RandomAccess;
import java.util.Map;
import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
@@ -28,10 +36,10 @@ import java.util.WeakHashMap;
* start other subtasks. As indicated by the name of this class,
* many programs using {@code ForkJoinTask} employ only methods
* {@link #fork} and {@link #join}, or derivatives such as {@link
- * #invokeAll}. However, this class also provides a number of other
- * methods that can come into play in advanced usages, as well as
- * extension mechanics that allow support of new forms of fork/join
- * processing.
+ * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
+ * provides a number of other methods that can come into play in
+ * advanced usages, as well as extension mechanics that allow
+ * support of new forms of fork/join processing.
*
*
A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
@@ -223,7 +231,7 @@ public abstract class ForkJoinTask im
int s; // the odd construction reduces lock bias effects
while ((s = status) >= 0) {
try {
- synchronized(this) {
+ synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
wait();
}
@@ -239,13 +247,13 @@ public abstract class ForkJoinTask im
*
* @return status on exit
*/
- final int internalAwaitDone(long millis) {
+ final int internalAwaitDone(long millis, int nanos) {
int s;
if ((s = status) >= 0) {
try {
- synchronized(this) {
+ synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
- wait(millis, 0);
+ wait(millis, nanos);
}
} catch (InterruptedException ie) {
cancelIfTerminating();
@@ -261,8 +269,8 @@ public abstract class ForkJoinTask im
private void externalAwaitDone() {
int s;
while ((s = status) >= 0) {
- synchronized(this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
+ synchronized (this) {
+ if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
boolean interrupted = false;
while (status >= 0) {
try {
@@ -642,11 +650,34 @@ public abstract class ForkJoinTask im
setCompletion(NORMAL);
}
+ /**
+ * Waits if necessary for the computation to complete, and then
+ * retrieves its result.
+ *
+ * @return the computed result
+ * @throws CancellationException if the computation was cancelled
+ * @throws ExecutionException if the computation threw an
+ * exception
+ * @throws InterruptedException if the current thread is not a
+ * member of a ForkJoinPool and was interrupted while waiting
+ */
public final V get() throws InterruptedException, ExecutionException {
- quietlyJoin();
- if (Thread.interrupted())
- throw new InterruptedException();
- int s = status;
+ int s;
+ if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
+ quietlyJoin();
+ s = status;
+ }
+ else {
+ while ((s = status) >= 0) {
+ synchronized (this) { // interruptible form of awaitDone
+ if (UNSAFE.compareAndSwapInt(this, statusOffset,
+ s, SIGNAL)) {
+ while (status >= 0)
+ wait();
+ }
+ }
+ }
+ }
if (s < NORMAL) {
Throwable ex;
if (s == CANCELLED)
@@ -657,68 +688,59 @@ public abstract class ForkJoinTask im
return getRawResult();
}
+ /**
+ * Waits if necessary for at most the given time for the computation
+ * to complete, and then retrieves its result, if available.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return the computed result
+ * @throws CancellationException if the computation was cancelled
+ * @throws ExecutionException if the computation threw an
+ * exception
+ * @throws InterruptedException if the current thread is not a
+ * member of a ForkJoinPool and was interrupted while waiting
+ * @throws TimeoutException if the wait timed out
+ */
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- Thread t = Thread.currentThread();
- ForkJoinPool pool;
- if (t instanceof ForkJoinWorkerThread) {
- ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
- if (status >= 0 && w.unpushTask(this))
- quietlyExec();
- pool = w.pool;
- }
- else
- pool = null;
- /*
- * Timed wait loop intermixes cases for FJ (pool != null) and
- * non FJ threads. For FJ, decrement pool count but don't try
- * for replacement; increment count on completion. For non-FJ,
- * deal with interrupts. This is messy, but a little less so
- * than is splitting the FJ and nonFJ cases.
- */
- boolean interrupted = false;
- boolean dec = false; // true if pool count decremented
long nanos = unit.toNanos(timeout);
- for (;;) {
- if (pool == null && Thread.interrupted()) {
- interrupted = true;
- break;
+ if (status >= 0) {
+ Thread t = Thread.currentThread();
+ if (t instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
+ boolean completed = false; // timed variant of quietlyJoin
+ if (w.unpushTask(this)) {
+ try {
+ completed = exec();
+ } catch (Throwable rex) {
+ setExceptionalCompletion(rex);
+ }
+ }
+ if (completed)
+ setCompletion(NORMAL);
+ else if (status >= 0)
+ w.joinTask(this, true, nanos);
}
- int s = status;
- if (s < 0)
- break;
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
+ else if (Thread.interrupted())
+ throw new InterruptedException();
+ else {
long startTime = System.nanoTime();
- long nt; // wait time
- while (status >= 0 &&
+ int s; long nt;
+ while ((s = status) >= 0 &&
(nt = nanos - (System.nanoTime() - startTime)) > 0) {
- if (pool != null && !dec)
- dec = pool.tryDecrementRunningCount();
- else {
+ if (UNSAFE.compareAndSwapInt(this, statusOffset, s,
+ SIGNAL)) {
long ms = nt / 1000000;
int ns = (int) (nt % 1000000);
- try {
- synchronized(this) {
- if (status >= 0)
- wait(ms, ns);
- }
- } catch (InterruptedException ie) {
- if (pool != null)
- cancelIfTerminating();
- else {
- interrupted = true;
- break;
- }
+ synchronized (this) {
+ if (status >= 0)
+ wait(ms, ns); // exit on IE throw
}
}
}
- break;
}
}
- if (pool != null && dec)
- pool.incrementRunningCount();
- if (interrupted)
- throw new InterruptedException();
int es = status;
if (es != NORMAL) {
Throwable ex;
@@ -755,7 +777,7 @@ public abstract class ForkJoinTask im
return;
}
}
- w.joinTask(this);
+ w.joinTask(this, false, 0L);
}
}
else