ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.127 by jsr166, Thu Jan 23 19:36:34 2020 UTC vs.
Revision 1.128 by dl, Sat Feb 1 20:20:17 2020 UTC

# Line 227 | Line 227 | public abstract class ForkJoinTask<V> im
227       * atomicity.  Status is initially zero, and takes on nonnegative
228       * values until completed, upon which it holds (sign bit) DONE,
229       * possibly with ABNORMAL (cancelled or exceptional) and THROWN
230 <     * (in which case an exception has been stored).  These control
231 <     * bits occupy only (some of) the upper half (16 bits) of status
232 <     * field. The lower bits are used for user-defined tags.
230 >     * (in which case an exception has been stored). A value of
231 >     * ABNORMAL without DONE signifies an interrupted wait.  These
232 >     * control bits occupy only (some of) the upper half (16 bits) of
233 >     * status field. The lower bits are used for user-defined tags.
234       */
235      private static final int DONE     = 1 << 31; // must be negative
236 <    private static final int ABNORMAL = 1 << 16; // set atomically with DONE
237 <    private static final int THROWN   = 1 << 17; // set atomically with ABNORMAL
236 >    private static final int ABNORMAL = 1 << 16;
237 >    private static final int THROWN   = 1 << 17;
238      private static final int SMASK    = 0xffff;  // short bits for tags
239      // sentinels can be any positive upper half value:
239    private static final int INTRPT   = 1 << 16; // awaitDone interrupt return
240      static final         int ADJUST   = 1 << 16; // uncompensate after block
241  
242      // Fields
243      volatile int status;                // accessed directly by pool and workers
244      private transient volatile Aux aux; // either waiters or thrown Exception
245 +
246      // Support for atomic operations
247      private static final VarHandle STATUS;
248      private static final VarHandle AUX;
# Line 273 | Line 274 | public abstract class ForkJoinTask<V> im
274       *
275       * @param interruptible true if wait can be cancelled by interrupt
276       * @param deadline if non-zero use timed waits and possibly timeout
277 <     * @param pool if nonull, pool to uncompensate when unblocking
278 <     * @return status on exit, or INTRPT if interrupted while waiting
277 >     * @param adjust if true, uncompensate pool after unblocking
278 >     * @param pool if nonull, current pool (possibly comonPool if unknown)
279 >     * @return status on exit, or ABNORMAL if interrupted while waiting
280       */
281 <    private int awaitDone(boolean interruptible, long deadline,
281 >    private int awaitDone(boolean interruptible, long deadline, boolean adjust,
282                            ForkJoinPool pool) {
283 <        int s;
283 >        int s; Aux node = null; boolean interrupted = false, queued = false;
284 >        long nanos = 0L;
285          try {
286 <            Aux node = null; boolean interrupted = false, queued = false;
284 <            for (;;) {
285 <                Aux a; long nanos;
286 >            for (Aux a;;) {
287                  if ((s = status) < 0)
288                      break;
289                  else if (node == null)
# Line 293 | Line 294 | public abstract class ForkJoinTask<V> im
294                      else if (queued = casAux(node.next = a, node))
295                          LockSupport.setCurrentBlocker(this);
296                  }
297 <                else {
298 <                    if (deadline == 0L)
299 <                        LockSupport.park();
299 <                    else if ((nanos = deadline - System.nanoTime()) > 0L)
300 <                        LockSupport.parkNanos(nanos);
301 <                    else {
302 <                        s = 0;               // timeout
303 <                        break;
304 <                    }
305 <                    if ((interrupted |= Thread.interrupted()) && interruptible) {
306 <                        s = INTRPT;
297 >                else if (Thread.interrupted()) {
298 >                    if (interruptible) {
299 >                        s = ABNORMAL;
300                          break;
301                      }
302 +                    interrupted = true;
303                  }
304 +                else if (pool != null && pool.isStopping())
305 +                    casStatus(s, s | (DONE | ABNORMAL)); // help cancel
306 +                else if (deadline != 0L &&
307 +                         (nanos = deadline - System.nanoTime()) <= 0L)
308 +                    break;               // timeout
309 +                else if ((s = status) < 0)
310 +                    break;               // recheck
311 +                else if (nanos > 0L)
312 +                    LockSupport.parkNanos(nanos);
313 +                else
314 +                    LockSupport.park();
315              }
316 <            if (queued) {
317 <                LockSupport.setCurrentBlocker(null);
318 <                if (s >= 0) {            // try to unsplice after cancellation
319 <                    outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
320 <                        for (Aux trail = null;;) {
321 <                            Aux next = a.next;
322 <                            if (a == node) {
323 <                                if (trail != null)
324 <                                    trail.casNext(trail, next);
325 <                                else if (casAux(a, next))
326 <                                    break outer; // cannot be re-encountered
327 <                                break;           // restart
328 <                            } else {
329 <                                trail = a;
330 <                                if ((a = next) == null)
331 <                                    break outer;
332 <                            }
316 >        } finally {
317 >            if (adjust && pool != null)
318 >                pool.uncompensate();
319 >        }
320 >        if (queued) {
321 >            LockSupport.setCurrentBlocker(null);
322 >            if (s >= 0) { // try to unsplice after cancellation
323 >                outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
324 >                    for (Aux trail = null;;) {
325 >                        Aux next = a.next;
326 >                        if (a == node) {
327 >                            if (trail != null)
328 >                                trail.casNext(trail, next);
329 >                            else if (casAux(a, next))
330 >                                break outer; // cannot be re-encountered
331 >                            break;           // restart
332 >                        } else {
333 >                            trail = a;
334 >                            if ((a = next) == null)
335 >                                break outer;
336                          }
337                      }
338                  }
331                else {
332                    signalWaiters();             // help clean or signal
333                    if (interrupted)
334                        Thread.currentThread().interrupt();
335                }
339              }
340 <        } finally { // for sake of OOME on node construction
341 <            if (pool != null)
342 <                pool.uncompensate();
340 >            else {
341 >                signalWaiters();             // help clean or signal
342 >                if (interrupted)
343 >                    Thread.currentThread().interrupt();
344 >            }
345          }
346          return s;
347      }
# Line 423 | Line 428 | public abstract class ForkJoinTask<V> im
428      }
429  
430      /**
431 <     * Helps and/or waits for completion.
431 >     * Helps and/or waits for completion from join, or async invoke if ran true.
432       *
433 <     * @param ran true if task known to be invoked
434 <     * @param interruptible true if wait can be cancelled by interrupt
430 <     * @param deadline if non-zero use timed waits and possibly timeout
431 <     * @return status on exit, or INTRPT if interruptible and interrupted
433 >     * @param ran true if task known to have been exec'd
434 >     * @return status on exit
435       */
436 <    private int helpOrWait(boolean ran, boolean interruptible, long deadline) {
437 <        boolean cc = (this instanceof CountedCompleter);
436 >    private int awaitJoin(boolean ran) {
437 >        boolean adjust = false, owned;
438          Thread t; ForkJoinWorkerThread wt;
439 <        ForkJoinPool.WorkQueue q; ForkJoinPool p; boolean owned; int s;
439 >        ForkJoinPool p; ForkJoinPool.WorkQueue q; int s;
440          if (owned = ((t = Thread.currentThread())
441                       instanceof ForkJoinWorkerThread)) {
442              p = (wt = (ForkJoinWorkerThread)t).pool;
# Line 443 | Line 446 | public abstract class ForkJoinTask<V> im
446              p = ForkJoinPool.common;
447              q = ForkJoinPool.commonQueue();
448          }
449 <        if (p == null || q == null)
450 <            s = 0;
451 <        else if (cc)
452 <            s = p.helpComplete(this, q, owned);
453 <        else if (ran || !q.tryRemove(this, owned) || (s = doExec()) >= 0)
454 <            s = owned ? p.helpJoin(this, q) : 0;
455 <        return (s < 0) ? s : awaitDone(interruptible, deadline,
456 <                                       (s == ADJUST) ? p : null);
449 >        if (q != null && p != null) {
450 >            if ((this instanceof CountedCompleter) ?
451 >                (s = p.helpComplete(this, q, owned)) < 0 :
452 >                (!ran && q.tryRemove(this, owned) && (s = doExec()) < 0))
453 >                return s;
454 >            else if (owned) {
455 >                if ((s = p.helpJoin(this, q)) < 0)
456 >                    return s;
457 >                else if (s == ADJUST)
458 >                    adjust = true;
459 >            }
460 >        }
461 >        return awaitDone(false, 0L, adjust, p);
462 >    }
463 >
464 >    /**
465 >     * Helps and/or waits for completion from get.
466 >     *
467 >     * @param timed if true use timed wait
468 >     * @param nanos wait time
469 >     * @return status on exit, or ABNORMAL if interruptible and interrupted
470 >     */
471 >    private int awaitGet(boolean timed, long nanos) {
472 >        boolean adjust = false, owned;
473 >        Thread t; ForkJoinWorkerThread wt;
474 >        ForkJoinPool p; ForkJoinPool.WorkQueue q; int s; long deadline;
475 >        if (owned = ((t = Thread.currentThread())
476 >                     instanceof ForkJoinWorkerThread)) {
477 >            p = (wt = (ForkJoinWorkerThread)t).pool;
478 >            q = wt.workQueue;
479 >        }
480 >        else if (!Thread.interrupted()) {
481 >            p = ForkJoinPool.common;
482 >            q = ForkJoinPool.commonQueue();
483 >        }
484 >        else
485 >            return ABNORMAL;
486 >        if (!timed)
487 >            deadline = 0L;
488 >        else if (nanos <= 0L)
489 >            return 0;
490 >        else if ((deadline = nanos + System.nanoTime()) == 0L)
491 >            deadline = 1L;
492 >        if (q != null && p != null) {
493 >            if ((!timed || p.isSaturated()) &&
494 >                ((this instanceof CountedCompleter) ?
495 >                 (s = p.helpComplete(this, q, owned)) < 0 :
496 >                 (q.tryRemove(this, owned) && (s = doExec()) < 0)))
497 >                return s;
498 >            else if (owned) {
499 >                if ((s = p.helpJoin(this, q)) < 0)
500 >                    return s;
501 >                else if (s == ADJUST)
502 >                    adjust = true;
503 >            }
504 >        }
505 >        return awaitDone(!owned, deadline, adjust, p);
506      }
507  
508      /**
509 <     * Cancels, ignoring any exceptions thrown by cancel. Used during
510 <     * worker and pool shutdown. Cancel is spec'ed not to throw any
511 <     * exceptions, but if it does anyway, we have no recourse during
460 <     * shutdown, so guard against this case.
509 >     * Cancels, ignoring any exceptions thrown by cancel.  Cancel is
510 >     * spec'ed not to throw any exceptions, but if it does anyway, we
511 >     * have no recourse, so guard against this case.
512       */
513 <    static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
513 >    static final void cancelIgnoringExceptions(Future<?> t) {
514          if (t != null) {
515              try {
516                  t.cancel(false);
# Line 512 | Line 563 | public abstract class ForkJoinTask<V> im
563      }
564  
565      /**
566 +     * Returns exception associated with the given status, or null if none.
567 +     */
568 +    private Throwable getException(int s) {
569 +        Throwable ex = null;
570 +        if ((s & ABNORMAL) != 0 &&
571 +            ((s & THROWN) == 0 || (ex = getThrowableException()) == null))
572 +            ex = new CancellationException();
573 +        return ex;
574 +    }
575 +
576 +    /**
577       * Throws exception associated with the given status, or
578       * CancellationException if none recorded.
579       */
# Line 581 | Line 643 | public abstract class ForkJoinTask<V> im
643      public final V join() {
644          int s;
645          if ((s = status) >= 0)
646 <            s = helpOrWait(false, false, 0L);
646 >            s = awaitJoin(false);
647          if ((s & ABNORMAL) != 0)
648              reportException(s);
649          return getRawResult();
# Line 598 | Line 660 | public abstract class ForkJoinTask<V> im
660      public final V invoke() {
661          int s;
662          if ((s = doExec()) >= 0)
663 <            s = helpOrWait(false, true, 0L);
663 >            s = awaitJoin(true);
664          if ((s & ABNORMAL) != 0)
665              reportException(s);
666          return getRawResult();
# Line 627 | Line 689 | public abstract class ForkJoinTask<V> im
689              throw new NullPointerException();
690          t2.fork();
691          if ((s1 = t1.doExec()) >= 0)
692 <            s1 = t1.helpOrWait(true, false, 0L);
693 <        if ((s1 & ABNORMAL) != 0)
692 >            s1 = t1.awaitJoin(true);
693 >        if ((s1 & ABNORMAL) != 0) {
694 >            cancelIgnoringExceptions(t2);
695              t1.reportException(s1);
696 <        if ((s2 = t2.status) >= 0)
697 <            s2 = t2.helpOrWait(false, false, 0L);
698 <        if ((s2 & ABNORMAL) != 0)
699 <            t2.reportException(s2);
696 >        }
697 >        else {
698 >            if ((s2 = t2.status) >= 0)
699 >                s2 = t2.awaitJoin(false);
700 >            if ((s2 & ABNORMAL) != 0)
701 >                t2.reportException(s2);
702 >        }
703      }
704  
705      /**
# Line 662 | Line 728 | public abstract class ForkJoinTask<V> im
728              }
729              if (i == 0) {
730                  if ((s = t.doExec()) >= 0)
731 <                    s = t.helpOrWait(true, false, 0L);
731 >                    s = t.awaitJoin(true);
732                  if ((s & ABNORMAL) != 0)
733 <                    ex = t.getException();
733 >                    ex = t.getException(s);
734                  break;
735              }
736              t.fork();
# Line 674 | Line 740 | public abstract class ForkJoinTask<V> im
740                  ForkJoinTask<?> t;
741                  if ((t = tasks[i]) != null) {
742                      if ((s = t.status) >= 0)
743 <                        s = t.helpOrWait(false, false, 0L);
744 <                    if ((s & ABNORMAL) != 0) {
679 <                        ex = t.getException();
743 >                        s = t.awaitJoin(false);
744 >                    if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
745                          break;
681                    }
746                  }
747              }
748          }
749 <        if (ex != null)
749 >        if (ex != null) {
750 >            for (int i = 1, s; i <= last; ++i)
751 >                cancelIgnoringExceptions(tasks[i]);
752              rethrow(ex);
753 +        }
754      }
755  
756      /**
# Line 722 | Line 789 | public abstract class ForkJoinTask<V> im
789              }
790              if (i == 0) {
791                  if ((s = t.doExec()) >= 0)
792 <                    s = t.helpOrWait(true, false, 0L);
792 >                    s = t.awaitJoin(true);
793                  if ((s & ABNORMAL) != 0)
794 <                    ex = t.getException();
794 >                    ex = t.getException(s);
795                  break;
796              }
797              t.fork();
# Line 734 | Line 801 | public abstract class ForkJoinTask<V> im
801                  ForkJoinTask<?> t;
802                  if ((t = ts.get(i)) != null) {
803                      if ((s = t.status) >= 0)
804 <                        s = t.helpOrWait(false, false, 0L);
805 <                    if ((s & ABNORMAL) != 0) {
739 <                        ex = t.getException();
804 >                        s = t.awaitJoin(false);
805 >                    if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
806                          break;
741                    }
807                  }
808              }
809          }
810 <        if (ex != null)
810 >        if (ex != null) {
811 >            for (int i = 1, s; i <= last; ++i)
812 >                cancelIgnoringExceptions(ts.get(i));
813              rethrow(ex);
814 +        }
815          return tasks;
816      }
817  
# Line 814 | Line 882 | public abstract class ForkJoinTask<V> im
882       * @return the exception, or {@code null} if none
883       */
884      public final Throwable getException() {
885 <        int s = status;
818 <        return ((s & ABNORMAL) == 0 ? null :
819 <                (s & THROWN)   == 0 ? new CancellationException() :
820 <                getThrowableException());
885 >        return getException(status);
886      }
887  
888      /**
# Line 887 | Line 952 | public abstract class ForkJoinTask<V> im
952       * member of a ForkJoinPool and was interrupted while waiting
953       */
954      public final V get() throws InterruptedException, ExecutionException {
955 <        int s;
956 <        if (Thread.interrupted())
892 <            s = INTRPT;
893 <        else if ((s = status) >= 0)
894 <            s = helpOrWait(false, true, 0L);
895 <        if (s == INTRPT)
955 >        int s; Throwable ex;
956 >        if ((s = status) >= 0 && (s = awaitGet(false, 0L)) >= 0)
957              throw new InterruptedException();
958 <        else if ((s & THROWN) != 0)
959 <            throw new ExecutionException(getThrowableException());
960 <        else if ((s & ABNORMAL) != 0)
958 >        else if ((s & ABNORMAL) == 0)
959 >            return getRawResult();
960 >        else if ((s & THROWN) == 0 || (ex = getThrowableException()) == null)
961              throw new CancellationException();
962          else
963 <            return getRawResult();
963 >            throw new ExecutionException(ex);
964      }
965  
966      /**
# Line 918 | Line 979 | public abstract class ForkJoinTask<V> im
979       */
980      public final V get(long timeout, TimeUnit unit)
981          throws InterruptedException, ExecutionException, TimeoutException {
982 +        int s; Throwable ex;
983          long nanos = unit.toNanos(timeout);
984 <        int s;
985 <        if (Thread.interrupted())
986 <            s = INTRPT;
987 <        else if ((s = status) >= 0 && nanos > 0L) {
988 <            long d = nanos + System.nanoTime();
927 <            s = helpOrWait(false, true, (d == 0L) ? 1L : d); // avoid 0
984 >        if ((s = status) >= 0 && (s = awaitGet(true, nanos)) >= 0) {
985 >            if (s == ABNORMAL)
986 >                throw new InterruptedException();
987 >            else
988 >                throw new TimeoutException();
989          }
990 <
991 <        if (s == INTRPT)
992 <            throw new InterruptedException();
932 <        else if (s >= 0)
933 <            throw new TimeoutException();
934 <        else if ((s & THROWN) != 0)
935 <            throw new ExecutionException(getThrowableException());
936 <        else if ((s & ABNORMAL) != 0)
990 >        else if ((s & ABNORMAL) == 0)
991 >            return getRawResult();
992 >        else if ((s & THROWN) == 0 || (ex = getThrowableException()) == null)
993              throw new CancellationException();
994          else
995 <            return getRawResult();
995 >            throw new ExecutionException(ex);
996      }
997  
998      /**
# Line 947 | Line 1003 | public abstract class ForkJoinTask<V> im
1003       */
1004      public final void quietlyJoin() {
1005          if (status >= 0)
1006 <            helpOrWait(false, false, 0L);
1006 >            awaitJoin(false);
1007      }
1008  
1009      /**
# Line 957 | Line 1013 | public abstract class ForkJoinTask<V> im
1013       */
1014      public final void quietlyInvoke() {
1015          if (doExec() >= 0)
1016 <            helpOrWait(true, false, 0L);
1016 >            awaitJoin(true);
1017      }
1018  
1019      /**
# Line 971 | Line 1027 | public abstract class ForkJoinTask<V> im
1027          Thread t; ForkJoinWorkerThread w; ForkJoinPool p;
1028          if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
1029              (p = (w = (ForkJoinWorkerThread)t).pool) != null)
1030 <            p.helpQuiescePool(w.workQueue);
1030 >            p.helpQuiescePool(w.workQueue, Long.MAX_VALUE, false);
1031          else
1032 <            ForkJoinPool.quiesceCommonPool();
1032 >            ForkJoinPool.common.externalHelpQuiescePool(Long.MAX_VALUE, false);
1033      }
1034  
1035      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines