A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
* restrictions (that are only partially statically enforceable)
- * reflecting their intended use as computational tasks calculating
- * pure functions or operating on purely isolated objects. The
- * primary coordination mechanisms are {@link #fork}, that arranges
+ * reflecting their main use as computational tasks calculating pure
+ * functions or operating on purely isolated objects. The primary
+ * coordination mechanisms are {@link #fork}, that arranges
* asynchronous execution, and {@link #join}, that doesn't proceed
* until the task's result has been computed. Computations should
- * avoid {@code synchronized} methods or blocks, and should minimize
- * other blocking synchronization apart from joining other tasks or
- * using synchronizers such as Phasers that are advertised to
- * cooperate with fork/join scheduling. Tasks should also not perform
- * blocking IO, and should ideally access variables that are
- * completely independent of those accessed by other running
- * tasks. Minor breaches of these restrictions, for example using
- * shared output streams, may be tolerable in practice, but frequent
- * use may result in poor performance, and the potential to
- * indefinitely stall if the number of threads not waiting for IO or
- * other external synchronization becomes exhausted. This usage
- * restriction is in part enforced by not permitting checked
- * exceptions such as {@code IOExceptions} to be thrown. However,
- * computations may still encounter unchecked exceptions, that are
- * rethrown to callers attempting to join them. These exceptions may
- * additionally include {@link RejectedExecutionException} stemming
- * from internal resource exhaustion, such as failure to allocate
- * internal task queues. Rethrown exceptions behave in the same way as
- * regular exceptions, but, when possible, contain stack traces (as
- * displayed for example using {@code ex.printStackTrace()}) of both
- * the thread that initiated the computation as well as the thread
- * actually encountering the exception; minimally only the latter.
+ * ideally avoid {@code synchronized} methods or blocks, and should
+ * minimize other blocking synchronization apart from joining other
+ * tasks or using synchronizers such as Phasers that are advertised to
+ * cooperate with fork/join scheduling. Subdividable tasks should also
+ * not perform blocking IO, and should ideally access variables that
+ * are completely independent of those accessed by other running
+ * tasks. These guidelines are loosely enforced by not permitting
+ * checked exceptions such as {@code IOExceptions} to be
+ * thrown. However, computations may still encounter unchecked
+ * exceptions, that are rethrown to callers attempting to join
+ * them. These exceptions may additionally include {@link
+ * RejectedExecutionException} stemming from internal resource
+ * exhaustion, such as failure to allocate internal task
+ * queues. Rethrown exceptions behave in the same way as regular
+ * exceptions, but, when possible, contain stack traces (as displayed
+ * for example using {@code ex.printStackTrace()}) of both the thread
+ * that initiated the computation as well as the thread actually
+ * encountering the exception; minimally only the latter.
+ *
+ *
It is possible to define and use ForkJoinTasks that may block,
+ * but doing do requires three further considerations: (1) Completion
+ * of few if any other tasks should be dependent on a task
+ * that blocks on external synchronization or IO. Event-style async
+ * tasks that are never joined often fall into this category. (2) To
+ * minimize resource impact, tasks should be small; ideally performing
+ * only the (possibly) blocking action. (3) Unless the {@link
+ * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
+ * blocked tasks is known to be less than the pool's {@link
+ * ForkJoinPool#getParallelism} level, the pool cannot guarantee that
+ * enough threads will be available to ensure progress or good
+ * performance.
*
*
The primary method for awaiting completion and extracting
* results of a task is {@link #join}, but there are several variants:
@@ -89,6 +93,13 @@ import java.lang.reflect.Constructor;
* performs the most common form of parallel invocation: forking a set
* of tasks and joining them all.
*
+ *
In the most typical usages, a fork-join pair act like a a call
+ * (fork) and return (join) from a parallel recursive function. As is
+ * the case with other forms of recursive calls, returns (joins)
+ * should be performed innermost-first. For example, {@code a.fork();
+ * b.fork(); b.join(); a.join();} is likely to be substantially more
+ * efficient than joining {@code a} before {@code b}.
+ *
*
The execution status of tasks may be queried at several levels
* of detail: {@link #isDone} is true if a task completed in any way
* (including the case where a task was cancelled without executing);
@@ -125,7 +136,18 @@ import java.lang.reflect.Constructor;
* supports other methods and techniques (for example the use of
* {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
* may be of use in constructing custom subclasses for problems that
- * are not statically structured as DAGs.
+ * are not statically structured as DAGs. To support such usages a
+ * ForkJoinTask may be atomically marked using {@link
+ * #markForkJoinTask} and checked for marking using {@link
+ * #isMarkedForkJoinTask}. The ForkJoinTask implementation does not
+ * use these {@code protected} methods or marks for any purpose, but
+ * they may be of use in the construction of specialized subclasses.
+ * For example, parallel graph traversals can use the supplied methods
+ * to avoid revisiting nodes/tasks that have already been
+ * processed. Also, completion based designs can use them to record
+ * that one subtask has completed. (Method names for marking are bulky
+ * in part to encourage definition of methods that reflect their usage
+ * patterns.)
*
*
Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
@@ -165,13 +187,25 @@ public abstract class ForkJoinTask im
* See the internal documentation of class ForkJoinPool for a
* general implementation overview. ForkJoinTasks are mainly
* responsible for maintaining their "status" field amidst relays
- * to methods in ForkJoinWorkerThread and ForkJoinPool. The
- * methods of this class are more-or-less layered into (1) basic
- * status maintenance (2) execution and awaiting completion (3)
- * user-level methods that additionally report results. This is
- * sometimes hard to see because this file orders exported methods
- * in a way that flows well in javadocs.
+ * to methods in ForkJoinWorkerThread and ForkJoinPool.
+ *
+ * The methods of this class are more-or-less layered into
+ * (1) basic status maintenance
+ * (2) execution and awaiting completion
+ * (3) user-level methods that additionally report results.
+ * This is sometimes hard to see because this file orders exported
+ * methods in a way that flows well in javadocs.
+ */
+
+ /**
+ * The number of times to try to help join a task without any
+ * apparent progress before giving up and blocking. The value is
+ * arbitrary but should be large enough to cope with transient
+ * stalls (due to GC etc) that can cause helping methods not to be
+ * able to proceed because other workers have not progressed to
+ * the point where subtasks can be found or taken.
*/
+ private static final int HELP_RETRIES = 32;
/*
* The status field holds run control status bits packed into a
@@ -192,14 +226,16 @@ public abstract class ForkJoinTask im
/** The run status of this task */
volatile int status; // accessed directly by pool and workers
- private static final int NORMAL = -1;
- private static final int CANCELLED = -2;
- private static final int EXCEPTIONAL = -3;
- private static final int SIGNAL = 1;
+ static final int NORMAL = 0xfffffffc; // negative with low 2 bits 0
+ static final int CANCELLED = 0xfffffff8; // must be < NORMAL
+ static final int EXCEPTIONAL = 0xfffffff4; // must be < CANCELLED
+ static final int SIGNAL = 0x00000001;
+ static final int MARKED = 0x00000002;
/**
- * Marks completion and wakes up threads waiting to join this task,
- * also clearing signal request bits.
+ * Marks completion and wakes up threads waiting to join this
+ * task, also clearing signal request bits. A specialization for
+ * NORMAL completion is in method doExec.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
@@ -208,8 +244,8 @@ public abstract class ForkJoinTask im
for (int s;;) {
if ((s = status) < 0)
return s;
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
- if (s != 0)
+ if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|completion)) {
+ if ((s & SIGNAL) != 0)
synchronized (this) { notifyAll(); }
return completion;
}
@@ -217,27 +253,29 @@ public abstract class ForkJoinTask im
}
/**
- * Tries to block a worker thread until completed or timed out.
- * Uses Object.wait time argument conventions.
- * May fail on contention or interrupt.
+ * Primary execution method for stolen tasks. Unless done, calls
+ * exec and records status if completed, but doesn't wait for
+ * completion otherwise.
*
- * @param millis if > 0, wait time.
+ * @return status on exit from this method
*/
- final void tryAwaitDone(long millis) {
- int s;
- try {
- if (((s = status) > 0 ||
- (s == 0 &&
- UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
- status > 0) {
- synchronized (this) {
- if (status > 0)
- wait(millis);
+ final int doExec() {
+ int s; boolean completed;
+ if ((s = status) >= 0) {
+ try {
+ completed = exec();
+ } catch (Throwable rex) {
+ return setExceptionalCompletion(rex);
+ }
+ while ((s = status) >= 0 && completed) {
+ if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|NORMAL)) {
+ if ((s & SIGNAL) != 0)
+ synchronized (this) { notifyAll(); }
+ return NORMAL;
}
}
- } catch (InterruptedException ie) {
- // caller must check termination
}
+ return s;
}
/**
@@ -250,10 +288,7 @@ public abstract class ForkJoinTask im
boolean interrupted = false;
synchronized (this) {
while ((s = status) >= 0) {
- if (s == 0)
- UNSAFE.compareAndSwapInt(this, statusOffset,
- 0, SIGNAL);
- else {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
try {
wait();
} catch (InterruptedException ie) {
@@ -279,10 +314,7 @@ public abstract class ForkJoinTask im
if ((s = status) >= 0) {
synchronized (this) {
while ((s = status) >= 0) {
- if (s == 0)
- UNSAFE.compareAndSwapInt(this, statusOffset,
- 0, SIGNAL);
- else {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
wait(millis);
if (millis > 0L)
break;
@@ -293,66 +325,86 @@ public abstract class ForkJoinTask im
return s;
}
+
/**
- * Primary execution method for stolen tasks. Unless done, calls
- * exec and records status if completed, but doesn't wait for
- * completion otherwise.
+ * Implementation for join, get, quietlyJoin. Directly handles
+ * only cases of already-completed, external wait, and
+ * unfork+exec. Others are relayed to awaitJoin.
+ *
+ * @return status upon completion
*/
- final void doExec() {
- if (status >= 0) {
- boolean completed;
- try {
- completed = exec();
- } catch (Throwable rex) {
- setExceptionalCompletion(rex);
- return;
- }
- if (completed)
- setCompletion(NORMAL); // must be outside try block
+ private int doJoin() {
+ int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
+ if ((s = status) >= 0) {
+ if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread))
+ s = externalAwaitDone();
+ else if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue).
+ tryUnpush(this) || (s = doExec()) >= 0)
+ s = awaitJoin(w, wt.pool);
}
+ return s;
}
/**
- * Primary mechanics for join, get, quietlyJoin.
+ * Helps and/or blocks until joined.
+ *
+ * @param w the joiner
+ * @param p the pool
* @return status upon completion
*/
- private int doJoin() {
- Thread t; ForkJoinWorkerThread w; int s; boolean completed;
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
- if ((s = status) < 0)
- return s;
- if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
+ private int awaitJoin(ForkJoinPool.WorkQueue w, ForkJoinPool p) {
+ int s;
+ ForkJoinTask> prevJoin = w.currentJoin;
+ w.currentJoin = this;
+ for (int k = HELP_RETRIES; (s = status) >= 0;) {
+ if ((w.queueSize() > 0) ?
+ w.tryRemoveAndExec(this) : // self-help
+ p.tryHelpStealer(w, this)) // help process tasks
+ k = HELP_RETRIES; // reset if made progress
+ else if ((s = status) < 0) // recheck
+ break;
+ else if (--k > 0) {
+ if ((k & 3) == 1)
+ Thread.yield(); // occasionally yield
+ }
+ else if (k == 0)
+ p.tryPollForAndExec(w, this); // uncommon self-help case
+ else if (p.tryCompensate()) { // true if can block
try {
- completed = exec();
- } catch (Throwable rex) {
- return setExceptionalCompletion(rex);
+ int ss = status;
+ if (ss >= 0 && // assert need signal
+ U.compareAndSwapInt(this, STATUS, ss, ss | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0) // block
+ wait();
+ }
+ }
+ } catch (InterruptedException ignore) {
+ } finally {
+ p.incrementActiveCount(); // re-activate
}
- if (completed)
- return setCompletion(NORMAL);
}
- return w.joinTask(this);
}
- else
- return externalAwaitDone();
+ w.currentJoin = prevJoin;
+ return s;
}
/**
- * Primary mechanics for invoke, quietlyInvoke.
+ * Implementation for invoke, quietlyInvoke.
+ *
* @return status upon completion
*/
private int doInvoke() {
- int s; boolean completed;
- if ((s = status) < 0)
- return s;
- try {
- completed = exec();
- } catch (Throwable rex) {
- return setExceptionalCompletion(rex);
+ int s; Thread t;
+ if ((s = doExec()) >= 0) {
+ if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread))
+ s = externalAwaitDone();
+ else {
+ ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
+ s = awaitJoin(wt.workQueue, wt.pool);
+ }
}
- if (completed)
- return setCompletion(NORMAL);
- else
- return doJoin();
+ return s;
}
// Exception table support
@@ -427,6 +479,21 @@ public abstract class ForkJoinTask im
}
/**
+ * Cancels, ignoring any exceptions thrown by cancel. Used during
+ * worker and pool shutdown. Cancel is spec'ed not to throw any
+ * exceptions, but if it does anyway, we have no recourse during
+ * shutdown, so guard against this case.
+ */
+ static final void cancelIgnoringExceptions(ForkJoinTask> t) {
+ if (t != null && t.status >= 0) {
+ try {
+ t.cancel(false);
+ } catch (Throwable ignore) {
+ }
+ }
+ }
+
+ /**
* Removes exception node and clears status
*/
private void clearExceptionalCompletion() {
@@ -491,7 +558,7 @@ public abstract class ForkJoinTask im
if (e == null || (ex = e.ex) == null)
return null;
if (e.thrower != Thread.currentThread().getId()) {
- Class ec = ex.getClass();
+ Class extends Throwable> ec = ex.getClass();
try {
Constructor> noArgCtor = null;
Constructor>[] cs = ec.getConstructors();// public ctors only
@@ -565,7 +632,7 @@ public abstract class ForkJoinTask im
if ((s = status) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
- UNSAFE.throwException(ex);
+ U.throwException(ex);
return getRawResult();
}
@@ -590,8 +657,9 @@ public abstract class ForkJoinTask im
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask fork() {
- ((ForkJoinWorkerThread) Thread.currentThread())
- .pushTask(this);
+ ForkJoinWorkerThread wt;
+ (wt = (ForkJoinWorkerThread)Thread.currentThread()).
+ workQueue.push(this, wt.pool);
return this;
}
@@ -697,12 +765,12 @@ public abstract class ForkJoinTask im
if (t != null) {
if (ex != null)
t.cancel(false);
- else if (t.doJoin() < NORMAL && ex == null)
+ else if (t.doJoin() < NORMAL)
ex = t.getException();
}
}
if (ex != null)
- UNSAFE.throwException(ex);
+ U.throwException(ex);
}
/**
@@ -754,12 +822,12 @@ public abstract class ForkJoinTask im
if (t != null) {
if (ex != null)
t.cancel(false);
- else if (t.doJoin() < NORMAL && ex == null)
+ else if (t.doJoin() < NORMAL)
ex = t.getException();
}
}
if (ex != null)
- UNSAFE.throwException(ex);
+ U.throwException(ex);
return tasks;
}
@@ -794,19 +862,6 @@ public abstract class ForkJoinTask im
return setCompletion(CANCELLED) == CANCELLED;
}
- /**
- * Cancels, ignoring any exceptions thrown by cancel. Used during
- * worker and pool shutdown. Cancel is spec'ed not to throw any
- * exceptions, but if it does anyway, we have no recourse during
- * shutdown, so guard against this case.
- */
- final void cancelIgnoringExceptions() {
- try {
- cancel(false);
- } catch (Throwable ignore) {
- }
- }
-
public final boolean isDone() {
return status < 0;
}
@@ -930,31 +985,51 @@ public abstract class ForkJoinTask im
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
+ // Messy in part because we measure in nanos, but wait in millis
+ int s; long millis, nanos;
Thread t = Thread.currentThread();
- if (t instanceof ForkJoinWorkerThread) {
- ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
- long nanos = unit.toNanos(timeout);
- if (status >= 0) {
- boolean completed = false;
- if (w.unpushTask(this)) {
- try {
- completed = exec();
- } catch (Throwable rex) {
- setExceptionalCompletion(rex);
+ if (!(t instanceof ForkJoinWorkerThread)) {
+ if ((millis = unit.toMillis(timeout)) > 0L)
+ s = externalInterruptibleAwaitDone(millis);
+ else
+ s = status;
+ }
+ else if ((s = status) >= 0 && (nanos = unit.toNanos(timeout)) > 0L) {
+ long deadline = System.nanoTime() + nanos;
+ ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
+ ForkJoinPool.WorkQueue w = wt.workQueue;
+ ForkJoinPool p = wt.pool;
+ if (w.tryUnpush(this))
+ doExec();
+ boolean blocking = false;
+ try {
+ while ((s = status) >= 0) {
+ if (w.runState < 0)
+ cancelIgnoringExceptions(this);
+ else if (!blocking)
+ blocking = p.tryCompensate();
+ else {
+ millis = TimeUnit.NANOSECONDS.toMillis(nanos);
+ if (millis > 0L &&
+ U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ try {
+ synchronized (this) {
+ if (status >= 0)
+ wait(millis);
+ }
+ } catch (InterruptedException ie) {
+ }
+ }
+ if ((s = status) < 0 ||
+ (nanos = deadline - System.nanoTime()) <= 0L)
+ break;
}
}
- if (completed)
- setCompletion(NORMAL);
- else if (status >= 0 && nanos > 0)
- w.pool.timedAwaitJoin(this, nanos);
+ } finally {
+ if (blocking)
+ p.incrementActiveCount();
}
}
- else {
- long millis = unit.toMillis(timeout);
- if (millis > 0)
- externalInterruptibleAwaitDone(millis);
- }
- int s = status;
if (s != NORMAL) {
Throwable ex;
if (s == CANCELLED)
@@ -1000,8 +1075,9 @@ public abstract class ForkJoinTask im
* ClassCastException}.
*/
public static void helpQuiesce() {
- ((ForkJoinWorkerThread) Thread.currentThread())
- .helpQuiescePool();
+ ForkJoinWorkerThread wt =
+ (ForkJoinWorkerThread)Thread.currentThread();
+ wt.pool.helpQuiescePool(wt.workQueue);
}
/**
@@ -1069,8 +1145,8 @@ public abstract class ForkJoinTask im
* @return {@code true} if unforked
*/
public boolean tryUnfork() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .unpushTask(this);
+ return ((ForkJoinWorkerThread)Thread.currentThread())
+ .workQueue.tryUnpush(this);
}
/**
@@ -1089,7 +1165,7 @@ public abstract class ForkJoinTask im
*/
public static int getQueuedTaskCount() {
return ((ForkJoinWorkerThread) Thread.currentThread())
- .getQueueSize();
+ .workQueue.queueSize();
}
/**
@@ -1111,8 +1187,52 @@ public abstract class ForkJoinTask im
* @return the surplus number of tasks, which may be negative
*/
public static int getSurplusQueuedTaskCount() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .getEstimatedSurplusTaskCount();
+ /*
+ * The aim of this method is to return a cheap heuristic guide
+ * for task partitioning when programmers, frameworks, tools,
+ * or languages have little or no idea about task granularity.
+ * In essence by offering this method, we ask users only about
+ * tradeoffs in overhead vs expected throughput and its
+ * variance, rather than how finely to partition tasks.
+ *
+ * In a steady state strict (tree-structured) computation,
+ * each thread makes available for stealing enough tasks for
+ * other threads to remain active. Inductively, if all threads
+ * play by the same rules, each thread should make available
+ * only a constant number of tasks.
+ *
+ * The minimum useful constant is just 1. But using a value of
+ * 1 would require immediate replenishment upon each steal to
+ * maintain enough tasks, which is infeasible. Further,
+ * partitionings/granularities of offered tasks should
+ * minimize steal rates, which in general means that threads
+ * nearer the top of computation tree should generate more
+ * than those nearer the bottom. In perfect steady state, each
+ * thread is at approximately the same level of computation
+ * tree. However, producing extra tasks amortizes the
+ * uncertainty of progress and diffusion assumptions.
+ *
+ * So, users will want to use values larger, but not much
+ * larger than 1 to both smooth over transient shortages and
+ * hedge against uneven progress; as traded off against the
+ * cost of extra task overhead. We leave the user to pick a
+ * threshold value to compare with the results of this call to
+ * guide decisions, but recommend values such as 3.
+ *
+ * When all threads are active, it is on average OK to
+ * estimate surplus strictly locally. In steady-state, if one
+ * thread is maintaining say 2 surplus tasks, then so are
+ * others. So we can just use estimated queue length.
+ * However, this strategy alone leads to serious mis-estimates
+ * in some non-steady-state conditions (ramp-up, ramp-down,
+ * other stalls). We can detect many of these by further
+ * considering the number of "idle" threads, that are known to
+ * have zero queued tasks, so compensate by a factor of
+ * (#idle/#active) threads.
+ */
+ ForkJoinWorkerThread wt =
+ (ForkJoinWorkerThread)Thread.currentThread();
+ return wt.workQueue.queueSize() - wt.pool.idlePerActive();
}
// Extension methods
@@ -1169,8 +1289,7 @@ public abstract class ForkJoinTask im
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask> peekNextLocalTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .peekTask();
+ return ((ForkJoinWorkerThread) Thread.currentThread()).workQueue.peek();
}
/**
@@ -1189,7 +1308,7 @@ public abstract class ForkJoinTask im
*/
protected static ForkJoinTask> pollNextLocalTask() {
return ((ForkJoinWorkerThread) Thread.currentThread())
- .pollLocalTask();
+ .workQueue.nextLocalTask();
}
/**
@@ -1211,8 +1330,51 @@ public abstract class ForkJoinTask im
* @return a task, or {@code null} if none are available
*/
protected static ForkJoinTask> pollTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .pollTask();
+ ForkJoinWorkerThread wt =
+ (ForkJoinWorkerThread)Thread.currentThread();
+ return wt.pool.nextTaskFor(wt.workQueue);
+ }
+
+ // Mark-bit operations
+
+ /**
+ * Returns true if this task is marked.
+ *
+ * @return true if this task is marked
+ * @since 1.8
+ */
+ public final boolean isMarkedForkJoinTask() {
+ return (status & MARKED) != 0;
+ }
+
+ /**
+ * Atomically sets the mark on this task.
+ *
+ * @return true if this task was previously unmarked
+ * @since 1.8
+ */
+ public final boolean markForkJoinTask() {
+ for (int s;;) {
+ if (((s = status) & MARKED) != 0)
+ return false;
+ if (U.compareAndSwapInt(this, STATUS, s, s | MARKED))
+ return true;
+ }
+ }
+
+ /**
+ * Atomically clears the mark on this task.
+ *
+ * @return true if this task was previously marked
+ * @since 1.8
+ */
+ public final boolean unmarkForkJoinTask() {
+ for (int s;;) {
+ if (((s = status) & MARKED) == 0)
+ return false;
+ if (U.compareAndSwapInt(this, STATUS, s, s & ~MARKED))
+ return true;
+ }
}
/**
@@ -1313,11 +1475,10 @@ public abstract class ForkJoinTask im
private static final long serialVersionUID = -7721805057305804111L;
/**
- * Saves the state to a stream (that is, serializes it).
+ * Saves this task to a stream (that is, serializes it).
*
* @serialData the current run status and the exception thrown
* during execution, or {@code null} if none
- * @param s the stream
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
@@ -1326,9 +1487,7 @@ public abstract class ForkJoinTask im
}
/**
- * Reconstitutes the instance from a stream (that is, deserializes it).
- *
- * @param s the stream
+ * Reconstitutes this task from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
@@ -1339,15 +1498,15 @@ public abstract class ForkJoinTask im
}
// Unsafe mechanics
- private static final sun.misc.Unsafe UNSAFE;
- private static final long statusOffset;
+ private static final sun.misc.Unsafe U;
+ private static final long STATUS;
static {
exceptionTableLock = new ReentrantLock();
exceptionTableRefQueue = new ReferenceQueue