--- jsr166/src/jsr166y/ForkJoinTask.java 2012/02/20 18:20:06 1.86
+++ jsr166/src/jsr166y/ForkJoinTask.java 2012/04/21 11:45:20 1.90
@@ -5,6 +5,7 @@
*/
package jsr166y;
+
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
@@ -70,9 +71,10 @@ import java.lang.reflect.Constructor;
* but doing do requires three further considerations: (1) Completion
* of few if any other tasks should be dependent on a task
* that blocks on external synchronization or IO. Event-style async
- * tasks that are never joined often fall into this category. (2) To
- * minimize resource impact, tasks should be small; ideally performing
- * only the (possibly) blocking action. (3) Unless the {@link
+ * tasks that are never joined (for example, those subclassing {@link
+ * CountedCompleter}) often fall into this category. (2) To minimize
+ * resource impact, tasks should be small; ideally performing only the
+ * (possibly) blocking action. (3) Unless the {@link
* ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
* blocked tasks is known to be less than the pool's {@link
* ForkJoinPool#getParallelism} level, the pool cannot guarantee that
@@ -115,18 +117,19 @@ import java.lang.reflect.Constructor;
*
The ForkJoinTask class is not usually directly subclassed.
* Instead, you subclass one of the abstract classes that support a
* particular style of fork/join processing, typically {@link
- * RecursiveAction} for computations that do not return results, or
- * {@link RecursiveTask} for those that do. Normally, a concrete
- * ForkJoinTask subclass declares fields comprising its parameters,
- * established in a constructor, and then defines a {@code compute}
- * method that somehow uses the control methods supplied by this base
- * class. While these methods have {@code public} access (to allow
- * instances of different task subclasses to call each other's
- * methods), some of them may only be called from within other
- * ForkJoinTasks (as may be determined using method {@link
- * #inForkJoinPool}). Attempts to invoke them in other contexts
- * result in exceptions or errors, possibly including
- * {@code ClassCastException}.
+ * RecursiveAction} for most computations that do not return results,
+ * {@link RecursiveTask} for those that do, and {@link
+ * CountedCompleter} for those in which completed actions trigger
+ * other actions. Normally, a concrete ForkJoinTask subclass declares
+ * fields comprising its parameters, established in a constructor, and
+ * then defines a {@code compute} method that somehow uses the control
+ * methods supplied by this base class. While these methods have
+ * {@code public} access (to allow instances of different task
+ * subclasses to call each other's methods), some of them may only be
+ * called from within other ForkJoinTasks (as may be determined using
+ * method {@link #inForkJoinPool}). Attempts to invoke them in other
+ * contexts result in exceptions or errors, possibly including {@code
+ * ClassCastException}.
*
*
Method {@link #join} and its variants are appropriate for use
* only when completion dependencies are acyclic; that is, the
@@ -137,17 +140,16 @@ import java.lang.reflect.Constructor;
* {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
* may be of use in constructing custom subclasses for problems that
* are not statically structured as DAGs. To support such usages a
- * ForkJoinTask may be atomically marked using {@link
- * #markForkJoinTask} and checked for marking using {@link
- * #isMarkedForkJoinTask}. The ForkJoinTask implementation does not
- * use these {@code protected} methods or marks for any purpose, but
- * they may be of use in the construction of specialized subclasses.
- * For example, parallel graph traversals can use the supplied methods
- * to avoid revisiting nodes/tasks that have already been processed.
- * Also, completion based designs can use them to record that one
- * subtask has completed. (Method names for marking are bulky in part
- * to encourage definition of methods that reflect their usage
- * patterns.)
+ * ForkJoinTask may be atomically tagged with a {@code short}
+ * value using {@link #setForkJoinTaskTag} or {@link
+ * #compareAndSetForkJoinTaskTag} and checked using {@link
+ * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
+ * these {@code protected} methods or tags for any purpose, but they
+ * may be of use in the construction of specialized subclasses. For
+ * example, parallel graph traversals can use the supplied methods to
+ * avoid revisiting nodes/tasks that have already been processed.
+ * (Method names for tagging are bulky in part to encourage definition
+ * of methods that reflect their usage patterns.)
*
*
Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
@@ -213,6 +215,10 @@ public abstract class ForkJoinTask im
* thin-lock techniques, so use some odd coding idioms that tend
* to avoid them, mainly by arranging that every synchronized
* block performs a wait, notifyAll or both.
+ *
+ * These control bits occupy only (some of) the upper half (16
+ * bits) of status field. The lower bits are used for user-defined
+ * tags.
*/
/** The run status of this task */
@@ -221,13 +227,12 @@ public abstract class ForkJoinTask im
static final int NORMAL = 0xf0000000; // must be negative
static final int CANCELLED = 0xc0000000; // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
- static final int SIGNAL = 0x00000001;
- static final int MARKED = 0x00000002;
+ static final int SIGNAL = 0x00010000; // must be >= 1 << 16
+ static final int SMASK = 0x0000ffff; // short bits for tags
/**
* Marks completion and wakes up threads waiting to join this
- * task. A specialization for NORMAL completion is in method
- * doExec.
+ * task.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
@@ -237,7 +242,7 @@ public abstract class ForkJoinTask im
if ((s = status) < 0)
return s;
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
- if ((s & SIGNAL) != 0)
+ if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); }
return completion;
}
@@ -259,26 +264,22 @@ public abstract class ForkJoinTask im
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
- while ((s = status) >= 0 && completed) {
- if (U.compareAndSwapInt(this, STATUS, s, s | NORMAL)) {
- if ((s & SIGNAL) != 0)
- synchronized (this) { notifyAll(); }
- return NORMAL;
- }
- }
+ if (completed)
+ s = setCompletion(NORMAL);
}
return s;
}
/**
- * Tries to set SIGNAL status. Used by ForkJoinPool. Other
- * variants are directly incorporated into externalAwaitDone etc.
+ * Tries to set SIGNAL status unless already completed. Used by
+ * ForkJoinPool. Other variants are directly incorporated into
+ * externalAwaitDone etc.
*
* @return true if successful
*/
final boolean trySetSignal() {
- int s;
- return U.compareAndSwapInt(this, STATUS, s = status, s | SIGNAL);
+ int s = status;
+ return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL);
}
/**
@@ -328,7 +329,6 @@ public abstract class ForkJoinTask im
return s;
}
-
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
@@ -412,30 +412,52 @@ public abstract class ForkJoinTask im
}
/**
- * Records exception and sets exceptional completion.
+ * Records exception and sets status.
*
* @return status on exit
*/
- private int setExceptionalCompletion(Throwable ex) {
- int h = System.identityHashCode(this);
- final ReentrantLock lock = exceptionTableLock;
- lock.lock();
- try {
- expungeStaleExceptions();
- ExceptionNode[] t = exceptionTable;
- int i = h & (t.length - 1);
- for (ExceptionNode e = t[i]; ; e = e.next) {
- if (e == null) {
- t[i] = new ExceptionNode(this, ex, t[i]);
- break;
+ final int recordExceptionalCompletion(Throwable ex) {
+ int s;
+ if ((s = status) >= 0) {
+ int h = System.identityHashCode(this);
+ final ReentrantLock lock = exceptionTableLock;
+ lock.lock();
+ try {
+ expungeStaleExceptions();
+ ExceptionNode[] t = exceptionTable;
+ int i = h & (t.length - 1);
+ for (ExceptionNode e = t[i]; ; e = e.next) {
+ if (e == null) {
+ t[i] = new ExceptionNode(this, ex, t[i]);
+ break;
+ }
+ if (e.get() == this) // already present
+ break;
}
- if (e.get() == this) // already present
- break;
+ } finally {
+ lock.unlock();
}
- } finally {
- lock.unlock();
+ s = setCompletion(EXCEPTIONAL);
}
- return setCompletion(EXCEPTIONAL);
+ return s;
+ }
+
+ /**
+ * Records exception and possibly propagates
+ *
+ * @return status on exit
+ */
+ private int setExceptionalCompletion(Throwable ex) {
+ int s = recordExceptionalCompletion(ex);
+ if ((s & DONE_MASK) == EXCEPTIONAL)
+ internalPropagateException(ex);
+ return s;
+ }
+
+ /**
+ * Hook for exception propagation support for tasks with completers.
+ */
+ void internalPropagateException(Throwable ex) {
}
/**
@@ -517,7 +539,7 @@ public abstract class ForkJoinTask im
Throwable ex;
if (e == null || (ex = e.ex) == null)
return null;
- if (e.thrower != Thread.currentThread().getId()) {
+ if (false && e.thrower != Thread.currentThread().getId()) {
Class extends Throwable> ec = ex.getClass();
try {
Constructor> noArgCtor = null;
@@ -907,6 +929,18 @@ public abstract class ForkJoinTask im
}
/**
+ * Completes this task normally without setting a value. The most
+ * recent value established by {@link #setRawResult} (or {@code
+ * null} by default) will be returned as the result of subsequent
+ * invocations of {@code join} and related operations.
+ *
+ * @since 1.8
+ */
+ public final void quietlyComplete() {
+ setCompletion(NORMAL);
+ }
+
+ /**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
@@ -1225,15 +1259,18 @@ public abstract class ForkJoinTask im
protected abstract void setRawResult(V value);
/**
- * Immediately performs the base action of this task. This method
- * is designed to support extensions, and should not in general be
- * called otherwise. The return value controls whether this task
- * is considered to be done normally. It may return false in
+ * Immediately performs the base action of this task and returns
+ * true if, upon return from this method, this task is guaranteed
+ * to have completed normally. This method may return false
+ * otherwise, to indicate that this task is not necessarily
+ * complete (or is not known to be complete), for example in
* asynchronous actions that require explicit invocations of
- * {@link #complete} to become joinable. It may also throw an
- * (unchecked) exception to indicate abnormal exit.
+ * completion methods. This method may also throw an (unchecked)
+ * exception to indicate abnormal exit. This method is designed to
+ * support extensions, and should not in general be called
+ * otherwise.
*
- * @return {@code true} if completed normally
+ * @return {@code true} if this task is known to have completed normally
*/
protected abstract boolean exec();
@@ -1302,44 +1339,53 @@ public abstract class ForkJoinTask im
return wt.pool.nextTaskFor(wt.workQueue);
}
- // Mark-bit operations
+ // tag operations
/**
- * Returns true if this task is marked.
+ * Returns the tag for this task.
*
- * @return true if this task is marked
+ * @return the tag for this task
* @since 1.8
*/
- public final boolean isMarkedForkJoinTask() {
- return (status & MARKED) != 0;
+ public final short getForkJoinTaskTag() {
+ return (short)status;
}
/**
- * Atomically sets the mark on this task.
+ * Atomically sets the tag value for this task.
*
- * @return true if this task was previously unmarked
+ * @param tag the tag value
+ * @return the previous value of the tag
* @since 1.8
*/
- public final boolean markForkJoinTask() {
+ public final short setForkJoinTaskTag(short tag) {
for (int s;;) {
- if (((s = status) & MARKED) != 0)
- return false;
- if (U.compareAndSwapInt(this, STATUS, s, s | MARKED))
- return true;
+ if (U.compareAndSwapInt(this, STATUS, s = status,
+ (s & ~SMASK) | (tag & SMASK)))
+ return (short)s;
}
}
/**
- * Atomically clears the mark on this task.
+ * Atomically conditionally sets the tag value for this task.
+ * Among other applications, tags can be used as visit markers
+ * in tasks operating on graphs, as in methods that check: {@code
+ * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
+ * before processing, otherwise exiting because the node has
+ * already been visited.
*
- * @return true if this task was previously marked
+ * @param e the expected tag value
+ * @param tag the new tag value
+ * @return true if successful; i.e., the current value was
+ * equal to e and is now tag.
* @since 1.8
*/
- public final boolean unmarkForkJoinTask() {
+ public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
for (int s;;) {
- if (((s = status) & MARKED) == 0)
+ if ((short)(s = status) != e)
return false;
- if (U.compareAndSwapInt(this, STATUS, s, s & ~MARKED))
+ if (U.compareAndSwapInt(this, STATUS, s,
+ (s & ~SMASK) | (tag & SMASK)))
return true;
}
}