ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/CountedCompleter.java
Revision: 1.3
Committed: Thu Aug 16 12:25:03 2012 UTC (11 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.2: +55 -39 lines
Log Message:
Parameterize CountedCompleters

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166y;
8    
9     /**
10 dl 1.3 * A {@link ForkJoinTask} with a completion action
11 dl 1.1 * performed when triggered and there are no remaining pending
12     * actions. Uses of CountedCompleter are similar to those of other
13     * completion based components (such as {@link
14     * java.nio.channels.CompletionHandler}) except that multiple
15     * <em>pending</em> completions may be necessary to trigger the {@link
16     * #onCompletion} action, not just one. Unless initialized otherwise,
17     * the {@link #getPendingCount pending count} starts at zero, but may
18     * be (atomically) changed using methods {@link #setPendingCount},
19     * {@link #addToPendingCount}, and {@link
20     * #compareAndSetPendingCount}. Upon invocation of {@link
21     * #tryComplete}, if the pending action count is nonzero, it is
22     * decremented; otherwise, the completion action is performed, and if
23     * this completer itself has a completer, the process is continued
24 dl 1.2 * with its completer. As is the case with related synchronization
25     * components such as {@link Phaser} and {@link
26     * java.util.concurrent.Semaphore} these methods affect only internal
27     * counts; they do not establish any further internal bookkeeping. In
28     * particular, the identities of pending tasks are not maintained. As
29     * illustrated below, you can create subclasses that do record some or
30     * all pended tasks or their results when needed.
31 dl 1.1 *
32     * <p>A concrete CountedCompleter class must define method {@link
33     * #compute}, that should, in almost all use cases, invoke {@code
34 dl 1.2 * tryComplete()} once before returning. The class may also optionally
35 dl 1.1 * override method {@link #onCompletion} to perform an action upon
36 dl 1.2 * normal completion, and method {@link #onExceptionalCompletion} to
37     * perform an action upon any exception.
38 dl 1.1 *
39 dl 1.3 * <p>CountedCompleters most often do not bear results, in which case
40     * they are normally declared as {@code CountedCompleter<Void>}, and
41     * will always return {@code null} as a result value. In other cases,
42     * you should override method {@link #getRawResult} to provide a
43     * result from {@code join(), invoke()}, and related methods. (Method
44     * {@link #setRawResult} by default plays no role in CountedCompleters
45     * but may be overridden for example to maintain fields holding result
46     * data.)
47     *
48 dl 1.1 * <p>A CountedCompleter that does not itself have a completer (i.e.,
49     * one for which {@link #getCompleter} returns {@code null}) can be
50     * used as a regular ForkJoinTask with this added functionality.
51     * However, any completer that in turn has another completer serves
52     * only as an internal helper for other computations, so its own task
53     * status (as reported in methods such as {@link ForkJoinTask#isDone})
54     * is arbitrary; this status changes only upon explicit invocations of
55     * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
56     * ForkJoinTask#completeExceptionally} or upon exceptional completion
57     * of method {@code compute}. Upon any exceptional completion, the
58 dl 1.2 * exception may be relayed to a task's completer (and its completer,
59     * and so on), if one exists and it has not otherwise already
60     * completed.
61 dl 1.1 *
62     * <p><b>Sample Usages.</b>
63     *
64     * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
65     * be arranged in trees similar to those often used with {@link
66     * RecursiveAction}s, although the constructions involved in setting
67 dl 1.3 * them up typically vary. Here, the completer of each task is its
68     * parent in the computation tree. Even though they entail a bit more
69 dl 1.1 * bookkeeping, CountedCompleters may be better choices when applying
70     * a possibly time-consuming operation (that cannot be further
71     * subdivided) to each element of an array or collection; especially
72     * when the operation takes a significantly different amount of time
73     * to complete for some elements than others, either because of
74     * intrinsic variation (for example IO) or auxiliary effects such as
75     * garbage collection. Because CountedCompleters provide their own
76     * continuations, other threads need not block waiting to perform
77     * them.
78     *
79     * <p> For example, here is an initial version of a class that uses
80     * divide-by-two recursive decomposition to divide work into single
81     * pieces (leaf tasks). Even when work is split into individual calls,
82     * tree-based techniques are usually preferable to directly forking
83     * leaf tasks, because they reduce inter-thread communication and
84     * improve load balancing. In the recursive case, the second of each
85     * pair of subtasks to finish triggers completion of its parent
86     * (because no result combination is performed, the default no-op
87     * implementation of method {@code onCompletion} is not overridden). A
88     * static utility method sets up the base task and invokes it:
89     *
90     * <pre> {@code
91     * class MyOperation<E> { void apply(E e) { ... } }
92     *
93 dl 1.3 * class ForEach<E> extends CountedCompleter<Void> {
94 dl 1.1 *
95     * public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
96     * pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
97     * }
98     *
99     * final E[] array; final MyOperation<E> op; final int lo, hi;
100 dl 1.3 * ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
101 dl 1.1 * super(p);
102     * this.array = array; this.op = op; this.lo = lo; this.hi = hi;
103     * }
104     *
105     * public void compute() { // version 1
106     * if (hi - lo >= 2) {
107     * int mid = (lo + hi) >>> 1;
108     * setPendingCount(2); // must set pending count before fork
109     * new ForEach(this, array, op, mid, hi).fork(); // right child
110     * new ForEach(this, array, op, lo, mid).fork(); // left child
111     * }
112     * else if (hi > lo)
113     * op.apply(array[lo]);
114     * tryComplete();
115     * }
116     * } }</pre>
117     *
118     * This design can be improved by noticing that in the recursive case,
119     * the task has nothing to do after forking its right task, so can
120     * directly invoke its left task before returning. (This is an analog
121     * of tail recursion removal.) Also, because the task returns upon
122     * executing its left task (rather than falling through to invoke
123     * tryComplete) the pending count is set to one:
124     *
125     * <pre> {@code
126     * class ForEach<E> ...
127     * public void compute() { // version 2
128     * if (hi - lo >= 2) {
129     * int mid = (lo + hi) >>> 1;
130     * setPendingCount(1); // only one pending
131     * new ForEach(this, array, op, mid, hi).fork(); // right child
132     * new ForEach(this, array, op, lo, mid).compute(); // direct invoke
133     * }
134     * else {
135     * if (hi > lo)
136     * op.apply(array[lo]);
137     * tryComplete();
138     * }
139     * }
140     * }</pre>
141     *
142     * As a further improvement, notice that the left task need not even
143     * exist. Instead of creating a new one, we can iterate using the
144     * original task, and add a pending count for each fork:
145     *
146     * <pre> {@code
147     * class ForEach<E> ...
148     * public void compute() { // version 3
149     * int l = lo, h = hi;
150     * while (h - l >= 2) {
151     * int mid = (l + h) >>> 1;
152     * addToPendingCount(1);
153     * new ForEach(this, array, op, mid, h).fork(); // right child
154     * h = mid;
155     * }
156     * if (h > l)
157     * op.apply(array[l]);
158     * tryComplete();
159     * }
160     * }</pre>
161     *
162     * Additional improvements of such classes might entail precomputing
163     * pending counts so that they can be established in constructors,
164     * specializing classes for leaf steps, subdividing by say, four,
165     * instead of two per iteration, and using an adaptive threshold
166     * instead of always subdividing down to single elements.
167     *
168     * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
169     * results of multiple subtasks usually need to access these results
170     * in method {@link #onCompletion}. As illustrated in the following
171     * class (that performs a simplified form of map-reduce where mappings
172     * and reductions are all of type {@code E}), one way to do this in
173     * divide and conquer designs is to have each subtask record its
174     * sibling, so that it can be accessed in method {@code onCompletion}.
175 dl 1.3 * This technique applies to reductions in which the order of
176     * combining left and right results does not matter; ordered
177     * reductions require explicit left/right designations. Variants of
178     * other streamlinings seen in the above examples may also apply.
179 dl 1.1 *
180     * <pre> {@code
181     * class MyMapper<E> { E apply(E v) { ... } }
182     * class MyReducer<E> { E apply(E x, E y) { ... } }
183 dl 1.3 * class MapReducer<E> extends CountedCompleter<E> {
184 dl 1.1 * final E[] array; final MyMapper<E> mapper;
185     * final MyReducer<E> reducer; final int lo, hi;
186 dl 1.3 * MapReducer<E> sibling;
187 dl 1.1 * E result;
188     * MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
189     * MyReducer<E> reducer, int lo, int hi) {
190     * super(p);
191     * this.array = array; this.mapper = mapper;
192     * this.reducer = reducer; this.lo = lo; this.hi = hi;
193     * }
194     * public void compute() {
195     * if (hi - lo >= 2) {
196     * int mid = (lo + hi) >>> 1;
197     * MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
198     * MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
199     * left.sibling = right;
200     * right.sibling = left;
201     * setPendingCount(1); // only right is pending
202     * right.fork();
203     * left.compute(); // directly execute left
204     * }
205     * else {
206     * if (hi > lo)
207     * result = mapper.apply(array[lo]);
208     * tryComplete();
209     * }
210     * }
211     * public void onCompletion(CountedCompleter caller) {
212     * if (caller != this) {
213     * MapReducer<E> child = (MapReducer<E>)caller;
214     * MapReducer<E> sib = child.sibling;
215     * if (sib == null || sib.result == null)
216     * result = child.result;
217     * else
218     * result = reducer.apply(child.result, sib.result);
219     * }
220     * }
221 dl 1.3 * public E getRawResult() { return result; }
222 dl 1.1 *
223     * public static <E> E mapReduce(ForkJoinPool pool, E[] array,
224     * MyMapper<E> mapper, MyReducer<E> reducer) {
225 dl 1.3 * return pool.invoke(new MapReducer<E>(null, array, mapper,
226     * reducer, 0, array.length));
227 dl 1.1 * }
228     * } }</pre>
229     *
230     * <p><b>Triggers.</b> Some CountedCompleters are themselves never
231     * forked, but instead serve as bits of plumbing in other designs;
232     * including those in which the completion of one of more async tasks
233     * triggers another async task. For example:
234     *
235     * <pre> {@code
236 dl 1.3 * class HeaderBuilder extends CountedCompleter<...> { ... }
237     * class BodyBuilder extends CountedCompleter<...> { ... }
238     * class PacketSender extends CountedCompleter<...> {
239 dl 1.1 * PacketSender(...) { super(null, 1); ... } // trigger on second completion
240     * public void compute() { } // never called
241 dl 1.3 * public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
242 dl 1.1 * }
243     * // sample use:
244     * PacketSender p = new PacketSender();
245     * new HeaderBuilder(p, ...).fork();
246     * new BodyBuilder(p, ...).fork();
247     * }</pre>
248     *
249     * @since 1.8
250     * @author Doug Lea
251     */
252 dl 1.3 public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
253 dl 1.2 private static final long serialVersionUID = 5232453752276485070L;
254    
255 dl 1.1 /** This task's completer, or null if none */
256 dl 1.3 final CountedCompleter<?> completer;
257 dl 1.1 /** The number of pending tasks until completion */
258     volatile int pending;
259    
260     /**
261     * Creates a new CountedCompleter with the given completer
262     * and initial pending count.
263     *
264     * @param completer this tasks completer, or {@code null} if none
265     * @param initialPendingCount the initial pending count
266     */
267 dl 1.3 protected CountedCompleter(CountedCompleter<?> completer,
268 dl 1.1 int initialPendingCount) {
269     this.completer = completer;
270     this.pending = initialPendingCount;
271     }
272    
273     /**
274     * Creates a new CountedCompleter with the given completer
275     * and an initial pending count of zero.
276     *
277     * @param completer this tasks completer, or {@code null} if none
278     */
279 dl 1.3 protected CountedCompleter(CountedCompleter<?> completer) {
280 dl 1.1 this.completer = completer;
281     }
282    
283     /**
284     * Creates a new CountedCompleter with no completer
285     * and an initial pending count of zero.
286     */
287     protected CountedCompleter() {
288     this.completer = null;
289     }
290    
291     /**
292     * The main computation performed by this task.
293     */
294     public abstract void compute();
295    
296     /**
297 dl 1.2 * Performs an action when method {@link #tryComplete} is invoked
298     * and there are no pending counts, or when the unconditional
299     * method {@link #complete} is invoked. By default, this method
300     * does nothing.
301 dl 1.1 *
302     * @param caller the task invoking this method (which may
303     * be this task itself).
304     */
305 dl 1.3 public void onCompletion(CountedCompleter<?> caller) {
306 dl 1.1 }
307    
308     /**
309 dl 1.2 * Performs an action when method {@link #completeExceptionally}
310     * is invoked or method {@link #compute} throws an exception, and
311     * this task has not otherwise already completed normally. On
312     * entry to this method, this task {@link
313     * ForkJoinTask#isCompletedAbnormally}. The return value of this
314     * method controls further propagation: If {@code true} and this
315     * task has a completer, then this completer is also completed
316     * exceptionally. The default implementation of this method does
317     * nothing except return {@code true}.
318     *
319     * @param ex the exception
320     * @param caller the task invoking this method (which may
321     * be this task itself).
322     * @return true if this exception should be propagated to this
323     * tasks completer, if one exists.
324     */
325 dl 1.3 public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
326 dl 1.2 return true;
327     }
328    
329     /**
330 dl 1.1 * Returns the completer established in this task's constructor,
331     * or {@code null} if none.
332     *
333     * @return the completer
334     */
335 dl 1.3 public final CountedCompleter<?> getCompleter() {
336 dl 1.1 return completer;
337     }
338    
339     /**
340     * Returns the current pending count.
341     *
342     * @return the current pending count
343     */
344     public final int getPendingCount() {
345     return pending;
346     }
347    
348     /**
349     * Sets the pending count to the given value.
350     *
351     * @param count the count
352     */
353     public final void setPendingCount(int count) {
354     pending = count;
355     }
356    
357     /**
358     * Adds (atomically) the given value to the pending count.
359     *
360     * @param delta the value to add
361     */
362     public final void addToPendingCount(int delta) {
363     int c; // note: can replace with intrinsic in jdk8
364     do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
365     }
366    
367     /**
368     * Sets (atomically) the pending count to the given count only if
369     * it currently holds the given expected value.
370     *
371     * @param expected the expected value
372     * @param count the new value
373     * @return true is successful
374     */
375     public final boolean compareAndSetPendingCount(int expected, int count) {
376     return U.compareAndSwapInt(this, PENDING, expected, count);
377     }
378    
379     /**
380     * If the pending count is nonzero, decrements the count;
381     * otherwise invokes {@link #onCompletion} and then similarly
382     * tries to complete this task's completer, if one exists,
383     * else marks this task as complete.
384     */
385     public final void tryComplete() {
386 dl 1.3 CountedCompleter<?> a = this, s = a;
387 dl 1.2 for (int c;;) {
388 dl 1.1 if ((c = a.pending) == 0) {
389     a.onCompletion(s);
390     if ((a = (s = a).completer) == null) {
391     s.quietlyComplete();
392     return;
393     }
394     }
395     else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
396     return;
397     }
398     }
399    
400     /**
401     * Regardless of pending count, invokes {@link #onCompletion},
402 dl 1.3 * marks this task as complete and further triggers {@link
403     * #tryComplete} on this task's completer, if one exists. This
404     * method may be useful when forcing completion as soon as any one
405     * (versus all) of several subtask results are obtained. The
406     * given rawResult is used as an argument to {@link #setRawResult}
407     * before marking this task as complete; its value is meaningful
408     * only for classes overriding {@code setRawResult}.
409 dl 1.1 *
410 dl 1.3 * @param rawResult the raw result
411 dl 1.1 */
412 dl 1.3 public void complete(T rawResult) {
413     CountedCompleter<?> p;
414 dl 1.1 onCompletion(this);
415 dl 1.3 setRawResult(rawResult);
416 dl 1.1 quietlyComplete();
417     if ((p = completer) != null)
418     p.tryComplete();
419     }
420    
421     /**
422 dl 1.2 * Support for FJT exception propagation
423     */
424     void internalPropagateException(Throwable ex) {
425 dl 1.3 CountedCompleter<?> a = this, s = a;
426 dl 1.2 while (a.onExceptionalCompletion(ex, s) &&
427     (a = (s = a).completer) != null && a.status >= 0)
428     a.recordExceptionalCompletion(ex);
429     }
430    
431     /**
432 dl 1.1 * Implements execution conventions for CountedCompleters
433     */
434     protected final boolean exec() {
435     compute();
436     return false;
437     }
438    
439     /**
440 dl 1.3 * Returns the result of the computation. By default
441     * returns {@code null}, which is appropriate for {@code Void}
442     * actions, but in other cases should be overridden.
443 dl 1.1 *
444 dl 1.3 * @return the result of the computation
445 dl 1.1 */
446 dl 1.3 public T getRawResult() { return null; }
447 dl 1.1
448     /**
449 dl 1.3 * A method that result-bearing CountedCompleters may optionally
450     * use to help maintain result data. By default, does nothing.
451 dl 1.1 */
452 dl 1.3 protected void setRawResult(T t) { }
453 dl 1.1
454     // Unsafe mechanics
455     private static final sun.misc.Unsafe U;
456     private static final long PENDING;
457     static {
458     try {
459     U = getUnsafe();
460     PENDING = U.objectFieldOffset
461     (CountedCompleter.class.getDeclaredField("pending"));
462     } catch (Exception e) {
463     throw new Error(e);
464     }
465     }
466    
467    
468     /**
469     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
470     * Replace with a simple call to Unsafe.getUnsafe when integrating
471     * into a jdk.
472     *
473     * @return a sun.misc.Unsafe
474     */
475     private static sun.misc.Unsafe getUnsafe() {
476     try {
477     return sun.misc.Unsafe.getUnsafe();
478     } catch (SecurityException se) {
479     try {
480     return java.security.AccessController.doPrivileged
481     (new java.security
482     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
483     public sun.misc.Unsafe run() throws Exception {
484     java.lang.reflect.Field f = sun.misc
485     .Unsafe.class.getDeclaredField("theUnsafe");
486     f.setAccessible(true);
487     return (sun.misc.Unsafe) f.get(null);
488     }});
489     } catch (java.security.PrivilegedActionException e) {
490     throw new RuntimeException("Could not initialize intrinsics",
491     e.getCause());
492     }
493     }
494     }
495    
496     }