ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.156 by dl, Fri Mar 25 12:29:55 2022 UTC vs.
Revision 1.157 by dl, Mon Apr 4 12:02:42 2022 UTC

# Line 235 | Line 235 | public abstract class ForkJoinTask<V> im
235      static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel
236      static final int POOLSUBMIT   = 1 << 18; // for pool.submit vs fork
237  
238 +    // flags for awaitDone (in addition to above)
239 +    static final int RAN           = 1;
240 +    static final int INTERRUPTIBLE = 2;
241 +    static final int TIMED         = 4;
242 +
243      // Fields
244      volatile int status;                // accessed directly by pool and workers
245      private transient volatile Aux aux; // either waiters or thrown Exception
# Line 363 | Line 368 | public abstract class ForkJoinTask<V> im
368       * Helps and/or waits for completion from join, get, or invoke;
369       * called from either internal or external threads.
370       *
371 <     * @param s last known status
372 <     * @param ran true if task known to have been exec'd
368 <     * @param interruptible true if park interruptibly when external
369 <     * @param deadline if timed, timeout deadline, else 0
371 >     * @param how flags for POOLSUBMIT, RAN, INTERRUPTIBLE, TIMED
372 >     * @param deadline if timed, timeout deadline
373       * @return ABNORMAL if interrupted, else status on exit
374       */
375 <    private int awaitDone(int s, boolean ran, boolean interruptible,
376 <                          long deadline) {
374 <        Thread t; ForkJoinWorkerThread wt;
375 <        ForkJoinPool p = null, cp;
375 >    private int awaitDone(int how, long deadline) {
376 >        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool p;
377          ForkJoinPool.WorkQueue q = null;
378 +        boolean timed = (how & TIMED) != 0;
379          boolean owned = false, uncompensate = false;
380          if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
381              owned = true;
382              q = (wt = (ForkJoinWorkerThread)t).workQueue;
383              p = wt.pool;
384          }
385 <        else if ((s & POOLSUBMIT) == 0 && (cp = ForkJoinPool.common) != null &&
386 <                 (q = cp.externalQueue()) != null)
385 <            p = cp;
385 >        else if ((p = ForkJoinPool.common) != null && (how & POOLSUBMIT) == 0)
386 >            q = p.externalQueue();
387          if (q != null && p != null) { // try helping
387            boolean timed = (deadline != 0L);
388              if (this instanceof CountedCompleter)
389                  s = p.helpComplete(this, q, owned, timed);
390 <            else if (ran || (s = q.tryRemoveAndExec(this, owned)) >= 0)
390 >            else if ((how & RAN) != 0 ||
391 >                     (s = q.tryRemoveAndExec(this, owned)) >= 0)
392                  s = (owned) ? p.helpJoin(this, q, timed) : 0;
393              if (s < 0)
394                  return s;
395 <            else if (s == UNCOMPENSATE)
395 >            if (s == UNCOMPENSATE)
396                  uncompensate = true;
397          }
398          Aux node = null;
399 +        long ns = 0L;
400          boolean interrupted = false, queued = false;
401 <        for (boolean parked = false, fail = false;;) {
402 <            Aux a; long ns;
401 >        for (;;) {                    // install node and await signal
402 >            Aux a;
403              if ((s = status) < 0)
404                  break;
405 <            else if (fail || (fail = (p != null && p.runState < 0)))
406 <                casStatus(s, s | (DONE | ABNORMAL)); // try to cancel
407 <            else if (queued) {
406 <                if (parked && Thread.interrupted()) {
407 <                    interrupted = true;
408 <                    if (interruptible) {
409 <                        s = ABNORMAL;
410 <                        break;
411 <                    }
412 <                }
413 <                parked = true;
414 <                if (deadline == 0L)
415 <                    LockSupport.park();
416 <                else if ((ns = deadline - System.nanoTime()) > 0L)
417 <                    LockSupport.parkNanos(ns);
418 <                else
419 <                    break;
420 <            }
421 <            else if (node != null) {
405 >            else if (node == null)
406 >                node = new Aux(Thread.currentThread(), null);
407 >            else if (!queued) {
408                  if (((a = aux) == null || a.ex == null) &&
409                      (queued = casAux(node.next = a, node)))
410                      LockSupport.setCurrentBlocker(this);
411              }
412 <            else {
413 <                try {
414 <                    node = new Aux(Thread.currentThread(), null);
415 <                } catch (Throwable ex) {     // cannot create
416 <                    fail = true;
412 >            else if (timed && (ns = deadline - System.nanoTime()) <= 0) {
413 >                s = 0;
414 >                break;
415 >            }
416 >            else if (Thread.interrupted()) {
417 >                interrupted = true;
418 >                if ((how & POOLSUBMIT) != 0 && p != null && p.runState < 0)
419 >                    cancelIgnoringExceptions(this); // cancel on shutdown
420 >                else if ((how & INTERRUPTIBLE) != 0) {
421 >                    s = ABNORMAL;
422 >                    break;
423                  }
424              }
425 +            else if ((s = status) < 0) // recheck
426 +                break;
427 +            else if (timed)
428 +                LockSupport.parkNanos(ns);
429 +            else
430 +                LockSupport.park();
431          }
432 <        if (p != null && uncompensate)
432 >        if (uncompensate)
433              p.uncompensate();
434  
435          if (queued) {
# Line 525 | Line 523 | public abstract class ForkJoinTask<V> im
523       */
524      private Throwable getException(int s) {
525          Throwable ex = null;
526 <        if ((s & ABNORMAL) != 0 &&
529 <            ((s & THROWN) == 0 || (ex = getThrowableException()) == null))
526 >        if ((s & ABNORMAL) != 0 && (ex = getThrowableException()) == null)
527              ex = new CancellationException();
528          return ex;
529      }
# Line 536 | Line 533 | public abstract class ForkJoinTask<V> im
533       * CancellationException if none recorded.
534       */
535      private void reportException(int s) {
536 <        ForkJoinTask.<RuntimeException>uncheckedThrow(
540 <            (s & THROWN) != 0 ? getThrowableException() : null);
536 >        ForkJoinTask.<RuntimeException>uncheckedThrow(getThrowableException());
537      }
538  
539      /**
# Line 545 | Line 541 | public abstract class ForkJoinTask<V> im
541       * necessary in an ExecutionException.
542       */
543      private void reportExecutionException(int s) {
544 <        Throwable ex = null;
544 >        Throwable ex = null, rx;
545          if (s == ABNORMAL)
546              ex = new InterruptedException();
547          else if (s >= 0)
548              ex = new TimeoutException();
549 <        else if ((s & THROWN) != 0 && (ex = getThrowableException()) != null)
550 <            ex = new ExecutionException(ex);
549 >        else if ((rx = getThrowableException()) != null)
550 >            ex = new ExecutionException(rx);
551          ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
552      }
553  
# Line 596 | Line 592 | public abstract class ForkJoinTask<V> im
592      public final ForkJoinTask<V> fork() {
593          Thread t; ForkJoinWorkerThread wt;
594          ForkJoinPool p; ForkJoinPool.WorkQueue q;
595 <        U.storeFence();  // ensure safely publishable
595 >        U.storeStoreFence();  // ensure safely publishable
596          if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
597              p = (wt = (ForkJoinWorkerThread)t).pool;
598              q = wt.workQueue;
# Line 621 | Line 617 | public abstract class ForkJoinTask<V> im
617      public final V join() {
618          int s;
619          if ((s = status) >= 0)
620 <            s = awaitDone(s, false, false, 0L);
620 >            s = awaitDone(s & POOLSUBMIT, 0L);
621          if ((s & ABNORMAL) != 0)
622              reportException(s);
623          return getRawResult();
# Line 638 | Line 634 | public abstract class ForkJoinTask<V> im
634      public final V invoke() {
635          int s;
636          if ((s = doExec()) >= 0)
637 <            s = awaitDone(s, true, false, 0L);
637 >            s = awaitDone(RAN, 0L);
638          if ((s & ABNORMAL) != 0)
639              reportException(s);
640          return getRawResult();
# Line 667 | Line 663 | public abstract class ForkJoinTask<V> im
663              throw new NullPointerException();
664          t2.fork();
665          if ((s1 = t1.doExec()) >= 0)
666 <            s1 = t1.awaitDone(s1, true, false, 0L);
666 >            s1 = t1.awaitDone(RAN, 0L);
667          if ((s1 & ABNORMAL) != 0) {
668              cancelIgnoringExceptions(t2);
669              t1.reportException(s1);
670          }
671          else {
672              if ((s2 = t2.status) >= 0)
673 <                s2 = t2.awaitDone(s2, false, false, 0L);
673 >                s2 = t2.awaitDone(0, 0L);
674              if ((s2 & ABNORMAL) != 0)
675                  t2.reportException(s2);
676          }
# Line 707 | Line 703 | public abstract class ForkJoinTask<V> im
703              if (i == 0) {
704                  int s;
705                  if ((s = t.doExec()) >= 0)
706 <                    s = t.awaitDone(s, true, false, 0L);
706 >                    s = t.awaitDone(RAN, 0L);
707                  if ((s & ABNORMAL) != 0)
708                      ex = t.getException(s);
709                  break;
# Line 720 | Line 716 | public abstract class ForkJoinTask<V> im
716                  if ((t = tasks[i]) != null) {
717                      int s;
718                      if ((s = t.status) >= 0)
719 <                        s = t.awaitDone(s, false, false, 0L);
719 >                        s = t.awaitDone(0, 0L);
720                      if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
721                          break;
722                  }
# Line 770 | Line 766 | public abstract class ForkJoinTask<V> im
766              if (i == 0) {
767                  int s;
768                  if ((s = t.doExec()) >= 0)
769 <                    s = t.awaitDone(s, true, false, 0L);
769 >                    s = t.awaitDone(RAN, 0L);
770                  if ((s & ABNORMAL) != 0)
771                      ex = t.getException(s);
772                  break;
# Line 783 | Line 779 | public abstract class ForkJoinTask<V> im
779                  if ((t = ts.get(i)) != null) {
780                      int s;
781                      if ((s = t.status) >= 0)
782 <                        s = t.awaitDone(s, false, false, 0L);
782 >                        s = t.awaitDone(0, 0L);
783                      if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
784                          break;
785                  }
# Line 961 | Line 957 | public abstract class ForkJoinTask<V> im
957          if (Thread.interrupted())
958              s = ABNORMAL;
959          else if ((s = status) >= 0)
960 <            s = awaitDone(s, false, true, 0L);
960 >            s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE, 0L);
961          if ((s & ABNORMAL) != 0)
962              reportExecutionException(s);
963          return getRawResult();
# Line 983 | Line 979 | public abstract class ForkJoinTask<V> im
979       */
980      public final V get(long timeout, TimeUnit unit)
981          throws InterruptedException, ExecutionException, TimeoutException {
982 <        long nanos = unit.toNanos(timeout), deadline;
982 >        long nanos = unit.toNanos(timeout);
983          int s;
984          if (Thread.interrupted())
985              s = ABNORMAL;
986 <        else if ((s = status) >= 0 && nanos > 0L) {
987 <            if ((deadline = nanos + System.nanoTime()) == 0L)
988 <                deadline = 1L;
993 <            s = awaitDone(s, false, true, deadline);
994 <        }
986 >        else if ((s = status) >= 0 && nanos > 0L)
987 >            s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE | TIMED,
988 >                          nanos + System.nanoTime());
989          if (s >= 0 || (s & ABNORMAL) != 0)
990              reportExecutionException(s);
991          return getRawResult();
# Line 1006 | Line 1000 | public abstract class ForkJoinTask<V> im
1000      public final void quietlyJoin() {
1001          int s;
1002          if ((s = status) >= 0)
1003 <            awaitDone(s, false, false, 0L);
1003 >            awaitDone(s & POOLSUBMIT, 0L);
1004      }
1005  
1006      /**
# Line 1017 | Line 1011 | public abstract class ForkJoinTask<V> im
1011      public final void quietlyInvoke() {
1012          int s;
1013          if ((s = doExec()) >= 0)
1014 <            awaitDone(s, true, false, 0L);
1014 >            awaitDone(RAN, 0L);
1015      }
1016  
1017      /**
# Line 1035 | Line 1029 | public abstract class ForkJoinTask<V> im
1029      public final boolean quietlyJoin(long timeout, TimeUnit unit)
1030          throws InterruptedException {
1031          int s;
1032 <        long nanos = unit.toNanos(timeout), deadline;
1032 >        long nanos = unit.toNanos(timeout);
1033          if (Thread.interrupted())
1034              s = ABNORMAL;
1035 <        else if ((s = status) >= 0 && nanos > 0L) {
1036 <            if ((deadline = nanos + System.nanoTime()) == 0L)
1037 <                deadline = 1L;
1044 <            s = awaitDone(s, false, true, deadline);
1045 <        }
1035 >        else if ((s = status) >= 0 && nanos > 0L)
1036 >            s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE | TIMED,
1037 >                          nanos + System.nanoTime());
1038          if (s == ABNORMAL)
1039              throw new InterruptedException();
1040          else
# Line 1061 | Line 1053 | public abstract class ForkJoinTask<V> im
1053      public final boolean quietlyJoinUninterruptibly(long timeout,
1054                                                      TimeUnit unit) {
1055          int s;
1056 <        long nanos = unit.toNanos(timeout), deadline;
1057 <        boolean interrupted = Thread.interrupted();
1058 <        if ((s = status) >= 0 && nanos > 0L) {
1067 <            if ((deadline = nanos + System.nanoTime()) == 0L)
1068 <                deadline = 1L;
1069 <            s = awaitDone(s, false, false, deadline);
1070 <        }
1071 <        if (interrupted || s == ABNORMAL)
1072 <            Thread.currentThread().interrupt();
1056 >        long nanos = unit.toNanos(timeout);
1057 >        if ((s = status) >= 0 && nanos > 0L)
1058 >            s = awaitDone((s & POOLSUBMIT) | TIMED, nanos + System.nanoTime());
1059          return (s < 0);
1060      }
1061  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines