--- jsr166/src/jsr166y/ForkJoinTask.java 2010/09/07 23:17:10 1.60 +++ jsr166/src/jsr166y/ForkJoinTask.java 2010/10/24 19:37:26 1.66 @@ -6,8 +6,6 @@ package jsr166y; -import java.util.concurrent.*; - import java.io.Serializable; import java.util.Collection; import java.util.Collections; @@ -15,6 +13,16 @@ import java.util.List; import java.util.RandomAccess; import java.util.Map; import java.util.WeakHashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Abstract base class for tasks that run within a {@link ForkJoinPool}. @@ -28,10 +36,10 @@ import java.util.WeakHashMap; * start other subtasks. As indicated by the name of this class, * many programs using {@code ForkJoinTask} employ only methods * {@link #fork} and {@link #join}, or derivatives such as {@link - * #invokeAll}. However, this class also provides a number of other - * methods that can come into play in advanced usages, as well as - * extension mechanics that allow support of new forms of fork/join - * processing. + * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also + * provides a number of other methods that can come into play in + * advanced usages, as well as extension mechanics that allow + * support of new forms of fork/join processing. * *

A {@code ForkJoinTask} is a lightweight form of {@link Future}. * The efficiency of {@code ForkJoinTask}s stems from a set of @@ -223,7 +231,7 @@ public abstract class ForkJoinTask im int s; // the odd construction reduces lock bias effects while ((s = status) >= 0) { try { - synchronized(this) { + synchronized (this) { if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL)) wait(); } @@ -239,13 +247,13 @@ public abstract class ForkJoinTask im * * @return status on exit */ - final int internalAwaitDone(long millis) { + final int internalAwaitDone(long millis, int nanos) { int s; if ((s = status) >= 0) { try { - synchronized(this) { + synchronized (this) { if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL)) - wait(millis, 0); + wait(millis, nanos); } } catch (InterruptedException ie) { cancelIfTerminating(); @@ -261,8 +269,8 @@ public abstract class ForkJoinTask im private void externalAwaitDone() { int s; while ((s = status) >= 0) { - synchronized(this) { - if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){ + synchronized (this) { + if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) { boolean interrupted = false; while (status >= 0) { try { @@ -642,11 +650,34 @@ public abstract class ForkJoinTask im setCompletion(NORMAL); } + /** + * Waits if necessary for the computation to complete, and then + * retrieves its result. + * + * @return the computed result + * @throws CancellationException if the computation was cancelled + * @throws ExecutionException if the computation threw an + * exception + * @throws InterruptedException if the current thread is not a + * member of a ForkJoinPool and was interrupted while waiting + */ public final V get() throws InterruptedException, ExecutionException { - quietlyJoin(); - if (Thread.interrupted()) - throw new InterruptedException(); - int s = status; + int s; + if (Thread.currentThread() instanceof ForkJoinWorkerThread) { + quietlyJoin(); + s = status; + } + else { + while ((s = status) >= 0) { + synchronized (this) { // interruptible form of awaitDone + if (UNSAFE.compareAndSwapInt(this, statusOffset, + s, SIGNAL)) { + while (status >= 0) + wait(); + } + } + } + } if (s < NORMAL) { Throwable ex; if (s == CANCELLED) @@ -657,68 +688,59 @@ public abstract class ForkJoinTask im return getRawResult(); } + /** + * Waits if necessary for at most the given time for the computation + * to complete, and then retrieves its result, if available. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return the computed result + * @throws CancellationException if the computation was cancelled + * @throws ExecutionException if the computation threw an + * exception + * @throws InterruptedException if the current thread is not a + * member of a ForkJoinPool and was interrupted while waiting + * @throws TimeoutException if the wait timed out + */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - Thread t = Thread.currentThread(); - ForkJoinPool pool; - if (t instanceof ForkJoinWorkerThread) { - ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; - if (status >= 0 && w.unpushTask(this)) - quietlyExec(); - pool = w.pool; - } - else - pool = null; - /* - * Timed wait loop intermixes cases for FJ (pool != null) and - * non FJ threads. For FJ, decrement pool count but don't try - * for replacement; increment count on completion. For non-FJ, - * deal with interrupts. This is messy, but a little less so - * than is splitting the FJ and nonFJ cases. - */ - boolean interrupted = false; - boolean dec = false; // true if pool count decremented long nanos = unit.toNanos(timeout); - for (;;) { - if (pool == null && Thread.interrupted()) { - interrupted = true; - break; + if (status >= 0) { + Thread t = Thread.currentThread(); + if (t instanceof ForkJoinWorkerThread) { + ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; + boolean completed = false; // timed variant of quietlyJoin + if (w.unpushTask(this)) { + try { + completed = exec(); + } catch (Throwable rex) { + setExceptionalCompletion(rex); + } + } + if (completed) + setCompletion(NORMAL); + else if (status >= 0) + w.joinTask(this, true, nanos); } - int s = status; - if (s < 0) - break; - if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) { + else if (Thread.interrupted()) + throw new InterruptedException(); + else { long startTime = System.nanoTime(); - long nt; // wait time - while (status >= 0 && + int s; long nt; + while ((s = status) >= 0 && (nt = nanos - (System.nanoTime() - startTime)) > 0) { - if (pool != null && !dec) - dec = pool.tryDecrementRunningCount(); - else { + if (UNSAFE.compareAndSwapInt(this, statusOffset, s, + SIGNAL)) { long ms = nt / 1000000; int ns = (int) (nt % 1000000); - try { - synchronized(this) { - if (status >= 0) - wait(ms, ns); - } - } catch (InterruptedException ie) { - if (pool != null) - cancelIfTerminating(); - else { - interrupted = true; - break; - } + synchronized (this) { + if (status >= 0) + wait(ms, ns); // exit on IE throw } } } - break; } } - if (pool != null && dec) - pool.incrementRunningCount(); - if (interrupted) - throw new InterruptedException(); int es = status; if (es != NORMAL) { Throwable ex; @@ -755,7 +777,7 @@ public abstract class ForkJoinTask im return; } } - w.joinTask(this); + w.joinTask(this, false, 0L); } } else