ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.154 by dl, Sun Nov 14 16:19:13 2021 UTC vs.
Revision 1.155 by dl, Fri Mar 18 16:01:42 2022 UTC

# Line 186 | Line 186 | public abstract class ForkJoinTask<V> im
186       * This is sometimes hard to see because this file orders exported
187       * methods in a way that flows well in javadocs.
188       *
189 <     * Revision notes: The use of "Aux" field replaces previous
190 <     * reliance on a table to hold exceptions and synchronized blocks
191 <     * and monitors to wait for completion. This class uses
192 <     * jdk-internal Unsafe for atomics and special memory modes,
193 <     * rather than VarHandles, to avoid initialization dependencies in
194 <     * other jdk components that require early parallelism.
189 >     * Revision notes: This class uses jdk-internal Unsafe for atomics
190 >     * and special memory modes, rather than VarHandles, to avoid
191 >     * initialization dependencies in other jdk components that
192 >     * require early parallelism. It also simplifies handling of
193 >     * pool-submitted tasks, among other minor improvements.
194       */
195  
196      /**
# Line 208 | Line 207 | public abstract class ForkJoinTask<V> im
207              this.thread = thread;
208              this.ex = ex;
209          }
211        @SuppressWarnings("removal")
210          final boolean casNext(Aux c, Aux v) { // used only in cancellation
211 <            return U.compareAndSetObject(this, NEXT, c, v);
211 >            return U.compareAndSetReference(this, NEXT, c, v);
212          }
213          private static final Unsafe U;
214          private static final long NEXT;
# Line 230 | Line 228 | public abstract class ForkJoinTask<V> im
228       * control bits occupy only (some of) the upper half (16 bits) of
229       * status field. The lower bits are used for user-defined tags.
230       */
231 <    private static final int DONE         = 1 << 31; // must be negative
232 <    private static final int ABNORMAL     = 1 << 16;
233 <    private static final int THROWN       = 1 << 17;
234 <    private static final int SMASK        = 0xffff;  // short bits for tags
235 <    private static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel
231 >    static final int DONE         = 1 << 31; // must be negative
232 >    static final int ABNORMAL     = 1 << 16;
233 >    static final int THROWN       = 1 << 17;
234 >    static final int SMASK        = 0xffff;  // short bits for tags
235 >    static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel
236 >    static final int POOLSUBMIT   = 1 << 18; // for pool.submit vs fork
237  
238      // Fields
239      volatile int status;                // accessed directly by pool and workers
# Line 250 | Line 249 | public abstract class ForkJoinTask<V> im
249      private boolean casStatus(int c, int v) {
250          return U.compareAndSetInt(this, STATUS, c, v);
251      }
253    @SuppressWarnings("removal")
252      private boolean casAux(Aux c, Aux v) {
253 <        return U.compareAndSetObject(this, AUX, c, v);
253 >        return U.compareAndSetReference(this, AUX, c, v);
254 >    }
255 >
256 >    /**
257 >     * Marks this task as an external pool submission.
258 >     */
259 >    final void markPoolSubmission() {
260 >        getAndBitwiseOrStatus(POOLSUBMIT);
261      }
262  
263      /** Removes and unparks waiters */
# Line 358 | Line 363 | public abstract class ForkJoinTask<V> im
363       * Helps and/or waits for completion from join, get, or invoke;
364       * called from either internal or external threads.
365       *
366 <     * @param pool if nonnull, known submitted pool, else assumes current pool
366 >     * @param s last known status
367       * @param ran true if task known to have been exec'd
368       * @param interruptible true if park interruptibly when external
369 <     * @param timed true if use timed wait
365 <     * @param nanos if timed, timeout value
369 >     * @param deadline if timed, timeout deadline, else 0
370       * @return ABNORMAL if interrupted, else status on exit
371       */
372 <    private int awaitDone(ForkJoinPool pool, boolean ran,
373 <                          boolean interruptible, boolean timed,
374 <                          long nanos) {
375 <        ForkJoinPool p; boolean internal; int s; Thread t;
372 >    private int awaitDone(int s, boolean ran, boolean interruptible,
373 >                          long deadline) {
374 >        Thread t; ForkJoinWorkerThread wt;
375 >        ForkJoinPool p = null, cp;
376          ForkJoinPool.WorkQueue q = null;
377 +        boolean owned = false, uncompensate = false;
378          if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
379 <            ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
379 >            owned = true;
380 >            q = (wt = (ForkJoinWorkerThread)t).workQueue;
381              p = wt.pool;
376            if (pool == null)
377                pool = p;
378            if (internal = (pool == p))
379                q = wt.workQueue;
382          }
383 <        else {
384 <            internal = false;
385 <            p = ForkJoinPool.commonPool();
386 <            if (pool == null)
387 <                pool = p;
388 <            if (pool == p && p != null)
389 <                q = p.externalQueue();
390 <        }
391 <        if (interruptible && Thread.interrupted())
392 <            return ABNORMAL;
393 <        if ((s = status) < 0)
394 <            return s;
395 <        long deadline = 0L;
394 <        if (timed) {
395 <            if (nanos <= 0L)
396 <                return 0;
397 <            else if ((deadline = nanos + System.nanoTime()) == 0L)
398 <                deadline = 1L;
383 >        else if ((s & POOLSUBMIT) == 0 && (cp = ForkJoinPool.common) != null &&
384 >                 (q = cp.externalQueue()) != null)
385 >            p = cp;
386 >        if (q != null && p != null) { // try helping
387 >            boolean timed = (deadline != 0L);
388 >            if (this instanceof CountedCompleter)
389 >                s = p.helpComplete(this, q, owned, timed);
390 >            else if (ran || (s = q.tryRemoveAndExec(this, owned)) >= 0)
391 >                s = (owned) ? p.helpJoin(this, q, timed) : 0;
392 >            if (s < 0)
393 >                return s;
394 >            else if (s == UNCOMPENSATE)
395 >                uncompensate = true;
396          }
400        boolean uncompensate = false;
401        if (q != null && p != null) {  // try helping
402            // help even in timed mode if pool has no parallelism
403            boolean canHelp = !timed || (p.mode & SMASK) == 0;
404            if (canHelp) {
405                if ((this instanceof CountedCompleter) &&
406                    (s = p.helpComplete(this, q, internal)) < 0)
407                    return s;
408                if (!ran && ((!internal && q.externalTryUnpush(this)) ||
409                             q.tryRemove(this, internal)) && (s = doExec()) < 0)
410                    return s;
411            }
412            if (internal) {
413                if ((s = p.helpJoin(this, q, canHelp)) < 0)
414                    return s;
415                if (s == UNCOMPENSATE)
416                    uncompensate = true;
417            }
418        }
419        // block until done or cancelled wait
420        boolean interrupted = false, queued = false;
421        boolean parked = false, fail = false;
397          Aux node = null;
398 <        while ((s = status) >= 0) {
398 >        boolean interrupted = false, queued = false;
399 >        for (boolean parked = false, fail = false;;) {
400              Aux a; long ns;
401 <            if (fail || (fail = (pool != null && pool.mode < 0)))
401 >            if ((s = status) < 0)
402 >                break;
403 >            else if (fail || (fail = (p != null && p.runState < 0)))
404                  casStatus(s, s | (DONE | ABNORMAL)); // try to cancel
427            else if (parked && Thread.interrupted()) {
428                if (interruptible) {
429                    s = ABNORMAL;
430                    break;
431                }
432                interrupted = true;
433            }
405              else if (queued) {
406 <                if (deadline != 0L) {
407 <                    if ((ns = deadline - System.nanoTime()) <= 0L)
406 >                if (parked && Thread.interrupted()) {
407 >                    interrupted = true;
408 >                    if (interruptible) {
409 >                        s = ABNORMAL;
410                          break;
411 <                    LockSupport.parkNanos(ns);
411 >                    }
412                  }
440                else
441                    LockSupport.park();
413                  parked = true;
414 +                if (deadline == 0L)
415 +                    LockSupport.park();
416 +                else if ((ns = deadline - System.nanoTime()) > 0L)
417 +                    LockSupport.parkNanos(ns);
418 +                else
419 +                    break;
420              }
421              else if (node != null) {
422 <                if ((a = aux) != null && a.ex != null)
423 <                    Thread.onSpinWait();     // exception in progress
447 <                else if (queued = casAux(node.next = a, node))
422 >                if (((a = aux) == null || a.ex == null) &&
423 >                    (queued = casAux(node.next = a, node)))
424                      LockSupport.setCurrentBlocker(this);
425              }
426              else {
# Line 455 | Line 431 | public abstract class ForkJoinTask<V> im
431                  }
432              }
433          }
434 <        if (pool != null && uncompensate)
435 <            pool.uncompensate();
434 >        if (p != null && uncompensate)
435 >            p.uncompensate();
436  
437          if (queued) {
438              LockSupport.setCurrentBlocker(null);
# Line 618 | Line 594 | public abstract class ForkJoinTask<V> im
594       * @return {@code this}, to simplify usage
595       */
596      public final ForkJoinTask<V> fork() {
597 <        Thread t; ForkJoinWorkerThread w;
598 <        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
599 <            (w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool);
597 >        Thread t; ForkJoinWorkerThread wt;
598 >        ForkJoinPool p; ForkJoinPool.WorkQueue q;
599 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
600 >            p = (wt = (ForkJoinWorkerThread)t).pool;
601 >            q = wt.workQueue;
602 >        }
603          else
604 <            ForkJoinPool.externalPushCommon(this);
604 >            q = (p = ForkJoinPool.common).submissionQueue(false);
605 >        q.push(this, p, true);
606          return this;
607      }
608  
# Line 640 | Line 620 | public abstract class ForkJoinTask<V> im
620      public final V join() {
621          int s;
622          if ((s = status) >= 0)
623 <            s = awaitDone(null, false, false, false, 0L);
623 >            s = awaitDone(s, false, false, 0L);
624          if ((s & ABNORMAL) != 0)
625              reportException(s);
626          return getRawResult();
# Line 657 | Line 637 | public abstract class ForkJoinTask<V> im
637      public final V invoke() {
638          int s;
639          if ((s = doExec()) >= 0)
640 <            s = awaitDone(null, true, false, false, 0L);
640 >            s = awaitDone(s, true, false, 0L);
641          if ((s & ABNORMAL) != 0)
642              reportException(s);
643          return getRawResult();
# Line 686 | Line 666 | public abstract class ForkJoinTask<V> im
666              throw new NullPointerException();
667          t2.fork();
668          if ((s1 = t1.doExec()) >= 0)
669 <            s1 = t1.awaitDone(null, true, false, false, 0L);
669 >            s1 = t1.awaitDone(s1, true, false, 0L);
670          if ((s1 & ABNORMAL) != 0) {
671              cancelIgnoringExceptions(t2);
672              t1.reportException(s1);
673          }
674 <        else if (((s2 = t2.awaitDone(null, false, false, false, 0L)) & ABNORMAL) != 0)
675 <            t2.reportException(s2);
674 >        else {
675 >            if ((s2 = t2.status) >= 0)
676 >                s2 = t2.awaitDone(s2, false, false, 0L);
677 >            if ((s2 & ABNORMAL) != 0)
678 >                t2.reportException(s2);
679 >        }
680      }
681  
682      /**
# Line 722 | Line 706 | public abstract class ForkJoinTask<V> im
706              if (i == 0) {
707                  int s;
708                  if ((s = t.doExec()) >= 0)
709 <                    s = t.awaitDone(null, true, false, false, 0L);
709 >                    s = t.awaitDone(s, true, false, 0L);
710                  if ((s & ABNORMAL) != 0)
711                      ex = t.getException(s);
712                  break;
# Line 735 | Line 719 | public abstract class ForkJoinTask<V> im
719                  if ((t = tasks[i]) != null) {
720                      int s;
721                      if ((s = t.status) >= 0)
722 <                        s = t.awaitDone(null, false, false, false, 0L);
722 >                        s = t.awaitDone(s, false, false, 0L);
723                      if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
724                          break;
725                  }
# Line 785 | Line 769 | public abstract class ForkJoinTask<V> im
769              if (i == 0) {
770                  int s;
771                  if ((s = t.doExec()) >= 0)
772 <                    s = t.awaitDone(null, true, false, false, 0L);
772 >                    s = t.awaitDone(s, true, false, 0L);
773                  if ((s & ABNORMAL) != 0)
774                      ex = t.getException(s);
775                  break;
# Line 798 | Line 782 | public abstract class ForkJoinTask<V> im
782                  if ((t = ts.get(i)) != null) {
783                      int s;
784                      if ((s = t.status) >= 0)
785 <                        s = t.awaitDone(null, false, false, false, 0L);
785 >                        s = t.awaitDone(s, false, false, 0L);
786                      if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
787                          break;
788                  }
# Line 871 | Line 855 | public abstract class ForkJoinTask<V> im
855          return (status & (DONE | ABNORMAL)) == DONE;
856      }
857  
858 +    @Override
859 +    public State state() {
860 +        int s = status;
861 +        return (s >= 0) ? State.RUNNING :
862 +            ((s & (DONE | ABNORMAL)) == DONE) ? State.SUCCESS:
863 +            ((s & (ABNORMAL | THROWN)) == (ABNORMAL | THROWN)) ? State.FAILED :
864 +            State.CANCELLED;
865 +    }
866 +
867 +    @Override
868 +    public V resultNow() {
869 +        if (!isCompletedNormally())
870 +            throw new IllegalStateException();
871 +        return getRawResult();
872 +    }
873 +
874 +    @Override
875 +    public Throwable exceptionNow() {
876 +        if ((status & (ABNORMAL | THROWN)) != (ABNORMAL | THROWN))
877 +            throw new IllegalStateException();
878 +        return getThrowableException();
879 +    }
880 +
881      /**
882       * Returns the exception thrown by the base computation, or a
883       * {@code CancellationException} if cancelled, or {@code null} if
# Line 949 | Line 956 | public abstract class ForkJoinTask<V> im
956       * member of a ForkJoinPool and was interrupted while waiting
957       */
958      public final V get() throws InterruptedException, ExecutionException {
959 <        int s = awaitDone(null, false, true, false, 0L);
959 >        int s;
960 >        if (Thread.interrupted())
961 >            s = ABNORMAL;
962 >        else if ((s = status) >= 0)
963 >            s = awaitDone(s, false, true, 0L);
964          if ((s & ABNORMAL) != 0)
965              reportExecutionException(s);
966          return getRawResult();
# Line 971 | Line 982 | public abstract class ForkJoinTask<V> im
982       */
983      public final V get(long timeout, TimeUnit unit)
984          throws InterruptedException, ExecutionException, TimeoutException {
985 <        long nanos = unit.toNanos(timeout);
986 <        int s = awaitDone(null, false, true, true, nanos);
985 >        long nanos = unit.toNanos(timeout), deadline;
986 >        int s;
987 >        if (Thread.interrupted())
988 >            s = ABNORMAL;
989 >        else if ((s = status) >= 0 && nanos > 0L) {
990 >            if ((deadline = nanos + System.nanoTime()) == 0L)
991 >                deadline = 1L;
992 >            s = awaitDone(s, false, true, deadline);
993 >        }
994          if (s >= 0 || (s & ABNORMAL) != 0)
995              reportExecutionException(s);
996          return getRawResult();
# Line 985 | Line 1003 | public abstract class ForkJoinTask<V> im
1003       * known to have aborted.
1004       */
1005      public final void quietlyJoin() {
1006 <        if (status >= 0)
1007 <            awaitDone(null, false, false, false, 0L);
1006 >        int s;
1007 >        if ((s = status) >= 0)
1008 >            awaitDone(s, false, false, 0L);
1009      }
1010  
992
1011      /**
1012       * Commences performing this task and awaits its completion if
1013       * necessary, without returning its result or throwing its
1014       * exception.
1015       */
1016      public final void quietlyInvoke() {
1017 <        if (doExec() >= 0)
1018 <            awaitDone(null, true, false, false, 0L);
1017 >        int s;
1018 >        if ((s = doExec()) >= 0)
1019 >            awaitDone(s, true, false, 0L);
1020      }
1021  
1022 <    // Versions of join/get for pool.invoke* methods that use external,
1023 <    // possibly-non-commonPool submits
1024 <
1025 <    final void awaitPoolInvoke(ForkJoinPool pool) {
1026 <        awaitDone(pool, false, false, false, 0L);
1027 <    }
1028 <    final void awaitPoolInvoke(ForkJoinPool pool, long nanos) {
1029 <        awaitDone(pool, false, true, true, nanos);
1030 <    }
1031 <    final V joinForPoolInvoke(ForkJoinPool pool) {
1032 <        int s = awaitDone(pool, false, false, false, 0L);
1033 <        if ((s & ABNORMAL) != 0)
1034 <            reportException(s);
1035 <        return getRawResult();
1036 <    }
1037 <    final V getForPoolInvoke(ForkJoinPool pool)
1038 <        throws InterruptedException, ExecutionException {
1039 <        int s = awaitDone(pool, false, true, false, 0L);
1040 <        if ((s & ABNORMAL) != 0)
1041 <            reportExecutionException(s);
1042 <        return getRawResult();
1022 >    /**
1023 >     * Tries to join this task, returning true if it completed
1024 >     * (possibly exceptionally) before the given timeout and/or the
1025 >     * the current thread has been interrupted, else false.
1026 >     *
1027 >     * @param timeout the maximum time to wait
1028 >     * @param unit the time unit of the timeout argument
1029 >     * @return true if this task completed
1030 >     * @throws InterruptedException if the current thread was
1031 >     * interrupted while waiting
1032 >     */
1033 >    public final boolean quietlyJoin(long timeout, TimeUnit unit)
1034 >        throws InterruptedException {
1035 >        int s;
1036 >        long nanos = unit.toNanos(timeout), deadline;
1037 >        if (Thread.interrupted())
1038 >            s = ABNORMAL;
1039 >        else if ((s = status) >= 0 && nanos > 0L) {
1040 >            if ((deadline = nanos + System.nanoTime()) == 0L)
1041 >                deadline = 1L;
1042 >            s = awaitDone(s, false, true, deadline);
1043 >        }
1044 >        if (s == ABNORMAL)
1045 >            throw new InterruptedException();
1046 >        else
1047 >            return (s < 0);
1048      }
1049 <    final V getForPoolInvoke(ForkJoinPool pool, long nanos)
1050 <        throws InterruptedException, ExecutionException, TimeoutException {
1051 <        int s = awaitDone(pool, false, true, true, nanos);
1052 <        if (s >= 0 || (s & ABNORMAL) != 0)
1053 <            reportExecutionException(s);
1054 <        return getRawResult();
1049 >
1050 >    /**
1051 >     * Tries to join this task, returning true if it completed
1052 >     * (possibly exceptionally) before the given timeout.
1053 >     *
1054 >     * @param timeout the maximum time to wait
1055 >     * @param unit the time unit of the timeout argument
1056 >     * @return true if this task completed
1057 >     */
1058 >    public final boolean quietlyJoinUninterruptibly(long timeout,
1059 >                                                    TimeUnit unit) {
1060 >        int s;
1061 >        long nanos = unit.toNanos(timeout), deadline;
1062 >        boolean interrupted = Thread.interrupted();
1063 >        if ((s = status) >= 0 && nanos > 0L) {
1064 >            if ((deadline = nanos + System.nanoTime()) == 0L)
1065 >                deadline = 1L;
1066 >            s = awaitDone(s, false, false, deadline);
1067 >        }
1068 >        if (interrupted || s == ABNORMAL)
1069 >            Thread.currentThread().interrupt();
1070 >        return (s < 0);
1071      }
1072  
1073      /**
# Line 1038 | Line 1078 | public abstract class ForkJoinTask<V> im
1078       * all are processed.
1079       */
1080      public static void helpQuiesce() {
1081 <        Thread t; ForkJoinWorkerThread w; ForkJoinPool p;
1042 <        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
1043 <            (p = (w = (ForkJoinWorkerThread)t).pool) != null)
1044 <            p.helpQuiescePool(w.workQueue, Long.MAX_VALUE, false);
1045 <        else
1046 <            ForkJoinPool.commonPool().externalHelpQuiescePool(Long.MAX_VALUE, false);
1081 >        ForkJoinPool.helpQuiescePool(null, Long.MAX_VALUE, false);
1082      }
1083  
1084      /**
# Line 1105 | Line 1140 | public abstract class ForkJoinTask<V> im
1140       * @return {@code true} if unforked
1141       */
1142      public boolean tryUnfork() {
1143 <        Thread t; ForkJoinPool.WorkQueue q;
1144 <        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1145 <            ? (q = ((ForkJoinWorkerThread)t).workQueue) != null
1146 <               && q.tryUnpush(this)
1147 <            : (q = ForkJoinPool.commonQueue()) != null
1148 <               && q.externalTryUnpush(this);
1143 >        Thread t; ForkJoinPool.WorkQueue q; boolean owned;
1144 >        if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1145 >            q = ((ForkJoinWorkerThread)t).workQueue;
1146 >        else
1147 >            q = ForkJoinPool.commonQueue();
1148 >        return (q != null && q.tryUnpush(this, owned));
1149      }
1150  
1151      /**
# Line 1514 | Line 1549 | public abstract class ForkJoinTask<V> im
1549       *
1550       * @since 17
1551       */
1552 <    // adaptInterruptible deferred to its own independent change
1553 <    // https://bugs.openjdk.java.net/browse/JDK-8246587
1519 <    /* TODO: public */ private static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
1552 >    public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
1553 >        // https://bugs.openjdk.java.net/browse/JDK-8246587
1554          return new AdaptedInterruptibleCallable<T>(callable);
1555      }
1556  
# Line 1558 | Line 1592 | public abstract class ForkJoinTask<V> im
1592          U = Unsafe.getUnsafe();
1593          STATUS = U.objectFieldOffset(ForkJoinTask.class, "status");
1594          AUX = U.objectFieldOffset(ForkJoinTask.class, "aux");
1595 +        Class<?> dep1 = LockSupport.class; // ensure loaded
1596 +        Class<?> dep2 = Aux.class;
1597      }
1598  
1599   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines