ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/CountedCompleter.java
(Generate patch)

Comparing jsr166/src/jsr166y/CountedCompleter.java (file contents):
Revision 1.1 by dl, Mon Apr 9 13:12:18 2012 UTC vs.
Revision 1.6 by dl, Tue Oct 30 14:23:11 2012 UTC

# Line 7 | Line 7
7   package jsr166y;
8  
9   /**
10 < * A resultless {@link ForkJoinTask} with a completion action
10 > * A {@link ForkJoinTask} with a completion action
11   * performed when triggered and there are no remaining pending
12   * actions. Uses of CountedCompleter are similar to those of other
13   * completion based components (such as {@link
# Line 21 | Line 21 | package jsr166y;
21   * #tryComplete}, if the pending action count is nonzero, it is
22   * decremented; otherwise, the completion action is performed, and if
23   * this completer itself has a completer, the process is continued
24 < * with its completer.  As is the case with most basic synchronization
25 < * constructs, these methods affect only internal counts; they do not
26 < * establish any further internal bookkeeping. In particular, the
27 < * identities of pending tasks are not maintained. As illustrated
28 < * below, you can create subclasses that do record some or all pended
29 < * tasks or their results when needed.
24 > * with its completer.  As is the case with related synchronization
25 > * components such as {@link java.util.concurrent.Phaser Phaser} and
26 > * {@link java.util.concurrent.Semaphore Semaphore}, these methods
27 > * affect only internal counts; they do not establish any further
28 > * internal bookkeeping. In particular, the identities of pending
29 > * tasks are not maintained. As illustrated below, you can create
30 > * subclasses that do record some or all pending tasks or their
31 > * results when needed.
32   *
33   * <p>A concrete CountedCompleter class must define method {@link
34   * #compute}, that should, in almost all use cases, invoke {@code
35 < * tryComplete()} before returning. The class may also optionally
35 > * tryComplete()} once before returning. The class may also optionally
36   * override method {@link #onCompletion} to perform an action upon
37 < * normal completion.
37 > * normal completion, and method {@link #onExceptionalCompletion} to
38 > * perform an action upon any exception.
39 > *
40 > * <p>CountedCompleters most often do not bear results, in which case
41 > * they are normally declared as {@code CountedCompleter<Void>}, and
42 > * will always return {@code null} as a result value.  In other cases,
43 > * you should override method {@link #getRawResult} to provide a
44 > * result from {@code join(), invoke()}, and related methods. (Method
45 > * {@link #setRawResult} by default plays no role in CountedCompleters
46 > * but may be overridden for example to maintain fields holding result
47 > * data.)
48   *
49   * <p>A CountedCompleter that does not itself have a completer (i.e.,
50   * one for which {@link #getCompleter} returns {@code null}) can be
# Line 44 | Line 56 | package jsr166y;
56   * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
57   * ForkJoinTask#completeExceptionally} or upon exceptional completion
58   * of method {@code compute}. Upon any exceptional completion, the
59 < * exception is relayed to a task's completer (and its completer, and
60 < * so on), if one exists and it has not otherwise already completed.
59 > * exception may be relayed to a task's completer (and its completer,
60 > * and so on), if one exists and it has not otherwise already
61 > * completed.
62   *
63   * <p><b>Sample Usages.</b>
64   *
65   * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
66   * be arranged in trees similar to those often used with {@link
67   * RecursiveAction}s, although the constructions involved in setting
68 < * them up typically vary. Even though they entail a bit more
68 > * them up typically vary. Here, the completer of each task is its
69 > * parent in the computation tree. Even though they entail a bit more
70   * bookkeeping, CountedCompleters may be better choices when applying
71   * a possibly time-consuming operation (that cannot be further
72   * subdivided) to each element of an array or collection; especially
# Line 77 | Line 91 | package jsr166y;
91   * <pre> {@code
92   * class MyOperation<E> { void apply(E e) { ... }  }
93   *
94 < * class ForEach<E> extends CountedCompleter {
94 > * class ForEach<E> extends CountedCompleter<Void> {
95   *
96   *     public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
97   *         pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
98   *     }
99   *
100   *     final E[] array; final MyOperation<E> op; final int lo, hi;
101 < *     ForEach(CountedCompleter p, E[] array, MyOperation<E> op, int lo, int hi) {
101 > *     ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
102   *         super(p);
103   *         this.array = array; this.op = op; this.lo = lo; this.hi = hi;
104   *     }
# Line 128 | Line 142 | package jsr166y;
142   *
143   * As a further improvement, notice that the left task need not even
144   * exist.  Instead of creating a new one, we can iterate using the
145 < * original task, and add a pending count for each fork:
145 > * original task, and add a pending count for each fork. Additionally,
146 > * this version uses {@code helpComplete} to streamline assistance in
147 > * the execution of forked tasks.
148   *
149   * <pre> {@code
150   * class ForEach<E> ...
# Line 142 | Line 158 | package jsr166y;
158   *         }
159   *         if (h > l)
160   *             op.apply(array[l]);
161 < *         tryComplete();
161 > *         helpComplete();
162   *     }
163   * }</pre>
164   *
# Line 159 | Line 175 | package jsr166y;
175   * and reductions are all of type {@code E}), one way to do this in
176   * divide and conquer designs is to have each subtask record its
177   * sibling, so that it can be accessed in method {@code onCompletion}.
178 < * For clarity, this class uses explicit left and right subtasks, but
179 < * variants of other streamlinings seen in the above example may also
180 < * apply.
178 > * This technique applies to reductions in which the order of
179 > * combining left and right results does not matter; ordered
180 > * reductions require explicit left/right designations.  Variants of
181 > * other streamlinings seen in the above examples may also apply.
182   *
183   * <pre> {@code
184   * class MyMapper<E> { E apply(E v) {  ...  } }
185   * class MyReducer<E> { E apply(E x, E y) {  ...  } }
186 < * class MapReducer<E> extends CountedCompleter {
186 > * class MapReducer<E> extends CountedCompleter<E> {
187   *     final E[] array; final MyMapper<E> mapper;
188   *     final MyReducer<E> reducer; final int lo, hi;
189 < *     MapReducer sibling;
189 > *     MapReducer<E> sibling;
190   *     E result;
191   *     MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
192   *                MyReducer<E> reducer, int lo, int hi) {
# Line 204 | Line 221 | package jsr166y;
221   *                result = reducer.apply(child.result, sib.result);
222   *         }
223   *     }
224 + *     public E getRawResult() { return result; }
225   *
226   *     public static <E> E mapReduce(ForkJoinPool pool, E[] array,
227   *                                   MyMapper<E> mapper, MyReducer<E> reducer) {
228 < *         MapReducer<E> mr = new MapReducer<E>(null, array, mapper,
229 < *                                              reducer, 0, array.length);
212 < *         pool.invoke(mr);
213 < *         return mr.result;
228 > *         return pool.invoke(new MapReducer<E>(null, array, mapper,
229 > *                                              reducer, 0, array.length));
230   *     }
231   * } }</pre>
232   *
# Line 220 | Line 236 | package jsr166y;
236   * triggers another async task. For example:
237   *
238   * <pre> {@code
239 < * class HeaderBuilder extends CountedCompleter { ... }
240 < * class BodyBuilder extends CountedCompleter { ... }
241 < * class PacketSender extends CountedCompleter {
239 > * class HeaderBuilder extends CountedCompleter<...> { ... }
240 > * class BodyBuilder extends CountedCompleter<...> { ... }
241 > * class PacketSender extends CountedCompleter<...> {
242   *     PacketSender(...) { super(null, 1); ... } // trigger on second completion
243   *     public void compute() { } // never called
244 < *     public void onCompletion(CountedCompleter caller) { sendPacket(); }
244 > *     public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
245   * }
246   * // sample use:
247   * PacketSender p = new PacketSender();
# Line 236 | Line 252 | package jsr166y;
252   * @since 1.8
253   * @author Doug Lea
254   */
255 < public abstract class CountedCompleter extends ForkJoinTask<Void> {
255 > public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
256 >    private static final long serialVersionUID = 5232453752276485070L;
257 >
258      /** This task's completer, or null if none */
259 <    final CountedCompleter completer;
259 >    final CountedCompleter<?> completer;
260      /** The number of pending tasks until completion */
261      volatile int pending;
262  
# Line 249 | Line 267 | public abstract class CountedCompleter e
267       * @param completer this tasks completer, or {@code null} if none
268       * @param initialPendingCount the initial pending count
269       */
270 <    protected CountedCompleter(CountedCompleter completer,
270 >    protected CountedCompleter(CountedCompleter<?> completer,
271                                 int initialPendingCount) {
272          this.completer = completer;
273          this.pending = initialPendingCount;
# Line 261 | Line 279 | public abstract class CountedCompleter e
279       *
280       * @param completer this tasks completer, or {@code null} if none
281       */
282 <    protected CountedCompleter(CountedCompleter completer) {
282 >    protected CountedCompleter(CountedCompleter<?> completer) {
283          this.completer = completer;
284      }
285  
# Line 279 | Line 297 | public abstract class CountedCompleter e
297      public abstract void compute();
298  
299      /**
300 <     * Executes the completion action when method {@link #tryComplete}
301 <     * is invoked and there are no pending counts, or when the
302 <     * unconditional method {@link #complete} is invoked.  By default,
303 <     * this method does nothing.
300 >     * Performs an action when method {@link #tryComplete} is invoked
301 >     * and there are no pending counts, or when the unconditional
302 >     * method {@link #complete} is invoked.  By default, this method
303 >     * does nothing.
304 >     *
305 >     * @param caller the task invoking this method (which may
306 >     * be this task itself).
307 >     */
308 >    public void onCompletion(CountedCompleter<?> caller) {
309 >    }
310 >
311 >    /**
312 >     * Performs an action when method {@link #completeExceptionally}
313 >     * is invoked or method {@link #compute} throws an exception, and
314 >     * this task has not otherwise already completed normally. On
315 >     * entry to this method, this task {@link
316 >     * ForkJoinTask#isCompletedAbnormally}.  The return value of this
317 >     * method controls further propagation: If {@code true} and this
318 >     * task has a completer, then this completer is also completed
319 >     * exceptionally.  The default implementation of this method does
320 >     * nothing except return {@code true}.
321       *
322 +     * @param ex the exception
323       * @param caller the task invoking this method (which may
324       * be this task itself).
325 +     * @return true if this exception should be propagated to this
326 +     * tasks completer, if one exists.
327       */
328 <    public void onCompletion(CountedCompleter caller) {
328 >    public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
329 >        return true;
330      }
331  
332      /**
# Line 296 | Line 335 | public abstract class CountedCompleter e
335       *
336       * @return the completer
337       */
338 <    public final CountedCompleter getCompleter() {
338 >    public final CountedCompleter<?> getCompleter() {
339          return completer;
340      }
341  
# Line 341 | Line 380 | public abstract class CountedCompleter e
380      }
381  
382      /**
383 +     * Returns the root of the current computation; i.e., this
384 +     * task if it has no completer, else its completer's root.
385 +     *
386 +     * @return the root of the current computation
387 +     */
388 +    public final CountedCompleter<?> getRoot() {
389 +        CountedCompleter<?> a = this, p;
390 +        while ((p = a.completer) != null)
391 +            a = p;
392 +        return a;
393 +    }
394 +
395 +    /**
396       * If the pending count is nonzero, decrements the count;
397       * otherwise invokes {@link #onCompletion} and then similarly
398       * tries to complete this task's completer, if one exists,
399       * else marks this task as complete.
400       */
401      public final void tryComplete() {
402 <        for (CountedCompleter a = this, s = a;;) {
403 <            int c;
402 >        CountedCompleter<?> a = this, s = a;
403 >        for (int c;;) {
404              if ((c = a.pending) == 0) {
405                  a.onCompletion(s);
406                  if ((a = (s = a).completer) == null) {
# Line 362 | Line 414 | public abstract class CountedCompleter e
414      }
415  
416      /**
417 +     * Identical to {@link #tryComplete}, but may additionally execute
418 +     * other tasks within the current computation (i.e., those
419 +     * with the same {@link #getRoot}.
420 +     */
421 +    public final void helpComplete() {
422 +        CountedCompleter<?> a = this, s = a;
423 +        for (int c;;) {
424 +            if ((c = a.pending) == 0) {
425 +                a.onCompletion(s);
426 +                if ((a = (s = a).completer) == null) {
427 +                    s.quietlyComplete();
428 +                    return;
429 +                }
430 +            }
431 +            else if (U.compareAndSwapInt(a, PENDING, c, c - 1)) {
432 +                CountedCompleter<?> root = a.getRoot();
433 +                Thread thread = Thread.currentThread();
434 +                ForkJoinPool.WorkQueue wq =
435 +                    (thread instanceof ForkJoinWorkerThread)?
436 +                    ((ForkJoinWorkerThread)thread).workQueue : null;
437 +                ForkJoinTask<?> t;
438 +                while ((t = (wq != null) ? wq.popCC(root) :
439 +                        ForkJoinPool.popCCFromCommonPool(root)) != null) {
440 +                    t.doExec();
441 +                    if (root.isDone())
442 +                        break;
443 +                }
444 +                return;
445 +            }
446 +        }
447 +    }
448 +
449 +    /**
450       * Regardless of pending count, invokes {@link #onCompletion},
451 <     * marks this task as complete with a {@code null} return value,
452 <     * and further triggers {@link #tryComplete} on this task's
453 <     * completer, if one exists. This method may be useful when
454 <     * forcing completion as soon as any one (versus all) of several
455 <     * subtask results are obtained.
451 >     * marks this task as complete and further triggers {@link
452 >     * #tryComplete} on this task's completer, if one exists. This
453 >     * method may be useful when forcing completion as soon as any one
454 >     * (versus all) of several subtask results are obtained.  The
455 >     * given rawResult is used as an argument to {@link #setRawResult}
456 >     * before marking this task as complete; its value is meaningful
457 >     * only for classes overriding {@code setRawResult}.
458       *
459 <     * @param mustBeNull the {@code null} completion value
459 >     * @param rawResult the raw result
460       */
461 <    public void complete(Void mustBeNull) {
462 <        CountedCompleter p;
461 >    public void complete(T rawResult) {
462 >        CountedCompleter<?> p;
463          onCompletion(this);
464 +        setRawResult(rawResult);
465          quietlyComplete();
466          if ((p = completer) != null)
467              p.tryComplete();
468      }
469  
470      /**
471 +     * Support for FJT exception propagation
472 +     */
473 +    void internalPropagateException(Throwable ex) {
474 +        CountedCompleter<?> a = this, s = a;
475 +        while (a.onExceptionalCompletion(ex, s) &&
476 +               (a = (s = a).completer) != null && a.status >= 0)
477 +            a.recordExceptionalCompletion(ex);
478 +    }
479 +
480 +    /**
481       * Implements execution conventions for CountedCompleters
482       */
483      protected final boolean exec() {
# Line 388 | Line 486 | public abstract class CountedCompleter e
486      }
487  
488      /**
489 <     * Always returns {@code null}.
489 >     * Returns the result of the computation. By default
490 >     * returns {@code null}, which is appropriate for {@code Void}
491 >     * actions, but in other cases should be overridden.
492       *
493 <     * @return {@code null} always
493 >     * @return the result of the computation
494       */
495 <    public final Void getRawResult() { return null; }
495 >    public T getRawResult() { return null; }
496  
497      /**
498 <     * Requires null completion value.
498 >     * A method that result-bearing CountedCompleters may optionally
499 >     * use to help maintain result data.  By default, does nothing.
500       */
501 <    protected final void setRawResult(Void mustBeNull) { }
401 <
402 <    /**
403 <     * Support for FJT exception propagation
404 <     */
405 <    final ForkJoinTask<?> internalGetCompleter() { return completer; }
501 >    protected void setRawResult(T t) { }
502  
503      // Unsafe mechanics
504      private static final sun.misc.Unsafe U;
# Line 417 | Line 513 | public abstract class CountedCompleter e
513          }
514      }
515  
420
516      /**
517       * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
518       * Replace with a simple call to Unsafe.getUnsafe when integrating

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines