ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.50 by dl, Fri Jul 23 13:07:43 2010 UTC vs.
Revision 1.55 by dl, Sun Aug 29 23:34:46 2010 UTC

# Line 144 | Line 144 | public abstract class ForkJoinTask<V> im
144       * status maintenance (2) execution and awaiting completion (3)
145       * user-level methods that additionally report results. This is
146       * sometimes hard to see because this file orders exported methods
147 <     * in a way that flows well in javadocs.
147 >     * in a way that flows well in javadocs. In particular, most
148 >     * join mechanics are in method quietlyJoin, below.
149       */
150  
151      /*
# Line 164 | Line 165 | public abstract class ForkJoinTask<V> im
165       * them.
166       */
167  
168 <    /** Run status of this task */
168 >    /** The run status of this task */
169      volatile int status; // accessed directly by pool and workers
170  
171      private static final int NORMAL      = -1;
# Line 191 | Line 192 | public abstract class ForkJoinTask<V> im
192       * also clearing signal request bits.
193       *
194       * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
194     * @return status on exit
195       */
196 <    private int setCompletion(int completion) {
196 >    private void setCompletion(int completion) {
197          int s;
198          while ((s = status) >= 0) {
199              if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
200 <                if (s == SIGNAL)
200 >                if (s != 0)
201                      synchronized (this) { notifyAll(); }
202 <                return completion;
202 >                break;
203              }
204          }
205        return s;
205      }
206  
207      /**
208       * Record exception and set exceptional completion
209       * @return status on exit
210       */
211 <    private int setExceptionalCompletion(Throwable rex) {
211 >    private void setExceptionalCompletion(Throwable rex) {
212          exceptionMap.put(this, rex);
213 <        return setCompletion(EXCEPTIONAL);
213 >        setCompletion(EXCEPTIONAL);
214      }
215  
216      /**
217 <     * Blocks a worker thread until completion. Called only by pool.
217 >     * Blocks a worker thread until completion. Called only by
218 >     * pool. Currently unused -- pool-based waits use timeout
219 >     * version below.
220       */
221      final void internalAwaitDone() {
222          int s;         // the odd construction reduces lock bias effects
# Line 232 | Line 233 | public abstract class ForkJoinTask<V> im
233      }
234  
235      /**
236 <     * Blocks a non-worker-thread until completion.
236 >     * Blocks a worker thread until completed or timed out.  Called
237 >     * only by pool.
238 >     *
239       * @return status on exit
240       */
241 <    private int externalAwaitDone() {
241 >    final int internalAwaitDone(long millis) {
242 >        int s;
243 >        if ((s = status) >= 0) {
244 >            try {
245 >                synchronized(this) {
246 >                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
247 >                        wait(millis, 0);
248 >                }
249 >            } catch (InterruptedException ie) {
250 >                cancelIfTerminating();
251 >            }
252 >            s = status;
253 >        }
254 >        return s;
255 >    }
256 >
257 >    /**
258 >     * Blocks a non-worker-thread until completion.
259 >     */
260 >    private void externalAwaitDone() {
261          int s;
262          while ((s = status) >= 0) {
263              synchronized(this) {
264                  if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
265                      boolean interrupted = false;
266 <                    while ((s = status) >= 0) {
266 >                    while (status >= 0) {
267                          try {
268                              wait();
269                          } catch (InterruptedException ie) {
# Line 254 | Line 276 | public abstract class ForkJoinTask<V> im
276                  }
277              }
278          }
257        return s;
279      }
280  
281      /**
# Line 262 | Line 283 | public abstract class ForkJoinTask<V> im
283       * doesn't wait for completion otherwise. Primary execution method
284       * for ForkJoinWorkerThread.
285       */
286 <    final void tryExec() {
286 >    final void quietlyExec() {
287          try {
288              if (status < 0 || !exec())
289                  return;
# Line 273 | Line 294 | public abstract class ForkJoinTask<V> im
294          setCompletion(NORMAL); // must be outside try block
295      }
296  
276    /**
277     * If not done and this task is next in worker queue, runs it,
278     * else waits for it.
279     * @return status on exit
280     */
281    private int doJoin() {
282        int stat;
283        if ((stat = status) < 0)
284            return stat;
285        Thread t = Thread.currentThread();
286        ForkJoinWorkerThread w;
287        if (t instanceof ForkJoinWorkerThread) {
288            if ((w = (ForkJoinWorkerThread) t).unpushTask(this)) {
289                boolean completed;
290                try {
291                    completed = exec();
292                } catch (Throwable rex) {
293                    return setExceptionalCompletion(rex);
294                }
295                if (completed)
296                    return setCompletion(NORMAL);
297            }
298            return w.joinTask(this);
299        }
300        return externalAwaitDone();
301    }
302
303    /**
304     * Unless done, calls exec and records status if completed, or
305     * waits for completion otherwise.
306     * @return status on exit
307     */
308    private int doInvoke() {
309        int stat;
310        if ((stat = status) >= 0) {
311            boolean completed;
312            try {
313                completed = exec();
314            } catch (Throwable rex) {
315                return setExceptionalCompletion(rex);
316            }
317            stat = completed ? setCompletion(NORMAL) : doJoin();
318        }
319        return stat;
320    }
321
322    /**
323     * Returns result or throws exception associated with given status.
324     * @param s the status
325     */
326    private V reportResult(int s) {
327        Throwable ex;
328        if (s < NORMAL && (ex = getException()) != null)
329            UNSAFE.throwException(ex);
330        return getRawResult();
331    }
332
297      // public methods
298  
299      /**
# Line 365 | Line 329 | public abstract class ForkJoinTask<V> im
329       * @return the computed result
330       */
331      public final V join() {
332 <        return reportResult(doJoin());
332 >        quietlyJoin();
333 >        Throwable ex;
334 >        if (status < NORMAL && (ex = getException()) != null)
335 >            UNSAFE.throwException(ex);
336 >        return getRawResult();
337      }
338  
339      /**
# Line 376 | Line 344 | public abstract class ForkJoinTask<V> im
344       * @return the computed result
345       */
346      public final V invoke() {
347 <        return reportResult(doInvoke());
347 >        quietlyInvoke();
348 >        Throwable ex;
349 >        if (status < NORMAL && (ex = getException()) != null)
350 >            UNSAFE.throwException(ex);
351 >        return getRawResult();
352      }
353  
354      /**
# Line 434 | Line 406 | public abstract class ForkJoinTask<V> im
406              }
407              else if (i != 0)
408                  t.fork();
409 <            else if (t.doInvoke() < NORMAL && ex == null)
410 <                ex = t.getException();
409 >            else {
410 >                t.quietlyInvoke();
411 >                if (ex == null && t.status < NORMAL)
412 >                    ex = t.getException();
413 >            }
414          }
415          for (int i = 1; i <= last; ++i) {
416              ForkJoinTask<?> t = tasks[i];
417              if (t != null) {
418                  if (ex != null)
419                      t.cancel(false);
420 <                else if (t.doJoin() < NORMAL && ex == null)
421 <                    ex = t.getException();
420 >                else {
421 >                    t.quietlyJoin();
422 >                    if (ex == null && t.status < NORMAL)
423 >                        ex = t.getException();
424 >                }
425              }
426          }
427          if (ex != null)
# Line 490 | Line 468 | public abstract class ForkJoinTask<V> im
468              }
469              else if (i != 0)
470                  t.fork();
471 <            else if (t.doInvoke() < NORMAL && ex == null)
472 <                ex = t.getException();
471 >            else {
472 >                t.quietlyInvoke();
473 >                if (ex == null && t.status < NORMAL)
474 >                    ex = t.getException();
475 >            }
476          }
477          for (int i = 1; i <= last; ++i) {
478              ForkJoinTask<?> t = ts.get(i);
479              if (t != null) {
480                  if (ex != null)
481                      t.cancel(false);
482 <                else if (t.doJoin() < NORMAL && ex == null)
483 <                    ex = t.getException();
482 >                else {
483 >                    t.quietlyJoin();
484 >                    if (ex == null && t.status < NORMAL)
485 >                        ex = t.getException();
486 >                }
487              }
488          }
489          if (ex != null)
# Line 532 | Line 516 | public abstract class ForkJoinTask<V> im
516       * @return {@code true} if this task is now cancelled
517       */
518      public boolean cancel(boolean mayInterruptIfRunning) {
519 <        return setCompletion(CANCELLED) == CANCELLED;
519 >        setCompletion(CANCELLED);
520 >        return status == CANCELLED;
521      }
522  
523      /**
524 <     * Cancels, ignoring any exceptions it throws. Used during worker
525 <     * and pool shutdown.
524 >     * Cancels, ignoring any exceptions thrown by cancel. Used during
525 >     * worker and pool shutdown. Cancel is spec'ed not to throw any
526 >     * exceptions, but if it does anyway, we have no recourse during
527 >     * shutdown, so guard against this case.
528       */
529      final void cancelIgnoringExceptions() {
530          try {
# Line 645 | Line 632 | public abstract class ForkJoinTask<V> im
632      }
633  
634      public final V get() throws InterruptedException, ExecutionException {
635 <        int s = doJoin();
635 >        quietlyJoin();
636          if (Thread.interrupted())
637              throw new InterruptedException();
638 +        int s = status;
639          if (s < NORMAL) {
640              Throwable ex;
641              if (s == CANCELLED)
# Line 665 | Line 653 | public abstract class ForkJoinTask<V> im
653          if (t instanceof ForkJoinWorkerThread) {
654              ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
655              if (status >= 0 && w.unpushTask(this))
656 <                tryExec();
656 >                quietlyExec();
657              pool = w.pool;
658          }
659          else
660              pool = null;
661          /*
662 <         * Timed wait loop intermixes cases for fj (pool != null) and
662 >         * Timed wait loop intermixes cases for FJ (pool != null) and
663           * non FJ threads. For FJ, decrement pool count but don't try
664           * for replacement; increment count on completion. For non-FJ,
665           * deal with interrupts. This is messy, but a little less so
# Line 679 | Line 667 | public abstract class ForkJoinTask<V> im
667           */
668          boolean interrupted = false;
669          boolean dec = false; // true if pool count decremented
670 +        long nanos = unit.toNanos(timeout);
671          for (;;) {
672              if (Thread.interrupted() && pool == null) {
673                  interrupted = true;
# Line 687 | Line 676 | public abstract class ForkJoinTask<V> im
676              int s = status;
677              if (s < 0)
678                  break;
679 <            if (UNSAFE.compareAndSwapInt(this, statusOffset,
691 <                                         s, s | SIGNAL)) {
679 >            if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
680                  long startTime = System.nanoTime();
693                long nanos = unit.toNanos(timeout);
681                  long nt; // wait time
682                  while (status >= 0 &&
683                         (nt = nanos - (System.nanoTime() - startTime)) > 0) {
# Line 734 | Line 721 | public abstract class ForkJoinTask<V> im
721      }
722  
723      /**
724 <     * Joins this task, without returning its result or throwing an
724 >     * Joins this task, without returning its result or throwing its
725       * exception. This method may be useful when processing
726       * collections of tasks when some have been cancelled or otherwise
727       * known to have aborted.
728       */
729      public final void quietlyJoin() {
730 <        doJoin();
730 >        Thread t;
731 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
732 >            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
733 >            if (status >= 0) {
734 >                if (w.unpushTask(this)) {
735 >                    boolean completed;
736 >                    try {
737 >                        completed = exec();
738 >                    } catch (Throwable rex) {
739 >                        setExceptionalCompletion(rex);
740 >                        return;
741 >                    }
742 >                    if (completed) {
743 >                        setCompletion(NORMAL);
744 >                        return;
745 >                    }
746 >                }
747 >                w.joinTask(this);
748 >            }
749 >        }
750 >        else
751 >            externalAwaitDone();
752      }
753  
754      /**
755       * Commences performing this task and awaits its completion if
756 <     * necessary, without returning its result or throwing an
756 >     * necessary, without returning its result or throwing its
757       * exception. This method may be useful when processing
758       * collections of tasks when some have been cancelled or otherwise
759       * known to have aborted.
760       */
761      public final void quietlyInvoke() {
762 <        doInvoke();
762 >        if (status >= 0) {
763 >            boolean completed;
764 >            try {
765 >                completed = exec();
766 >            } catch (Throwable rex) {
767 >                setExceptionalCompletion(rex);
768 >                return;
769 >            }
770 >            if (completed)
771 >                setCompletion(NORMAL);
772 >            else
773 >                quietlyJoin();
774 >        }
775      }
776  
777      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines