--- jsr166/src/jsr166y/ForkJoinTask.java 2012/01/26 18:15:12 1.82 +++ jsr166/src/jsr166y/ForkJoinTask.java 2012/02/20 18:20:06 1.86 @@ -93,7 +93,7 @@ import java.lang.reflect.Constructor; * performs the most common form of parallel invocation: forking a set * of tasks and joining them all. * - *

In the most typical usages, a fork-join pair act like a a call + *

In the most typical usages, a fork-join pair act like a call * (fork) and return (join) from a parallel recursive function. As is * the case with other forms of recursive calls, returns (joins) * should be performed innermost-first. For example, {@code a.fork(); @@ -143,10 +143,10 @@ import java.lang.reflect.Constructor; * use these {@code protected} methods or marks for any purpose, but * they may be of use in the construction of specialized subclasses. * For example, parallel graph traversals can use the supplied methods - * to avoid revisiting nodes/tasks that have already been - * processed. Also, completion based designs can use them to record - * that one subtask has completed. (Method names for marking are bulky - * in part to encourage definition of methods that reflect their usage + * to avoid revisiting nodes/tasks that have already been processed. + * Also, completion based designs can use them to record that one + * subtask has completed. (Method names for marking are bulky in part + * to encourage definition of methods that reflect their usage * patterns.) * *

Most base support methods are {@code final}, to prevent @@ -197,45 +197,37 @@ public abstract class ForkJoinTask im * methods in a way that flows well in javadocs. */ - /** - * The number of times to try to help join a task without any - * apparent progress before giving up and blocking. The value is - * arbitrary but should be large enough to cope with transient - * stalls (due to GC etc) that can cause helping methods not to be - * able to proceed because other workers have not progressed to - * the point where subtasks can be found or taken. - */ - private static final int HELP_RETRIES = 32; - /* * The status field holds run control status bits packed into a * single int to minimize footprint and to ensure atomicity (via * CAS). Status is initially zero, and takes on nonnegative - * values until completed, upon which status holds value - * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking - * waits by other threads have the SIGNAL bit set. Completion of - * a stolen task with SIGNAL set awakens any waiters via - * notifyAll. Even though suboptimal for some purposes, we use - * basic builtin wait/notify to take advantage of "monitor - * inflation" in JVMs that we would otherwise need to emulate to - * avoid adding further per-task bookkeeping overhead. We want - * these monitors to be "fat", i.e., not use biasing or thin-lock - * techniques, so use some odd coding idioms that tend to avoid - * them. + * values until completed, upon which status (anded with + * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks + * undergoing blocking waits by other threads have the SIGNAL bit + * set. Completion of a stolen task with SIGNAL set awakens any + * waiters via notifyAll. Even though suboptimal for some + * purposes, we use basic builtin wait/notify to take advantage of + * "monitor inflation" in JVMs that we would otherwise need to + * emulate to avoid adding further per-task bookkeeping overhead. + * We want these monitors to be "fat", i.e., not use biasing or + * thin-lock techniques, so use some odd coding idioms that tend + * to avoid them, mainly by arranging that every synchronized + * block performs a wait, notifyAll or both. */ /** The run status of this task */ volatile int status; // accessed directly by pool and workers - static final int NORMAL = 0xfffffffc; // negative with low 2 bits 0 - static final int CANCELLED = 0xfffffff8; // must be < NORMAL - static final int EXCEPTIONAL = 0xfffffff4; // must be < CANCELLED + static final int DONE_MASK = 0xf0000000; // mask out non-completion bits + static final int NORMAL = 0xf0000000; // must be negative + static final int CANCELLED = 0xc0000000; // must be < NORMAL + static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED static final int SIGNAL = 0x00000001; static final int MARKED = 0x00000002; /** * Marks completion and wakes up threads waiting to join this - * task, also clearing signal request bits. A specialization for - * NORMAL completion is in method doExec + * task. A specialization for NORMAL completion is in method + * doExec. * * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL * @return completion status on exit @@ -244,7 +236,7 @@ public abstract class ForkJoinTask im for (int s;;) { if ((s = status) < 0) return s; - if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|completion)) { + if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { if ((s & SIGNAL) != 0) synchronized (this) { notifyAll(); } return completion; @@ -268,7 +260,7 @@ public abstract class ForkJoinTask im return setExceptionalCompletion(rex); } while ((s = status) >= 0 && completed) { - if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|NORMAL)) { + if (U.compareAndSwapInt(this, STATUS, s, s | NORMAL)) { if ((s & SIGNAL) != 0) synchronized (this) { notifyAll(); } return NORMAL; @@ -279,46 +271,57 @@ public abstract class ForkJoinTask im } /** + * Tries to set SIGNAL status. Used by ForkJoinPool. Other + * variants are directly incorporated into externalAwaitDone etc. + * + * @return true if successful + */ + final boolean trySetSignal() { + int s; + return U.compareAndSwapInt(this, STATUS, s = status, s | SIGNAL); + } + + /** * Blocks a non-worker-thread until completion. * @return status upon completion */ private int externalAwaitDone() { + boolean interrupted = false; int s; - if ((s = status) >= 0) { - boolean interrupted = false; - synchronized (this) { - while ((s = status) >= 0) { - if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + while ((s = status) >= 0) { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) { try { wait(); } catch (InterruptedException ie) { interrupted = true; } } + else + notifyAll(); } } - if (interrupted) - Thread.currentThread().interrupt(); } + if (interrupted) + Thread.currentThread().interrupt(); return s; } /** - * Blocks a non-worker-thread until completion or interruption or timeout. + * Blocks a non-worker-thread until completion or interruption. */ - private int externalInterruptibleAwaitDone(long millis) - throws InterruptedException { + private int externalInterruptibleAwaitDone() throws InterruptedException { int s; if (Thread.interrupted()) throw new InterruptedException(); - if ((s = status) >= 0) { - synchronized (this) { - while ((s = status) >= 0) { - if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { - wait(millis); - if (millis > 0L) - break; - } + while ((s = status) >= 0) { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) + wait(); + else + notifyAll(); } } } @@ -329,63 +332,21 @@ public abstract class ForkJoinTask im /** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and - * unfork+exec. Others are relayed to awaitJoin. + * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. * * @return status upon completion */ private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; if ((s = status) >= 0) { - if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) - s = externalAwaitDone(); - else if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue). - tryUnpush(this) || (s = doExec()) >= 0) - s = awaitJoin(w, wt.pool); - } - return s; - } - - /** - * Helps and/or blocks until joined. - * - * @param w the joiner - * @param p the pool - * @return status upon completion - */ - private int awaitJoin(ForkJoinPool.WorkQueue w, ForkJoinPool p) { - int s; - ForkJoinTask prevJoin = w.currentJoin; - w.currentJoin = this; - for (int k = HELP_RETRIES; (s = status) >= 0;) { - if ((w.queueSize() > 0) ? - w.tryRemoveAndExec(this) : // self-help - p.tryHelpStealer(w, this)) // help process tasks - k = HELP_RETRIES; // reset if made progress - else if ((s = status) < 0) // recheck - break; - else if (--k > 0) { - if ((k & 3) == 1) - Thread.yield(); // occasionally yield - } - else if (k == 0) - p.tryPollForAndExec(w, this); // uncommon self-help case - else if (p.tryCompensate()) { // true if can block - try { - int ss = status; - if (ss >= 0 && // assert need signal - U.compareAndSwapInt(this, STATUS, ss, ss | SIGNAL)) { - synchronized (this) { - if (status >= 0) // block - wait(); - } - } - } catch (InterruptedException ignore) { - } finally { - p.incrementActiveCount(); // re-activate - } + if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) { + if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue). + tryUnpush(this) || (s = doExec()) >= 0) + s = wt.pool.awaitJoin(w, this); } + else + s = externalAwaitDone(); } - w.currentJoin = prevJoin; return s; } @@ -395,14 +356,13 @@ public abstract class ForkJoinTask im * @return status upon completion */ private int doInvoke() { - int s; Thread t; + int s; Thread t; ForkJoinWorkerThread wt; if ((s = doExec()) >= 0) { - if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + s = (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, + this); + else s = externalAwaitDone(); - else { - ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; - s = awaitJoin(wt.workQueue, wt.pool); - } } return s; } @@ -439,7 +399,7 @@ public abstract class ForkJoinTask im * any ForkJoinPool will call helpExpungeStaleExceptions when its * pool becomes isQuiescent. */ - static final class ExceptionNode extends WeakReference>{ + static final class ExceptionNode extends WeakReference> { final Throwable ex; ExceptionNode next; final long thrower; // use id not ref to avoid weak cycles @@ -539,7 +499,7 @@ public abstract class ForkJoinTask im * @return the exception, or null if none */ private Throwable getThrowableException() { - if (status != EXCEPTIONAL) + if ((status & DONE_MASK) != EXCEPTIONAL) return null; int h = System.identityHashCode(this); ExceptionNode e; @@ -624,16 +584,14 @@ public abstract class ForkJoinTask im } /** - * Report the result of invoke or join; called only upon - * non-normal return of internal versions. + * Throws exception, if any, associated with the given status. */ - private V reportResult() { - int s; Throwable ex; - if ((s = status) == CANCELLED) - throw new CancellationException(); - if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) + private void reportException(int s) { + Throwable ex = ((s == CANCELLED) ? new CancellationException() : + (s == EXCEPTIONAL) ? getThrowableException() : + null); + if (ex != null) U.throwException(ex); - return getRawResult(); } // public methods @@ -657,9 +615,7 @@ public abstract class ForkJoinTask im * @return {@code this}, to simplify usage */ public final ForkJoinTask fork() { - ForkJoinWorkerThread wt; - (wt = (ForkJoinWorkerThread)Thread.currentThread()). - workQueue.push(this, wt.pool); + ((ForkJoinWorkerThread)Thread.currentThread()).workQueue.push(this); return this; } @@ -675,10 +631,10 @@ public abstract class ForkJoinTask im * @return the computed result */ public final V join() { - if (doJoin() != NORMAL) - return reportResult(); - else - return getRawResult(); + int s; + if ((s = doJoin() & DONE_MASK) != NORMAL) + reportException(s); + return getRawResult(); } /** @@ -690,10 +646,10 @@ public abstract class ForkJoinTask im * @return the computed result */ public final V invoke() { - if (doInvoke() != NORMAL) - return reportResult(); - else - return getRawResult(); + int s; + if ((s = doInvoke() & DONE_MASK) != NORMAL) + reportException(s); + return getRawResult(); } /** @@ -720,9 +676,12 @@ public abstract class ForkJoinTask im * @throws NullPointerException if any task is null */ public static void invokeAll(ForkJoinTask t1, ForkJoinTask t2) { + int s1, s2; t2.fork(); - t1.invoke(); - t2.join(); + if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) + t1.reportException(s1); + if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) + t2.reportException(s2); } /** @@ -859,7 +818,7 @@ public abstract class ForkJoinTask im * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { - return setCompletion(CANCELLED) == CANCELLED; + return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; } public final boolean isDone() { @@ -867,7 +826,7 @@ public abstract class ForkJoinTask im } public final boolean isCancelled() { - return status == CANCELLED; + return (status & DONE_MASK) == CANCELLED; } /** @@ -887,7 +846,7 @@ public abstract class ForkJoinTask im * exception and was not cancelled */ public final boolean isCompletedNormally() { - return status == NORMAL; + return (status & DONE_MASK) == NORMAL; } /** @@ -898,7 +857,7 @@ public abstract class ForkJoinTask im * @return the exception, or {@code null} if none */ public final Throwable getException() { - int s = status; + int s = status & DONE_MASK; return ((s >= NORMAL) ? null : (s == CANCELLED) ? new CancellationException() : getThrowableException()); @@ -960,9 +919,9 @@ public abstract class ForkJoinTask im */ public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? - doJoin() : externalInterruptibleAwaitDone(0L); + doJoin() : externalInterruptibleAwaitDone(); Throwable ex; - if (s == CANCELLED) + if ((s &= DONE_MASK) == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) throw new ExecutionException(ex); @@ -985,52 +944,60 @@ public abstract class ForkJoinTask im */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - // Messy in part because we measure in nanos, but wait in millis - int s; long millis, nanos; - Thread t = Thread.currentThread(); - if (!(t instanceof ForkJoinWorkerThread)) { - if ((millis = unit.toMillis(timeout)) > 0L) - s = externalInterruptibleAwaitDone(millis); - else - s = status; - } - else if ((s = status) >= 0 && (nanos = unit.toNanos(timeout)) > 0L) { - long deadline = System.nanoTime() + nanos; - ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; - ForkJoinPool.WorkQueue w = wt.workQueue; - ForkJoinPool p = wt.pool; - if (w.tryUnpush(this)) - doExec(); - boolean blocking = false; + if (Thread.interrupted()) + throw new InterruptedException(); + // Messy in part because we measure in nanosecs, but wait in millisecs + int s; long ns, ms; + if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) { + long deadline = System.nanoTime() + ns; + ForkJoinPool p = null; + ForkJoinPool.WorkQueue w = null; + Thread t = Thread.currentThread(); + if (t instanceof ForkJoinWorkerThread) { + ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; + p = wt.pool; + w = wt.workQueue; + s = p.helpJoinOnce(w, this); // no retries on failure + } + boolean canBlock = false; + boolean interrupted = false; try { while ((s = status) >= 0) { - if (w.runState < 0) + if (w != null && w.runState < 0) cancelIgnoringExceptions(this); - else if (!blocking) - blocking = p.tryCompensate(); + else if (!canBlock) { + if (p == null || p.tryCompensate(this, null)) + canBlock = true; + } else { - millis = TimeUnit.NANOSECONDS.toMillis(nanos); - if (millis > 0L && + if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { - try { - synchronized (this) { - if (status >= 0) - wait(millis); + synchronized (this) { + if (status >= 0) { + try { + wait(ms); + } catch (InterruptedException ie) { + if (p == null) + interrupted = true; + } } - } catch (InterruptedException ie) { + else + notifyAll(); } } - if ((s = status) < 0 || - (nanos = deadline - System.nanoTime()) <= 0L) + if ((s = status) < 0 || interrupted || + (ns = deadline - System.nanoTime()) <= 0L) break; } } } finally { - if (blocking) + if (p != null && canBlock) p.incrementActiveCount(); } + if (interrupted) + throw new InterruptedException(); } - if (s != NORMAL) { + if ((s &= DONE_MASK) != NORMAL) { Throwable ex; if (s == CANCELLED) throw new CancellationException(); @@ -1097,7 +1064,7 @@ public abstract class ForkJoinTask im * setRawResult(null)}. */ public void reinitialize() { - if (status == EXCEPTIONAL) + if ((status & DONE_MASK) == EXCEPTIONAL) clearExceptionalCompletion(); else status = 0; @@ -1385,21 +1352,33 @@ public abstract class ForkJoinTask im static final class AdaptedRunnable extends ForkJoinTask implements RunnableFuture { final Runnable runnable; - final T resultOnCompletion; T result; AdaptedRunnable(Runnable runnable, T result) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; - this.resultOnCompletion = result; + this.result = result; // OK to set this even before completion } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { - runnable.run(); - result = resultOnCompletion; - return true; + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { runnable.run(); return true; } + public final void run() { invoke(); } + private static final long serialVersionUID = 5232453952276885070L; + } + + /** + * Adaptor for Runnables without results + */ + static final class AdaptedRunnableAction extends ForkJoinTask + implements RunnableFuture { + final Runnable runnable; + AdaptedRunnableAction(Runnable runnable) { + if (runnable == null) throw new NullPointerException(); + this.runnable = runnable; } - public void run() { invoke(); } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) { } + public final boolean exec() { runnable.run(); return true; } + public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L; } @@ -1414,9 +1393,9 @@ public abstract class ForkJoinTask im if (callable == null) throw new NullPointerException(); this.callable = callable; } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { try { result = callable.call(); return true; @@ -1428,7 +1407,7 @@ public abstract class ForkJoinTask im throw new RuntimeException(ex); } } - public void run() { invoke(); } + public final void run() { invoke(); } private static final long serialVersionUID = 2838392045355241008L; } @@ -1441,7 +1420,7 @@ public abstract class ForkJoinTask im * @return the task */ public static ForkJoinTask adapt(Runnable runnable) { - return new AdaptedRunnable(runnable, null); + return new AdaptedRunnableAction(runnable); } /**