ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/CountedCompleter.java
Revision: 1.1
Committed: Mon Apr 9 13:12:18 2012 UTC (12 years ago) by dl
Branch: MAIN
Log Message:
Add CountedCompleter; improve tryHelpStealer

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166y;
8    
9     /**
10     * A resultless {@link ForkJoinTask} with a completion action
11     * performed when triggered and there are no remaining pending
12     * actions. Uses of CountedCompleter are similar to those of other
13     * completion based components (such as {@link
14     * java.nio.channels.CompletionHandler}) except that multiple
15     * <em>pending</em> completions may be necessary to trigger the {@link
16     * #onCompletion} action, not just one. Unless initialized otherwise,
17     * the {@link #getPendingCount pending count} starts at zero, but may
18     * be (atomically) changed using methods {@link #setPendingCount},
19     * {@link #addToPendingCount}, and {@link
20     * #compareAndSetPendingCount}. Upon invocation of {@link
21     * #tryComplete}, if the pending action count is nonzero, it is
22     * decremented; otherwise, the completion action is performed, and if
23     * this completer itself has a completer, the process is continued
24     * with its completer. As is the case with most basic synchronization
25     * constructs, these methods affect only internal counts; they do not
26     * establish any further internal bookkeeping. In particular, the
27     * identities of pending tasks are not maintained. As illustrated
28     * below, you can create subclasses that do record some or all pended
29     * tasks or their results when needed.
30     *
31     * <p>A concrete CountedCompleter class must define method {@link
32     * #compute}, that should, in almost all use cases, invoke {@code
33     * tryComplete()} before returning. The class may also optionally
34     * override method {@link #onCompletion} to perform an action upon
35     * normal completion.
36     *
37     * <p>A CountedCompleter that does not itself have a completer (i.e.,
38     * one for which {@link #getCompleter} returns {@code null}) can be
39     * used as a regular ForkJoinTask with this added functionality.
40     * However, any completer that in turn has another completer serves
41     * only as an internal helper for other computations, so its own task
42     * status (as reported in methods such as {@link ForkJoinTask#isDone})
43     * is arbitrary; this status changes only upon explicit invocations of
44     * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
45     * ForkJoinTask#completeExceptionally} or upon exceptional completion
46     * of method {@code compute}. Upon any exceptional completion, the
47     * exception is relayed to a task's completer (and its completer, and
48     * so on), if one exists and it has not otherwise already completed.
49     *
50     * <p><b>Sample Usages.</b>
51     *
52     * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
53     * be arranged in trees similar to those often used with {@link
54     * RecursiveAction}s, although the constructions involved in setting
55     * them up typically vary. Even though they entail a bit more
56     * bookkeeping, CountedCompleters may be better choices when applying
57     * a possibly time-consuming operation (that cannot be further
58     * subdivided) to each element of an array or collection; especially
59     * when the operation takes a significantly different amount of time
60     * to complete for some elements than others, either because of
61     * intrinsic variation (for example IO) or auxiliary effects such as
62     * garbage collection. Because CountedCompleters provide their own
63     * continuations, other threads need not block waiting to perform
64     * them.
65     *
66     * <p> For example, here is an initial version of a class that uses
67     * divide-by-two recursive decomposition to divide work into single
68     * pieces (leaf tasks). Even when work is split into individual calls,
69     * tree-based techniques are usually preferable to directly forking
70     * leaf tasks, because they reduce inter-thread communication and
71     * improve load balancing. In the recursive case, the second of each
72     * pair of subtasks to finish triggers completion of its parent
73     * (because no result combination is performed, the default no-op
74     * implementation of method {@code onCompletion} is not overridden). A
75     * static utility method sets up the base task and invokes it:
76     *
77     * <pre> {@code
78     * class MyOperation<E> { void apply(E e) { ... } }
79     *
80     * class ForEach<E> extends CountedCompleter {
81     *
82     * public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
83     * pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
84     * }
85     *
86     * final E[] array; final MyOperation<E> op; final int lo, hi;
87     * ForEach(CountedCompleter p, E[] array, MyOperation<E> op, int lo, int hi) {
88     * super(p);
89     * this.array = array; this.op = op; this.lo = lo; this.hi = hi;
90     * }
91     *
92     * public void compute() { // version 1
93     * if (hi - lo >= 2) {
94     * int mid = (lo + hi) >>> 1;
95     * setPendingCount(2); // must set pending count before fork
96     * new ForEach(this, array, op, mid, hi).fork(); // right child
97     * new ForEach(this, array, op, lo, mid).fork(); // left child
98     * }
99     * else if (hi > lo)
100     * op.apply(array[lo]);
101     * tryComplete();
102     * }
103     * } }</pre>
104     *
105     * This design can be improved by noticing that in the recursive case,
106     * the task has nothing to do after forking its right task, so can
107     * directly invoke its left task before returning. (This is an analog
108     * of tail recursion removal.) Also, because the task returns upon
109     * executing its left task (rather than falling through to invoke
110     * tryComplete) the pending count is set to one:
111     *
112     * <pre> {@code
113     * class ForEach<E> ...
114     * public void compute() { // version 2
115     * if (hi - lo >= 2) {
116     * int mid = (lo + hi) >>> 1;
117     * setPendingCount(1); // only one pending
118     * new ForEach(this, array, op, mid, hi).fork(); // right child
119     * new ForEach(this, array, op, lo, mid).compute(); // direct invoke
120     * }
121     * else {
122     * if (hi > lo)
123     * op.apply(array[lo]);
124     * tryComplete();
125     * }
126     * }
127     * }</pre>
128     *
129     * As a further improvement, notice that the left task need not even
130     * exist. Instead of creating a new one, we can iterate using the
131     * original task, and add a pending count for each fork:
132     *
133     * <pre> {@code
134     * class ForEach<E> ...
135     * public void compute() { // version 3
136     * int l = lo, h = hi;
137     * while (h - l >= 2) {
138     * int mid = (l + h) >>> 1;
139     * addToPendingCount(1);
140     * new ForEach(this, array, op, mid, h).fork(); // right child
141     * h = mid;
142     * }
143     * if (h > l)
144     * op.apply(array[l]);
145     * tryComplete();
146     * }
147     * }</pre>
148     *
149     * Additional improvements of such classes might entail precomputing
150     * pending counts so that they can be established in constructors,
151     * specializing classes for leaf steps, subdividing by say, four,
152     * instead of two per iteration, and using an adaptive threshold
153     * instead of always subdividing down to single elements.
154     *
155     * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
156     * results of multiple subtasks usually need to access these results
157     * in method {@link #onCompletion}. As illustrated in the following
158     * class (that performs a simplified form of map-reduce where mappings
159     * and reductions are all of type {@code E}), one way to do this in
160     * divide and conquer designs is to have each subtask record its
161     * sibling, so that it can be accessed in method {@code onCompletion}.
162     * For clarity, this class uses explicit left and right subtasks, but
163     * variants of other streamlinings seen in the above example may also
164     * apply.
165     *
166     * <pre> {@code
167     * class MyMapper<E> { E apply(E v) { ... } }
168     * class MyReducer<E> { E apply(E x, E y) { ... } }
169     * class MapReducer<E> extends CountedCompleter {
170     * final E[] array; final MyMapper<E> mapper;
171     * final MyReducer<E> reducer; final int lo, hi;
172     * MapReducer sibling;
173     * E result;
174     * MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
175     * MyReducer<E> reducer, int lo, int hi) {
176     * super(p);
177     * this.array = array; this.mapper = mapper;
178     * this.reducer = reducer; this.lo = lo; this.hi = hi;
179     * }
180     * public void compute() {
181     * if (hi - lo >= 2) {
182     * int mid = (lo + hi) >>> 1;
183     * MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
184     * MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
185     * left.sibling = right;
186     * right.sibling = left;
187     * setPendingCount(1); // only right is pending
188     * right.fork();
189     * left.compute(); // directly execute left
190     * }
191     * else {
192     * if (hi > lo)
193     * result = mapper.apply(array[lo]);
194     * tryComplete();
195     * }
196     * }
197     * public void onCompletion(CountedCompleter caller) {
198     * if (caller != this) {
199     * MapReducer<E> child = (MapReducer<E>)caller;
200     * MapReducer<E> sib = child.sibling;
201     * if (sib == null || sib.result == null)
202     * result = child.result;
203     * else
204     * result = reducer.apply(child.result, sib.result);
205     * }
206     * }
207     *
208     * public static <E> E mapReduce(ForkJoinPool pool, E[] array,
209     * MyMapper<E> mapper, MyReducer<E> reducer) {
210     * MapReducer<E> mr = new MapReducer<E>(null, array, mapper,
211     * reducer, 0, array.length);
212     * pool.invoke(mr);
213     * return mr.result;
214     * }
215     * } }</pre>
216     *
217     * <p><b>Triggers.</b> Some CountedCompleters are themselves never
218     * forked, but instead serve as bits of plumbing in other designs;
219     * including those in which the completion of one of more async tasks
220     * triggers another async task. For example:
221     *
222     * <pre> {@code
223     * class HeaderBuilder extends CountedCompleter { ... }
224     * class BodyBuilder extends CountedCompleter { ... }
225     * class PacketSender extends CountedCompleter {
226     * PacketSender(...) { super(null, 1); ... } // trigger on second completion
227     * public void compute() { } // never called
228     * public void onCompletion(CountedCompleter caller) { sendPacket(); }
229     * }
230     * // sample use:
231     * PacketSender p = new PacketSender();
232     * new HeaderBuilder(p, ...).fork();
233     * new BodyBuilder(p, ...).fork();
234     * }</pre>
235     *
236     * @since 1.8
237     * @author Doug Lea
238     */
239     public abstract class CountedCompleter extends ForkJoinTask<Void> {
240     /** This task's completer, or null if none */
241     final CountedCompleter completer;
242     /** The number of pending tasks until completion */
243     volatile int pending;
244    
245     /**
246     * Creates a new CountedCompleter with the given completer
247     * and initial pending count.
248     *
249     * @param completer this tasks completer, or {@code null} if none
250     * @param initialPendingCount the initial pending count
251     */
252     protected CountedCompleter(CountedCompleter completer,
253     int initialPendingCount) {
254     this.completer = completer;
255     this.pending = initialPendingCount;
256     }
257    
258     /**
259     * Creates a new CountedCompleter with the given completer
260     * and an initial pending count of zero.
261     *
262     * @param completer this tasks completer, or {@code null} if none
263     */
264     protected CountedCompleter(CountedCompleter completer) {
265     this.completer = completer;
266     }
267    
268     /**
269     * Creates a new CountedCompleter with no completer
270     * and an initial pending count of zero.
271     */
272     protected CountedCompleter() {
273     this.completer = null;
274     }
275    
276     /**
277     * The main computation performed by this task.
278     */
279     public abstract void compute();
280    
281     /**
282     * Executes the completion action when method {@link #tryComplete}
283     * is invoked and there are no pending counts, or when the
284     * unconditional method {@link #complete} is invoked. By default,
285     * this method does nothing.
286     *
287     * @param caller the task invoking this method (which may
288     * be this task itself).
289     */
290     public void onCompletion(CountedCompleter caller) {
291     }
292    
293     /**
294     * Returns the completer established in this task's constructor,
295     * or {@code null} if none.
296     *
297     * @return the completer
298     */
299     public final CountedCompleter getCompleter() {
300     return completer;
301     }
302    
303     /**
304     * Returns the current pending count.
305     *
306     * @return the current pending count
307     */
308     public final int getPendingCount() {
309     return pending;
310     }
311    
312     /**
313     * Sets the pending count to the given value.
314     *
315     * @param count the count
316     */
317     public final void setPendingCount(int count) {
318     pending = count;
319     }
320    
321     /**
322     * Adds (atomically) the given value to the pending count.
323     *
324     * @param delta the value to add
325     */
326     public final void addToPendingCount(int delta) {
327     int c; // note: can replace with intrinsic in jdk8
328     do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
329     }
330    
331     /**
332     * Sets (atomically) the pending count to the given count only if
333     * it currently holds the given expected value.
334     *
335     * @param expected the expected value
336     * @param count the new value
337     * @return true is successful
338     */
339     public final boolean compareAndSetPendingCount(int expected, int count) {
340     return U.compareAndSwapInt(this, PENDING, expected, count);
341     }
342    
343     /**
344     * If the pending count is nonzero, decrements the count;
345     * otherwise invokes {@link #onCompletion} and then similarly
346     * tries to complete this task's completer, if one exists,
347     * else marks this task as complete.
348     */
349     public final void tryComplete() {
350     for (CountedCompleter a = this, s = a;;) {
351     int c;
352     if ((c = a.pending) == 0) {
353     a.onCompletion(s);
354     if ((a = (s = a).completer) == null) {
355     s.quietlyComplete();
356     return;
357     }
358     }
359     else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
360     return;
361     }
362     }
363    
364     /**
365     * Regardless of pending count, invokes {@link #onCompletion},
366     * marks this task as complete with a {@code null} return value,
367     * and further triggers {@link #tryComplete} on this task's
368     * completer, if one exists. This method may be useful when
369     * forcing completion as soon as any one (versus all) of several
370     * subtask results are obtained.
371     *
372     * @param mustBeNull the {@code null} completion value
373     */
374     public void complete(Void mustBeNull) {
375     CountedCompleter p;
376     onCompletion(this);
377     quietlyComplete();
378     if ((p = completer) != null)
379     p.tryComplete();
380     }
381    
382     /**
383     * Implements execution conventions for CountedCompleters
384     */
385     protected final boolean exec() {
386     compute();
387     return false;
388     }
389    
390     /**
391     * Always returns {@code null}.
392     *
393     * @return {@code null} always
394     */
395     public final Void getRawResult() { return null; }
396    
397     /**
398     * Requires null completion value.
399     */
400     protected final void setRawResult(Void mustBeNull) { }
401    
402     /**
403     * Support for FJT exception propagation
404     */
405     final ForkJoinTask<?> internalGetCompleter() { return completer; }
406    
407     // Unsafe mechanics
408     private static final sun.misc.Unsafe U;
409     private static final long PENDING;
410     static {
411     try {
412     U = getUnsafe();
413     PENDING = U.objectFieldOffset
414     (CountedCompleter.class.getDeclaredField("pending"));
415     } catch (Exception e) {
416     throw new Error(e);
417     }
418     }
419    
420    
421     /**
422     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
423     * Replace with a simple call to Unsafe.getUnsafe when integrating
424     * into a jdk.
425     *
426     * @return a sun.misc.Unsafe
427     */
428     private static sun.misc.Unsafe getUnsafe() {
429     try {
430     return sun.misc.Unsafe.getUnsafe();
431     } catch (SecurityException se) {
432     try {
433     return java.security.AccessController.doPrivileged
434     (new java.security
435     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
436     public sun.misc.Unsafe run() throws Exception {
437     java.lang.reflect.Field f = sun.misc
438     .Unsafe.class.getDeclaredField("theUnsafe");
439     f.setAccessible(true);
440     return (sun.misc.Unsafe) f.get(null);
441     }});
442     } catch (java.security.PrivilegedActionException e) {
443     throw new RuntimeException("Could not initialize intrinsics",
444     e.getCause());
445     }
446     }
447     }
448    
449     }