ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CountedCompleter.java
(Generate patch)

Comparing jsr166/src/jsr166e/CountedCompleter.java (file contents):
Revision 1.18 by jsr166, Sun Nov 25 18:39:07 2012 UTC vs.
Revision 1.31 by jsr166, Sat Jul 27 19:53:27 2013 UTC

# Line 8 | Line 8 | package jsr166e;
8  
9   /**
10   * A {@link ForkJoinTask} with a completion action performed when
11 < * triggered and there are no remaining pending
12 < * actions. CountedCompleters are in general more robust in the
11 > * triggered and there are no remaining pending actions.
12 > * CountedCompleters are in general more robust in the
13   * presence of subtask stalls and blockage than are other forms of
14   * ForkJoinTasks, but are less intuitive to program.  Uses of
15   * CountedCompleter are similar to those of other completion based
16   * components (such as {@link java.nio.channels.CompletionHandler})
17   * except that multiple <em>pending</em> completions may be necessary
18 < * to trigger the {@link #onCompletion} action, not just one. Unless
19 < * initialized otherwise, the {@link #getPendingCount pending count}
20 < * starts at zero, but may be (atomically) changed using methods
21 < * {@link #setPendingCount}, {@link #addToPendingCount}, and {@link
22 < * #compareAndSetPendingCount}. Upon invocation of {@link
18 > * to trigger the completion action {@link #onCompletion(CountedCompleter)},
19 > * not just one.
20 > * Unless initialized otherwise, the {@linkplain #getPendingCount pending
21 > * count} starts at zero, but may be (atomically) changed using
22 > * methods {@link #setPendingCount}, {@link #addToPendingCount}, and
23 > * {@link #compareAndSetPendingCount}. Upon invocation of {@link
24   * #tryComplete}, if the pending action count is nonzero, it is
25   * decremented; otherwise, the completion action is performed, and if
26   * this completer itself has a completer, the process is continued
# Line 40 | Line 41 | package jsr166e;
41   * <p>A concrete CountedCompleter class must define method {@link
42   * #compute}, that should in most cases (as illustrated below), invoke
43   * {@code tryComplete()} once before returning. The class may also
44 < * optionally override method {@link #onCompletion} to perform an
45 < * action upon normal completion, and method {@link
46 < * #onExceptionalCompletion} to perform an action upon any exception.
44 > * optionally override method {@link #onCompletion(CountedCompleter)}
45 > * to perform an action upon normal completion, and method
46 > * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to
47 > * perform an action upon any exception.
48   *
49   * <p>CountedCompleters most often do not bear results, in which case
50   * they are normally declared as {@code CountedCompleter<Void>}, and
# Line 63 | Line 65 | package jsr166e;
65   * only as an internal helper for other computations, so its own task
66   * status (as reported in methods such as {@link ForkJoinTask#isDone})
67   * is arbitrary; this status changes only upon explicit invocations of
68 < * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
69 < * ForkJoinTask#completeExceptionally} or upon exceptional completion
70 < * of method {@code compute}. Upon any exceptional completion, the
71 < * exception may be relayed to a task's completer (and its completer,
72 < * and so on), if one exists and it has not otherwise already
73 < * completed. Similarly, cancelling an internal CountedCompleter has
74 < * only a local effect on that completer, so is not often useful.
68 > * {@link #complete}, {@link ForkJoinTask#cancel},
69 > * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon
70 > * exceptional completion of method {@code compute}. Upon any
71 > * exceptional completion, the exception may be relayed to a task's
72 > * completer (and its completer, and so on), if one exists and it has
73 > * not otherwise already completed. Similarly, cancelling an internal
74 > * CountedCompleter has only a local effect on that completer, so is
75 > * not often useful.
76   *
77   * <p><b>Sample Usages.</b>
78   *
# Line 83 | Line 86 | package jsr166e;
86   * subdivided) to each element of an array or collection; especially
87   * when the operation takes a significantly different amount of time
88   * to complete for some elements than others, either because of
89 < * intrinsic variation (for example IO) or auxiliary effects such as
89 > * intrinsic variation (for example I/O) or auxiliary effects such as
90   * garbage collection.  Because CountedCompleters provide their own
91   * continuations, other threads need not block waiting to perform
92   * them.
# Line 96 | Line 99 | package jsr166e;
99   * improve load balancing. In the recursive case, the second of each
100   * pair of subtasks to finish triggers completion of its parent
101   * (because no result combination is performed, the default no-op
102 < * implementation of method {@code onCompletion} is not overridden). A
103 < * static utility method sets up the base task and invokes it
102 > * implementation of method {@code onCompletion} is not overridden).
103 > * A static utility method sets up the base task and invokes it
104   * (here, implicitly using the {@link ForkJoinPool#commonPool()}).
105   *
106   * <pre> {@code
# Line 126 | Line 129 | package jsr166e;
129   *       op.apply(array[lo]);
130   *     tryComplete();
131   *   }
132 < * } }</pre>
132 > * }}</pre>
133   *
134   * This design can be improved by noticing that in the recursive case,
135   * the task has nothing to do after forking its right task, so can
136   * directly invoke its left task before returning. (This is an analog
137   * of tail recursion removal.)  Also, because the task returns upon
138   * executing its left task (rather than falling through to invoke
139 < * tryComplete) the pending count is set to one:
139 > * {@code tryComplete}) the pending count is set to one:
140   *
141   * <pre> {@code
142   * class ForEach<E> ...
# Line 152 | Line 155 | package jsr166e;
155   *   }
156   * }</pre>
157   *
158 < * As a further improvement, notice that the left task need not even
159 < * exist.  Instead of creating a new one, we can iterate using the
160 < * original task, and add a pending count for each fork. Additionally,
161 < * because no task in this tree implements an {@link #onCompletion}
162 < * method, {@code tryComplete()} can be replaced with {@link
160 < * #propagateCompletion}.
158 > * As a further improvement, notice that the left task need not even exist.
159 > * Instead of creating a new one, we can iterate using the original task,
160 > * and add a pending count for each fork.  Additionally, because no task
161 > * in this tree implements an {@link #onCompletion(CountedCompleter)} method,
162 > * {@code tryComplete()} can be replaced with {@link #propagateCompletion}.
163   *
164   * <pre> {@code
165   * class ForEach<E> ...
# Line 183 | Line 185 | package jsr166e;
185   *
186   * <p><b>Searching.</b> A tree of CountedCompleters can search for a
187   * value or property in different parts of a data structure, and
188 < * report a result in an {@link java.util.concurrent.AtomicReference}
189 < * as soon as one is found. The others can poll the result to avoid
190 < * unnecessary work. (You could additionally {@link #cancel} other
191 < * tasks, but it is usually simpler and more efficient to just let
192 < * them notice that the result is set and if so skip further
193 < * processing.)  Illustrating again with an array using full
188 > * report a result in an {@link
189 > * java.util.concurrent.atomic.AtomicReference AtomicReference} as
190 > * soon as one is found. The others can poll the result to avoid
191 > * unnecessary work. (You could additionally {@linkplain #cancel
192 > * cancel} other tasks, but it is usually simpler and more efficient
193 > * to just let them notice that the result is set and if so skip
194 > * further processing.)  Illustrating again with an array using full
195   * partitioning (again, in practice, leaf tasks will almost always
196   * process more than one element):
197   *
# Line 223 | Line 226 | package jsr166e;
226   *   public static <E> E search(E[] array) {
227   *       return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
228   *   }
229 < *}}</pre>
229 > * }}</pre>
230   *
231   * In this example, as well as others in which tasks have no other
232   * effects except to compareAndSet a common result, the trailing
# Line 234 | Line 237 | package jsr166e;
237   *
238   * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
239   * results of multiple subtasks usually need to access these results
240 < * in method {@link #onCompletion}. As illustrated in the following
240 > * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following
241   * class (that performs a simplified form of map-reduce where mappings
242   * and reductions are all of type {@code E}), one way to do this in
243   * divide and conquer designs is to have each subtask record its
# Line 277 | Line 280 | package jsr166e;
280   *   }
281   *   public void onCompletion(CountedCompleter<?> caller) {
282   *     if (caller != this) {
283 < *      MapReducer<E> child = (MapReducer<E>)caller;
284 < *      MapReducer<E> sib = child.sibling;
285 < *      if (sib == null || sib.result == null)
286 < *        result = child.result;
287 < *      else
288 < *        result = reducer.apply(child.result, sib.result);
283 > *       MapReducer<E> child = (MapReducer<E>)caller;
284 > *       MapReducer<E> sib = child.sibling;
285 > *       if (sib == null || sib.result == null)
286 > *         result = child.result;
287 > *       else
288 > *         result = reducer.apply(child.result, sib.result);
289   *     }
290   *   }
291   *   public E getRawResult() { return result; }
# Line 291 | Line 294 | package jsr166e;
294   *     return new MapReducer<E>(null, array, mapper, reducer,
295   *                              0, array.length).invoke();
296   *   }
297 < * } }</pre>
297 > * }}</pre>
298   *
299   * Here, method {@code onCompletion} takes a form common to many
300   * completion designs that combine results. This callback-style method
# Line 335 | Line 338 | package jsr166e;
338   *     while (h - l >= 2) {
339   *       int mid = (l + h) >>> 1;
340   *       addToPendingCount(1);
341 < *       (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork;
341 > *       (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
342   *       h = mid;
343   *     }
344   *     if (h > l)
# Line 356 | Line 359 | package jsr166e;
359   *
360   * <p><b>Triggers.</b> Some CountedCompleters are themselves never
361   * forked, but instead serve as bits of plumbing in other designs;
362 < * including those in which the completion of one of more async tasks
362 > * including those in which the completion of one or more async tasks
363   * triggers another async task. For example:
364   *
365   * <pre> {@code
# Line 388 | Line 391 | public abstract class CountedCompleter<T
391       * Creates a new CountedCompleter with the given completer
392       * and initial pending count.
393       *
394 <     * @param completer this tasks completer, or {@code null} if none
394 >     * @param completer this task's completer, or {@code null} if none
395       * @param initialPendingCount the initial pending count
396       */
397      protected CountedCompleter(CountedCompleter<?> completer,
# Line 401 | Line 404 | public abstract class CountedCompleter<T
404       * Creates a new CountedCompleter with the given completer
405       * and an initial pending count of zero.
406       *
407 <     * @param completer this tasks completer, or {@code null} if none
407 >     * @param completer this task's completer, or {@code null} if none
408       */
409      protected CountedCompleter(CountedCompleter<?> completer) {
410          this.completer = completer;
# Line 422 | Line 425 | public abstract class CountedCompleter<T
425  
426      /**
427       * Performs an action when method {@link #tryComplete} is invoked
428 <     * and there are no pending counts, or when the unconditional
428 >     * and the pending count is zero, or when the unconditional
429       * method {@link #complete} is invoked.  By default, this method
430       * does nothing. You can distinguish cases by checking the
431       * identity of the given caller argument. If not equal to {@code
# Line 430 | Line 433 | public abstract class CountedCompleter<T
433       * (and/or links to other results) to combine.
434       *
435       * @param caller the task invoking this method (which may
436 <     * be this task itself).
436 >     * be this task itself)
437       */
438      public void onCompletion(CountedCompleter<?> caller) {
439      }
440  
441      /**
442 <     * Performs an action when method {@link #completeExceptionally}
443 <     * is invoked or method {@link #compute} throws an exception, and
444 <     * this task has not otherwise already completed normally. On
445 <     * entry to this method, this task {@link
446 <     * ForkJoinTask#isCompletedAbnormally}.  The return value of this
447 <     * method controls further propagation: If {@code true} and this
448 <     * task has a completer, then this completer is also completed
449 <     * exceptionally.  The default implementation of this method does
450 <     * nothing except return {@code true}.
442 >     * Performs an action when method {@link
443 >     * #completeExceptionally(Throwable)} is invoked or method {@link
444 >     * #compute} throws an exception, and this task has not already
445 >     * otherwise completed normally. On entry to this method, this task
446 >     * {@link ForkJoinTask#isCompletedAbnormally}.  The return value
447 >     * of this method controls further propagation: If {@code true}
448 >     * and this task has a completer that has not completed, then that
449 >     * completer is also completed exceptionally, with the same
450 >     * exception as this completer.  The default implementation of
451 >     * this method does nothing except return {@code true}.
452       *
453       * @param ex the exception
454       * @param caller the task invoking this method (which may
455 <     * be this task itself).
456 <     * @return true if this exception should be propagated to this
457 <     * tasks completer, if one exists.
455 >     * be this task itself)
456 >     * @return {@code true} if this exception should be propagated to this
457 >     * task's completer, if one exists
458       */
459      public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
460          return true;
# Line 490 | Line 494 | public abstract class CountedCompleter<T
494       * @param delta the value to add
495       */
496      public final void addToPendingCount(int delta) {
497 <        int c; // note: can replace with intrinsic in jdk8
497 >        int c;
498          do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
499      }
500  
# Line 500 | Line 504 | public abstract class CountedCompleter<T
504       *
505       * @param expected the expected value
506       * @param count the new value
507 <     * @return true if successful
507 >     * @return {@code true} if successful
508       */
509      public final boolean compareAndSetPendingCount(int expected, int count) {
510          return U.compareAndSwapInt(this, PENDING, expected, count);
# Line 534 | Line 538 | public abstract class CountedCompleter<T
538  
539      /**
540       * If the pending count is nonzero, decrements the count;
541 <     * otherwise invokes {@link #onCompletion} and then similarly
542 <     * tries to complete this task's completer, if one exists,
543 <     * else marks this task as complete.
541 >     * otherwise invokes {@link #onCompletion(CountedCompleter)}
542 >     * and then similarly tries to complete this task's completer,
543 >     * if one exists, else marks this task as complete.
544       */
545      public final void tryComplete() {
546          CountedCompleter<?> a = this, s = a;
# Line 555 | Line 559 | public abstract class CountedCompleter<T
559  
560      /**
561       * Equivalent to {@link #tryComplete} but does not invoke {@link
562 <     * #onCompletion} along the completion path: If the pending count
563 <     * is nonzero, decrements the count; otherwise, similarly tries to
564 <     * complete this task's completer, if one exists, else marks this
565 <     * task as complete. This method may be useful in cases where
566 <     * {@code onCompletion} should not, or need not, be invoked for
567 <     * each completer in a computation.
562 >     * #onCompletion(CountedCompleter)} along the completion path:
563 >     * If the pending count is nonzero, decrements the count;
564 >     * otherwise, similarly tries to complete this task's completer, if
565 >     * one exists, else marks this task as complete. This method may be
566 >     * useful in cases where {@code onCompletion} should not, or need
567 >     * not, be invoked for each completer in a computation.
568       */
569      public final void propagateCompletion() {
570          CountedCompleter<?> a = this, s = a;
# Line 577 | Line 581 | public abstract class CountedCompleter<T
581      }
582  
583      /**
584 <     * Regardless of pending count, invokes {@link #onCompletion},
585 <     * marks this task as complete and further triggers {@link
586 <     * #tryComplete} on this task's completer, if one exists.  The
587 <     * given rawResult is used as an argument to {@link #setRawResult}
588 <     * before invoking {@link #onCompletion} or marking this task as
589 <     * complete; its value is meaningful only for classes overriding
590 <     * {@code setRawResult}.
584 >     * Regardless of pending count, invokes
585 >     * {@link #onCompletion(CountedCompleter)}, marks this task as
586 >     * complete and further triggers {@link #tryComplete} on this
587 >     * task's completer, if one exists.  The given rawResult is
588 >     * used as an argument to {@link #setRawResult} before invoking
589 >     * {@link #onCompletion(CountedCompleter)} or marking this task
590 >     * as complete; its value is meaningful only for classes
591 >     * overriding {@code setRawResult}.  This method does not modify
592 >     * the pending count.
593       *
594       * <p>This method may be useful when forcing completion as soon as
595       * any one (versus all) of several subtask results are obtained.
# Line 623 | Line 629 | public abstract class CountedCompleter<T
629      /**
630       * If this task does not have a completer, invokes {@link
631       * ForkJoinTask#quietlyComplete} and returns {@code null}.  Or, if
632 <     * this task's pending count is non-zero, decrements its pending
633 <     * count and returns {@code null}.  Otherwise, returns the
632 >     * the completer's pending count is non-zero, decrements that
633 >     * pending count and returns {@code null}.  Otherwise, returns the
634       * completer.  This method can be used as part of a completion
635       * traversal loop for homogeneous task hierarchies:
636       *
# Line 661 | Line 667 | public abstract class CountedCompleter<T
667      }
668  
669      /**
670 <     * Support for FJT exception propagation
670 >     * Supports ForkJoinTask exception propagation.
671       */
672      void internalPropagateException(Throwable ex) {
673          CountedCompleter<?> a = this, s = a;
674          while (a.onExceptionalCompletion(ex, s) &&
675 <               (a = (s = a).completer) != null && a.status >= 0)
676 <            a.recordExceptionalCompletion(ex);
675 >               (a = (s = a).completer) != null && a.status >= 0 &&
676 >               a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
677 >            ;
678      }
679  
680      /**
681 <     * Implements execution conventions for CountedCompleters
681 >     * Implements execution conventions for CountedCompleters.
682       */
683      protected final boolean exec() {
684          compute();
# Line 679 | Line 686 | public abstract class CountedCompleter<T
686      }
687  
688      /**
689 <     * Returns the result of the computation. By default
689 >     * Returns the result of the computation.  By default,
690       * returns {@code null}, which is appropriate for {@code Void}
691       * actions, but in other cases should be overridden, almost
692       * always to return a field or function of a field that
# Line 703 | Line 710 | public abstract class CountedCompleter<T
710      private static final long PENDING;
711      static {
712          try {
713 <            U = sun.misc.Unsafe.getUnsafe();
713 >            U = getUnsafe();
714              PENDING = U.objectFieldOffset
715                  (CountedCompleter.class.getDeclaredField("pending"));
716          } catch (Exception e) {
# Line 721 | Line 728 | public abstract class CountedCompleter<T
728      private static sun.misc.Unsafe getUnsafe() {
729          try {
730              return sun.misc.Unsafe.getUnsafe();
731 <        } catch (SecurityException se) {
732 <            try {
733 <                return java.security.AccessController.doPrivileged
734 <                    (new java.security
735 <                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
736 <                        public sun.misc.Unsafe run() throws Exception {
737 <                            java.lang.reflect.Field f = sun.misc
738 <                                .Unsafe.class.getDeclaredField("theUnsafe");
739 <                            f.setAccessible(true);
740 <                            return (sun.misc.Unsafe) f.get(null);
741 <                        }});
742 <            } catch (java.security.PrivilegedActionException e) {
743 <                throw new RuntimeException("Could not initialize intrinsics",
744 <                                           e.getCause());
745 <            }
731 >        } catch (SecurityException tryReflectionInstead) {}
732 >        try {
733 >            return java.security.AccessController.doPrivileged
734 >            (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
735 >                public sun.misc.Unsafe run() throws Exception {
736 >                    Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
737 >                    for (java.lang.reflect.Field f : k.getDeclaredFields()) {
738 >                        f.setAccessible(true);
739 >                        Object x = f.get(null);
740 >                        if (k.isInstance(x))
741 >                            return k.cast(x);
742 >                    }
743 >                    throw new NoSuchFieldError("the Unsafe");
744 >                }});
745 >        } catch (java.security.PrivilegedActionException e) {
746 >            throw new RuntimeException("Could not initialize intrinsics",
747 >                                       e.getCause());
748          }
749      }
750   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines