--- jsr166/src/jsr166y/ForkJoinTask.java 2012/01/31 01:51:13 1.85 +++ jsr166/src/jsr166y/ForkJoinTask.java 2012/04/21 11:45:20 1.90 @@ -5,6 +5,7 @@ */ package jsr166y; + import java.io.Serializable; import java.util.Collection; import java.util.List; @@ -70,9 +71,10 @@ import java.lang.reflect.Constructor; * but doing do requires three further considerations: (1) Completion * of few if any other tasks should be dependent on a task * that blocks on external synchronization or IO. Event-style async - * tasks that are never joined often fall into this category. (2) To - * minimize resource impact, tasks should be small; ideally performing - * only the (possibly) blocking action. (3) Unless the {@link + * tasks that are never joined (for example, those subclassing {@link + * CountedCompleter}) often fall into this category. (2) To minimize + * resource impact, tasks should be small; ideally performing only the + * (possibly) blocking action. (3) Unless the {@link * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly * blocked tasks is known to be less than the pool's {@link * ForkJoinPool#getParallelism} level, the pool cannot guarantee that @@ -115,18 +117,19 @@ import java.lang.reflect.Constructor; *

The ForkJoinTask class is not usually directly subclassed. * Instead, you subclass one of the abstract classes that support a * particular style of fork/join processing, typically {@link - * RecursiveAction} for computations that do not return results, or - * {@link RecursiveTask} for those that do. Normally, a concrete - * ForkJoinTask subclass declares fields comprising its parameters, - * established in a constructor, and then defines a {@code compute} - * method that somehow uses the control methods supplied by this base - * class. While these methods have {@code public} access (to allow - * instances of different task subclasses to call each other's - * methods), some of them may only be called from within other - * ForkJoinTasks (as may be determined using method {@link - * #inForkJoinPool}). Attempts to invoke them in other contexts - * result in exceptions or errors, possibly including - * {@code ClassCastException}. + * RecursiveAction} for most computations that do not return results, + * {@link RecursiveTask} for those that do, and {@link + * CountedCompleter} for those in which completed actions trigger + * other actions. Normally, a concrete ForkJoinTask subclass declares + * fields comprising its parameters, established in a constructor, and + * then defines a {@code compute} method that somehow uses the control + * methods supplied by this base class. While these methods have + * {@code public} access (to allow instances of different task + * subclasses to call each other's methods), some of them may only be + * called from within other ForkJoinTasks (as may be determined using + * method {@link #inForkJoinPool}). Attempts to invoke them in other + * contexts result in exceptions or errors, possibly including {@code + * ClassCastException}. * *

Method {@link #join} and its variants are appropriate for use * only when completion dependencies are acyclic; that is, the @@ -137,17 +140,16 @@ import java.lang.reflect.Constructor; * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that * may be of use in constructing custom subclasses for problems that * are not statically structured as DAGs. To support such usages a - * ForkJoinTask may be atomically marked using {@link - * #markForkJoinTask} and checked for marking using {@link - * #isMarkedForkJoinTask}. The ForkJoinTask implementation does not - * use these {@code protected} methods or marks for any purpose, but - * they may be of use in the construction of specialized subclasses. - * For example, parallel graph traversals can use the supplied methods - * to avoid revisiting nodes/tasks that have already been processed. - * Also, completion based designs can use them to record that one - * subtask has completed. (Method names for marking are bulky in part - * to encourage definition of methods that reflect their usage - * patterns.) + * ForkJoinTask may be atomically tagged with a {@code short} + * value using {@link #setForkJoinTaskTag} or {@link + * #compareAndSetForkJoinTaskTag} and checked using {@link + * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use + * these {@code protected} methods or tags for any purpose, but they + * may be of use in the construction of specialized subclasses. For + * example, parallel graph traversals can use the supplied methods to + * avoid revisiting nodes/tasks that have already been processed. + * (Method names for tagging are bulky in part to encourage definition + * of methods that reflect their usage patterns.) * *

Most base support methods are {@code final}, to prevent * overriding of implementations that are intrinsically tied to the @@ -197,45 +199,40 @@ public abstract class ForkJoinTask im * methods in a way that flows well in javadocs. */ - /** - * The number of times to try to help join a task without any - * apparent progress before giving up and blocking. The value is - * arbitrary but should be large enough to cope with transient - * stalls (due to GC etc) that can cause helping methods not to be - * able to proceed because other workers have not progressed to - * the point where subtasks can be found or taken. - */ - private static final int HELP_RETRIES = 32; - /* * The status field holds run control status bits packed into a * single int to minimize footprint and to ensure atomicity (via * CAS). Status is initially zero, and takes on nonnegative - * values until completed, upon which status holds value - * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking - * waits by other threads have the SIGNAL bit set. Completion of - * a stolen task with SIGNAL set awakens any waiters via - * notifyAll. Even though suboptimal for some purposes, we use - * basic builtin wait/notify to take advantage of "monitor - * inflation" in JVMs that we would otherwise need to emulate to - * avoid adding further per-task bookkeeping overhead. We want - * these monitors to be "fat", i.e., not use biasing or thin-lock - * techniques, so use some odd coding idioms that tend to avoid - * them. + * values until completed, upon which status (anded with + * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks + * undergoing blocking waits by other threads have the SIGNAL bit + * set. Completion of a stolen task with SIGNAL set awakens any + * waiters via notifyAll. Even though suboptimal for some + * purposes, we use basic builtin wait/notify to take advantage of + * "monitor inflation" in JVMs that we would otherwise need to + * emulate to avoid adding further per-task bookkeeping overhead. + * We want these monitors to be "fat", i.e., not use biasing or + * thin-lock techniques, so use some odd coding idioms that tend + * to avoid them, mainly by arranging that every synchronized + * block performs a wait, notifyAll or both. + * + * These control bits occupy only (some of) the upper half (16 + * bits) of status field. The lower bits are used for user-defined + * tags. */ /** The run status of this task */ volatile int status; // accessed directly by pool and workers - static final int NORMAL = 0xfffffffc; // negative with low 2 bits 0 - static final int CANCELLED = 0xfffffff8; // must be < NORMAL - static final int EXCEPTIONAL = 0xfffffff4; // must be < CANCELLED - static final int SIGNAL = 0x00000001; - static final int MARKED = 0x00000002; + static final int DONE_MASK = 0xf0000000; // mask out non-completion bits + static final int NORMAL = 0xf0000000; // must be negative + static final int CANCELLED = 0xc0000000; // must be < NORMAL + static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED + static final int SIGNAL = 0x00010000; // must be >= 1 << 16 + static final int SMASK = 0x0000ffff; // short bits for tags /** * Marks completion and wakes up threads waiting to join this - * task, also clearing signal request bits. A specialization for - * NORMAL completion is in method doExec. + * task. * * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL * @return completion status on exit @@ -244,8 +241,8 @@ public abstract class ForkJoinTask im for (int s;;) { if ((s = status) < 0) return s; - if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|completion)) { - if ((s & SIGNAL) != 0) + if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { + if ((s >>> 16) != 0) synchronized (this) { notifyAll(); } return completion; } @@ -267,125 +264,89 @@ public abstract class ForkJoinTask im } catch (Throwable rex) { return setExceptionalCompletion(rex); } - while ((s = status) >= 0 && completed) { - if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|NORMAL)) { - if ((s & SIGNAL) != 0) - synchronized (this) { notifyAll(); } - return NORMAL; - } - } + if (completed) + s = setCompletion(NORMAL); } return s; } /** + * Tries to set SIGNAL status unless already completed. Used by + * ForkJoinPool. Other variants are directly incorporated into + * externalAwaitDone etc. + * + * @return true if successful + */ + final boolean trySetSignal() { + int s = status; + return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL); + } + + /** * Blocks a non-worker-thread until completion. * @return status upon completion */ private int externalAwaitDone() { + boolean interrupted = false; int s; - if ((s = status) >= 0) { - boolean interrupted = false; - synchronized (this) { - while ((s = status) >= 0) { - if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + while ((s = status) >= 0) { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) { try { wait(); } catch (InterruptedException ie) { interrupted = true; } } + else + notifyAll(); } } - if (interrupted) - Thread.currentThread().interrupt(); } + if (interrupted) + Thread.currentThread().interrupt(); return s; } /** - * Blocks a non-worker-thread until completion or interruption or timeout. + * Blocks a non-worker-thread until completion or interruption. */ - private int externalInterruptibleAwaitDone(long millis) - throws InterruptedException { + private int externalInterruptibleAwaitDone() throws InterruptedException { int s; if (Thread.interrupted()) throw new InterruptedException(); - if ((s = status) >= 0) { - synchronized (this) { - while ((s = status) >= 0) { - if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { - wait(millis); - if (millis > 0L) - break; - } + while ((s = status) >= 0) { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) + wait(); + else + notifyAll(); } } } return s; } - /** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and - * unfork+exec. Others are relayed to awaitJoin. + * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. * * @return status upon completion */ private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; if ((s = status) >= 0) { - if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) - s = externalAwaitDone(); - else if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue). - tryUnpush(this) || (s = doExec()) >= 0) - s = awaitJoin(w, wt.pool); - } - return s; - } - - /** - * Helps and/or blocks until joined. - * - * @param w the joiner - * @param p the pool - * @return status upon completion - */ - private int awaitJoin(ForkJoinPool.WorkQueue w, ForkJoinPool p) { - int s; - ForkJoinTask prevJoin = w.currentJoin; - w.currentJoin = this; - for (int k = HELP_RETRIES; (s = status) >= 0;) { - if ((w.queueSize() > 0) ? - w.tryRemoveAndExec(this) : // self-help - p.tryHelpStealer(w, this)) // help process tasks - k = HELP_RETRIES; // reset if made progress - else if ((s = status) < 0) // recheck - break; - else if (--k > 0) { - if ((k & 3) == 1) - Thread.yield(); // occasionally yield - } - else if (k == 0) - p.tryPollForAndExec(w, this); // uncommon self-help case - else if (p.tryCompensate()) { // true if can block - try { - int ss = status; - if (ss >= 0 && // assert need signal - U.compareAndSwapInt(this, STATUS, ss, ss | SIGNAL)) { - synchronized (this) { - if (status >= 0) // block - wait(); - } - } - } catch (InterruptedException ignore) { - } finally { - p.incrementActiveCount(); // re-activate - } + if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) { + if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue). + tryUnpush(this) || (s = doExec()) >= 0) + s = wt.pool.awaitJoin(w, this); } + else + s = externalAwaitDone(); } - w.currentJoin = prevJoin; return s; } @@ -395,14 +356,13 @@ public abstract class ForkJoinTask im * @return status upon completion */ private int doInvoke() { - int s; Thread t; + int s; Thread t; ForkJoinWorkerThread wt; if ((s = doExec()) >= 0) { - if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + s = (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, + this); + else s = externalAwaitDone(); - else { - ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; - s = awaitJoin(wt.workQueue, wt.pool); - } } return s; } @@ -452,30 +412,52 @@ public abstract class ForkJoinTask im } /** - * Records exception and sets exceptional completion. + * Records exception and sets status. * * @return status on exit */ - private int setExceptionalCompletion(Throwable ex) { - int h = System.identityHashCode(this); - final ReentrantLock lock = exceptionTableLock; - lock.lock(); - try { - expungeStaleExceptions(); - ExceptionNode[] t = exceptionTable; - int i = h & (t.length - 1); - for (ExceptionNode e = t[i]; ; e = e.next) { - if (e == null) { - t[i] = new ExceptionNode(this, ex, t[i]); - break; + final int recordExceptionalCompletion(Throwable ex) { + int s; + if ((s = status) >= 0) { + int h = System.identityHashCode(this); + final ReentrantLock lock = exceptionTableLock; + lock.lock(); + try { + expungeStaleExceptions(); + ExceptionNode[] t = exceptionTable; + int i = h & (t.length - 1); + for (ExceptionNode e = t[i]; ; e = e.next) { + if (e == null) { + t[i] = new ExceptionNode(this, ex, t[i]); + break; + } + if (e.get() == this) // already present + break; } - if (e.get() == this) // already present - break; + } finally { + lock.unlock(); } - } finally { - lock.unlock(); + s = setCompletion(EXCEPTIONAL); } - return setCompletion(EXCEPTIONAL); + return s; + } + + /** + * Records exception and possibly propagates + * + * @return status on exit + */ + private int setExceptionalCompletion(Throwable ex) { + int s = recordExceptionalCompletion(ex); + if ((s & DONE_MASK) == EXCEPTIONAL) + internalPropagateException(ex); + return s; + } + + /** + * Hook for exception propagation support for tasks with completers. + */ + void internalPropagateException(Throwable ex) { } /** @@ -539,7 +521,7 @@ public abstract class ForkJoinTask im * @return the exception, or null if none */ private Throwable getThrowableException() { - if (status != EXCEPTIONAL) + if ((status & DONE_MASK) != EXCEPTIONAL) return null; int h = System.identityHashCode(this); ExceptionNode e; @@ -557,7 +539,7 @@ public abstract class ForkJoinTask im Throwable ex; if (e == null || (ex = e.ex) == null) return null; - if (e.thrower != Thread.currentThread().getId()) { + if (false && e.thrower != Thread.currentThread().getId()) { Class ec = ex.getClass(); try { Constructor noArgCtor = null; @@ -624,16 +606,14 @@ public abstract class ForkJoinTask im } /** - * Report the result of invoke or join; called only upon - * non-normal return of internal versions. + * Throws exception, if any, associated with the given status. */ - private V reportResult() { - int s; Throwable ex; - if ((s = status) == CANCELLED) - throw new CancellationException(); - if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) + private void reportException(int s) { + Throwable ex = ((s == CANCELLED) ? new CancellationException() : + (s == EXCEPTIONAL) ? getThrowableException() : + null); + if (ex != null) U.throwException(ex); - return getRawResult(); } // public methods @@ -657,9 +637,7 @@ public abstract class ForkJoinTask im * @return {@code this}, to simplify usage */ public final ForkJoinTask fork() { - ForkJoinWorkerThread wt; - (wt = (ForkJoinWorkerThread)Thread.currentThread()). - workQueue.push(this, wt.pool); + ((ForkJoinWorkerThread)Thread.currentThread()).workQueue.push(this); return this; } @@ -675,10 +653,10 @@ public abstract class ForkJoinTask im * @return the computed result */ public final V join() { - if (doJoin() != NORMAL) - return reportResult(); - else - return getRawResult(); + int s; + if ((s = doJoin() & DONE_MASK) != NORMAL) + reportException(s); + return getRawResult(); } /** @@ -690,10 +668,10 @@ public abstract class ForkJoinTask im * @return the computed result */ public final V invoke() { - if (doInvoke() != NORMAL) - return reportResult(); - else - return getRawResult(); + int s; + if ((s = doInvoke() & DONE_MASK) != NORMAL) + reportException(s); + return getRawResult(); } /** @@ -720,9 +698,12 @@ public abstract class ForkJoinTask im * @throws NullPointerException if any task is null */ public static void invokeAll(ForkJoinTask t1, ForkJoinTask t2) { + int s1, s2; t2.fork(); - t1.invoke(); - t2.join(); + if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) + t1.reportException(s1); + if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) + t2.reportException(s2); } /** @@ -859,7 +840,7 @@ public abstract class ForkJoinTask im * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { - return setCompletion(CANCELLED) == CANCELLED; + return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; } public final boolean isDone() { @@ -867,7 +848,7 @@ public abstract class ForkJoinTask im } public final boolean isCancelled() { - return status == CANCELLED; + return (status & DONE_MASK) == CANCELLED; } /** @@ -887,7 +868,7 @@ public abstract class ForkJoinTask im * exception and was not cancelled */ public final boolean isCompletedNormally() { - return status == NORMAL; + return (status & DONE_MASK) == NORMAL; } /** @@ -898,7 +879,7 @@ public abstract class ForkJoinTask im * @return the exception, or {@code null} if none */ public final Throwable getException() { - int s = status; + int s = status & DONE_MASK; return ((s >= NORMAL) ? null : (s == CANCELLED) ? new CancellationException() : getThrowableException()); @@ -948,6 +929,18 @@ public abstract class ForkJoinTask im } /** + * Completes this task normally without setting a value. The most + * recent value established by {@link #setRawResult} (or {@code + * null} by default) will be returned as the result of subsequent + * invocations of {@code join} and related operations. + * + * @since 1.8 + */ + public final void quietlyComplete() { + setCompletion(NORMAL); + } + + /** * Waits if necessary for the computation to complete, and then * retrieves its result. * @@ -960,9 +953,9 @@ public abstract class ForkJoinTask im */ public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? - doJoin() : externalInterruptibleAwaitDone(0L); + doJoin() : externalInterruptibleAwaitDone(); Throwable ex; - if (s == CANCELLED) + if ((s &= DONE_MASK) == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) throw new ExecutionException(ex); @@ -985,52 +978,60 @@ public abstract class ForkJoinTask im */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - // Messy in part because we measure in nanos, but wait in millis - int s; long millis, nanos; - Thread t = Thread.currentThread(); - if (!(t instanceof ForkJoinWorkerThread)) { - if ((millis = unit.toMillis(timeout)) > 0L) - s = externalInterruptibleAwaitDone(millis); - else - s = status; - } - else if ((s = status) >= 0 && (nanos = unit.toNanos(timeout)) > 0L) { - long deadline = System.nanoTime() + nanos; - ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; - ForkJoinPool.WorkQueue w = wt.workQueue; - ForkJoinPool p = wt.pool; - if (w.tryUnpush(this)) - doExec(); - boolean blocking = false; + if (Thread.interrupted()) + throw new InterruptedException(); + // Messy in part because we measure in nanosecs, but wait in millisecs + int s; long ns, ms; + if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) { + long deadline = System.nanoTime() + ns; + ForkJoinPool p = null; + ForkJoinPool.WorkQueue w = null; + Thread t = Thread.currentThread(); + if (t instanceof ForkJoinWorkerThread) { + ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; + p = wt.pool; + w = wt.workQueue; + s = p.helpJoinOnce(w, this); // no retries on failure + } + boolean canBlock = false; + boolean interrupted = false; try { while ((s = status) >= 0) { - if (w.runState < 0) + if (w != null && w.runState < 0) cancelIgnoringExceptions(this); - else if (!blocking) - blocking = p.tryCompensate(); + else if (!canBlock) { + if (p == null || p.tryCompensate(this, null)) + canBlock = true; + } else { - millis = TimeUnit.NANOSECONDS.toMillis(nanos); - if (millis > 0L && + if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { - try { - synchronized (this) { - if (status >= 0) - wait(millis); + synchronized (this) { + if (status >= 0) { + try { + wait(ms); + } catch (InterruptedException ie) { + if (p == null) + interrupted = true; + } } - } catch (InterruptedException ie) { + else + notifyAll(); } } - if ((s = status) < 0 || - (nanos = deadline - System.nanoTime()) <= 0L) + if ((s = status) < 0 || interrupted || + (ns = deadline - System.nanoTime()) <= 0L) break; } } } finally { - if (blocking) + if (p != null && canBlock) p.incrementActiveCount(); } + if (interrupted) + throw new InterruptedException(); } - if (s != NORMAL) { + if ((s &= DONE_MASK) != NORMAL) { Throwable ex; if (s == CANCELLED) throw new CancellationException(); @@ -1097,7 +1098,7 @@ public abstract class ForkJoinTask im * setRawResult(null)}. */ public void reinitialize() { - if (status == EXCEPTIONAL) + if ((status & DONE_MASK) == EXCEPTIONAL) clearExceptionalCompletion(); else status = 0; @@ -1258,15 +1259,18 @@ public abstract class ForkJoinTask im protected abstract void setRawResult(V value); /** - * Immediately performs the base action of this task. This method - * is designed to support extensions, and should not in general be - * called otherwise. The return value controls whether this task - * is considered to be done normally. It may return false in + * Immediately performs the base action of this task and returns + * true if, upon return from this method, this task is guaranteed + * to have completed normally. This method may return false + * otherwise, to indicate that this task is not necessarily + * complete (or is not known to be complete), for example in * asynchronous actions that require explicit invocations of - * {@link #complete} to become joinable. It may also throw an - * (unchecked) exception to indicate abnormal exit. + * completion methods. This method may also throw an (unchecked) + * exception to indicate abnormal exit. This method is designed to + * support extensions, and should not in general be called + * otherwise. * - * @return {@code true} if completed normally + * @return {@code true} if this task is known to have completed normally */ protected abstract boolean exec(); @@ -1335,44 +1339,53 @@ public abstract class ForkJoinTask im return wt.pool.nextTaskFor(wt.workQueue); } - // Mark-bit operations + // tag operations /** - * Returns true if this task is marked. + * Returns the tag for this task. * - * @return true if this task is marked + * @return the tag for this task * @since 1.8 */ - public final boolean isMarkedForkJoinTask() { - return (status & MARKED) != 0; + public final short getForkJoinTaskTag() { + return (short)status; } /** - * Atomically sets the mark on this task. + * Atomically sets the tag value for this task. * - * @return true if this task was previously unmarked + * @param tag the tag value + * @return the previous value of the tag * @since 1.8 */ - public final boolean markForkJoinTask() { + public final short setForkJoinTaskTag(short tag) { for (int s;;) { - if (((s = status) & MARKED) != 0) - return false; - if (U.compareAndSwapInt(this, STATUS, s, s | MARKED)) - return true; + if (U.compareAndSwapInt(this, STATUS, s = status, + (s & ~SMASK) | (tag & SMASK))) + return (short)s; } } /** - * Atomically clears the mark on this task. + * Atomically conditionally sets the tag value for this task. + * Among other applications, tags can be used as visit markers + * in tasks operating on graphs, as in methods that check: {@code + * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))} + * before processing, otherwise exiting because the node has + * already been visited. * - * @return true if this task was previously marked + * @param e the expected tag value + * @param tag the new tag value + * @return true if successful; i.e., the current value was + * equal to e and is now tag. * @since 1.8 */ - public final boolean unmarkForkJoinTask() { + public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { for (int s;;) { - if (((s = status) & MARKED) == 0) + if ((short)(s = status) != e) return false; - if (U.compareAndSwapInt(this, STATUS, s, s & ~MARKED)) + if (U.compareAndSwapInt(this, STATUS, s, + (s & ~SMASK) | (tag & SMASK))) return true; } } @@ -1385,21 +1398,33 @@ public abstract class ForkJoinTask im static final class AdaptedRunnable extends ForkJoinTask implements RunnableFuture { final Runnable runnable; - final T resultOnCompletion; T result; AdaptedRunnable(Runnable runnable, T result) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; - this.resultOnCompletion = result; + this.result = result; // OK to set this even before completion } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { - runnable.run(); - result = resultOnCompletion; - return true; + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { runnable.run(); return true; } + public final void run() { invoke(); } + private static final long serialVersionUID = 5232453952276885070L; + } + + /** + * Adaptor for Runnables without results + */ + static final class AdaptedRunnableAction extends ForkJoinTask + implements RunnableFuture { + final Runnable runnable; + AdaptedRunnableAction(Runnable runnable) { + if (runnable == null) throw new NullPointerException(); + this.runnable = runnable; } - public void run() { invoke(); } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) { } + public final boolean exec() { runnable.run(); return true; } + public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L; } @@ -1414,9 +1439,9 @@ public abstract class ForkJoinTask im if (callable == null) throw new NullPointerException(); this.callable = callable; } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { try { result = callable.call(); return true; @@ -1428,7 +1453,7 @@ public abstract class ForkJoinTask im throw new RuntimeException(ex); } } - public void run() { invoke(); } + public final void run() { invoke(); } private static final long serialVersionUID = 2838392045355241008L; } @@ -1441,7 +1466,7 @@ public abstract class ForkJoinTask im * @return the task */ public static ForkJoinTask adapt(Runnable runnable) { - return new AdaptedRunnable(runnable, null); + return new AdaptedRunnableAction(runnable); } /**