ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CountedCompleter.java
(Generate patch)

Comparing jsr166/src/jsr166e/CountedCompleter.java (file contents):
Revision 1.4 by jsr166, Sun Oct 21 05:28:08 2012 UTC vs.
Revision 1.31 by jsr166, Sat Jul 27 19:53:27 2013 UTC

# Line 7 | Line 7
7   package jsr166e;
8  
9   /**
10 < * A {@link ForkJoinTask} with a completion action
11 < * performed when triggered and there are no remaining pending
12 < * actions. Uses of CountedCompleter are similar to those of other
13 < * completion based components (such as {@link
14 < * java.nio.channels.CompletionHandler}) except that multiple
15 < * <em>pending</em> completions may be necessary to trigger the {@link
16 < * #onCompletion} action, not just one. Unless initialized otherwise,
17 < * the {@link #getPendingCount pending count} starts at zero, but may
18 < * be (atomically) changed using methods {@link #setPendingCount},
19 < * {@link #addToPendingCount}, and {@link
20 < * #compareAndSetPendingCount}. Upon invocation of {@link
10 > * A {@link ForkJoinTask} with a completion action performed when
11 > * triggered and there are no remaining pending actions.
12 > * CountedCompleters are in general more robust in the
13 > * presence of subtask stalls and blockage than are other forms of
14 > * ForkJoinTasks, but are less intuitive to program.  Uses of
15 > * CountedCompleter are similar to those of other completion based
16 > * components (such as {@link java.nio.channels.CompletionHandler})
17 > * except that multiple <em>pending</em> completions may be necessary
18 > * to trigger the completion action {@link #onCompletion(CountedCompleter)},
19 > * not just one.
20 > * Unless initialized otherwise, the {@linkplain #getPendingCount pending
21 > * count} starts at zero, but may be (atomically) changed using
22 > * methods {@link #setPendingCount}, {@link #addToPendingCount}, and
23 > * {@link #compareAndSetPendingCount}. Upon invocation of {@link
24   * #tryComplete}, if the pending action count is nonzero, it is
25   * decremented; otherwise, the completion action is performed, and if
26   * this completer itself has a completer, the process is continued
# Line 28 | Line 31 | package jsr166e;
31   * internal bookkeeping. In particular, the identities of pending
32   * tasks are not maintained. As illustrated below, you can create
33   * subclasses that do record some or all pending tasks or their
34 < * results when needed.
34 > * results when needed.  As illustrated below, utility methods
35 > * supporting customization of completion traversals are also
36 > * provided. However, because CountedCompleters provide only basic
37 > * synchronization mechanisms, it may be useful to create further
38 > * abstract subclasses that maintain linkages, fields, and additional
39 > * support methods appropriate for a set of related usages.
40   *
41   * <p>A concrete CountedCompleter class must define method {@link
42 < * #compute}, that should, in almost all use cases, invoke {@code
43 < * tryComplete()} once before returning. The class may also optionally
44 < * override method {@link #onCompletion} to perform an action upon
45 < * normal completion, and method {@link #onExceptionalCompletion} to
42 > * #compute}, that should in most cases (as illustrated below), invoke
43 > * {@code tryComplete()} once before returning. The class may also
44 > * optionally override method {@link #onCompletion(CountedCompleter)}
45 > * to perform an action upon normal completion, and method
46 > * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to
47   * perform an action upon any exception.
48   *
49   * <p>CountedCompleters most often do not bear results, in which case
50   * they are normally declared as {@code CountedCompleter<Void>}, and
51   * will always return {@code null} as a result value.  In other cases,
52   * you should override method {@link #getRawResult} to provide a
53 < * result from {@code join(), invoke()}, and related methods. (Method
54 < * {@link #setRawResult} by default plays no role in CountedCompleters
55 < * but may be overridden for example to maintain fields holding result
56 < * data.)
53 > * result from {@code join(), invoke()}, and related methods.  In
54 > * general, this method should return the value of a field (or a
55 > * function of one or more fields) of the CountedCompleter object that
56 > * holds the result upon completion. Method {@link #setRawResult} by
57 > * default plays no role in CountedCompleters.  It is possible, but
58 > * rarely applicable, to override this method to maintain other
59 > * objects or fields holding result data.
60   *
61   * <p>A CountedCompleter that does not itself have a completer (i.e.,
62   * one for which {@link #getCompleter} returns {@code null}) can be
# Line 53 | Line 65 | package jsr166e;
65   * only as an internal helper for other computations, so its own task
66   * status (as reported in methods such as {@link ForkJoinTask#isDone})
67   * is arbitrary; this status changes only upon explicit invocations of
68 < * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
69 < * ForkJoinTask#completeExceptionally} or upon exceptional completion
70 < * of method {@code compute}. Upon any exceptional completion, the
71 < * exception may be relayed to a task's completer (and its completer,
72 < * and so on), if one exists and it has not otherwise already
73 < * completed.
68 > * {@link #complete}, {@link ForkJoinTask#cancel},
69 > * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon
70 > * exceptional completion of method {@code compute}. Upon any
71 > * exceptional completion, the exception may be relayed to a task's
72 > * completer (and its completer, and so on), if one exists and it has
73 > * not otherwise already completed. Similarly, cancelling an internal
74 > * CountedCompleter has only a local effect on that completer, so is
75 > * not often useful.
76   *
77   * <p><b>Sample Usages.</b>
78   *
# Line 72 | Line 86 | package jsr166e;
86   * subdivided) to each element of an array or collection; especially
87   * when the operation takes a significantly different amount of time
88   * to complete for some elements than others, either because of
89 < * intrinsic variation (for example IO) or auxiliary effects such as
89 > * intrinsic variation (for example I/O) or auxiliary effects such as
90   * garbage collection.  Because CountedCompleters provide their own
91   * continuations, other threads need not block waiting to perform
92   * them.
93   *
94 < * <p> For example, here is an initial version of a class that uses
94 > * <p>For example, here is an initial version of a class that uses
95   * divide-by-two recursive decomposition to divide work into single
96   * pieces (leaf tasks). Even when work is split into individual calls,
97   * tree-based techniques are usually preferable to directly forking
# Line 85 | Line 99 | package jsr166e;
99   * improve load balancing. In the recursive case, the second of each
100   * pair of subtasks to finish triggers completion of its parent
101   * (because no result combination is performed, the default no-op
102 < * implementation of method {@code onCompletion} is not overridden). A
103 < * static utility method sets up the base task and invokes it:
102 > * implementation of method {@code onCompletion} is not overridden).
103 > * A static utility method sets up the base task and invokes it
104 > * (here, implicitly using the {@link ForkJoinPool#commonPool()}).
105   *
106   * <pre> {@code
107   * class MyOperation<E> { void apply(E e) { ... }  }
108   *
109   * class ForEach<E> extends CountedCompleter<Void> {
110   *
111 < *     public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
112 < *         pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
111 > *   public static <E> void forEach(E[] array, MyOperation<E> op) {
112 > *     new ForEach<E>(null, array, op, 0, array.length).invoke();
113 > *   }
114 > *
115 > *   final E[] array; final MyOperation<E> op; final int lo, hi;
116 > *   ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
117 > *     super(p);
118 > *     this.array = array; this.op = op; this.lo = lo; this.hi = hi;
119 > *   }
120 > *
121 > *   public void compute() { // version 1
122 > *     if (hi - lo >= 2) {
123 > *       int mid = (lo + hi) >>> 1;
124 > *       setPendingCount(2); // must set pending count before fork
125 > *       new ForEach(this, array, op, mid, hi).fork(); // right child
126 > *       new ForEach(this, array, op, lo, mid).fork(); // left child
127   *     }
128 < *
129 < *     final E[] array; final MyOperation<E> op; final int lo, hi;
130 < *     ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
131 < *         super(p);
132 < *         this.array = array; this.op = op; this.lo = lo; this.hi = hi;
104 < *     }
105 < *
106 < *     public void compute() { // version 1
107 < *         if (hi - lo >= 2) {
108 < *             int mid = (lo + hi) >>> 1;
109 < *             setPendingCount(2); // must set pending count before fork
110 < *             new ForEach(this, array, op, mid, hi).fork(); // right child
111 < *             new ForEach(this, array, op, lo, mid).fork(); // left child
112 < *         }
113 < *         else if (hi > lo)
114 < *             op.apply(array[lo]);
115 < *         tryComplete();
116 < *     }
117 < * } }</pre>
128 > *     else if (hi > lo)
129 > *       op.apply(array[lo]);
130 > *     tryComplete();
131 > *   }
132 > * }}</pre>
133   *
134   * This design can be improved by noticing that in the recursive case,
135   * the task has nothing to do after forking its right task, so can
136   * directly invoke its left task before returning. (This is an analog
137   * of tail recursion removal.)  Also, because the task returns upon
138   * executing its left task (rather than falling through to invoke
139 < * tryComplete) the pending count is set to one:
139 > * {@code tryComplete}) the pending count is set to one:
140   *
141   * <pre> {@code
142   * class ForEach<E> ...
143 < *     public void compute() { // version 2
144 < *         if (hi - lo >= 2) {
145 < *             int mid = (lo + hi) >>> 1;
146 < *             setPendingCount(1); // only one pending
147 < *             new ForEach(this, array, op, mid, hi).fork(); // right child
148 < *             new ForEach(this, array, op, lo, mid).compute(); // direct invoke
149 < *         }
150 < *         else {
151 < *             if (hi > lo)
152 < *                 op.apply(array[lo]);
153 < *             tryComplete();
139 < *         }
143 > *   public void compute() { // version 2
144 > *     if (hi - lo >= 2) {
145 > *       int mid = (lo + hi) >>> 1;
146 > *       setPendingCount(1); // only one pending
147 > *       new ForEach(this, array, op, mid, hi).fork(); // right child
148 > *       new ForEach(this, array, op, lo, mid).compute(); // direct invoke
149 > *     }
150 > *     else {
151 > *       if (hi > lo)
152 > *         op.apply(array[lo]);
153 > *       tryComplete();
154   *     }
155 + *   }
156   * }</pre>
157   *
158 < * As a further improvement, notice that the left task need not even
159 < * exist.  Instead of creating a new one, we can iterate using the
160 < * original task, and add a pending count for each fork:
158 > * As a further improvement, notice that the left task need not even exist.
159 > * Instead of creating a new one, we can iterate using the original task,
160 > * and add a pending count for each fork.  Additionally, because no task
161 > * in this tree implements an {@link #onCompletion(CountedCompleter)} method,
162 > * {@code tryComplete()} can be replaced with {@link #propagateCompletion}.
163   *
164   * <pre> {@code
165   * class ForEach<E> ...
166 < *     public void compute() { // version 3
167 < *         int l = lo,  h = hi;
168 < *         while (h - l >= 2) {
169 < *             int mid = (l + h) >>> 1;
170 < *             addToPendingCount(1);
171 < *             new ForEach(this, array, op, mid, h).fork(); // right child
172 < *             h = mid;
156 < *         }
157 < *         if (h > l)
158 < *             op.apply(array[l]);
159 < *         tryComplete();
166 > *   public void compute() { // version 3
167 > *     int l = lo,  h = hi;
168 > *     while (h - l >= 2) {
169 > *       int mid = (l + h) >>> 1;
170 > *       addToPendingCount(1);
171 > *       new ForEach(this, array, op, mid, h).fork(); // right child
172 > *       h = mid;
173   *     }
174 + *     if (h > l)
175 + *       op.apply(array[l]);
176 + *     propagateCompletion();
177 + *   }
178   * }</pre>
179   *
180   * Additional improvements of such classes might entail precomputing
# Line 166 | Line 183 | package jsr166e;
183   * instead of two per iteration, and using an adaptive threshold
184   * instead of always subdividing down to single elements.
185   *
186 + * <p><b>Searching.</b> A tree of CountedCompleters can search for a
187 + * value or property in different parts of a data structure, and
188 + * report a result in an {@link
189 + * java.util.concurrent.atomic.AtomicReference AtomicReference} as
190 + * soon as one is found. The others can poll the result to avoid
191 + * unnecessary work. (You could additionally {@linkplain #cancel
192 + * cancel} other tasks, but it is usually simpler and more efficient
193 + * to just let them notice that the result is set and if so skip
194 + * further processing.)  Illustrating again with an array using full
195 + * partitioning (again, in practice, leaf tasks will almost always
196 + * process more than one element):
197 + *
198 + * <pre> {@code
199 + * class Searcher<E> extends CountedCompleter<E> {
200 + *   final E[] array; final AtomicReference<E> result; final int lo, hi;
201 + *   Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
202 + *     super(p);
203 + *     this.array = array; this.result = result; this.lo = lo; this.hi = hi;
204 + *   }
205 + *   public E getRawResult() { return result.get(); }
206 + *   public void compute() { // similar to ForEach version 3
207 + *     int l = lo,  h = hi;
208 + *     while (result.get() == null && h >= l) {
209 + *       if (h - l >= 2) {
210 + *         int mid = (l + h) >>> 1;
211 + *         addToPendingCount(1);
212 + *         new Searcher(this, array, result, mid, h).fork();
213 + *         h = mid;
214 + *       }
215 + *       else {
216 + *         E x = array[l];
217 + *         if (matches(x) && result.compareAndSet(null, x))
218 + *           quietlyCompleteRoot(); // root task is now joinable
219 + *         break;
220 + *       }
221 + *     }
222 + *     tryComplete(); // normally complete whether or not found
223 + *   }
224 + *   boolean matches(E e) { ... } // return true if found
225 + *
226 + *   public static <E> E search(E[] array) {
227 + *       return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
228 + *   }
229 + * }}</pre>
230 + *
231 + * In this example, as well as others in which tasks have no other
232 + * effects except to compareAndSet a common result, the trailing
233 + * unconditional invocation of {@code tryComplete} could be made
234 + * conditional ({@code if (result.get() == null) tryComplete();})
235 + * because no further bookkeeping is required to manage completions
236 + * once the root task completes.
237 + *
238   * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
239   * results of multiple subtasks usually need to access these results
240 < * in method {@link #onCompletion}. As illustrated in the following
240 > * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following
241   * class (that performs a simplified form of map-reduce where mappings
242   * and reductions are all of type {@code E}), one way to do this in
243   * divide and conquer designs is to have each subtask record its
# Line 182 | Line 251 | package jsr166e;
251   * class MyMapper<E> { E apply(E v) {  ...  } }
252   * class MyReducer<E> { E apply(E x, E y) {  ...  } }
253   * class MapReducer<E> extends CountedCompleter<E> {
254 < *     final E[] array; final MyMapper<E> mapper;
255 < *     final MyReducer<E> reducer; final int lo, hi;
256 < *     MapReducer<E> sibling;
257 < *     E result;
258 < *     MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
259 < *                MyReducer<E> reducer, int lo, int hi) {
260 < *         super(p);
261 < *         this.array = array; this.mapper = mapper;
262 < *         this.reducer = reducer; this.lo = lo; this.hi = hi;
254 > *   final E[] array; final MyMapper<E> mapper;
255 > *   final MyReducer<E> reducer; final int lo, hi;
256 > *   MapReducer<E> sibling;
257 > *   E result;
258 > *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
259 > *              MyReducer<E> reducer, int lo, int hi) {
260 > *     super(p);
261 > *     this.array = array; this.mapper = mapper;
262 > *     this.reducer = reducer; this.lo = lo; this.hi = hi;
263 > *   }
264 > *   public void compute() {
265 > *     if (hi - lo >= 2) {
266 > *       int mid = (lo + hi) >>> 1;
267 > *       MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
268 > *       MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
269 > *       left.sibling = right;
270 > *       right.sibling = left;
271 > *       setPendingCount(1); // only right is pending
272 > *       right.fork();
273 > *       left.compute();     // directly execute left
274   *     }
275 < *     public void compute() {
276 < *         if (hi - lo >= 2) {
277 < *             int mid = (lo + hi) >>> 1;
278 < *             MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
199 < *             MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
200 < *             left.sibling = right;
201 < *             right.sibling = left;
202 < *             setPendingCount(1); // only right is pending
203 < *             right.fork();
204 < *             left.compute();     // directly execute left
205 < *         }
206 < *         else {
207 < *             if (hi > lo)
208 < *                 result = mapper.apply(array[lo]);
209 < *             tryComplete();
210 < *         }
275 > *     else {
276 > *       if (hi > lo)
277 > *           result = mapper.apply(array[lo]);
278 > *       tryComplete();
279   *     }
280 < *     public void onCompletion(CountedCompleter caller) {
281 < *         if (caller != this) {
282 < *            MapReducer<E> child = (MapReducer<E>)caller;
283 < *            MapReducer<E> sib = child.sibling;
284 < *            if (sib == null || sib.result == null)
285 < *                result = child.result;
286 < *            else
287 < *                result = reducer.apply(child.result, sib.result);
288 < *         }
280 > *   }
281 > *   public void onCompletion(CountedCompleter<?> caller) {
282 > *     if (caller != this) {
283 > *       MapReducer<E> child = (MapReducer<E>)caller;
284 > *       MapReducer<E> sib = child.sibling;
285 > *       if (sib == null || sib.result == null)
286 > *         result = child.result;
287 > *       else
288 > *         result = reducer.apply(child.result, sib.result);
289   *     }
290 < *     public E getRawResult() { return result; }
290 > *   }
291 > *   public E getRawResult() { return result; }
292   *
293 < *     public static <E> E mapReduce(ForkJoinPool pool, E[] array,
294 < *                                   MyMapper<E> mapper, MyReducer<E> reducer) {
295 < *         return pool.invoke(new MapReducer<E>(null, array, mapper,
296 < *                                              reducer, 0, array.length));
293 > *   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
294 > *     return new MapReducer<E>(null, array, mapper, reducer,
295 > *                              0, array.length).invoke();
296 > *   }
297 > * }}</pre>
298 > *
299 > * Here, method {@code onCompletion} takes a form common to many
300 > * completion designs that combine results. This callback-style method
301 > * is triggered once per task, in either of the two different contexts
302 > * in which the pending count is, or becomes, zero: (1) by a task
303 > * itself, if its pending count is zero upon invocation of {@code
304 > * tryComplete}, or (2) by any of its subtasks when they complete and
305 > * decrement the pending count to zero. The {@code caller} argument
306 > * distinguishes cases.  Most often, when the caller is {@code this},
307 > * no action is necessary. Otherwise the caller argument can be used
308 > * (usually via a cast) to supply a value (and/or links to other
309 > * values) to be combined.  Assuming proper use of pending counts, the
310 > * actions inside {@code onCompletion} occur (once) upon completion of
311 > * a task and its subtasks. No additional synchronization is required
312 > * within this method to ensure thread safety of accesses to fields of
313 > * this task or other completed tasks.
314 > *
315 > * <p><b>Completion Traversals</b>. If using {@code onCompletion} to
316 > * process completions is inapplicable or inconvenient, you can use
317 > * methods {@link #firstComplete} and {@link #nextComplete} to create
318 > * custom traversals.  For example, to define a MapReducer that only
319 > * splits out right-hand tasks in the form of the third ForEach
320 > * example, the completions must cooperatively reduce along
321 > * unexhausted subtask links, which can be done as follows:
322 > *
323 > * <pre> {@code
324 > * class MapReducer<E> extends CountedCompleter<E> { // version 2
325 > *   final E[] array; final MyMapper<E> mapper;
326 > *   final MyReducer<E> reducer; final int lo, hi;
327 > *   MapReducer<E> forks, next; // record subtask forks in list
328 > *   E result;
329 > *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
330 > *              MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
331 > *     super(p);
332 > *     this.array = array; this.mapper = mapper;
333 > *     this.reducer = reducer; this.lo = lo; this.hi = hi;
334 > *     this.next = next;
335 > *   }
336 > *   public void compute() {
337 > *     int l = lo,  h = hi;
338 > *     while (h - l >= 2) {
339 > *       int mid = (l + h) >>> 1;
340 > *       addToPendingCount(1);
341 > *       (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
342 > *       h = mid;
343   *     }
344 < * } }</pre>
344 > *     if (h > l)
345 > *       result = mapper.apply(array[l]);
346 > *     // process completions by reducing along and advancing subtask links
347 > *     for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) {
348 > *       for (MapReducer t = (MapReducer)c, s = t.forks;  s != null; s = t.forks = s.next)
349 > *         t.result = reducer.apply(t.result, s.result);
350 > *     }
351 > *   }
352 > *   public E getRawResult() { return result; }
353 > *
354 > *   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
355 > *     return new MapReducer<E>(null, array, mapper, reducer,
356 > *                              0, array.length, null).invoke();
357 > *   }
358 > * }}</pre>
359   *
360   * <p><b>Triggers.</b> Some CountedCompleters are themselves never
361   * forked, but instead serve as bits of plumbing in other designs;
362 < * including those in which the completion of one of more async tasks
362 > * including those in which the completion of one or more async tasks
363   * triggers another async task. For example:
364   *
365   * <pre> {@code
366   * class HeaderBuilder extends CountedCompleter<...> { ... }
367   * class BodyBuilder extends CountedCompleter<...> { ... }
368   * class PacketSender extends CountedCompleter<...> {
369 < *     PacketSender(...) { super(null, 1); ... } // trigger on second completion
370 < *     public void compute() { } // never called
371 < *     public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
369 > *   PacketSender(...) { super(null, 1); ... } // trigger on second completion
370 > *   public void compute() { } // never called
371 > *   public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
372   * }
373   * // sample use:
374   * PacketSender p = new PacketSender();
# Line 262 | Line 391 | public abstract class CountedCompleter<T
391       * Creates a new CountedCompleter with the given completer
392       * and initial pending count.
393       *
394 <     * @param completer this tasks completer, or {@code null} if none
394 >     * @param completer this task's completer, or {@code null} if none
395       * @param initialPendingCount the initial pending count
396       */
397      protected CountedCompleter(CountedCompleter<?> completer,
# Line 275 | Line 404 | public abstract class CountedCompleter<T
404       * Creates a new CountedCompleter with the given completer
405       * and an initial pending count of zero.
406       *
407 <     * @param completer this tasks completer, or {@code null} if none
407 >     * @param completer this task's completer, or {@code null} if none
408       */
409      protected CountedCompleter(CountedCompleter<?> completer) {
410          this.completer = completer;
# Line 296 | Line 425 | public abstract class CountedCompleter<T
425  
426      /**
427       * Performs an action when method {@link #tryComplete} is invoked
428 <     * and there are no pending counts, or when the unconditional
428 >     * and the pending count is zero, or when the unconditional
429       * method {@link #complete} is invoked.  By default, this method
430 <     * does nothing.
430 >     * does nothing. You can distinguish cases by checking the
431 >     * identity of the given caller argument. If not equal to {@code
432 >     * this}, then it is typically a subtask that may contain results
433 >     * (and/or links to other results) to combine.
434       *
435       * @param caller the task invoking this method (which may
436 <     * be this task itself).
436 >     * be this task itself)
437       */
438      public void onCompletion(CountedCompleter<?> caller) {
439      }
440  
441      /**
442 <     * Performs an action when method {@link #completeExceptionally}
443 <     * is invoked or method {@link #compute} throws an exception, and
444 <     * this task has not otherwise already completed normally. On
445 <     * entry to this method, this task {@link
446 <     * ForkJoinTask#isCompletedAbnormally}.  The return value of this
447 <     * method controls further propagation: If {@code true} and this
448 <     * task has a completer, then this completer is also completed
449 <     * exceptionally.  The default implementation of this method does
450 <     * nothing except return {@code true}.
442 >     * Performs an action when method {@link
443 >     * #completeExceptionally(Throwable)} is invoked or method {@link
444 >     * #compute} throws an exception, and this task has not already
445 >     * otherwise completed normally. On entry to this method, this task
446 >     * {@link ForkJoinTask#isCompletedAbnormally}.  The return value
447 >     * of this method controls further propagation: If {@code true}
448 >     * and this task has a completer that has not completed, then that
449 >     * completer is also completed exceptionally, with the same
450 >     * exception as this completer.  The default implementation of
451 >     * this method does nothing except return {@code true}.
452       *
453       * @param ex the exception
454       * @param caller the task invoking this method (which may
455 <     * be this task itself).
456 <     * @return true if this exception should be propagated to this
457 <     * tasks completer, if one exists.
455 >     * be this task itself)
456 >     * @return {@code true} if this exception should be propagated to this
457 >     * task's completer, if one exists
458       */
459      public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
460          return true;
# Line 361 | Line 494 | public abstract class CountedCompleter<T
494       * @param delta the value to add
495       */
496      public final void addToPendingCount(int delta) {
497 <        int c; // note: can replace with intrinsic in jdk8
497 >        int c;
498          do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
499      }
500  
# Line 371 | Line 504 | public abstract class CountedCompleter<T
504       *
505       * @param expected the expected value
506       * @param count the new value
507 <     * @return true is successful
507 >     * @return {@code true} if successful
508       */
509      public final boolean compareAndSetPendingCount(int expected, int count) {
510          return U.compareAndSwapInt(this, PENDING, expected, count);
511      }
512  
513      /**
514 +     * If the pending count is nonzero, (atomically) decrements it.
515 +     *
516 +     * @return the initial (undecremented) pending count holding on entry
517 +     * to this method
518 +     */
519 +    public final int decrementPendingCountUnlessZero() {
520 +        int c;
521 +        do {} while ((c = pending) != 0 &&
522 +                     !U.compareAndSwapInt(this, PENDING, c, c - 1));
523 +        return c;
524 +    }
525 +
526 +    /**
527 +     * Returns the root of the current computation; i.e., this
528 +     * task if it has no completer, else its completer's root.
529 +     *
530 +     * @return the root of the current computation
531 +     */
532 +    public final CountedCompleter<?> getRoot() {
533 +        CountedCompleter<?> a = this, p;
534 +        while ((p = a.completer) != null)
535 +            a = p;
536 +        return a;
537 +    }
538 +
539 +    /**
540       * If the pending count is nonzero, decrements the count;
541 <     * otherwise invokes {@link #onCompletion} and then similarly
542 <     * tries to complete this task's completer, if one exists,
543 <     * else marks this task as complete.
541 >     * otherwise invokes {@link #onCompletion(CountedCompleter)}
542 >     * and then similarly tries to complete this task's completer,
543 >     * if one exists, else marks this task as complete.
544       */
545      public final void tryComplete() {
546          CountedCompleter<?> a = this, s = a;
# Line 399 | Line 558 | public abstract class CountedCompleter<T
558      }
559  
560      /**
561 <     * Regardless of pending count, invokes {@link #onCompletion},
562 <     * marks this task as complete and further triggers {@link
563 <     * #tryComplete} on this task's completer, if one exists. This
564 <     * method may be useful when forcing completion as soon as any one
565 <     * (versus all) of several subtask results are obtained.  The
566 <     * given rawResult is used as an argument to {@link #setRawResult}
567 <     * before marking this task as complete; its value is meaningful
568 <     * only for classes overriding {@code setRawResult}.
561 >     * Equivalent to {@link #tryComplete} but does not invoke {@link
562 >     * #onCompletion(CountedCompleter)} along the completion path:
563 >     * If the pending count is nonzero, decrements the count;
564 >     * otherwise, similarly tries to complete this task's completer, if
565 >     * one exists, else marks this task as complete. This method may be
566 >     * useful in cases where {@code onCompletion} should not, or need
567 >     * not, be invoked for each completer in a computation.
568 >     */
569 >    public final void propagateCompletion() {
570 >        CountedCompleter<?> a = this, s = a;
571 >        for (int c;;) {
572 >            if ((c = a.pending) == 0) {
573 >                if ((a = (s = a).completer) == null) {
574 >                    s.quietlyComplete();
575 >                    return;
576 >                }
577 >            }
578 >            else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
579 >                return;
580 >        }
581 >    }
582 >
583 >    /**
584 >     * Regardless of pending count, invokes
585 >     * {@link #onCompletion(CountedCompleter)}, marks this task as
586 >     * complete and further triggers {@link #tryComplete} on this
587 >     * task's completer, if one exists.  The given rawResult is
588 >     * used as an argument to {@link #setRawResult} before invoking
589 >     * {@link #onCompletion(CountedCompleter)} or marking this task
590 >     * as complete; its value is meaningful only for classes
591 >     * overriding {@code setRawResult}.  This method does not modify
592 >     * the pending count.
593 >     *
594 >     * <p>This method may be useful when forcing completion as soon as
595 >     * any one (versus all) of several subtask results are obtained.
596 >     * However, in the common (and recommended) case in which {@code
597 >     * setRawResult} is not overridden, this effect can be obtained
598 >     * more simply using {@code quietlyCompleteRoot();}.
599       *
600       * @param rawResult the raw result
601       */
602      public void complete(T rawResult) {
603          CountedCompleter<?> p;
415        onCompletion(this);
604          setRawResult(rawResult);
605 +        onCompletion(this);
606          quietlyComplete();
607          if ((p = completer) != null)
608              p.tryComplete();
609      }
610  
611 +
612 +    /**
613 +     * If this task's pending count is zero, returns this task;
614 +     * otherwise decrements its pending count and returns {@code
615 +     * null}. This method is designed to be used with {@link
616 +     * #nextComplete} in completion traversal loops.
617 +     *
618 +     * @return this task, if pending count was zero, else {@code null}
619 +     */
620 +    public final CountedCompleter<?> firstComplete() {
621 +        for (int c;;) {
622 +            if ((c = pending) == 0)
623 +                return this;
624 +            else if (U.compareAndSwapInt(this, PENDING, c, c - 1))
625 +                return null;
626 +        }
627 +    }
628 +
629 +    /**
630 +     * If this task does not have a completer, invokes {@link
631 +     * ForkJoinTask#quietlyComplete} and returns {@code null}.  Or, if
632 +     * the completer's pending count is non-zero, decrements that
633 +     * pending count and returns {@code null}.  Otherwise, returns the
634 +     * completer.  This method can be used as part of a completion
635 +     * traversal loop for homogeneous task hierarchies:
636 +     *
637 +     * <pre> {@code
638 +     * for (CountedCompleter<?> c = firstComplete();
639 +     *      c != null;
640 +     *      c = c.nextComplete()) {
641 +     *   // ... process c ...
642 +     * }}</pre>
643 +     *
644 +     * @return the completer, or {@code null} if none
645 +     */
646 +    public final CountedCompleter<?> nextComplete() {
647 +        CountedCompleter<?> p;
648 +        if ((p = completer) != null)
649 +            return p.firstComplete();
650 +        else {
651 +            quietlyComplete();
652 +            return null;
653 +        }
654 +    }
655 +
656      /**
657 <     * Support for FJT exception propagation
657 >     * Equivalent to {@code getRoot().quietlyComplete()}.
658 >     */
659 >    public final void quietlyCompleteRoot() {
660 >        for (CountedCompleter<?> a = this, p;;) {
661 >            if ((p = a.completer) == null) {
662 >                a.quietlyComplete();
663 >                return;
664 >            }
665 >            a = p;
666 >        }
667 >    }
668 >
669 >    /**
670 >     * Supports ForkJoinTask exception propagation.
671       */
672      void internalPropagateException(Throwable ex) {
673          CountedCompleter<?> a = this, s = a;
674          while (a.onExceptionalCompletion(ex, s) &&
675 <               (a = (s = a).completer) != null && a.status >= 0)
676 <            a.recordExceptionalCompletion(ex);
675 >               (a = (s = a).completer) != null && a.status >= 0 &&
676 >               a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
677 >            ;
678      }
679  
680      /**
681 <     * Implements execution conventions for CountedCompleters
681 >     * Implements execution conventions for CountedCompleters.
682       */
683      protected final boolean exec() {
684          compute();
# Line 438 | Line 686 | public abstract class CountedCompleter<T
686      }
687  
688      /**
689 <     * Returns the result of the computation. By default
689 >     * Returns the result of the computation.  By default,
690       * returns {@code null}, which is appropriate for {@code Void}
691 <     * actions, but in other cases should be overridden.
691 >     * actions, but in other cases should be overridden, almost
692 >     * always to return a field or function of a field that
693 >     * holds the result upon completion.
694       *
695       * @return the result of the computation
696       */
# Line 449 | Line 699 | public abstract class CountedCompleter<T
699      /**
700       * A method that result-bearing CountedCompleters may optionally
701       * use to help maintain result data.  By default, does nothing.
702 +     * Overrides are not recommended. However, if this method is
703 +     * overridden to update existing objects or fields, then it must
704 +     * in general be defined to be thread-safe.
705       */
706      protected void setRawResult(T t) { }
707  
# Line 465 | Line 718 | public abstract class CountedCompleter<T
718          }
719      }
720  
468
721      /**
722       * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
723       * Replace with a simple call to Unsafe.getUnsafe when integrating
# Line 476 | Line 728 | public abstract class CountedCompleter<T
728      private static sun.misc.Unsafe getUnsafe() {
729          try {
730              return sun.misc.Unsafe.getUnsafe();
731 <        } catch (SecurityException se) {
732 <            try {
733 <                return java.security.AccessController.doPrivileged
734 <                    (new java.security
735 <                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
736 <                        public sun.misc.Unsafe run() throws Exception {
737 <                            java.lang.reflect.Field f = sun.misc
738 <                                .Unsafe.class.getDeclaredField("theUnsafe");
739 <                            f.setAccessible(true);
740 <                            return (sun.misc.Unsafe) f.get(null);
741 <                        }});
742 <            } catch (java.security.PrivilegedActionException e) {
743 <                throw new RuntimeException("Could not initialize intrinsics",
744 <                                           e.getCause());
745 <            }
731 >        } catch (SecurityException tryReflectionInstead) {}
732 >        try {
733 >            return java.security.AccessController.doPrivileged
734 >            (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
735 >                public sun.misc.Unsafe run() throws Exception {
736 >                    Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
737 >                    for (java.lang.reflect.Field f : k.getDeclaredFields()) {
738 >                        f.setAccessible(true);
739 >                        Object x = f.get(null);
740 >                        if (k.isInstance(x))
741 >                            return k.cast(x);
742 >                    }
743 >                    throw new NoSuchFieldError("the Unsafe");
744 >                }});
745 >        } catch (java.security.PrivilegedActionException e) {
746 >            throw new RuntimeException("Could not initialize intrinsics",
747 >                                       e.getCause());
748          }
749      }
496
750   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines