--- jsr166/src/jsr166y/ForkJoinTask.java 2012/01/26 19:09:03 1.83 +++ jsr166/src/jsr166y/ForkJoinTask.java 2012/12/30 02:05:53 1.97 @@ -5,6 +5,7 @@ */ package jsr166y; + import java.io.Serializable; import java.util.Collection; import java.util.List; @@ -29,15 +30,18 @@ import java.lang.reflect.Constructor; * subtasks may be hosted by a small number of actual threads in a * ForkJoinPool, at the price of some usage limitations. * - *
A "main" {@code ForkJoinTask} begins execution when submitted - * to a {@link ForkJoinPool}. Once started, it will usually in turn - * start other subtasks. As indicated by the name of this class, - * many programs using {@code ForkJoinTask} employ only methods - * {@link #fork} and {@link #join}, or derivatives such as {@link + *
A "main" {@code ForkJoinTask} begins execution when it is + * explicitly submitted to a {@link ForkJoinPool}, or, if not already + * engaged in a ForkJoin computation, commenced in the {@link + * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or + * related methods. Once started, it will usually in turn start other + * subtasks. As indicated by the name of this class, many programs + * using {@code ForkJoinTask} employ only methods {@link #fork} and + * {@link #join}, or derivatives such as {@link * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also * provides a number of other methods that can come into play in - * advanced usages, as well as extension mechanics that allow - * support of new forms of fork/join processing. + * advanced usages, as well as extension mechanics that allow support + * of new forms of fork/join processing. * *
A {@code ForkJoinTask} is a lightweight form of {@link Future}. * The efficiency of {@code ForkJoinTask}s stems from a set of @@ -51,7 +55,7 @@ import java.lang.reflect.Constructor; * minimize other blocking synchronization apart from joining other * tasks or using synchronizers such as Phasers that are advertised to * cooperate with fork/join scheduling. Subdividable tasks should also - * not perform blocking IO, and should ideally access variables that + * not perform blocking I/O, and should ideally access variables that * are completely independent of those accessed by other running * tasks. These guidelines are loosely enforced by not permitting * checked exceptions such as {@code IOExceptions} to be @@ -69,10 +73,11 @@ import java.lang.reflect.Constructor; *
It is possible to define and use ForkJoinTasks that may block, * but doing do requires three further considerations: (1) Completion * of few if any other tasks should be dependent on a task - * that blocks on external synchronization or IO. Event-style async - * tasks that are never joined often fall into this category. (2) To - * minimize resource impact, tasks should be small; ideally performing - * only the (possibly) blocking action. (3) Unless the {@link + * that blocks on external synchronization or I/O. Event-style async + * tasks that are never joined (for example, those subclassing {@link + * CountedCompleter}) often fall into this category. (2) To minimize + * resource impact, tasks should be small; ideally performing only the + * (possibly) blocking action. (3) Unless the {@link * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly * blocked tasks is known to be less than the pool's {@link * ForkJoinPool#getParallelism} level, the pool cannot guarantee that @@ -93,7 +98,7 @@ import java.lang.reflect.Constructor; * performs the most common form of parallel invocation: forking a set * of tasks and joining them all. * - *
In the most typical usages, a fork-join pair act like a a call + *
In the most typical usages, a fork-join pair act like a call * (fork) and return (join) from a parallel recursive function. As is * the case with other forms of recursive calls, returns (joins) * should be performed innermost-first. For example, {@code a.fork(); @@ -115,18 +120,13 @@ import java.lang.reflect.Constructor; *
The ForkJoinTask class is not usually directly subclassed. * Instead, you subclass one of the abstract classes that support a * particular style of fork/join processing, typically {@link - * RecursiveAction} for computations that do not return results, or - * {@link RecursiveTask} for those that do. Normally, a concrete - * ForkJoinTask subclass declares fields comprising its parameters, - * established in a constructor, and then defines a {@code compute} - * method that somehow uses the control methods supplied by this base - * class. While these methods have {@code public} access (to allow - * instances of different task subclasses to call each other's - * methods), some of them may only be called from within other - * ForkJoinTasks (as may be determined using method {@link - * #inForkJoinPool}). Attempts to invoke them in other contexts - * result in exceptions or errors, possibly including - * {@code ClassCastException}. + * RecursiveAction} for most computations that do not return results, + * {@link RecursiveTask} for those that do, and {@link + * CountedCompleter} for those in which completed actions trigger + * other actions. Normally, a concrete ForkJoinTask subclass declares + * fields comprising its parameters, established in a constructor, and + * then defines a {@code compute} method that somehow uses the control + * methods supplied by this base class. * *
Method {@link #join} and its variants are appropriate for use * only when completion dependencies are acyclic; that is, the @@ -137,17 +137,16 @@ import java.lang.reflect.Constructor; * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that * may be of use in constructing custom subclasses for problems that * are not statically structured as DAGs. To support such usages a - * ForkJoinTask may be atomically marked using {@link - * #markForkJoinTask} and checked for marking using {@link - * #isMarkedForkJoinTask}. The ForkJoinTask implementation does not - * use these {@code protected} methods or marks for any purpose, but - * they may be of use in the construction of specialized subclasses. - * For example, parallel graph traversals can use the supplied methods - * to avoid revisiting nodes/tasks that have already been - * processed. Also, completion based designs can use them to record - * that one subtask has completed. (Method names for marking are bulky - * in part to encourage definition of methods that reflect their usage - * patterns.) + * ForkJoinTask may be atomically tagged with a {@code short} + * value using {@link #setForkJoinTaskTag} or {@link + * #compareAndSetForkJoinTaskTag} and checked using {@link + * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use + * these {@code protected} methods or tags for any purpose, but they + * may be of use in the construction of specialized subclasses. For + * example, parallel graph traversals can use the supplied methods to + * avoid revisiting nodes/tasks that have already been processed. + * (Method names for tagging are bulky in part to encourage definition + * of methods that reflect their usage patterns.) * *
Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
@@ -197,45 +196,40 @@ public abstract class ForkJoinTask This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * Arranges to asynchronously execute this task in the pool the
+ * current task is running in, if applicable, or using the {@link
+ * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
+ * it is not necessarily enforced, it is a usage error to fork a
+ * task more than once unless it has completed and been
+ * reinitialized. Subsequent modifications to the state of this
+ * task or any data it operates on are not necessarily
+ * consistently observable by any thread other than the one
+ * executing it unless preceded by a call to {@link #join} or
+ * related methods, or a call to {@link #isDone} returning {@code
+ * true}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param t1 the first task
* @param t2 the second task
* @throws NullPointerException if any task is null
*/
public static void invokeAll(ForkJoinTask> t1, ForkJoinTask> t2) {
+ int s1, s2;
t2.fork();
- t1.invoke();
- t2.join();
+ if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
+ t1.reportException(s1);
+ if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
+ t2.reportException(s2);
}
/**
@@ -737,12 +727,6 @@ public abstract class ForkJoinTask This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param tasks the tasks
* @throws NullPointerException if any task is null
*/
@@ -770,7 +754,7 @@ public abstract class ForkJoinTask This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param tasks the collection of tasks
* @return the tasks argument, to simplify usage
* @throws NullPointerException if tasks or any element are null
@@ -827,7 +805,7 @@ public abstract class ForkJoinTask This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
*/
public static void helpQuiesce() {
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- wt.pool.helpQuiescePool(wt.workQueue);
+ Thread t;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
+ wt.pool.helpQuiescePool(wt.workQueue);
+ }
+ else
+ ForkJoinPool.externalHelpQuiescePool();
}
/**
@@ -1097,7 +1096,7 @@ public abstract class ForkJoinTask This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * typically (but is not guaranteed to) succeed if this task is
+ * the most recently forked task by the current thread, and has
+ * not commenced executing in another thread. This method may be
+ * useful when arranging alternative local processing of tasks
+ * that could have been, but were not, stolen.
*
* @return {@code true} if unforked
*/
public boolean tryUnfork() {
- return ((ForkJoinWorkerThread)Thread.currentThread())
- .workQueue.tryUnpush(this);
+ Thread t;
+ return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
+ ForkJoinPool.tryExternalUnpush(this));
}
/**
@@ -1155,84 +1150,32 @@ public abstract class ForkJoinTask This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the number of tasks
*/
public static int getQueuedTaskCount() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .workQueue.queueSize();
+ Thread t; ForkJoinPool.WorkQueue q;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ q = ((ForkJoinWorkerThread)t).workQueue;
+ else
+ q = ForkJoinPool.commonSubmitterQueue();
+ return (q == null) ? 0 : q.queueSize();
}
/**
* Returns an estimate of how many more locally queued tasks are
* held by the current worker thread than there are other worker
- * threads that might steal them. This value may be useful for
+ * threads that might steal them, or zero if this thread is not
+ * operating in a ForkJoinPool. This value may be useful for
* heuristic decisions about whether to fork other tasks. In many
* usages of ForkJoinTasks, at steady state, each worker should
* aim to maintain a small constant surplus (for example, 3) of
* tasks, and to process computations locally if this threshold is
* exceeded.
*
- * This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the surplus number of tasks, which may be negative
*/
public static int getSurplusQueuedTaskCount() {
- /*
- * The aim of this method is to return a cheap heuristic guide
- * for task partitioning when programmers, frameworks, tools,
- * or languages have little or no idea about task granularity.
- * In essence by offering this method, we ask users only about
- * tradeoffs in overhead vs expected throughput and its
- * variance, rather than how finely to partition tasks.
- *
- * In a steady state strict (tree-structured) computation,
- * each thread makes available for stealing enough tasks for
- * other threads to remain active. Inductively, if all threads
- * play by the same rules, each thread should make available
- * only a constant number of tasks.
- *
- * The minimum useful constant is just 1. But using a value of
- * 1 would require immediate replenishment upon each steal to
- * maintain enough tasks, which is infeasible. Further,
- * partitionings/granularities of offered tasks should
- * minimize steal rates, which in general means that threads
- * nearer the top of computation tree should generate more
- * than those nearer the bottom. In perfect steady state, each
- * thread is at approximately the same level of computation
- * tree. However, producing extra tasks amortizes the
- * uncertainty of progress and diffusion assumptions.
- *
- * So, users will want to use values larger, but not much
- * larger than 1 to both smooth over transient shortages and
- * hedge against uneven progress; as traded off against the
- * cost of extra task overhead. We leave the user to pick a
- * threshold value to compare with the results of this call to
- * guide decisions, but recommend values such as 3.
- *
- * When all threads are active, it is on average OK to
- * estimate surplus strictly locally. In steady-state, if one
- * thread is maintaining say 2 surplus tasks, then so are
- * others. So we can just use estimated queue length.
- * However, this strategy alone leads to serious mis-estimates
- * in some non-steady-state conditions (ramp-up, ramp-down,
- * other stalls). We can detect many of these by further
- * considering the number of "idle" threads, that are known to
- * have zero queued tasks, so compensate by a factor of
- * (#idle/#active) threads.
- */
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- return wt.workQueue.queueSize() - wt.pool.idlePerActive();
+ return ForkJoinPool.getSurplusQueuedTaskCount();
}
// Extension methods
@@ -1258,15 +1201,18 @@ public abstract class ForkJoinTask This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask> peekNextLocalTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread()).workQueue.peek();
+ Thread t; ForkJoinPool.WorkQueue q;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ q = ((ForkJoinWorkerThread)t).workQueue;
+ else
+ q = ForkJoinPool.commonSubmitterQueue();
+ return (q == null) ? null : q.peek();
}
/**
* Unschedules and returns, without executing, the next task
- * queued by the current thread but not yet executed. This method
- * is designed primarily to support extensions, and is unlikely to
- * be useful otherwise.
- *
- * This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * queued by the current thread but not yet executed, if the
+ * current thread is operating in a ForkJoinPool. This method is
+ * designed primarily to support extensions, and is unlikely to be
+ * useful otherwise.
*
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask> pollNextLocalTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .workQueue.nextLocalTask();
+ Thread t;
+ return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
+ null;
}
/**
- * Unschedules and returns, without executing, the next task
+ * If the current thread is operating in a ForkJoinPool,
+ * unschedules and returns, without executing, the next task
* queued by the current thread but not yet executed, if one is
* available, or if not available, a task that was forked by some
* other thread, if available. Availability may be transient, so a
- * {@code null} result does not necessarily imply quiescence
- * of the pool this task is operating in. This method is designed
+ * {@code null} result does not necessarily imply quiescence of
+ * the pool this task is operating in. This method is designed
* primarily to support extensions, and is unlikely to be useful
* otherwise.
*
- * This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return a task, or {@code null} if none are available
*/
protected static ForkJoinTask> pollTask() {
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- return wt.pool.nextTaskFor(wt.workQueue);
+ Thread t; ForkJoinWorkerThread wt;
+ return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
+ null;
}
- // Mark-bit operations
+ // tag operations
/**
- * Returns true if this task is marked.
+ * Returns the tag for this task.
*
- * @return true if this task is marked
+ * @return the tag for this task
* @since 1.8
*/
- public final boolean isMarkedForkJoinTask() {
- return (status & MARKED) != 0;
+ public final short getForkJoinTaskTag() {
+ return (short)status;
}
/**
- * Atomically sets the mark on this task.
+ * Atomically sets the tag value for this task.
*
- * @return true if this task was previously unmarked
+ * @param tag the tag value
+ * @return the previous value of the tag
* @since 1.8
*/
- public final boolean markForkJoinTask() {
+ public final short setForkJoinTaskTag(short tag) {
for (int s;;) {
- if (((s = status) & MARKED) != 0)
- return false;
- if (U.compareAndSwapInt(this, STATUS, s, s | MARKED))
- return true;
+ if (U.compareAndSwapInt(this, STATUS, s = status,
+ (s & ~SMASK) | (tag & SMASK)))
+ return (short)s;
}
}
/**
- * Atomically clears the mark on this task.
+ * Atomically conditionally sets the tag value for this task.
+ * Among other applications, tags can be used as visit markers
+ * in tasks operating on graphs, as in methods that check: {@code
+ * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
+ * before processing, otherwise exiting because the node has
+ * already been visited.
*
- * @return true if this task was previously marked
+ * @param e the expected tag value
+ * @param tag the new tag value
+ * @return true if successful; i.e., the current value was
+ * equal to e and is now tag.
* @since 1.8
*/
- public final boolean unmarkForkJoinTask() {
+ public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
for (int s;;) {
- if (((s = status) & MARKED) == 0)
+ if ((short)(s = status) != e)
return false;
- if (U.compareAndSwapInt(this, STATUS, s, s & ~MARKED))
+ if (U.compareAndSwapInt(this, STATUS, s,
+ (s & ~SMASK) | (tag & SMASK)))
return true;
}
}
@@ -1385,21 +1332,33 @@ public abstract class ForkJoinTask