ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.49 by dl, Wed Jul 7 19:52:31 2010 UTC vs.
Revision 1.55 by dl, Sun Aug 29 23:34:46 2010 UTC

# Line 144 | Line 144 | public abstract class ForkJoinTask<V> im
144       * status maintenance (2) execution and awaiting completion (3)
145       * user-level methods that additionally report results. This is
146       * sometimes hard to see because this file orders exported methods
147 <     * in a way that flows well in javadocs.
147 >     * in a way that flows well in javadocs. In particular, most
148 >     * join mechanics are in method quietlyJoin, below.
149       */
150  
151 <    /**
152 <     * Run control status bits packed into a single int to minimize
153 <     * footprint and to ensure atomicity (via CAS).  Status is
154 <     * initially zero, and takes on nonnegative values until
155 <     * completed, upon which status holds value COMPLETED. CANCELLED,
156 <     * or EXCEPTIONAL. Tasks undergoing blocking waits by other
157 <     * threads have the SIGNAL bit set.  Completion of a stolen task
158 <     * with SIGNAL set awakens any waiters via notifyAll. Even though
159 <     * suboptimal for some purposes, we use basic builtin wait/notify
160 <     * to take advantage of "monitor inflation" in JVMs that we would
161 <     * otherwise need to emulate to avoid adding further per-task
162 <     * bookkeeping overhead.  We want these monitors to be "fat",
163 <     * i.e., not use biasing or thin-lock techniques, so use some odd
164 <     * coding idioms that tend to avoid them.
151 >    /*
152 >     * The status field holds run control status bits packed into a
153 >     * single int to minimize footprint and to ensure atomicity (via
154 >     * CAS).  Status is initially zero, and takes on nonnegative
155 >     * values until completed, upon which status holds value
156 >     * COMPLETED. CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
157 >     * waits by other threads have the SIGNAL bit set.  Completion of
158 >     * a stolen task with SIGNAL set awakens any waiters via
159 >     * notifyAll. Even though suboptimal for some purposes, we use
160 >     * basic builtin wait/notify to take advantage of "monitor
161 >     * inflation" in JVMs that we would otherwise need to emulate to
162 >     * avoid adding further per-task bookkeeping overhead.  We want
163 >     * these monitors to be "fat", i.e., not use biasing or thin-lock
164 >     * techniques, so use some odd coding idioms that tend to avoid
165 >     * them.
166       */
167 +
168 +    /** The run status of this task */
169      volatile int status; // accessed directly by pool and workers
170  
171      private static final int NORMAL      = -1;
# Line 188 | Line 192 | public abstract class ForkJoinTask<V> im
192       * also clearing signal request bits.
193       *
194       * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
191     * @return status on exit
195       */
196 <    private int setCompletion(int completion) {
196 >    private void setCompletion(int completion) {
197          int s;
198          while ((s = status) >= 0) {
199              if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
200 <                if (s == SIGNAL)
200 >                if (s != 0)
201                      synchronized (this) { notifyAll(); }
202 <                return completion;
202 >                break;
203              }
204          }
202        return s;
205      }
206  
207      /**
208       * Record exception and set exceptional completion
209       * @return status on exit
210       */
211 <    private int setExceptionalCompletion(Throwable rex) {
211 >    private void setExceptionalCompletion(Throwable rex) {
212          exceptionMap.put(this, rex);
213 <        return setCompletion(EXCEPTIONAL);
213 >        setCompletion(EXCEPTIONAL);
214      }
215  
216      /**
217 <     * Blocks a worker thread until completion. Called only by pool.
217 >     * Blocks a worker thread until completion. Called only by
218 >     * pool. Currently unused -- pool-based waits use timeout
219 >     * version below.
220       */
221 <    final int internalAwaitDone() {
222 <        int s;
221 >    final void internalAwaitDone() {
222 >        int s;         // the odd construction reduces lock bias effects
223          while ((s = status) >= 0) {
224 <            synchronized(this) {
225 <                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
226 <                    do {
227 <                        try {
228 <                            wait();
229 <                        } catch (InterruptedException ie) {
230 <                            cancelIfTerminating();
231 <                        }
232 <                    } while ((s = status) >= 0);
233 <                    break;
224 >            try {
225 >                synchronized(this) {
226 >                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
227 >                        wait();
228 >                }
229 >            } catch (InterruptedException ie) {
230 >                cancelIfTerminating();
231 >            }
232 >        }
233 >    }
234 >
235 >    /**
236 >     * Blocks a worker thread until completed or timed out.  Called
237 >     * only by pool.
238 >     *
239 >     * @return status on exit
240 >     */
241 >    final int internalAwaitDone(long millis) {
242 >        int s;
243 >        if ((s = status) >= 0) {
244 >            try {
245 >                synchronized(this) {
246 >                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
247 >                        wait(millis, 0);
248                  }
249 +            } catch (InterruptedException ie) {
250 +                cancelIfTerminating();
251              }
252 +            s = status;
253          }
254          return s;
255      }
256  
257      /**
258       * Blocks a non-worker-thread until completion.
238     * @return status on exit
259       */
260 <    private int externalAwaitDone() {
260 >    private void externalAwaitDone() {
261          int s;
262          while ((s = status) >= 0) {
263              synchronized(this) {
264                  if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
265                      boolean interrupted = false;
266 <                    do {
266 >                    while (status >= 0) {
267                          try {
268                              wait();
269                          } catch (InterruptedException ie) {
270                              interrupted = true;
271                          }
272 <                    } while ((s = status) >= 0);
272 >                    }
273                      if (interrupted)
274                          Thread.currentThread().interrupt();
275                      break;
276                  }
277              }
278          }
259        return s;
279      }
280  
281      /**
# Line 264 | Line 283 | public abstract class ForkJoinTask<V> im
283       * doesn't wait for completion otherwise. Primary execution method
284       * for ForkJoinWorkerThread.
285       */
286 <    final void tryExec() {
286 >    final void quietlyExec() {
287          try {
288              if (status < 0 || !exec())
289                  return;
# Line 275 | Line 294 | public abstract class ForkJoinTask<V> im
294          setCompletion(NORMAL); // must be outside try block
295      }
296  
278    /**
279     * If not done and this task is next in worker queue, runs it,
280     * else waits for it.
281     * @return status on exit
282     */
283    private int doJoin() {
284        int stat;
285        if ((stat = status) < 0)
286            return stat;
287        Thread t = Thread.currentThread();
288        ForkJoinWorkerThread w;
289        if (t instanceof ForkJoinWorkerThread) {
290            if ((w = (ForkJoinWorkerThread) t).unpushTask(this)) {
291                boolean completed;
292                try {
293                    completed = exec();
294                } catch (Throwable rex) {
295                    return setExceptionalCompletion(rex);
296                }
297                if (completed)
298                    return setCompletion(NORMAL);
299            }
300            w.joinTask(this);
301            return status;
302        }
303        return externalAwaitDone();
304    }
305
306    /**
307     * Unless done, calls exec and records status if completed, or
308     * waits for completion otherwise.
309     * @return status on exit
310     */
311    private int doInvoke() {
312        int stat;
313        if ((stat = status) >= 0) {
314            boolean completed;
315            try {
316                completed = exec();
317            } catch (Throwable rex) {
318                return setExceptionalCompletion(rex);
319            }
320            if (completed)
321                stat = setCompletion(NORMAL);
322            else
323                stat = doJoin();
324        }
325        return stat;
326    }
327
328    /**
329     * Returns result or throws exception associated with given status.
330     * @param s the status
331     */
332    private V reportResult(int s) {
333        Throwable ex;
334        if (s < NORMAL && (ex = getException()) != null)
335            UNSAFE.throwException(ex);
336        return getRawResult();
337    }
338
297      // public methods
298  
299      /**
# Line 371 | Line 329 | public abstract class ForkJoinTask<V> im
329       * @return the computed result
330       */
331      public final V join() {
332 <        return reportResult(doJoin());
332 >        quietlyJoin();
333 >        Throwable ex;
334 >        if (status < NORMAL && (ex = getException()) != null)
335 >            UNSAFE.throwException(ex);
336 >        return getRawResult();
337      }
338  
339      /**
# Line 382 | Line 344 | public abstract class ForkJoinTask<V> im
344       * @return the computed result
345       */
346      public final V invoke() {
347 <        return reportResult(doInvoke());
347 >        quietlyInvoke();
348 >        Throwable ex;
349 >        if (status < NORMAL && (ex = getException()) != null)
350 >            UNSAFE.throwException(ex);
351 >        return getRawResult();
352      }
353  
354      /**
# Line 440 | Line 406 | public abstract class ForkJoinTask<V> im
406              }
407              else if (i != 0)
408                  t.fork();
409 <            else if (t.doInvoke() < NORMAL && ex == null)
410 <                ex = t.getException();
409 >            else {
410 >                t.quietlyInvoke();
411 >                if (ex == null && t.status < NORMAL)
412 >                    ex = t.getException();
413 >            }
414          }
415          for (int i = 1; i <= last; ++i) {
416              ForkJoinTask<?> t = tasks[i];
417              if (t != null) {
418                  if (ex != null)
419                      t.cancel(false);
420 <                else if (t.doJoin() < NORMAL && ex == null)
421 <                    ex = t.getException();
420 >                else {
421 >                    t.quietlyJoin();
422 >                    if (ex == null && t.status < NORMAL)
423 >                        ex = t.getException();
424 >                }
425              }
426          }
427          if (ex != null)
# Line 496 | Line 468 | public abstract class ForkJoinTask<V> im
468              }
469              else if (i != 0)
470                  t.fork();
471 <            else if (t.doInvoke() < NORMAL && ex == null)
472 <                ex = t.getException();
471 >            else {
472 >                t.quietlyInvoke();
473 >                if (ex == null && t.status < NORMAL)
474 >                    ex = t.getException();
475 >            }
476          }
477          for (int i = 1; i <= last; ++i) {
478              ForkJoinTask<?> t = ts.get(i);
479              if (t != null) {
480                  if (ex != null)
481                      t.cancel(false);
482 <                else if (t.doJoin() < NORMAL && ex == null)
483 <                    ex = t.getException();
482 >                else {
483 >                    t.quietlyJoin();
484 >                    if (ex == null && t.status < NORMAL)
485 >                        ex = t.getException();
486 >                }
487              }
488          }
489          if (ex != null)
# Line 543 | Line 521 | public abstract class ForkJoinTask<V> im
521      }
522  
523      /**
524 <     * Cancels, ignoring any exceptions it throws. Used during worker
525 <     * and pool shutdown.
524 >     * Cancels, ignoring any exceptions thrown by cancel. Used during
525 >     * worker and pool shutdown. Cancel is spec'ed not to throw any
526 >     * exceptions, but if it does anyway, we have no recourse during
527 >     * shutdown, so guard against this case.
528       */
529      final void cancelIgnoringExceptions() {
530          try {
# Line 556 | Line 536 | public abstract class ForkJoinTask<V> im
536      /**
537       * Cancels ignoring exceptions if worker is terminating
538       */
539 <    private void cancelIfTerminating() {
539 >    final void cancelIfTerminating() {
540          Thread t = Thread.currentThread();
541          if ((t instanceof ForkJoinWorkerThread) &&
542              ((ForkJoinWorkerThread) t).isTerminating()) {
# Line 652 | Line 632 | public abstract class ForkJoinTask<V> im
632      }
633  
634      public final V get() throws InterruptedException, ExecutionException {
635 <        int s = doJoin();
635 >        quietlyJoin();
636          if (Thread.interrupted())
637              throw new InterruptedException();
638 +        int s = status;
639          if (s < NORMAL) {
640              Throwable ex;
641              if (s == CANCELLED)
# Line 672 | Line 653 | public abstract class ForkJoinTask<V> im
653          if (t instanceof ForkJoinWorkerThread) {
654              ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
655              if (status >= 0 && w.unpushTask(this))
656 <                tryExec();
656 >                quietlyExec();
657              pool = w.pool;
658          }
659          else
660              pool = null;
661          /*
662 <         * Timed wait loop intermixes cases for fj (pool != null) and
662 >         * Timed wait loop intermixes cases for FJ (pool != null) and
663           * non FJ threads. For FJ, decrement pool count but don't try
664           * for replacement; increment count on completion. For non-FJ,
665           * deal with interrupts. This is messy, but a little less so
# Line 686 | Line 667 | public abstract class ForkJoinTask<V> im
667           */
668          boolean interrupted = false;
669          boolean dec = false; // true if pool count decremented
670 +        long nanos = unit.toNanos(timeout);
671          for (;;) {
672              if (Thread.interrupted() && pool == null) {
673                  interrupted = true;
# Line 694 | Line 676 | public abstract class ForkJoinTask<V> im
676              int s = status;
677              if (s < 0)
678                  break;
679 <            if (UNSAFE.compareAndSwapInt(this, statusOffset,
698 <                                         s, s | SIGNAL)) {
679 >            if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
680                  long startTime = System.nanoTime();
700                long nanos = unit.toNanos(timeout);
681                  long nt; // wait time
682                  while (status >= 0 &&
683                         (nt = nanos - (System.nanoTime() - startTime)) > 0) {
# Line 741 | Line 721 | public abstract class ForkJoinTask<V> im
721      }
722  
723      /**
724 <     * Joins this task, without returning its result or throwing an
724 >     * Joins this task, without returning its result or throwing its
725       * exception. This method may be useful when processing
726       * collections of tasks when some have been cancelled or otherwise
727       * known to have aborted.
728       */
729      public final void quietlyJoin() {
730 <        doJoin();
730 >        Thread t;
731 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
732 >            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
733 >            if (status >= 0) {
734 >                if (w.unpushTask(this)) {
735 >                    boolean completed;
736 >                    try {
737 >                        completed = exec();
738 >                    } catch (Throwable rex) {
739 >                        setExceptionalCompletion(rex);
740 >                        return;
741 >                    }
742 >                    if (completed) {
743 >                        setCompletion(NORMAL);
744 >                        return;
745 >                    }
746 >                }
747 >                w.joinTask(this);
748 >            }
749 >        }
750 >        else
751 >            externalAwaitDone();
752      }
753  
754      /**
755       * Commences performing this task and awaits its completion if
756 <     * necessary, without returning its result or throwing an
756 >     * necessary, without returning its result or throwing its
757       * exception. This method may be useful when processing
758       * collections of tasks when some have been cancelled or otherwise
759       * known to have aborted.
760       */
761      public final void quietlyInvoke() {
762 <        doInvoke();
762 >        if (status >= 0) {
763 >            boolean completed;
764 >            try {
765 >                completed = exec();
766 >            } catch (Throwable rex) {
767 >                setExceptionalCompletion(rex);
768 >                return;
769 >            }
770 >            if (completed)
771 >                setCompletion(NORMAL);
772 >            else
773 >                quietlyJoin();
774 >        }
775      }
776  
777      /**
# Line 1103 | Line 1116 | public abstract class ForkJoinTask<V> im
1116          Object ex = s.readObject();
1117          if (ex != null)
1118              setExceptionalCompletion((Throwable) ex);
1106        if (status < 0)
1107            synchronized (this) { notifyAll(); }
1119      }
1120  
1121      // Unsafe mechanics

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines