ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/CountedCompleter.java
(Generate patch)

Comparing jsr166/src/jsr166y/CountedCompleter.java (file contents):
Revision 1.1 by dl, Mon Apr 9 13:12:18 2012 UTC vs.
Revision 1.26 by jsr166, Mon Nov 26 12:08:49 2012 UTC

# Line 7 | Line 7
7   package jsr166y;
8  
9   /**
10 < * A resultless {@link ForkJoinTask} with a completion action
11 < * performed when triggered and there are no remaining pending
12 < * actions. Uses of CountedCompleter are similar to those of other
13 < * completion based components (such as {@link
14 < * java.nio.channels.CompletionHandler}) except that multiple
15 < * <em>pending</em> completions may be necessary to trigger the {@link
16 < * #onCompletion} action, not just one. Unless initialized otherwise,
17 < * the {@link #getPendingCount pending count} starts at zero, but may
18 < * be (atomically) changed using methods {@link #setPendingCount},
19 < * {@link #addToPendingCount}, and {@link
20 < * #compareAndSetPendingCount}. Upon invocation of {@link
10 > * A {@link ForkJoinTask} with a completion action performed when
11 > * triggered and there are no remaining pending
12 > * actions. CountedCompleters are in general more robust in the
13 > * presence of subtask stalls and blockage than are other forms of
14 > * ForkJoinTasks, but are less intuitive to program.  Uses of
15 > * CountedCompleter are similar to those of other completion based
16 > * components (such as {@link java.nio.channels.CompletionHandler})
17 > * except that multiple <em>pending</em> completions may be necessary
18 > * to trigger the completion action {@link #onCompletion}, not just one.
19 > * Unless initialized otherwise, the {@linkplain #getPendingCount pending
20 > * count} starts at zero, but may be (atomically) changed using
21 > * methods {@link #setPendingCount}, {@link #addToPendingCount}, and
22 > * {@link #compareAndSetPendingCount}. Upon invocation of {@link
23   * #tryComplete}, if the pending action count is nonzero, it is
24   * decremented; otherwise, the completion action is performed, and if
25   * this completer itself has a completer, the process is continued
26 < * with its completer.  As is the case with most basic synchronization
27 < * constructs, these methods affect only internal counts; they do not
28 < * establish any further internal bookkeeping. In particular, the
29 < * identities of pending tasks are not maintained. As illustrated
30 < * below, you can create subclasses that do record some or all pended
31 < * tasks or their results when needed.
26 > * with its completer.  As is the case with related synchronization
27 > * components such as {@link java.util.concurrent.Phaser Phaser} and
28 > * {@link java.util.concurrent.Semaphore Semaphore}, these methods
29 > * affect only internal counts; they do not establish any further
30 > * internal bookkeeping. In particular, the identities of pending
31 > * tasks are not maintained. As illustrated below, you can create
32 > * subclasses that do record some or all pending tasks or their
33 > * results when needed.  As illustrated below, utility methods
34 > * supporting customization of completion traversals are also
35 > * provided. However, because CountedCompleters provide only basic
36 > * synchronization mechanisms, it may be useful to create further
37 > * abstract subclasses that maintain linkages, fields, and additional
38 > * support methods appropriate for a set of related usages.
39   *
40   * <p>A concrete CountedCompleter class must define method {@link
41 < * #compute}, that should, in almost all use cases, invoke {@code
42 < * tryComplete()} before returning. The class may also optionally
43 < * override method {@link #onCompletion} to perform an action upon
44 < * normal completion.
41 > * #compute}, that should in most cases (as illustrated below), invoke
42 > * {@code tryComplete()} once before returning. The class may also
43 > * optionally override method {@link #onCompletion} to perform an
44 > * action upon normal completion, and method {@link
45 > * #onExceptionalCompletion} to perform an action upon any exception.
46 > *
47 > * <p>CountedCompleters most often do not bear results, in which case
48 > * they are normally declared as {@code CountedCompleter<Void>}, and
49 > * will always return {@code null} as a result value.  In other cases,
50 > * you should override method {@link #getRawResult} to provide a
51 > * result from {@code join(), invoke()}, and related methods.  In
52 > * general, this method should return the value of a field (or a
53 > * function of one or more fields) of the CountedCompleter object that
54 > * holds the result upon completion. Method {@link #setRawResult} by
55 > * default plays no role in CountedCompleters.  It is possible, but
56 > * rarely applicable, to override this method to maintain other
57 > * objects or fields holding result data.
58   *
59   * <p>A CountedCompleter that does not itself have a completer (i.e.,
60   * one for which {@link #getCompleter} returns {@code null}) can be
# Line 44 | Line 66 | package jsr166y;
66   * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
67   * ForkJoinTask#completeExceptionally} or upon exceptional completion
68   * of method {@code compute}. Upon any exceptional completion, the
69 < * exception is relayed to a task's completer (and its completer, and
70 < * so on), if one exists and it has not otherwise already completed.
69 > * exception may be relayed to a task's completer (and its completer,
70 > * and so on), if one exists and it has not otherwise already
71 > * completed. Similarly, cancelling an internal CountedCompleter has
72 > * only a local effect on that completer, so is not often useful.
73   *
74   * <p><b>Sample Usages.</b>
75   *
76   * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
77   * be arranged in trees similar to those often used with {@link
78   * RecursiveAction}s, although the constructions involved in setting
79 < * them up typically vary. Even though they entail a bit more
79 > * them up typically vary. Here, the completer of each task is its
80 > * parent in the computation tree. Even though they entail a bit more
81   * bookkeeping, CountedCompleters may be better choices when applying
82   * a possibly time-consuming operation (that cannot be further
83   * subdivided) to each element of an array or collection; especially
# Line 63 | Line 88 | package jsr166y;
88   * continuations, other threads need not block waiting to perform
89   * them.
90   *
91 < * <p> For example, here is an initial version of a class that uses
91 > * <p>For example, here is an initial version of a class that uses
92   * divide-by-two recursive decomposition to divide work into single
93   * pieces (leaf tasks). Even when work is split into individual calls,
94   * tree-based techniques are usually preferable to directly forking
# Line 72 | Line 97 | package jsr166y;
97   * pair of subtasks to finish triggers completion of its parent
98   * (because no result combination is performed, the default no-op
99   * implementation of method {@code onCompletion} is not overridden). A
100 < * static utility method sets up the base task and invokes it:
100 > * static utility method sets up the base task and invokes it
101 > * (here, implicitly using the {@link ForkJoinPool#commonPool()}).
102   *
103   * <pre> {@code
104   * class MyOperation<E> { void apply(E e) { ... }  }
105   *
106 < * class ForEach<E> extends CountedCompleter {
106 > * class ForEach<E> extends CountedCompleter<Void> {
107   *
108 < *     public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
109 < *         pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
108 > *   public static <E> void forEach(E[] array, MyOperation<E> op) {
109 > *     new ForEach<E>(null, array, op, 0, array.length).invoke();
110 > *   }
111 > *
112 > *   final E[] array; final MyOperation<E> op; final int lo, hi;
113 > *   ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
114 > *     super(p);
115 > *     this.array = array; this.op = op; this.lo = lo; this.hi = hi;
116 > *   }
117 > *
118 > *   public void compute() { // version 1
119 > *     if (hi - lo >= 2) {
120 > *       int mid = (lo + hi) >>> 1;
121 > *       setPendingCount(2); // must set pending count before fork
122 > *       new ForEach(this, array, op, mid, hi).fork(); // right child
123 > *       new ForEach(this, array, op, lo, mid).fork(); // left child
124   *     }
125 < *
126 < *     final E[] array; final MyOperation<E> op; final int lo, hi;
127 < *     ForEach(CountedCompleter p, E[] array, MyOperation<E> op, int lo, int hi) {
128 < *         super(p);
129 < *         this.array = array; this.op = op; this.lo = lo; this.hi = hi;
90 < *     }
91 < *
92 < *     public void compute() { // version 1
93 < *         if (hi - lo >= 2) {
94 < *             int mid = (lo + hi) >>> 1;
95 < *             setPendingCount(2); // must set pending count before fork
96 < *             new ForEach(this, array, op, mid, hi).fork(); // right child
97 < *             new ForEach(this, array, op, lo, mid).fork(); // left child
98 < *         }
99 < *         else if (hi > lo)
100 < *             op.apply(array[lo]);
101 < *         tryComplete();
102 < *     }
103 < * } }</pre>
125 > *     else if (hi > lo)
126 > *       op.apply(array[lo]);
127 > *     tryComplete();
128 > *   }
129 > * }}</pre>
130   *
131   * This design can be improved by noticing that in the recursive case,
132   * the task has nothing to do after forking its right task, so can
133   * directly invoke its left task before returning. (This is an analog
134   * of tail recursion removal.)  Also, because the task returns upon
135   * executing its left task (rather than falling through to invoke
136 < * tryComplete) the pending count is set to one:
136 > * {@code tryComplete}) the pending count is set to one:
137   *
138   * <pre> {@code
139   * class ForEach<E> ...
140 < *     public void compute() { // version 2
141 < *         if (hi - lo >= 2) {
142 < *             int mid = (lo + hi) >>> 1;
143 < *             setPendingCount(1); // only one pending
144 < *             new ForEach(this, array, op, mid, hi).fork(); // right child
145 < *             new ForEach(this, array, op, lo, mid).compute(); // direct invoke
146 < *         }
147 < *         else {
148 < *             if (hi > lo)
149 < *                 op.apply(array[lo]);
150 < *             tryComplete();
125 < *         }
140 > *   public void compute() { // version 2
141 > *     if (hi - lo >= 2) {
142 > *       int mid = (lo + hi) >>> 1;
143 > *       setPendingCount(1); // only one pending
144 > *       new ForEach(this, array, op, mid, hi).fork(); // right child
145 > *       new ForEach(this, array, op, lo, mid).compute(); // direct invoke
146 > *     }
147 > *     else {
148 > *       if (hi > lo)
149 > *         op.apply(array[lo]);
150 > *       tryComplete();
151   *     }
152 + *   }
153   * }</pre>
154   *
155   * As a further improvement, notice that the left task need not even
156   * exist.  Instead of creating a new one, we can iterate using the
157 < * original task, and add a pending count for each fork:
157 > * original task, and add a pending count for each fork. Additionally,
158 > * because no task in this tree implements an {@link #onCompletion}
159 > * method, {@code tryComplete()} can be replaced with {@link
160 > * #propagateCompletion}.
161   *
162   * <pre> {@code
163   * class ForEach<E> ...
164 < *     public void compute() { // version 3
165 < *         int l = lo,  h = hi;
166 < *         while (h - l >= 2) {
167 < *             int mid = (l + h) >>> 1;
168 < *             addToPendingCount(1);
169 < *             new ForEach(this, array, op, mid, h).fork(); // right child
170 < *             h = mid;
142 < *         }
143 < *         if (h > l)
144 < *             op.apply(array[l]);
145 < *         tryComplete();
164 > *   public void compute() { // version 3
165 > *     int l = lo,  h = hi;
166 > *     while (h - l >= 2) {
167 > *       int mid = (l + h) >>> 1;
168 > *       addToPendingCount(1);
169 > *       new ForEach(this, array, op, mid, h).fork(); // right child
170 > *       h = mid;
171   *     }
172 + *     if (h > l)
173 + *       op.apply(array[l]);
174 + *     propagateCompletion();
175 + *   }
176   * }</pre>
177   *
178   * Additional improvements of such classes might entail precomputing
# Line 152 | Line 181 | package jsr166y;
181   * instead of two per iteration, and using an adaptive threshold
182   * instead of always subdividing down to single elements.
183   *
184 + * <p><b>Searching.</b> A tree of CountedCompleters can search for a
185 + * value or property in different parts of a data structure, and
186 + * report a result in an {@link
187 + * java.util.concurrent.atomic.AtomicReference AtomicReference} as
188 + * soon as one is found. The others can poll the result to avoid
189 + * unnecessary work. (You could additionally {@linkplain #cancel
190 + * cancel} other tasks, but it is usually simpler and more efficient
191 + * to just let them notice that the result is set and if so skip
192 + * further processing.)  Illustrating again with an array using full
193 + * partitioning (again, in practice, leaf tasks will almost always
194 + * process more than one element):
195 + *
196 + * <pre> {@code
197 + * class Searcher<E> extends CountedCompleter<E> {
198 + *   final E[] array; final AtomicReference<E> result; final int lo, hi;
199 + *   Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
200 + *     super(p);
201 + *     this.array = array; this.result = result; this.lo = lo; this.hi = hi;
202 + *   }
203 + *   public E getRawResult() { return result.get(); }
204 + *   public void compute() { // similar to ForEach version 3
205 + *     int l = lo,  h = hi;
206 + *     while (result.get() == null && h >= l) {
207 + *       if (h - l >= 2) {
208 + *         int mid = (l + h) >>> 1;
209 + *         addToPendingCount(1);
210 + *         new Searcher(this, array, result, mid, h).fork();
211 + *         h = mid;
212 + *       }
213 + *       else {
214 + *         E x = array[l];
215 + *         if (matches(x) && result.compareAndSet(null, x))
216 + *           quietlyCompleteRoot(); // root task is now joinable
217 + *         break;
218 + *       }
219 + *     }
220 + *     tryComplete(); // normally complete whether or not found
221 + *   }
222 + *   boolean matches(E e) { ... } // return true if found
223 + *
224 + *   public static <E> E search(E[] array) {
225 + *       return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
226 + *   }
227 + *}}</pre>
228 + *
229 + * In this example, as well as others in which tasks have no other
230 + * effects except to compareAndSet a common result, the trailing
231 + * unconditional invocation of {@code tryComplete} could be made
232 + * conditional ({@code if (result.get() == null) tryComplete();})
233 + * because no further bookkeeping is required to manage completions
234 + * once the root task completes.
235 + *
236   * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
237   * results of multiple subtasks usually need to access these results
238   * in method {@link #onCompletion}. As illustrated in the following
# Line 159 | Line 240 | package jsr166y;
240   * and reductions are all of type {@code E}), one way to do this in
241   * divide and conquer designs is to have each subtask record its
242   * sibling, so that it can be accessed in method {@code onCompletion}.
243 < * For clarity, this class uses explicit left and right subtasks, but
244 < * variants of other streamlinings seen in the above example may also
245 < * apply.
243 > * This technique applies to reductions in which the order of
244 > * combining left and right results does not matter; ordered
245 > * reductions require explicit left/right designations.  Variants of
246 > * other streamlinings seen in the above examples may also apply.
247   *
248   * <pre> {@code
249   * class MyMapper<E> { E apply(E v) {  ...  } }
250   * class MyReducer<E> { E apply(E x, E y) {  ...  } }
251 < * class MapReducer<E> extends CountedCompleter {
252 < *     final E[] array; final MyMapper<E> mapper;
253 < *     final MyReducer<E> reducer; final int lo, hi;
254 < *     MapReducer sibling;
255 < *     E result;
256 < *     MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
257 < *                MyReducer<E> reducer, int lo, int hi) {
258 < *         super(p);
259 < *         this.array = array; this.mapper = mapper;
260 < *         this.reducer = reducer; this.lo = lo; this.hi = hi;
251 > * class MapReducer<E> extends CountedCompleter<E> {
252 > *   final E[] array; final MyMapper<E> mapper;
253 > *   final MyReducer<E> reducer; final int lo, hi;
254 > *   MapReducer<E> sibling;
255 > *   E result;
256 > *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
257 > *              MyReducer<E> reducer, int lo, int hi) {
258 > *     super(p);
259 > *     this.array = array; this.mapper = mapper;
260 > *     this.reducer = reducer; this.lo = lo; this.hi = hi;
261 > *   }
262 > *   public void compute() {
263 > *     if (hi - lo >= 2) {
264 > *       int mid = (lo + hi) >>> 1;
265 > *       MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
266 > *       MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
267 > *       left.sibling = right;
268 > *       right.sibling = left;
269 > *       setPendingCount(1); // only right is pending
270 > *       right.fork();
271 > *       left.compute();     // directly execute left
272   *     }
273 < *     public void compute() {
274 < *         if (hi - lo >= 2) {
275 < *             int mid = (lo + hi) >>> 1;
276 < *             MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
184 < *             MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
185 < *             left.sibling = right;
186 < *             right.sibling = left;
187 < *             setPendingCount(1); // only right is pending
188 < *             right.fork();
189 < *             left.compute();     // directly execute left
190 < *         }
191 < *         else {
192 < *             if (hi > lo)
193 < *                 result = mapper.apply(array[lo]);
194 < *             tryComplete();
195 < *         }
273 > *     else {
274 > *       if (hi > lo)
275 > *           result = mapper.apply(array[lo]);
276 > *       tryComplete();
277   *     }
278 < *     public void onCompletion(CountedCompleter caller) {
279 < *         if (caller != this) {
280 < *            MapReducer<E> child = (MapReducer<E>)caller;
281 < *            MapReducer<E> sib = child.sibling;
282 < *            if (sib == null || sib.result == null)
283 < *                result = child.result;
284 < *            else
285 < *                result = reducer.apply(child.result, sib.result);
286 < *         }
278 > *   }
279 > *   public void onCompletion(CountedCompleter<?> caller) {
280 > *     if (caller != this) {
281 > *       MapReducer<E> child = (MapReducer<E>)caller;
282 > *       MapReducer<E> sib = child.sibling;
283 > *       if (sib == null || sib.result == null)
284 > *         result = child.result;
285 > *       else
286 > *         result = reducer.apply(child.result, sib.result);
287   *     }
288 + *   }
289 + *   public E getRawResult() { return result; }
290 + *
291 + *   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
292 + *     return new MapReducer<E>(null, array, mapper, reducer,
293 + *                              0, array.length).invoke();
294 + *   }
295 + * }}</pre>
296 + *
297 + * Here, method {@code onCompletion} takes a form common to many
298 + * completion designs that combine results. This callback-style method
299 + * is triggered once per task, in either of the two different contexts
300 + * in which the pending count is, or becomes, zero: (1) by a task
301 + * itself, if its pending count is zero upon invocation of {@code
302 + * tryComplete}, or (2) by any of its subtasks when they complete and
303 + * decrement the pending count to zero. The {@code caller} argument
304 + * distinguishes cases.  Most often, when the caller is {@code this},
305 + * no action is necessary. Otherwise the caller argument can be used
306 + * (usually via a cast) to supply a value (and/or links to other
307 + * values) to be combined.  Assuming proper use of pending counts, the
308 + * actions inside {@code onCompletion} occur (once) upon completion of
309 + * a task and its subtasks. No additional synchronization is required
310 + * within this method to ensure thread safety of accesses to fields of
311 + * this task or other completed tasks.
312 + *
313 + * <p><b>Completion Traversals</b>. If using {@code onCompletion} to
314 + * process completions is inapplicable or inconvenient, you can use
315 + * methods {@link #firstComplete} and {@link #nextComplete} to create
316 + * custom traversals.  For example, to define a MapReducer that only
317 + * splits out right-hand tasks in the form of the third ForEach
318 + * example, the completions must cooperatively reduce along
319 + * unexhausted subtask links, which can be done as follows:
320   *
321 < *     public static <E> E mapReduce(ForkJoinPool pool, E[] array,
322 < *                                   MyMapper<E> mapper, MyReducer<E> reducer) {
323 < *         MapReducer<E> mr = new MapReducer<E>(null, array, mapper,
324 < *                                              reducer, 0, array.length);
325 < *         pool.invoke(mr);
326 < *         return mr.result;
321 > * <pre> {@code
322 > * class MapReducer<E> extends CountedCompleter<E> { // version 2
323 > *   final E[] array; final MyMapper<E> mapper;
324 > *   final MyReducer<E> reducer; final int lo, hi;
325 > *   MapReducer<E> forks, next; // record subtask forks in list
326 > *   E result;
327 > *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
328 > *              MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
329 > *     super(p);
330 > *     this.array = array; this.mapper = mapper;
331 > *     this.reducer = reducer; this.lo = lo; this.hi = hi;
332 > *     this.next = next;
333 > *   }
334 > *   public void compute() {
335 > *     int l = lo,  h = hi;
336 > *     while (h - l >= 2) {
337 > *       int mid = (l + h) >>> 1;
338 > *       addToPendingCount(1);
339 > *       (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork;
340 > *       h = mid;
341 > *     }
342 > *     if (h > l)
343 > *       result = mapper.apply(array[l]);
344 > *     // process completions by reducing along and advancing subtask links
345 > *     for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) {
346 > *       for (MapReducer t = (MapReducer)c, s = t.forks;  s != null; s = t.forks = s.next)
347 > *         t.result = reducer.apply(t.result, s.result);
348   *     }
349 < * } }</pre>
349 > *   }
350 > *   public E getRawResult() { return result; }
351 > *
352 > *   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
353 > *     return new MapReducer<E>(null, array, mapper, reducer,
354 > *                              0, array.length, null).invoke();
355 > *   }
356 > * }}</pre>
357   *
358   * <p><b>Triggers.</b> Some CountedCompleters are themselves never
359   * forked, but instead serve as bits of plumbing in other designs;
# Line 220 | Line 361 | package jsr166y;
361   * triggers another async task. For example:
362   *
363   * <pre> {@code
364 < * class HeaderBuilder extends CountedCompleter { ... }
365 < * class BodyBuilder extends CountedCompleter { ... }
366 < * class PacketSender extends CountedCompleter {
367 < *     PacketSender(...) { super(null, 1); ... } // trigger on second completion
368 < *     public void compute() { } // never called
369 < *     public void onCompletion(CountedCompleter caller) { sendPacket(); }
364 > * class HeaderBuilder extends CountedCompleter<...> { ... }
365 > * class BodyBuilder extends CountedCompleter<...> { ... }
366 > * class PacketSender extends CountedCompleter<...> {
367 > *   PacketSender(...) { super(null, 1); ... } // trigger on second completion
368 > *   public void compute() { } // never called
369 > *   public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
370   * }
371   * // sample use:
372   * PacketSender p = new PacketSender();
# Line 236 | Line 377 | package jsr166y;
377   * @since 1.8
378   * @author Doug Lea
379   */
380 < public abstract class CountedCompleter extends ForkJoinTask<Void> {
380 > public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
381 >    private static final long serialVersionUID = 5232453752276485070L;
382 >
383      /** This task's completer, or null if none */
384 <    final CountedCompleter completer;
384 >    final CountedCompleter<?> completer;
385      /** The number of pending tasks until completion */
386      volatile int pending;
387  
# Line 246 | Line 389 | public abstract class CountedCompleter e
389       * Creates a new CountedCompleter with the given completer
390       * and initial pending count.
391       *
392 <     * @param completer this tasks completer, or {@code null} if none
392 >     * @param completer this task's completer, or {@code null} if none
393       * @param initialPendingCount the initial pending count
394       */
395 <    protected CountedCompleter(CountedCompleter completer,
395 >    protected CountedCompleter(CountedCompleter<?> completer,
396                                 int initialPendingCount) {
397          this.completer = completer;
398          this.pending = initialPendingCount;
# Line 259 | Line 402 | public abstract class CountedCompleter e
402       * Creates a new CountedCompleter with the given completer
403       * and an initial pending count of zero.
404       *
405 <     * @param completer this tasks completer, or {@code null} if none
405 >     * @param completer this task's completer, or {@code null} if none
406       */
407 <    protected CountedCompleter(CountedCompleter completer) {
407 >    protected CountedCompleter(CountedCompleter<?> completer) {
408          this.completer = completer;
409      }
410  
# Line 279 | Line 422 | public abstract class CountedCompleter e
422      public abstract void compute();
423  
424      /**
425 <     * Executes the completion action when method {@link #tryComplete}
426 <     * is invoked and there are no pending counts, or when the
427 <     * unconditional method {@link #complete} is invoked.  By default,
428 <     * this method does nothing.
425 >     * Performs an action when method {@link #tryComplete} is invoked
426 >     * and the pending count is zero, or when the unconditional
427 >     * method {@link #complete} is invoked.  By default, this method
428 >     * does nothing. You can distinguish cases by checking the
429 >     * identity of the given caller argument. If not equal to {@code
430 >     * this}, then it is typically a subtask that may contain results
431 >     * (and/or links to other results) to combine.
432       *
433       * @param caller the task invoking this method (which may
434       * be this task itself).
435       */
436 <    public void onCompletion(CountedCompleter caller) {
436 >    public void onCompletion(CountedCompleter<?> caller) {
437 >    }
438 >
439 >    /**
440 >     * Performs an action when method {@link #completeExceptionally}
441 >     * is invoked or method {@link #compute} throws an exception, and
442 >     * this task has not otherwise already completed normally. On
443 >     * entry to this method, this task {@link
444 >     * ForkJoinTask#isCompletedAbnormally}.  The return value of this
445 >     * method controls further propagation: If {@code true} and this
446 >     * task has a completer, then this completer is also completed
447 >     * exceptionally.  The default implementation of this method does
448 >     * nothing except return {@code true}.
449 >     *
450 >     * @param ex the exception
451 >     * @param caller the task invoking this method (which may
452 >     * be this task itself).
453 >     * @return true if this exception should be propagated to this
454 >     * task's completer, if one exists.
455 >     */
456 >    public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
457 >        return true;
458      }
459  
460      /**
# Line 296 | Line 463 | public abstract class CountedCompleter e
463       *
464       * @return the completer
465       */
466 <    public final CountedCompleter getCompleter() {
466 >    public final CountedCompleter<?> getCompleter() {
467          return completer;
468      }
469  
# Line 334 | Line 501 | public abstract class CountedCompleter e
501       *
502       * @param expected the expected value
503       * @param count the new value
504 <     * @return true is successful
504 >     * @return true if successful
505       */
506      public final boolean compareAndSetPendingCount(int expected, int count) {
507          return U.compareAndSwapInt(this, PENDING, expected, count);
508      }
509  
510      /**
511 +     * If the pending count is nonzero, (atomically) decrements it.
512 +     *
513 +     * @return the initial (undecremented) pending count holding on entry
514 +     * to this method
515 +     */
516 +    public final int decrementPendingCountUnlessZero() {
517 +        int c;
518 +        do {} while ((c = pending) != 0 &&
519 +                     !U.compareAndSwapInt(this, PENDING, c, c - 1));
520 +        return c;
521 +    }
522 +
523 +    /**
524 +     * Returns the root of the current computation; i.e., this
525 +     * task if it has no completer, else its completer's root.
526 +     *
527 +     * @return the root of the current computation
528 +     */
529 +    public final CountedCompleter<?> getRoot() {
530 +        CountedCompleter<?> a = this, p;
531 +        while ((p = a.completer) != null)
532 +            a = p;
533 +        return a;
534 +    }
535 +
536 +    /**
537       * If the pending count is nonzero, decrements the count;
538       * otherwise invokes {@link #onCompletion} and then similarly
539       * tries to complete this task's completer, if one exists,
540       * else marks this task as complete.
541       */
542      public final void tryComplete() {
543 <        for (CountedCompleter a = this, s = a;;) {
544 <            int c;
543 >        CountedCompleter<?> a = this, s = a;
544 >        for (int c;;) {
545              if ((c = a.pending) == 0) {
546                  a.onCompletion(s);
547                  if ((a = (s = a).completer) == null) {
# Line 362 | Line 555 | public abstract class CountedCompleter e
555      }
556  
557      /**
558 +     * Equivalent to {@link #tryComplete} but does not invoke {@link
559 +     * #onCompletion} along the completion path: If the pending count
560 +     * is nonzero, decrements the count; otherwise, similarly tries to
561 +     * complete this task's completer, if one exists, else marks this
562 +     * task as complete. This method may be useful in cases where
563 +     * {@code onCompletion} should not, or need not, be invoked for
564 +     * each completer in a computation.
565 +     */
566 +    public final void propagateCompletion() {
567 +        CountedCompleter<?> a = this, s = a;
568 +        for (int c;;) {
569 +            if ((c = a.pending) == 0) {
570 +                if ((a = (s = a).completer) == null) {
571 +                    s.quietlyComplete();
572 +                    return;
573 +                }
574 +            }
575 +            else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
576 +                return;
577 +        }
578 +    }
579 +
580 +    /**
581       * Regardless of pending count, invokes {@link #onCompletion},
582 <     * marks this task as complete with a {@code null} return value,
583 <     * and further triggers {@link #tryComplete} on this task's
584 <     * completer, if one exists. This method may be useful when
585 <     * forcing completion as soon as any one (versus all) of several
586 <     * subtask results are obtained.
587 <     *
588 <     * @param mustBeNull the {@code null} completion value
589 <     */
590 <    public void complete(Void mustBeNull) {
591 <        CountedCompleter p;
582 >     * marks this task as complete and further triggers {@link
583 >     * #tryComplete} on this task's completer, if one exists.  The
584 >     * given rawResult is used as an argument to {@link #setRawResult}
585 >     * before invoking {@link #onCompletion} or marking this task as
586 >     * complete; its value is meaningful only for classes overriding
587 >     * {@code setRawResult}.
588 >     *
589 >     * <p>This method may be useful when forcing completion as soon as
590 >     * any one (versus all) of several subtask results are obtained.
591 >     * However, in the common (and recommended) case in which {@code
592 >     * setRawResult} is not overridden, this effect can be obtained
593 >     * more simply using {@code quietlyCompleteRoot();}.
594 >     *
595 >     * @param rawResult the raw result
596 >     */
597 >    public void complete(T rawResult) {
598 >        CountedCompleter<?> p;
599 >        setRawResult(rawResult);
600          onCompletion(this);
601          quietlyComplete();
602          if ((p = completer) != null)
603              p.tryComplete();
604      }
605  
606 +
607 +    /**
608 +     * If this task's pending count is zero, returns this task;
609 +     * otherwise decrements its pending count and returns {@code
610 +     * null}. This method is designed to be used with {@link
611 +     * #nextComplete} in completion traversal loops.
612 +     *
613 +     * @return this task, if pending count was zero, else {@code null}
614 +     */
615 +    public final CountedCompleter<?> firstComplete() {
616 +        for (int c;;) {
617 +            if ((c = pending) == 0)
618 +                return this;
619 +            else if (U.compareAndSwapInt(this, PENDING, c, c - 1))
620 +                return null;
621 +        }
622 +    }
623 +
624 +    /**
625 +     * If this task does not have a completer, invokes {@link
626 +     * ForkJoinTask#quietlyComplete} and returns {@code null}.  Or, if
627 +     * this task's pending count is non-zero, decrements its pending
628 +     * count and returns {@code null}.  Otherwise, returns the
629 +     * completer.  This method can be used as part of a completion
630 +     * traversal loop for homogeneous task hierarchies:
631 +     *
632 +     * <pre> {@code
633 +     * for (CountedCompleter<?> c = firstComplete();
634 +     *      c != null;
635 +     *      c = c.nextComplete()) {
636 +     *   // ... process c ...
637 +     * }}</pre>
638 +     *
639 +     * @return the completer, or {@code null} if none
640 +     */
641 +    public final CountedCompleter<?> nextComplete() {
642 +        CountedCompleter<?> p;
643 +        if ((p = completer) != null)
644 +            return p.firstComplete();
645 +        else {
646 +            quietlyComplete();
647 +            return null;
648 +        }
649 +    }
650 +
651 +    /**
652 +     * Equivalent to {@code getRoot().quietlyComplete()}.
653 +     */
654 +    public final void quietlyCompleteRoot() {
655 +        for (CountedCompleter<?> a = this, p;;) {
656 +            if ((p = a.completer) == null) {
657 +                a.quietlyComplete();
658 +                return;
659 +            }
660 +            a = p;
661 +        }
662 +    }
663 +
664      /**
665 <     * Implements execution conventions for CountedCompleters
665 >     * Supports ForkJoinTask exception propagation.
666 >     */
667 >    void internalPropagateException(Throwable ex) {
668 >        CountedCompleter<?> a = this, s = a;
669 >        while (a.onExceptionalCompletion(ex, s) &&
670 >               (a = (s = a).completer) != null && a.status >= 0)
671 >            a.recordExceptionalCompletion(ex);
672 >    }
673 >
674 >    /**
675 >     * Implements execution conventions for CountedCompleters.
676       */
677      protected final boolean exec() {
678          compute();
# Line 388 | Line 680 | public abstract class CountedCompleter e
680      }
681  
682      /**
683 <     * Always returns {@code null}.
683 >     * Returns the result of the computation. By default
684 >     * returns {@code null}, which is appropriate for {@code Void}
685 >     * actions, but in other cases should be overridden, almost
686 >     * always to return a field or function of a field that
687 >     * holds the result upon completion.
688       *
689 <     * @return {@code null} always
689 >     * @return the result of the computation
690       */
691 <    public final Void getRawResult() { return null; }
691 >    public T getRawResult() { return null; }
692  
693      /**
694 <     * Requires null completion value.
694 >     * A method that result-bearing CountedCompleters may optionally
695 >     * use to help maintain result data.  By default, does nothing.
696 >     * Overrides are not recommended. However, if this method is
697 >     * overridden to update existing objects or fields, then it must
698 >     * in general be defined to be thread-safe.
699       */
700 <    protected final void setRawResult(Void mustBeNull) { }
401 <
402 <    /**
403 <     * Support for FJT exception propagation
404 <     */
405 <    final ForkJoinTask<?> internalGetCompleter() { return completer; }
700 >    protected void setRawResult(T t) { }
701  
702      // Unsafe mechanics
703      private static final sun.misc.Unsafe U;
704      private static final long PENDING;
705      static {
706          try {
707 <            U = getUnsafe();
707 >            U = sun.misc.Unsafe.getUnsafe();
708              PENDING = U.objectFieldOffset
709                  (CountedCompleter.class.getDeclaredField("pending"));
710          } catch (Exception e) {
# Line 417 | Line 712 | public abstract class CountedCompleter e
712          }
713      }
714  
420
715      /**
716       * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
717       * Replace with a simple call to Unsafe.getUnsafe when integrating
# Line 445 | Line 739 | public abstract class CountedCompleter e
739              }
740          }
741      }
448
742   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines