ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/CountedCompleter.java
(Generate patch)

Comparing jsr166/src/jsr166y/CountedCompleter.java (file contents):
Revision 1.1 by dl, Mon Apr 9 13:12:18 2012 UTC vs.
Revision 1.14 by dl, Mon Nov 19 18:12:42 2012 UTC

# Line 7 | Line 7
7   package jsr166y;
8  
9   /**
10 < * A resultless {@link ForkJoinTask} with a completion action
11 < * performed when triggered and there are no remaining pending
12 < * actions. Uses of CountedCompleter are similar to those of other
13 < * completion based components (such as {@link
14 < * java.nio.channels.CompletionHandler}) except that multiple
15 < * <em>pending</em> completions may be necessary to trigger the {@link
16 < * #onCompletion} action, not just one. Unless initialized otherwise,
17 < * the {@link #getPendingCount pending count} starts at zero, but may
18 < * be (atomically) changed using methods {@link #setPendingCount},
19 < * {@link #addToPendingCount}, and {@link
10 > * A {@link ForkJoinTask} with a completion action performed when
11 > * triggered and there are no remaining pending
12 > * actions. CountedCompleters are in general more robust in the
13 > * presence of subtask stalls and blockage than are other forms for
14 > * ForkJoinTasks, but are in general less intuitive to program.  Uses
15 > * of CountedCompleter are similar to those of other completion based
16 > * components (such as {@link java.nio.channels.CompletionHandler})
17 > * except that multiple <em>pending</em> completions may be necessary
18 > * to trigger the {@link #onCompletion} action, not just one. Unless
19 > * initialized otherwise, the {@link #getPendingCount pending count}
20 > * starts at zero, but may be (atomically) changed using methods
21 > * {@link #setPendingCount}, {@link #addToPendingCount}, and {@link
22   * #compareAndSetPendingCount}. Upon invocation of {@link
23   * #tryComplete}, if the pending action count is nonzero, it is
24   * decremented; otherwise, the completion action is performed, and if
25   * this completer itself has a completer, the process is continued
26 < * with its completer.  As is the case with most basic synchronization
27 < * constructs, these methods affect only internal counts; they do not
28 < * establish any further internal bookkeeping. In particular, the
29 < * identities of pending tasks are not maintained. As illustrated
30 < * below, you can create subclasses that do record some or all pended
31 < * tasks or their results when needed.
26 > * with its completer.  As is the case with related synchronization
27 > * components such as {@link java.util.concurrent.Phaser Phaser} and
28 > * {@link java.util.concurrent.Semaphore Semaphore}, these methods
29 > * affect only internal counts; they do not establish any further
30 > * internal bookkeeping. In particular, the identities of pending
31 > * tasks are not maintained. As illustrated below, you can create
32 > * subclasses that do record some or all pending tasks or their
33 > * results when needed. Because CountedCompleters provide only basic
34 > * synchronization mechanisms, it may be useful to create further
35 > * abstract subclasses that maintain linkages and fields and support
36 > * methods appropriate for a set of related usages.
37   *
38   * <p>A concrete CountedCompleter class must define method {@link
39 < * #compute}, that should, in almost all use cases, invoke {@code
40 < * tryComplete()} before returning. The class may also optionally
41 < * override method {@link #onCompletion} to perform an action upon
42 < * normal completion.
39 > * #compute}, that should in most cases (as illustrated below), invoke
40 > * {@code tryComplete()} once before returning. The class may also
41 > * optionally override method {@link #onCompletion} to perform an
42 > * action upon normal completion, and method {@link
43 > * #onExceptionalCompletion} to perform an action upon any exception.
44 > *
45 > * <p>CountedCompleters most often do not bear results, in which case
46 > * they are normally declared as {@code CountedCompleter<Void>}, and
47 > * will always return {@code null} as a result value.  In other cases,
48 > * you should override method {@link #getRawResult} to provide a
49 > * result from {@code join(), invoke()}, and related methods.  In
50 > * general, this method should return the value of a field (or a
51 > * function of one or more fields) of the CountedCompleter object that
52 > * holds the result upon completion. Method {@link #setRawResult} by
53 > * default plays no role in CountedCompleters.  It is possible, but
54 > * not usually applicable, to override this method to maintain other
55 > * objects or fields holding result data.
56   *
57   * <p>A CountedCompleter that does not itself have a completer (i.e.,
58   * one for which {@link #getCompleter} returns {@code null}) can be
# Line 44 | Line 64 | package jsr166y;
64   * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
65   * ForkJoinTask#completeExceptionally} or upon exceptional completion
66   * of method {@code compute}. Upon any exceptional completion, the
67 < * exception is relayed to a task's completer (and its completer, and
68 < * so on), if one exists and it has not otherwise already completed.
67 > * exception may be relayed to a task's completer (and its completer,
68 > * and so on), if one exists and it has not otherwise already
69 > * completed. Similarly, cancelling an internal CountedCompleter has
70 > * only a local effect on that completer, so is not often useful.
71   *
72   * <p><b>Sample Usages.</b>
73   *
74   * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
75   * be arranged in trees similar to those often used with {@link
76   * RecursiveAction}s, although the constructions involved in setting
77 < * them up typically vary. Even though they entail a bit more
77 > * them up typically vary. Here, the completer of each task is its
78 > * parent in the computation tree. Even though they entail a bit more
79   * bookkeeping, CountedCompleters may be better choices when applying
80   * a possibly time-consuming operation (that cannot be further
81   * subdivided) to each element of an array or collection; especially
# Line 63 | Line 86 | package jsr166y;
86   * continuations, other threads need not block waiting to perform
87   * them.
88   *
89 < * <p> For example, here is an initial version of a class that uses
89 > * <p>For example, here is an initial version of a class that uses
90   * divide-by-two recursive decomposition to divide work into single
91   * pieces (leaf tasks). Even when work is split into individual calls,
92   * tree-based techniques are usually preferable to directly forking
# Line 72 | Line 95 | package jsr166y;
95   * pair of subtasks to finish triggers completion of its parent
96   * (because no result combination is performed, the default no-op
97   * implementation of method {@code onCompletion} is not overridden). A
98 < * static utility method sets up the base task and invokes it:
98 > * static utility method sets up the base task and invokes it
99 > * (here, implicitly using the {@link ForkJoinPool#commonPool()}).
100   *
101   * <pre> {@code
102   * class MyOperation<E> { void apply(E e) { ... }  }
103   *
104 < * class ForEach<E> extends CountedCompleter {
104 > * class ForEach<E> extends CountedCompleter<Void> {
105   *
106 < *     public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
107 < *         pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
106 > *     public static <E> void forEach(E[] array, MyOperation<E> op) {
107 > *         new ForEach<E>(null, array, op, 0, array.length).invoke();
108   *     }
109   *
110   *     final E[] array; final MyOperation<E> op; final int lo, hi;
111 < *     ForEach(CountedCompleter p, E[] array, MyOperation<E> op, int lo, int hi) {
111 > *     ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
112   *         super(p);
113   *         this.array = array; this.op = op; this.lo = lo; this.hi = hi;
114   *     }
# Line 128 | Line 152 | package jsr166y;
152   *
153   * As a further improvement, notice that the left task need not even
154   * exist.  Instead of creating a new one, we can iterate using the
155 < * original task, and add a pending count for each fork:
155 > * original task, and add a pending count for each fork.
156   *
157   * <pre> {@code
158   * class ForEach<E> ...
# Line 159 | Line 183 | package jsr166y;
183   * and reductions are all of type {@code E}), one way to do this in
184   * divide and conquer designs is to have each subtask record its
185   * sibling, so that it can be accessed in method {@code onCompletion}.
186 < * For clarity, this class uses explicit left and right subtasks, but
187 < * variants of other streamlinings seen in the above example may also
188 < * apply.
186 > * This technique applies to reductions in which the order of
187 > * combining left and right results does not matter; ordered
188 > * reductions require explicit left/right designations.  Variants of
189 > * other streamlinings seen in the above examples may also apply.
190   *
191   * <pre> {@code
192   * class MyMapper<E> { E apply(E v) {  ...  } }
193   * class MyReducer<E> { E apply(E x, E y) {  ...  } }
194 < * class MapReducer<E> extends CountedCompleter {
194 > * class MapReducer<E> extends CountedCompleter<E> {
195   *     final E[] array; final MyMapper<E> mapper;
196   *     final MyReducer<E> reducer; final int lo, hi;
197 < *     MapReducer sibling;
197 > *     MapReducer<E> sibling;
198   *     E result;
199   *     MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
200   *                MyReducer<E> reducer, int lo, int hi) {
# Line 204 | Line 229 | package jsr166y;
229   *                result = reducer.apply(child.result, sib.result);
230   *         }
231   *     }
232 + *     public E getRawResult() { return result; }
233   *
234 < *     public static <E> E mapReduce(ForkJoinPool pool, E[] array,
235 < *                                   MyMapper<E> mapper, MyReducer<E> reducer) {
236 < *         MapReducer<E> mr = new MapReducer<E>(null, array, mapper,
211 < *                                              reducer, 0, array.length);
212 < *         pool.invoke(mr);
213 < *         return mr.result;
234 > *     public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
235 > *         return new MapReducer<E>(null, array, mapper, reducer,
236 > *                                  0, array.length).invoke();
237   *     }
238   * } }</pre>
239   *
240 + * Here, method {@code onCompletion} takes a form common to many
241 + * completion designs that combine results. This callback-style method
242 + * is triggered once per task, in either of the two different contexts
243 + * in which the pending count is, or becomes, zero: (1) by a task
244 + * itself, if its pending count is zero upon invocation of {@code
245 + * tryComplete}, or (2) by any of its subtasks when they complete and
246 + * decrement the pending count to zero. The {@code caller} argument
247 + * distinguishes cases.  Most often, when the caller is {@code this},
248 + * no action is necessary. Otherwise the caller argument can be used
249 + * (usually via a cast) to supply a value (and/or links to other
250 + * values) to be combined.  Asuuming proper use of pending counts, the
251 + * actions inside {@code onCompletion} occur (once) upon completion of
252 + * a task and its subtasks. No additional synchronization is required
253 + * within this method to ensure thread safety of accesses to fields of
254 + * this task or other completed tasks.
255 + *
256 + * <p><b>Searching.</b> A tree of CountedCompleters can search for a
257 + * value or property in different parts of a data structure, and
258 + * report a result in an {@link java.util.concurrent.AtomicReference}
259 + * as soon as one is found. The others can poll the result to avoid
260 + * unnecessary work. (You could additionally {@link #cancel} other
261 + * tasks, but it is usually simpler and more efficient to just let
262 + * them notice that the result is set and if so skip further
263 + * processing.)  Illustrating again with an array using full
264 + * partitioning (again, in practice, leaf tasks will almost always
265 + * process more than one element):
266 + *
267 + * <pre> {@code
268 + * class Searcher<E> extends CountedCompleter<E> {
269 + *     final E[] array; final AtomicReference<E> result; final int lo, hi;
270 + *     Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
271 + *         super(p);
272 + *         this.array = array; this.result = result; this.lo = lo; this.hi = hi;
273 + *     }
274 + *     public E getRawResult() { return result.get(); }
275 + *     public void compute() { // similar to ForEach version 3
276 + *         int l = lo,  h = hi;
277 + *         while (h - l >= 2 && result.get() == null) {
278 + *             int mid = (l + h) >>> 1;
279 + *             addToPendingCount(1);
280 + *             new Searcher(this, array, result, mid, h).fork();
281 + *             h = mid;
282 + *         }
283 + *         if (h > l && result.get() == null && matches(array[l]) &&
284 + *             result.compareAndSet(null, array[l]))
285 + *             getRoot().quietlyComplete(); // root task is now joinable
286 + *
287 + *         tryComplete(); // normally complete whether or not found
288 + *     }
289 + *     boolean matches(E e) { ... } // return true if found
290 + *
291 + *     public static <E> E search(E[] array) {
292 + *         return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
293 + *     }
294 + *}}</pre>
295 + *
296 + * In this example, as well as others in which tasks have no other
297 + * effects except to compareAndSet a common result, the trailing
298 + * unconditional invocation of {@code tryComplete} could be made
299 + * conditional ({@code if (result.get() == null) tryComplete();})
300 + * because no further bookkeeping is required to manage completions
301 + * once the root task completes.
302 + *
303   * <p><b>Triggers.</b> Some CountedCompleters are themselves never
304   * forked, but instead serve as bits of plumbing in other designs;
305   * including those in which the completion of one of more async tasks
306   * triggers another async task. For example:
307   *
308   * <pre> {@code
309 < * class HeaderBuilder extends CountedCompleter { ... }
310 < * class BodyBuilder extends CountedCompleter { ... }
311 < * class PacketSender extends CountedCompleter {
309 > * class HeaderBuilder extends CountedCompleter<...> { ... }
310 > * class BodyBuilder extends CountedCompleter<...> { ... }
311 > * class PacketSender extends CountedCompleter<...> {
312   *     PacketSender(...) { super(null, 1); ... } // trigger on second completion
313   *     public void compute() { } // never called
314 < *     public void onCompletion(CountedCompleter caller) { sendPacket(); }
314 > *     public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
315   * }
316   * // sample use:
317   * PacketSender p = new PacketSender();
# Line 236 | Line 322 | package jsr166y;
322   * @since 1.8
323   * @author Doug Lea
324   */
325 < public abstract class CountedCompleter extends ForkJoinTask<Void> {
325 > public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
326 >    private static final long serialVersionUID = 5232453752276485070L;
327 >
328      /** This task's completer, or null if none */
329 <    final CountedCompleter completer;
329 >    final CountedCompleter<?> completer;
330      /** The number of pending tasks until completion */
331      volatile int pending;
332  
# Line 249 | Line 337 | public abstract class CountedCompleter e
337       * @param completer this tasks completer, or {@code null} if none
338       * @param initialPendingCount the initial pending count
339       */
340 <    protected CountedCompleter(CountedCompleter completer,
340 >    protected CountedCompleter(CountedCompleter<?> completer,
341                                 int initialPendingCount) {
342          this.completer = completer;
343          this.pending = initialPendingCount;
# Line 261 | Line 349 | public abstract class CountedCompleter e
349       *
350       * @param completer this tasks completer, or {@code null} if none
351       */
352 <    protected CountedCompleter(CountedCompleter completer) {
352 >    protected CountedCompleter(CountedCompleter<?> completer) {
353          this.completer = completer;
354      }
355  
# Line 279 | Line 367 | public abstract class CountedCompleter e
367      public abstract void compute();
368  
369      /**
370 <     * Executes the completion action when method {@link #tryComplete}
371 <     * is invoked and there are no pending counts, or when the
372 <     * unconditional method {@link #complete} is invoked.  By default,
373 <     * this method does nothing.
370 >     * Performs an action when method {@link #tryComplete} is invoked
371 >     * and there are no pending counts, or when the unconditional
372 >     * method {@link #complete} is invoked.  By default, this method
373 >     * does nothing. You can distinguish cases by checking the
374 >     * identity of the given caller argument. If not equal to {@code
375 >     * this}, then it is typically a subtask that may contain results
376 >     * (and/or links to other results) to combine.
377       *
378       * @param caller the task invoking this method (which may
379       * be this task itself).
380       */
381 <    public void onCompletion(CountedCompleter caller) {
381 >    public void onCompletion(CountedCompleter<?> caller) {
382 >    }
383 >
384 >    /**
385 >     * Performs an action when method {@link #completeExceptionally}
386 >     * is invoked or method {@link #compute} throws an exception, and
387 >     * this task has not otherwise already completed normally. On
388 >     * entry to this method, this task {@link
389 >     * ForkJoinTask#isCompletedAbnormally}.  The return value of this
390 >     * method controls further propagation: If {@code true} and this
391 >     * task has a completer, then this completer is also completed
392 >     * exceptionally.  The default implementation of this method does
393 >     * nothing except return {@code true}.
394 >     *
395 >     * @param ex the exception
396 >     * @param caller the task invoking this method (which may
397 >     * be this task itself).
398 >     * @return true if this exception should be propagated to this
399 >     * tasks completer, if one exists.
400 >     */
401 >    public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
402 >        return true;
403      }
404  
405      /**
# Line 296 | Line 408 | public abstract class CountedCompleter e
408       *
409       * @return the completer
410       */
411 <    public final CountedCompleter getCompleter() {
411 >    public final CountedCompleter<?> getCompleter() {
412          return completer;
413      }
414  
# Line 341 | Line 453 | public abstract class CountedCompleter e
453      }
454  
455      /**
456 +     * Returns the root of the current computation; i.e., this
457 +     * task if it has no completer, else its completer's root.
458 +     *
459 +     * @return the root of the current computation
460 +     */
461 +    public final CountedCompleter<?> getRoot() {
462 +        CountedCompleter<?> a = this, p;
463 +        while ((p = a.completer) != null)
464 +            a = p;
465 +        return a;
466 +    }
467 +
468 +    /**
469       * If the pending count is nonzero, decrements the count;
470       * otherwise invokes {@link #onCompletion} and then similarly
471       * tries to complete this task's completer, if one exists,
472       * else marks this task as complete.
473       */
474      public final void tryComplete() {
475 <        for (CountedCompleter a = this, s = a;;) {
476 <            int c;
475 >        CountedCompleter<?> a = this, s = a;
476 >        for (int c;;) {
477              if ((c = a.pending) == 0) {
478                  a.onCompletion(s);
479                  if ((a = (s = a).completer) == null) {
# Line 363 | Line 488 | public abstract class CountedCompleter e
488  
489      /**
490       * Regardless of pending count, invokes {@link #onCompletion},
491 <     * marks this task as complete with a {@code null} return value,
492 <     * and further triggers {@link #tryComplete} on this task's
493 <     * completer, if one exists. This method may be useful when
494 <     * forcing completion as soon as any one (versus all) of several
495 <     * subtask results are obtained.
496 <     *
497 <     * @param mustBeNull the {@code null} completion value
498 <     */
499 <    public void complete(Void mustBeNull) {
500 <        CountedCompleter p;
491 >     * marks this task as complete and further triggers {@link
492 >     * #tryComplete} on this task's completer, if one exists.  The
493 >     * given rawResult is used as an argument to {@link #setRawResult}
494 >     * before invoking {@link #onCompletion} or marking this task as
495 >     * complete; its value is meaningful only for classes overriding
496 >     * {@code setRawResult}.
497 >     *
498 >     * <p>This method may be useful when forcing completion as soon as
499 >     * any one (versus all) of several subtask results are obtained.
500 >     * However, in the common (and recommended) case in which {@code
501 >     * setRawResult} is not overridden, this effect can be obtained
502 >     * more simply using {@code getRoot().quietlyComplete();}.
503 >     *
504 >     * @param rawResult the raw result
505 >     */
506 >    public void complete(T rawResult) {
507 >        CountedCompleter<?> p;
508 >        setRawResult(rawResult);
509          onCompletion(this);
510          quietlyComplete();
511          if ((p = completer) != null)
# Line 380 | Line 513 | public abstract class CountedCompleter e
513      }
514  
515      /**
516 +     * Support for FJT exception propagation
517 +     */
518 +    void internalPropagateException(Throwable ex) {
519 +        CountedCompleter<?> a = this, s = a;
520 +        while (a.onExceptionalCompletion(ex, s) &&
521 +               (a = (s = a).completer) != null && a.status >= 0)
522 +            a.recordExceptionalCompletion(ex);
523 +    }
524 +
525 +    /**
526       * Implements execution conventions for CountedCompleters
527       */
528      protected final boolean exec() {
# Line 388 | Line 531 | public abstract class CountedCompleter e
531      }
532  
533      /**
534 <     * Always returns {@code null}.
534 >     * Returns the result of the computation. By default
535 >     * returns {@code null}, which is appropriate for {@code Void}
536 >     * actions, but in other cases should be overridden.
537       *
538 <     * @return {@code null} always
394 <     */
395 <    public final Void getRawResult() { return null; }
396 <
397 <    /**
398 <     * Requires null completion value.
538 >     * @return the result of the computation
539       */
540 <    protected final void setRawResult(Void mustBeNull) { }
540 >    public T getRawResult() { return null; }
541  
542      /**
543 <     * Support for FJT exception propagation
543 >     * A method that result-bearing CountedCompleters may optionally
544 >     * use to help maintain result data.  By default, does nothing.
545 >     * If this method is overridden to update existing objects or
546 >     * fields, then it must in general be defined to be thread-safe.
547       */
548 <    final ForkJoinTask<?> internalGetCompleter() { return completer; }
548 >    protected void setRawResult(T t) { }
549  
550      // Unsafe mechanics
551      private static final sun.misc.Unsafe U;
552      private static final long PENDING;
553      static {
554          try {
555 <            U = getUnsafe();
555 >            U = sun.misc.Unsafe.getUnsafe();
556              PENDING = U.objectFieldOffset
557                  (CountedCompleter.class.getDeclaredField("pending"));
558          } catch (Exception e) {
# Line 417 | Line 560 | public abstract class CountedCompleter e
560          }
561      }
562  
420
563      /**
564       * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
565       * Replace with a simple call to Unsafe.getUnsafe when integrating
# Line 445 | Line 587 | public abstract class CountedCompleter e
587              }
588          }
589      }
448
590   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines