ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinTask.java (file contents):
Revision 1.52 by dl, Sat Jul 24 20:28:18 2010 UTC vs.
Revision 1.53 by dl, Wed Aug 11 18:45:12 2010 UTC

# Line 144 | Line 144 | public abstract class ForkJoinTask<V> im
144       * status maintenance (2) execution and awaiting completion (3)
145       * user-level methods that additionally report results. This is
146       * sometimes hard to see because this file orders exported methods
147 <     * in a way that flows well in javadocs.
147 >     * in a way that flows well in javadocs. In particular, most
148 >     * join mechanics are in method quietlyJoin, below.
149       */
150  
151      /*
# Line 164 | Line 165 | public abstract class ForkJoinTask<V> im
165       * them.
166       */
167  
168 <    /** Run status of this task */
168 >    /** The run status of this task */
169      volatile int status; // accessed directly by pool and workers
170  
171      private static final int NORMAL      = -1;
# Line 191 | Line 192 | public abstract class ForkJoinTask<V> im
192       * also clearing signal request bits.
193       *
194       * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
194     * @return status on exit
195       */
196 <    private int setCompletion(int completion) {
196 >    private void setCompletion(int completion) {
197          int s;
198          while ((s = status) >= 0) {
199              if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
200                  if (s != 0)
201                      synchronized (this) { notifyAll(); }
202 <                return completion;
202 >                break;
203              }
204          }
205        return s;
205      }
206  
207      /**
208       * Record exception and set exceptional completion
209       * @return status on exit
210       */
211 <    private int setExceptionalCompletion(Throwable rex) {
211 >    private void setExceptionalCompletion(Throwable rex) {
212          exceptionMap.put(this, rex);
213 <        return setCompletion(EXCEPTIONAL);
213 >        setCompletion(EXCEPTIONAL);
214      }
215  
216      /**
# Line 233 | Line 232 | public abstract class ForkJoinTask<V> im
232  
233      /**
234       * Blocks a non-worker-thread until completion.
236     * @return status on exit
235       */
236 <    private int externalAwaitDone() {
236 >    private void externalAwaitDone() {
237          int s;
238          while ((s = status) >= 0) {
239              synchronized(this) {
240                  if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
241                      boolean interrupted = false;
242 <                    while ((s = status) >= 0) {
242 >                    while (status >= 0) {
243                          try {
244                              wait();
245                          } catch (InterruptedException ie) {
# Line 254 | Line 252 | public abstract class ForkJoinTask<V> im
252                  }
253              }
254          }
257        return s;
255      }
256  
257      /**
# Line 262 | Line 259 | public abstract class ForkJoinTask<V> im
259       * doesn't wait for completion otherwise. Primary execution method
260       * for ForkJoinWorkerThread.
261       */
262 <    final void tryExec() {
262 >    final void quietlyExec() {
263          try {
264              if (status < 0 || !exec())
265                  return;
# Line 273 | Line 270 | public abstract class ForkJoinTask<V> im
270          setCompletion(NORMAL); // must be outside try block
271      }
272  
276    /**
277     * If not done and this task is next in worker queue, runs it,
278     * else waits for it.
279     * @return status on exit
280     */
281    private int doJoin() {
282        int stat;
283        Thread t;
284        ForkJoinWorkerThread w;
285        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
286            if ((stat = status) < 0)
287                return stat;
288            if ((w = (ForkJoinWorkerThread) t).unpushTask(this)) {
289                boolean completed;
290                try {
291                    completed = exec();
292                } catch (Throwable rex) {
293                    return setExceptionalCompletion(rex);
294                }
295                if (completed)
296                    return setCompletion(NORMAL);
297            }
298            return w.joinTask(this);
299        }
300        return externalAwaitDone();
301    }
302
303    /**
304     * Unless done, calls exec and records status if completed, or
305     * waits for completion otherwise.
306     * @return status on exit
307     */
308    private int doInvoke() {
309        int stat;
310        if ((stat = status) >= 0) {
311            boolean completed;
312            try {
313                completed = exec();
314            } catch (Throwable rex) {
315                return setExceptionalCompletion(rex);
316            }
317            stat = completed ? setCompletion(NORMAL) : doJoin();
318        }
319        return stat;
320    }
321
322    /**
323     * Returns result or throws exception associated with given status.
324     * @param s the status
325     */
326    private V reportResult(int s) {
327        Throwable ex;
328        if (s < NORMAL && (ex = getException()) != null)
329            UNSAFE.throwException(ex);
330        return getRawResult();
331    }
332
273      // public methods
274  
275      /**
# Line 365 | Line 305 | public abstract class ForkJoinTask<V> im
305       * @return the computed result
306       */
307      public final V join() {
308 <        return reportResult(doJoin());
308 >        quietlyJoin();
309 >        Throwable ex;
310 >        if (status < NORMAL && (ex = getException()) != null)
311 >            UNSAFE.throwException(ex);
312 >        return getRawResult();
313      }
314  
315      /**
# Line 376 | Line 320 | public abstract class ForkJoinTask<V> im
320       * @return the computed result
321       */
322      public final V invoke() {
323 <        return reportResult(doInvoke());
323 >        quietlyInvoke();
324 >        Throwable ex;
325 >        if (status < NORMAL && (ex = getException()) != null)
326 >            UNSAFE.throwException(ex);
327 >        return getRawResult();
328      }
329  
330      /**
# Line 434 | Line 382 | public abstract class ForkJoinTask<V> im
382              }
383              else if (i != 0)
384                  t.fork();
385 <            else if (t.doInvoke() < NORMAL && ex == null)
386 <                ex = t.getException();
385 >            else {
386 >                t.quietlyInvoke();
387 >                if (ex == null && t.status < NORMAL)
388 >                    ex = t.getException();
389 >            }
390          }
391          for (int i = 1; i <= last; ++i) {
392              ForkJoinTask<?> t = tasks[i];
393              if (t != null) {
394                  if (ex != null)
395                      t.cancel(false);
396 <                else if (t.doJoin() < NORMAL && ex == null)
397 <                    ex = t.getException();
396 >                else {
397 >                    t.quietlyJoin();
398 >                    if (ex == null && t.status < NORMAL)
399 >                        ex = t.getException();
400 >                }
401              }
402          }
403          if (ex != null)
# Line 490 | Line 444 | public abstract class ForkJoinTask<V> im
444              }
445              else if (i != 0)
446                  t.fork();
447 <            else if (t.doInvoke() < NORMAL && ex == null)
448 <                ex = t.getException();
447 >            else {
448 >                t.quietlyInvoke();
449 >                if (ex == null && t.status < NORMAL)
450 >                    ex = t.getException();
451 >            }
452          }
453          for (int i = 1; i <= last; ++i) {
454              ForkJoinTask<?> t = ts.get(i);
455              if (t != null) {
456                  if (ex != null)
457                      t.cancel(false);
458 <                else if (t.doJoin() < NORMAL && ex == null)
459 <                    ex = t.getException();
458 >                else {
459 >                    t.quietlyJoin();
460 >                    if (ex == null && t.status < NORMAL)
461 >                        ex = t.getException();
462 >                }
463              }
464          }
465          if (ex != null)
# Line 532 | Line 492 | public abstract class ForkJoinTask<V> im
492       * @return {@code true} if this task is now cancelled
493       */
494      public boolean cancel(boolean mayInterruptIfRunning) {
495 <        return setCompletion(CANCELLED) == CANCELLED;
495 >        setCompletion(CANCELLED);
496 >        return status == CANCELLED;
497      }
498  
499      /**
# Line 647 | Line 608 | public abstract class ForkJoinTask<V> im
608      }
609  
610      public final V get() throws InterruptedException, ExecutionException {
611 <        int s = doJoin();
611 >        quietlyJoin();
612          if (Thread.interrupted())
613              throw new InterruptedException();
614 +        int s = status;
615          if (s < NORMAL) {
616              Throwable ex;
617              if (s == CANCELLED)
# Line 667 | Line 629 | public abstract class ForkJoinTask<V> im
629          if (t instanceof ForkJoinWorkerThread) {
630              ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
631              if (status >= 0 && w.unpushTask(this))
632 <                tryExec();
632 >                quietlyExec();
633              pool = w.pool;
634          }
635          else
# Line 735 | Line 697 | public abstract class ForkJoinTask<V> im
697      }
698  
699      /**
700 <     * Joins this task, without returning its result or throwing an
700 >     * Joins this task, without returning its result or throwing its
701       * exception. This method may be useful when processing
702       * collections of tasks when some have been cancelled or otherwise
703       * known to have aborted.
704       */
705      public final void quietlyJoin() {
706 <        doJoin();
706 >        Thread t;
707 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
708 >            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
709 >            if (status >= 0) {
710 >                if (w.unpushTask(this)) {
711 >                    boolean completed;
712 >                    try {
713 >                        completed = exec();
714 >                    } catch (Throwable rex) {
715 >                        setExceptionalCompletion(rex);
716 >                        return;
717 >                    }
718 >                    if (completed) {
719 >                        setCompletion(NORMAL);
720 >                        return;
721 >                    }
722 >                }
723 >                w.joinTask(this);
724 >            }
725 >        }
726 >        else
727 >            externalAwaitDone();
728      }
729  
730      /**
731       * Commences performing this task and awaits its completion if
732 <     * necessary, without returning its result or throwing an
732 >     * necessary, without returning its result or throwing its
733       * exception. This method may be useful when processing
734       * collections of tasks when some have been cancelled or otherwise
735       * known to have aborted.
736       */
737      public final void quietlyInvoke() {
738 <        doInvoke();
738 >        if (status >= 0) {
739 >            boolean completed;
740 >            try {
741 >                completed = exec();
742 >            } catch (Throwable rex) {
743 >                setExceptionalCompletion(rex);
744 >                return;
745 >            }
746 >            if (completed)
747 >                setCompletion(NORMAL);
748 >            else
749 >                quietlyJoin();
750 >        }
751      }
752  
753      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines