--- jsr166/src/jsr166y/CountedCompleter.java 2012/04/09 13:12:18 1.1
+++ jsr166/src/jsr166y/CountedCompleter.java 2012/10/31 12:49:24 1.8
@@ -7,7 +7,7 @@
package jsr166y;
/**
- * A resultless {@link ForkJoinTask} with a completion action
+ * A {@link ForkJoinTask} with a completion action
* performed when triggered and there are no remaining pending
* actions. Uses of CountedCompleter are similar to those of other
* completion based components (such as {@link
@@ -21,18 +21,30 @@ package jsr166y;
* #tryComplete}, if the pending action count is nonzero, it is
* decremented; otherwise, the completion action is performed, and if
* this completer itself has a completer, the process is continued
- * with its completer. As is the case with most basic synchronization
- * constructs, these methods affect only internal counts; they do not
- * establish any further internal bookkeeping. In particular, the
- * identities of pending tasks are not maintained. As illustrated
- * below, you can create subclasses that do record some or all pended
- * tasks or their results when needed.
+ * with its completer. As is the case with related synchronization
+ * components such as {@link java.util.concurrent.Phaser Phaser} and
+ * {@link java.util.concurrent.Semaphore Semaphore}, these methods
+ * affect only internal counts; they do not establish any further
+ * internal bookkeeping. In particular, the identities of pending
+ * tasks are not maintained. As illustrated below, you can create
+ * subclasses that do record some or all pending tasks or their
+ * results when needed.
*
*
A concrete CountedCompleter class must define method {@link
* #compute}, that should, in almost all use cases, invoke {@code
- * tryComplete()} before returning. The class may also optionally
+ * tryComplete()} once before returning. The class may also optionally
* override method {@link #onCompletion} to perform an action upon
- * normal completion.
+ * normal completion, and method {@link #onExceptionalCompletion} to
+ * perform an action upon any exception.
+ *
+ *
CountedCompleters most often do not bear results, in which case
+ * they are normally declared as {@code CountedCompleter}, and
+ * will always return {@code null} as a result value. In other cases,
+ * you should override method {@link #getRawResult} to provide a
+ * result from {@code join(), invoke()}, and related methods. (Method
+ * {@link #setRawResult} by default plays no role in CountedCompleters
+ * but may be overridden for example to maintain fields holding result
+ * data.)
*
* A CountedCompleter that does not itself have a completer (i.e.,
* one for which {@link #getCompleter} returns {@code null}) can be
@@ -44,15 +56,17 @@ package jsr166y;
* {@link #complete}, {@link ForkJoinTask#cancel}, {@link
* ForkJoinTask#completeExceptionally} or upon exceptional completion
* of method {@code compute}. Upon any exceptional completion, the
- * exception is relayed to a task's completer (and its completer, and
- * so on), if one exists and it has not otherwise already completed.
+ * exception may be relayed to a task's completer (and its completer,
+ * and so on), if one exists and it has not otherwise already
+ * completed.
*
*
Sample Usages.
*
*
Parallel recursive decomposition. CountedCompleters may
* be arranged in trees similar to those often used with {@link
* RecursiveAction}s, although the constructions involved in setting
- * them up typically vary. Even though they entail a bit more
+ * them up typically vary. Here, the completer of each task is its
+ * parent in the computation tree. Even though they entail a bit more
* bookkeeping, CountedCompleters may be better choices when applying
* a possibly time-consuming operation (that cannot be further
* subdivided) to each element of an array or collection; especially
@@ -77,14 +91,14 @@ package jsr166y;
*
{@code
* class MyOperation { void apply(E e) { ... } }
*
- * class ForEach extends CountedCompleter {
+ * class ForEach extends CountedCompleter {
*
* public static void forEach(ForkJoinPool pool, E[] array, MyOperation op) {
* pool.invoke(new ForEach(null, array, op, 0, array.length));
* }
*
* final E[] array; final MyOperation op; final int lo, hi;
- * ForEach(CountedCompleter p, E[] array, MyOperation op, int lo, int hi) {
+ * ForEach(CountedCompleter> p, E[] array, MyOperation op, int lo, int hi) {
* super(p);
* this.array = array; this.op = op; this.lo = lo; this.hi = hi;
* }
@@ -128,7 +142,9 @@ package jsr166y;
*
* As a further improvement, notice that the left task need not even
* exist. Instead of creating a new one, we can iterate using the
- * original task, and add a pending count for each fork:
+ * original task, and add a pending count for each fork. Additionally,
+ * this version uses {@code helpComplete} to streamline assistance in
+ * the execution of forked tasks.
*
* {@code
* class ForEach ...
@@ -142,7 +158,7 @@ package jsr166y;
* }
* if (h > l)
* op.apply(array[l]);
- * tryComplete();
+ * helpComplete();
* }
* }
*
@@ -159,17 +175,18 @@ package jsr166y;
* and reductions are all of type {@code E}), one way to do this in
* divide and conquer designs is to have each subtask record its
* sibling, so that it can be accessed in method {@code onCompletion}.
- * For clarity, this class uses explicit left and right subtasks, but
- * variants of other streamlinings seen in the above example may also
- * apply.
+ * This technique applies to reductions in which the order of
+ * combining left and right results does not matter; ordered
+ * reductions require explicit left/right designations. Variants of
+ * other streamlinings seen in the above examples may also apply.
*
* {@code
* class MyMapper { E apply(E v) { ... } }
* class MyReducer { E apply(E x, E y) { ... } }
- * class MapReducer extends CountedCompleter {
+ * class MapReducer extends CountedCompleter {
* final E[] array; final MyMapper mapper;
* final MyReducer reducer; final int lo, hi;
- * MapReducer sibling;
+ * MapReducer sibling;
* E result;
* MapReducer(CountedCompleter p, E[] array, MyMapper mapper,
* MyReducer reducer, int lo, int hi) {
@@ -204,13 +221,12 @@ package jsr166y;
* result = reducer.apply(child.result, sib.result);
* }
* }
+ * public E getRawResult() { return result; }
*
* public static E mapReduce(ForkJoinPool pool, E[] array,
* MyMapper mapper, MyReducer reducer) {
- * MapReducer mr = new MapReducer(null, array, mapper,
- * reducer, 0, array.length);
- * pool.invoke(mr);
- * return mr.result;
+ * return pool.invoke(new MapReducer(null, array, mapper,
+ * reducer, 0, array.length));
* }
* } }
*
@@ -220,12 +236,12 @@ package jsr166y;
* triggers another async task. For example:
*
* {@code
- * class HeaderBuilder extends CountedCompleter { ... }
- * class BodyBuilder extends CountedCompleter { ... }
- * class PacketSender extends CountedCompleter {
+ * class HeaderBuilder extends CountedCompleter<...> { ... }
+ * class BodyBuilder extends CountedCompleter<...> { ... }
+ * class PacketSender extends CountedCompleter<...> {
* PacketSender(...) { super(null, 1); ... } // trigger on second completion
* public void compute() { } // never called
- * public void onCompletion(CountedCompleter caller) { sendPacket(); }
+ * public void onCompletion(CountedCompleter> caller) { sendPacket(); }
* }
* // sample use:
* PacketSender p = new PacketSender();
@@ -236,9 +252,11 @@ package jsr166y;
* @since 1.8
* @author Doug Lea
*/
-public abstract class CountedCompleter extends ForkJoinTask {
+public abstract class CountedCompleter extends ForkJoinTask {
+ private static final long serialVersionUID = 5232453752276485070L;
+
/** This task's completer, or null if none */
- final CountedCompleter completer;
+ final CountedCompleter> completer;
/** The number of pending tasks until completion */
volatile int pending;
@@ -249,7 +267,7 @@ public abstract class CountedCompleter e
* @param completer this tasks completer, or {@code null} if none
* @param initialPendingCount the initial pending count
*/
- protected CountedCompleter(CountedCompleter completer,
+ protected CountedCompleter(CountedCompleter> completer,
int initialPendingCount) {
this.completer = completer;
this.pending = initialPendingCount;
@@ -261,7 +279,7 @@ public abstract class CountedCompleter e
*
* @param completer this tasks completer, or {@code null} if none
*/
- protected CountedCompleter(CountedCompleter completer) {
+ protected CountedCompleter(CountedCompleter> completer) {
this.completer = completer;
}
@@ -279,15 +297,36 @@ public abstract class CountedCompleter e
public abstract void compute();
/**
- * Executes the completion action when method {@link #tryComplete}
- * is invoked and there are no pending counts, or when the
- * unconditional method {@link #complete} is invoked. By default,
- * this method does nothing.
+ * Performs an action when method {@link #tryComplete} is invoked
+ * and there are no pending counts, or when the unconditional
+ * method {@link #complete} is invoked. By default, this method
+ * does nothing.
+ *
+ * @param caller the task invoking this method (which may
+ * be this task itself).
+ */
+ public void onCompletion(CountedCompleter> caller) {
+ }
+
+ /**
+ * Performs an action when method {@link #completeExceptionally}
+ * is invoked or method {@link #compute} throws an exception, and
+ * this task has not otherwise already completed normally. On
+ * entry to this method, this task {@link
+ * ForkJoinTask#isCompletedAbnormally}. The return value of this
+ * method controls further propagation: If {@code true} and this
+ * task has a completer, then this completer is also completed
+ * exceptionally. The default implementation of this method does
+ * nothing except return {@code true}.
*
+ * @param ex the exception
* @param caller the task invoking this method (which may
* be this task itself).
+ * @return true if this exception should be propagated to this
+ * tasks completer, if one exists.
*/
- public void onCompletion(CountedCompleter caller) {
+ public boolean onExceptionalCompletion(Throwable ex, CountedCompleter> caller) {
+ return true;
}
/**
@@ -296,7 +335,7 @@ public abstract class CountedCompleter e
*
* @return the completer
*/
- public final CountedCompleter getCompleter() {
+ public final CountedCompleter> getCompleter() {
return completer;
}
@@ -341,14 +380,27 @@ public abstract class CountedCompleter e
}
/**
+ * Returns the root of the current computation; i.e., this
+ * task if it has no completer, else its completer's root.
+ *
+ * @return the root of the current computation
+ */
+ public final CountedCompleter> getRoot() {
+ CountedCompleter> a = this, p;
+ while ((p = a.completer) != null)
+ a = p;
+ return a;
+ }
+
+ /**
* If the pending count is nonzero, decrements the count;
* otherwise invokes {@link #onCompletion} and then similarly
* tries to complete this task's completer, if one exists,
* else marks this task as complete.
*/
public final void tryComplete() {
- for (CountedCompleter a = this, s = a;;) {
- int c;
+ CountedCompleter> a = this, s = a;
+ for (int c;;) {
if ((c = a.pending) == 0) {
a.onCompletion(s);
if ((a = (s = a).completer) == null) {
@@ -362,24 +414,60 @@ public abstract class CountedCompleter e
}
/**
+ * Identical to {@link #tryComplete}, but may additionally execute
+ * other tasks within the current computation (i.e., those
+ * with the same {@link #getRoot}.
+ */
+ public final void helpComplete() {
+ CountedCompleter> a = this, s = a;
+ for (int c;;) {
+ if ((c = a.pending) == 0) {
+ a.onCompletion(s);
+ if ((a = (s = a).completer) == null) {
+ s.quietlyComplete();
+ return;
+ }
+ }
+ else if (U.compareAndSwapInt(a, PENDING, c, c - 1)) {
+ if (!(Thread.currentThread() instanceof ForkJoinWorkerThread))
+ ForkJoinPool.popAndExecCCFromCommonPool(a);
+ return;
+ }
+ }
+ }
+
+ /**
* Regardless of pending count, invokes {@link #onCompletion},
- * marks this task as complete with a {@code null} return value,
- * and further triggers {@link #tryComplete} on this task's
- * completer, if one exists. This method may be useful when
- * forcing completion as soon as any one (versus all) of several
- * subtask results are obtained.
+ * marks this task as complete and further triggers {@link
+ * #tryComplete} on this task's completer, if one exists. This
+ * method may be useful when forcing completion as soon as any one
+ * (versus all) of several subtask results are obtained. The
+ * given rawResult is used as an argument to {@link #setRawResult}
+ * before marking this task as complete; its value is meaningful
+ * only for classes overriding {@code setRawResult}.
*
- * @param mustBeNull the {@code null} completion value
+ * @param rawResult the raw result
*/
- public void complete(Void mustBeNull) {
- CountedCompleter p;
+ public void complete(T rawResult) {
+ CountedCompleter> p;
onCompletion(this);
+ setRawResult(rawResult);
quietlyComplete();
if ((p = completer) != null)
p.tryComplete();
}
/**
+ * Support for FJT exception propagation
+ */
+ void internalPropagateException(Throwable ex) {
+ CountedCompleter> a = this, s = a;
+ while (a.onExceptionalCompletion(ex, s) &&
+ (a = (s = a).completer) != null && a.status >= 0)
+ a.recordExceptionalCompletion(ex);
+ }
+
+ /**
* Implements execution conventions for CountedCompleters
*/
protected final boolean exec() {
@@ -388,21 +476,19 @@ public abstract class CountedCompleter e
}
/**
- * Always returns {@code null}.
+ * Returns the result of the computation. By default
+ * returns {@code null}, which is appropriate for {@code Void}
+ * actions, but in other cases should be overridden.
*
- * @return {@code null} always
+ * @return the result of the computation
*/
- public final Void getRawResult() { return null; }
+ public T getRawResult() { return null; }
/**
- * Requires null completion value.
+ * A method that result-bearing CountedCompleters may optionally
+ * use to help maintain result data. By default, does nothing.
*/
- protected final void setRawResult(Void mustBeNull) { }
-
- /**
- * Support for FJT exception propagation
- */
- final ForkJoinTask> internalGetCompleter() { return completer; }
+ protected void setRawResult(T t) { }
// Unsafe mechanics
private static final sun.misc.Unsafe U;
@@ -417,7 +503,6 @@ public abstract class CountedCompleter e
}
}
-
/**
* Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
* Replace with a simple call to Unsafe.getUnsafe when integrating