ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.16 by dl, Wed Jul 7 20:41:24 2010 UTC vs.
Revision 1.17 by dl, Wed Aug 11 18:45:45 2010 UTC

# Line 142 | Line 142 | public abstract class ForkJoinTask<V> im
142       * status maintenance (2) execution and awaiting completion (3)
143       * user-level methods that additionally report results. This is
144       * sometimes hard to see because this file orders exported methods
145 <     * in a way that flows well in javadocs.
145 >     * in a way that flows well in javadocs. In particular, most
146 >     * join mechanics are in method quietlyJoin, below.
147       */
148  
149 <    /**
150 <     * Run control status bits packed into a single int to minimize
151 <     * footprint and to ensure atomicity (via CAS).  Status is
152 <     * initially zero, and takes on nonnegative values until
153 <     * completed, upon which status holds value COMPLETED. CANCELLED,
154 <     * or EXCEPTIONAL. Tasks undergoing blocking waits by other
155 <     * threads have the SIGNAL bit set.  Completion of a stolen task
156 <     * with SIGNAL set awakens any waiters via notifyAll. Even though
157 <     * suboptimal for some purposes, we use basic builtin wait/notify
158 <     * to take advantage of "monitor inflation" in JVMs that we would
159 <     * otherwise need to emulate to avoid adding further per-task
160 <     * bookkeeping overhead.  We want these monitors to be "fat",
161 <     * i.e., not use biasing or thin-lock techniques, so use some odd
162 <     * coding idioms that tend to avoid them.
149 >    /*
150 >     * The status field holds run control status bits packed into a
151 >     * single int to minimize footprint and to ensure atomicity (via
152 >     * CAS).  Status is initially zero, and takes on nonnegative
153 >     * values until completed, upon which status holds value
154 >     * COMPLETED. CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
155 >     * waits by other threads have the SIGNAL bit set.  Completion of
156 >     * a stolen task with SIGNAL set awakens any waiters via
157 >     * notifyAll. Even though suboptimal for some purposes, we use
158 >     * basic builtin wait/notify to take advantage of "monitor
159 >     * inflation" in JVMs that we would otherwise need to emulate to
160 >     * avoid adding further per-task bookkeeping overhead.  We want
161 >     * these monitors to be "fat", i.e., not use biasing or thin-lock
162 >     * techniques, so use some odd coding idioms that tend to avoid
163 >     * them.
164       */
165 +
166 +    /** The run status of this task */
167      volatile int status; // accessed directly by pool and workers
168  
169      private static final int NORMAL      = -1;
# Line 186 | Line 190 | public abstract class ForkJoinTask<V> im
190       * also clearing signal request bits.
191       *
192       * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
189     * @return status on exit
193       */
194 <    private int setCompletion(int completion) {
194 >    private void setCompletion(int completion) {
195          int s;
196          while ((s = status) >= 0) {
197              if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
198 <                if (s == SIGNAL)
198 >                if (s != 0)
199                      synchronized (this) { notifyAll(); }
200 <                return completion;
200 >                break;
201              }
202          }
200        return s;
203      }
204  
205      /**
206       * Record exception and set exceptional completion
207       * @return status on exit
208       */
209 <    private int setExceptionalCompletion(Throwable rex) {
209 >    private void setExceptionalCompletion(Throwable rex) {
210          exceptionMap.put(this, rex);
211 <        return setCompletion(EXCEPTIONAL);
211 >        setCompletion(EXCEPTIONAL);
212      }
213  
214      /**
215       * Blocks a worker thread until completion. Called only by pool.
216       */
217 <    final int internalAwaitDone() {
218 <        int s;
217 >    final void internalAwaitDone() {
218 >        int s;         // the odd construction reduces lock bias effects
219          while ((s = status) >= 0) {
220 <            synchronized(this) {
221 <                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
222 <                    do {
223 <                        try {
222 <                            wait();
223 <                        } catch (InterruptedException ie) {
224 <                            cancelIfTerminating();
225 <                        }
226 <                    } while ((s = status) >= 0);
227 <                    break;
220 >            try {
221 >                synchronized(this) {
222 >                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
223 >                        wait();
224                  }
225 +            } catch (InterruptedException ie) {
226 +                cancelIfTerminating();
227              }
228          }
231        return s;
229      }
230  
231      /**
232       * Blocks a non-worker-thread until completion.
236     * @return status on exit
233       */
234 <    private int externalAwaitDone() {
234 >    private void externalAwaitDone() {
235          int s;
236          while ((s = status) >= 0) {
237              synchronized(this) {
238                  if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
239                      boolean interrupted = false;
240 <                    do {
240 >                    while (status >= 0) {
241                          try {
242                              wait();
243                          } catch (InterruptedException ie) {
244                              interrupted = true;
245                          }
246 <                    } while ((s = status) >= 0);
246 >                    }
247                      if (interrupted)
248                          Thread.currentThread().interrupt();
249                      break;
250                  }
251              }
252          }
257        return s;
253      }
254  
255      /**
# Line 262 | Line 257 | public abstract class ForkJoinTask<V> im
257       * doesn't wait for completion otherwise. Primary execution method
258       * for ForkJoinWorkerThread.
259       */
260 <    final void tryExec() {
260 >    final void quietlyExec() {
261          try {
262              if (status < 0 || !exec())
263                  return;
# Line 273 | Line 268 | public abstract class ForkJoinTask<V> im
268          setCompletion(NORMAL); // must be outside try block
269      }
270  
276    /**
277     * If not done and this task is next in worker queue, runs it,
278     * else waits for it.
279     * @return status on exit
280     */
281    private int doJoin() {
282        int stat;
283        if ((stat = status) < 0)
284            return stat;
285        Thread t = Thread.currentThread();
286        ForkJoinWorkerThread w;
287        if (t instanceof ForkJoinWorkerThread) {
288            if ((w = (ForkJoinWorkerThread) t).unpushTask(this)) {
289                boolean completed;
290                try {
291                    completed = exec();
292                } catch (Throwable rex) {
293                    return setExceptionalCompletion(rex);
294                }
295                if (completed)
296                    return setCompletion(NORMAL);
297            }
298            w.joinTask(this);
299            return status;
300        }
301        return externalAwaitDone();
302    }
303
304    /**
305     * Unless done, calls exec and records status if completed, or
306     * waits for completion otherwise.
307     * @return status on exit
308     */
309    private int doInvoke() {
310        int stat;
311        if ((stat = status) >= 0) {
312            boolean completed;
313            try {
314                completed = exec();
315            } catch (Throwable rex) {
316                return setExceptionalCompletion(rex);
317            }
318            if (completed)
319                stat = setCompletion(NORMAL);
320            else
321                stat = doJoin();
322        }
323        return stat;
324    }
325
326    /**
327     * Returns result or throws exception associated with given status.
328     * @param s the status
329     */
330    private V reportResult(int s) {
331        Throwable ex;
332        if (s < NORMAL && (ex = getException()) != null)
333            UNSAFE.throwException(ex);
334        return getRawResult();
335    }
336
271      // public methods
272  
273      /**
# Line 369 | Line 303 | public abstract class ForkJoinTask<V> im
303       * @return the computed result
304       */
305      public final V join() {
306 <        return reportResult(doJoin());
306 >        quietlyJoin();
307 >        Throwable ex;
308 >        if (status < NORMAL && (ex = getException()) != null)
309 >            UNSAFE.throwException(ex);
310 >        return getRawResult();
311      }
312  
313      /**
# Line 380 | Line 318 | public abstract class ForkJoinTask<V> im
318       * @return the computed result
319       */
320      public final V invoke() {
321 <        return reportResult(doInvoke());
321 >        quietlyInvoke();
322 >        Throwable ex;
323 >        if (status < NORMAL && (ex = getException()) != null)
324 >            UNSAFE.throwException(ex);
325 >        return getRawResult();
326      }
327  
328      /**
# Line 438 | Line 380 | public abstract class ForkJoinTask<V> im
380              }
381              else if (i != 0)
382                  t.fork();
383 <            else if (t.doInvoke() < NORMAL && ex == null)
384 <                ex = t.getException();
383 >            else {
384 >                t.quietlyInvoke();
385 >                if (ex == null && t.status < NORMAL)
386 >                    ex = t.getException();
387 >            }
388          }
389          for (int i = 1; i <= last; ++i) {
390              ForkJoinTask<?> t = tasks[i];
391              if (t != null) {
392                  if (ex != null)
393                      t.cancel(false);
394 <                else if (t.doJoin() < NORMAL && ex == null)
395 <                    ex = t.getException();
394 >                else {
395 >                    t.quietlyJoin();
396 >                    if (ex == null && t.status < NORMAL)
397 >                        ex = t.getException();
398 >                }
399              }
400          }
401          if (ex != null)
# Line 494 | Line 442 | public abstract class ForkJoinTask<V> im
442              }
443              else if (i != 0)
444                  t.fork();
445 <            else if (t.doInvoke() < NORMAL && ex == null)
446 <                ex = t.getException();
445 >            else {
446 >                t.quietlyInvoke();
447 >                if (ex == null && t.status < NORMAL)
448 >                    ex = t.getException();
449 >            }
450          }
451          for (int i = 1; i <= last; ++i) {
452              ForkJoinTask<?> t = ts.get(i);
453              if (t != null) {
454                  if (ex != null)
455                      t.cancel(false);
456 <                else if (t.doJoin() < NORMAL && ex == null)
457 <                    ex = t.getException();
456 >                else {
457 >                    t.quietlyJoin();
458 >                    if (ex == null && t.status < NORMAL)
459 >                        ex = t.getException();
460 >                }
461              }
462          }
463          if (ex != null)
# Line 541 | Line 495 | public abstract class ForkJoinTask<V> im
495      }
496  
497      /**
498 <     * Cancels, ignoring any exceptions it throws. Used during worker
499 <     * and pool shutdown.
498 >     * Cancels, ignoring any exceptions thrown by cancel. Used during
499 >     * worker and pool shutdown. Cancel is spec'ed not to throw any
500 >     * exceptions, but if it does anyway, we have no recourse during
501 >     * shutdown, so guard against this case.
502       */
503      final void cancelIgnoringExceptions() {
504          try {
# Line 554 | Line 510 | public abstract class ForkJoinTask<V> im
510      /**
511       * Cancels ignoring exceptions if worker is terminating
512       */
513 <    private void cancelIfTerminating() {
513 >    final void cancelIfTerminating() {
514          Thread t = Thread.currentThread();
515          if ((t instanceof ForkJoinWorkerThread) &&
516              ((ForkJoinWorkerThread) t).isTerminating()) {
# Line 650 | Line 606 | public abstract class ForkJoinTask<V> im
606      }
607  
608      public final V get() throws InterruptedException, ExecutionException {
609 <        int s = doJoin();
609 >        quietlyJoin();
610          if (Thread.interrupted())
611              throw new InterruptedException();
612 +        int s = status;
613          if (s < NORMAL) {
614              Throwable ex;
615              if (s == CANCELLED)
# Line 670 | Line 627 | public abstract class ForkJoinTask<V> im
627          if (t instanceof ForkJoinWorkerThread) {
628              ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
629              if (status >= 0 && w.unpushTask(this))
630 <                tryExec();
630 >                quietlyExec();
631              pool = w.pool;
632          }
633          else
634              pool = null;
635          /*
636 <         * Timed wait loop intermixes cases for fj (pool != null) and
636 >         * Timed wait loop intermixes cases for FJ (pool != null) and
637           * non FJ threads. For FJ, decrement pool count but don't try
638           * for replacement; increment count on completion. For non-FJ,
639           * deal with interrupts. This is messy, but a little less so
# Line 692 | Line 649 | public abstract class ForkJoinTask<V> im
649              int s = status;
650              if (s < 0)
651                  break;
652 <            if (UNSAFE.compareAndSwapInt(this, statusOffset,
696 <                                         s, s | SIGNAL)) {
652 >            if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
653                  long startTime = System.nanoTime();
654                  long nanos = unit.toNanos(timeout);
655                  long nt; // wait time
# Line 739 | Line 695 | public abstract class ForkJoinTask<V> im
695      }
696  
697      /**
698 <     * Joins this task, without returning its result or throwing an
698 >     * Joins this task, without returning its result or throwing its
699       * exception. This method may be useful when processing
700       * collections of tasks when some have been cancelled or otherwise
701       * known to have aborted.
702       */
703      public final void quietlyJoin() {
704 <        doJoin();
704 >        Thread t;
705 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
706 >            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
707 >            if (status >= 0) {
708 >                if (w.unpushTask(this)) {
709 >                    boolean completed;
710 >                    try {
711 >                        completed = exec();
712 >                    } catch (Throwable rex) {
713 >                        setExceptionalCompletion(rex);
714 >                        return;
715 >                    }
716 >                    if (completed) {
717 >                        setCompletion(NORMAL);
718 >                        return;
719 >                    }
720 >                }
721 >                w.joinTask(this);
722 >            }
723 >        }
724 >        else
725 >            externalAwaitDone();
726      }
727  
728      /**
729       * Commences performing this task and awaits its completion if
730 <     * necessary, without returning its result or throwing an
730 >     * necessary, without returning its result or throwing its
731       * exception. This method may be useful when processing
732       * collections of tasks when some have been cancelled or otherwise
733       * known to have aborted.
734       */
735      public final void quietlyInvoke() {
736 <        doInvoke();
736 >        if (status >= 0) {
737 >            boolean completed;
738 >            try {
739 >                completed = exec();
740 >            } catch (Throwable rex) {
741 >                setExceptionalCompletion(rex);
742 >                return;
743 >            }
744 >            if (completed)
745 >                setCompletion(NORMAL);
746 >            else
747 >                quietlyJoin();
748 >        }
749      }
750  
751      /**
# Line 1101 | Line 1090 | public abstract class ForkJoinTask<V> im
1090          Object ex = s.readObject();
1091          if (ex != null)
1092              setExceptionalCompletion((Throwable) ex);
1104        if (status < 0)
1105            synchronized (this) { notifyAll(); }
1093      }
1094  
1095      // Unsafe mechanics

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines