ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CountedCompleter.java
Revision: 1.3
Committed: Mon Aug 13 15:50:31 2012 UTC (11 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.2: +10 -7 lines
Log Message:
Merge in jsr166e

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     /**
10     * A resultless {@link ForkJoinTask} with a completion action
11     * performed when triggered and there are no remaining pending
12     * actions. Uses of CountedCompleter are similar to those of other
13     * completion based components (such as {@link
14     * java.nio.channels.CompletionHandler}) except that multiple
15     * <em>pending</em> completions may be necessary to trigger the {@link
16     * #onCompletion} action, not just one. Unless initialized otherwise,
17     * the {@link #getPendingCount pending count} starts at zero, but may
18     * be (atomically) changed using methods {@link #setPendingCount},
19     * {@link #addToPendingCount}, and {@link
20     * #compareAndSetPendingCount}. Upon invocation of {@link
21     * #tryComplete}, if the pending action count is nonzero, it is
22     * decremented; otherwise, the completion action is performed, and if
23     * this completer itself has a completer, the process is continued
24 dl 1.2 * with its completer. As is the case with related synchronization
25     * components such as {@link Phaser} and {@link
26     * java.util.concurrent.Semaphore} these methods affect only internal
27     * counts; they do not establish any further internal bookkeeping. In
28     * particular, the identities of pending tasks are not maintained. As
29     * illustrated below, you can create subclasses that do record some or
30     * all pended tasks or their results when needed.
31 dl 1.1 *
32     * <p>A concrete CountedCompleter class must define method {@link
33     * #compute}, that should, in almost all use cases, invoke {@code
34 dl 1.2 * tryComplete()} once before returning. The class may also optionally
35 dl 1.1 * override method {@link #onCompletion} to perform an action upon
36 dl 1.2 * normal completion, and method {@link #onExceptionalCompletion} to
37     * perform an action upon any exception.
38 dl 1.1 *
39     * <p>A CountedCompleter that does not itself have a completer (i.e.,
40     * one for which {@link #getCompleter} returns {@code null}) can be
41     * used as a regular ForkJoinTask with this added functionality.
42     * However, any completer that in turn has another completer serves
43     * only as an internal helper for other computations, so its own task
44     * status (as reported in methods such as {@link ForkJoinTask#isDone})
45     * is arbitrary; this status changes only upon explicit invocations of
46     * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
47     * ForkJoinTask#completeExceptionally} or upon exceptional completion
48     * of method {@code compute}. Upon any exceptional completion, the
49 dl 1.2 * exception may be relayed to a task's completer (and its completer,
50     * and so on), if one exists and it has not otherwise already
51     * completed.
52 dl 1.1 *
53     * <p><b>Sample Usages.</b>
54     *
55     * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
56     * be arranged in trees similar to those often used with {@link
57     * RecursiveAction}s, although the constructions involved in setting
58     * them up typically vary. Even though they entail a bit more
59     * bookkeeping, CountedCompleters may be better choices when applying
60     * a possibly time-consuming operation (that cannot be further
61     * subdivided) to each element of an array or collection; especially
62     * when the operation takes a significantly different amount of time
63     * to complete for some elements than others, either because of
64     * intrinsic variation (for example IO) or auxiliary effects such as
65     * garbage collection. Because CountedCompleters provide their own
66     * continuations, other threads need not block waiting to perform
67     * them.
68     *
69     * <p> For example, here is an initial version of a class that uses
70     * divide-by-two recursive decomposition to divide work into single
71     * pieces (leaf tasks). Even when work is split into individual calls,
72     * tree-based techniques are usually preferable to directly forking
73     * leaf tasks, because they reduce inter-thread communication and
74     * improve load balancing. In the recursive case, the second of each
75     * pair of subtasks to finish triggers completion of its parent
76     * (because no result combination is performed, the default no-op
77     * implementation of method {@code onCompletion} is not overridden). A
78     * static utility method sets up the base task and invokes it:
79     *
80     * <pre> {@code
81     * class MyOperation<E> { void apply(E e) { ... } }
82     *
83     * class ForEach<E> extends CountedCompleter {
84     *
85     * public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
86     * pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
87     * }
88     *
89     * final E[] array; final MyOperation<E> op; final int lo, hi;
90     * ForEach(CountedCompleter p, E[] array, MyOperation<E> op, int lo, int hi) {
91     * super(p);
92     * this.array = array; this.op = op; this.lo = lo; this.hi = hi;
93     * }
94     *
95     * public void compute() { // version 1
96     * if (hi - lo >= 2) {
97     * int mid = (lo + hi) >>> 1;
98     * setPendingCount(2); // must set pending count before fork
99     * new ForEach(this, array, op, mid, hi).fork(); // right child
100     * new ForEach(this, array, op, lo, mid).fork(); // left child
101     * }
102     * else if (hi > lo)
103     * op.apply(array[lo]);
104     * tryComplete();
105     * }
106     * } }</pre>
107     *
108     * This design can be improved by noticing that in the recursive case,
109     * the task has nothing to do after forking its right task, so can
110     * directly invoke its left task before returning. (This is an analog
111     * of tail recursion removal.) Also, because the task returns upon
112     * executing its left task (rather than falling through to invoke
113     * tryComplete) the pending count is set to one:
114     *
115     * <pre> {@code
116     * class ForEach<E> ...
117     * public void compute() { // version 2
118     * if (hi - lo >= 2) {
119     * int mid = (lo + hi) >>> 1;
120     * setPendingCount(1); // only one pending
121     * new ForEach(this, array, op, mid, hi).fork(); // right child
122     * new ForEach(this, array, op, lo, mid).compute(); // direct invoke
123     * }
124     * else {
125     * if (hi > lo)
126     * op.apply(array[lo]);
127     * tryComplete();
128     * }
129     * }
130     * }</pre>
131     *
132     * As a further improvement, notice that the left task need not even
133     * exist. Instead of creating a new one, we can iterate using the
134     * original task, and add a pending count for each fork:
135     *
136     * <pre> {@code
137     * class ForEach<E> ...
138     * public void compute() { // version 3
139     * int l = lo, h = hi;
140     * while (h - l >= 2) {
141     * int mid = (l + h) >>> 1;
142     * addToPendingCount(1);
143     * new ForEach(this, array, op, mid, h).fork(); // right child
144     * h = mid;
145     * }
146     * if (h > l)
147     * op.apply(array[l]);
148     * tryComplete();
149     * }
150     * }</pre>
151     *
152     * Additional improvements of such classes might entail precomputing
153     * pending counts so that they can be established in constructors,
154     * specializing classes for leaf steps, subdividing by say, four,
155     * instead of two per iteration, and using an adaptive threshold
156     * instead of always subdividing down to single elements.
157     *
158     * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
159     * results of multiple subtasks usually need to access these results
160     * in method {@link #onCompletion}. As illustrated in the following
161     * class (that performs a simplified form of map-reduce where mappings
162     * and reductions are all of type {@code E}), one way to do this in
163     * divide and conquer designs is to have each subtask record its
164     * sibling, so that it can be accessed in method {@code onCompletion}.
165     * For clarity, this class uses explicit left and right subtasks, but
166     * variants of other streamlinings seen in the above example may also
167     * apply.
168     *
169     * <pre> {@code
170     * class MyMapper<E> { E apply(E v) { ... } }
171     * class MyReducer<E> { E apply(E x, E y) { ... } }
172     * class MapReducer<E> extends CountedCompleter {
173     * final E[] array; final MyMapper<E> mapper;
174     * final MyReducer<E> reducer; final int lo, hi;
175 dl 1.3 * MapReducer leftSibling, rightSibling;
176 dl 1.1 * E result;
177     * MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
178     * MyReducer<E> reducer, int lo, int hi) {
179     * super(p);
180     * this.array = array; this.mapper = mapper;
181     * this.reducer = reducer; this.lo = lo; this.hi = hi;
182     * }
183     * public void compute() {
184     * if (hi - lo >= 2) {
185     * int mid = (lo + hi) >>> 1;
186     * MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
187     * MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
188 dl 1.3 * left.rightSibling = right;
189     * right.leftSibling = left;
190 dl 1.1 * setPendingCount(1); // only right is pending
191     * right.fork();
192     * left.compute(); // directly execute left
193     * }
194     * else {
195     * if (hi > lo)
196     * result = mapper.apply(array[lo]);
197     * tryComplete();
198     * }
199     * }
200     * public void onCompletion(CountedCompleter caller) {
201     * if (caller != this) {
202     * MapReducer<E> child = (MapReducer<E>)caller;
203 dl 1.3 * MapReducer<E> left = (t.leftSibling == null) ? t : t.leftSibling;
204     * MapReducer<E> right = (t.rightSibling == null) ? t : t.rightSibling;
205     * if (left == null)
206     * result = right.result;
207     * else if (right == null)
208     * result = left.result;
209 dl 1.1 * else
210 dl 1.3 * result = reducer.apply(left.result, right.result);
211 dl 1.1 * }
212     * }
213     *
214     * public static <E> E mapReduce(ForkJoinPool pool, E[] array,
215     * MyMapper<E> mapper, MyReducer<E> reducer) {
216     * MapReducer<E> mr = new MapReducer<E>(null, array, mapper,
217     * reducer, 0, array.length);
218     * pool.invoke(mr);
219     * return mr.result;
220     * }
221     * } }</pre>
222     *
223     * <p><b>Triggers.</b> Some CountedCompleters are themselves never
224     * forked, but instead serve as bits of plumbing in other designs;
225     * including those in which the completion of one of more async tasks
226     * triggers another async task. For example:
227     *
228     * <pre> {@code
229     * class HeaderBuilder extends CountedCompleter { ... }
230     * class BodyBuilder extends CountedCompleter { ... }
231     * class PacketSender extends CountedCompleter {
232     * PacketSender(...) { super(null, 1); ... } // trigger on second completion
233     * public void compute() { } // never called
234     * public void onCompletion(CountedCompleter caller) { sendPacket(); }
235     * }
236     * // sample use:
237     * PacketSender p = new PacketSender();
238     * new HeaderBuilder(p, ...).fork();
239     * new BodyBuilder(p, ...).fork();
240     * }</pre>
241     *
242     * @since 1.8
243     * @author Doug Lea
244     */
245     public abstract class CountedCompleter extends ForkJoinTask<Void> {
246 dl 1.2 private static final long serialVersionUID = 5232453752276485070L;
247    
248 dl 1.1 /** This task's completer, or null if none */
249     final CountedCompleter completer;
250     /** The number of pending tasks until completion */
251     volatile int pending;
252    
253     /**
254     * Creates a new CountedCompleter with the given completer
255     * and initial pending count.
256     *
257     * @param completer this tasks completer, or {@code null} if none
258     * @param initialPendingCount the initial pending count
259     */
260     protected CountedCompleter(CountedCompleter completer,
261     int initialPendingCount) {
262     this.completer = completer;
263     this.pending = initialPendingCount;
264     }
265    
266     /**
267     * Creates a new CountedCompleter with the given completer
268     * and an initial pending count of zero.
269     *
270     * @param completer this tasks completer, or {@code null} if none
271     */
272     protected CountedCompleter(CountedCompleter completer) {
273     this.completer = completer;
274     }
275    
276     /**
277     * Creates a new CountedCompleter with no completer
278     * and an initial pending count of zero.
279     */
280     protected CountedCompleter() {
281     this.completer = null;
282     }
283    
284     /**
285     * The main computation performed by this task.
286     */
287     public abstract void compute();
288    
289     /**
290 dl 1.2 * Performs an action when method {@link #tryComplete} is invoked
291     * and there are no pending counts, or when the unconditional
292     * method {@link #complete} is invoked. By default, this method
293     * does nothing.
294 dl 1.1 *
295     * @param caller the task invoking this method (which may
296     * be this task itself).
297     */
298     public void onCompletion(CountedCompleter caller) {
299     }
300    
301     /**
302 dl 1.2 * Performs an action when method {@link #completeExceptionally}
303     * is invoked or method {@link #compute} throws an exception, and
304     * this task has not otherwise already completed normally. On
305     * entry to this method, this task {@link
306     * ForkJoinTask#isCompletedAbnormally}. The return value of this
307     * method controls further propagation: If {@code true} and this
308     * task has a completer, then this completer is also completed
309     * exceptionally. The default implementation of this method does
310     * nothing except return {@code true}.
311     *
312     * @param ex the exception
313     * @param caller the task invoking this method (which may
314     * be this task itself).
315     * @return true if this exception should be propagated to this
316     * tasks completer, if one exists.
317     */
318     public boolean onExceptionalCompletion(Throwable ex, CountedCompleter caller) {
319     return true;
320     }
321    
322     /**
323 dl 1.1 * Returns the completer established in this task's constructor,
324     * or {@code null} if none.
325     *
326     * @return the completer
327     */
328     public final CountedCompleter getCompleter() {
329     return completer;
330     }
331    
332     /**
333     * Returns the current pending count.
334     *
335     * @return the current pending count
336     */
337     public final int getPendingCount() {
338     return pending;
339     }
340    
341     /**
342     * Sets the pending count to the given value.
343     *
344     * @param count the count
345     */
346     public final void setPendingCount(int count) {
347     pending = count;
348     }
349    
350     /**
351     * Adds (atomically) the given value to the pending count.
352     *
353     * @param delta the value to add
354     */
355     public final void addToPendingCount(int delta) {
356     int c; // note: can replace with intrinsic in jdk8
357     do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
358     }
359    
360     /**
361     * Sets (atomically) the pending count to the given count only if
362     * it currently holds the given expected value.
363     *
364     * @param expected the expected value
365     * @param count the new value
366     * @return true is successful
367     */
368     public final boolean compareAndSetPendingCount(int expected, int count) {
369     return U.compareAndSwapInt(this, PENDING, expected, count);
370     }
371    
372     /**
373     * If the pending count is nonzero, decrements the count;
374     * otherwise invokes {@link #onCompletion} and then similarly
375     * tries to complete this task's completer, if one exists,
376     * else marks this task as complete.
377     */
378     public final void tryComplete() {
379 dl 1.2 CountedCompleter a = this, s = a;
380     for (int c;;) {
381 dl 1.1 if ((c = a.pending) == 0) {
382     a.onCompletion(s);
383     if ((a = (s = a).completer) == null) {
384     s.quietlyComplete();
385     return;
386     }
387     }
388     else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
389     return;
390     }
391     }
392    
393     /**
394     * Regardless of pending count, invokes {@link #onCompletion},
395     * marks this task as complete with a {@code null} return value,
396     * and further triggers {@link #tryComplete} on this task's
397     * completer, if one exists. This method may be useful when
398     * forcing completion as soon as any one (versus all) of several
399     * subtask results are obtained.
400     *
401     * @param mustBeNull the {@code null} completion value
402     */
403     public void complete(Void mustBeNull) {
404     CountedCompleter p;
405     onCompletion(this);
406     quietlyComplete();
407     if ((p = completer) != null)
408     p.tryComplete();
409     }
410    
411     /**
412 dl 1.2 * Support for FJT exception propagation
413     */
414     void internalPropagateException(Throwable ex) {
415     CountedCompleter a = this, s = a;
416     while (a.onExceptionalCompletion(ex, s) &&
417     (a = (s = a).completer) != null && a.status >= 0)
418     a.recordExceptionalCompletion(ex);
419     }
420    
421     /**
422 dl 1.1 * Implements execution conventions for CountedCompleters
423     */
424     protected final boolean exec() {
425     compute();
426     return false;
427     }
428    
429     /**
430     * Always returns {@code null}.
431     *
432     * @return {@code null} always
433     */
434     public final Void getRawResult() { return null; }
435    
436     /**
437     * Requires null completion value.
438     */
439     protected final void setRawResult(Void mustBeNull) { }
440    
441    
442     // Unsafe mechanics
443     private static final sun.misc.Unsafe U;
444     private static final long PENDING;
445     static {
446     try {
447     U = sun.misc.Unsafe.getUnsafe();
448     PENDING = U.objectFieldOffset
449     (CountedCompleter.class.getDeclaredField("pending"));
450     } catch (Exception e) {
451     throw new Error(e);
452     }
453     }
454     }