ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.58 by jsr166, Tue Jan 31 01:51:13 2012 UTC vs.
Revision 1.59 by dl, Mon Feb 20 18:20:02 2012 UTC

# Line 198 | Line 198 | public abstract class ForkJoinTask<V> im
198       * methods in a way that flows well in javadocs.
199       */
200  
201    /**
202     * The number of times to try to help join a task without any
203     * apparent progress before giving up and blocking. The value is
204     * arbitrary but should be large enough to cope with transient
205     * stalls (due to GC etc) that can cause helping methods not to be
206     * able to proceed because other workers have not progressed to
207     * the point where subtasks can be found or taken.
208     */
209    private static final int HELP_RETRIES = 32;
210
201      /*
202       * The status field holds run control status bits packed into a
203       * single int to minimize footprint and to ensure atomicity (via
204       * CAS).  Status is initially zero, and takes on nonnegative
205 <     * values until completed, upon which status holds value
206 <     * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
207 <     * waits by other threads have the SIGNAL bit set.  Completion of
208 <     * a stolen task with SIGNAL set awakens any waiters via
209 <     * notifyAll. Even though suboptimal for some purposes, we use
210 <     * basic builtin wait/notify to take advantage of "monitor
211 <     * inflation" in JVMs that we would otherwise need to emulate to
212 <     * avoid adding further per-task bookkeeping overhead.  We want
213 <     * these monitors to be "fat", i.e., not use biasing or thin-lock
214 <     * techniques, so use some odd coding idioms that tend to avoid
215 <     * them.
205 >     * values until completed, upon which status (anded with
206 >     * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
207 >     * undergoing blocking waits by other threads have the SIGNAL bit
208 >     * set.  Completion of a stolen task with SIGNAL set awakens any
209 >     * waiters via notifyAll. Even though suboptimal for some
210 >     * purposes, we use basic builtin wait/notify to take advantage of
211 >     * "monitor inflation" in JVMs that we would otherwise need to
212 >     * emulate to avoid adding further per-task bookkeeping overhead.
213 >     * We want these monitors to be "fat", i.e., not use biasing or
214 >     * thin-lock techniques, so use some odd coding idioms that tend
215 >     * to avoid them, mainly by arranging that every synchronized
216 >     * block performs a wait, notifyAll or both.
217       */
218  
219      /** The run status of this task */
220      volatile int status; // accessed directly by pool and workers
221 <    static final int NORMAL      = 0xfffffffc;  // negative with low 2 bits 0
222 <    static final int CANCELLED   = 0xfffffff8;  // must be < NORMAL
223 <    static final int EXCEPTIONAL = 0xfffffff4;  // must be < CANCELLED
221 >    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
222 >    static final int NORMAL      = 0xf0000000;  // must be negative
223 >    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
224 >    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
225      static final int SIGNAL      = 0x00000001;
226      static final int MARKED      = 0x00000002;
227  
228      /**
229       * Marks completion and wakes up threads waiting to join this
230 <     * task, also clearing signal request bits. A specialization for
231 <     * NORMAL completion is in method doExec.
230 >     * task. A specialization for NORMAL completion is in method
231 >     * doExec.
232       *
233       * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
234       * @return completion status on exit
# Line 245 | Line 237 | public abstract class ForkJoinTask<V> im
237          for (int s;;) {
238              if ((s = status) < 0)
239                  return s;
240 <            if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|completion)) {
240 >            if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
241                  if ((s & SIGNAL) != 0)
242                      synchronized (this) { notifyAll(); }
243                  return completion;
# Line 269 | Line 261 | public abstract class ForkJoinTask<V> im
261                  return setExceptionalCompletion(rex);
262              }
263              while ((s = status) >= 0 && completed) {
264 <                if (U.compareAndSwapInt(this, STATUS, s, (s & ~SIGNAL)|NORMAL)) {
264 >                if (U.compareAndSwapInt(this, STATUS, s, s | NORMAL)) {
265                      if ((s & SIGNAL) != 0)
266                          synchronized (this) { notifyAll(); }
267                      return NORMAL;
# Line 280 | Line 272 | public abstract class ForkJoinTask<V> im
272      }
273  
274      /**
275 +     * Tries to set SIGNAL status. Used by ForkJoinPool. Other
276 +     * variants are directly incorporated into externalAwaitDone etc.
277 +     *
278 +     * @return true if successful
279 +     */
280 +    final boolean trySetSignal() {
281 +        int s;
282 +        return U.compareAndSwapInt(this, STATUS, s = status, s | SIGNAL);
283 +    }
284 +
285 +    /**
286       * Blocks a non-worker-thread until completion.
287       * @return status upon completion
288       */
289      private int externalAwaitDone() {
290 +        boolean interrupted = false;
291          int s;
292 <        if ((s = status) >= 0) {
293 <            boolean interrupted = false;
294 <            synchronized (this) {
295 <                while ((s = status) >= 0) {
292 <                    if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
292 >        while ((s = status) >= 0) {
293 >            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
294 >                synchronized (this) {
295 >                    if (status >= 0) {
296                          try {
297                              wait();
298                          } catch (InterruptedException ie) {
299                              interrupted = true;
300                          }
301                      }
302 +                    else
303 +                        notifyAll();
304                  }
305              }
301            if (interrupted)
302                Thread.currentThread().interrupt();
306          }
307 +        if (interrupted)
308 +            Thread.currentThread().interrupt();
309          return s;
310      }
311  
312      /**
313 <     * Blocks a non-worker-thread until completion or interruption or timeout.
313 >     * Blocks a non-worker-thread until completion or interruption.
314       */
315 <    private int externalInterruptibleAwaitDone(long millis)
311 <        throws InterruptedException {
315 >    private int externalInterruptibleAwaitDone() throws InterruptedException {
316          int s;
317          if (Thread.interrupted())
318              throw new InterruptedException();
319 <        if ((s = status) >= 0) {
320 <            synchronized (this) {
321 <                while ((s = status) >= 0) {
322 <                    if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
323 <                        wait(millis);
324 <                        if (millis > 0L)
325 <                            break;
322 <                    }
319 >        while ((s = status) >= 0) {
320 >            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
321 >                synchronized (this) {
322 >                    if (status >= 0)
323 >                        wait();
324 >                    else
325 >                        notifyAll();
326                  }
327              }
328          }
# Line 330 | Line 333 | public abstract class ForkJoinTask<V> im
333      /**
334       * Implementation for join, get, quietlyJoin. Directly handles
335       * only cases of already-completed, external wait, and
336 <     * unfork+exec.  Others are relayed to awaitJoin.
336 >     * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
337       *
338       * @return status upon completion
339       */
340      private int doJoin() {
341          int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
342          if ((s = status) >= 0) {
343 <            if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread))
344 <                s = externalAwaitDone();
345 <            else if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue).
346 <                     tryUnpush(this) || (s = doExec()) >= 0)
344 <                s = awaitJoin(w, wt.pool);
345 <        }
346 <        return s;
347 <    }
348 <
349 <    /**
350 <     * Helps and/or blocks until joined.
351 <     *
352 <     * @param w the joiner
353 <     * @param p the pool
354 <     * @return status upon completion
355 <     */
356 <    private int awaitJoin(ForkJoinPool.WorkQueue w, ForkJoinPool p) {
357 <        int s;
358 <        ForkJoinTask<?> prevJoin = w.currentJoin;
359 <        w.currentJoin = this;
360 <        for (int k = HELP_RETRIES; (s = status) >= 0;) {
361 <            if ((w.queueSize() > 0) ?
362 <                w.tryRemoveAndExec(this) :        // self-help
363 <                p.tryHelpStealer(w, this))        // help process tasks
364 <                k = HELP_RETRIES;                 // reset if made progress
365 <            else if ((s = status) < 0)            // recheck
366 <                break;
367 <            else if (--k > 0) {
368 <                if ((k & 3) == 1)
369 <                    Thread.yield();               // occasionally yield
370 <            }
371 <            else if (k == 0)
372 <                p.tryPollForAndExec(w, this);     // uncommon self-help case
373 <            else if (p.tryCompensate()) {         // true if can block
374 <                try {
375 <                    int ss = status;
376 <                    if (ss >= 0 &&                // assert need signal
377 <                        U.compareAndSwapInt(this, STATUS, ss, ss | SIGNAL)) {
378 <                        synchronized (this) {
379 <                            if (status >= 0)      // block
380 <                                wait();
381 <                        }
382 <                    }
383 <                } catch (InterruptedException ignore) {
384 <                } finally {
385 <                    p.incrementActiveCount();     // re-activate
386 <                }
343 >            if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
344 >                if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue).
345 >                    tryUnpush(this) || (s = doExec()) >= 0)
346 >                    s = wt.pool.awaitJoin(w, this);
347              }
348 +            else
349 +                s = externalAwaitDone();
350          }
389        w.currentJoin = prevJoin;
351          return s;
352      }
353  
# Line 396 | Line 357 | public abstract class ForkJoinTask<V> im
357       * @return status upon completion
358       */
359      private int doInvoke() {
360 <        int s; Thread t;
360 >        int s; Thread t; ForkJoinWorkerThread wt;
361          if ((s = doExec()) >= 0) {
362 <            if (!((t = Thread.currentThread()) instanceof ForkJoinWorkerThread))
362 >            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
363 >                s = (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue,
364 >                                                                  this);
365 >            else
366                  s = externalAwaitDone();
403            else {
404                ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
405                s = awaitJoin(wt.workQueue, wt.pool);
406            }
367          }
368          return s;
369      }
# Line 540 | Line 500 | public abstract class ForkJoinTask<V> im
500       * @return the exception, or null if none
501       */
502      private Throwable getThrowableException() {
503 <        if (status != EXCEPTIONAL)
503 >        if ((status & DONE_MASK) != EXCEPTIONAL)
504              return null;
505          int h = System.identityHashCode(this);
506          ExceptionNode e;
# Line 625 | Line 585 | public abstract class ForkJoinTask<V> im
585      }
586  
587      /**
588 <     * Report the result of invoke or join; called only upon
629 <     * non-normal return of internal versions.
588 >     * Throws exception, if any, associated with the given status.
589       */
590 <    private V reportResult() {
591 <        int s; Throwable ex;
592 <        if ((s = status) == CANCELLED)
593 <            throw new CancellationException();
594 <        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
590 >    private void reportException(int s) {
591 >        Throwable ex = ((s == CANCELLED) ?  new CancellationException() :
592 >                        (s == EXCEPTIONAL) ? getThrowableException() :
593 >                        null);
594 >        if (ex != null)
595              U.throwException(ex);
637        return getRawResult();
596      }
597  
598      // public methods
# Line 658 | Line 616 | public abstract class ForkJoinTask<V> im
616       * @return {@code this}, to simplify usage
617       */
618      public final ForkJoinTask<V> fork() {
619 <        ForkJoinWorkerThread wt;
662 <        (wt = (ForkJoinWorkerThread)Thread.currentThread()).
663 <            workQueue.push(this, wt.pool);
619 >        ((ForkJoinWorkerThread)Thread.currentThread()).workQueue.push(this);
620          return this;
621      }
622  
# Line 676 | Line 632 | public abstract class ForkJoinTask<V> im
632       * @return the computed result
633       */
634      public final V join() {
635 <        if (doJoin() != NORMAL)
636 <            return reportResult();
637 <        else
638 <            return getRawResult();
635 >        int s;
636 >        if ((s = doJoin() & DONE_MASK) != NORMAL)
637 >            reportException(s);
638 >        return getRawResult();
639      }
640  
641      /**
# Line 691 | Line 647 | public abstract class ForkJoinTask<V> im
647       * @return the computed result
648       */
649      public final V invoke() {
650 <        if (doInvoke() != NORMAL)
651 <            return reportResult();
652 <        else
653 <            return getRawResult();
650 >        int s;
651 >        if ((s = doInvoke() & DONE_MASK) != NORMAL)
652 >            reportException(s);
653 >        return getRawResult();
654      }
655  
656      /**
# Line 721 | Line 677 | public abstract class ForkJoinTask<V> im
677       * @throws NullPointerException if any task is null
678       */
679      public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
680 +        int s1, s2;
681          t2.fork();
682 <        t1.invoke();
683 <        t2.join();
682 >        if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
683 >            t1.reportException(s1);
684 >        if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
685 >            t2.reportException(s2);
686      }
687  
688      /**
# Line 860 | Line 819 | public abstract class ForkJoinTask<V> im
819       * @return {@code true} if this task is now cancelled
820       */
821      public boolean cancel(boolean mayInterruptIfRunning) {
822 <        return setCompletion(CANCELLED) == CANCELLED;
822 >        return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
823      }
824  
825      public final boolean isDone() {
# Line 868 | Line 827 | public abstract class ForkJoinTask<V> im
827      }
828  
829      public final boolean isCancelled() {
830 <        return status == CANCELLED;
830 >        return (status & DONE_MASK) == CANCELLED;
831      }
832  
833      /**
# Line 888 | Line 847 | public abstract class ForkJoinTask<V> im
847       * exception and was not cancelled
848       */
849      public final boolean isCompletedNormally() {
850 <        return status == NORMAL;
850 >        return (status & DONE_MASK) == NORMAL;
851      }
852  
853      /**
# Line 899 | Line 858 | public abstract class ForkJoinTask<V> im
858       * @return the exception, or {@code null} if none
859       */
860      public final Throwable getException() {
861 <        int s = status;
861 >        int s = status & DONE_MASK;
862          return ((s >= NORMAL)    ? null :
863                  (s == CANCELLED) ? new CancellationException() :
864                  getThrowableException());
# Line 961 | Line 920 | public abstract class ForkJoinTask<V> im
920       */
921      public final V get() throws InterruptedException, ExecutionException {
922          int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
923 <            doJoin() : externalInterruptibleAwaitDone(0L);
923 >            doJoin() : externalInterruptibleAwaitDone();
924          Throwable ex;
925 <        if (s == CANCELLED)
925 >        if ((s &= DONE_MASK) == CANCELLED)
926              throw new CancellationException();
927          if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
928              throw new ExecutionException(ex);
# Line 986 | Line 945 | public abstract class ForkJoinTask<V> im
945       */
946      public final V get(long timeout, TimeUnit unit)
947          throws InterruptedException, ExecutionException, TimeoutException {
948 <        // Messy in part because we measure in nanos, but wait in millis
949 <        int s; long millis, nanos;
950 <        Thread t = Thread.currentThread();
951 <        if (!(t instanceof ForkJoinWorkerThread)) {
952 <            if ((millis = unit.toMillis(timeout)) > 0L)
953 <                s = externalInterruptibleAwaitDone(millis);
954 <            else
955 <                s = status;
956 <        }
957 <        else if ((s = status) >= 0 && (nanos = unit.toNanos(timeout)) > 0L) {
958 <            long deadline = System.nanoTime() + nanos;
959 <            ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
960 <            ForkJoinPool.WorkQueue w = wt.workQueue;
961 <            ForkJoinPool p = wt.pool;
962 <            if (w.tryUnpush(this))
963 <                doExec();
964 <            boolean blocking = false;
948 >        if (Thread.interrupted())
949 >            throw new InterruptedException();
950 >        // Messy in part because we measure in nanosecs, but wait in millisecs
951 >        int s; long ns, ms;
952 >        if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) {
953 >            long deadline = System.nanoTime() + ns;
954 >            ForkJoinPool p = null;
955 >            ForkJoinPool.WorkQueue w = null;
956 >            Thread t = Thread.currentThread();
957 >            if (t instanceof ForkJoinWorkerThread) {
958 >                ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
959 >                p = wt.pool;
960 >                w = wt.workQueue;
961 >                s = p.helpJoinOnce(w, this); // no retries on failure
962 >            }
963 >            boolean canBlock = false;
964 >            boolean interrupted = false;
965              try {
966                  while ((s = status) >= 0) {
967 <                    if (w.runState < 0)
967 >                    if (w != null && w.runState < 0)
968                          cancelIgnoringExceptions(this);
969 <                    else if (!blocking)
970 <                        blocking = p.tryCompensate();
969 >                    else if (!canBlock) {
970 >                        if (p == null || p.tryCompensate(this, null))
971 >                            canBlock = true;
972 >                    }
973                      else {
974 <                        millis = TimeUnit.NANOSECONDS.toMillis(nanos);
1014 <                        if (millis > 0L &&
974 >                        if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
975                              U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
976 <                            try {
977 <                                synchronized (this) {
978 <                                    if (status >= 0)
979 <                                        wait(millis);
976 >                            synchronized (this) {
977 >                                if (status >= 0) {
978 >                                    try {
979 >                                        wait(ms);
980 >                                    } catch (InterruptedException ie) {
981 >                                        if (p == null)
982 >                                            interrupted = true;
983 >                                    }
984                                  }
985 <                            } catch (InterruptedException ie) {
985 >                                else
986 >                                    notifyAll();
987                              }
988                          }
989 <                        if ((s = status) < 0 ||
990 <                            (nanos = deadline - System.nanoTime()) <= 0L)
989 >                        if ((s = status) < 0 || interrupted ||
990 >                            (ns = deadline - System.nanoTime()) <= 0L)
991                              break;
992                      }
993                  }
994              } finally {
995 <                if (blocking)
995 >                if (p != null && canBlock)
996                      p.incrementActiveCount();
997              }
998 +            if (interrupted)
999 +                throw new InterruptedException();
1000          }
1001 <        if (s != NORMAL) {
1001 >        if ((s &= DONE_MASK) != NORMAL) {
1002              Throwable ex;
1003              if (s == CANCELLED)
1004                  throw new CancellationException();
# Line 1098 | Line 1065 | public abstract class ForkJoinTask<V> im
1065       * setRawResult(null)}.
1066       */
1067      public void reinitialize() {
1068 <        if (status == EXCEPTIONAL)
1068 >        if ((status & DONE_MASK) == EXCEPTIONAL)
1069              clearExceptionalCompletion();
1070          else
1071              status = 0;
# Line 1386 | Line 1353 | public abstract class ForkJoinTask<V> im
1353      static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1354          implements RunnableFuture<T> {
1355          final Runnable runnable;
1389        final T resultOnCompletion;
1356          T result;
1357          AdaptedRunnable(Runnable runnable, T result) {
1358              if (runnable == null) throw new NullPointerException();
1359              this.runnable = runnable;
1360 <            this.resultOnCompletion = result;
1360 >            this.result = result; // OK to set this even before completion
1361          }
1362 <        public T getRawResult() { return result; }
1363 <        public void setRawResult(T v) { result = v; }
1364 <        public boolean exec() {
1365 <            runnable.run();
1366 <            result = resultOnCompletion;
1367 <            return true;
1362 >        public final T getRawResult() { return result; }
1363 >        public final void setRawResult(T v) { result = v; }
1364 >        public final boolean exec() { runnable.run(); return true; }
1365 >        public final void run() { invoke(); }
1366 >        private static final long serialVersionUID = 5232453952276885070L;
1367 >    }
1368 >
1369 >    /**
1370 >     * Adaptor for Runnables without results
1371 >     */
1372 >    static final class AdaptedRunnableAction extends ForkJoinTask<Void>
1373 >        implements RunnableFuture<Void> {
1374 >        final Runnable runnable;
1375 >        AdaptedRunnableAction(Runnable runnable) {
1376 >            if (runnable == null) throw new NullPointerException();
1377 >            this.runnable = runnable;
1378          }
1379 <        public void run() { invoke(); }
1379 >        public final Void getRawResult() { return null; }
1380 >        public final void setRawResult(Void v) { }
1381 >        public final boolean exec() { runnable.run(); return true; }
1382 >        public final void run() { invoke(); }
1383          private static final long serialVersionUID = 5232453952276885070L;
1384      }
1385  
# Line 1415 | Line 1394 | public abstract class ForkJoinTask<V> im
1394              if (callable == null) throw new NullPointerException();
1395              this.callable = callable;
1396          }
1397 <        public T getRawResult() { return result; }
1398 <        public void setRawResult(T v) { result = v; }
1399 <        public boolean exec() {
1397 >        public final T getRawResult() { return result; }
1398 >        public final void setRawResult(T v) { result = v; }
1399 >        public final boolean exec() {
1400              try {
1401                  result = callable.call();
1402                  return true;
# Line 1429 | Line 1408 | public abstract class ForkJoinTask<V> im
1408                  throw new RuntimeException(ex);
1409              }
1410          }
1411 <        public void run() { invoke(); }
1411 >        public final void run() { invoke(); }
1412          private static final long serialVersionUID = 2838392045355241008L;
1413      }
1414  
# Line 1442 | Line 1421 | public abstract class ForkJoinTask<V> im
1421       * @return the task
1422       */
1423      public static ForkJoinTask<?> adapt(Runnable runnable) {
1424 <        return new AdaptedRunnable<Void>(runnable, null);
1424 >        return new AdaptedRunnableAction(runnable);
1425      }
1426  
1427      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines