ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.48 by dl, Thu May 27 16:46:48 2010 UTC vs.
Revision 1.49 by dl, Wed Jul 7 19:52:31 2010 UTC

# Line 64 | Line 64 | import java.util.WeakHashMap;
64   * results of a task is {@link #join}, but there are several variants:
65   * The {@link Future#get} methods support interruptible and/or timed
66   * waits for completion and report results using {@code Future}
67 < * conventions. Method {@link #helpJoin} enables callers to actively
68 < * execute other tasks while awaiting joins, which is sometimes more
69 < * efficient but only applies when all subtasks are known to be
70 < * strictly tree-structured. Method {@link #invoke} is semantically
67 > * conventions. Method {@link #invoke} is semantically
68   * equivalent to {@code fork(); join()} but always attempts to begin
69   * execution in the current thread. The "<em>quiet</em>" forms of
70   * these methods do not extract results or report exceptions. These
# Line 125 | Line 122 | import java.util.WeakHashMap;
122   *
123   * <p>This class provides {@code adapt} methods for {@link Runnable}
124   * and {@link Callable}, that may be of use when mixing execution of
125 < * {@code ForkJoinTasks} with other kinds of tasks. When all tasks
126 < * are of this form, consider using a pool in
130 < * {@linkplain ForkJoinPool#setAsyncMode async mode}.
125 > * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
126 > * of this form, consider using a pool constructed in <em>asyncMode</em>.
127   *
128   * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
129   * used in extensions such as remote execution frameworks. It is
# Line 155 | Line 151 | public abstract class ForkJoinTask<V> im
151       * Run control status bits packed into a single int to minimize
152       * footprint and to ensure atomicity (via CAS).  Status is
153       * initially zero, and takes on nonnegative values until
154 <     * completed, upon which status holds COMPLETED. CANCELLED, or
155 <     * EXCEPTIONAL, which use the top 3 bits.  Tasks undergoing
156 <     * blocking waits by other threads have the SIGNAL bit set.
157 <     *
158 <     * Completion of a stolen task with SIGNAL set awakens any waiters
159 <     * via notifyAll. Even though suboptimal for some purposes, we use
160 <     * basic builtin wait/notify to take advantage of "monitor
161 <     * inflation" in JVMs that we would otherwise need to emulate to
162 <     * avoid adding further per-task bookkeeping overhead.  We want
163 <     * these monitors to be "fat", i.e., not use biasing or thin-lock
168 <     * techniques, so use some odd coding idioms that tend to avoid
169 <     * them.
170 <     *
171 <     * Note that bits 1-28 are currently unused. Also value
172 <     * 0x80000000 is available as spare completion value.
154 >     * completed, upon which status holds value COMPLETED. CANCELLED,
155 >     * or EXCEPTIONAL. Tasks undergoing blocking waits by other
156 >     * threads have the SIGNAL bit set.  Completion of a stolen task
157 >     * with SIGNAL set awakens any waiters via notifyAll. Even though
158 >     * suboptimal for some purposes, we use basic builtin wait/notify
159 >     * to take advantage of "monitor inflation" in JVMs that we would
160 >     * otherwise need to emulate to avoid adding further per-task
161 >     * bookkeeping overhead.  We want these monitors to be "fat",
162 >     * i.e., not use biasing or thin-lock techniques, so use some odd
163 >     * coding idioms that tend to avoid them.
164       */
165      volatile int status; // accessed directly by pool and workers
166  
167 <    private static final int COMPLETION_MASK      = 0xe0000000;
168 <    private static final int NORMAL               = 0xe0000000; // == mask
169 <    private static final int CANCELLED            = 0xc0000000;
170 <    private static final int EXCEPTIONAL          = 0xa0000000;
180 <    private static final int SIGNAL               = 0x00000001;
167 >    private static final int NORMAL      = -1;
168 >    private static final int CANCELLED   = -2;
169 >    private static final int EXCEPTIONAL = -3;
170 >    private static final int SIGNAL      =  1;
171  
172      /**
173       * Table of exceptions thrown by tasks, to enable reporting by
# Line 204 | Line 194 | public abstract class ForkJoinTask<V> im
194          int s;
195          while ((s = status) >= 0) {
196              if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
197 <                if ((s & SIGNAL) != 0)
197 >                if (s == SIGNAL)
198                      synchronized (this) { notifyAll(); }
199                  return completion;
200              }
# Line 224 | Line 214 | public abstract class ForkJoinTask<V> im
214      /**
215       * Blocks a worker thread until completion. Called only by pool.
216       */
217 <    final void internalAwaitDone() {
217 >    final int internalAwaitDone() {
218          int s;
219          while ((s = status) >= 0) {
220              synchronized(this) {
221 <                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, s|SIGNAL)){
221 >                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
222                      do {
223                          try {
224                              wait();
225                          } catch (InterruptedException ie) {
226                              cancelIfTerminating();
227                          }
228 <                    } while (status >= 0);
228 >                    } while ((s = status) >= 0);
229                      break;
230                  }
231              }
232          }
233 +        return s;
234      }
235  
236      /**
# Line 250 | Line 241 | public abstract class ForkJoinTask<V> im
241          int s;
242          while ((s = status) >= 0) {
243              synchronized(this) {
244 <                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, s|SIGNAL)){
244 >                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
245                      boolean interrupted = false;
246                      do {
247                          try {
# Line 270 | Line 261 | public abstract class ForkJoinTask<V> im
261  
262      /**
263       * Unless done, calls exec and records status if completed, but
264 <     * doesn't wait for completion otherwise.
264 >     * doesn't wait for completion otherwise. Primary execution method
265 >     * for ForkJoinWorkerThread.
266       */
267      final void tryExec() {
268          try {
# Line 288 | Line 280 | public abstract class ForkJoinTask<V> im
280       * else waits for it.
281       * @return status on exit
282       */
283 <    private int waitingJoin() {
284 <        int s = status;
285 <        if (s < 0)
286 <            return s;
283 >    private int doJoin() {
284 >        int stat;
285 >        if ((stat = status) < 0)
286 >            return stat;
287          Thread t = Thread.currentThread();
288 +        ForkJoinWorkerThread w;
289          if (t instanceof ForkJoinWorkerThread) {
290 <            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
298 <            if (w.unpushTask(this)) {
290 >            if ((w = (ForkJoinWorkerThread) t).unpushTask(this)) {
291                  boolean completed;
292                  try {
293                      completed = exec();
# Line 305 | Line 297 | public abstract class ForkJoinTask<V> im
297                  if (completed)
298                      return setCompletion(NORMAL);
299              }
300 <            return w.pool.awaitJoin(this);
300 >            w.joinTask(this);
301 >            return status;
302          }
303 <        else
311 <            return externalAwaitDone();
303 >        return externalAwaitDone();
304      }
305  
306      /**
# Line 316 | Line 308 | public abstract class ForkJoinTask<V> im
308       * waits for completion otherwise.
309       * @return status on exit
310       */
311 <    private int waitingInvoke() {
312 <        int s = status;
313 <        if (s < 0)
322 <            return s;
323 <        boolean completed;
324 <        try {
325 <            completed = exec();
326 <        } catch (Throwable rex) {
327 <            return setExceptionalCompletion(rex);
328 <        }
329 <        if (completed)
330 <            return setCompletion(NORMAL);
331 <        return waitingJoin();
332 <    }
333 <
334 <    /**
335 <     * If this task is next in worker queue, runs it, else processes other
336 <     * tasks until complete.
337 <     * @return status on exit
338 <     */
339 <    private int busyJoin() {
340 <        int s = status;
341 <        if (s < 0)
342 <            return s;
343 <        ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
344 <        if (w.unpushTask(this)) {
311 >    private int doInvoke() {
312 >        int stat;
313 >        if ((stat = status) >= 0) {
314              boolean completed;
315              try {
316                  completed = exec();
# Line 349 | Line 318 | public abstract class ForkJoinTask<V> im
318                  return setExceptionalCompletion(rex);
319              }
320              if (completed)
321 <                return setCompletion(NORMAL);
321 >                stat = setCompletion(NORMAL);
322 >            else
323 >                stat = doJoin();
324          }
325 <        return w.execWhileJoining(this);
325 >        return stat;
326      }
327  
328      /**
# Line 400 | Line 371 | public abstract class ForkJoinTask<V> im
371       * @return the computed result
372       */
373      public final V join() {
374 <        return reportResult(waitingJoin());
374 >        return reportResult(doJoin());
375      }
376  
377      /**
# Line 411 | Line 382 | public abstract class ForkJoinTask<V> im
382       * @return the computed result
383       */
384      public final V invoke() {
385 <        return reportResult(waitingInvoke());
385 >        return reportResult(doInvoke());
386      }
387  
388      /**
# Line 469 | Line 440 | public abstract class ForkJoinTask<V> im
440              }
441              else if (i != 0)
442                  t.fork();
443 <            else if (t.waitingInvoke() < NORMAL && ex == null)
443 >            else if (t.doInvoke() < NORMAL && ex == null)
444                  ex = t.getException();
445          }
446          for (int i = 1; i <= last; ++i) {
# Line 477 | Line 448 | public abstract class ForkJoinTask<V> im
448              if (t != null) {
449                  if (ex != null)
450                      t.cancel(false);
451 <                else if (t.waitingJoin() < NORMAL && ex == null)
451 >                else if (t.doJoin() < NORMAL && ex == null)
452                      ex = t.getException();
453              }
454          }
# Line 525 | Line 496 | public abstract class ForkJoinTask<V> im
496              }
497              else if (i != 0)
498                  t.fork();
499 <            else if (t.waitingInvoke() < NORMAL && ex == null)
499 >            else if (t.doInvoke() < NORMAL && ex == null)
500                  ex = t.getException();
501          }
502          for (int i = 1; i <= last; ++i) {
# Line 533 | Line 504 | public abstract class ForkJoinTask<V> im
504              if (t != null) {
505                  if (ex != null)
506                      t.cancel(false);
507 <                else if (t.waitingJoin() < NORMAL && ex == null)
507 >                else if (t.doJoin() < NORMAL && ex == null)
508                      ex = t.getException();
509              }
510          }
# Line 568 | Line 539 | public abstract class ForkJoinTask<V> im
539       */
540      public boolean cancel(boolean mayInterruptIfRunning) {
541          setCompletion(CANCELLED);
542 <        return (status & COMPLETION_MASK) == CANCELLED;
542 >        return status == CANCELLED;
543      }
544  
545      /**
# Line 601 | Line 572 | public abstract class ForkJoinTask<V> im
572      }
573  
574      public final boolean isCancelled() {
575 <        return (status & COMPLETION_MASK) == CANCELLED;
575 >        return status == CANCELLED;
576      }
577  
578      /**
# Line 610 | Line 581 | public abstract class ForkJoinTask<V> im
581       * @return {@code true} if this task threw an exception or was cancelled
582       */
583      public final boolean isCompletedAbnormally() {
584 <        return (status & COMPLETION_MASK) < NORMAL;
584 >        return status < NORMAL;
585      }
586  
587      /**
# Line 621 | Line 592 | public abstract class ForkJoinTask<V> im
592       * exception and was not cancelled
593       */
594      public final boolean isCompletedNormally() {
595 <        return (status & COMPLETION_MASK) == NORMAL;
595 >        return status == NORMAL;
596      }
597  
598      /**
# Line 632 | Line 603 | public abstract class ForkJoinTask<V> im
603       * @return the exception, or {@code null} if none
604       */
605      public final Throwable getException() {
606 <        int s = status & COMPLETION_MASK;
606 >        int s = status;
607          return ((s >= NORMAL)    ? null :
608                  (s == CANCELLED) ? new CancellationException() :
609                  exceptionMap.get(this));
# Line 681 | Line 652 | public abstract class ForkJoinTask<V> im
652      }
653  
654      public final V get() throws InterruptedException, ExecutionException {
655 <        int s = waitingJoin() & COMPLETION_MASK;
655 >        int s = doJoin();
656          if (Thread.interrupted())
657              throw new InterruptedException();
658          if (s < NORMAL) {
# Line 754 | Line 725 | public abstract class ForkJoinTask<V> im
725              }
726          }
727          if (pool != null && dec)
728 <            pool.updateRunningCount(1);
728 >            pool.incrementRunningCount();
729          if (interrupted)
730              throw new InterruptedException();
731 <        int es = status & COMPLETION_MASK;
731 >        int es = status;
732          if (es != NORMAL) {
733              Throwable ex;
734              if (es == CANCELLED)
# Line 770 | Line 741 | public abstract class ForkJoinTask<V> im
741      }
742  
743      /**
773     * Possibly executes other tasks until this task {@link #isDone is
774     * done}, then returns the result of the computation.  This method
775     * may be more efficient than {@code join}, but is only applicable
776     * when there are no potential dependencies between continuation
777     * of the current task and that of any other task that might be
778     * executed while helping. (This usually holds for pure
779     * divide-and-conquer tasks).
780     *
781     * <p>This method may be invoked only from within {@code
782     * ForkJoinTask} computations (as may be determined using method
783     * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
784     * result in exceptions or errors, possibly including {@code
785     * ClassCastException}.
786     *
787     * @return the computed result
788     */
789    public final V helpJoin() {
790        return reportResult(busyJoin());
791    }
792
793    /**
794     * Possibly executes other tasks until this task {@link #isDone is
795     * done}.  This method may be useful when processing collections
796     * of tasks when some have been cancelled or otherwise known to
797     * have aborted.
798     *
799     * <p>This method may be invoked only from within {@code
800     * ForkJoinTask} computations (as may be determined using method
801     * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
802     * result in exceptions or errors, possibly including {@code
803     * ClassCastException}.
804     */
805    public final void quietlyHelpJoin() {
806        busyJoin();
807    }
808
809    /**
744       * Joins this task, without returning its result or throwing an
745       * exception. This method may be useful when processing
746       * collections of tasks when some have been cancelled or otherwise
747       * known to have aborted.
748       */
749      public final void quietlyJoin() {
750 <        waitingJoin();
750 >        doJoin();
751      }
752  
753      /**
# Line 824 | Line 758 | public abstract class ForkJoinTask<V> im
758       * known to have aborted.
759       */
760      public final void quietlyInvoke() {
761 <        waitingInvoke();
761 >        doInvoke();
762      }
763  
764      /**
# Line 856 | Line 790 | public abstract class ForkJoinTask<V> im
790       * pre-constructed trees of subtasks in loops.
791       */
792      public void reinitialize() {
793 <        if ((status & COMPLETION_MASK) == EXCEPTIONAL)
793 >        if (status == EXCEPTIONAL)
794              exceptionMap.remove(this);
795          status = 0;
796      }
# Line 1166 | Line 1100 | public abstract class ForkJoinTask<V> im
1100      private void readObject(java.io.ObjectInputStream s)
1101          throws java.io.IOException, ClassNotFoundException {
1102          s.defaultReadObject();
1169        status |= SIGNAL; // conservatively set external signal
1103          Object ex = s.readObject();
1104          if (ex != null)
1105              setExceptionalCompletion((Throwable) ex);
1106 +        if (status < 0)
1107 +            synchronized (this) { notifyAll(); }
1108      }
1109  
1110      // Unsafe mechanics

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines