ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CountedCompleter.java
(Generate patch)

Comparing jsr166/src/jsr166e/CountedCompleter.java (file contents):
Revision 1.1 by dl, Mon Aug 13 15:52:33 2012 UTC vs.
Revision 1.5 by dl, Tue Oct 30 14:23:03 2012 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8
8   /**
9 < * A resultless {@link ForkJoinTask} with a completion action
9 > * A {@link ForkJoinTask} with a completion action
10   * performed when triggered and there are no remaining pending
11   * actions. Uses of CountedCompleter are similar to those of other
12   * completion based components (such as {@link
# Line 22 | Line 21 | package jsr166e;
21   * decremented; otherwise, the completion action is performed, and if
22   * this completer itself has a completer, the process is continued
23   * with its completer.  As is the case with related synchronization
24 < * components such as {@link Phaser} and {@link
25 < * java.util.concurrent.Semaphore} these methods affect only internal
26 < * counts; they do not establish any further internal bookkeeping. In
27 < * particular, the identities of pending tasks are not maintained. As
28 < * illustrated below, you can create subclasses that do record some or
29 < * all pended tasks or their results when needed.
24 > * components such as {@link java.util.concurrent.Phaser Phaser} and
25 > * {@link java.util.concurrent.Semaphore Semaphore}, these methods
26 > * affect only internal counts; they do not establish any further
27 > * internal bookkeeping. In particular, the identities of pending
28 > * tasks are not maintained. As illustrated below, you can create
29 > * subclasses that do record some or all pending tasks or their
30 > * results when needed.
31   *
32   * <p>A concrete CountedCompleter class must define method {@link
33   * #compute}, that should, in almost all use cases, invoke {@code
# Line 36 | Line 36 | package jsr166e;
36   * normal completion, and method {@link #onExceptionalCompletion} to
37   * perform an action upon any exception.
38   *
39 + * <p>CountedCompleters most often do not bear results, in which case
40 + * they are normally declared as {@code CountedCompleter<Void>}, and
41 + * will always return {@code null} as a result value.  In other cases,
42 + * you should override method {@link #getRawResult} to provide a
43 + * result from {@code join(), invoke()}, and related methods. (Method
44 + * {@link #setRawResult} by default plays no role in CountedCompleters
45 + * but may be overridden for example to maintain fields holding result
46 + * data.)
47 + *
48   * <p>A CountedCompleter that does not itself have a completer (i.e.,
49   * one for which {@link #getCompleter} returns {@code null}) can be
50   * used as a regular ForkJoinTask with this added functionality.
# Line 55 | Line 64 | package jsr166e;
64   * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
65   * be arranged in trees similar to those often used with {@link
66   * RecursiveAction}s, although the constructions involved in setting
67 < * them up typically vary. Even though they entail a bit more
67 > * them up typically vary. Here, the completer of each task is its
68 > * parent in the computation tree. Even though they entail a bit more
69   * bookkeeping, CountedCompleters may be better choices when applying
70   * a possibly time-consuming operation (that cannot be further
71   * subdivided) to each element of an array or collection; especially
# Line 80 | Line 90 | package jsr166e;
90   * <pre> {@code
91   * class MyOperation<E> { void apply(E e) { ... }  }
92   *
93 < * class ForEach<E> extends CountedCompleter {
93 > * class ForEach<E> extends CountedCompleter<Void> {
94   *
95   *     public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
96   *         pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
97   *     }
98   *
99   *     final E[] array; final MyOperation<E> op; final int lo, hi;
100 < *     ForEach(CountedCompleter p, E[] array, MyOperation<E> op, int lo, int hi) {
100 > *     ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
101   *         super(p);
102   *         this.array = array; this.op = op; this.lo = lo; this.hi = hi;
103   *     }
# Line 131 | Line 141 | package jsr166e;
141   *
142   * As a further improvement, notice that the left task need not even
143   * exist.  Instead of creating a new one, we can iterate using the
144 < * original task, and add a pending count for each fork:
144 > * original task, and add a pending count for each fork. Additionally,
145 > * this version uses {@code helpComplete} to streamline assistance in
146 > * the execution of forked tasks.
147   *
148   * <pre> {@code
149   * class ForEach<E> ...
# Line 145 | Line 157 | package jsr166e;
157   *         }
158   *         if (h > l)
159   *             op.apply(array[l]);
160 < *         tryComplete();
160 > *         helpComplete();
161   *     }
162   * }</pre>
163   *
# Line 162 | Line 174 | package jsr166e;
174   * and reductions are all of type {@code E}), one way to do this in
175   * divide and conquer designs is to have each subtask record its
176   * sibling, so that it can be accessed in method {@code onCompletion}.
177 < * For clarity, this class uses explicit left and right subtasks, but
178 < * variants of other streamlinings seen in the above example may also
179 < * apply.
177 > * This technique applies to reductions in which the order of
178 > * combining left and right results does not matter; ordered
179 > * reductions require explicit left/right designations.  Variants of
180 > * other streamlinings seen in the above examples may also apply.
181   *
182   * <pre> {@code
183   * class MyMapper<E> { E apply(E v) {  ...  } }
184   * class MyReducer<E> { E apply(E x, E y) {  ...  } }
185 < * class MapReducer<E> extends CountedCompleter {
185 > * class MapReducer<E> extends CountedCompleter<E> {
186   *     final E[] array; final MyMapper<E> mapper;
187   *     final MyReducer<E> reducer; final int lo, hi;
188 < *     MapReducer sibling;
188 > *     MapReducer<E> sibling;
189   *     E result;
190   *     MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
191   *                MyReducer<E> reducer, int lo, int hi) {
# Line 207 | Line 220 | package jsr166e;
220   *                result = reducer.apply(child.result, sib.result);
221   *         }
222   *     }
223 + *     public E getRawResult() { return result; }
224   *
225   *     public static <E> E mapReduce(ForkJoinPool pool, E[] array,
226   *                                   MyMapper<E> mapper, MyReducer<E> reducer) {
227 < *         MapReducer<E> mr = new MapReducer<E>(null, array, mapper,
228 < *                                              reducer, 0, array.length);
215 < *         pool.invoke(mr);
216 < *         return mr.result;
227 > *         return pool.invoke(new MapReducer<E>(null, array, mapper,
228 > *                                              reducer, 0, array.length));
229   *     }
230   * } }</pre>
231   *
# Line 223 | Line 235 | package jsr166e;
235   * triggers another async task. For example:
236   *
237   * <pre> {@code
238 < * class HeaderBuilder extends CountedCompleter { ... }
239 < * class BodyBuilder extends CountedCompleter { ... }
240 < * class PacketSender extends CountedCompleter {
238 > * class HeaderBuilder extends CountedCompleter<...> { ... }
239 > * class BodyBuilder extends CountedCompleter<...> { ... }
240 > * class PacketSender extends CountedCompleter<...> {
241   *     PacketSender(...) { super(null, 1); ... } // trigger on second completion
242   *     public void compute() { } // never called
243 < *     public void onCompletion(CountedCompleter caller) { sendPacket(); }
243 > *     public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
244   * }
245   * // sample use:
246   * PacketSender p = new PacketSender();
# Line 239 | Line 251 | package jsr166e;
251   * @since 1.8
252   * @author Doug Lea
253   */
254 < public abstract class CountedCompleter extends ForkJoinTask<Void> {
254 > public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
255      private static final long serialVersionUID = 5232453752276485070L;
256  
257      /** This task's completer, or null if none */
258 <    final CountedCompleter completer;
258 >    final CountedCompleter<?> completer;
259      /** The number of pending tasks until completion */
260      volatile int pending;
261  
# Line 254 | Line 266 | public abstract class CountedCompleter e
266       * @param completer this tasks completer, or {@code null} if none
267       * @param initialPendingCount the initial pending count
268       */
269 <    protected CountedCompleter(CountedCompleter completer,
269 >    protected CountedCompleter(CountedCompleter<?> completer,
270                                 int initialPendingCount) {
271          this.completer = completer;
272          this.pending = initialPendingCount;
# Line 266 | Line 278 | public abstract class CountedCompleter e
278       *
279       * @param completer this tasks completer, or {@code null} if none
280       */
281 <    protected CountedCompleter(CountedCompleter completer) {
281 >    protected CountedCompleter(CountedCompleter<?> completer) {
282          this.completer = completer;
283      }
284  
# Line 292 | Line 304 | public abstract class CountedCompleter e
304       * @param caller the task invoking this method (which may
305       * be this task itself).
306       */
307 <    public void onCompletion(CountedCompleter caller) {
307 >    public void onCompletion(CountedCompleter<?> caller) {
308      }
309  
310      /**
# Line 312 | Line 324 | public abstract class CountedCompleter e
324       * @return true if this exception should be propagated to this
325       * tasks completer, if one exists.
326       */
327 <    public boolean onExceptionalCompletion(Throwable ex, CountedCompleter caller) {
327 >    public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
328          return true;
329      }
330  
# Line 322 | Line 334 | public abstract class CountedCompleter e
334       *
335       * @return the completer
336       */
337 <    public final CountedCompleter getCompleter() {
337 >    public final CountedCompleter<?> getCompleter() {
338          return completer;
339      }
340  
# Line 367 | Line 379 | public abstract class CountedCompleter e
379      }
380  
381      /**
382 +     * Returns the root of the current computation; i.e., this
383 +     * task if it has no completer, else its completer's root.
384 +     *
385 +     * @return the root of the current computation
386 +     */
387 +    public final CountedCompleter<?> getRoot() {
388 +        CountedCompleter<?> a = this, p;
389 +        while ((p = a.completer) != null)
390 +            a = p;
391 +        return a;
392 +    }
393 +
394 +    /**
395       * If the pending count is nonzero, decrements the count;
396       * otherwise invokes {@link #onCompletion} and then similarly
397       * tries to complete this task's completer, if one exists,
398       * else marks this task as complete.
399       */
400      public final void tryComplete() {
401 <        CountedCompleter a = this, s = a;
401 >        CountedCompleter<?> a = this, s = a;
402          for (int c;;) {
403              if ((c = a.pending) == 0) {
404                  a.onCompletion(s);
# Line 388 | Line 413 | public abstract class CountedCompleter e
413      }
414  
415      /**
416 +     * Identical to {@link #tryComplete}, but may additionally execute
417 +     * other tasks within the current computation (i.e., those
418 +     * with the same {@link #getRoot}.
419 +     */
420 +    public final void helpComplete() {
421 +        CountedCompleter<?> a = this, s = a;
422 +        for (int c;;) {
423 +            if ((c = a.pending) == 0) {
424 +                a.onCompletion(s);
425 +                if ((a = (s = a).completer) == null) {
426 +                    s.quietlyComplete();
427 +                    return;
428 +                }
429 +            }
430 +            else if (U.compareAndSwapInt(a, PENDING, c, c - 1)) {
431 +                CountedCompleter<?> root = a.getRoot();
432 +                Thread thread = Thread.currentThread();
433 +                ForkJoinPool.WorkQueue wq =
434 +                    (thread instanceof ForkJoinWorkerThread)?
435 +                    ((ForkJoinWorkerThread)thread).workQueue : null;
436 +                ForkJoinTask<?> t;
437 +                while ((t = (wq != null) ? wq.popCC(root) :
438 +                        ForkJoinPool.popCCFromCommonPool(root)) != null) {
439 +                    t.doExec();
440 +                    if (root.isDone())
441 +                        break;
442 +                }
443 +                return;
444 +            }
445 +        }
446 +    }
447 +
448 +    /**
449       * Regardless of pending count, invokes {@link #onCompletion},
450 <     * marks this task as complete with a {@code null} return value,
451 <     * and further triggers {@link #tryComplete} on this task's
452 <     * completer, if one exists. This method may be useful when
453 <     * forcing completion as soon as any one (versus all) of several
454 <     * subtask results are obtained.
450 >     * marks this task as complete and further triggers {@link
451 >     * #tryComplete} on this task's completer, if one exists. This
452 >     * method may be useful when forcing completion as soon as any one
453 >     * (versus all) of several subtask results are obtained.  The
454 >     * given rawResult is used as an argument to {@link #setRawResult}
455 >     * before marking this task as complete; its value is meaningful
456 >     * only for classes overriding {@code setRawResult}.
457       *
458 <     * @param mustBeNull the {@code null} completion value
458 >     * @param rawResult the raw result
459       */
460 <    public void complete(Void mustBeNull) {
461 <        CountedCompleter p;
460 >    public void complete(T rawResult) {
461 >        CountedCompleter<?> p;
462          onCompletion(this);
463 +        setRawResult(rawResult);
464          quietlyComplete();
465          if ((p = completer) != null)
466              p.tryComplete();
# Line 409 | Line 470 | public abstract class CountedCompleter e
470       * Support for FJT exception propagation
471       */
472      void internalPropagateException(Throwable ex) {
473 <        CountedCompleter a = this, s = a;
473 >        CountedCompleter<?> a = this, s = a;
474          while (a.onExceptionalCompletion(ex, s) &&
475                 (a = (s = a).completer) != null && a.status >= 0)
476              a.recordExceptionalCompletion(ex);
# Line 424 | Line 485 | public abstract class CountedCompleter e
485      }
486  
487      /**
488 <     * Always returns {@code null}.
488 >     * Returns the result of the computation. By default
489 >     * returns {@code null}, which is appropriate for {@code Void}
490 >     * actions, but in other cases should be overridden.
491       *
492 <     * @return {@code null} always
492 >     * @return the result of the computation
493       */
494 <    public final Void getRawResult() { return null; }
494 >    public T getRawResult() { return null; }
495  
496      /**
497 <     * Requires null completion value.
497 >     * A method that result-bearing CountedCompleters may optionally
498 >     * use to help maintain result data.  By default, does nothing.
499       */
500 <    protected final void setRawResult(Void mustBeNull) { }
500 >    protected void setRawResult(T t) { }
501  
502      // Unsafe mechanics
503      private static final sun.misc.Unsafe U;
# Line 448 | Line 512 | public abstract class CountedCompleter e
512          }
513      }
514  
451
515      /**
516       * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
517       * Replace with a simple call to Unsafe.getUnsafe when integrating

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines