--- jsr166/src/jsr166y/ForkJoinTask.java 2010/09/18 12:10:21 1.63 +++ jsr166/src/jsr166y/ForkJoinTask.java 2010/10/24 19:37:26 1.66 @@ -6,7 +6,6 @@ package jsr166y; -import java.util.concurrent.*; import java.io.Serializable; import java.util.Collection; import java.util.Collections; @@ -14,6 +13,16 @@ import java.util.List; import java.util.RandomAccess; import java.util.Map; import java.util.WeakHashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Abstract base class for tasks that run within a {@link ForkJoinPool}. @@ -238,13 +247,13 @@ public abstract class ForkJoinTask im * * @return status on exit */ - final int internalAwaitDone(long millis) { + final int internalAwaitDone(long millis, int nanos) { int s; if ((s = status) >= 0) { try { synchronized (this) { if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL)) - wait(millis, 0); + wait(millis, nanos); } } catch (InterruptedException ie) { cancelIfTerminating(); @@ -261,7 +270,7 @@ public abstract class ForkJoinTask im int s; while ((s = status) >= 0) { synchronized (this) { - if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){ + if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) { boolean interrupted = false; while (status >= 0) { try { @@ -661,7 +670,7 @@ public abstract class ForkJoinTask im else { while ((s = status) >= 0) { synchronized (this) { // interruptible form of awaitDone - if (UNSAFE.compareAndSwapInt(this, statusOffset, + if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) { while (status >= 0) wait(); @@ -695,66 +704,43 @@ public abstract class ForkJoinTask im */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - Thread t = Thread.currentThread(); - ForkJoinPool pool; - if (t instanceof ForkJoinWorkerThread) { - ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; - if (status >= 0 && w.unpushTask(this)) - quietlyExec(); - pool = w.pool; - } - else - pool = null; - /* - * Timed wait loop intermixes cases for FJ (pool != null) and - * non FJ threads. For FJ, decrement pool count but don't try - * for replacement; increment count on completion. For non-FJ, - * deal with interrupts. This is messy, but a little less so - * than is splitting the FJ and nonFJ cases. - */ - boolean interrupted = false; - boolean dec = false; // true if pool count decremented long nanos = unit.toNanos(timeout); - for (;;) { - if (pool == null && Thread.interrupted()) { - interrupted = true; - break; + if (status >= 0) { + Thread t = Thread.currentThread(); + if (t instanceof ForkJoinWorkerThread) { + ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; + boolean completed = false; // timed variant of quietlyJoin + if (w.unpushTask(this)) { + try { + completed = exec(); + } catch (Throwable rex) { + setExceptionalCompletion(rex); + } + } + if (completed) + setCompletion(NORMAL); + else if (status >= 0) + w.joinTask(this, true, nanos); } - int s = status; - if (s < 0) - break; - if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) { + else if (Thread.interrupted()) + throw new InterruptedException(); + else { long startTime = System.nanoTime(); - long nt; // wait time - while (status >= 0 && + int s; long nt; + while ((s = status) >= 0 && (nt = nanos - (System.nanoTime() - startTime)) > 0) { - if (pool != null && !dec) - dec = pool.tryDecrementRunningCount(); - else { + if (UNSAFE.compareAndSwapInt(this, statusOffset, s, + SIGNAL)) { long ms = nt / 1000000; int ns = (int) (nt % 1000000); - try { - synchronized (this) { - if (status >= 0) - wait(ms, ns); - } - } catch (InterruptedException ie) { - if (pool != null) - cancelIfTerminating(); - else { - interrupted = true; - break; - } + synchronized (this) { + if (status >= 0) + wait(ms, ns); // exit on IE throw } } } - break; } } - if (pool != null && dec) - pool.incrementRunningCount(); - if (interrupted) - throw new InterruptedException(); int es = status; if (es != NORMAL) { Throwable ex; @@ -791,7 +777,7 @@ public abstract class ForkJoinTask im return; } } - w.joinTask(this); + w.joinTask(this, false, 0L); } } else