ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CountedCompleter.java
(Generate patch)

Comparing jsr166/src/jsr166e/CountedCompleter.java (file contents):
Revision 1.1 by dl, Mon Aug 13 15:52:33 2012 UTC vs.
Revision 1.31 by jsr166, Sat Jul 27 19:53:27 2013 UTC

# Line 7 | Line 7
7   package jsr166e;
8  
9   /**
10 < * A resultless {@link ForkJoinTask} with a completion action
11 < * performed when triggered and there are no remaining pending
12 < * actions. Uses of CountedCompleter are similar to those of other
13 < * completion based components (such as {@link
14 < * java.nio.channels.CompletionHandler}) except that multiple
15 < * <em>pending</em> completions may be necessary to trigger the {@link
16 < * #onCompletion} action, not just one. Unless initialized otherwise,
17 < * the {@link #getPendingCount pending count} starts at zero, but may
18 < * be (atomically) changed using methods {@link #setPendingCount},
19 < * {@link #addToPendingCount}, and {@link
20 < * #compareAndSetPendingCount}. Upon invocation of {@link
10 > * A {@link ForkJoinTask} with a completion action performed when
11 > * triggered and there are no remaining pending actions.
12 > * CountedCompleters are in general more robust in the
13 > * presence of subtask stalls and blockage than are other forms of
14 > * ForkJoinTasks, but are less intuitive to program.  Uses of
15 > * CountedCompleter are similar to those of other completion based
16 > * components (such as {@link java.nio.channels.CompletionHandler})
17 > * except that multiple <em>pending</em> completions may be necessary
18 > * to trigger the completion action {@link #onCompletion(CountedCompleter)},
19 > * not just one.
20 > * Unless initialized otherwise, the {@linkplain #getPendingCount pending
21 > * count} starts at zero, but may be (atomically) changed using
22 > * methods {@link #setPendingCount}, {@link #addToPendingCount}, and
23 > * {@link #compareAndSetPendingCount}. Upon invocation of {@link
24   * #tryComplete}, if the pending action count is nonzero, it is
25   * decremented; otherwise, the completion action is performed, and if
26   * this completer itself has a completer, the process is continued
27   * with its completer.  As is the case with related synchronization
28 < * components such as {@link Phaser} and {@link
29 < * java.util.concurrent.Semaphore} these methods affect only internal
30 < * counts; they do not establish any further internal bookkeeping. In
31 < * particular, the identities of pending tasks are not maintained. As
32 < * illustrated below, you can create subclasses that do record some or
33 < * all pended tasks or their results when needed.
28 > * components such as {@link java.util.concurrent.Phaser Phaser} and
29 > * {@link java.util.concurrent.Semaphore Semaphore}, these methods
30 > * affect only internal counts; they do not establish any further
31 > * internal bookkeeping. In particular, the identities of pending
32 > * tasks are not maintained. As illustrated below, you can create
33 > * subclasses that do record some or all pending tasks or their
34 > * results when needed.  As illustrated below, utility methods
35 > * supporting customization of completion traversals are also
36 > * provided. However, because CountedCompleters provide only basic
37 > * synchronization mechanisms, it may be useful to create further
38 > * abstract subclasses that maintain linkages, fields, and additional
39 > * support methods appropriate for a set of related usages.
40   *
41   * <p>A concrete CountedCompleter class must define method {@link
42 < * #compute}, that should, in almost all use cases, invoke {@code
43 < * tryComplete()} once before returning. The class may also optionally
44 < * override method {@link #onCompletion} to perform an action upon
45 < * normal completion, and method {@link #onExceptionalCompletion} to
42 > * #compute}, that should in most cases (as illustrated below), invoke
43 > * {@code tryComplete()} once before returning. The class may also
44 > * optionally override method {@link #onCompletion(CountedCompleter)}
45 > * to perform an action upon normal completion, and method
46 > * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to
47   * perform an action upon any exception.
48   *
49 + * <p>CountedCompleters most often do not bear results, in which case
50 + * they are normally declared as {@code CountedCompleter<Void>}, and
51 + * will always return {@code null} as a result value.  In other cases,
52 + * you should override method {@link #getRawResult} to provide a
53 + * result from {@code join(), invoke()}, and related methods.  In
54 + * general, this method should return the value of a field (or a
55 + * function of one or more fields) of the CountedCompleter object that
56 + * holds the result upon completion. Method {@link #setRawResult} by
57 + * default plays no role in CountedCompleters.  It is possible, but
58 + * rarely applicable, to override this method to maintain other
59 + * objects or fields holding result data.
60 + *
61   * <p>A CountedCompleter that does not itself have a completer (i.e.,
62   * one for which {@link #getCompleter} returns {@code null}) can be
63   * used as a regular ForkJoinTask with this added functionality.
# Line 43 | Line 65 | package jsr166e;
65   * only as an internal helper for other computations, so its own task
66   * status (as reported in methods such as {@link ForkJoinTask#isDone})
67   * is arbitrary; this status changes only upon explicit invocations of
68 < * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
69 < * ForkJoinTask#completeExceptionally} or upon exceptional completion
70 < * of method {@code compute}. Upon any exceptional completion, the
71 < * exception may be relayed to a task's completer (and its completer,
72 < * and so on), if one exists and it has not otherwise already
73 < * completed.
68 > * {@link #complete}, {@link ForkJoinTask#cancel},
69 > * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon
70 > * exceptional completion of method {@code compute}. Upon any
71 > * exceptional completion, the exception may be relayed to a task's
72 > * completer (and its completer, and so on), if one exists and it has
73 > * not otherwise already completed. Similarly, cancelling an internal
74 > * CountedCompleter has only a local effect on that completer, so is
75 > * not often useful.
76   *
77   * <p><b>Sample Usages.</b>
78   *
79   * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
80   * be arranged in trees similar to those often used with {@link
81   * RecursiveAction}s, although the constructions involved in setting
82 < * them up typically vary. Even though they entail a bit more
82 > * them up typically vary. Here, the completer of each task is its
83 > * parent in the computation tree. Even though they entail a bit more
84   * bookkeeping, CountedCompleters may be better choices when applying
85   * a possibly time-consuming operation (that cannot be further
86   * subdivided) to each element of an array or collection; especially
87   * when the operation takes a significantly different amount of time
88   * to complete for some elements than others, either because of
89 < * intrinsic variation (for example IO) or auxiliary effects such as
89 > * intrinsic variation (for example I/O) or auxiliary effects such as
90   * garbage collection.  Because CountedCompleters provide their own
91   * continuations, other threads need not block waiting to perform
92   * them.
93   *
94 < * <p> For example, here is an initial version of a class that uses
94 > * <p>For example, here is an initial version of a class that uses
95   * divide-by-two recursive decomposition to divide work into single
96   * pieces (leaf tasks). Even when work is split into individual calls,
97   * tree-based techniques are usually preferable to directly forking
# Line 74 | Line 99 | package jsr166e;
99   * improve load balancing. In the recursive case, the second of each
100   * pair of subtasks to finish triggers completion of its parent
101   * (because no result combination is performed, the default no-op
102 < * implementation of method {@code onCompletion} is not overridden). A
103 < * static utility method sets up the base task and invokes it:
102 > * implementation of method {@code onCompletion} is not overridden).
103 > * A static utility method sets up the base task and invokes it
104 > * (here, implicitly using the {@link ForkJoinPool#commonPool()}).
105   *
106   * <pre> {@code
107   * class MyOperation<E> { void apply(E e) { ... }  }
108   *
109 < * class ForEach<E> extends CountedCompleter {
84 < *
85 < *     public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
86 < *         pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
87 < *     }
88 < *
89 < *     final E[] array; final MyOperation<E> op; final int lo, hi;
90 < *     ForEach(CountedCompleter p, E[] array, MyOperation<E> op, int lo, int hi) {
91 < *         super(p);
92 < *         this.array = array; this.op = op; this.lo = lo; this.hi = hi;
93 < *     }
109 > * class ForEach<E> extends CountedCompleter<Void> {
110   *
111 < *     public void compute() { // version 1
112 < *         if (hi - lo >= 2) {
113 < *             int mid = (lo + hi) >>> 1;
114 < *             setPendingCount(2); // must set pending count before fork
115 < *             new ForEach(this, array, op, mid, hi).fork(); // right child
116 < *             new ForEach(this, array, op, lo, mid).fork(); // left child
117 < *         }
118 < *         else if (hi > lo)
119 < *             op.apply(array[lo]);
120 < *         tryComplete();
111 > *   public static <E> void forEach(E[] array, MyOperation<E> op) {
112 > *     new ForEach<E>(null, array, op, 0, array.length).invoke();
113 > *   }
114 > *
115 > *   final E[] array; final MyOperation<E> op; final int lo, hi;
116 > *   ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
117 > *     super(p);
118 > *     this.array = array; this.op = op; this.lo = lo; this.hi = hi;
119 > *   }
120 > *
121 > *   public void compute() { // version 1
122 > *     if (hi - lo >= 2) {
123 > *       int mid = (lo + hi) >>> 1;
124 > *       setPendingCount(2); // must set pending count before fork
125 > *       new ForEach(this, array, op, mid, hi).fork(); // right child
126 > *       new ForEach(this, array, op, lo, mid).fork(); // left child
127   *     }
128 < * } }</pre>
128 > *     else if (hi > lo)
129 > *       op.apply(array[lo]);
130 > *     tryComplete();
131 > *   }
132 > * }}</pre>
133   *
134   * This design can be improved by noticing that in the recursive case,
135   * the task has nothing to do after forking its right task, so can
136   * directly invoke its left task before returning. (This is an analog
137   * of tail recursion removal.)  Also, because the task returns upon
138   * executing its left task (rather than falling through to invoke
139 < * tryComplete) the pending count is set to one:
139 > * {@code tryComplete}) the pending count is set to one:
140   *
141   * <pre> {@code
142   * class ForEach<E> ...
143 < *     public void compute() { // version 2
144 < *         if (hi - lo >= 2) {
145 < *             int mid = (lo + hi) >>> 1;
146 < *             setPendingCount(1); // only one pending
147 < *             new ForEach(this, array, op, mid, hi).fork(); // right child
148 < *             new ForEach(this, array, op, lo, mid).compute(); // direct invoke
123 < *         }
124 < *         else {
125 < *             if (hi > lo)
126 < *                 op.apply(array[lo]);
127 < *             tryComplete();
128 < *         }
143 > *   public void compute() { // version 2
144 > *     if (hi - lo >= 2) {
145 > *       int mid = (lo + hi) >>> 1;
146 > *       setPendingCount(1); // only one pending
147 > *       new ForEach(this, array, op, mid, hi).fork(); // right child
148 > *       new ForEach(this, array, op, lo, mid).compute(); // direct invoke
149   *     }
150 + *     else {
151 + *       if (hi > lo)
152 + *         op.apply(array[lo]);
153 + *       tryComplete();
154 + *     }
155 + *   }
156   * }</pre>
157   *
158 < * As a further improvement, notice that the left task need not even
159 < * exist.  Instead of creating a new one, we can iterate using the
160 < * original task, and add a pending count for each fork:
158 > * As a further improvement, notice that the left task need not even exist.
159 > * Instead of creating a new one, we can iterate using the original task,
160 > * and add a pending count for each fork.  Additionally, because no task
161 > * in this tree implements an {@link #onCompletion(CountedCompleter)} method,
162 > * {@code tryComplete()} can be replaced with {@link #propagateCompletion}.
163   *
164   * <pre> {@code
165   * class ForEach<E> ...
166 < *     public void compute() { // version 3
167 < *         int l = lo,  h = hi;
168 < *         while (h - l >= 2) {
169 < *             int mid = (l + h) >>> 1;
170 < *             addToPendingCount(1);
171 < *             new ForEach(this, array, op, mid, h).fork(); // right child
172 < *             h = mid;
145 < *         }
146 < *         if (h > l)
147 < *             op.apply(array[l]);
148 < *         tryComplete();
166 > *   public void compute() { // version 3
167 > *     int l = lo,  h = hi;
168 > *     while (h - l >= 2) {
169 > *       int mid = (l + h) >>> 1;
170 > *       addToPendingCount(1);
171 > *       new ForEach(this, array, op, mid, h).fork(); // right child
172 > *       h = mid;
173   *     }
174 + *     if (h > l)
175 + *       op.apply(array[l]);
176 + *     propagateCompletion();
177 + *   }
178   * }</pre>
179   *
180   * Additional improvements of such classes might entail precomputing
# Line 155 | Line 183 | package jsr166e;
183   * instead of two per iteration, and using an adaptive threshold
184   * instead of always subdividing down to single elements.
185   *
186 + * <p><b>Searching.</b> A tree of CountedCompleters can search for a
187 + * value or property in different parts of a data structure, and
188 + * report a result in an {@link
189 + * java.util.concurrent.atomic.AtomicReference AtomicReference} as
190 + * soon as one is found. The others can poll the result to avoid
191 + * unnecessary work. (You could additionally {@linkplain #cancel
192 + * cancel} other tasks, but it is usually simpler and more efficient
193 + * to just let them notice that the result is set and if so skip
194 + * further processing.)  Illustrating again with an array using full
195 + * partitioning (again, in practice, leaf tasks will almost always
196 + * process more than one element):
197 + *
198 + * <pre> {@code
199 + * class Searcher<E> extends CountedCompleter<E> {
200 + *   final E[] array; final AtomicReference<E> result; final int lo, hi;
201 + *   Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
202 + *     super(p);
203 + *     this.array = array; this.result = result; this.lo = lo; this.hi = hi;
204 + *   }
205 + *   public E getRawResult() { return result.get(); }
206 + *   public void compute() { // similar to ForEach version 3
207 + *     int l = lo,  h = hi;
208 + *     while (result.get() == null && h >= l) {
209 + *       if (h - l >= 2) {
210 + *         int mid = (l + h) >>> 1;
211 + *         addToPendingCount(1);
212 + *         new Searcher(this, array, result, mid, h).fork();
213 + *         h = mid;
214 + *       }
215 + *       else {
216 + *         E x = array[l];
217 + *         if (matches(x) && result.compareAndSet(null, x))
218 + *           quietlyCompleteRoot(); // root task is now joinable
219 + *         break;
220 + *       }
221 + *     }
222 + *     tryComplete(); // normally complete whether or not found
223 + *   }
224 + *   boolean matches(E e) { ... } // return true if found
225 + *
226 + *   public static <E> E search(E[] array) {
227 + *       return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
228 + *   }
229 + * }}</pre>
230 + *
231 + * In this example, as well as others in which tasks have no other
232 + * effects except to compareAndSet a common result, the trailing
233 + * unconditional invocation of {@code tryComplete} could be made
234 + * conditional ({@code if (result.get() == null) tryComplete();})
235 + * because no further bookkeeping is required to manage completions
236 + * once the root task completes.
237 + *
238   * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
239   * results of multiple subtasks usually need to access these results
240 < * in method {@link #onCompletion}. As illustrated in the following
240 > * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following
241   * class (that performs a simplified form of map-reduce where mappings
242   * and reductions are all of type {@code E}), one way to do this in
243   * divide and conquer designs is to have each subtask record its
244   * sibling, so that it can be accessed in method {@code onCompletion}.
245 < * For clarity, this class uses explicit left and right subtasks, but
246 < * variants of other streamlinings seen in the above example may also
247 < * apply.
245 > * This technique applies to reductions in which the order of
246 > * combining left and right results does not matter; ordered
247 > * reductions require explicit left/right designations.  Variants of
248 > * other streamlinings seen in the above examples may also apply.
249   *
250   * <pre> {@code
251   * class MyMapper<E> { E apply(E v) {  ...  } }
252   * class MyReducer<E> { E apply(E x, E y) {  ...  } }
253 < * class MapReducer<E> extends CountedCompleter {
254 < *     final E[] array; final MyMapper<E> mapper;
255 < *     final MyReducer<E> reducer; final int lo, hi;
256 < *     MapReducer sibling;
257 < *     E result;
258 < *     MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
259 < *                MyReducer<E> reducer, int lo, int hi) {
260 < *         super(p);
261 < *         this.array = array; this.mapper = mapper;
262 < *         this.reducer = reducer; this.lo = lo; this.hi = hi;
253 > * class MapReducer<E> extends CountedCompleter<E> {
254 > *   final E[] array; final MyMapper<E> mapper;
255 > *   final MyReducer<E> reducer; final int lo, hi;
256 > *   MapReducer<E> sibling;
257 > *   E result;
258 > *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
259 > *              MyReducer<E> reducer, int lo, int hi) {
260 > *     super(p);
261 > *     this.array = array; this.mapper = mapper;
262 > *     this.reducer = reducer; this.lo = lo; this.hi = hi;
263 > *   }
264 > *   public void compute() {
265 > *     if (hi - lo >= 2) {
266 > *       int mid = (lo + hi) >>> 1;
267 > *       MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
268 > *       MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
269 > *       left.sibling = right;
270 > *       right.sibling = left;
271 > *       setPendingCount(1); // only right is pending
272 > *       right.fork();
273 > *       left.compute();     // directly execute left
274   *     }
275 < *     public void compute() {
276 < *         if (hi - lo >= 2) {
277 < *             int mid = (lo + hi) >>> 1;
278 < *             MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
187 < *             MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
188 < *             left.sibling = right;
189 < *             right.sibling = left;
190 < *             setPendingCount(1); // only right is pending
191 < *             right.fork();
192 < *             left.compute();     // directly execute left
193 < *         }
194 < *         else {
195 < *             if (hi > lo)
196 < *                 result = mapper.apply(array[lo]);
197 < *             tryComplete();
198 < *         }
275 > *     else {
276 > *       if (hi > lo)
277 > *           result = mapper.apply(array[lo]);
278 > *       tryComplete();
279   *     }
280 < *     public void onCompletion(CountedCompleter caller) {
281 < *         if (caller != this) {
282 < *            MapReducer<E> child = (MapReducer<E>)caller;
283 < *            MapReducer<E> sib = child.sibling;
284 < *            if (sib == null || sib.result == null)
285 < *                result = child.result;
286 < *            else
287 < *                result = reducer.apply(child.result, sib.result);
288 < *         }
280 > *   }
281 > *   public void onCompletion(CountedCompleter<?> caller) {
282 > *     if (caller != this) {
283 > *       MapReducer<E> child = (MapReducer<E>)caller;
284 > *       MapReducer<E> sib = child.sibling;
285 > *       if (sib == null || sib.result == null)
286 > *         result = child.result;
287 > *       else
288 > *         result = reducer.apply(child.result, sib.result);
289   *     }
290 + *   }
291 + *   public E getRawResult() { return result; }
292 + *
293 + *   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
294 + *     return new MapReducer<E>(null, array, mapper, reducer,
295 + *                              0, array.length).invoke();
296 + *   }
297 + * }}</pre>
298 + *
299 + * Here, method {@code onCompletion} takes a form common to many
300 + * completion designs that combine results. This callback-style method
301 + * is triggered once per task, in either of the two different contexts
302 + * in which the pending count is, or becomes, zero: (1) by a task
303 + * itself, if its pending count is zero upon invocation of {@code
304 + * tryComplete}, or (2) by any of its subtasks when they complete and
305 + * decrement the pending count to zero. The {@code caller} argument
306 + * distinguishes cases.  Most often, when the caller is {@code this},
307 + * no action is necessary. Otherwise the caller argument can be used
308 + * (usually via a cast) to supply a value (and/or links to other
309 + * values) to be combined.  Assuming proper use of pending counts, the
310 + * actions inside {@code onCompletion} occur (once) upon completion of
311 + * a task and its subtasks. No additional synchronization is required
312 + * within this method to ensure thread safety of accesses to fields of
313 + * this task or other completed tasks.
314 + *
315 + * <p><b>Completion Traversals</b>. If using {@code onCompletion} to
316 + * process completions is inapplicable or inconvenient, you can use
317 + * methods {@link #firstComplete} and {@link #nextComplete} to create
318 + * custom traversals.  For example, to define a MapReducer that only
319 + * splits out right-hand tasks in the form of the third ForEach
320 + * example, the completions must cooperatively reduce along
321 + * unexhausted subtask links, which can be done as follows:
322   *
323 < *     public static <E> E mapReduce(ForkJoinPool pool, E[] array,
324 < *                                   MyMapper<E> mapper, MyReducer<E> reducer) {
325 < *         MapReducer<E> mr = new MapReducer<E>(null, array, mapper,
326 < *                                              reducer, 0, array.length);
327 < *         pool.invoke(mr);
328 < *         return mr.result;
323 > * <pre> {@code
324 > * class MapReducer<E> extends CountedCompleter<E> { // version 2
325 > *   final E[] array; final MyMapper<E> mapper;
326 > *   final MyReducer<E> reducer; final int lo, hi;
327 > *   MapReducer<E> forks, next; // record subtask forks in list
328 > *   E result;
329 > *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
330 > *              MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
331 > *     super(p);
332 > *     this.array = array; this.mapper = mapper;
333 > *     this.reducer = reducer; this.lo = lo; this.hi = hi;
334 > *     this.next = next;
335 > *   }
336 > *   public void compute() {
337 > *     int l = lo,  h = hi;
338 > *     while (h - l >= 2) {
339 > *       int mid = (l + h) >>> 1;
340 > *       addToPendingCount(1);
341 > *       (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
342 > *       h = mid;
343 > *     }
344 > *     if (h > l)
345 > *       result = mapper.apply(array[l]);
346 > *     // process completions by reducing along and advancing subtask links
347 > *     for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) {
348 > *       for (MapReducer t = (MapReducer)c, s = t.forks;  s != null; s = t.forks = s.next)
349 > *         t.result = reducer.apply(t.result, s.result);
350   *     }
351 < * } }</pre>
351 > *   }
352 > *   public E getRawResult() { return result; }
353 > *
354 > *   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
355 > *     return new MapReducer<E>(null, array, mapper, reducer,
356 > *                              0, array.length, null).invoke();
357 > *   }
358 > * }}</pre>
359   *
360   * <p><b>Triggers.</b> Some CountedCompleters are themselves never
361   * forked, but instead serve as bits of plumbing in other designs;
362 < * including those in which the completion of one of more async tasks
362 > * including those in which the completion of one or more async tasks
363   * triggers another async task. For example:
364   *
365   * <pre> {@code
366 < * class HeaderBuilder extends CountedCompleter { ... }
367 < * class BodyBuilder extends CountedCompleter { ... }
368 < * class PacketSender extends CountedCompleter {
369 < *     PacketSender(...) { super(null, 1); ... } // trigger on second completion
370 < *     public void compute() { } // never called
371 < *     public void onCompletion(CountedCompleter caller) { sendPacket(); }
366 > * class HeaderBuilder extends CountedCompleter<...> { ... }
367 > * class BodyBuilder extends CountedCompleter<...> { ... }
368 > * class PacketSender extends CountedCompleter<...> {
369 > *   PacketSender(...) { super(null, 1); ... } // trigger on second completion
370 > *   public void compute() { } // never called
371 > *   public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
372   * }
373   * // sample use:
374   * PacketSender p = new PacketSender();
# Line 239 | Line 379 | package jsr166e;
379   * @since 1.8
380   * @author Doug Lea
381   */
382 < public abstract class CountedCompleter extends ForkJoinTask<Void> {
382 > public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
383      private static final long serialVersionUID = 5232453752276485070L;
384  
385      /** This task's completer, or null if none */
386 <    final CountedCompleter completer;
386 >    final CountedCompleter<?> completer;
387      /** The number of pending tasks until completion */
388      volatile int pending;
389  
# Line 251 | Line 391 | public abstract class CountedCompleter e
391       * Creates a new CountedCompleter with the given completer
392       * and initial pending count.
393       *
394 <     * @param completer this tasks completer, or {@code null} if none
394 >     * @param completer this task's completer, or {@code null} if none
395       * @param initialPendingCount the initial pending count
396       */
397 <    protected CountedCompleter(CountedCompleter completer,
397 >    protected CountedCompleter(CountedCompleter<?> completer,
398                                 int initialPendingCount) {
399          this.completer = completer;
400          this.pending = initialPendingCount;
# Line 264 | Line 404 | public abstract class CountedCompleter e
404       * Creates a new CountedCompleter with the given completer
405       * and an initial pending count of zero.
406       *
407 <     * @param completer this tasks completer, or {@code null} if none
407 >     * @param completer this task's completer, or {@code null} if none
408       */
409 <    protected CountedCompleter(CountedCompleter completer) {
409 >    protected CountedCompleter(CountedCompleter<?> completer) {
410          this.completer = completer;
411      }
412  
# Line 285 | Line 425 | public abstract class CountedCompleter e
425  
426      /**
427       * Performs an action when method {@link #tryComplete} is invoked
428 <     * and there are no pending counts, or when the unconditional
428 >     * and the pending count is zero, or when the unconditional
429       * method {@link #complete} is invoked.  By default, this method
430 <     * does nothing.
430 >     * does nothing. You can distinguish cases by checking the
431 >     * identity of the given caller argument. If not equal to {@code
432 >     * this}, then it is typically a subtask that may contain results
433 >     * (and/or links to other results) to combine.
434       *
435       * @param caller the task invoking this method (which may
436 <     * be this task itself).
436 >     * be this task itself)
437       */
438 <    public void onCompletion(CountedCompleter caller) {
438 >    public void onCompletion(CountedCompleter<?> caller) {
439      }
440  
441      /**
442 <     * Performs an action when method {@link #completeExceptionally}
443 <     * is invoked or method {@link #compute} throws an exception, and
444 <     * this task has not otherwise already completed normally. On
445 <     * entry to this method, this task {@link
446 <     * ForkJoinTask#isCompletedAbnormally}.  The return value of this
447 <     * method controls further propagation: If {@code true} and this
448 <     * task has a completer, then this completer is also completed
449 <     * exceptionally.  The default implementation of this method does
450 <     * nothing except return {@code true}.
442 >     * Performs an action when method {@link
443 >     * #completeExceptionally(Throwable)} is invoked or method {@link
444 >     * #compute} throws an exception, and this task has not already
445 >     * otherwise completed normally. On entry to this method, this task
446 >     * {@link ForkJoinTask#isCompletedAbnormally}.  The return value
447 >     * of this method controls further propagation: If {@code true}
448 >     * and this task has a completer that has not completed, then that
449 >     * completer is also completed exceptionally, with the same
450 >     * exception as this completer.  The default implementation of
451 >     * this method does nothing except return {@code true}.
452       *
453       * @param ex the exception
454       * @param caller the task invoking this method (which may
455 <     * be this task itself).
456 <     * @return true if this exception should be propagated to this
457 <     * tasks completer, if one exists.
455 >     * be this task itself)
456 >     * @return {@code true} if this exception should be propagated to this
457 >     * task's completer, if one exists
458       */
459 <    public boolean onExceptionalCompletion(Throwable ex, CountedCompleter caller) {
459 >    public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
460          return true;
461      }
462  
# Line 322 | Line 466 | public abstract class CountedCompleter e
466       *
467       * @return the completer
468       */
469 <    public final CountedCompleter getCompleter() {
469 >    public final CountedCompleter<?> getCompleter() {
470          return completer;
471      }
472  
# Line 350 | Line 494 | public abstract class CountedCompleter e
494       * @param delta the value to add
495       */
496      public final void addToPendingCount(int delta) {
497 <        int c; // note: can replace with intrinsic in jdk8
497 >        int c;
498          do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
499      }
500  
# Line 360 | Line 504 | public abstract class CountedCompleter e
504       *
505       * @param expected the expected value
506       * @param count the new value
507 <     * @return true is successful
507 >     * @return {@code true} if successful
508       */
509      public final boolean compareAndSetPendingCount(int expected, int count) {
510          return U.compareAndSwapInt(this, PENDING, expected, count);
511      }
512  
513      /**
514 +     * If the pending count is nonzero, (atomically) decrements it.
515 +     *
516 +     * @return the initial (undecremented) pending count holding on entry
517 +     * to this method
518 +     */
519 +    public final int decrementPendingCountUnlessZero() {
520 +        int c;
521 +        do {} while ((c = pending) != 0 &&
522 +                     !U.compareAndSwapInt(this, PENDING, c, c - 1));
523 +        return c;
524 +    }
525 +
526 +    /**
527 +     * Returns the root of the current computation; i.e., this
528 +     * task if it has no completer, else its completer's root.
529 +     *
530 +     * @return the root of the current computation
531 +     */
532 +    public final CountedCompleter<?> getRoot() {
533 +        CountedCompleter<?> a = this, p;
534 +        while ((p = a.completer) != null)
535 +            a = p;
536 +        return a;
537 +    }
538 +
539 +    /**
540       * If the pending count is nonzero, decrements the count;
541 <     * otherwise invokes {@link #onCompletion} and then similarly
542 <     * tries to complete this task's completer, if one exists,
543 <     * else marks this task as complete.
541 >     * otherwise invokes {@link #onCompletion(CountedCompleter)}
542 >     * and then similarly tries to complete this task's completer,
543 >     * if one exists, else marks this task as complete.
544       */
545      public final void tryComplete() {
546 <        CountedCompleter a = this, s = a;
546 >        CountedCompleter<?> a = this, s = a;
547          for (int c;;) {
548              if ((c = a.pending) == 0) {
549                  a.onCompletion(s);
# Line 388 | Line 558 | public abstract class CountedCompleter e
558      }
559  
560      /**
561 <     * Regardless of pending count, invokes {@link #onCompletion},
562 <     * marks this task as complete with a {@code null} return value,
563 <     * and further triggers {@link #tryComplete} on this task's
564 <     * completer, if one exists. This method may be useful when
565 <     * forcing completion as soon as any one (versus all) of several
566 <     * subtask results are obtained.
567 <     *
398 <     * @param mustBeNull the {@code null} completion value
561 >     * Equivalent to {@link #tryComplete} but does not invoke {@link
562 >     * #onCompletion(CountedCompleter)} along the completion path:
563 >     * If the pending count is nonzero, decrements the count;
564 >     * otherwise, similarly tries to complete this task's completer, if
565 >     * one exists, else marks this task as complete. This method may be
566 >     * useful in cases where {@code onCompletion} should not, or need
567 >     * not, be invoked for each completer in a computation.
568       */
569 <    public void complete(Void mustBeNull) {
570 <        CountedCompleter p;
569 >    public final void propagateCompletion() {
570 >        CountedCompleter<?> a = this, s = a;
571 >        for (int c;;) {
572 >            if ((c = a.pending) == 0) {
573 >                if ((a = (s = a).completer) == null) {
574 >                    s.quietlyComplete();
575 >                    return;
576 >                }
577 >            }
578 >            else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
579 >                return;
580 >        }
581 >    }
582 >
583 >    /**
584 >     * Regardless of pending count, invokes
585 >     * {@link #onCompletion(CountedCompleter)}, marks this task as
586 >     * complete and further triggers {@link #tryComplete} on this
587 >     * task's completer, if one exists.  The given rawResult is
588 >     * used as an argument to {@link #setRawResult} before invoking
589 >     * {@link #onCompletion(CountedCompleter)} or marking this task
590 >     * as complete; its value is meaningful only for classes
591 >     * overriding {@code setRawResult}.  This method does not modify
592 >     * the pending count.
593 >     *
594 >     * <p>This method may be useful when forcing completion as soon as
595 >     * any one (versus all) of several subtask results are obtained.
596 >     * However, in the common (and recommended) case in which {@code
597 >     * setRawResult} is not overridden, this effect can be obtained
598 >     * more simply using {@code quietlyCompleteRoot();}.
599 >     *
600 >     * @param rawResult the raw result
601 >     */
602 >    public void complete(T rawResult) {
603 >        CountedCompleter<?> p;
604 >        setRawResult(rawResult);
605          onCompletion(this);
606          quietlyComplete();
607          if ((p = completer) != null)
608              p.tryComplete();
609      }
610  
611 +
612 +    /**
613 +     * If this task's pending count is zero, returns this task;
614 +     * otherwise decrements its pending count and returns {@code
615 +     * null}. This method is designed to be used with {@link
616 +     * #nextComplete} in completion traversal loops.
617 +     *
618 +     * @return this task, if pending count was zero, else {@code null}
619 +     */
620 +    public final CountedCompleter<?> firstComplete() {
621 +        for (int c;;) {
622 +            if ((c = pending) == 0)
623 +                return this;
624 +            else if (U.compareAndSwapInt(this, PENDING, c, c - 1))
625 +                return null;
626 +        }
627 +    }
628 +
629 +    /**
630 +     * If this task does not have a completer, invokes {@link
631 +     * ForkJoinTask#quietlyComplete} and returns {@code null}.  Or, if
632 +     * the completer's pending count is non-zero, decrements that
633 +     * pending count and returns {@code null}.  Otherwise, returns the
634 +     * completer.  This method can be used as part of a completion
635 +     * traversal loop for homogeneous task hierarchies:
636 +     *
637 +     * <pre> {@code
638 +     * for (CountedCompleter<?> c = firstComplete();
639 +     *      c != null;
640 +     *      c = c.nextComplete()) {
641 +     *   // ... process c ...
642 +     * }}</pre>
643 +     *
644 +     * @return the completer, or {@code null} if none
645 +     */
646 +    public final CountedCompleter<?> nextComplete() {
647 +        CountedCompleter<?> p;
648 +        if ((p = completer) != null)
649 +            return p.firstComplete();
650 +        else {
651 +            quietlyComplete();
652 +            return null;
653 +        }
654 +    }
655 +
656 +    /**
657 +     * Equivalent to {@code getRoot().quietlyComplete()}.
658 +     */
659 +    public final void quietlyCompleteRoot() {
660 +        for (CountedCompleter<?> a = this, p;;) {
661 +            if ((p = a.completer) == null) {
662 +                a.quietlyComplete();
663 +                return;
664 +            }
665 +            a = p;
666 +        }
667 +    }
668 +
669      /**
670 <     * Support for FJT exception propagation
670 >     * Supports ForkJoinTask exception propagation.
671       */
672      void internalPropagateException(Throwable ex) {
673 <        CountedCompleter a = this, s = a;
673 >        CountedCompleter<?> a = this, s = a;
674          while (a.onExceptionalCompletion(ex, s) &&
675 <               (a = (s = a).completer) != null && a.status >= 0)
676 <            a.recordExceptionalCompletion(ex);
675 >               (a = (s = a).completer) != null && a.status >= 0 &&
676 >               a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
677 >            ;
678      }
679  
680      /**
681 <     * Implements execution conventions for CountedCompleters
681 >     * Implements execution conventions for CountedCompleters.
682       */
683      protected final boolean exec() {
684          compute();
# Line 424 | Line 686 | public abstract class CountedCompleter e
686      }
687  
688      /**
689 <     * Always returns {@code null}.
689 >     * Returns the result of the computation.  By default,
690 >     * returns {@code null}, which is appropriate for {@code Void}
691 >     * actions, but in other cases should be overridden, almost
692 >     * always to return a field or function of a field that
693 >     * holds the result upon completion.
694       *
695 <     * @return {@code null} always
695 >     * @return the result of the computation
696       */
697 <    public final Void getRawResult() { return null; }
697 >    public T getRawResult() { return null; }
698  
699      /**
700 <     * Requires null completion value.
700 >     * A method that result-bearing CountedCompleters may optionally
701 >     * use to help maintain result data.  By default, does nothing.
702 >     * Overrides are not recommended. However, if this method is
703 >     * overridden to update existing objects or fields, then it must
704 >     * in general be defined to be thread-safe.
705       */
706 <    protected final void setRawResult(Void mustBeNull) { }
706 >    protected void setRawResult(T t) { }
707  
708      // Unsafe mechanics
709      private static final sun.misc.Unsafe U;
# Line 448 | Line 718 | public abstract class CountedCompleter e
718          }
719      }
720  
451
721      /**
722       * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
723       * Replace with a simple call to Unsafe.getUnsafe when integrating
# Line 459 | Line 728 | public abstract class CountedCompleter e
728      private static sun.misc.Unsafe getUnsafe() {
729          try {
730              return sun.misc.Unsafe.getUnsafe();
731 <        } catch (SecurityException se) {
732 <            try {
733 <                return java.security.AccessController.doPrivileged
734 <                    (new java.security
735 <                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
736 <                        public sun.misc.Unsafe run() throws Exception {
737 <                            java.lang.reflect.Field f = sun.misc
738 <                                .Unsafe.class.getDeclaredField("theUnsafe");
739 <                            f.setAccessible(true);
740 <                            return (sun.misc.Unsafe) f.get(null);
741 <                        }});
742 <            } catch (java.security.PrivilegedActionException e) {
743 <                throw new RuntimeException("Could not initialize intrinsics",
744 <                                           e.getCause());
745 <            }
731 >        } catch (SecurityException tryReflectionInstead) {}
732 >        try {
733 >            return java.security.AccessController.doPrivileged
734 >            (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
735 >                public sun.misc.Unsafe run() throws Exception {
736 >                    Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
737 >                    for (java.lang.reflect.Field f : k.getDeclaredFields()) {
738 >                        f.setAccessible(true);
739 >                        Object x = f.get(null);
740 >                        if (k.isInstance(x))
741 >                            return k.cast(x);
742 >                    }
743 >                    throw new NoSuchFieldError("the Unsafe");
744 >                }});
745 >        } catch (java.security.PrivilegedActionException e) {
746 >            throw new RuntimeException("Could not initialize intrinsics",
747 >                                       e.getCause());
748          }
749      }
479
750   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines