ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.49 by dl, Wed Jul 7 19:52:31 2010 UTC vs.
Revision 1.53 by dl, Wed Aug 11 18:45:12 2010 UTC

# Line 144 | Line 144 | public abstract class ForkJoinTask<V> im
144       * status maintenance (2) execution and awaiting completion (3)
145       * user-level methods that additionally report results. This is
146       * sometimes hard to see because this file orders exported methods
147 <     * in a way that flows well in javadocs.
147 >     * in a way that flows well in javadocs. In particular, most
148 >     * join mechanics are in method quietlyJoin, below.
149       */
150  
151 <    /**
152 <     * Run control status bits packed into a single int to minimize
153 <     * footprint and to ensure atomicity (via CAS).  Status is
154 <     * initially zero, and takes on nonnegative values until
155 <     * completed, upon which status holds value COMPLETED. CANCELLED,
156 <     * or EXCEPTIONAL. Tasks undergoing blocking waits by other
157 <     * threads have the SIGNAL bit set.  Completion of a stolen task
158 <     * with SIGNAL set awakens any waiters via notifyAll. Even though
159 <     * suboptimal for some purposes, we use basic builtin wait/notify
160 <     * to take advantage of "monitor inflation" in JVMs that we would
161 <     * otherwise need to emulate to avoid adding further per-task
162 <     * bookkeeping overhead.  We want these monitors to be "fat",
163 <     * i.e., not use biasing or thin-lock techniques, so use some odd
164 <     * coding idioms that tend to avoid them.
151 >    /*
152 >     * The status field holds run control status bits packed into a
153 >     * single int to minimize footprint and to ensure atomicity (via
154 >     * CAS).  Status is initially zero, and takes on nonnegative
155 >     * values until completed, upon which status holds value
156 >     * COMPLETED. CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
157 >     * waits by other threads have the SIGNAL bit set.  Completion of
158 >     * a stolen task with SIGNAL set awakens any waiters via
159 >     * notifyAll. Even though suboptimal for some purposes, we use
160 >     * basic builtin wait/notify to take advantage of "monitor
161 >     * inflation" in JVMs that we would otherwise need to emulate to
162 >     * avoid adding further per-task bookkeeping overhead.  We want
163 >     * these monitors to be "fat", i.e., not use biasing or thin-lock
164 >     * techniques, so use some odd coding idioms that tend to avoid
165 >     * them.
166       */
167 +
168 +    /** The run status of this task */
169      volatile int status; // accessed directly by pool and workers
170  
171      private static final int NORMAL      = -1;
# Line 188 | Line 192 | public abstract class ForkJoinTask<V> im
192       * also clearing signal request bits.
193       *
194       * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
191     * @return status on exit
195       */
196 <    private int setCompletion(int completion) {
196 >    private void setCompletion(int completion) {
197          int s;
198          while ((s = status) >= 0) {
199              if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
200 <                if (s == SIGNAL)
200 >                if (s != 0)
201                      synchronized (this) { notifyAll(); }
202 <                return completion;
202 >                break;
203              }
204          }
202        return s;
205      }
206  
207      /**
208       * Record exception and set exceptional completion
209       * @return status on exit
210       */
211 <    private int setExceptionalCompletion(Throwable rex) {
211 >    private void setExceptionalCompletion(Throwable rex) {
212          exceptionMap.put(this, rex);
213 <        return setCompletion(EXCEPTIONAL);
213 >        setCompletion(EXCEPTIONAL);
214      }
215  
216      /**
217       * Blocks a worker thread until completion. Called only by pool.
218       */
219 <    final int internalAwaitDone() {
220 <        int s;
219 >    final void internalAwaitDone() {
220 >        int s;         // the odd construction reduces lock bias effects
221          while ((s = status) >= 0) {
222 <            synchronized(this) {
223 <                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
224 <                    do {
225 <                        try {
224 <                            wait();
225 <                        } catch (InterruptedException ie) {
226 <                            cancelIfTerminating();
227 <                        }
228 <                    } while ((s = status) >= 0);
229 <                    break;
222 >            try {
223 >                synchronized(this) {
224 >                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
225 >                        wait();
226                  }
227 +            } catch (InterruptedException ie) {
228 +                cancelIfTerminating();
229              }
230          }
233        return s;
231      }
232  
233      /**
234       * Blocks a non-worker-thread until completion.
238     * @return status on exit
235       */
236 <    private int externalAwaitDone() {
236 >    private void externalAwaitDone() {
237          int s;
238          while ((s = status) >= 0) {
239              synchronized(this) {
240                  if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
241                      boolean interrupted = false;
242 <                    do {
242 >                    while (status >= 0) {
243                          try {
244                              wait();
245                          } catch (InterruptedException ie) {
246                              interrupted = true;
247                          }
248 <                    } while ((s = status) >= 0);
248 >                    }
249                      if (interrupted)
250                          Thread.currentThread().interrupt();
251                      break;
252                  }
253              }
254          }
259        return s;
255      }
256  
257      /**
# Line 264 | Line 259 | public abstract class ForkJoinTask<V> im
259       * doesn't wait for completion otherwise. Primary execution method
260       * for ForkJoinWorkerThread.
261       */
262 <    final void tryExec() {
262 >    final void quietlyExec() {
263          try {
264              if (status < 0 || !exec())
265                  return;
# Line 275 | Line 270 | public abstract class ForkJoinTask<V> im
270          setCompletion(NORMAL); // must be outside try block
271      }
272  
278    /**
279     * If not done and this task is next in worker queue, runs it,
280     * else waits for it.
281     * @return status on exit
282     */
283    private int doJoin() {
284        int stat;
285        if ((stat = status) < 0)
286            return stat;
287        Thread t = Thread.currentThread();
288        ForkJoinWorkerThread w;
289        if (t instanceof ForkJoinWorkerThread) {
290            if ((w = (ForkJoinWorkerThread) t).unpushTask(this)) {
291                boolean completed;
292                try {
293                    completed = exec();
294                } catch (Throwable rex) {
295                    return setExceptionalCompletion(rex);
296                }
297                if (completed)
298                    return setCompletion(NORMAL);
299            }
300            w.joinTask(this);
301            return status;
302        }
303        return externalAwaitDone();
304    }
305
306    /**
307     * Unless done, calls exec and records status if completed, or
308     * waits for completion otherwise.
309     * @return status on exit
310     */
311    private int doInvoke() {
312        int stat;
313        if ((stat = status) >= 0) {
314            boolean completed;
315            try {
316                completed = exec();
317            } catch (Throwable rex) {
318                return setExceptionalCompletion(rex);
319            }
320            if (completed)
321                stat = setCompletion(NORMAL);
322            else
323                stat = doJoin();
324        }
325        return stat;
326    }
327
328    /**
329     * Returns result or throws exception associated with given status.
330     * @param s the status
331     */
332    private V reportResult(int s) {
333        Throwable ex;
334        if (s < NORMAL && (ex = getException()) != null)
335            UNSAFE.throwException(ex);
336        return getRawResult();
337    }
338
273      // public methods
274  
275      /**
# Line 371 | Line 305 | public abstract class ForkJoinTask<V> im
305       * @return the computed result
306       */
307      public final V join() {
308 <        return reportResult(doJoin());
308 >        quietlyJoin();
309 >        Throwable ex;
310 >        if (status < NORMAL && (ex = getException()) != null)
311 >            UNSAFE.throwException(ex);
312 >        return getRawResult();
313      }
314  
315      /**
# Line 382 | Line 320 | public abstract class ForkJoinTask<V> im
320       * @return the computed result
321       */
322      public final V invoke() {
323 <        return reportResult(doInvoke());
323 >        quietlyInvoke();
324 >        Throwable ex;
325 >        if (status < NORMAL && (ex = getException()) != null)
326 >            UNSAFE.throwException(ex);
327 >        return getRawResult();
328      }
329  
330      /**
# Line 440 | Line 382 | public abstract class ForkJoinTask<V> im
382              }
383              else if (i != 0)
384                  t.fork();
385 <            else if (t.doInvoke() < NORMAL && ex == null)
386 <                ex = t.getException();
385 >            else {
386 >                t.quietlyInvoke();
387 >                if (ex == null && t.status < NORMAL)
388 >                    ex = t.getException();
389 >            }
390          }
391          for (int i = 1; i <= last; ++i) {
392              ForkJoinTask<?> t = tasks[i];
393              if (t != null) {
394                  if (ex != null)
395                      t.cancel(false);
396 <                else if (t.doJoin() < NORMAL && ex == null)
397 <                    ex = t.getException();
396 >                else {
397 >                    t.quietlyJoin();
398 >                    if (ex == null && t.status < NORMAL)
399 >                        ex = t.getException();
400 >                }
401              }
402          }
403          if (ex != null)
# Line 496 | Line 444 | public abstract class ForkJoinTask<V> im
444              }
445              else if (i != 0)
446                  t.fork();
447 <            else if (t.doInvoke() < NORMAL && ex == null)
448 <                ex = t.getException();
447 >            else {
448 >                t.quietlyInvoke();
449 >                if (ex == null && t.status < NORMAL)
450 >                    ex = t.getException();
451 >            }
452          }
453          for (int i = 1; i <= last; ++i) {
454              ForkJoinTask<?> t = ts.get(i);
455              if (t != null) {
456                  if (ex != null)
457                      t.cancel(false);
458 <                else if (t.doJoin() < NORMAL && ex == null)
459 <                    ex = t.getException();
458 >                else {
459 >                    t.quietlyJoin();
460 >                    if (ex == null && t.status < NORMAL)
461 >                        ex = t.getException();
462 >                }
463              }
464          }
465          if (ex != null)
# Line 543 | Line 497 | public abstract class ForkJoinTask<V> im
497      }
498  
499      /**
500 <     * Cancels, ignoring any exceptions it throws. Used during worker
501 <     * and pool shutdown.
500 >     * Cancels, ignoring any exceptions thrown by cancel. Used during
501 >     * worker and pool shutdown. Cancel is spec'ed not to throw any
502 >     * exceptions, but if it does anyway, we have no recourse during
503 >     * shutdown, so guard against this case.
504       */
505      final void cancelIgnoringExceptions() {
506          try {
# Line 556 | Line 512 | public abstract class ForkJoinTask<V> im
512      /**
513       * Cancels ignoring exceptions if worker is terminating
514       */
515 <    private void cancelIfTerminating() {
515 >    final void cancelIfTerminating() {
516          Thread t = Thread.currentThread();
517          if ((t instanceof ForkJoinWorkerThread) &&
518              ((ForkJoinWorkerThread) t).isTerminating()) {
# Line 652 | Line 608 | public abstract class ForkJoinTask<V> im
608      }
609  
610      public final V get() throws InterruptedException, ExecutionException {
611 <        int s = doJoin();
611 >        quietlyJoin();
612          if (Thread.interrupted())
613              throw new InterruptedException();
614 +        int s = status;
615          if (s < NORMAL) {
616              Throwable ex;
617              if (s == CANCELLED)
# Line 672 | Line 629 | public abstract class ForkJoinTask<V> im
629          if (t instanceof ForkJoinWorkerThread) {
630              ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
631              if (status >= 0 && w.unpushTask(this))
632 <                tryExec();
632 >                quietlyExec();
633              pool = w.pool;
634          }
635          else
636              pool = null;
637          /*
638 <         * Timed wait loop intermixes cases for fj (pool != null) and
638 >         * Timed wait loop intermixes cases for FJ (pool != null) and
639           * non FJ threads. For FJ, decrement pool count but don't try
640           * for replacement; increment count on completion. For non-FJ,
641           * deal with interrupts. This is messy, but a little less so
# Line 694 | Line 651 | public abstract class ForkJoinTask<V> im
651              int s = status;
652              if (s < 0)
653                  break;
654 <            if (UNSAFE.compareAndSwapInt(this, statusOffset,
698 <                                         s, s | SIGNAL)) {
654 >            if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
655                  long startTime = System.nanoTime();
656                  long nanos = unit.toNanos(timeout);
657                  long nt; // wait time
# Line 741 | Line 697 | public abstract class ForkJoinTask<V> im
697      }
698  
699      /**
700 <     * Joins this task, without returning its result or throwing an
700 >     * Joins this task, without returning its result or throwing its
701       * exception. This method may be useful when processing
702       * collections of tasks when some have been cancelled or otherwise
703       * known to have aborted.
704       */
705      public final void quietlyJoin() {
706 <        doJoin();
706 >        Thread t;
707 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
708 >            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
709 >            if (status >= 0) {
710 >                if (w.unpushTask(this)) {
711 >                    boolean completed;
712 >                    try {
713 >                        completed = exec();
714 >                    } catch (Throwable rex) {
715 >                        setExceptionalCompletion(rex);
716 >                        return;
717 >                    }
718 >                    if (completed) {
719 >                        setCompletion(NORMAL);
720 >                        return;
721 >                    }
722 >                }
723 >                w.joinTask(this);
724 >            }
725 >        }
726 >        else
727 >            externalAwaitDone();
728      }
729  
730      /**
731       * Commences performing this task and awaits its completion if
732 <     * necessary, without returning its result or throwing an
732 >     * necessary, without returning its result or throwing its
733       * exception. This method may be useful when processing
734       * collections of tasks when some have been cancelled or otherwise
735       * known to have aborted.
736       */
737      public final void quietlyInvoke() {
738 <        doInvoke();
738 >        if (status >= 0) {
739 >            boolean completed;
740 >            try {
741 >                completed = exec();
742 >            } catch (Throwable rex) {
743 >                setExceptionalCompletion(rex);
744 >                return;
745 >            }
746 >            if (completed)
747 >                setCompletion(NORMAL);
748 >            else
749 >                quietlyJoin();
750 >        }
751      }
752  
753      /**
# Line 1103 | Line 1092 | public abstract class ForkJoinTask<V> im
1092          Object ex = s.readObject();
1093          if (ex != null)
1094              setExceptionalCompletion((Throwable) ex);
1106        if (status < 0)
1107            synchronized (this) { notifyAll(); }
1095      }
1096  
1097      // Unsafe mechanics

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines