A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
* restrictions (that are only partially statically enforceable)
- * reflecting their intended use as computational tasks calculating
- * pure functions or operating on purely isolated objects. The
- * primary coordination mechanisms are {@link #fork}, that arranges
+ * reflecting their main use as computational tasks calculating pure
+ * functions or operating on purely isolated objects. The primary
+ * coordination mechanisms are {@link #fork}, that arranges
* asynchronous execution, and {@link #join}, that doesn't proceed
* until the task's result has been computed. Computations should
- * avoid {@code synchronized} methods or blocks, and should minimize
- * other blocking synchronization apart from joining other tasks or
- * using synchronizers such as Phasers that are advertised to
- * cooperate with fork/join scheduling. Tasks should also not perform
- * blocking IO, and should ideally access variables that are
- * completely independent of those accessed by other running
- * tasks. Minor breaches of these restrictions, for example using
- * shared output streams, may be tolerable in practice, but frequent
- * use may result in poor performance, and the potential to
- * indefinitely stall if the number of threads not waiting for IO or
- * other external synchronization becomes exhausted. This usage
- * restriction is in part enforced by not permitting checked
- * exceptions such as {@code IOExceptions} to be thrown. However,
- * computations may still encounter unchecked exceptions, that are
- * rethrown to callers attempting to join them. These exceptions may
- * additionally include {@link RejectedExecutionException} stemming
- * from internal resource exhaustion, such as failure to allocate
- * internal task queues. Rethrown exceptions behave in the same way as
- * regular exceptions, but, when possible, contain stack traces (as
- * displayed for example using {@code ex.printStackTrace()}) of both
- * the thread that initiated the computation as well as the thread
- * actually encountering the exception; minimally only the latter.
+ * ideally avoid {@code synchronized} methods or blocks, and should
+ * minimize other blocking synchronization apart from joining other
+ * tasks or using synchronizers such as Phasers that are advertised to
+ * cooperate with fork/join scheduling. Subdividable tasks should also
+ * not perform blocking IO, and should ideally access variables that
+ * are completely independent of those accessed by other running
+ * tasks. These guidelines are loosely enforced by not permitting
+ * checked exceptions such as {@code IOExceptions} to be
+ * thrown. However, computations may still encounter unchecked
+ * exceptions, that are rethrown to callers attempting to join
+ * them. These exceptions may additionally include {@link
+ * RejectedExecutionException} stemming from internal resource
+ * exhaustion, such as failure to allocate internal task
+ * queues. Rethrown exceptions behave in the same way as regular
+ * exceptions, but, when possible, contain stack traces (as displayed
+ * for example using {@code ex.printStackTrace()}) of both the thread
+ * that initiated the computation as well as the thread actually
+ * encountering the exception; minimally only the latter.
+ *
+ *
It is possible to define and use ForkJoinTasks that may block,
+ * but doing do requires three further considerations: (1) Completion
+ * of few if any other tasks should be dependent on a task
+ * that blocks on external synchronization or IO. Event-style async
+ * tasks that are never joined often fall into this category. (2) To
+ * minimize resource impact, tasks should be small; ideally performing
+ * only the (possibly) blocking action. (3) Unless the {@link
+ * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
+ * blocked tasks is known to be less than the pool's {@link
+ * ForkJoinPool#getParallelism} level, the pool cannot guarantee that
+ * enough threads will be available to ensure progress or good
+ * performance.
*
*
The primary method for awaiting completion and extracting
* results of a task is {@link #join}, but there are several variants:
@@ -89,6 +93,13 @@ import java.lang.reflect.Constructor;
* performs the most common form of parallel invocation: forking a set
* of tasks and joining them all.
*
+ *
In the most typical usages, a fork-join pair act like a call
+ * (fork) and return (join) from a parallel recursive function. As is
+ * the case with other forms of recursive calls, returns (joins)
+ * should be performed innermost-first. For example, {@code a.fork();
+ * b.fork(); b.join(); a.join();} is likely to be substantially more
+ * efficient than joining {@code a} before {@code b}.
+ *
*
The execution status of tasks may be queried at several levels
* of detail: {@link #isDone} is true if a task completed in any way
* (including the case where a task was cancelled without executing);
@@ -125,7 +136,18 @@ import java.lang.reflect.Constructor;
* supports other methods and techniques (for example the use of
* {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
* may be of use in constructing custom subclasses for problems that
- * are not statically structured as DAGs.
+ * are not statically structured as DAGs. To support such usages a
+ * ForkJoinTask may be atomically tagged with a {@code
+ * short} value using {@link #setForkJoinTaskTag} or {@link
+ * #compareAndSetForkJoinTaskTag} and checked using {@link
+ * #getForkJoinTaskTag}. The ForkJoinTask implementation does not
+ * use these {@code protected} methods or tags for any purpose, but
+ * they may be of use in the construction of specialized subclasses.
+ * For example, parallel graph traversals can use the supplied methods
+ * to avoid revisiting nodes/tasks that have already been processed.
+ * Also, completion based designs can use them to record that subtasks
+ * have completed. (Method names for tagging are bulky in part to
+ * encourage definition of methods that reflect their usage patterns.)
*
*
Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
@@ -165,41 +187,50 @@ public abstract class ForkJoinTask im
* See the internal documentation of class ForkJoinPool for a
* general implementation overview. ForkJoinTasks are mainly
* responsible for maintaining their "status" field amidst relays
- * to methods in ForkJoinWorkerThread and ForkJoinPool. The
- * methods of this class are more-or-less layered into (1) basic
- * status maintenance (2) execution and awaiting completion (3)
- * user-level methods that additionally report results. This is
- * sometimes hard to see because this file orders exported methods
- * in a way that flows well in javadocs.
+ * to methods in ForkJoinWorkerThread and ForkJoinPool.
+ *
+ * The methods of this class are more-or-less layered into
+ * (1) basic status maintenance
+ * (2) execution and awaiting completion
+ * (3) user-level methods that additionally report results.
+ * This is sometimes hard to see because this file orders exported
+ * methods in a way that flows well in javadocs.
*/
/*
* The status field holds run control status bits packed into a
* single int to minimize footprint and to ensure atomicity (via
* CAS). Status is initially zero, and takes on nonnegative
- * values until completed, upon which status holds value
- * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
- * waits by other threads have the SIGNAL bit set. Completion of
- * a stolen task with SIGNAL set awakens any waiters via
- * notifyAll. Even though suboptimal for some purposes, we use
- * basic builtin wait/notify to take advantage of "monitor
- * inflation" in JVMs that we would otherwise need to emulate to
- * avoid adding further per-task bookkeeping overhead. We want
- * these monitors to be "fat", i.e., not use biasing or thin-lock
- * techniques, so use some odd coding idioms that tend to avoid
- * them.
+ * values until completed, upon which status (anded with
+ * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
+ * undergoing blocking waits by other threads have the SIGNAL bit
+ * set. Completion of a stolen task with SIGNAL set awakens any
+ * waiters via notifyAll. Even though suboptimal for some
+ * purposes, we use basic builtin wait/notify to take advantage of
+ * "monitor inflation" in JVMs that we would otherwise need to
+ * emulate to avoid adding further per-task bookkeeping overhead.
+ * We want these monitors to be "fat", i.e., not use biasing or
+ * thin-lock techniques, so use some odd coding idioms that tend
+ * to avoid them, mainly by arranging that every synchronized
+ * block performs a wait, notifyAll or both.
+ *
+ * These control bits occupy only (some of) the upper half (16
+ * bits) of status field. The lower bits are used for user-defined
+ * tags.
*/
/** The run status of this task */
volatile int status; // accessed directly by pool and workers
- private static final int NORMAL = -1;
- private static final int CANCELLED = -2;
- private static final int EXCEPTIONAL = -3;
- private static final int SIGNAL = 1;
+ static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
+ static final int NORMAL = 0xf0000000; // must be negative
+ static final int CANCELLED = 0xc0000000; // must be < NORMAL
+ static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
+ static final int SIGNAL = 0x00010000; // must be >= 1 << 16
+ static final int SMASK = 0x0000ffff; // short bits for tags
/**
- * Marks completion and wakes up threads waiting to join this task,
- * also clearing signal request bits.
+ * Marks completion and wakes up threads waiting to join this
+ * task.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
@@ -208,8 +239,8 @@ public abstract class ForkJoinTask im
for (int s;;) {
if ((s = status) < 0)
return s;
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
- if (s != 0)
+ if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
+ if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); }
return completion;
}
@@ -217,27 +248,35 @@ public abstract class ForkJoinTask im
}
/**
- * Tries to block a worker thread until completed or timed out.
- * Uses Object.wait time argument conventions.
- * May fail on contention or interrupt.
+ * Primary execution method for stolen tasks. Unless done, calls
+ * exec and records status if completed, but doesn't wait for
+ * completion otherwise.
*
- * @param millis if > 0, wait time.
+ * @return status on exit from this method
*/
- final void tryAwaitDone(long millis) {
- int s;
- try {
- if (((s = status) > 0 ||
- (s == 0 &&
- UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
- status > 0) {
- synchronized (this) {
- if (status > 0)
- wait(millis);
- }
+ final int doExec() {
+ int s; boolean completed;
+ if ((s = status) >= 0) {
+ try {
+ completed = exec();
+ } catch (Throwable rex) {
+ return setExceptionalCompletion(rex);
}
- } catch (InterruptedException ie) {
- // caller must check termination
+ if (completed)
+ s = setCompletion(NORMAL);
}
+ return s;
+ }
+
+ /**
+ * Tries to set SIGNAL status. Used by ForkJoinPool. Other
+ * variants are directly incorporated into externalAwaitDone etc.
+ *
+ * @return true if successful
+ */
+ final boolean trySetSignal() {
+ int s;
+ return U.compareAndSwapInt(this, STATUS, s = status, s | SIGNAL);
}
/**
@@ -245,48 +284,42 @@ public abstract class ForkJoinTask im
* @return status upon completion
*/
private int externalAwaitDone() {
+ boolean interrupted = false;
int s;
- if ((s = status) >= 0) {
- boolean interrupted = false;
- synchronized (this) {
- while ((s = status) >= 0) {
- if (s == 0)
- UNSAFE.compareAndSwapInt(this, statusOffset,
- 0, SIGNAL);
- else {
+ while ((s = status) >= 0) {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0) {
try {
wait();
} catch (InterruptedException ie) {
interrupted = true;
}
}
+ else
+ notifyAll();
}
}
- if (interrupted)
- Thread.currentThread().interrupt();
}
+ if (interrupted)
+ Thread.currentThread().interrupt();
return s;
}
/**
- * Blocks a non-worker-thread until completion or interruption or timeout.
+ * Blocks a non-worker-thread until completion or interruption.
*/
- private int externalInterruptibleAwaitDone(long millis)
- throws InterruptedException {
+ private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
if (Thread.interrupted())
throw new InterruptedException();
- if ((s = status) >= 0) {
- synchronized (this) {
- while ((s = status) >= 0) {
- if (s == 0)
- UNSAFE.compareAndSwapInt(this, statusOffset,
- 0, SIGNAL);
- else {
- wait(millis);
- if (millis > 0L)
- break;
- }
+ while ((s = status) >= 0) {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0)
+ wait();
+ else
+ notifyAll();
}
}
}
@@ -294,65 +327,41 @@ public abstract class ForkJoinTask im
}
/**
- * Primary execution method for stolen tasks. Unless done, calls
- * exec and records status if completed, but doesn't wait for
- * completion otherwise.
- */
- final void doExec() {
- if (status >= 0) {
- boolean completed;
- try {
- completed = exec();
- } catch (Throwable rex) {
- setExceptionalCompletion(rex);
- return;
- }
- if (completed)
- setCompletion(NORMAL); // must be outside try block
- }
- }
-
- /**
- * Primary mechanics for join, get, quietlyJoin.
+ * Implementation for join, get, quietlyJoin. Directly handles
+ * only cases of already-completed, external wait, and
+ * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
+ *
* @return status upon completion
*/
private int doJoin() {
- Thread t; ForkJoinWorkerThread w; int s; boolean completed;
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
- if ((s = status) < 0)
- return s;
- if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
- try {
- completed = exec();
- } catch (Throwable rex) {
- return setExceptionalCompletion(rex);
- }
- if (completed)
- return setCompletion(NORMAL);
+ int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
+ if ((s = status) >= 0) {
+ if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
+ if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue).
+ tryUnpush(this) || (s = doExec()) >= 0)
+ s = wt.pool.awaitJoin(w, this);
}
- return w.joinTask(this);
+ else
+ s = externalAwaitDone();
}
- else
- return externalAwaitDone();
+ return s;
}
/**
- * Primary mechanics for invoke, quietlyInvoke.
+ * Implementation for invoke, quietlyInvoke.
+ *
* @return status upon completion
*/
private int doInvoke() {
- int s; boolean completed;
- if ((s = status) < 0)
- return s;
- try {
- completed = exec();
- } catch (Throwable rex) {
- return setExceptionalCompletion(rex);
+ int s; Thread t; ForkJoinWorkerThread wt;
+ if ((s = doExec()) >= 0) {
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ s = (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue,
+ this);
+ else
+ s = externalAwaitDone();
}
- if (completed)
- return setCompletion(NORMAL);
- else
- return doJoin();
+ return s;
}
// Exception table support
@@ -387,7 +396,7 @@ public abstract class ForkJoinTask im
* any ForkJoinPool will call helpExpungeStaleExceptions when its
* pool becomes isQuiescent.
*/
- static final class ExceptionNode extends WeakReference>{
+ static final class ExceptionNode extends WeakReference> {
final Throwable ex;
ExceptionNode next;
final long thrower; // use id not ref to avoid weak cycles
@@ -427,6 +436,21 @@ public abstract class ForkJoinTask im
}
/**
+ * Cancels, ignoring any exceptions thrown by cancel. Used during
+ * worker and pool shutdown. Cancel is spec'ed not to throw any
+ * exceptions, but if it does anyway, we have no recourse during
+ * shutdown, so guard against this case.
+ */
+ static final void cancelIgnoringExceptions(ForkJoinTask> t) {
+ if (t != null && t.status >= 0) {
+ try {
+ t.cancel(false);
+ } catch (Throwable ignore) {
+ }
+ }
+ }
+
+ /**
* Removes exception node and clears status
*/
private void clearExceptionalCompletion() {
@@ -472,7 +496,7 @@ public abstract class ForkJoinTask im
* @return the exception, or null if none
*/
private Throwable getThrowableException() {
- if (status != EXCEPTIONAL)
+ if ((status & DONE_MASK) != EXCEPTIONAL)
return null;
int h = System.identityHashCode(this);
ExceptionNode e;
@@ -491,7 +515,7 @@ public abstract class ForkJoinTask im
if (e == null || (ex = e.ex) == null)
return null;
if (e.thrower != Thread.currentThread().getId()) {
- Class ec = ex.getClass();
+ Class extends Throwable> ec = ex.getClass();
try {
Constructor> noArgCtor = null;
Constructor>[] cs = ec.getConstructors();// public ctors only
@@ -557,16 +581,14 @@ public abstract class ForkJoinTask im
}
/**
- * Report the result of invoke or join; called only upon
- * non-normal return of internal versions.
+ * Throws exception, if any, associated with the given status.
*/
- private V reportResult() {
- int s; Throwable ex;
- if ((s = status) == CANCELLED)
- throw new CancellationException();
- if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
- UNSAFE.throwException(ex);
- return getRawResult();
+ private void reportException(int s) {
+ Throwable ex = ((s == CANCELLED) ? new CancellationException() :
+ (s == EXCEPTIONAL) ? getThrowableException() :
+ null);
+ if (ex != null)
+ U.throwException(ex);
}
// public methods
@@ -590,8 +612,7 @@ public abstract class ForkJoinTask im
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask fork() {
- ((ForkJoinWorkerThread) Thread.currentThread())
- .pushTask(this);
+ ((ForkJoinWorkerThread)Thread.currentThread()).workQueue.push(this);
return this;
}
@@ -607,10 +628,10 @@ public abstract class ForkJoinTask im
* @return the computed result
*/
public final V join() {
- if (doJoin() != NORMAL)
- return reportResult();
- else
- return getRawResult();
+ int s;
+ if ((s = doJoin() & DONE_MASK) != NORMAL)
+ reportException(s);
+ return getRawResult();
}
/**
@@ -622,10 +643,10 @@ public abstract class ForkJoinTask im
* @return the computed result
*/
public final V invoke() {
- if (doInvoke() != NORMAL)
- return reportResult();
- else
- return getRawResult();
+ int s;
+ if ((s = doInvoke() & DONE_MASK) != NORMAL)
+ reportException(s);
+ return getRawResult();
}
/**
@@ -652,9 +673,12 @@ public abstract class ForkJoinTask im
* @throws NullPointerException if any task is null
*/
public static void invokeAll(ForkJoinTask> t1, ForkJoinTask> t2) {
+ int s1, s2;
t2.fork();
- t1.invoke();
- t2.join();
+ if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
+ t1.reportException(s1);
+ if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
+ t2.reportException(s2);
}
/**
@@ -697,12 +721,12 @@ public abstract class ForkJoinTask im
if (t != null) {
if (ex != null)
t.cancel(false);
- else if (t.doJoin() < NORMAL && ex == null)
+ else if (t.doJoin() < NORMAL)
ex = t.getException();
}
}
if (ex != null)
- UNSAFE.throwException(ex);
+ U.throwException(ex);
}
/**
@@ -754,12 +778,12 @@ public abstract class ForkJoinTask im
if (t != null) {
if (ex != null)
t.cancel(false);
- else if (t.doJoin() < NORMAL && ex == null)
+ else if (t.doJoin() < NORMAL)
ex = t.getException();
}
}
if (ex != null)
- UNSAFE.throwException(ex);
+ U.throwException(ex);
return tasks;
}
@@ -791,20 +815,7 @@ public abstract class ForkJoinTask im
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
- return setCompletion(CANCELLED) == CANCELLED;
- }
-
- /**
- * Cancels, ignoring any exceptions thrown by cancel. Used during
- * worker and pool shutdown. Cancel is spec'ed not to throw any
- * exceptions, but if it does anyway, we have no recourse during
- * shutdown, so guard against this case.
- */
- final void cancelIgnoringExceptions() {
- try {
- cancel(false);
- } catch (Throwable ignore) {
- }
+ return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}
public final boolean isDone() {
@@ -812,7 +823,7 @@ public abstract class ForkJoinTask im
}
public final boolean isCancelled() {
- return status == CANCELLED;
+ return (status & DONE_MASK) == CANCELLED;
}
/**
@@ -832,7 +843,7 @@ public abstract class ForkJoinTask im
* exception and was not cancelled
*/
public final boolean isCompletedNormally() {
- return status == NORMAL;
+ return (status & DONE_MASK) == NORMAL;
}
/**
@@ -843,7 +854,7 @@ public abstract class ForkJoinTask im
* @return the exception, or {@code null} if none
*/
public final Throwable getException() {
- int s = status;
+ int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
getThrowableException());
@@ -893,6 +904,18 @@ public abstract class ForkJoinTask im
}
/**
+ * Completes this task. The most recent value established by
+ * {@link #setRawResult} (or {@code null}) will be returned as the
+ * result of subsequent invocations of {@code join} and related
+ * operations. This method may be useful when processing sets of
+ * tasks when some do not otherwise complete normally. Its use in
+ * other situations is discouraged.
+ */
+ public final void quietlyComplete() {
+ setCompletion(NORMAL);
+ }
+
+ /**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
@@ -905,9 +928,9 @@ public abstract class ForkJoinTask im
*/
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
- doJoin() : externalInterruptibleAwaitDone(0L);
+ doJoin() : externalInterruptibleAwaitDone();
Throwable ex;
- if (s == CANCELLED)
+ if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
@@ -930,32 +953,60 @@ public abstract class ForkJoinTask im
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- Thread t = Thread.currentThread();
- if (t instanceof ForkJoinWorkerThread) {
- ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
- long nanos = unit.toNanos(timeout);
- if (status >= 0) {
- boolean completed = false;
- if (w.unpushTask(this)) {
- try {
- completed = exec();
- } catch (Throwable rex) {
- setExceptionalCompletion(rex);
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ // Messy in part because we measure in nanosecs, but wait in millisecs
+ int s; long ns, ms;
+ if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) {
+ long deadline = System.nanoTime() + ns;
+ ForkJoinPool p = null;
+ ForkJoinPool.WorkQueue w = null;
+ Thread t = Thread.currentThread();
+ if (t instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
+ p = wt.pool;
+ w = wt.workQueue;
+ s = p.helpJoinOnce(w, this); // no retries on failure
+ }
+ boolean canBlock = false;
+ boolean interrupted = false;
+ try {
+ while ((s = status) >= 0) {
+ if (w != null && w.runState < 0)
+ cancelIgnoringExceptions(this);
+ else if (!canBlock) {
+ if (p == null || p.tryCompensate(this, null))
+ canBlock = true;
+ }
+ else {
+ if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
+ U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0) {
+ try {
+ wait(ms);
+ } catch (InterruptedException ie) {
+ if (p == null)
+ interrupted = true;
+ }
+ }
+ else
+ notifyAll();
+ }
+ }
+ if ((s = status) < 0 || interrupted ||
+ (ns = deadline - System.nanoTime()) <= 0L)
+ break;
}
}
- if (completed)
- setCompletion(NORMAL);
- else if (status >= 0 && nanos > 0)
- w.pool.timedAwaitJoin(this, nanos);
+ } finally {
+ if (p != null && canBlock)
+ p.incrementActiveCount();
}
+ if (interrupted)
+ throw new InterruptedException();
}
- else {
- long millis = unit.toMillis(timeout);
- if (millis > 0)
- externalInterruptibleAwaitDone(millis);
- }
- int s = status;
- if (s != NORMAL) {
+ if ((s &= DONE_MASK) != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
@@ -1000,8 +1051,9 @@ public abstract class ForkJoinTask im
* ClassCastException}.
*/
public static void helpQuiesce() {
- ((ForkJoinWorkerThread) Thread.currentThread())
- .helpQuiescePool();
+ ForkJoinWorkerThread wt =
+ (ForkJoinWorkerThread)Thread.currentThread();
+ wt.pool.helpQuiescePool(wt.workQueue);
}
/**
@@ -1021,7 +1073,7 @@ public abstract class ForkJoinTask im
* setRawResult(null)}.
*/
public void reinitialize() {
- if (status == EXCEPTIONAL)
+ if ((status & DONE_MASK) == EXCEPTIONAL)
clearExceptionalCompletion();
else
status = 0;
@@ -1069,8 +1121,8 @@ public abstract class ForkJoinTask im
* @return {@code true} if unforked
*/
public boolean tryUnfork() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .unpushTask(this);
+ return ((ForkJoinWorkerThread)Thread.currentThread())
+ .workQueue.tryUnpush(this);
}
/**
@@ -1089,7 +1141,7 @@ public abstract class ForkJoinTask im
*/
public static int getQueuedTaskCount() {
return ((ForkJoinWorkerThread) Thread.currentThread())
- .getQueueSize();
+ .workQueue.queueSize();
}
/**
@@ -1111,8 +1163,52 @@ public abstract class ForkJoinTask im
* @return the surplus number of tasks, which may be negative
*/
public static int getSurplusQueuedTaskCount() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .getEstimatedSurplusTaskCount();
+ /*
+ * The aim of this method is to return a cheap heuristic guide
+ * for task partitioning when programmers, frameworks, tools,
+ * or languages have little or no idea about task granularity.
+ * In essence by offering this method, we ask users only about
+ * tradeoffs in overhead vs expected throughput and its
+ * variance, rather than how finely to partition tasks.
+ *
+ * In a steady state strict (tree-structured) computation,
+ * each thread makes available for stealing enough tasks for
+ * other threads to remain active. Inductively, if all threads
+ * play by the same rules, each thread should make available
+ * only a constant number of tasks.
+ *
+ * The minimum useful constant is just 1. But using a value of
+ * 1 would require immediate replenishment upon each steal to
+ * maintain enough tasks, which is infeasible. Further,
+ * partitionings/granularities of offered tasks should
+ * minimize steal rates, which in general means that threads
+ * nearer the top of computation tree should generate more
+ * than those nearer the bottom. In perfect steady state, each
+ * thread is at approximately the same level of computation
+ * tree. However, producing extra tasks amortizes the
+ * uncertainty of progress and diffusion assumptions.
+ *
+ * So, users will want to use values larger, but not much
+ * larger than 1 to both smooth over transient shortages and
+ * hedge against uneven progress; as traded off against the
+ * cost of extra task overhead. We leave the user to pick a
+ * threshold value to compare with the results of this call to
+ * guide decisions, but recommend values such as 3.
+ *
+ * When all threads are active, it is on average OK to
+ * estimate surplus strictly locally. In steady-state, if one
+ * thread is maintaining say 2 surplus tasks, then so are
+ * others. So we can just use estimated queue length.
+ * However, this strategy alone leads to serious mis-estimates
+ * in some non-steady-state conditions (ramp-up, ramp-down,
+ * other stalls). We can detect many of these by further
+ * considering the number of "idle" threads, that are known to
+ * have zero queued tasks, so compensate by a factor of
+ * (#idle/#active) threads.
+ */
+ ForkJoinWorkerThread wt =
+ (ForkJoinWorkerThread)Thread.currentThread();
+ return wt.workQueue.queueSize() - wt.pool.idlePerActive();
}
// Extension methods
@@ -1169,8 +1265,7 @@ public abstract class ForkJoinTask im
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask> peekNextLocalTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .peekTask();
+ return ((ForkJoinWorkerThread) Thread.currentThread()).workQueue.peek();
}
/**
@@ -1189,7 +1284,7 @@ public abstract class ForkJoinTask im
*/
protected static ForkJoinTask> pollNextLocalTask() {
return ((ForkJoinWorkerThread) Thread.currentThread())
- .pollLocalTask();
+ .workQueue.nextLocalTask();
}
/**
@@ -1211,8 +1306,60 @@ public abstract class ForkJoinTask im
* @return a task, or {@code null} if none are available
*/
protected static ForkJoinTask> pollTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .pollTask();
+ ForkJoinWorkerThread wt =
+ (ForkJoinWorkerThread)Thread.currentThread();
+ return wt.pool.nextTaskFor(wt.workQueue);
+ }
+
+ // tag operations
+
+ /**
+ * Returns the tag for this task.
+ *
+ * @return the tag for this task
+ * @since 1.8
+ */
+ public final short getForkJoinTaskTag() {
+ return (short)status;
+ }
+
+ /**
+ * Atomically sets the tag value for this task.
+ *
+ * @param tag the tag value
+ * @return the previous value of the tag
+ * @since 1.8
+ */
+ public final short setForkJoinTaskTag(short tag) {
+ for (int s;;) {
+ if (U.compareAndSwapInt(this, STATUS, s = status,
+ (s & ~SMASK) | (tag & SMASK)))
+ return (short)s;
+ }
+ }
+
+ /**
+ * Atomically conditionally sets the tag value for this task.
+ * Among other applications, tags can be used as visit markers
+ * in tasks operating on graphs, as in mathods that check: {@code
+ * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
+ * before processing, otherwise exiting because the node has
+ * already been visited.
+ *
+ * @param e the expected tag value
+ * @param tag the new tag value
+ * @return true if successful; i.e., the current value was
+ * equal to e and is now tag.
+ * @since 1.8
+ */
+ public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
+ for (int s;;) {
+ if ((short)(s = status) != e)
+ return false;
+ if (U.compareAndSwapInt(this, STATUS, s,
+ (s & ~SMASK) | (tag & SMASK)))
+ return true;
+ }
}
/**
@@ -1223,21 +1370,33 @@ public abstract class ForkJoinTask im
static final class AdaptedRunnable extends ForkJoinTask
implements RunnableFuture {
final Runnable runnable;
- final T resultOnCompletion;
T result;
AdaptedRunnable(Runnable runnable, T result) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
- this.resultOnCompletion = result;
+ this.result = result; // OK to set this even before completion
}
- public T getRawResult() { return result; }
- public void setRawResult(T v) { result = v; }
- public boolean exec() {
- runnable.run();
- result = resultOnCompletion;
- return true;
+ public final T getRawResult() { return result; }
+ public final void setRawResult(T v) { result = v; }
+ public final boolean exec() { runnable.run(); return true; }
+ public final void run() { invoke(); }
+ private static final long serialVersionUID = 5232453952276885070L;
+ }
+
+ /**
+ * Adaptor for Runnables without results
+ */
+ static final class AdaptedRunnableAction extends ForkJoinTask
+ implements RunnableFuture {
+ final Runnable runnable;
+ AdaptedRunnableAction(Runnable runnable) {
+ if (runnable == null) throw new NullPointerException();
+ this.runnable = runnable;
}
- public void run() { invoke(); }
+ public final Void getRawResult() { return null; }
+ public final void setRawResult(Void v) { }
+ public final boolean exec() { runnable.run(); return true; }
+ public final void run() { invoke(); }
private static final long serialVersionUID = 5232453952276885070L;
}
@@ -1252,9 +1411,9 @@ public abstract class ForkJoinTask im
if (callable == null) throw new NullPointerException();
this.callable = callable;
}
- public T getRawResult() { return result; }
- public void setRawResult(T v) { result = v; }
- public boolean exec() {
+ public final T getRawResult() { return result; }
+ public final void setRawResult(T v) { result = v; }
+ public final boolean exec() {
try {
result = callable.call();
return true;
@@ -1266,7 +1425,7 @@ public abstract class ForkJoinTask im
throw new RuntimeException(ex);
}
}
- public void run() { invoke(); }
+ public final void run() { invoke(); }
private static final long serialVersionUID = 2838392045355241008L;
}
@@ -1279,7 +1438,7 @@ public abstract class ForkJoinTask im
* @return the task
*/
public static ForkJoinTask> adapt(Runnable runnable) {
- return new AdaptedRunnable(runnable, null);
+ return new AdaptedRunnableAction(runnable);
}
/**
@@ -1313,11 +1472,10 @@ public abstract class ForkJoinTask im
private static final long serialVersionUID = -7721805057305804111L;
/**
- * Saves the state to a stream (that is, serializes it).
+ * Saves this task to a stream (that is, serializes it).
*
* @serialData the current run status and the exception thrown
* during execution, or {@code null} if none
- * @param s the stream
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
@@ -1326,9 +1484,7 @@ public abstract class ForkJoinTask im
}
/**
- * Reconstitutes the instance from a stream (that is, deserializes it).
- *
- * @param s the stream
+ * Reconstitutes this task from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
@@ -1339,15 +1495,15 @@ public abstract class ForkJoinTask im
}
// Unsafe mechanics
- private static final sun.misc.Unsafe UNSAFE;
- private static final long statusOffset;
+ private static final sun.misc.Unsafe U;
+ private static final long STATUS;
static {
exceptionTableLock = new ReentrantLock();
exceptionTableRefQueue = new ReferenceQueue