ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/CountedCompleter.java
Revision: 1.2
Committed: Sat Apr 21 11:45:20 2012 UTC (12 years ago) by dl
Branch: MAIN
Changes since 1.1: +52 -21 lines
Log Message:
add CountedCompleter.onExceptionalCompletion

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166y;
8    
9     /**
10     * A resultless {@link ForkJoinTask} with a completion action
11     * performed when triggered and there are no remaining pending
12     * actions. Uses of CountedCompleter are similar to those of other
13     * completion based components (such as {@link
14     * java.nio.channels.CompletionHandler}) except that multiple
15     * <em>pending</em> completions may be necessary to trigger the {@link
16     * #onCompletion} action, not just one. Unless initialized otherwise,
17     * the {@link #getPendingCount pending count} starts at zero, but may
18     * be (atomically) changed using methods {@link #setPendingCount},
19     * {@link #addToPendingCount}, and {@link
20     * #compareAndSetPendingCount}. Upon invocation of {@link
21     * #tryComplete}, if the pending action count is nonzero, it is
22     * decremented; otherwise, the completion action is performed, and if
23     * this completer itself has a completer, the process is continued
24 dl 1.2 * with its completer. As is the case with related synchronization
25     * components such as {@link Phaser} and {@link
26     * java.util.concurrent.Semaphore} these methods affect only internal
27     * counts; they do not establish any further internal bookkeeping. In
28     * particular, the identities of pending tasks are not maintained. As
29     * illustrated below, you can create subclasses that do record some or
30     * all pended tasks or their results when needed.
31 dl 1.1 *
32     * <p>A concrete CountedCompleter class must define method {@link
33     * #compute}, that should, in almost all use cases, invoke {@code
34 dl 1.2 * tryComplete()} once before returning. The class may also optionally
35 dl 1.1 * override method {@link #onCompletion} to perform an action upon
36 dl 1.2 * normal completion, and method {@link #onExceptionalCompletion} to
37     * perform an action upon any exception.
38 dl 1.1 *
39     * <p>A CountedCompleter that does not itself have a completer (i.e.,
40     * one for which {@link #getCompleter} returns {@code null}) can be
41     * used as a regular ForkJoinTask with this added functionality.
42     * However, any completer that in turn has another completer serves
43     * only as an internal helper for other computations, so its own task
44     * status (as reported in methods such as {@link ForkJoinTask#isDone})
45     * is arbitrary; this status changes only upon explicit invocations of
46     * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
47     * ForkJoinTask#completeExceptionally} or upon exceptional completion
48     * of method {@code compute}. Upon any exceptional completion, the
49 dl 1.2 * exception may be relayed to a task's completer (and its completer,
50     * and so on), if one exists and it has not otherwise already
51     * completed.
52 dl 1.1 *
53     * <p><b>Sample Usages.</b>
54     *
55     * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
56     * be arranged in trees similar to those often used with {@link
57     * RecursiveAction}s, although the constructions involved in setting
58     * them up typically vary. Even though they entail a bit more
59     * bookkeeping, CountedCompleters may be better choices when applying
60     * a possibly time-consuming operation (that cannot be further
61     * subdivided) to each element of an array or collection; especially
62     * when the operation takes a significantly different amount of time
63     * to complete for some elements than others, either because of
64     * intrinsic variation (for example IO) or auxiliary effects such as
65     * garbage collection. Because CountedCompleters provide their own
66     * continuations, other threads need not block waiting to perform
67     * them.
68     *
69     * <p> For example, here is an initial version of a class that uses
70     * divide-by-two recursive decomposition to divide work into single
71     * pieces (leaf tasks). Even when work is split into individual calls,
72     * tree-based techniques are usually preferable to directly forking
73     * leaf tasks, because they reduce inter-thread communication and
74     * improve load balancing. In the recursive case, the second of each
75     * pair of subtasks to finish triggers completion of its parent
76     * (because no result combination is performed, the default no-op
77     * implementation of method {@code onCompletion} is not overridden). A
78     * static utility method sets up the base task and invokes it:
79     *
80     * <pre> {@code
81     * class MyOperation<E> { void apply(E e) { ... } }
82     *
83     * class ForEach<E> extends CountedCompleter {
84     *
85     * public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
86     * pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
87     * }
88     *
89     * final E[] array; final MyOperation<E> op; final int lo, hi;
90     * ForEach(CountedCompleter p, E[] array, MyOperation<E> op, int lo, int hi) {
91     * super(p);
92     * this.array = array; this.op = op; this.lo = lo; this.hi = hi;
93     * }
94     *
95     * public void compute() { // version 1
96     * if (hi - lo >= 2) {
97     * int mid = (lo + hi) >>> 1;
98     * setPendingCount(2); // must set pending count before fork
99     * new ForEach(this, array, op, mid, hi).fork(); // right child
100     * new ForEach(this, array, op, lo, mid).fork(); // left child
101     * }
102     * else if (hi > lo)
103     * op.apply(array[lo]);
104     * tryComplete();
105     * }
106     * } }</pre>
107     *
108     * This design can be improved by noticing that in the recursive case,
109     * the task has nothing to do after forking its right task, so can
110     * directly invoke its left task before returning. (This is an analog
111     * of tail recursion removal.) Also, because the task returns upon
112     * executing its left task (rather than falling through to invoke
113     * tryComplete) the pending count is set to one:
114     *
115     * <pre> {@code
116     * class ForEach<E> ...
117     * public void compute() { // version 2
118     * if (hi - lo >= 2) {
119     * int mid = (lo + hi) >>> 1;
120     * setPendingCount(1); // only one pending
121     * new ForEach(this, array, op, mid, hi).fork(); // right child
122     * new ForEach(this, array, op, lo, mid).compute(); // direct invoke
123     * }
124     * else {
125     * if (hi > lo)
126     * op.apply(array[lo]);
127     * tryComplete();
128     * }
129     * }
130     * }</pre>
131     *
132     * As a further improvement, notice that the left task need not even
133     * exist. Instead of creating a new one, we can iterate using the
134     * original task, and add a pending count for each fork:
135     *
136     * <pre> {@code
137     * class ForEach<E> ...
138     * public void compute() { // version 3
139     * int l = lo, h = hi;
140     * while (h - l >= 2) {
141     * int mid = (l + h) >>> 1;
142     * addToPendingCount(1);
143     * new ForEach(this, array, op, mid, h).fork(); // right child
144     * h = mid;
145     * }
146     * if (h > l)
147     * op.apply(array[l]);
148     * tryComplete();
149     * }
150     * }</pre>
151     *
152     * Additional improvements of such classes might entail precomputing
153     * pending counts so that they can be established in constructors,
154     * specializing classes for leaf steps, subdividing by say, four,
155     * instead of two per iteration, and using an adaptive threshold
156     * instead of always subdividing down to single elements.
157     *
158     * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
159     * results of multiple subtasks usually need to access these results
160     * in method {@link #onCompletion}. As illustrated in the following
161     * class (that performs a simplified form of map-reduce where mappings
162     * and reductions are all of type {@code E}), one way to do this in
163     * divide and conquer designs is to have each subtask record its
164     * sibling, so that it can be accessed in method {@code onCompletion}.
165     * For clarity, this class uses explicit left and right subtasks, but
166     * variants of other streamlinings seen in the above example may also
167     * apply.
168     *
169     * <pre> {@code
170     * class MyMapper<E> { E apply(E v) { ... } }
171     * class MyReducer<E> { E apply(E x, E y) { ... } }
172     * class MapReducer<E> extends CountedCompleter {
173     * final E[] array; final MyMapper<E> mapper;
174     * final MyReducer<E> reducer; final int lo, hi;
175     * MapReducer sibling;
176     * E result;
177     * MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
178     * MyReducer<E> reducer, int lo, int hi) {
179     * super(p);
180     * this.array = array; this.mapper = mapper;
181     * this.reducer = reducer; this.lo = lo; this.hi = hi;
182     * }
183     * public void compute() {
184     * if (hi - lo >= 2) {
185     * int mid = (lo + hi) >>> 1;
186     * MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
187     * MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
188     * left.sibling = right;
189     * right.sibling = left;
190     * setPendingCount(1); // only right is pending
191     * right.fork();
192     * left.compute(); // directly execute left
193     * }
194     * else {
195     * if (hi > lo)
196     * result = mapper.apply(array[lo]);
197     * tryComplete();
198     * }
199     * }
200     * public void onCompletion(CountedCompleter caller) {
201     * if (caller != this) {
202     * MapReducer<E> child = (MapReducer<E>)caller;
203     * MapReducer<E> sib = child.sibling;
204     * if (sib == null || sib.result == null)
205     * result = child.result;
206     * else
207     * result = reducer.apply(child.result, sib.result);
208     * }
209     * }
210     *
211     * public static <E> E mapReduce(ForkJoinPool pool, E[] array,
212     * MyMapper<E> mapper, MyReducer<E> reducer) {
213     * MapReducer<E> mr = new MapReducer<E>(null, array, mapper,
214     * reducer, 0, array.length);
215     * pool.invoke(mr);
216     * return mr.result;
217     * }
218     * } }</pre>
219     *
220     * <p><b>Triggers.</b> Some CountedCompleters are themselves never
221     * forked, but instead serve as bits of plumbing in other designs;
222     * including those in which the completion of one of more async tasks
223     * triggers another async task. For example:
224     *
225     * <pre> {@code
226     * class HeaderBuilder extends CountedCompleter { ... }
227     * class BodyBuilder extends CountedCompleter { ... }
228     * class PacketSender extends CountedCompleter {
229     * PacketSender(...) { super(null, 1); ... } // trigger on second completion
230     * public void compute() { } // never called
231     * public void onCompletion(CountedCompleter caller) { sendPacket(); }
232     * }
233     * // sample use:
234     * PacketSender p = new PacketSender();
235     * new HeaderBuilder(p, ...).fork();
236     * new BodyBuilder(p, ...).fork();
237     * }</pre>
238     *
239     * @since 1.8
240     * @author Doug Lea
241     */
242     public abstract class CountedCompleter extends ForkJoinTask<Void> {
243 dl 1.2 private static final long serialVersionUID = 5232453752276485070L;
244    
245 dl 1.1 /** This task's completer, or null if none */
246     final CountedCompleter completer;
247     /** The number of pending tasks until completion */
248     volatile int pending;
249    
250     /**
251     * Creates a new CountedCompleter with the given completer
252     * and initial pending count.
253     *
254     * @param completer this tasks completer, or {@code null} if none
255     * @param initialPendingCount the initial pending count
256     */
257     protected CountedCompleter(CountedCompleter completer,
258     int initialPendingCount) {
259     this.completer = completer;
260     this.pending = initialPendingCount;
261     }
262    
263     /**
264     * Creates a new CountedCompleter with the given completer
265     * and an initial pending count of zero.
266     *
267     * @param completer this tasks completer, or {@code null} if none
268     */
269     protected CountedCompleter(CountedCompleter completer) {
270     this.completer = completer;
271     }
272    
273     /**
274     * Creates a new CountedCompleter with no completer
275     * and an initial pending count of zero.
276     */
277     protected CountedCompleter() {
278     this.completer = null;
279     }
280    
281     /**
282     * The main computation performed by this task.
283     */
284     public abstract void compute();
285    
286     /**
287 dl 1.2 * Performs an action when method {@link #tryComplete} is invoked
288     * and there are no pending counts, or when the unconditional
289     * method {@link #complete} is invoked. By default, this method
290     * does nothing.
291 dl 1.1 *
292     * @param caller the task invoking this method (which may
293     * be this task itself).
294     */
295     public void onCompletion(CountedCompleter caller) {
296     }
297    
298     /**
299 dl 1.2 * Performs an action when method {@link #completeExceptionally}
300     * is invoked or method {@link #compute} throws an exception, and
301     * this task has not otherwise already completed normally. On
302     * entry to this method, this task {@link
303     * ForkJoinTask#isCompletedAbnormally}. The return value of this
304     * method controls further propagation: If {@code true} and this
305     * task has a completer, then this completer is also completed
306     * exceptionally. The default implementation of this method does
307     * nothing except return {@code true}.
308     *
309     * @param ex the exception
310     * @param caller the task invoking this method (which may
311     * be this task itself).
312     * @return true if this exception should be propagated to this
313     * tasks completer, if one exists.
314     */
315     public boolean onExceptionalCompletion(Throwable ex, CountedCompleter caller) {
316     return true;
317     }
318    
319     /**
320 dl 1.1 * Returns the completer established in this task's constructor,
321     * or {@code null} if none.
322     *
323     * @return the completer
324     */
325     public final CountedCompleter getCompleter() {
326     return completer;
327     }
328    
329     /**
330     * Returns the current pending count.
331     *
332     * @return the current pending count
333     */
334     public final int getPendingCount() {
335     return pending;
336     }
337    
338     /**
339     * Sets the pending count to the given value.
340     *
341     * @param count the count
342     */
343     public final void setPendingCount(int count) {
344     pending = count;
345     }
346    
347     /**
348     * Adds (atomically) the given value to the pending count.
349     *
350     * @param delta the value to add
351     */
352     public final void addToPendingCount(int delta) {
353     int c; // note: can replace with intrinsic in jdk8
354     do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
355     }
356    
357     /**
358     * Sets (atomically) the pending count to the given count only if
359     * it currently holds the given expected value.
360     *
361     * @param expected the expected value
362     * @param count the new value
363     * @return true is successful
364     */
365     public final boolean compareAndSetPendingCount(int expected, int count) {
366     return U.compareAndSwapInt(this, PENDING, expected, count);
367     }
368    
369     /**
370     * If the pending count is nonzero, decrements the count;
371     * otherwise invokes {@link #onCompletion} and then similarly
372     * tries to complete this task's completer, if one exists,
373     * else marks this task as complete.
374     */
375     public final void tryComplete() {
376 dl 1.2 CountedCompleter a = this, s = a;
377     for (int c;;) {
378 dl 1.1 if ((c = a.pending) == 0) {
379     a.onCompletion(s);
380     if ((a = (s = a).completer) == null) {
381     s.quietlyComplete();
382     return;
383     }
384     }
385     else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
386     return;
387     }
388     }
389    
390     /**
391     * Regardless of pending count, invokes {@link #onCompletion},
392     * marks this task as complete with a {@code null} return value,
393     * and further triggers {@link #tryComplete} on this task's
394     * completer, if one exists. This method may be useful when
395     * forcing completion as soon as any one (versus all) of several
396     * subtask results are obtained.
397     *
398     * @param mustBeNull the {@code null} completion value
399     */
400     public void complete(Void mustBeNull) {
401     CountedCompleter p;
402     onCompletion(this);
403     quietlyComplete();
404     if ((p = completer) != null)
405     p.tryComplete();
406     }
407    
408     /**
409 dl 1.2 * Support for FJT exception propagation
410     */
411     void internalPropagateException(Throwable ex) {
412     CountedCompleter a = this, s = a;
413     while (a.onExceptionalCompletion(ex, s) &&
414     (a = (s = a).completer) != null && a.status >= 0)
415     a.recordExceptionalCompletion(ex);
416     }
417    
418     /**
419 dl 1.1 * Implements execution conventions for CountedCompleters
420     */
421     protected final boolean exec() {
422     compute();
423     return false;
424     }
425    
426     /**
427     * Always returns {@code null}.
428     *
429     * @return {@code null} always
430     */
431     public final Void getRawResult() { return null; }
432    
433     /**
434     * Requires null completion value.
435     */
436     protected final void setRawResult(Void mustBeNull) { }
437    
438     // Unsafe mechanics
439     private static final sun.misc.Unsafe U;
440     private static final long PENDING;
441     static {
442     try {
443     U = getUnsafe();
444     PENDING = U.objectFieldOffset
445     (CountedCompleter.class.getDeclaredField("pending"));
446     } catch (Exception e) {
447     throw new Error(e);
448     }
449     }
450    
451    
452     /**
453     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
454     * Replace with a simple call to Unsafe.getUnsafe when integrating
455     * into a jdk.
456     *
457     * @return a sun.misc.Unsafe
458     */
459     private static sun.misc.Unsafe getUnsafe() {
460     try {
461     return sun.misc.Unsafe.getUnsafe();
462     } catch (SecurityException se) {
463     try {
464     return java.security.AccessController.doPrivileged
465     (new java.security
466     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
467     public sun.misc.Unsafe run() throws Exception {
468     java.lang.reflect.Field f = sun.misc
469     .Unsafe.class.getDeclaredField("theUnsafe");
470     f.setAccessible(true);
471     return (sun.misc.Unsafe) f.get(null);
472     }});
473     } catch (java.security.PrivilegedActionException e) {
474     throw new RuntimeException("Could not initialize intrinsics",
475     e.getCause());
476     }
477     }
478     }
479    
480     }