--- jsr166/src/jsr166y/ForkJoinTask.java 2012/01/31 01:51:13 1.85
+++ jsr166/src/jsr166y/ForkJoinTask.java 2013/01/09 02:51:37 1.99
@@ -5,6 +5,7 @@
*/
package jsr166y;
+
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
@@ -29,15 +30,18 @@ import java.lang.reflect.Constructor;
* subtasks may be hosted by a small number of actual threads in a
* ForkJoinPool, at the price of some usage limitations.
*
- *
A "main" {@code ForkJoinTask} begins execution when submitted
- * to a {@link ForkJoinPool}. Once started, it will usually in turn
- * start other subtasks. As indicated by the name of this class,
- * many programs using {@code ForkJoinTask} employ only methods
- * {@link #fork} and {@link #join}, or derivatives such as {@link
+ *
A "main" {@code ForkJoinTask} begins execution when it is
+ * explicitly submitted to a {@link ForkJoinPool}, or, if not already
+ * engaged in a ForkJoin computation, commenced in the {@link
+ * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
+ * related methods. Once started, it will usually in turn start other
+ * subtasks. As indicated by the name of this class, many programs
+ * using {@code ForkJoinTask} employ only methods {@link #fork} and
+ * {@link #join}, or derivatives such as {@link
* #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
* provides a number of other methods that can come into play in
- * advanced usages, as well as extension mechanics that allow
- * support of new forms of fork/join processing.
+ * advanced usages, as well as extension mechanics that allow support
+ * of new forms of fork/join processing.
*
*
A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
@@ -51,7 +55,7 @@ import java.lang.reflect.Constructor;
* minimize other blocking synchronization apart from joining other
* tasks or using synchronizers such as Phasers that are advertised to
* cooperate with fork/join scheduling. Subdividable tasks should also
- * not perform blocking IO, and should ideally access variables that
+ * not perform blocking I/O, and should ideally access variables that
* are completely independent of those accessed by other running
* tasks. These guidelines are loosely enforced by not permitting
* checked exceptions such as {@code IOExceptions} to be
@@ -69,10 +73,11 @@ import java.lang.reflect.Constructor;
*
It is possible to define and use ForkJoinTasks that may block,
* but doing do requires three further considerations: (1) Completion
* of few if any other tasks should be dependent on a task
- * that blocks on external synchronization or IO. Event-style async
- * tasks that are never joined often fall into this category. (2) To
- * minimize resource impact, tasks should be small; ideally performing
- * only the (possibly) blocking action. (3) Unless the {@link
+ * that blocks on external synchronization or I/O. Event-style async
+ * tasks that are never joined (for example, those subclassing {@link
+ * CountedCompleter}) often fall into this category. (2) To minimize
+ * resource impact, tasks should be small; ideally performing only the
+ * (possibly) blocking action. (3) Unless the {@link
* ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
* blocked tasks is known to be less than the pool's {@link
* ForkJoinPool#getParallelism} level, the pool cannot guarantee that
@@ -115,18 +120,13 @@ import java.lang.reflect.Constructor;
*
The ForkJoinTask class is not usually directly subclassed.
* Instead, you subclass one of the abstract classes that support a
* particular style of fork/join processing, typically {@link
- * RecursiveAction} for computations that do not return results, or
- * {@link RecursiveTask} for those that do. Normally, a concrete
- * ForkJoinTask subclass declares fields comprising its parameters,
- * established in a constructor, and then defines a {@code compute}
- * method that somehow uses the control methods supplied by this base
- * class. While these methods have {@code public} access (to allow
- * instances of different task subclasses to call each other's
- * methods), some of them may only be called from within other
- * ForkJoinTasks (as may be determined using method {@link
- * #inForkJoinPool}). Attempts to invoke them in other contexts
- * result in exceptions or errors, possibly including
- * {@code ClassCastException}.
+ * RecursiveAction} for most computations that do not return results,
+ * {@link RecursiveTask} for those that do, and {@link
+ * CountedCompleter} for those in which completed actions trigger
+ * other actions. Normally, a concrete ForkJoinTask subclass declares
+ * fields comprising its parameters, established in a constructor, and
+ * then defines a {@code compute} method that somehow uses the control
+ * methods supplied by this base class.
*
*
Method {@link #join} and its variants are appropriate for use
* only when completion dependencies are acyclic; that is, the
@@ -137,17 +137,16 @@ import java.lang.reflect.Constructor;
* {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
* may be of use in constructing custom subclasses for problems that
* are not statically structured as DAGs. To support such usages a
- * ForkJoinTask may be atomically marked using {@link
- * #markForkJoinTask} and checked for marking using {@link
- * #isMarkedForkJoinTask}. The ForkJoinTask implementation does not
- * use these {@code protected} methods or marks for any purpose, but
- * they may be of use in the construction of specialized subclasses.
- * For example, parallel graph traversals can use the supplied methods
- * to avoid revisiting nodes/tasks that have already been processed.
- * Also, completion based designs can use them to record that one
- * subtask has completed. (Method names for marking are bulky in part
- * to encourage definition of methods that reflect their usage
- * patterns.)
+ * ForkJoinTask may be atomically tagged with a {@code short}
+ * value using {@link #setForkJoinTaskTag} or {@link
+ * #compareAndSetForkJoinTaskTag} and checked using {@link
+ * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
+ * these {@code protected} methods or tags for any purpose, but they
+ * may be of use in the construction of specialized subclasses. For
+ * example, parallel graph traversals can use the supplied methods to
+ * avoid revisiting nodes/tasks that have already been processed.
+ * (Method names for tagging are bulky in part to encourage definition
+ * of methods that reflect their usage patterns.)
*
*
Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
@@ -197,45 +196,40 @@ public abstract class ForkJoinTask im
* methods in a way that flows well in javadocs.
*/
- /**
- * The number of times to try to help join a task without any
- * apparent progress before giving up and blocking. The value is
- * arbitrary but should be large enough to cope with transient
- * stalls (due to GC etc) that can cause helping methods not to be
- * able to proceed because other workers have not progressed to
- * the point where subtasks can be found or taken.
- */
- private static final int HELP_RETRIES = 32;
-
/*
* The status field holds run control status bits packed into a
* single int to minimize footprint and to ensure atomicity (via
* CAS). Status is initially zero, and takes on nonnegative
- * values until completed, upon which status holds value
- * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
- * waits by other threads have the SIGNAL bit set. Completion of
- * a stolen task with SIGNAL set awakens any waiters via
- * notifyAll. Even though suboptimal for some purposes, we use
- * basic builtin wait/notify to take advantage of "monitor
- * inflation" in JVMs that we would otherwise need to emulate to
- * avoid adding further per-task bookkeeping overhead. We want
- * these monitors to be "fat", i.e., not use biasing or thin-lock
- * techniques, so use some odd coding idioms that tend to avoid
- * them.
+ * values until completed, upon which status (anded with
+ * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
+ * undergoing blocking waits by other threads have the SIGNAL bit
+ * set. Completion of a stolen task with SIGNAL set awakens any
+ * waiters via notifyAll. Even though suboptimal for some
+ * purposes, we use basic builtin wait/notify to take advantage of
+ * "monitor inflation" in JVMs that we would otherwise need to
+ * emulate to avoid adding further per-task bookkeeping overhead.
+ * We want these monitors to be "fat", i.e., not use biasing or
+ * thin-lock techniques, so use some odd coding idioms that tend
+ * to avoid them, mainly by arranging that every synchronized
+ * block performs a wait, notifyAll or both.
+ *
+ * These control bits occupy only (some of) the upper half (16
+ * bits) of status field. The lower bits are used for user-defined
+ * tags.
*/
/** The run status of this task */
volatile int status; // accessed directly by pool and workers
- static final int NORMAL = 0xfffffffc; // negative with low 2 bits 0
- static final int CANCELLED = 0xfffffff8; // must be < NORMAL
- static final int EXCEPTIONAL = 0xfffffff4; // must be < CANCELLED
- static final int SIGNAL = 0x00000001;
- static final int MARKED = 0x00000002;
+ static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
+ static final int NORMAL = 0xf0000000; // must be negative
+ static final int CANCELLED = 0xc0000000; // must be < NORMAL
+ static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
+ static final int SIGNAL = 0x00010000; // must be >= 1 << 16
+ static final int SMASK = 0x0000ffff; // short bits for tags
/**
* Marks completion and wakes up threads waiting to join this
- * task, also clearing signal request bits. A specialization for
- * NORMAL completion is in method doExec.
+ * task.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
@@ -244,8 +238,8 @@ public abstract class ForkJoinTask im
for (int s;;) {
if ((s = status) < 0)
return s;
- if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|completion)) {
- if ((s & SIGNAL) != 0)
+ if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
+ if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); }
return completion;
}
@@ -267,58 +261,67 @@ public abstract class ForkJoinTask im
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
- while ((s = status) >= 0 && completed) {
- if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|NORMAL)) {
- if ((s & SIGNAL) != 0)
- synchronized (this) { notifyAll(); }
- return NORMAL;
- }
- }
+ if (completed)
+ s = setCompletion(NORMAL);
}
return s;
}
/**
+ * Tries to set SIGNAL status unless already completed. Used by
+ * ForkJoinPool. Other variants are directly incorporated into
+ * externalAwaitDone etc.
+ *
+ * @return true if successful
+ */
+ final boolean trySetSignal() {
+ int s = status;
+ return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL);
+ }
+
+ /**
* Blocks a non-worker-thread until completion.
* @return status upon completion
*/
private int externalAwaitDone() {
int s;
- if ((s = status) >= 0) {
- boolean interrupted = false;
- synchronized (this) {
- while ((s = status) >= 0) {
- if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ ForkJoinPool.externalHelpJoin(this);
+ boolean interrupted = false;
+ while ((s = status) >= 0) {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0) {
try {
wait();
} catch (InterruptedException ie) {
interrupted = true;
}
}
+ else
+ notifyAll();
}
}
- if (interrupted)
- Thread.currentThread().interrupt();
}
+ if (interrupted)
+ Thread.currentThread().interrupt();
return s;
}
/**
- * Blocks a non-worker-thread until completion or interruption or timeout.
+ * Blocks a non-worker-thread until completion or interruption.
*/
- private int externalInterruptibleAwaitDone(long millis)
- throws InterruptedException {
+ private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
if (Thread.interrupted())
throw new InterruptedException();
- if ((s = status) >= 0) {
- synchronized (this) {
- while ((s = status) >= 0) {
- if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- wait(millis);
- if (millis > 0L)
- break;
- }
+ ForkJoinPool.externalHelpJoin(this);
+ while ((s = status) >= 0) {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0)
+ wait();
+ else
+ notifyAll();
}
}
}
@@ -329,64 +332,18 @@ public abstract class ForkJoinTask im
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
- * unfork+exec. Others are relayed to awaitJoin.
+ * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
*
* @return status upon completion
*/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
- if ((s = status) >= 0) {
- if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread))
- s = externalAwaitDone();
- else if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue).
- tryUnpush(this) || (s = doExec()) >= 0)
- s = awaitJoin(w, wt.pool);
- }
- return s;
- }
-
- /**
- * Helps and/or blocks until joined.
- *
- * @param w the joiner
- * @param p the pool
- * @return status upon completion
- */
- private int awaitJoin(ForkJoinPool.WorkQueue w, ForkJoinPool p) {
- int s;
- ForkJoinTask> prevJoin = w.currentJoin;
- w.currentJoin = this;
- for (int k = HELP_RETRIES; (s = status) >= 0;) {
- if ((w.queueSize() > 0) ?
- w.tryRemoveAndExec(this) : // self-help
- p.tryHelpStealer(w, this)) // help process tasks
- k = HELP_RETRIES; // reset if made progress
- else if ((s = status) < 0) // recheck
- break;
- else if (--k > 0) {
- if ((k & 3) == 1)
- Thread.yield(); // occasionally yield
- }
- else if (k == 0)
- p.tryPollForAndExec(w, this); // uncommon self-help case
- else if (p.tryCompensate()) { // true if can block
- try {
- int ss = status;
- if (ss >= 0 && // assert need signal
- U.compareAndSwapInt(this, STATUS, ss, ss | SIGNAL)) {
- synchronized (this) {
- if (status >= 0) // block
- wait();
- }
- }
- } catch (InterruptedException ignore) {
- } finally {
- p.incrementActiveCount(); // re-activate
- }
- }
- }
- w.currentJoin = prevJoin;
- return s;
+ return (s = status) < 0 ? s :
+ ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ (w = (wt = (ForkJoinWorkerThread)t).workQueue).
+ tryUnpush(this) && (s = doExec()) < 0 ? s :
+ wt.pool.awaitJoin(w, this) :
+ externalAwaitDone();
}
/**
@@ -395,16 +352,11 @@ public abstract class ForkJoinTask im
* @return status upon completion
*/
private int doInvoke() {
- int s; Thread t;
- if ((s = doExec()) >= 0) {
- if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread))
- s = externalAwaitDone();
- else {
- ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
- s = awaitJoin(wt.workQueue, wt.pool);
- }
- }
- return s;
+ int s; Thread t; ForkJoinWorkerThread wt;
+ return (s = doExec()) < 0 ? s :
+ ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
+ externalAwaitDone();
}
// Exception table support
@@ -452,30 +404,52 @@ public abstract class ForkJoinTask im
}
/**
- * Records exception and sets exceptional completion.
+ * Records exception and sets status.
*
* @return status on exit
*/
- private int setExceptionalCompletion(Throwable ex) {
- int h = System.identityHashCode(this);
- final ReentrantLock lock = exceptionTableLock;
- lock.lock();
- try {
- expungeStaleExceptions();
- ExceptionNode[] t = exceptionTable;
- int i = h & (t.length - 1);
- for (ExceptionNode e = t[i]; ; e = e.next) {
- if (e == null) {
- t[i] = new ExceptionNode(this, ex, t[i]);
- break;
+ final int recordExceptionalCompletion(Throwable ex) {
+ int s;
+ if ((s = status) >= 0) {
+ int h = System.identityHashCode(this);
+ final ReentrantLock lock = exceptionTableLock;
+ lock.lock();
+ try {
+ expungeStaleExceptions();
+ ExceptionNode[] t = exceptionTable;
+ int i = h & (t.length - 1);
+ for (ExceptionNode e = t[i]; ; e = e.next) {
+ if (e == null) {
+ t[i] = new ExceptionNode(this, ex, t[i]);
+ break;
+ }
+ if (e.get() == this) // already present
+ break;
}
- if (e.get() == this) // already present
- break;
+ } finally {
+ lock.unlock();
}
- } finally {
- lock.unlock();
+ s = setCompletion(EXCEPTIONAL);
}
- return setCompletion(EXCEPTIONAL);
+ return s;
+ }
+
+ /**
+ * Records exception and possibly propagates.
+ *
+ * @return status on exit
+ */
+ private int setExceptionalCompletion(Throwable ex) {
+ int s = recordExceptionalCompletion(ex);
+ if ((s & DONE_MASK) == EXCEPTIONAL)
+ internalPropagateException(ex);
+ return s;
+ }
+
+ /**
+ * Hook for exception propagation support for tasks with completers.
+ */
+ void internalPropagateException(Throwable ex) {
}
/**
@@ -539,7 +513,7 @@ public abstract class ForkJoinTask im
* @return the exception, or null if none
*/
private Throwable getThrowableException() {
- if (status != EXCEPTIONAL)
+ if ((status & DONE_MASK) != EXCEPTIONAL)
return null;
int h = System.identityHashCode(this);
ExceptionNode e;
@@ -557,7 +531,7 @@ public abstract class ForkJoinTask im
Throwable ex;
if (e == null || (ex = e.ex) == null)
return null;
- if (e.thrower != Thread.currentThread().getId()) {
+ if (false && e.thrower != Thread.currentThread().getId()) {
Class extends Throwable> ec = ex.getClass();
try {
Constructor> noArgCtor = null;
@@ -624,42 +598,62 @@ public abstract class ForkJoinTask im
}
/**
- * Report the result of invoke or join; called only upon
- * non-normal return of internal versions.
+ * A version of "sneaky throw" to relay exceptions
+ */
+ static void rethrow(final Throwable ex) {
+ if (ex != null) {
+ if (ex instanceof Error)
+ throw (Error)ex;
+ if (ex instanceof RuntimeException)
+ throw (RuntimeException)ex;
+ ForkJoinTask.uncheckedThrow(ex);
+ }
+ }
+
+ /**
+ * The sneaky part of sneaky throw, relying on generics
+ * limitations to evade compiler complaints about rethrowing
+ * unchecked exceptions
*/
- private V reportResult() {
- int s; Throwable ex;
- if ((s = status) == CANCELLED)
+ @SuppressWarnings("unchecked") static
+ void uncheckedThrow(Throwable t) throws T {
+ if (t != null)
+ throw (T)t; // rely on vacuous cast
+ }
+
+ /**
+ * Throws exception, if any, associated with the given status.
+ */
+ private void reportException(int s) {
+ if (s == CANCELLED)
throw new CancellationException();
- if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
- U.throwException(ex);
- return getRawResult();
+ if (s == EXCEPTIONAL)
+ rethrow(getThrowableException());
}
// public methods
/**
- * Arranges to asynchronously execute this task. While it is not
- * necessarily enforced, it is a usage error to fork a task more
- * than once unless it has completed and been reinitialized.
- * Subsequent modifications to the state of this task or any data
- * it operates on are not necessarily consistently observable by
- * any thread other than the one executing it unless preceded by a
- * call to {@link #join} or related methods, or a call to {@link
- * #isDone} returning {@code true}.
- *
- *
This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * Arranges to asynchronously execute this task in the pool the
+ * current task is running in, if applicable, or using the {@link
+ * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
+ * it is not necessarily enforced, it is a usage error to fork a
+ * task more than once unless it has completed and been
+ * reinitialized. Subsequent modifications to the state of this
+ * task or any data it operates on are not necessarily
+ * consistently observable by any thread other than the one
+ * executing it unless preceded by a call to {@link #join} or
+ * related methods, or a call to {@link #isDone} returning {@code
+ * true}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask fork() {
- ForkJoinWorkerThread wt;
- (wt = (ForkJoinWorkerThread)Thread.currentThread()).
- workQueue.push(this, wt.pool);
+ Thread t;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ ((ForkJoinWorkerThread)t).workQueue.push(this);
+ else
+ ForkJoinPool.common.externalPush(this);
return this;
}
@@ -675,10 +669,10 @@ public abstract class ForkJoinTask im
* @return the computed result
*/
public final V join() {
- if (doJoin() != NORMAL)
- return reportResult();
- else
- return getRawResult();
+ int s;
+ if ((s = doJoin() & DONE_MASK) != NORMAL)
+ reportException(s);
+ return getRawResult();
}
/**
@@ -690,10 +684,10 @@ public abstract class ForkJoinTask im
* @return the computed result
*/
public final V invoke() {
- if (doInvoke() != NORMAL)
- return reportResult();
- else
- return getRawResult();
+ int s;
+ if ((s = doInvoke() & DONE_MASK) != NORMAL)
+ reportException(s);
+ return getRawResult();
}
/**
@@ -709,20 +703,17 @@ public abstract class ForkJoinTask im
* cancelled, completed normally or exceptionally, or left
* unprocessed.
*
- *
This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param t1 the first task
* @param t2 the second task
* @throws NullPointerException if any task is null
*/
public static void invokeAll(ForkJoinTask> t1, ForkJoinTask> t2) {
+ int s1, s2;
t2.fork();
- t1.invoke();
- t2.join();
+ if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
+ t1.reportException(s1);
+ if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
+ t2.reportException(s2);
}
/**
@@ -737,12 +728,6 @@ public abstract class ForkJoinTask im
* related methods to check if they have been cancelled, completed
* normally or exceptionally, or left unprocessed.
*
- *
This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param tasks the tasks
* @throws NullPointerException if any task is null
*/
@@ -770,7 +755,7 @@ public abstract class ForkJoinTask im
}
}
if (ex != null)
- U.throwException(ex);
+ rethrow(ex);
}
/**
@@ -786,12 +771,6 @@ public abstract class ForkJoinTask im
* cancelled, completed normally or exceptionally, or left
* unprocessed.
*
- *
This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param tasks the collection of tasks
* @return the tasks argument, to simplify usage
* @throws NullPointerException if tasks or any element are null
@@ -827,7 +806,7 @@ public abstract class ForkJoinTask im
}
}
if (ex != null)
- U.throwException(ex);
+ rethrow(ex);
return tasks;
}
@@ -859,7 +838,7 @@ public abstract class ForkJoinTask im
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
- return setCompletion(CANCELLED) == CANCELLED;
+ return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}
public final boolean isDone() {
@@ -867,7 +846,7 @@ public abstract class ForkJoinTask im
}
public final boolean isCancelled() {
- return status == CANCELLED;
+ return (status & DONE_MASK) == CANCELLED;
}
/**
@@ -887,7 +866,7 @@ public abstract class ForkJoinTask im
* exception and was not cancelled
*/
public final boolean isCompletedNormally() {
- return status == NORMAL;
+ return (status & DONE_MASK) == NORMAL;
}
/**
@@ -898,7 +877,7 @@ public abstract class ForkJoinTask im
* @return the exception, or {@code null} if none
*/
public final Throwable getException() {
- int s = status;
+ int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
getThrowableException());
@@ -948,6 +927,18 @@ public abstract class ForkJoinTask im
}
/**
+ * Completes this task normally without setting a value. The most
+ * recent value established by {@link #setRawResult} (or {@code
+ * null} by default) will be returned as the result of subsequent
+ * invocations of {@code join} and related operations.
+ *
+ * @since 1.8
+ */
+ public final void quietlyComplete() {
+ setCompletion(NORMAL);
+ }
+
+ /**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
@@ -960,9 +951,9 @@ public abstract class ForkJoinTask im
*/
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
- doJoin() : externalInterruptibleAwaitDone(0L);
+ doJoin() : externalInterruptibleAwaitDone();
Throwable ex;
- if (s == CANCELLED)
+ if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
@@ -985,52 +976,63 @@ public abstract class ForkJoinTask im
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- // Messy in part because we measure in nanos, but wait in millis
- int s; long millis, nanos;
- Thread t = Thread.currentThread();
- if (!(t instanceof ForkJoinWorkerThread)) {
- if ((millis = unit.toMillis(timeout)) > 0L)
- s = externalInterruptibleAwaitDone(millis);
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ // Messy in part because we measure in nanosecs, but wait in millisecs
+ int s; long ms;
+ long ns = unit.toNanos(timeout);
+ if ((s = status) >= 0 && ns > 0L) {
+ long deadline = System.nanoTime() + ns;
+ ForkJoinPool p = null;
+ ForkJoinPool.WorkQueue w = null;
+ Thread t = Thread.currentThread();
+ if (t instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
+ p = wt.pool;
+ w = wt.workQueue;
+ p.helpJoinOnce(w, this); // no retries on failure
+ }
else
- s = status;
- }
- else if ((s = status) >= 0 && (nanos = unit.toNanos(timeout)) > 0L) {
- long deadline = System.nanoTime() + nanos;
- ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
- ForkJoinPool.WorkQueue w = wt.workQueue;
- ForkJoinPool p = wt.pool;
- if (w.tryUnpush(this))
- doExec();
- boolean blocking = false;
+ ForkJoinPool.externalHelpJoin(this);
+ boolean canBlock = false;
+ boolean interrupted = false;
try {
while ((s = status) >= 0) {
- if (w.runState < 0)
+ if (w != null && w.qlock < 0)
cancelIgnoringExceptions(this);
- else if (!blocking)
- blocking = p.tryCompensate();
+ else if (!canBlock) {
+ if (p == null || p.tryCompensate())
+ canBlock = true;
+ }
else {
- millis = TimeUnit.NANOSECONDS.toMillis(nanos);
- if (millis > 0L &&
+ if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- try {
- synchronized (this) {
- if (status >= 0)
- wait(millis);
+ synchronized (this) {
+ if (status >= 0) {
+ try {
+ wait(ms);
+ } catch (InterruptedException ie) {
+ if (p == null)
+ interrupted = true;
+ }
}
- } catch (InterruptedException ie) {
+ else
+ notifyAll();
}
}
- if ((s = status) < 0 ||
- (nanos = deadline - System.nanoTime()) <= 0L)
+ if ((s = status) < 0 || interrupted ||
+ (ns = deadline - System.nanoTime()) <= 0L)
break;
}
}
} finally {
- if (blocking)
+ if (p != null && canBlock)
p.incrementActiveCount();
}
+ if (interrupted)
+ throw new InterruptedException();
}
- if (s != NORMAL) {
+ if ((s &= DONE_MASK) != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
@@ -1067,17 +1069,15 @@ public abstract class ForkJoinTask im
* be of use in designs in which many tasks are forked, but none
* are explicitly joined, instead executing them until all are
* processed.
- *
- *
This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
*/
public static void helpQuiesce() {
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- wt.pool.helpQuiescePool(wt.workQueue);
+ Thread t;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
+ wt.pool.helpQuiescePool(wt.workQueue);
+ }
+ else
+ ForkJoinPool.quiesceCommonPool();
}
/**
@@ -1097,7 +1097,7 @@ public abstract class ForkJoinTask im
* setRawResult(null)}.
*/
public void reinitialize() {
- if (status == EXCEPTIONAL)
+ if ((status & DONE_MASK) == EXCEPTIONAL)
clearExceptionalCompletion();
else
status = 0;
@@ -1130,23 +1130,19 @@ public abstract class ForkJoinTask im
/**
* Tries to unschedule this task for execution. This method will
- * typically succeed if this task is the most recently forked task
- * by the current thread, and has not commenced executing in
- * another thread. This method may be useful when arranging
- * alternative local processing of tasks that could have been, but
- * were not, stolen.
- *
- *
This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * typically (but is not guaranteed to) succeed if this task is
+ * the most recently forked task by the current thread, and has
+ * not commenced executing in another thread. This method may be
+ * useful when arranging alternative local processing of tasks
+ * that could have been, but were not, stolen.
*
* @return {@code true} if unforked
*/
public boolean tryUnfork() {
- return ((ForkJoinWorkerThread)Thread.currentThread())
- .workQueue.tryUnpush(this);
+ Thread t;
+ return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
+ ForkJoinPool.tryExternalUnpush(this));
}
/**
@@ -1155,84 +1151,32 @@ public abstract class ForkJoinTask im
* value may be useful for heuristic decisions about whether to
* fork other tasks.
*
- *
This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the number of tasks
*/
public static int getQueuedTaskCount() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .workQueue.queueSize();
+ Thread t; ForkJoinPool.WorkQueue q;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ q = ((ForkJoinWorkerThread)t).workQueue;
+ else
+ q = ForkJoinPool.commonSubmitterQueue();
+ return (q == null) ? 0 : q.queueSize();
}
/**
* Returns an estimate of how many more locally queued tasks are
* held by the current worker thread than there are other worker
- * threads that might steal them. This value may be useful for
+ * threads that might steal them, or zero if this thread is not
+ * operating in a ForkJoinPool. This value may be useful for
* heuristic decisions about whether to fork other tasks. In many
* usages of ForkJoinTasks, at steady state, each worker should
* aim to maintain a small constant surplus (for example, 3) of
* tasks, and to process computations locally if this threshold is
* exceeded.
*
- *
This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the surplus number of tasks, which may be negative
*/
public static int getSurplusQueuedTaskCount() {
- /*
- * The aim of this method is to return a cheap heuristic guide
- * for task partitioning when programmers, frameworks, tools,
- * or languages have little or no idea about task granularity.
- * In essence by offering this method, we ask users only about
- * tradeoffs in overhead vs expected throughput and its
- * variance, rather than how finely to partition tasks.
- *
- * In a steady state strict (tree-structured) computation,
- * each thread makes available for stealing enough tasks for
- * other threads to remain active. Inductively, if all threads
- * play by the same rules, each thread should make available
- * only a constant number of tasks.
- *
- * The minimum useful constant is just 1. But using a value of
- * 1 would require immediate replenishment upon each steal to
- * maintain enough tasks, which is infeasible. Further,
- * partitionings/granularities of offered tasks should
- * minimize steal rates, which in general means that threads
- * nearer the top of computation tree should generate more
- * than those nearer the bottom. In perfect steady state, each
- * thread is at approximately the same level of computation
- * tree. However, producing extra tasks amortizes the
- * uncertainty of progress and diffusion assumptions.
- *
- * So, users will want to use values larger, but not much
- * larger than 1 to both smooth over transient shortages and
- * hedge against uneven progress; as traded off against the
- * cost of extra task overhead. We leave the user to pick a
- * threshold value to compare with the results of this call to
- * guide decisions, but recommend values such as 3.
- *
- * When all threads are active, it is on average OK to
- * estimate surplus strictly locally. In steady-state, if one
- * thread is maintaining say 2 surplus tasks, then so are
- * others. So we can just use estimated queue length.
- * However, this strategy alone leads to serious mis-estimates
- * in some non-steady-state conditions (ramp-up, ramp-down,
- * other stalls). We can detect many of these by further
- * considering the number of "idle" threads, that are known to
- * have zero queued tasks, so compensate by a factor of
- * (#idle/#active) threads.
- */
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- return wt.workQueue.queueSize() - wt.pool.idlePerActive();
+ return ForkJoinPool.getSurplusQueuedTaskCount();
}
// Extension methods
@@ -1258,15 +1202,18 @@ public abstract class ForkJoinTask im
protected abstract void setRawResult(V value);
/**
- * Immediately performs the base action of this task. This method
- * is designed to support extensions, and should not in general be
- * called otherwise. The return value controls whether this task
- * is considered to be done normally. It may return false in
+ * Immediately performs the base action of this task and returns
+ * true if, upon return from this method, this task is guaranteed
+ * to have completed normally. This method may return false
+ * otherwise, to indicate that this task is not necessarily
+ * complete (or is not known to be complete), for example in
* asynchronous actions that require explicit invocations of
- * {@link #complete} to become joinable. It may also throw an
- * (unchecked) exception to indicate abnormal exit.
+ * completion methods. This method may also throw an (unchecked)
+ * exception to indicate abnormal exit. This method is designed to
+ * support extensions, and should not in general be called
+ * otherwise.
*
- * @return {@code true} if completed normally
+ * @return {@code true} if this task is known to have completed normally
*/
protected abstract boolean exec();
@@ -1280,99 +1227,100 @@ public abstract class ForkJoinTask im
* primarily to support extensions, and is unlikely to be useful
* otherwise.
*
- *
This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask> peekNextLocalTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread()).workQueue.peek();
+ Thread t; ForkJoinPool.WorkQueue q;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ q = ((ForkJoinWorkerThread)t).workQueue;
+ else
+ q = ForkJoinPool.commonSubmitterQueue();
+ return (q == null) ? null : q.peek();
}
/**
* Unschedules and returns, without executing, the next task
- * queued by the current thread but not yet executed. This method
- * is designed primarily to support extensions, and is unlikely to
- * be useful otherwise.
- *
- *
This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * queued by the current thread but not yet executed, if the
+ * current thread is operating in a ForkJoinPool. This method is
+ * designed primarily to support extensions, and is unlikely to be
+ * useful otherwise.
*
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask> pollNextLocalTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .workQueue.nextLocalTask();
+ Thread t;
+ return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
+ null;
}
/**
- * Unschedules and returns, without executing, the next task
+ * If the current thread is operating in a ForkJoinPool,
+ * unschedules and returns, without executing, the next task
* queued by the current thread but not yet executed, if one is
* available, or if not available, a task that was forked by some
* other thread, if available. Availability may be transient, so a
- * {@code null} result does not necessarily imply quiescence
- * of the pool this task is operating in. This method is designed
+ * {@code null} result does not necessarily imply quiescence of
+ * the pool this task is operating in. This method is designed
* primarily to support extensions, and is unlikely to be useful
* otherwise.
*
- *
This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return a task, or {@code null} if none are available
*/
protected static ForkJoinTask> pollTask() {
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- return wt.pool.nextTaskFor(wt.workQueue);
+ Thread t; ForkJoinWorkerThread wt;
+ return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
+ null;
}
- // Mark-bit operations
+ // tag operations
/**
- * Returns true if this task is marked.
+ * Returns the tag for this task.
*
- * @return true if this task is marked
+ * @return the tag for this task
* @since 1.8
*/
- public final boolean isMarkedForkJoinTask() {
- return (status & MARKED) != 0;
+ public final short getForkJoinTaskTag() {
+ return (short)status;
}
/**
- * Atomically sets the mark on this task.
+ * Atomically sets the tag value for this task.
*
- * @return true if this task was previously unmarked
+ * @param tag the tag value
+ * @return the previous value of the tag
* @since 1.8
*/
- public final boolean markForkJoinTask() {
+ public final short setForkJoinTaskTag(short tag) {
for (int s;;) {
- if (((s = status) & MARKED) != 0)
- return false;
- if (U.compareAndSwapInt(this, STATUS, s, s | MARKED))
- return true;
+ if (U.compareAndSwapInt(this, STATUS, s = status,
+ (s & ~SMASK) | (tag & SMASK)))
+ return (short)s;
}
}
/**
- * Atomically clears the mark on this task.
+ * Atomically conditionally sets the tag value for this task.
+ * Among other applications, tags can be used as visit markers
+ * in tasks operating on graphs, as in methods that check: {@code
+ * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
+ * before processing, otherwise exiting because the node has
+ * already been visited.
*
- * @return true if this task was previously marked
+ * @param e the expected tag value
+ * @param tag the new tag value
+ * @return true if successful; i.e., the current value was
+ * equal to e and is now tag.
* @since 1.8
*/
- public final boolean unmarkForkJoinTask() {
+ public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
for (int s;;) {
- if (((s = status) & MARKED) == 0)
+ if ((short)(s = status) != e)
return false;
- if (U.compareAndSwapInt(this, STATUS, s, s & ~MARKED))
+ if (U.compareAndSwapInt(this, STATUS, s,
+ (s & ~SMASK) | (tag & SMASK)))
return true;
}
}
@@ -1385,21 +1333,33 @@ public abstract class ForkJoinTask im
static final class AdaptedRunnable extends ForkJoinTask
implements RunnableFuture {
final Runnable runnable;
- final T resultOnCompletion;
T result;
AdaptedRunnable(Runnable runnable, T result) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
- this.resultOnCompletion = result;
+ this.result = result; // OK to set this even before completion
}
- public T getRawResult() { return result; }
- public void setRawResult(T v) { result = v; }
- public boolean exec() {
- runnable.run();
- result = resultOnCompletion;
- return true;
+ public final T getRawResult() { return result; }
+ public final void setRawResult(T v) { result = v; }
+ public final boolean exec() { runnable.run(); return true; }
+ public final void run() { invoke(); }
+ private static final long serialVersionUID = 5232453952276885070L;
+ }
+
+ /**
+ * Adaptor for Runnables without results
+ */
+ static final class AdaptedRunnableAction extends ForkJoinTask
+ implements RunnableFuture {
+ final Runnable runnable;
+ AdaptedRunnableAction(Runnable runnable) {
+ if (runnable == null) throw new NullPointerException();
+ this.runnable = runnable;
}
- public void run() { invoke(); }
+ public final Void getRawResult() { return null; }
+ public final void setRawResult(Void v) { }
+ public final boolean exec() { runnable.run(); return true; }
+ public final void run() { invoke(); }
private static final long serialVersionUID = 5232453952276885070L;
}
@@ -1414,9 +1374,9 @@ public abstract class ForkJoinTask im
if (callable == null) throw new NullPointerException();
this.callable = callable;
}
- public T getRawResult() { return result; }
- public void setRawResult(T v) { result = v; }
- public boolean exec() {
+ public final T getRawResult() { return result; }
+ public final void setRawResult(T v) { result = v; }
+ public final boolean exec() {
try {
result = callable.call();
return true;
@@ -1428,7 +1388,7 @@ public abstract class ForkJoinTask im
throw new RuntimeException(ex);
}
}
- public void run() { invoke(); }
+ public final void run() { invoke(); }
private static final long serialVersionUID = 2838392045355241008L;
}
@@ -1441,7 +1401,7 @@ public abstract class ForkJoinTask im
* @return the task
*/
public static ForkJoinTask> adapt(Runnable runnable) {
- return new AdaptedRunnable(runnable, null);
+ return new AdaptedRunnableAction(runnable);
}
/**
@@ -1500,14 +1460,16 @@ public abstract class ForkJoinTask im
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long STATUS;
+
static {
exceptionTableLock = new ReentrantLock();
exceptionTableRefQueue = new ReferenceQueue