--- jsr166/src/jsr166y/ForkJoinTask.java 2010/11/22 12:24:34 1.69
+++ jsr166/src/jsr166y/ForkJoinTask.java 2010/11/23 00:10:39 1.70
@@ -134,9 +134,10 @@ import java.util.concurrent.TimeoutExcep
* computation. Large tasks should be split into smaller subtasks,
* usually via recursive decomposition. As a very rough rule of thumb,
* a task should perform more than 100 and less than 10000 basic
- * computational steps. If tasks are too big, then parallelism cannot
- * improve throughput. If too small, then memory and internal task
- * maintenance overhead may overwhelm processing.
+ * computational steps, and should avoid indefinite looping. If tasks
+ * are too big, then parallelism cannot improve throughput. If too
+ * small, then memory and internal task maintenance overhead may
+ * overwhelm processing.
*
*
This class provides {@code adapt} methods for {@link Runnable}
* and {@link Callable}, that may be of use when mixing execution of
@@ -233,17 +234,17 @@ public abstract class ForkJoinTask im
}
/**
- * Blocks a worker thread until completion. Called only by
- * pool. Currently unused -- pool-based waits use timeout
- * version below.
+ * Blocks a worker thread until completed or timed out. Called
+ * only by pool.
*/
- final void internalAwaitDone() {
- int s; // the odd construction reduces lock bias effects
- while ((s = status) >= 0) {
- try {
+ final void internalAwaitDone(long millis, int nanos) {
+ if (status >= 0) {
+ try { // the odd construction reduces lock bias effects
synchronized (this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
- wait();
+ if (status > 0 ||
+ UNSAFE.compareAndSwapInt(this, statusOffset,
+ 0, SIGNAL))
+ wait(millis, nanos);
}
} catch (InterruptedException ie) {
cancelIfTerminating();
@@ -252,46 +253,53 @@ public abstract class ForkJoinTask im
}
/**
- * Blocks a worker thread until completed or timed out. Called
- * only by pool.
- *
- * @return status on exit
+ * Blocks a non-worker-thread until completion.
*/
- final int internalAwaitDone(long millis, int nanos) {
- int s;
- if ((s = status) >= 0) {
- try {
- synchronized (this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
- wait(millis, nanos);
+ private void externalAwaitDone() {
+ if (status >= 0) {
+ boolean interrupted = false;
+ synchronized(this) {
+ int s;
+ while ((s = status) >= 0) {
+ if (s == 0 &&
+ !UNSAFE.compareAndSwapInt(this, statusOffset,
+ 0, SIGNAL))
+ continue;
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
}
- } catch (InterruptedException ie) {
- cancelIfTerminating();
}
- s = status;
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
- return s;
}
/**
- * Blocks a non-worker-thread until completion.
+ * Blocks a non-worker-thread until completion or interruption or timeout
*/
- private void externalAwaitDone() {
- int s;
- while ((s = status) >= 0) {
- synchronized (this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
- boolean interrupted = false;
- while (status >= 0) {
- try {
- wait();
- } catch (InterruptedException ie) {
- interrupted = true;
- }
- }
- if (interrupted)
- Thread.currentThread().interrupt();
- break;
+ private void externalInterruptibleAwaitDone(boolean timed, long nanos)
+ throws InterruptedException {
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ if (status >= 0) {
+ long startTime = timed ? System.nanoTime() : 0L;
+ synchronized(this) {
+ int s;
+ while ((s = status) >= 0) {
+ long nt;
+ if (s == 0 &&
+ !UNSAFE.compareAndSwapInt(this, statusOffset,
+ 0, SIGNAL))
+ continue;
+ else if (!timed)
+ wait();
+ else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L)
+ wait(nt / 1000000, (int)(nt % 1000000));
+ else
+ break;
}
}
}
@@ -326,7 +334,7 @@ public abstract class ForkJoinTask im
* #isDone} returning {@code true}.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -678,23 +686,13 @@ public abstract class ForkJoinTask im
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException {
- int s;
- if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
+ Thread t = Thread.currentThread();
+ if (t instanceof ForkJoinWorkerThread)
quietlyJoin();
- s = status;
- }
- else {
- while ((s = status) >= 0) {
- synchronized (this) { // interruptible form of awaitDone
- if (UNSAFE.compareAndSwapInt(this, statusOffset,
- s, SIGNAL)) {
- while (status >= 0)
- wait();
- }
- }
- }
- }
- if (s < NORMAL) {
+ else
+ externalInterruptibleAwaitDone(false, 0L);
+ int s = status;
+ if (s != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
@@ -721,48 +719,17 @@ public abstract class ForkJoinTask im
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
long nanos = unit.toNanos(timeout);
- if (status >= 0) {
- Thread t = Thread.currentThread();
- if (t instanceof ForkJoinWorkerThread) {
- ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
- boolean completed = false; // timed variant of quietlyJoin
- if (w.unpushTask(this)) {
- try {
- completed = exec();
- } catch (Throwable rex) {
- setExceptionalCompletion(rex);
- }
- }
- if (completed)
- setCompletion(NORMAL);
- else if (status >= 0)
- w.joinTask(this, true, nanos);
- }
- else if (Thread.interrupted())
- throw new InterruptedException();
- else {
- long startTime = System.nanoTime();
- int s; long nt;
- while ((s = status) >= 0 &&
- (nt = nanos - (System.nanoTime() - startTime)) > 0) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s,
- SIGNAL)) {
- long ms = nt / 1000000;
- int ns = (int) (nt % 1000000);
- synchronized (this) {
- if (status >= 0)
- wait(ms, ns); // exit on IE throw
- }
- }
- }
- }
- }
- int es = status;
- if (es != NORMAL) {
+ Thread t = Thread.currentThread();
+ if (t instanceof ForkJoinWorkerThread)
+ ((ForkJoinWorkerThread)t).joinTask(this, true, nanos);
+ else
+ externalInterruptibleAwaitDone(true, nanos);
+ int s = status;
+ if (s != NORMAL) {
Throwable ex;
- if (es == CANCELLED)
+ if (s == CANCELLED)
throw new CancellationException();
- if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
+ if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
throw new ExecutionException(ex);
throw new TimeoutException();
}