--- jsr166/src/jsr166e/CountedCompleter.java 2012/11/19 18:12:28 1.13
+++ jsr166/src/jsr166e/CountedCompleter.java 2012/11/25 18:39:07 1.18
@@ -10,9 +10,9 @@ package jsr166e;
* A {@link ForkJoinTask} with a completion action performed when
* triggered and there are no remaining pending
* actions. CountedCompleters are in general more robust in the
- * presence of subtask stalls and blockage than are other forms for
- * ForkJoinTasks, but are in general less intuitive to program. Uses
- * of CountedCompleter are similar to those of other completion based
+ * presence of subtask stalls and blockage than are other forms of
+ * ForkJoinTasks, but are less intuitive to program. Uses of
+ * CountedCompleter are similar to those of other completion based
* components (such as {@link java.nio.channels.CompletionHandler})
* except that multiple pending completions may be necessary
* to trigger the {@link #onCompletion} action, not just one. Unless
@@ -30,10 +30,12 @@ package jsr166e;
* internal bookkeeping. In particular, the identities of pending
* tasks are not maintained. As illustrated below, you can create
* subclasses that do record some or all pending tasks or their
- * results when needed. Because CountedCompleters provide only basic
+ * results when needed. As illustrated below, utility methods
+ * supporting customization of completion traversals are also
+ * provided. However, because CountedCompleters provide only basic
* synchronization mechanisms, it may be useful to create further
- * abstract subclasses that maintain linkages and fields and support
- * methods appropriate for a set of related usages.
+ * abstract subclasses that maintain linkages, fields, and additional
+ * support methods appropriate for a set of related usages.
*
*
A concrete CountedCompleter class must define method {@link
* #compute}, that should in most cases (as illustrated below), invoke
@@ -51,7 +53,7 @@ package jsr166e;
* function of one or more fields) of the CountedCompleter object that
* holds the result upon completion. Method {@link #setRawResult} by
* default plays no role in CountedCompleters. It is possible, but
- * not usually applicable, to override this method to maintain other
+ * rarely applicable, to override this method to maintain other
* objects or fields holding result data.
*
*
A CountedCompleter that does not itself have a completer (i.e.,
@@ -103,27 +105,27 @@ package jsr166e;
*
* class ForEach extends CountedCompleter {
*
- * public static void forEach(E[] array, MyOperation op) {
- * new ForEach(null, array, op, 0, array.length).invoke();
- * }
- *
- * final E[] array; final MyOperation op; final int lo, hi;
- * ForEach(CountedCompleter> p, E[] array, MyOperation op, int lo, int hi) {
- * super(p);
- * this.array = array; this.op = op; this.lo = lo; this.hi = hi;
- * }
- *
- * public void compute() { // version 1
- * if (hi - lo >= 2) {
- * int mid = (lo + hi) >>> 1;
- * setPendingCount(2); // must set pending count before fork
- * new ForEach(this, array, op, mid, hi).fork(); // right child
- * new ForEach(this, array, op, lo, mid).fork(); // left child
- * }
- * else if (hi > lo)
- * op.apply(array[lo]);
- * tryComplete();
- * }
+ * public static void forEach(E[] array, MyOperation op) {
+ * new ForEach(null, array, op, 0, array.length).invoke();
+ * }
+ *
+ * final E[] array; final MyOperation op; final int lo, hi;
+ * ForEach(CountedCompleter> p, E[] array, MyOperation op, int lo, int hi) {
+ * super(p);
+ * this.array = array; this.op = op; this.lo = lo; this.hi = hi;
+ * }
+ *
+ * public void compute() { // version 1
+ * if (hi - lo >= 2) {
+ * int mid = (lo + hi) >>> 1;
+ * setPendingCount(2); // must set pending count before fork
+ * new ForEach(this, array, op, mid, hi).fork(); // right child
+ * new ForEach(this, array, op, lo, mid).fork(); // left child
+ * }
+ * else if (hi > lo)
+ * op.apply(array[lo]);
+ * tryComplete();
+ * }
* } }
*
* This design can be improved by noticing that in the recursive case,
@@ -135,39 +137,42 @@ package jsr166e;
*
* {@code
* class ForEach ...
- * public void compute() { // version 2
- * if (hi - lo >= 2) {
- * int mid = (lo + hi) >>> 1;
- * setPendingCount(1); // only one pending
- * new ForEach(this, array, op, mid, hi).fork(); // right child
- * new ForEach(this, array, op, lo, mid).compute(); // direct invoke
- * }
- * else {
- * if (hi > lo)
- * op.apply(array[lo]);
- * tryComplete();
- * }
+ * public void compute() { // version 2
+ * if (hi - lo >= 2) {
+ * int mid = (lo + hi) >>> 1;
+ * setPendingCount(1); // only one pending
+ * new ForEach(this, array, op, mid, hi).fork(); // right child
+ * new ForEach(this, array, op, lo, mid).compute(); // direct invoke
+ * }
+ * else {
+ * if (hi > lo)
+ * op.apply(array[lo]);
+ * tryComplete();
* }
+ * }
* }
*
* As a further improvement, notice that the left task need not even
* exist. Instead of creating a new one, we can iterate using the
- * original task, and add a pending count for each fork.
+ * original task, and add a pending count for each fork. Additionally,
+ * because no task in this tree implements an {@link #onCompletion}
+ * method, {@code tryComplete()} can be replaced with {@link
+ * #propagateCompletion}.
*
* {@code
* class ForEach ...
- * public void compute() { // version 3
- * int l = lo, h = hi;
- * while (h - l >= 2) {
- * int mid = (l + h) >>> 1;
- * addToPendingCount(1);
- * new ForEach(this, array, op, mid, h).fork(); // right child
- * h = mid;
- * }
- * if (h > l)
- * op.apply(array[l]);
- * tryComplete();
- * }
+ * public void compute() { // version 3
+ * int l = lo, h = hi;
+ * while (h - l >= 2) {
+ * int mid = (l + h) >>> 1;
+ * addToPendingCount(1);
+ * new ForEach(this, array, op, mid, h).fork(); // right child
+ * h = mid;
+ * }
+ * if (h > l)
+ * op.apply(array[l]);
+ * propagateCompletion();
+ * }
* }
*
* Additional improvements of such classes might entail precomputing
@@ -176,6 +181,57 @@ package jsr166e;
* instead of two per iteration, and using an adaptive threshold
* instead of always subdividing down to single elements.
*
+ * Searching. A tree of CountedCompleters can search for a
+ * value or property in different parts of a data structure, and
+ * report a result in an {@link java.util.concurrent.AtomicReference}
+ * as soon as one is found. The others can poll the result to avoid
+ * unnecessary work. (You could additionally {@link #cancel} other
+ * tasks, but it is usually simpler and more efficient to just let
+ * them notice that the result is set and if so skip further
+ * processing.) Illustrating again with an array using full
+ * partitioning (again, in practice, leaf tasks will almost always
+ * process more than one element):
+ *
+ *
{@code
+ * class Searcher extends CountedCompleter {
+ * final E[] array; final AtomicReference result; final int lo, hi;
+ * Searcher(CountedCompleter> p, E[] array, AtomicReference result, int lo, int hi) {
+ * super(p);
+ * this.array = array; this.result = result; this.lo = lo; this.hi = hi;
+ * }
+ * public E getRawResult() { return result.get(); }
+ * public void compute() { // similar to ForEach version 3
+ * int l = lo, h = hi;
+ * while (result.get() == null && h >= l) {
+ * if (h - l >= 2) {
+ * int mid = (l + h) >>> 1;
+ * addToPendingCount(1);
+ * new Searcher(this, array, result, mid, h).fork();
+ * h = mid;
+ * }
+ * else {
+ * E x = array[l];
+ * if (matches(x) && result.compareAndSet(null, x))
+ * quietlyCompleteRoot(); // root task is now joinable
+ * break;
+ * }
+ * }
+ * tryComplete(); // normally complete whether or not found
+ * }
+ * boolean matches(E e) { ... } // return true if found
+ *
+ * public static E search(E[] array) {
+ * return new Searcher(null, array, new AtomicReference(), 0, array.length).invoke();
+ * }
+ *}}
+ *
+ * In this example, as well as others in which tasks have no other
+ * effects except to compareAndSet a common result, the trailing
+ * unconditional invocation of {@code tryComplete} could be made
+ * conditional ({@code if (result.get() == null) tryComplete();})
+ * because no further bookkeeping is required to manage completions
+ * once the root task completes.
+ *
* Recording subtasks. CountedCompleter tasks that combine
* results of multiple subtasks usually need to access these results
* in method {@link #onCompletion}. As illustrated in the following
@@ -192,49 +248,49 @@ package jsr166e;
* class MyMapper { E apply(E v) { ... } }
* class MyReducer { E apply(E x, E y) { ... } }
* class MapReducer extends CountedCompleter {
- * final E[] array; final MyMapper mapper;
- * final MyReducer reducer; final int lo, hi;
- * MapReducer sibling;
- * E result;
- * MapReducer(CountedCompleter p, E[] array, MyMapper mapper,
- * MyReducer reducer, int lo, int hi) {
- * super(p);
- * this.array = array; this.mapper = mapper;
- * this.reducer = reducer; this.lo = lo; this.hi = hi;
- * }
- * public void compute() {
- * if (hi - lo >= 2) {
- * int mid = (lo + hi) >>> 1;
- * MapReducer left = new MapReducer(this, array, mapper, reducer, lo, mid);
- * MapReducer right = new MapReducer(this, array, mapper, reducer, mid, hi);
- * left.sibling = right;
- * right.sibling = left;
- * setPendingCount(1); // only right is pending
- * right.fork();
- * left.compute(); // directly execute left
- * }
- * else {
- * if (hi > lo)
- * result = mapper.apply(array[lo]);
- * tryComplete();
- * }
- * }
- * public void onCompletion(CountedCompleter caller) {
- * if (caller != this) {
- * MapReducer child = (MapReducer)caller;
- * MapReducer sib = child.sibling;
- * if (sib == null || sib.result == null)
- * result = child.result;
- * else
- * result = reducer.apply(child.result, sib.result);
- * }
- * }
- * public E getRawResult() { return result; }
- *
- * public static E mapReduce(E[] array, MyMapper mapper, MyReducer reducer) {
- * return new MapReducer(null, array, mapper, reducer,
- * 0, array.length).invoke();
- * }
+ * final E[] array; final MyMapper mapper;
+ * final MyReducer reducer; final int lo, hi;
+ * MapReducer sibling;
+ * E result;
+ * MapReducer(CountedCompleter> p, E[] array, MyMapper mapper,
+ * MyReducer reducer, int lo, int hi) {
+ * super(p);
+ * this.array = array; this.mapper = mapper;
+ * this.reducer = reducer; this.lo = lo; this.hi = hi;
+ * }
+ * public void compute() {
+ * if (hi - lo >= 2) {
+ * int mid = (lo + hi) >>> 1;
+ * MapReducer left = new MapReducer(this, array, mapper, reducer, lo, mid);
+ * MapReducer right = new MapReducer(this, array, mapper, reducer, mid, hi);
+ * left.sibling = right;
+ * right.sibling = left;
+ * setPendingCount(1); // only right is pending
+ * right.fork();
+ * left.compute(); // directly execute left
+ * }
+ * else {
+ * if (hi > lo)
+ * result = mapper.apply(array[lo]);
+ * tryComplete();
+ * }
+ * }
+ * public void onCompletion(CountedCompleter> caller) {
+ * if (caller != this) {
+ * MapReducer child = (MapReducer)caller;
+ * MapReducer sib = child.sibling;
+ * if (sib == null || sib.result == null)
+ * result = child.result;
+ * else
+ * result = reducer.apply(child.result, sib.result);
+ * }
+ * }
+ * public E getRawResult() { return result; }
+ *
+ * public static E mapReduce(E[] array, MyMapper mapper, MyReducer reducer) {
+ * return new MapReducer(null, array, mapper, reducer,
+ * 0, array.length).invoke();
+ * }
* } }
*
* Here, method {@code onCompletion} takes a form common to many
@@ -247,58 +303,56 @@ package jsr166e;
* distinguishes cases. Most often, when the caller is {@code this},
* no action is necessary. Otherwise the caller argument can be used
* (usually via a cast) to supply a value (and/or links to other
- * values) to be combined. Asuuming proper use of pending counts, the
+ * values) to be combined. Assuming proper use of pending counts, the
* actions inside {@code onCompletion} occur (once) upon completion of
* a task and its subtasks. No additional synchronization is required
* within this method to ensure thread safety of accesses to fields of
* this task or other completed tasks.
*
- * Searching. A tree of CountedCompleters can search for a
- * value or property in different parts of a data structure, and
- * report a result in an {@link java.util.concurrent.AtomicReference}
- * as soon as one is found. The others can poll the result to avoid
- * unnecessary work. (You could additionally {@link #cancel} other
- * tasks, but it is usually simpler and more efficient to just let
- * them notice that the result is set and if so skip further
- * processing.) Illustrating again with an array using full
- * partitioning (again, in practice, leaf tasks will almost always
- * process more than one element):
+ *
Completion Traversals. If using {@code onCompletion} to
+ * process completions is inapplicable or inconvenient, you can use
+ * methods {@link #firstComplete} and {@link #nextComplete} to create
+ * custom traversals. For example, to define a MapReducer that only
+ * splits out right-hand tasks in the form of the third ForEach
+ * example, the completions must cooperatively reduce along
+ * unexhausted subtask links, which can be done as follows:
*
*
{@code
- * class Searcher extends CountedCompleter {
- * final E[] array; final AtomicReference result; final int lo, hi;
- * Searcher(CountedCompleter> p, E[] array, AtomicReference result, int lo, int hi) {
- * super(p);
- * this.array = array; this.result = result; this.lo = lo; this.hi = hi;
- * }
- * public E getRawResult() { return result.get(); }
- * public void compute() { // similar to ForEach version 3
- * int l = lo, h = hi;
- * while (h - l >= 2 && result.get() == null) {
- * int mid = (l + h) >>> 1;
- * addToPendingCount(1);
- * new Searcher(this, array, result, mid, h).fork();
- * h = mid;
- * }
- * if (h > l && result.get() == null && matches(array[l]) &&
- * result.compareAndSet(null, array[l]))
- * getRoot().quietlyComplete(); // root task is now joinable
- *
- * tryComplete(); // normally complete whether or not found
- * }
- * boolean matches(E e) { ... } // return true if found
- *
- * public static E search(E[] array) {
- * return new Searcher(null, array, new AtomicReference(), 0, array.length).invoke();
- * }
- *}}
- *
- * In this example, as well as others in which tasks have no other
- * effects except to compareAndSet a common result, the trailing
- * unconditional invocation of {@code tryComplete} could be made
- * conditional ({@code if (result.get() == null) tryComplete();})
- * because no further bookkeeping is required to manage completions
- * once the root task completes.
+ * class MapReducer extends CountedCompleter { // version 2
+ * final E[] array; final MyMapper mapper;
+ * final MyReducer reducer; final int lo, hi;
+ * MapReducer forks, next; // record subtask forks in list
+ * E result;
+ * MapReducer(CountedCompleter> p, E[] array, MyMapper mapper,
+ * MyReducer reducer, int lo, int hi, MapReducer next) {
+ * super(p);
+ * this.array = array; this.mapper = mapper;
+ * this.reducer = reducer; this.lo = lo; this.hi = hi;
+ * this.next = next;
+ * }
+ * public void compute() {
+ * int l = lo, h = hi;
+ * while (h - l >= 2) {
+ * int mid = (l + h) >>> 1;
+ * addToPendingCount(1);
+ * (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork;
+ * h = mid;
+ * }
+ * if (h > l)
+ * result = mapper.apply(array[l]);
+ * // process completions by reducing along and advancing subtask links
+ * for (CountedCompleter> c = firstComplete(); c != null; c = c.nextComplete()) {
+ * for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next)
+ * t.result = reducer.apply(t.result, s.result);
+ * }
+ * }
+ * public E getRawResult() { return result; }
+ *
+ * public static E mapReduce(E[] array, MyMapper mapper, MyReducer reducer) {
+ * return new MapReducer(null, array, mapper, reducer,
+ * 0, array.length, null).invoke();
+ * }
+ * }}
*
* Triggers. Some CountedCompleters are themselves never
* forked, but instead serve as bits of plumbing in other designs;
@@ -309,9 +363,9 @@ package jsr166e;
* class HeaderBuilder extends CountedCompleter<...> { ... }
* class BodyBuilder extends CountedCompleter<...> { ... }
* class PacketSender extends CountedCompleter<...> {
- * PacketSender(...) { super(null, 1); ... } // trigger on second completion
- * public void compute() { } // never called
- * public void onCompletion(CountedCompleter> caller) { sendPacket(); }
+ * PacketSender(...) { super(null, 1); ... } // trigger on second completion
+ * public void compute() { } // never called
+ * public void onCompletion(CountedCompleter> caller) { sendPacket(); }
* }
* // sample use:
* PacketSender p = new PacketSender();
@@ -446,13 +500,26 @@ public abstract class CountedCompleter a = this, s = a;
+ for (int c;;) {
+ if ((c = a.pending) == 0) {
+ if ((a = (s = a).completer) == null) {
+ s.quietlyComplete();
+ return;
+ }
+ }
+ else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
+ return;
+ }
+ }
+
+ /**
* Regardless of pending count, invokes {@link #onCompletion},
* marks this task as complete and further triggers {@link
* #tryComplete} on this task's completer, if one exists. The
@@ -499,7 +589,7 @@ public abstract class CountedCompleter firstComplete() {
+ for (int c;;) {
+ if ((c = pending) == 0)
+ return this;
+ else if (U.compareAndSwapInt(this, PENDING, c, c - 1))
+ return null;
+ }
+ }
+
+ /**
+ * If this task does not have a completer, invokes {@link
+ * ForkJoinTask#quietlyComplete} and returns {@code null}. Or, if
+ * this task's pending count is non-zero, decrements its pending
+ * count and returns {@code null}. Otherwise, returns the
+ * completer. This method can be used as part of a completion
+ * traversal loop for homogeneous task hierarchies:
+ *
+ * {@code
+ * for (CountedCompleter> c = firstComplete();
+ * c != null;
+ * c = c.nextComplete()) {
+ * // ... process c ...
+ * }}
+ *
+ * @return the completer, or {@code null} if none
+ */
+ public final CountedCompleter> nextComplete() {
+ CountedCompleter> p;
+ if ((p = completer) != null)
+ return p.firstComplete();
+ else {
+ quietlyComplete();
+ return null;
+ }
+ }
+
+ /**
+ * Equivalent to {@code getRoot().quietlyComplete()}.
+ */
+ public final void quietlyCompleteRoot() {
+ for (CountedCompleter> a = this, p;;) {
+ if ((p = a.completer) == null) {
+ a.quietlyComplete();
+ return;
+ }
+ a = p;
+ }
+ }
+
/**
* Support for FJT exception propagation
*/
@@ -533,7 +681,9 @@ public abstract class CountedCompleter