ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CountedCompleter.java
(Generate patch)

Comparing jsr166/src/jsr166e/CountedCompleter.java (file contents):
Revision 1.13 by dl, Mon Nov 19 18:12:28 2012 UTC vs.
Revision 1.30 by dl, Wed Jun 19 14:55:40 2013 UTC

# Line 8 | Line 8 | package jsr166e;
8  
9   /**
10   * A {@link ForkJoinTask} with a completion action performed when
11 < * triggered and there are no remaining pending
12 < * actions. CountedCompleters are in general more robust in the
13 < * presence of subtask stalls and blockage than are other forms for
14 < * ForkJoinTasks, but are in general less intuitive to program.  Uses
15 < * of CountedCompleter are similar to those of other completion based
11 > * triggered and there are no remaining pending actions.
12 > * CountedCompleters are in general more robust in the
13 > * presence of subtask stalls and blockage than are other forms of
14 > * ForkJoinTasks, but are less intuitive to program.  Uses of
15 > * CountedCompleter are similar to those of other completion based
16   * components (such as {@link java.nio.channels.CompletionHandler})
17   * except that multiple <em>pending</em> completions may be necessary
18 < * to trigger the {@link #onCompletion} action, not just one. Unless
19 < * initialized otherwise, the {@link #getPendingCount pending count}
20 < * starts at zero, but may be (atomically) changed using methods
21 < * {@link #setPendingCount}, {@link #addToPendingCount}, and {@link
22 < * #compareAndSetPendingCount}. Upon invocation of {@link
18 > * to trigger the completion action {@link #onCompletion(CountedCompleter)},
19 > * not just one.
20 > * Unless initialized otherwise, the {@linkplain #getPendingCount pending
21 > * count} starts at zero, but may be (atomically) changed using
22 > * methods {@link #setPendingCount}, {@link #addToPendingCount}, and
23 > * {@link #compareAndSetPendingCount}. Upon invocation of {@link
24   * #tryComplete}, if the pending action count is nonzero, it is
25   * decremented; otherwise, the completion action is performed, and if
26   * this completer itself has a completer, the process is continued
# Line 30 | Line 31 | package jsr166e;
31   * internal bookkeeping. In particular, the identities of pending
32   * tasks are not maintained. As illustrated below, you can create
33   * subclasses that do record some or all pending tasks or their
34 < * results when needed. Because CountedCompleters provide only basic
34 > * results when needed.  As illustrated below, utility methods
35 > * supporting customization of completion traversals are also
36 > * provided. However, because CountedCompleters provide only basic
37   * synchronization mechanisms, it may be useful to create further
38 < * abstract subclasses that maintain linkages and fields and support
39 < * methods appropriate for a set of related usages.
38 > * abstract subclasses that maintain linkages, fields, and additional
39 > * support methods appropriate for a set of related usages.
40   *
41   * <p>A concrete CountedCompleter class must define method {@link
42   * #compute}, that should in most cases (as illustrated below), invoke
43   * {@code tryComplete()} once before returning. The class may also
44 < * optionally override method {@link #onCompletion} to perform an
45 < * action upon normal completion, and method {@link
46 < * #onExceptionalCompletion} to perform an action upon any exception.
44 > * optionally override method {@link #onCompletion(CountedCompleter)}
45 > * to perform an action upon normal completion, and method
46 > * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to
47 > * perform an action upon any exception.
48   *
49   * <p>CountedCompleters most often do not bear results, in which case
50   * they are normally declared as {@code CountedCompleter<Void>}, and
# Line 51 | Line 55 | package jsr166e;
55   * function of one or more fields) of the CountedCompleter object that
56   * holds the result upon completion. Method {@link #setRawResult} by
57   * default plays no role in CountedCompleters.  It is possible, but
58 < * not usually applicable, to override this method to maintain other
58 > * rarely applicable, to override this method to maintain other
59   * objects or fields holding result data.
60   *
61   * <p>A CountedCompleter that does not itself have a completer (i.e.,
# Line 61 | Line 65 | package jsr166e;
65   * only as an internal helper for other computations, so its own task
66   * status (as reported in methods such as {@link ForkJoinTask#isDone})
67   * is arbitrary; this status changes only upon explicit invocations of
68 < * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
69 < * ForkJoinTask#completeExceptionally} or upon exceptional completion
70 < * of method {@code compute}. Upon any exceptional completion, the
71 < * exception may be relayed to a task's completer (and its completer,
72 < * and so on), if one exists and it has not otherwise already
73 < * completed. Similarly, cancelling an internal CountedCompleter has
74 < * only a local effect on that completer, so is not often useful.
68 > * {@link #complete}, {@link ForkJoinTask#cancel},
69 > * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon
70 > * exceptional completion of method {@code compute}. Upon any
71 > * exceptional completion, the exception may be relayed to a task's
72 > * completer (and its completer, and so on), if one exists and it has
73 > * not otherwise already completed. Similarly, cancelling an internal
74 > * CountedCompleter has only a local effect on that completer, so is
75 > * not often useful.
76   *
77   * <p><b>Sample Usages.</b>
78   *
# Line 81 | Line 86 | package jsr166e;
86   * subdivided) to each element of an array or collection; especially
87   * when the operation takes a significantly different amount of time
88   * to complete for some elements than others, either because of
89 < * intrinsic variation (for example IO) or auxiliary effects such as
89 > * intrinsic variation (for example I/O) or auxiliary effects such as
90   * garbage collection.  Because CountedCompleters provide their own
91   * continuations, other threads need not block waiting to perform
92   * them.
# Line 94 | Line 99 | package jsr166e;
99   * improve load balancing. In the recursive case, the second of each
100   * pair of subtasks to finish triggers completion of its parent
101   * (because no result combination is performed, the default no-op
102 < * implementation of method {@code onCompletion} is not overridden). A
103 < * static utility method sets up the base task and invokes it
102 > * implementation of method {@code onCompletion} is not overridden).
103 > * A static utility method sets up the base task and invokes it
104   * (here, implicitly using the {@link ForkJoinPool#commonPool()}).
105   *
106   * <pre> {@code
# Line 103 | Line 108 | package jsr166e;
108   *
109   * class ForEach<E> extends CountedCompleter<Void> {
110   *
111 < *     public static <E> void forEach(E[] array, MyOperation<E> op) {
112 < *         new ForEach<E>(null, array, op, 0, array.length).invoke();
113 < *     }
114 < *
115 < *     final E[] array; final MyOperation<E> op; final int lo, hi;
116 < *     ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
117 < *         super(p);
118 < *         this.array = array; this.op = op; this.lo = lo; this.hi = hi;
119 < *     }
120 < *
121 < *     public void compute() { // version 1
122 < *         if (hi - lo >= 2) {
123 < *             int mid = (lo + hi) >>> 1;
124 < *             setPendingCount(2); // must set pending count before fork
125 < *             new ForEach(this, array, op, mid, hi).fork(); // right child
126 < *             new ForEach(this, array, op, lo, mid).fork(); // left child
127 < *         }
128 < *         else if (hi > lo)
129 < *             op.apply(array[lo]);
130 < *         tryComplete();
131 < *     }
132 < * } }</pre>
111 > *   public static <E> void forEach(E[] array, MyOperation<E> op) {
112 > *     new ForEach<E>(null, array, op, 0, array.length).invoke();
113 > *   }
114 > *
115 > *   final E[] array; final MyOperation<E> op; final int lo, hi;
116 > *   ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
117 > *     super(p);
118 > *     this.array = array; this.op = op; this.lo = lo; this.hi = hi;
119 > *   }
120 > *
121 > *   public void compute() { // version 1
122 > *     if (hi - lo >= 2) {
123 > *       int mid = (lo + hi) >>> 1;
124 > *       setPendingCount(2); // must set pending count before fork
125 > *       new ForEach(this, array, op, mid, hi).fork(); // right child
126 > *       new ForEach(this, array, op, lo, mid).fork(); // left child
127 > *     }
128 > *     else if (hi > lo)
129 > *       op.apply(array[lo]);
130 > *     tryComplete();
131 > *   }
132 > * }}</pre>
133   *
134   * This design can be improved by noticing that in the recursive case,
135   * the task has nothing to do after forking its right task, so can
136   * directly invoke its left task before returning. (This is an analog
137   * of tail recursion removal.)  Also, because the task returns upon
138   * executing its left task (rather than falling through to invoke
139 < * tryComplete) the pending count is set to one:
139 > * {@code tryComplete}) the pending count is set to one:
140   *
141   * <pre> {@code
142   * class ForEach<E> ...
143 < *     public void compute() { // version 2
144 < *         if (hi - lo >= 2) {
145 < *             int mid = (lo + hi) >>> 1;
146 < *             setPendingCount(1); // only one pending
147 < *             new ForEach(this, array, op, mid, hi).fork(); // right child
148 < *             new ForEach(this, array, op, lo, mid).compute(); // direct invoke
149 < *         }
150 < *         else {
151 < *             if (hi > lo)
152 < *                 op.apply(array[lo]);
153 < *             tryComplete();
149 < *         }
143 > *   public void compute() { // version 2
144 > *     if (hi - lo >= 2) {
145 > *       int mid = (lo + hi) >>> 1;
146 > *       setPendingCount(1); // only one pending
147 > *       new ForEach(this, array, op, mid, hi).fork(); // right child
148 > *       new ForEach(this, array, op, lo, mid).compute(); // direct invoke
149 > *     }
150 > *     else {
151 > *       if (hi > lo)
152 > *         op.apply(array[lo]);
153 > *       tryComplete();
154   *     }
155 + *   }
156   * }</pre>
157   *
158 < * As a further improvement, notice that the left task need not even
159 < * exist.  Instead of creating a new one, we can iterate using the
160 < * original task, and add a pending count for each fork.
158 > * As a further improvement, notice that the left task need not even exist.
159 > * Instead of creating a new one, we can iterate using the original task,
160 > * and add a pending count for each fork.  Additionally, because no task
161 > * in this tree implements an {@link #onCompletion(CountedCompleter)} method,
162 > * {@code tryComplete()} can be replaced with {@link #propagateCompletion}.
163   *
164   * <pre> {@code
165   * class ForEach<E> ...
166 < *     public void compute() { // version 3
167 < *         int l = lo,  h = hi;
168 < *         while (h - l >= 2) {
169 < *             int mid = (l + h) >>> 1;
170 < *             addToPendingCount(1);
171 < *             new ForEach(this, array, op, mid, h).fork(); // right child
172 < *             h = mid;
173 < *         }
174 < *         if (h > l)
175 < *             op.apply(array[l]);
176 < *         tryComplete();
177 < *     }
166 > *   public void compute() { // version 3
167 > *     int l = lo,  h = hi;
168 > *     while (h - l >= 2) {
169 > *       int mid = (l + h) >>> 1;
170 > *       addToPendingCount(1);
171 > *       new ForEach(this, array, op, mid, h).fork(); // right child
172 > *       h = mid;
173 > *     }
174 > *     if (h > l)
175 > *       op.apply(array[l]);
176 > *     propagateCompletion();
177 > *   }
178   * }</pre>
179   *
180   * Additional improvements of such classes might entail precomputing
# Line 176 | Line 183 | package jsr166e;
183   * instead of two per iteration, and using an adaptive threshold
184   * instead of always subdividing down to single elements.
185   *
186 + * <p><b>Searching.</b> A tree of CountedCompleters can search for a
187 + * value or property in different parts of a data structure, and
188 + * report a result in an {@link
189 + * java.util.concurrent.atomic.AtomicReference AtomicReference} as
190 + * soon as one is found. The others can poll the result to avoid
191 + * unnecessary work. (You could additionally {@linkplain #cancel
192 + * cancel} other tasks, but it is usually simpler and more efficient
193 + * to just let them notice that the result is set and if so skip
194 + * further processing.)  Illustrating again with an array using full
195 + * partitioning (again, in practice, leaf tasks will almost always
196 + * process more than one element):
197 + *
198 + * <pre> {@code
199 + * class Searcher<E> extends CountedCompleter<E> {
200 + *   final E[] array; final AtomicReference<E> result; final int lo, hi;
201 + *   Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
202 + *     super(p);
203 + *     this.array = array; this.result = result; this.lo = lo; this.hi = hi;
204 + *   }
205 + *   public E getRawResult() { return result.get(); }
206 + *   public void compute() { // similar to ForEach version 3
207 + *     int l = lo,  h = hi;
208 + *     while (result.get() == null && h >= l) {
209 + *       if (h - l >= 2) {
210 + *         int mid = (l + h) >>> 1;
211 + *         addToPendingCount(1);
212 + *         new Searcher(this, array, result, mid, h).fork();
213 + *         h = mid;
214 + *       }
215 + *       else {
216 + *         E x = array[l];
217 + *         if (matches(x) && result.compareAndSet(null, x))
218 + *           quietlyCompleteRoot(); // root task is now joinable
219 + *         break;
220 + *       }
221 + *     }
222 + *     tryComplete(); // normally complete whether or not found
223 + *   }
224 + *   boolean matches(E e) { ... } // return true if found
225 + *
226 + *   public static <E> E search(E[] array) {
227 + *       return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
228 + *   }
229 + * }}</pre>
230 + *
231 + * In this example, as well as others in which tasks have no other
232 + * effects except to compareAndSet a common result, the trailing
233 + * unconditional invocation of {@code tryComplete} could be made
234 + * conditional ({@code if (result.get() == null) tryComplete();})
235 + * because no further bookkeeping is required to manage completions
236 + * once the root task completes.
237 + *
238   * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
239   * results of multiple subtasks usually need to access these results
240 < * in method {@link #onCompletion}. As illustrated in the following
240 > * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following
241   * class (that performs a simplified form of map-reduce where mappings
242   * and reductions are all of type {@code E}), one way to do this in
243   * divide and conquer designs is to have each subtask record its
# Line 192 | Line 251 | package jsr166e;
251   * class MyMapper<E> { E apply(E v) {  ...  } }
252   * class MyReducer<E> { E apply(E x, E y) {  ...  } }
253   * class MapReducer<E> extends CountedCompleter<E> {
254 < *     final E[] array; final MyMapper<E> mapper;
255 < *     final MyReducer<E> reducer; final int lo, hi;
256 < *     MapReducer<E> sibling;
257 < *     E result;
258 < *     MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
259 < *                MyReducer<E> reducer, int lo, int hi) {
260 < *         super(p);
261 < *         this.array = array; this.mapper = mapper;
262 < *         this.reducer = reducer; this.lo = lo; this.hi = hi;
263 < *     }
264 < *     public void compute() {
265 < *         if (hi - lo >= 2) {
266 < *             int mid = (lo + hi) >>> 1;
267 < *             MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
268 < *             MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
269 < *             left.sibling = right;
270 < *             right.sibling = left;
271 < *             setPendingCount(1); // only right is pending
272 < *             right.fork();
273 < *             left.compute();     // directly execute left
274 < *         }
275 < *         else {
276 < *             if (hi > lo)
277 < *                 result = mapper.apply(array[lo]);
278 < *             tryComplete();
279 < *         }
280 < *     }
281 < *     public void onCompletion(CountedCompleter caller) {
282 < *         if (caller != this) {
283 < *            MapReducer<E> child = (MapReducer<E>)caller;
284 < *            MapReducer<E> sib = child.sibling;
285 < *            if (sib == null || sib.result == null)
286 < *                result = child.result;
287 < *            else
288 < *                result = reducer.apply(child.result, sib.result);
289 < *         }
290 < *     }
291 < *     public E getRawResult() { return result; }
292 < *
293 < *     public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
294 < *         return new MapReducer<E>(null, array, mapper, reducer,
295 < *                                  0, array.length).invoke();
296 < *     }
297 < * } }</pre>
254 > *   final E[] array; final MyMapper<E> mapper;
255 > *   final MyReducer<E> reducer; final int lo, hi;
256 > *   MapReducer<E> sibling;
257 > *   E result;
258 > *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
259 > *              MyReducer<E> reducer, int lo, int hi) {
260 > *     super(p);
261 > *     this.array = array; this.mapper = mapper;
262 > *     this.reducer = reducer; this.lo = lo; this.hi = hi;
263 > *   }
264 > *   public void compute() {
265 > *     if (hi - lo >= 2) {
266 > *       int mid = (lo + hi) >>> 1;
267 > *       MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
268 > *       MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
269 > *       left.sibling = right;
270 > *       right.sibling = left;
271 > *       setPendingCount(1); // only right is pending
272 > *       right.fork();
273 > *       left.compute();     // directly execute left
274 > *     }
275 > *     else {
276 > *       if (hi > lo)
277 > *           result = mapper.apply(array[lo]);
278 > *       tryComplete();
279 > *     }
280 > *   }
281 > *   public void onCompletion(CountedCompleter<?> caller) {
282 > *     if (caller != this) {
283 > *       MapReducer<E> child = (MapReducer<E>)caller;
284 > *       MapReducer<E> sib = child.sibling;
285 > *       if (sib == null || sib.result == null)
286 > *         result = child.result;
287 > *       else
288 > *         result = reducer.apply(child.result, sib.result);
289 > *     }
290 > *   }
291 > *   public E getRawResult() { return result; }
292 > *
293 > *   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
294 > *     return new MapReducer<E>(null, array, mapper, reducer,
295 > *                              0, array.length).invoke();
296 > *   }
297 > * }}</pre>
298   *
299   * Here, method {@code onCompletion} takes a form common to many
300   * completion designs that combine results. This callback-style method
# Line 247 | Line 306 | package jsr166e;
306   * distinguishes cases.  Most often, when the caller is {@code this},
307   * no action is necessary. Otherwise the caller argument can be used
308   * (usually via a cast) to supply a value (and/or links to other
309 < * values) to be combined.  Asuuming proper use of pending counts, the
309 > * values) to be combined.  Assuming proper use of pending counts, the
310   * actions inside {@code onCompletion} occur (once) upon completion of
311   * a task and its subtasks. No additional synchronization is required
312   * within this method to ensure thread safety of accesses to fields of
313   * this task or other completed tasks.
314   *
315 < * <p><b>Searching.</b> A tree of CountedCompleters can search for a
316 < * value or property in different parts of a data structure, and
317 < * report a result in an {@link java.util.concurrent.AtomicReference}
318 < * as soon as one is found. The others can poll the result to avoid
319 < * unnecessary work. (You could additionally {@link #cancel} other
320 < * tasks, but it is usually simpler and more efficient to just let
321 < * them notice that the result is set and if so skip further
263 < * processing.)  Illustrating again with an array using full
264 < * partitioning (again, in practice, leaf tasks will almost always
265 < * process more than one element):
315 > * <p><b>Completion Traversals</b>. If using {@code onCompletion} to
316 > * process completions is inapplicable or inconvenient, you can use
317 > * methods {@link #firstComplete} and {@link #nextComplete} to create
318 > * custom traversals.  For example, to define a MapReducer that only
319 > * splits out right-hand tasks in the form of the third ForEach
320 > * example, the completions must cooperatively reduce along
321 > * unexhausted subtask links, which can be done as follows:
322   *
323   * <pre> {@code
324 < * class Searcher<E> extends CountedCompleter<E> {
325 < *     final E[] array; final AtomicReference<E> result; final int lo, hi;
326 < *     Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
327 < *         super(p);
328 < *         this.array = array; this.result = result; this.lo = lo; this.hi = hi;
329 < *     }
330 < *     public E getRawResult() { return result.get(); }
331 < *     public void compute() { // similar to ForEach version 3
332 < *         int l = lo,  h = hi;
333 < *         while (h - l >= 2 && result.get() == null) {
334 < *             int mid = (l + h) >>> 1;
335 < *             addToPendingCount(1);
336 < *             new Searcher(this, array, result, mid, h).fork();
337 < *             h = mid;
338 < *         }
339 < *         if (h > l && result.get() == null && matches(array[l]) &&
340 < *             result.compareAndSet(null, array[l]))
341 < *             getRoot().quietlyComplete(); // root task is now joinable
342 < *
343 < *         tryComplete(); // normally complete whether or not found
344 < *     }
345 < *     boolean matches(E e) { ... } // return true if found
346 < *
347 < *     public static <E> E search(E[] array) {
348 < *         return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
349 < *     }
350 < *}}</pre>
351 < *
352 < * In this example, as well as others in which tasks have no other
353 < * effects except to compareAndSet a common result, the trailing
354 < * unconditional invocation of {@code tryComplete} could be made
355 < * conditional ({@code if (result.get() == null) tryComplete();})
356 < * because no further bookkeeping is required to manage completions
357 < * once the root task completes.
324 > * class MapReducer<E> extends CountedCompleter<E> { // version 2
325 > *   final E[] array; final MyMapper<E> mapper;
326 > *   final MyReducer<E> reducer; final int lo, hi;
327 > *   MapReducer<E> forks, next; // record subtask forks in list
328 > *   E result;
329 > *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
330 > *              MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
331 > *     super(p);
332 > *     this.array = array; this.mapper = mapper;
333 > *     this.reducer = reducer; this.lo = lo; this.hi = hi;
334 > *     this.next = next;
335 > *   }
336 > *   public void compute() {
337 > *     int l = lo,  h = hi;
338 > *     while (h - l >= 2) {
339 > *       int mid = (l + h) >>> 1;
340 > *       addToPendingCount(1);
341 > *       (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
342 > *       h = mid;
343 > *     }
344 > *     if (h > l)
345 > *       result = mapper.apply(array[l]);
346 > *     // process completions by reducing along and advancing subtask links
347 > *     for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) {
348 > *       for (MapReducer t = (MapReducer)c, s = t.forks;  s != null; s = t.forks = s.next)
349 > *         t.result = reducer.apply(t.result, s.result);
350 > *     }
351 > *   }
352 > *   public E getRawResult() { return result; }
353 > *
354 > *   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
355 > *     return new MapReducer<E>(null, array, mapper, reducer,
356 > *                              0, array.length, null).invoke();
357 > *   }
358 > * }}</pre>
359   *
360   * <p><b>Triggers.</b> Some CountedCompleters are themselves never
361   * forked, but instead serve as bits of plumbing in other designs;
362 < * including those in which the completion of one of more async tasks
362 > * including those in which the completion of one or more async tasks
363   * triggers another async task. For example:
364   *
365   * <pre> {@code
366   * class HeaderBuilder extends CountedCompleter<...> { ... }
367   * class BodyBuilder extends CountedCompleter<...> { ... }
368   * class PacketSender extends CountedCompleter<...> {
369 < *     PacketSender(...) { super(null, 1); ... } // trigger on second completion
370 < *     public void compute() { } // never called
371 < *     public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
369 > *   PacketSender(...) { super(null, 1); ... } // trigger on second completion
370 > *   public void compute() { } // never called
371 > *   public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
372   * }
373   * // sample use:
374   * PacketSender p = new PacketSender();
# Line 334 | Line 391 | public abstract class CountedCompleter<T
391       * Creates a new CountedCompleter with the given completer
392       * and initial pending count.
393       *
394 <     * @param completer this tasks completer, or {@code null} if none
394 >     * @param completer this task's completer, or {@code null} if none
395       * @param initialPendingCount the initial pending count
396       */
397      protected CountedCompleter(CountedCompleter<?> completer,
# Line 347 | Line 404 | public abstract class CountedCompleter<T
404       * Creates a new CountedCompleter with the given completer
405       * and an initial pending count of zero.
406       *
407 <     * @param completer this tasks completer, or {@code null} if none
407 >     * @param completer this task's completer, or {@code null} if none
408       */
409      protected CountedCompleter(CountedCompleter<?> completer) {
410          this.completer = completer;
# Line 368 | Line 425 | public abstract class CountedCompleter<T
425  
426      /**
427       * Performs an action when method {@link #tryComplete} is invoked
428 <     * and there are no pending counts, or when the unconditional
428 >     * and the pending count is zero, or when the unconditional
429       * method {@link #complete} is invoked.  By default, this method
430       * does nothing. You can distinguish cases by checking the
431       * identity of the given caller argument. If not equal to {@code
# Line 376 | Line 433 | public abstract class CountedCompleter<T
433       * (and/or links to other results) to combine.
434       *
435       * @param caller the task invoking this method (which may
436 <     * be this task itself).
436 >     * be this task itself)
437       */
438      public void onCompletion(CountedCompleter<?> caller) {
439      }
440  
441      /**
442 <     * Performs an action when method {@link #completeExceptionally}
443 <     * is invoked or method {@link #compute} throws an exception, and
444 <     * this task has not otherwise already completed normally. On
445 <     * entry to this method, this task {@link
446 <     * ForkJoinTask#isCompletedAbnormally}.  The return value of this
447 <     * method controls further propagation: If {@code true} and this
448 <     * task has a completer, then this completer is also completed
449 <     * exceptionally.  The default implementation of this method does
450 <     * nothing except return {@code true}.
442 >     * Performs an action when method {@link
443 >     * #completeExceptionally(Throwable)} is invoked or method {@link
444 >     * #compute} throws an exception, and this task has not already
445 >     * otherwise completed normally. On entry to this method, this task
446 >     * {@link ForkJoinTask#isCompletedAbnormally}.  The return value
447 >     * of this method controls further propagation: If {@code true}
448 >     * and this task has a completer that has not completed, then that
449 >     * completer is also completed exceptionally, with the same
450 >     * exception as this completer.  The default implementation of
451 >     * this method does nothing except return {@code true}.
452       *
453       * @param ex the exception
454       * @param caller the task invoking this method (which may
455 <     * be this task itself).
456 <     * @return true if this exception should be propagated to this
457 <     * tasks completer, if one exists.
455 >     * be this task itself)
456 >     * @return {@code true} if this exception should be propagated to this
457 >     * task's completer, if one exists
458       */
459      public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
460          return true;
# Line 436 | Line 494 | public abstract class CountedCompleter<T
494       * @param delta the value to add
495       */
496      public final void addToPendingCount(int delta) {
497 <        int c; // note: can replace with intrinsic in jdk8
497 >        int c;
498          do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
499      }
500  
# Line 446 | Line 504 | public abstract class CountedCompleter<T
504       *
505       * @param expected the expected value
506       * @param count the new value
507 <     * @return true is successful
507 >     * @return {@code true} if successful
508       */
509      public final boolean compareAndSetPendingCount(int expected, int count) {
510          return U.compareAndSwapInt(this, PENDING, expected, count);
511      }
512  
513      /**
514 +     * If the pending count is nonzero, (atomically) decrements it.
515 +     *
516 +     * @return the initial (undecremented) pending count holding on entry
517 +     * to this method
518 +     */
519 +    public final int decrementPendingCountUnlessZero() {
520 +        int c;
521 +        do {} while ((c = pending) != 0 &&
522 +                     !U.compareAndSwapInt(this, PENDING, c, c - 1));
523 +        return c;
524 +    }
525 +
526 +    /**
527       * Returns the root of the current computation; i.e., this
528       * task if it has no completer, else its completer's root.
529       *
# Line 467 | Line 538 | public abstract class CountedCompleter<T
538  
539      /**
540       * If the pending count is nonzero, decrements the count;
541 <     * otherwise invokes {@link #onCompletion} and then similarly
542 <     * tries to complete this task's completer, if one exists,
543 <     * else marks this task as complete.
541 >     * otherwise invokes {@link #onCompletion(CountedCompleter)}
542 >     * and then similarly tries to complete this task's completer,
543 >     * if one exists, else marks this task as complete.
544       */
545      public final void tryComplete() {
546          CountedCompleter<?> a = this, s = a;
# Line 487 | Line 558 | public abstract class CountedCompleter<T
558      }
559  
560      /**
561 <     * Regardless of pending count, invokes {@link #onCompletion},
562 <     * marks this task as complete and further triggers {@link
563 <     * #tryComplete} on this task's completer, if one exists.  The
564 <     * given rawResult is used as an argument to {@link #setRawResult}
565 <     * before invoking {@link #onCompletion} or marking this task as
566 <     * complete; its value is meaningful only for classes overriding
567 <     * {@code setRawResult}.
561 >     * Equivalent to {@link #tryComplete} but does not invoke {@link
562 >     * #onCompletion(CountedCompleter)} along the completion path:
563 >     * If the pending count is nonzero, decrements the count;
564 >     * otherwise, similarly tries to complete this task's completer, if
565 >     * one exists, else marks this task as complete. This method may be
566 >     * useful in cases where {@code onCompletion} should not, or need
567 >     * not, be invoked for each completer in a computation.
568 >     */
569 >    public final void propagateCompletion() {
570 >        CountedCompleter<?> a = this, s = a;
571 >        for (int c;;) {
572 >            if ((c = a.pending) == 0) {
573 >                if ((a = (s = a).completer) == null) {
574 >                    s.quietlyComplete();
575 >                    return;
576 >                }
577 >            }
578 >            else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
579 >                return;
580 >        }
581 >    }
582 >
583 >    /**
584 >     * Regardless of pending count, invokes
585 >     * {@link #onCompletion(CountedCompleter)}, marks this task as
586 >     * complete and further triggers {@link #tryComplete} on this
587 >     * task's completer, if one exists.  The given rawResult is
588 >     * used as an argument to {@link #setRawResult} before invoking
589 >     * {@link #onCompletion(CountedCompleter)} or marking this task
590 >     * as complete; its value is meaningful only for classes
591 >     * overriding {@code setRawResult}.  This method does not modify
592 >     * the pending count.
593       *
594       * <p>This method may be useful when forcing completion as soon as
595       * any one (versus all) of several subtask results are obtained.
596       * However, in the common (and recommended) case in which {@code
597       * setRawResult} is not overridden, this effect can be obtained
598 <     * more simply using {@code getRoot().quietlyComplete();}.
598 >     * more simply using {@code quietlyCompleteRoot();}.
599       *
600       * @param rawResult the raw result
601       */
# Line 512 | Line 608 | public abstract class CountedCompleter<T
608              p.tryComplete();
609      }
610  
611 +
612      /**
613 <     * Support for FJT exception propagation
613 >     * If this task's pending count is zero, returns this task;
614 >     * otherwise decrements its pending count and returns {@code
615 >     * null}. This method is designed to be used with {@link
616 >     * #nextComplete} in completion traversal loops.
617 >     *
618 >     * @return this task, if pending count was zero, else {@code null}
619 >     */
620 >    public final CountedCompleter<?> firstComplete() {
621 >        for (int c;;) {
622 >            if ((c = pending) == 0)
623 >                return this;
624 >            else if (U.compareAndSwapInt(this, PENDING, c, c - 1))
625 >                return null;
626 >        }
627 >    }
628 >
629 >    /**
630 >     * If this task does not have a completer, invokes {@link
631 >     * ForkJoinTask#quietlyComplete} and returns {@code null}.  Or, if
632 >     * the completer's pending count is non-zero, decrements that
633 >     * pending count and returns {@code null}.  Otherwise, returns the
634 >     * completer.  This method can be used as part of a completion
635 >     * traversal loop for homogeneous task hierarchies:
636 >     *
637 >     * <pre> {@code
638 >     * for (CountedCompleter<?> c = firstComplete();
639 >     *      c != null;
640 >     *      c = c.nextComplete()) {
641 >     *   // ... process c ...
642 >     * }}</pre>
643 >     *
644 >     * @return the completer, or {@code null} if none
645 >     */
646 >    public final CountedCompleter<?> nextComplete() {
647 >        CountedCompleter<?> p;
648 >        if ((p = completer) != null)
649 >            return p.firstComplete();
650 >        else {
651 >            quietlyComplete();
652 >            return null;
653 >        }
654 >    }
655 >
656 >    /**
657 >     * Equivalent to {@code getRoot().quietlyComplete()}.
658 >     */
659 >    public final void quietlyCompleteRoot() {
660 >        for (CountedCompleter<?> a = this, p;;) {
661 >            if ((p = a.completer) == null) {
662 >                a.quietlyComplete();
663 >                return;
664 >            }
665 >            a = p;
666 >        }
667 >    }
668 >
669 >    /**
670 >     * Supports ForkJoinTask exception propagation.
671       */
672      void internalPropagateException(Throwable ex) {
673          CountedCompleter<?> a = this, s = a;
674          while (a.onExceptionalCompletion(ex, s) &&
675 <               (a = (s = a).completer) != null && a.status >= 0)
676 <            a.recordExceptionalCompletion(ex);
675 >               (a = (s = a).completer) != null && a.status >= 0 &&
676 >               a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
677 >            ;
678      }
679  
680      /**
681 <     * Implements execution conventions for CountedCompleters
681 >     * Implements execution conventions for CountedCompleters.
682       */
683      protected final boolean exec() {
684          compute();
# Line 533 | Line 688 | public abstract class CountedCompleter<T
688      /**
689       * Returns the result of the computation. By default
690       * returns {@code null}, which is appropriate for {@code Void}
691 <     * actions, but in other cases should be overridden.
691 >     * actions, but in other cases should be overridden, almost
692 >     * always to return a field or function of a field that
693 >     * holds the result upon completion.
694       *
695       * @return the result of the computation
696       */
# Line 542 | Line 699 | public abstract class CountedCompleter<T
699      /**
700       * A method that result-bearing CountedCompleters may optionally
701       * use to help maintain result data.  By default, does nothing.
702 <     * If this method is overridden to update existing objects or
703 <     * fields, then it must in general be defined to be thread-safe.
702 >     * Overrides are not recommended. However, if this method is
703 >     * overridden to update existing objects or fields, then it must
704 >     * in general be defined to be thread-safe.
705       */
706      protected void setRawResult(T t) { }
707  
# Line 552 | Line 710 | public abstract class CountedCompleter<T
710      private static final long PENDING;
711      static {
712          try {
713 <            U = sun.misc.Unsafe.getUnsafe();
713 >            U = getUnsafe();
714              PENDING = U.objectFieldOffset
715                  (CountedCompleter.class.getDeclaredField("pending"));
716          } catch (Exception e) {
# Line 570 | Line 728 | public abstract class CountedCompleter<T
728      private static sun.misc.Unsafe getUnsafe() {
729          try {
730              return sun.misc.Unsafe.getUnsafe();
731 <        } catch (SecurityException se) {
732 <            try {
733 <                return java.security.AccessController.doPrivileged
734 <                    (new java.security
735 <                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
736 <                        public sun.misc.Unsafe run() throws Exception {
737 <                            java.lang.reflect.Field f = sun.misc
738 <                                .Unsafe.class.getDeclaredField("theUnsafe");
739 <                            f.setAccessible(true);
740 <                            return (sun.misc.Unsafe) f.get(null);
741 <                        }});
742 <            } catch (java.security.PrivilegedActionException e) {
743 <                throw new RuntimeException("Could not initialize intrinsics",
744 <                                           e.getCause());
745 <            }
731 >        } catch (SecurityException tryReflectionInstead) {}
732 >        try {
733 >            return java.security.AccessController.doPrivileged
734 >            (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
735 >                public sun.misc.Unsafe run() throws Exception {
736 >                    Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
737 >                    for (java.lang.reflect.Field f : k.getDeclaredFields()) {
738 >                        f.setAccessible(true);
739 >                        Object x = f.get(null);
740 >                        if (k.isInstance(x))
741 >                            return k.cast(x);
742 >                    }
743 >                    throw new NoSuchFieldError("the Unsafe");
744 >                }});
745 >        } catch (java.security.PrivilegedActionException e) {
746 >            throw new RuntimeException("Could not initialize intrinsics",
747 >                                       e.getCause());
748          }
749      }
750   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines