ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.150 by dl, Wed Feb 3 12:41:58 2021 UTC vs.
Revision 1.151 by dl, Sat Feb 6 17:33:47 2021 UTC

# Line 271 | Line 271 | public abstract class ForkJoinTask<V> im
271       *
272       * @param interruptible true if wait can be cancelled by interrupt
273       * @param deadline if non-zero use timed waits and possibly timeout
274 <     * @param pool if nonnull pool to uncompensate after unblocking
274 >     * @param pool current pool if known
275 >     * @param uncompensate if true uncompensate after unblocking
276       * @return status on exit, or ABNORMAL if interrupted while waiting
277       */
278      private int awaitDone(boolean interruptible, long deadline,
279 <                          ForkJoinPool pool) {
279 >                          ForkJoinPool pool, boolean uncompensate) {
280          int s;
281 <        boolean interrupted = false, queued = false, parked = false;
281 >        boolean interrupted = false, queued = false, parked = false, fail = false;
282          Aux node = null;
283          while ((s = status) >= 0) {
284              Aux a; long ns;
285 <            if (parked && Thread.interrupted()) {
285 >            if (fail || (fail = (pool != null && pool.mode < 0)))
286 >                casStatus(s, s | (DONE | ABNORMAL)); // try to cancel
287 >            else if (parked && Thread.interrupted()) {
288                  if (interruptible) {
289                      s = ABNORMAL;
290                      break;
# Line 307 | Line 310 | public abstract class ForkJoinTask<V> im
310              else {
311                  try {
312                      node = new Aux(Thread.currentThread(), null);
313 <                } catch (Throwable ex) {     // try to cancel if cannot create
314 <                    casStatus(s, s | (DONE | ABNORMAL));
313 >                } catch (Throwable ex) {     // cannot create
314 >                    fail = true;
315                  }
316              }
317          }
318 <        if (pool != null)
318 >        if (pool != null && uncompensate)
319              pool.uncompensate();
320  
321          if (queued) {
# Line 434 | Line 437 | public abstract class ForkJoinTask<V> im
437       * Helps and/or waits for completion from join, get, or invoke;
438       * called from either internal or external threads.
439       *
440 +     * @param submittedPool if nonnull, known externally submitted pool
441       * @param ran true if task known to have been exec'd
442       * @param interruptible true if park interruptibly when external
443       * @param timed true if use timed wait
444       * @param nanos if timed, timeout value
445       * @return ABNORMAL if interrupted, else status on exit
446       */
447 <    private int awaitJoin(boolean ran, boolean interruptible, boolean timed,
447 >    private int awaitJoin(ForkJoinPool submittedPool, boolean ran,
448 >                          boolean interruptible, boolean timed,
449                            long nanos) {
450 <        boolean internal; ForkJoinPool p; ForkJoinPool.WorkQueue q; int s;
450 >        boolean internal; ForkJoinPool p, hostPool;
451 >        ForkJoinPool.WorkQueue q; int s;
452          Thread t; ForkJoinWorkerThread wt;
453          if (internal = ((t = Thread.currentThread())
454                          instanceof ForkJoinWorkerThread)) {
455              p = (wt = (ForkJoinWorkerThread)t).pool;
456              q = wt.workQueue;
457 +            if (submittedPool == null)
458 +                submittedPool = p;
459 +            else if (submittedPool != p)
460 +                internal = false;
461          }
462          else {
463              p = ForkJoinPool.common;
# Line 464 | Line 474 | public abstract class ForkJoinTask<V> im
474              else if ((deadline = nanos + System.nanoTime()) == 0L)
475                  deadline = 1L;
476          }
477 <        ForkJoinPool uncompensate = null;
478 <        if (q != null && p != null) {            // try helping
479 <            if ((!timed || p.isSaturated()) &&
480 <                ((this instanceof CountedCompleter) ?
481 <                 (s = p.helpComplete(this, q, internal)) < 0 :
482 <                 (!ran &&
483 <                  (!internal && q.externalTryUnpush(this)) ||
484 <                  q.tryRemove(this, internal)) &&
485 <                 (s = doExec()) < 0))
477 >        boolean uncompensate = false;
478 >        // try helping unless timed, external, and pool has workers
479 >        if (q != null && p != null &&
480 >            (internal || !timed || (p.mode & SMASK) == 0)) {
481 >            if (this instanceof CountedCompleter)
482 >                s = p.helpComplete(this, q, internal);
483 >            else if (!ran &&
484 >                     (!internal && q.externalTryUnpush(this)) ||
485 >                     q.tryRemove(this, internal))
486 >                s = doExec();
487 >            else
488 >                s = status;
489 >            if (s < 0)
490                  return s;
491 <            if (internal) {
491 >            else if (internal) {
492                  if ((s = p.helpJoin(this, q)) < 0)
493                      return s;
494                  if (s == UNCOMPENSATE)
495 <                    uncompensate = p;
495 >                    uncompensate = true;
496                  interruptible = false;
497              }
498          }
499 <        return awaitDone(interruptible, deadline, uncompensate);
499 >        return awaitDone(interruptible, deadline, submittedPool, uncompensate);
500      }
501  
502      /**
# Line 638 | Line 652 | public abstract class ForkJoinTask<V> im
652      public final V join() {
653          int s;
654          if ((s = status) >= 0)
655 <            s = awaitJoin(false, false, false, 0L);
655 >            s = awaitJoin(null, false, false, false, 0L);
656          if ((s & ABNORMAL) != 0)
657              reportException(s);
658          return getRawResult();
# Line 655 | Line 669 | public abstract class ForkJoinTask<V> im
669      public final V invoke() {
670          int s;
671          if ((s = doExec()) >= 0)
672 <            s = awaitJoin(true, false, false, 0L);
672 >            s = awaitJoin(null, true, false, false, 0L);
673          if ((s & ABNORMAL) != 0)
674              reportException(s);
675          return getRawResult();
# Line 684 | Line 698 | public abstract class ForkJoinTask<V> im
698              throw new NullPointerException();
699          t2.fork();
700          if ((s1 = t1.doExec()) >= 0)
701 <            s1 = t1.awaitJoin(true, false, false, 0L);
701 >            s1 = t1.awaitJoin(null, true, false, false, 0L);
702          if ((s1 & ABNORMAL) != 0) {
703              cancelIgnoringExceptions(t2);
704              t1.reportException(s1);
705          }
706 <        else if (((s2 = t2.awaitJoin(false, false, false, 0L)) & ABNORMAL) != 0)
706 >        else if (((s2 = t2.awaitJoin(null, false, false, false, 0L)) & ABNORMAL) != 0)
707              t2.reportException(s2);
708      }
709  
# Line 720 | Line 734 | public abstract class ForkJoinTask<V> im
734              if (i == 0) {
735                  int s;
736                  if ((s = t.doExec()) >= 0)
737 <                    s = t.awaitJoin(true, false, false, 0L);
737 >                    s = t.awaitJoin(null, true, false, false, 0L);
738                  if ((s & ABNORMAL) != 0)
739                      ex = t.getException(s);
740                  break;
# Line 733 | Line 747 | public abstract class ForkJoinTask<V> im
747                  if ((t = tasks[i]) != null) {
748                      int s;
749                      if ((s = t.status) >= 0)
750 <                        s = t.awaitJoin(false, false, false, 0L);
750 >                        s = t.awaitJoin(null, false, false, false, 0L);
751                      if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
752                          break;
753                  }
# Line 783 | Line 797 | public abstract class ForkJoinTask<V> im
797              if (i == 0) {
798                  int s;
799                  if ((s = t.doExec()) >= 0)
800 <                    s = t.awaitJoin(true, false, false, 0L);
800 >                    s = t.awaitJoin(null, true, false, false, 0L);
801                  if ((s & ABNORMAL) != 0)
802                      ex = t.getException(s);
803                  break;
# Line 796 | Line 810 | public abstract class ForkJoinTask<V> im
810                  if ((t = ts.get(i)) != null) {
811                      int s;
812                      if ((s = t.status) >= 0)
813 <                        s = t.awaitJoin(false, false, false, 0L);
813 >                        s = t.awaitJoin(null, false, false, false, 0L);
814                      if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
815                          break;
816                  }
# Line 947 | Line 961 | public abstract class ForkJoinTask<V> im
961       * member of a ForkJoinPool and was interrupted while waiting
962       */
963      public final V get() throws InterruptedException, ExecutionException {
964 <        int s;
965 <        if (((s = awaitJoin(false, true, false, 0L)) & ABNORMAL) != 0)
964 >        int s = awaitJoin(null, false, true, false, 0L);
965 >        if ((s & ABNORMAL) != 0)
966              reportExecutionException(s);
967          return getRawResult();
968      }
# Line 969 | Line 983 | public abstract class ForkJoinTask<V> im
983       */
984      public final V get(long timeout, TimeUnit unit)
985          throws InterruptedException, ExecutionException, TimeoutException {
986 <        int s;
987 <        if ((s = awaitJoin(false, true, true, unit.toNanos(timeout))) >= 0 ||
988 <            (s & ABNORMAL) != 0)
986 >        long nanos = unit.toNanos(timeout);
987 >        int s = awaitJoin(null, false, true, true, nanos);
988 >        if (s >= 0 || (s & ABNORMAL) != 0)
989              reportExecutionException(s);
990          return getRawResult();
991      }
# Line 984 | Line 998 | public abstract class ForkJoinTask<V> im
998       */
999      public final void quietlyJoin() {
1000          if (status >= 0)
1001 <            awaitJoin(false, false, false, 0L);
1001 >            awaitJoin(null, false, false, false, 0L);
1002      }
1003  
1004 +
1005      /**
1006       * Commences performing this task and awaits its completion if
1007       * necessary, without returning its result or throwing its
# Line 994 | Line 1009 | public abstract class ForkJoinTask<V> im
1009       */
1010      public final void quietlyInvoke() {
1011          if (doExec() >= 0)
1012 <            awaitJoin(true, false, false, 0L);
1012 >            awaitJoin(null, true, false, false, 0L);
1013 >    }
1014 >
1015 >    // Versions of join/get for pool.invoke* methods that use external,
1016 >    // possibly-non-commonPool submits
1017 >
1018 >    final V joinForPoolInvoke(ForkJoinPool pool) {
1019 >        int s;
1020 >        if ((s = status) >= 0)
1021 >            s = awaitJoin(pool, false, false, false, 0L);
1022 >        if ((s & ABNORMAL) != 0)
1023 >            reportException(s);
1024 >        return getRawResult();
1025 >    }
1026 >    final void tryJoinForPoolInvoke(ForkJoinPool pool) {
1027 >        if (status >= 0)
1028 >            awaitJoin(pool, false, false, false, 0L);
1029 >    }
1030 >    final V getForPoolInvoke(ForkJoinPool pool)
1031 >        throws InterruptedException, ExecutionException {
1032 >        int s = awaitJoin(pool, false, true, false, 0L);
1033 >        if ((s & ABNORMAL) != 0)
1034 >            reportExecutionException(s);
1035 >        return getRawResult();
1036 >    }
1037 >
1038 >    final V getForPoolInvoke(ForkJoinPool pool, long nanos)
1039 >        throws InterruptedException, ExecutionException, TimeoutException {
1040 >        int s = awaitJoin(pool, false, true, true, nanos);
1041 >        if (s >= 0 || (s & ABNORMAL) != 0)
1042 >            reportExecutionException(s);
1043 >        return getRawResult();
1044      }
1045  
1046      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines