--- jsr166/src/jsr166y/ForkJoinTask.java 2012/01/26 00:08:13 1.81 +++ jsr166/src/jsr166y/ForkJoinTask.java 2012/03/04 19:47:08 1.88 @@ -93,7 +93,7 @@ import java.lang.reflect.Constructor; * performs the most common form of parallel invocation: forking a set * of tasks and joining them all. * - *

In the most typical usages, a fork-join pair act like a a call + *

In the most typical usages, a fork-join pair act like a call * (fork) and return (join) from a parallel recursive function. As is * the case with other forms of recursive calls, returns (joins) * should be performed innermost-first. For example, {@code a.fork(); @@ -137,17 +137,17 @@ import java.lang.reflect.Constructor; * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that * may be of use in constructing custom subclasses for problems that * are not statically structured as DAGs. To support such usages a - * ForkJoinTask may be atomically marked using {@link - * #markForkJoinTask} and checked for marking using {@link - * #isMarkedForkJoinTask}. The ForkJoinTask implementation does not - * use these {@code protected} methods or marks for any purpose, but + * ForkJoinTask may be atomically tagged with a {@code + * short} value using {@link #setForkJoinTaskTag} or {@link + * #compareAndSetForkJoinTaskTag} and checked using {@link + * #getForkJoinTaskTag}. The ForkJoinTask implementation does not + * use these {@code protected} methods or tags for any purpose, but * they may be of use in the construction of specialized subclasses. * For example, parallel graph traversals can use the supplied methods - * to avoid revisiting nodes/tasks that have already been - * processed. Also, completion based designs can use them to record - * that one subtask has completed. (Method names for marking are bulky - * in part to encourage definition of methods that reflect their usage - * patterns.) + * to avoid revisiting nodes/tasks that have already been processed. + * Also, completion based designs can use them to record that subtasks + * have completed. (Method names for tagging are bulky in part to + * encourage definition of methods that reflect their usage patterns.) * *

Most base support methods are {@code final}, to prevent * overriding of implementations that are intrinsically tied to the @@ -197,45 +197,40 @@ public abstract class ForkJoinTask im * methods in a way that flows well in javadocs. */ - /** - * The number of times to try to help join a task without any - * apparent progress before giving up and blocking. The value is - * arbitrary but should be large enough to cope with transient - * stalls (due to GC etc) that can cause helping methods not to be - * able to proceed because other workers have not progressed to - * the point where subtasks can be found or taken. - */ - private static final int HELP_RETRIES = 32; - /* * The status field holds run control status bits packed into a * single int to minimize footprint and to ensure atomicity (via * CAS). Status is initially zero, and takes on nonnegative - * values until completed, upon which status holds value - * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking - * waits by other threads have the SIGNAL bit set. Completion of - * a stolen task with SIGNAL set awakens any waiters via - * notifyAll. Even though suboptimal for some purposes, we use - * basic builtin wait/notify to take advantage of "monitor - * inflation" in JVMs that we would otherwise need to emulate to - * avoid adding further per-task bookkeeping overhead. We want - * these monitors to be "fat", i.e., not use biasing or thin-lock - * techniques, so use some odd coding idioms that tend to avoid - * them. + * values until completed, upon which status (anded with + * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks + * undergoing blocking waits by other threads have the SIGNAL bit + * set. Completion of a stolen task with SIGNAL set awakens any + * waiters via notifyAll. Even though suboptimal for some + * purposes, we use basic builtin wait/notify to take advantage of + * "monitor inflation" in JVMs that we would otherwise need to + * emulate to avoid adding further per-task bookkeeping overhead. + * We want these monitors to be "fat", i.e., not use biasing or + * thin-lock techniques, so use some odd coding idioms that tend + * to avoid them, mainly by arranging that every synchronized + * block performs a wait, notifyAll or both. + * + * These control bits occupy only (some of) the upper half (16 + * bits) of status field. The lower bits are used for user-defined + * tags. */ /** The run status of this task */ volatile int status; // accessed directly by pool and workers - static final int NORMAL = 0xfffffffc; // negative with low 2 bits 0 - static final int CANCELLED = 0xfffffff8; // must be < NORMAL - static final int EXCEPTIONAL = 0xfffffff4; // must be < CANCELLED - static final int SIGNAL = 0x00000001; - static final int MARKED = 0x00000002; + static final int DONE_MASK = 0xf0000000; // mask out non-completion bits + static final int NORMAL = 0xf0000000; // must be negative + static final int CANCELLED = 0xc0000000; // must be < NORMAL + static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED + static final int SIGNAL = 0x00010000; // must be >= 1 << 16 + static final int SMASK = 0x0000ffff; // short bits for tags /** * Marks completion and wakes up threads waiting to join this - * task, also clearing signal request bits. A specialization for - * NORMAL completion is in method doExec + * task. * * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL * @return completion status on exit @@ -244,8 +239,8 @@ public abstract class ForkJoinTask im for (int s;;) { if ((s = status) < 0) return s; - if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|completion)) { - if ((s & SIGNAL) != 0) + if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { + if ((s >>> 16) != 0) synchronized (this) { notifyAll(); } return completion; } @@ -267,125 +262,88 @@ public abstract class ForkJoinTask im } catch (Throwable rex) { return setExceptionalCompletion(rex); } - while ((s = status) >= 0 && completed) { - if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|NORMAL)) { - if ((s & SIGNAL) != 0) - synchronized (this) { notifyAll(); } - return NORMAL; - } - } + if (completed) + s = setCompletion(NORMAL); } return s; } /** + * Tries to set SIGNAL status. Used by ForkJoinPool. Other + * variants are directly incorporated into externalAwaitDone etc. + * + * @return true if successful + */ + final boolean trySetSignal() { + int s; + return U.compareAndSwapInt(this, STATUS, s = status, s | SIGNAL); + } + + /** * Blocks a non-worker-thread until completion. * @return status upon completion */ private int externalAwaitDone() { + boolean interrupted = false; int s; - if ((s = status) >= 0) { - boolean interrupted = false; - synchronized (this) { - while ((s = status) >= 0) { - if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + while ((s = status) >= 0) { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) { try { wait(); } catch (InterruptedException ie) { interrupted = true; } } + else + notifyAll(); } } - if (interrupted) - Thread.currentThread().interrupt(); } + if (interrupted) + Thread.currentThread().interrupt(); return s; } /** - * Blocks a non-worker-thread until completion or interruption or timeout. + * Blocks a non-worker-thread until completion or interruption. */ - private int externalInterruptibleAwaitDone(long millis) - throws InterruptedException { + private int externalInterruptibleAwaitDone() throws InterruptedException { int s; if (Thread.interrupted()) throw new InterruptedException(); - if ((s = status) >= 0) { - synchronized (this) { - while ((s = status) >= 0) { - if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { - wait(millis); - if (millis > 0L) - break; - } + while ((s = status) >= 0) { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) + wait(); + else + notifyAll(); } } } return s; } - /** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and - * unfork+exec. Others are relayed to awaitJoin. + * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. * * @return status upon completion */ private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; if ((s = status) >= 0) { - if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) - s = externalAwaitDone(); - else if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue). - tryUnpush(this) || (s = doExec()) >= 0) - s = awaitJoin(w, wt.pool); - } - return s; - } - - /** - * Helps and/or blocks until joined. - * - * @param w the joiner - * @param p the pool - * @return status upon completion - */ - private int awaitJoin(ForkJoinPool.WorkQueue w, ForkJoinPool p) { - int s; - ForkJoinTask prevJoin = w.currentJoin; - w.currentJoin = this; - for (int k = HELP_RETRIES; (s = status) >= 0;) { - if ((w.queueSize() > 0) ? - w.tryRemoveAndExec(this) : // self-help - p.tryHelpStealer(w, this)) // help process tasks - k = HELP_RETRIES; // reset if made progress - else if ((s = status) < 0) // recheck - break; - else if (--k > 0) { - if ((k & 3) == 1) - Thread.yield(); // occasionally yield - } - else if (k == 0) - p.tryPollForAndExec(w, this); // uncommon self-help case - else if (p.tryCompensate()) { // true if can block - try { - int ss = status; - if (ss >= 0 && // assert need signal - U.compareAndSwapInt(this, STATUS, ss, ss | SIGNAL)) { - synchronized (this) { - if (status >= 0) // block - wait(); - } - } - } catch (InterruptedException ignore) { - } finally { - p.incrementActiveCount(); // re-activate - } + if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) { + if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue). + tryUnpush(this) || (s = doExec()) >= 0) + s = wt.pool.awaitJoin(w, this); } + else + s = externalAwaitDone(); } - w.currentJoin = prevJoin; return s; } @@ -395,11 +353,15 @@ public abstract class ForkJoinTask im * @return status upon completion */ private int doInvoke() { - int s; - if ((s = doExec()) < 0) - return s; - else - return doJoin(); + int s; Thread t; ForkJoinWorkerThread wt; + if ((s = doExec()) >= 0) { + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + s = (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, + this); + else + s = externalAwaitDone(); + } + return s; } // Exception table support @@ -434,7 +396,7 @@ public abstract class ForkJoinTask im * any ForkJoinPool will call helpExpungeStaleExceptions when its * pool becomes isQuiescent. */ - static final class ExceptionNode extends WeakReference>{ + static final class ExceptionNode extends WeakReference> { final Throwable ex; ExceptionNode next; final long thrower; // use id not ref to avoid weak cycles @@ -534,7 +496,7 @@ public abstract class ForkJoinTask im * @return the exception, or null if none */ private Throwable getThrowableException() { - if (status != EXCEPTIONAL) + if ((status & DONE_MASK) != EXCEPTIONAL) return null; int h = System.identityHashCode(this); ExceptionNode e; @@ -619,16 +581,14 @@ public abstract class ForkJoinTask im } /** - * Report the result of invoke or join; called only upon - * non-normal return of internal versions. + * Throws exception, if any, associated with the given status. */ - private V reportResult() { - int s; Throwable ex; - if ((s = status) == CANCELLED) - throw new CancellationException(); - if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) + private void reportException(int s) { + Throwable ex = ((s == CANCELLED) ? new CancellationException() : + (s == EXCEPTIONAL) ? getThrowableException() : + null); + if (ex != null) U.throwException(ex); - return getRawResult(); } // public methods @@ -652,9 +612,7 @@ public abstract class ForkJoinTask im * @return {@code this}, to simplify usage */ public final ForkJoinTask fork() { - ForkJoinWorkerThread wt; - (wt = (ForkJoinWorkerThread)Thread.currentThread()). - workQueue.push(this, wt.pool); + ((ForkJoinWorkerThread)Thread.currentThread()).workQueue.push(this); return this; } @@ -670,10 +628,10 @@ public abstract class ForkJoinTask im * @return the computed result */ public final V join() { - if (doJoin() != NORMAL) - return reportResult(); - else - return getRawResult(); + int s; + if ((s = doJoin() & DONE_MASK) != NORMAL) + reportException(s); + return getRawResult(); } /** @@ -685,10 +643,10 @@ public abstract class ForkJoinTask im * @return the computed result */ public final V invoke() { - if (doInvoke() != NORMAL) - return reportResult(); - else - return getRawResult(); + int s; + if ((s = doInvoke() & DONE_MASK) != NORMAL) + reportException(s); + return getRawResult(); } /** @@ -715,9 +673,12 @@ public abstract class ForkJoinTask im * @throws NullPointerException if any task is null */ public static void invokeAll(ForkJoinTask t1, ForkJoinTask t2) { + int s1, s2; t2.fork(); - t1.invoke(); - t2.join(); + if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) + t1.reportException(s1); + if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) + t2.reportException(s2); } /** @@ -854,7 +815,7 @@ public abstract class ForkJoinTask im * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { - return setCompletion(CANCELLED) == CANCELLED; + return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; } public final boolean isDone() { @@ -862,7 +823,7 @@ public abstract class ForkJoinTask im } public final boolean isCancelled() { - return status == CANCELLED; + return (status & DONE_MASK) == CANCELLED; } /** @@ -882,7 +843,7 @@ public abstract class ForkJoinTask im * exception and was not cancelled */ public final boolean isCompletedNormally() { - return status == NORMAL; + return (status & DONE_MASK) == NORMAL; } /** @@ -893,7 +854,7 @@ public abstract class ForkJoinTask im * @return the exception, or {@code null} if none */ public final Throwable getException() { - int s = status; + int s = status & DONE_MASK; return ((s >= NORMAL) ? null : (s == CANCELLED) ? new CancellationException() : getThrowableException()); @@ -943,6 +904,18 @@ public abstract class ForkJoinTask im } /** + * Completes this task. The most recent value established by + * {@link #setRawResult} (or {@code null}) will be returned as the + * result of subsequent invocations of {@code join} and related + * operations. This method may be useful when processing sets of + * tasks when some do not otherwise complete normally. Its use in + * other situations is discouraged. + */ + public final void quietlyComplete() { + setCompletion(NORMAL); + } + + /** * Waits if necessary for the computation to complete, and then * retrieves its result. * @@ -955,9 +928,9 @@ public abstract class ForkJoinTask im */ public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? - doJoin() : externalInterruptibleAwaitDone(0L); + doJoin() : externalInterruptibleAwaitDone(); Throwable ex; - if (s == CANCELLED) + if ((s &= DONE_MASK) == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) throw new ExecutionException(ex); @@ -980,52 +953,60 @@ public abstract class ForkJoinTask im */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - // Messy in part because we measure in nanos, but wait in millis - int s; long millis, nanos; - Thread t = Thread.currentThread(); - if (!(t instanceof ForkJoinWorkerThread)) { - if ((millis = unit.toMillis(timeout)) > 0L) - s = externalInterruptibleAwaitDone(millis); - else - s = status; - } - else if ((s = status) >= 0 && (nanos = unit.toNanos(timeout)) > 0L) { - long deadline = System.nanoTime() + nanos; - ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; - ForkJoinPool.WorkQueue w = wt.workQueue; - ForkJoinPool p = wt.pool; - if (w.tryUnpush(this)) - doExec(); - boolean blocking = false; + if (Thread.interrupted()) + throw new InterruptedException(); + // Messy in part because we measure in nanosecs, but wait in millisecs + int s; long ns, ms; + if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) { + long deadline = System.nanoTime() + ns; + ForkJoinPool p = null; + ForkJoinPool.WorkQueue w = null; + Thread t = Thread.currentThread(); + if (t instanceof ForkJoinWorkerThread) { + ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; + p = wt.pool; + w = wt.workQueue; + s = p.helpJoinOnce(w, this); // no retries on failure + } + boolean canBlock = false; + boolean interrupted = false; try { while ((s = status) >= 0) { - if (w.runState < 0) + if (w != null && w.runState < 0) cancelIgnoringExceptions(this); - else if (!blocking) - blocking = p.tryCompensate(); + else if (!canBlock) { + if (p == null || p.tryCompensate(this, null)) + canBlock = true; + } else { - millis = TimeUnit.NANOSECONDS.toMillis(nanos); - if (millis > 0L && + if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { - try { - synchronized (this) { - if (status >= 0) - wait(millis); + synchronized (this) { + if (status >= 0) { + try { + wait(ms); + } catch (InterruptedException ie) { + if (p == null) + interrupted = true; + } } - } catch (InterruptedException ie) { + else + notifyAll(); } } - if ((s = status) < 0 || - (nanos = deadline - System.nanoTime()) <= 0L) + if ((s = status) < 0 || interrupted || + (ns = deadline - System.nanoTime()) <= 0L) break; } } } finally { - if (blocking) + if (p != null && canBlock) p.incrementActiveCount(); } + if (interrupted) + throw new InterruptedException(); } - if (s != NORMAL) { + if ((s &= DONE_MASK) != NORMAL) { Throwable ex; if (s == CANCELLED) throw new CancellationException(); @@ -1070,9 +1051,9 @@ public abstract class ForkJoinTask im * ClassCastException}. */ public static void helpQuiesce() { - ForkJoinWorkerThread w = + ForkJoinWorkerThread wt = (ForkJoinWorkerThread)Thread.currentThread(); - w.pool.helpQuiescePool(w.workQueue); + wt.pool.helpQuiescePool(wt.workQueue); } /** @@ -1092,7 +1073,7 @@ public abstract class ForkJoinTask im * setRawResult(null)}. */ public void reinitialize() { - if (status == EXCEPTIONAL) + if ((status & DONE_MASK) == EXCEPTIONAL) clearExceptionalCompletion(); else status = 0; @@ -1225,9 +1206,9 @@ public abstract class ForkJoinTask im * have zero queued tasks, so compensate by a factor of * (#idle/#active) threads. */ - ForkJoinWorkerThread w = + ForkJoinWorkerThread wt = (ForkJoinWorkerThread)Thread.currentThread(); - return w.workQueue.queueSize() - w.pool.idlePerActive(); + return wt.workQueue.queueSize() - wt.pool.idlePerActive(); } // Extension methods @@ -1325,49 +1306,58 @@ public abstract class ForkJoinTask im * @return a task, or {@code null} if none are available */ protected static ForkJoinTask pollTask() { - ForkJoinWorkerThread w = + ForkJoinWorkerThread wt = (ForkJoinWorkerThread)Thread.currentThread(); - return w.pool.nextTaskFor(w.workQueue); + return wt.pool.nextTaskFor(wt.workQueue); } - // Mark-bit operations + // tag operations /** - * Returns true if this task is marked. + * Returns the tag for this task. * - * @return true if this task is marked + * @return the tag for this task * @since 1.8 */ - public final boolean isMarkedForkJoinTask() { - return (status & MARKED) != 0; + public final short getForkJoinTaskTag() { + return (short)status; } /** - * Atomically sets the mark on this task. + * Atomically sets the tag value for this task. * - * @return true if this task was previously unmarked + * @param tag the tag value + * @return the previous value of the tag * @since 1.8 */ - public final boolean markForkJoinTask() { + public final short setForkJoinTaskTag(short tag) { for (int s;;) { - if (((s = status) & MARKED) != 0) - return false; - if (U.compareAndSwapInt(this, STATUS, s, s | MARKED)) - return true; + if (U.compareAndSwapInt(this, STATUS, s = status, + (s & ~SMASK) | (tag & SMASK))) + return (short)s; } } /** - * Atomically clears the mark on this task. + * Atomically conditionally sets the tag value for this task. + * Among other applications, tags can be used as visit markers + * in tasks operating on graphs, as in methods that check: {@code + * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))} + * before processing, otherwise exiting because the node has + * already been visited. * - * @return true if this task was previously marked + * @param e the expected tag value + * @param tag the new tag value + * @return true if successful; i.e., the current value was + * equal to e and is now tag. * @since 1.8 */ - public final boolean unmarkForkJoinTask() { + public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { for (int s;;) { - if (((s = status) & MARKED) == 0) + if ((short)(s = status) != e) return false; - if (U.compareAndSwapInt(this, STATUS, s, s & ~MARKED)) + if (U.compareAndSwapInt(this, STATUS, s, + (s & ~SMASK) | (tag & SMASK))) return true; } } @@ -1380,21 +1370,33 @@ public abstract class ForkJoinTask im static final class AdaptedRunnable extends ForkJoinTask implements RunnableFuture { final Runnable runnable; - final T resultOnCompletion; T result; AdaptedRunnable(Runnable runnable, T result) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; - this.resultOnCompletion = result; + this.result = result; // OK to set this even before completion } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { - runnable.run(); - result = resultOnCompletion; - return true; + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { runnable.run(); return true; } + public final void run() { invoke(); } + private static final long serialVersionUID = 5232453952276885070L; + } + + /** + * Adaptor for Runnables without results + */ + static final class AdaptedRunnableAction extends ForkJoinTask + implements RunnableFuture { + final Runnable runnable; + AdaptedRunnableAction(Runnable runnable) { + if (runnable == null) throw new NullPointerException(); + this.runnable = runnable; } - public void run() { invoke(); } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) { } + public final boolean exec() { runnable.run(); return true; } + public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L; } @@ -1409,9 +1411,9 @@ public abstract class ForkJoinTask im if (callable == null) throw new NullPointerException(); this.callable = callable; } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { try { result = callable.call(); return true; @@ -1423,7 +1425,7 @@ public abstract class ForkJoinTask im throw new RuntimeException(ex); } } - public void run() { invoke(); } + public final void run() { invoke(); } private static final long serialVersionUID = 2838392045355241008L; } @@ -1436,7 +1438,7 @@ public abstract class ForkJoinTask im * @return the task */ public static ForkJoinTask adapt(Runnable runnable) { - return new AdaptedRunnable(runnable, null); + return new AdaptedRunnableAction(runnable); } /**