--- jsr166/src/jsr166y/ForkJoinTask.java 2010/11/22 12:24:34 1.69 +++ jsr166/src/jsr166y/ForkJoinTask.java 2010/11/23 00:10:39 1.70 @@ -134,9 +134,10 @@ import java.util.concurrent.TimeoutExcep * computation. Large tasks should be split into smaller subtasks, * usually via recursive decomposition. As a very rough rule of thumb, * a task should perform more than 100 and less than 10000 basic - * computational steps. If tasks are too big, then parallelism cannot - * improve throughput. If too small, then memory and internal task - * maintenance overhead may overwhelm processing. + * computational steps, and should avoid indefinite looping. If tasks + * are too big, then parallelism cannot improve throughput. If too + * small, then memory and internal task maintenance overhead may + * overwhelm processing. * *

This class provides {@code adapt} methods for {@link Runnable} * and {@link Callable}, that may be of use when mixing execution of @@ -233,17 +234,17 @@ public abstract class ForkJoinTask im } /** - * Blocks a worker thread until completion. Called only by - * pool. Currently unused -- pool-based waits use timeout - * version below. + * Blocks a worker thread until completed or timed out. Called + * only by pool. */ - final void internalAwaitDone() { - int s; // the odd construction reduces lock bias effects - while ((s = status) >= 0) { - try { + final void internalAwaitDone(long millis, int nanos) { + if (status >= 0) { + try { // the odd construction reduces lock bias effects synchronized (this) { - if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL)) - wait(); + if (status > 0 || + UNSAFE.compareAndSwapInt(this, statusOffset, + 0, SIGNAL)) + wait(millis, nanos); } } catch (InterruptedException ie) { cancelIfTerminating(); @@ -252,46 +253,53 @@ public abstract class ForkJoinTask im } /** - * Blocks a worker thread until completed or timed out. Called - * only by pool. - * - * @return status on exit + * Blocks a non-worker-thread until completion. */ - final int internalAwaitDone(long millis, int nanos) { - int s; - if ((s = status) >= 0) { - try { - synchronized (this) { - if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL)) - wait(millis, nanos); + private void externalAwaitDone() { + if (status >= 0) { + boolean interrupted = false; + synchronized(this) { + int s; + while ((s = status) >= 0) { + if (s == 0 && + !UNSAFE.compareAndSwapInt(this, statusOffset, + 0, SIGNAL)) + continue; + try { + wait(); + } catch (InterruptedException ie) { + interrupted = true; + } } - } catch (InterruptedException ie) { - cancelIfTerminating(); } - s = status; + if (interrupted) + Thread.currentThread().interrupt(); } - return s; } /** - * Blocks a non-worker-thread until completion. + * Blocks a non-worker-thread until completion or interruption or timeout */ - private void externalAwaitDone() { - int s; - while ((s = status) >= 0) { - synchronized (this) { - if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) { - boolean interrupted = false; - while (status >= 0) { - try { - wait(); - } catch (InterruptedException ie) { - interrupted = true; - } - } - if (interrupted) - Thread.currentThread().interrupt(); - break; + private void externalInterruptibleAwaitDone(boolean timed, long nanos) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + if (status >= 0) { + long startTime = timed ? System.nanoTime() : 0L; + synchronized(this) { + int s; + while ((s = status) >= 0) { + long nt; + if (s == 0 && + !UNSAFE.compareAndSwapInt(this, statusOffset, + 0, SIGNAL)) + continue; + else if (!timed) + wait(); + else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L) + wait(nt / 1000000, (int)(nt % 1000000)); + else + break; } } } @@ -326,7 +334,7 @@ public abstract class ForkJoinTask im * #isDone} returning {@code true}. * *

This method may be invoked only from within {@code - * ForkJoinTask} computations (as may be determined using method + * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. @@ -678,23 +686,13 @@ public abstract class ForkJoinTask im * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { - int s; - if (Thread.currentThread() instanceof ForkJoinWorkerThread) { + Thread t = Thread.currentThread(); + if (t instanceof ForkJoinWorkerThread) quietlyJoin(); - s = status; - } - else { - while ((s = status) >= 0) { - synchronized (this) { // interruptible form of awaitDone - if (UNSAFE.compareAndSwapInt(this, statusOffset, - s, SIGNAL)) { - while (status >= 0) - wait(); - } - } - } - } - if (s < NORMAL) { + else + externalInterruptibleAwaitDone(false, 0L); + int s = status; + if (s != NORMAL) { Throwable ex; if (s == CANCELLED) throw new CancellationException(); @@ -721,48 +719,17 @@ public abstract class ForkJoinTask im public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { long nanos = unit.toNanos(timeout); - if (status >= 0) { - Thread t = Thread.currentThread(); - if (t instanceof ForkJoinWorkerThread) { - ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; - boolean completed = false; // timed variant of quietlyJoin - if (w.unpushTask(this)) { - try { - completed = exec(); - } catch (Throwable rex) { - setExceptionalCompletion(rex); - } - } - if (completed) - setCompletion(NORMAL); - else if (status >= 0) - w.joinTask(this, true, nanos); - } - else if (Thread.interrupted()) - throw new InterruptedException(); - else { - long startTime = System.nanoTime(); - int s; long nt; - while ((s = status) >= 0 && - (nt = nanos - (System.nanoTime() - startTime)) > 0) { - if (UNSAFE.compareAndSwapInt(this, statusOffset, s, - SIGNAL)) { - long ms = nt / 1000000; - int ns = (int) (nt % 1000000); - synchronized (this) { - if (status >= 0) - wait(ms, ns); // exit on IE throw - } - } - } - } - } - int es = status; - if (es != NORMAL) { + Thread t = Thread.currentThread(); + if (t instanceof ForkJoinWorkerThread) + ((ForkJoinWorkerThread)t).joinTask(this, true, nanos); + else + externalInterruptibleAwaitDone(true, nanos); + int s = status; + if (s != NORMAL) { Throwable ex; - if (es == CANCELLED) + if (s == CANCELLED) throw new CancellationException(); - if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null) + if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null) throw new ExecutionException(ex); throw new TimeoutException(); }