/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package jsr166y;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.RandomAccess;
import java.lang.ref.WeakReference;
import java.lang.ref.ReferenceQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.lang.reflect.Constructor;
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
* A {@code ForkJoinTask} is a thread-like entity that is much
* lighter weight than a normal thread. Huge numbers of tasks and
* subtasks may be hosted by a small number of actual threads in a
* ForkJoinPool, at the price of some usage limitations.
*
* <p>A "main" {@code ForkJoinTask} begins execution when submitted
* to a {@link ForkJoinPool}. Once started, it will usually in turn
* start other subtasks. As indicated by the name of this class,
* many programs using {@code ForkJoinTask} employ only methods
* {@link #fork} and {@link #join}, or derivatives such as {@link
* #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
* provides a number of other methods that can come into play in
* advanced usages, as well as extension mechanics that allow
* support of new forms of fork/join processing.
*
* <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
* restrictions (that are only partially statically enforceable)
* reflecting their main use as computational tasks calculating pure
* functions or operating on purely isolated objects. The primary
* coordination mechanisms are {@link #fork}, that arranges
* asynchronous execution, and {@link #join}, that doesn't proceed
* until the task's result has been computed. Computations should
* ideally avoid {@code synchronized} methods or blocks, and should
* minimize other blocking synchronization apart from joining other
* tasks or using synchronizers such as Phasers that are advertised to
* cooperate with fork/join scheduling. Subdividable tasks should also
* not perform blocking IO, and should ideally access variables that
* are completely independent of those accessed by other running
* tasks. These guidelines are loosely enforced by not permitting
* checked exceptions such as {@code IOExceptions} to be
* thrown. However, computations may still encounter unchecked
* exceptions, that are rethrown to callers attempting to join
* them. These exceptions may additionally include {@link
* RejectedExecutionException} stemming from internal resource
* exhaustion, such as failure to allocate internal task
* queues. Rethrown exceptions behave in the same way as regular
* exceptions, but, when possible, contain stack traces (as displayed
* for example using {@code ex.printStackTrace()}) of both the thread
* that initiated the computation as well as the thread actually
* encountering the exception; minimally only the latter.
*
* <p>It is possible to define and use ForkJoinTasks that may block,
* but doing so requires three further considerations: (1) Completion
* of few if any other tasks should be dependent on a task
* that blocks on external synchronization or IO. Event-style async
* tasks that are never joined often fall into this category. (2) To
* minimize resource impact, tasks should be small; ideally performing
* only the (possibly) blocking action. (3) Unless the {@link
* ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
* blocked tasks is known to be less than the pool's {@link
* ForkJoinPool#getParallelism} level, the pool cannot guarantee that
* enough threads will be available to ensure progress or good
* performance.
*
* <p>The primary method for awaiting completion and extracting
* results of a task is {@link #join}, but there are several variants:
* The {@link Future#get} methods support interruptible and/or timed
* waits for completion and report results using {@code Future}
* conventions. Method {@link #invoke} is semantically
* equivalent to {@code fork(); join()} but always attempts to begin
* execution in the current thread. The "quiet" forms of
* these methods do not extract results or report exceptions. These
* may be useful when a set of tasks are being executed, and you need
* to delay processing of results or exceptions until all complete.
* Method {@code invokeAll} (available in multiple versions)
* performs the most common form of parallel invocation: forking a set
* of tasks and joining them all.
*
* <p>In the most typical usages, a fork-join pair act like a call
* (fork) and return (join) from a parallel recursive function. As is
* the case with other forms of recursive calls, returns (joins)
* should be performed innermost-first. For example, {@code a.fork();
* b.fork(); b.join(); a.join();} is likely to be substantially more
* efficient than joining {@code a} before {@code b}.
*
* <p>The execution status of tasks may be queried at several levels
* of detail: {@link #isDone} is true if a task completed in any way
* (including the case where a task was cancelled without executing);
* {@link #isCompletedNormally} is true if a task completed without
* cancellation or encountering an exception; {@link #isCancelled} is
* true if the task was cancelled (in which case {@link #getException}
* returns a {@link java.util.concurrent.CancellationException}); and
* {@link #isCompletedAbnormally} is true if a task was either
* cancelled or encountered an exception, in which case {@link
* #getException} will return either the encountered exception or
* {@link java.util.concurrent.CancellationException}.
*
* <p>The ForkJoinTask class is not usually directly subclassed.
* Instead, you subclass one of the abstract classes that support a
* particular style of fork/join processing, typically {@link
* RecursiveAction} for computations that do not return results, or
* {@link RecursiveTask} for those that do. Normally, a concrete
* ForkJoinTask subclass declares fields comprising its parameters,
* established in a constructor, and then defines a {@code compute}
* method that somehow uses the control methods supplied by this base
* class. While these methods have {@code public} access (to allow
* instances of different task subclasses to call each other's
* methods), some of them may only be called from within other
* ForkJoinTasks (as may be determined using method {@link
* #inForkJoinPool}). Attempts to invoke them in other contexts
* result in exceptions or errors, possibly including
* {@code ClassCastException}.
*
* <p>Method {@link #join} and its variants are appropriate for use
* only when completion dependencies are acyclic; that is, the
* parallel computation can be described as a directed acyclic graph
* (DAG). Otherwise, executions may encounter a form of deadlock as
* tasks cyclically wait for each other. However, this framework
* supports other methods and techniques (for example the use of
* {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
* may be of use in constructing custom subclasses for problems that
* are not statically structured as DAGs. To support such usages a
* ForkJoinTask may be atomically marked using {@link
* #markForkJoinTask} and checked for marking using {@link
* #isMarkedForkJoinTask}. The ForkJoinTask implementation does not
* use these {@code protected} methods or marks for any purpose, but
* they may be of use in the construction of specialized subclasses.
* For example, parallel graph traversals can use the supplied methods
* to avoid revisiting nodes/tasks that have already been
* processed. Also, completion based designs can use them to record
* that one subtask has completed. (Method names for marking are bulky
* in part to encourage definition of methods that reflect their usage
* patterns.)
*
* <p>Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
* underlying lightweight task scheduling framework. Developers
* creating new basic styles of fork/join processing should minimally
* implement {@code protected} methods {@link #exec}, {@link
* #setRawResult}, and {@link #getRawResult}, while also introducing
* an abstract computational method that can be implemented in its
* subclasses, possibly relying on other {@code protected} methods
* provided by this class.
*
* <p>ForkJoinTasks should perform relatively small amounts of
* computation. Large tasks should be split into smaller subtasks,
* usually via recursive decomposition. As a very rough rule of thumb,
* a task should perform more than 100 and less than 10000 basic
* computational steps, and should avoid indefinite looping. If tasks
* are too big, then parallelism cannot improve throughput. If too
* small, then memory and internal task maintenance overhead may
* overwhelm processing.
*
* <p>This class provides {@code adapt} methods for {@link Runnable}
* and {@link Callable}, that may be of use when mixing execution of
* {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
* of this form, consider using a pool constructed in asyncMode.
*
* <p>ForkJoinTasks are {@code Serializable}, which enables them to be
* used in extensions such as remote execution frameworks. It is
* sensible to serialize tasks only before or after, but not during,
* execution. Serialization is not relied on during execution itself.
*
* @since 1.7
* @author Doug Lea
*/
public abstract class ForkJoinTask implements Future, Serializable {
/*
* See the internal documentation of class ForkJoinPool for a
* general implementation overview. ForkJoinTasks are mainly
* responsible for maintaining their "status" field amidst relays
* to methods in ForkJoinWorkerThread and ForkJoinPool.
*
* The methods of this class are more-or-less layered into
* (1) basic status maintenance
* (2) execution and awaiting completion
* (3) user-level methods that additionally report results.
* This is sometimes hard to see because this file orders exported
* methods in a way that flows well in javadocs.
*/
/**
* The number of times to try to help join a task without any
* apparent progress before giving up and blocking. The value is
* arbitrary but should be large enough to cope with transient
* stalls (due to GC etc) that can cause helping methods not to be
* able to proceed because other workers have not progressed to
* the point where subtasks can be found or taken.
*
* Used by awaitJoin as both the initial value of its retry counter
* and the value the counter is reset to whenever a helping attempt
* reports progress.
*/
private static final int HELP_RETRIES = 32;
/*
* The status field holds run control status bits packed into a
* single int to minimize footprint and to ensure atomicity (via
* CAS). Status is initially zero, and takes on nonnegative
* values until completed, upon which status holds value
* NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
* waits by other threads have the SIGNAL bit set. Completion of
* a stolen task with SIGNAL set awakens any waiters via
* notifyAll. Even though suboptimal for some purposes, we use
* basic builtin wait/notify to take advantage of "monitor
* inflation" in JVMs that we would otherwise need to emulate to
* avoid adding further per-task bookkeeping overhead. We want
* these monitors to be "fat", i.e., not use biasing or thin-lock
* techniques, so use some odd coding idioms that tend to avoid
* them.
*/
/**
* The run status of this task. Nonnegative while the task has not
* completed; once completed it holds one of the negative completion
* values NORMAL, CANCELLED, or EXCEPTIONAL, so "status < 0" is the
* test for "done" used throughout this class.
*/
volatile int status; // accessed directly by pool and workers
// Completion values: all negative, with the two low (flag) bits clear.
static final int NORMAL = 0xfffffffc; // negative with low 2 bits 0
static final int CANCELLED = 0xfffffff8; // must be < NORMAL
static final int EXCEPTIONAL = 0xfffffff4; // must be < CANCELLED
// Flag bits occupying the two low bits of status.
// SIGNAL: set by a thread before it blocks waiting for this task, so that
// the completing thread knows to call notifyAll.
static final int SIGNAL = 0x00000001;
// NOTE(review): presumably the bit behind markForkJoinTask /
// isMarkedForkJoinTask mentioned in the class Javadoc -- confirm.
static final int MARKED = 0x00000002;
/**
 * Records the given completion value in {@code status} and, if any
 * thread had requested a signal, wakes all threads waiting on this
 * task's monitor. The SIGNAL bit is cleared as part of the same
 * atomic update. If the task is already complete, nothing is changed.
 * A specialization for NORMAL completion is in method doExec.
 *
 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
 * @return the previously recorded (negative) status if the task was
 *         already complete, otherwise {@code completion}
 */
private int setCompletion(int completion) {
    while (true) {
        int current = status;
        if (current < 0) {
            return current;                 // somebody else already completed it
        }
        int updated = (current & ~SIGNAL) | completion;
        if (!U.compareAndSwapInt(this, STATUS, current, updated)) {
            continue;                       // lost a race; re-read and retry
        }
        boolean waitersPresent = (current & SIGNAL) != 0;
        if (waitersPresent) {
            synchronized (this) { notifyAll(); }
        }
        return completion;
    }
}
/**
 * Primary execution method for stolen tasks. If the task is not
 * already done, runs {@code exec}; when {@code exec} reports
 * completion the status is switched to NORMAL (waking any waiters
 * that set SIGNAL), and when it throws, the throwable is handed to
 * {@code setExceptionalCompletion}. Does not wait for completion if
 * {@code exec} returns false.
 *
 * @return status on exit from this method
 */
final int doExec() {
    int observed = status;
    if (observed < 0) {
        return observed;                    // already completed; do not run again
    }
    boolean finished;
    try {
        finished = exec();
    } catch (Throwable thrown) {
        return setExceptionalCompletion(thrown);
    }
    for (;;) {
        observed = status;                  // always re-read before deciding
        if (observed < 0 || !finished) {
            return observed;
        }
        int normalStatus = (observed & ~SIGNAL) | NORMAL;
        if (U.compareAndSwapInt(this, STATUS, observed, normalStatus)) {
            if ((observed & SIGNAL) != 0) {
                synchronized (this) { notifyAll(); }
            }
            return NORMAL;
        }
    }
}
/**
 * Blocks a non-worker-thread until completion. The caller sets the
 * SIGNAL bit and then waits on this task's monitor, repeating until
 * the status turns negative. Interrupts received while waiting do
 * not end the wait; the interrupt is re-asserted on the current
 * thread before returning.
 *
 * @return status upon completion
 */
private int externalAwaitDone() {
    int observed = status;
    if (observed < 0) {
        return observed;
    }
    boolean sawInterrupt = false;
    synchronized (this) {
        for (;;) {
            observed = status;
            if (observed < 0) {
                break;
            }
            // Only wait once the signal request is known to be recorded.
            if (!U.compareAndSwapInt(this, STATUS, observed, observed | SIGNAL)) {
                continue;
            }
            try {
                wait();
            } catch (InterruptedException ie) {
                sawInterrupt = true;        // remember, but keep waiting
            }
        }
    }
    if (sawInterrupt) {
        Thread.currentThread().interrupt();
    }
    return observed;
}
/**
 * Blocks a non-worker-thread until completion or interruption or timeout.
 *
 * With {@code millis == 0} the caller keeps waiting until the status
 * turns negative. With {@code millis > 0} the caller waits at most
 * once and then returns whatever the status is at that point.
 *
 * @param millis maximum time to wait in milliseconds, or 0 to wait
 *        until the task completes; passed unchanged to
 *        {@link Object#wait(long)}
 * @return the status observed on exit: negative if the task has
 *         completed, nonnegative if a timed wait ended first
 * @throws InterruptedException if the current thread was interrupted
 *         before or while waiting
 */
private int externalInterruptibleAwaitDone(long millis)
    throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    int s = status;
    if (s >= 0) {
        synchronized (this) {
            while ((s = status) >= 0) {
                // Only wait once the signal request is known to be recorded.
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    wait(millis);
                    if (millis > 0L) {
                        // Re-read before leaving: the value sampled before
                        // blocking is stale and would report a task that
                        // completed during the wait as still incomplete.
                        s = status;
                        break;
                    }
                }
            }
        }
    }
    return s;
}
/**
 * Implementation for join, get, quietlyJoin. Handles three cases
 * directly: the task is already complete; the caller is not a
 * ForkJoinWorkerThread (external wait); or the caller's work queue
 * can unpush this task, in which case it is executed in place.
 * Everything else is relayed to awaitJoin.
 *
 * @return status upon completion
 */
private int doJoin() {
    int observed = status;
    if (observed < 0) {
        return observed;
    }
    Thread current = Thread.currentThread();
    if (!(current instanceof ForkJoinWorkerThread)) {
        return externalAwaitDone();
    }
    ForkJoinWorkerThread worker = (ForkJoinWorkerThread) current;
    ForkJoinPool.WorkQueue queue = worker.workQueue;
    if (queue.tryUnpush(this)) {
        // Unforked successfully: run it here and stop if that finished it.
        observed = doExec();
        if (observed < 0) {
            return observed;
        }
    }
    return awaitJoin(queue, worker.pool);
}
/**
 * Helps and/or blocks until joined.
 *
 * While this task is incomplete the joiner first tries to make
 * progress by running other work: if its own queue is non-empty it
 * calls {@code tryRemoveAndExec}, otherwise it asks the pool to
 * {@code tryHelpStealer}. Each attempt that reports progress resets
 * the retry budget to HELP_RETRIES. Unproductive attempts count the
 * budget down (yielding occasionally); when it reaches zero one
 * {@code tryPollForAndExec} is attempted, and after that the joiner
 * blocks on this task's monitor whenever {@code tryCompensate}
 * permits it. The joiner's {@code currentJoin} is set to this task
 * for the duration and restored afterwards.
 *
 * @param w the joiner
 * @param p the pool
 * @return status upon completion
 */
private int awaitJoin(ForkJoinPool.WorkQueue w, ForkJoinPool p) {
    int s;
    // Type written to match the class declaration as it appears in this file.
    ForkJoinTask prevJoin = w.currentJoin;
    w.currentJoin = this;
    for (int k = HELP_RETRIES; (s = status) >= 0;) {
        if ((w.queueSize() > 0) ?
            w.tryRemoveAndExec(this) :          // self-help
            p.tryHelpStealer(w, this))          // help process tasks
            k = HELP_RETRIES;                   // reset if made progress
        else if ((s = status) < 0)              // recheck
            break;
        else if (--k > 0) {
            if ((k & 3) == 1)
                Thread.yield();                 // occasionally yield
        }
        else if (k == 0)
            p.tryPollForAndExec(w, this);       // uncommon self-help case
        else if (p.tryCompensate()) {           // true if can block
            try {
                int ss = status;
                if (ss >= 0 &&                  // assert need signal
                    U.compareAndSwapInt(this, STATUS, ss, ss | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0)        // block
                            wait();
                    }
                }
            } catch (InterruptedException ignore) {
                // Not propagated: the loop re-checks status and either
                // returns or waits again.
            } finally {
                p.incrementActiveCount();       // re-activate
            }
        }
    }
    w.currentJoin = prevJoin;
    return s;
}
/**
 * Implementation for invoke, quietlyInvoke. Runs the task in the
 * calling thread via doExec; if that does not complete it, waits for
 * completion using externalAwaitDone for non-worker threads or
 * awaitJoin for ForkJoinWorkerThreads.
 *
 * @return status upon completion
 */
private int doInvoke() {
    int result = doExec();
    if (result < 0) {
        return result;
    }
    Thread current = Thread.currentThread();
    if (current instanceof ForkJoinWorkerThread) {
        ForkJoinWorkerThread worker = (ForkJoinWorkerThread) current;
        return awaitJoin(worker.workQueue, worker.pool);
    }
    return externalAwaitDone();
}
// Exception table support
/**
* Table of exceptions thrown by tasks, to enable reporting by
* callers. Because exceptions are rare, we don't directly keep
* them with task objects, but instead use a weak ref table. Note
* that cancellation exceptions don't appear in the table, but are
* instead recorded as status values.
*
* Note: These statics are initialized below in static block.
*/
private static final ExceptionNode[] exceptionTable;
private static final ReentrantLock exceptionTableLock;
private static final ReferenceQueue