ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.406 by dl, Fri Mar 25 12:29:55 2022 UTC vs.
Revision 1.407 by dl, Mon Apr 4 12:02:41 2022 UTC

# Line 210 | Line 210 | public class ForkJoinPool extends Abstra
210       * moded accesses and/or fences for other control, with modes
211       * reflecting the presence or absence of other contextual sync
212       * provided by atomic and/or volatile accesses. Some methods (or
213 <     * their primary loops) begin with an acquire fence that amounts
214 <     * to an acquiring read of "this" to cover all fields (which is
213 >     * their primary loops) begin with an acquire fence or
214 >     * otherwise-unnecessary valatile read that amounts to an
215 >     * acquiring read of "this" to cover all fields (which is
216       * sometimes stronger than necessary, but less brittle). Some
217       * constructions are intentionally racy because they use read
218       * values as hints, not for correctness.
# Line 370 | Line 371 | public class ForkJoinPool extends Abstra
371       * an effect in creating threads or letting them time out and
372       * terminate when idle.
373       *
374 <     * Field "mode" mainly holds lifetime status, atomically and
374 >     * Field "runState" holds lifetime status, atomically and
375       * monotonically setting SHUTDOWN, STOP, and finally TERMINATED
376       * bits. It is updated only via bitwise atomics (getAndBitwiseOr).
377       *
# Line 563 | Line 564 | public class ForkJoinPool extends Abstra
564       * status, but they will also immediately terminate. To conform to
565       * ExecutorService invoke, invokeAll, and invokeAny specs, we must
566       * track pool status while waiting in ForkJoinTask.awaitDone, and
567 <     * interrupt interruptible callers on termination.
567 >     * interrupt interruptible callers on termination, while also
568 >     * avoiding cancelling other tasks that are normally completing
569 >     * during quiescent termination. This is tracked by recording
570 >     * ForkJoinTask.POOLSUBMIT in task status and/or as a bit flag
571 >     * argument to joining methods.
572       *
573       * Trimming workers. To release resources after periods of lack of
574       * use, a worker starting to wait when the pool is quiescent will
# Line 1049 | Line 1054 | public class ForkJoinPool extends Abstra
1054          final int getAndSetAccess(int v) {
1055              return U.getAndSetInt(this, ACCESS, v);
1056          }
1057 +        final void releaseAccess() {
1058 +            U.putIntRelease(this, ACCESS, 0);
1059 +        }
1060  
1061          /**
1062           * Constructor. For owned queues, most fields are initialized
# Line 1057 | Line 1065 | public class ForkJoinPool extends Abstra
1065          WorkQueue(ForkJoinWorkerThread owner, int config) {
1066              this.owner = owner;
1067              this.config = config;
1068 +            base = top = 1;
1069          }
1070  
1071          /**
# Line 1070 | Line 1079 | public class ForkJoinPool extends Abstra
1079           * Returns the approximate number of tasks in the queue.
1080           */
1081          final int queueSize() {
1082 <            int unused = access, n = top - base; // for ordering effect
1083 <            return (n < 0) ? 0 : n;   // ignore transient negative
1082 >            int unused = access;            // for ordering effect
1083 >            return Math.max(top - base, 0); // ignore transient negative
1084          }
1085  
1086          /**
# Line 1128 | Line 1137 | public class ForkJoinPool extends Abstra
1137              if (p - b > 0 && a != null && (cap = a.length) > 0) {
1138                  do {
1139                      if (fifo == 0 || (nb = b + 1) == p) {
1140 <                        if ((t = getAndClearSlot(a, (cap - 1) & s)) != null) {
1140 >                        if ((t = getAndClearSlot(a, (cap - 1) & s)) != null)
1141                              top = s;
1133                            U.storeFence();
1134                        }
1142                          break;                   // lost race for only task
1143                      }
1144                      else if ((t = getAndClearSlot(a, (cap - 1) & b)) != null) {
1145                          base = nb;
1139                        U.storeFence();
1146                          break;
1147                      }
1148                      else {
# Line 1146 | Line 1152 | public class ForkJoinPool extends Abstra
1152                          }
1153                      }
1154                  } while (p - b > 0);
1155 +                U.storeStoreFence(); // for timely index updates
1156              }
1157              return t;
1158          }
# Line 1162 | Line 1169 | public class ForkJoinPool extends Abstra
1169           * Pops the given task only if it is at the current top.
1170           */
1171          final boolean tryUnpush(ForkJoinTask<?> task, boolean owned) {
1165            boolean taken = false;
1172              ForkJoinTask<?>[] a = array;
1173              int p = top, s, cap, k;
1174              if (task != null && base != p && a != null && (cap = a.length) > 0 &&
1175                  a[k = (cap - 1) & (s = p - 1)] == task) {
1176                  if (owned || getAndSetAccess(1) == 0) {
1177 <                    if ((owned || (top == p && a[k] == task)) &&
1178 <                        getAndClearSlot(a, k) != null) {
1179 <                        taken = true;
1177 >                    if (top != p || a[k] != task ||
1178 >                        getAndClearSlot(a, k) == null)
1179 >                        access = 0;
1180 >                    else {
1181                          top = s;
1182 <                        U.storeFence();
1182 >                        releaseAccess();
1183 >                        return true;
1184                      }
1177                    if (!owned)
1178                        access = 0;
1185                  }
1186              }
1187 <            return taken;
1187 >            return false;
1188          }
1189  
1190          /**
# Line 1249 | Line 1255 | public class ForkJoinPool extends Abstra
1255                      else if (t != null) {
1256                          if (casSlotToNull(a, k, t)) {
1257                              base = nb;
1258 <                            U.storeFence();
1258 >                            U.storeStoreFence();
1259                              return t;
1260                          }
1261                          break;                   // contended
# Line 1288 | Line 1294 | public class ForkJoinPool extends Abstra
1294           * @return task status if removed, else 0
1295           */
1296          final int tryRemoveAndExec(ForkJoinTask<?> task, boolean owned) {
1291            boolean taken = false;
1297              ForkJoinTask<?>[] a = array;
1298              int p = top, s = p - 1, d = p - base, cap;
1299              if (task != null && d > 0 && a != null && (cap = a.length) > 0) {
# Line 1297 | Line 1302 | public class ForkJoinPool extends Abstra
1302                      if ((t = a[k = i & m]) == task) {
1303                          if (!owned && getAndSetAccess(1) != 0)
1304                              break;                 // fail if locked
1305 <                        if ((owned || (top == p && a[k] == task)) &&
1306 <                            getAndClearSlot(a, k) != null) {
1307 <                            taken = true;
1305 >                        else if (top != p || a[k] != task ||
1306 >                                 getAndClearSlot(a, k) == null) {
1307 >                            access = 0;
1308 >                            break;                 // missed
1309 >                        }
1310 >                        else {
1311                              if (i != s && i == base)
1312                                  base = i + 1;      // avoid shift
1313                              else {
# Line 1307 | Line 1315 | public class ForkJoinPool extends Abstra
1315                                      a[j & m] = getAndClearSlot(a, ++j & m);
1316                                  top = s;
1317                              }
1318 <                            U.storeFence();
1318 >                            releaseAccess();
1319 >                            return task.doExec();
1320                          }
1312                        if (!owned)
1313                            access = 0;
1314                        break;
1321                      }
1322                      else if (t == null || --d == 0)
1323                          break;
1324                  }
1325              }
1326 <            if (!taken)
1321 <                return 0;
1322 <            return task.doExec();
1326 >            return 0;
1327          }
1328  
1329          /**
# Line 1334 | Line 1338 | public class ForkJoinPool extends Abstra
1338              int status = 0;
1339              if (task != null) {
1340                  outer: for (;;) {
1337                    boolean taken = false;
1341                      ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1342                      int p, s, cap, k;
1343                      if ((status = task.status) < 0)
# Line 1351 | Line 1354 | public class ForkJoinPool extends Abstra
1354                      }
1355                      if (!owned && getAndSetAccess(1) != 0)
1356                          break;                 // fail if locked
1357 <                    if ((owned || (top == p && a[k] == t)) &&
1355 <                        getAndClearSlot(a, k) != null) {
1356 <                        taken = true;
1357 <                        top = s;
1358 <                        U.storeFence();
1359 <                    }
1360 <                    if (!owned)
1357 >                    if (top != p || a[k] != t || getAndClearSlot(a, k) == null) {
1358                          access = 0;
1359 <                    if (taken) {
1363 <                        t.doExec();
1364 <                        if (limit != 0 && --limit == 0)
1365 <                            break;
1359 >                        break;                 // missed
1360                      }
1361 +                    top = s;
1362 +                    releaseAccess();
1363 +                    t.doExec();
1364 +                    if (limit != 0 && --limit == 0)
1365 +                        break;
1366                  }
1367                  status = task.status;
1368              }
# Line 1397 | Line 1396 | public class ForkJoinPool extends Abstra
1396                              break;
1397                          else if (casSlotToNull(a, k, t)) {
1398                              base = nb;
1399 <                            U.storeFence();
1399 >                            U.storeStoreFence();
1400                              t.doExec();
1401                          }
1402                      }
# Line 1739 | Line 1738 | public class ForkJoinPool extends Abstra
1738      }
1739  
1740      /**
1741 <     * Returns true if any submission queue is detectably nonempty.
1742 <     * Accurate only when workers are quiescent; else conservatively
1741 >     * Returns true if any queue is detectably nonempty.  Accurate
1742 >     * only when workers are quiescent; else conservatively
1743       * approximate.
1744 +     * @param submissionsOnly if true, only check submission queues
1745       */
1746 <    private boolean hasSubmissions() {
1747 <        WorkQueue[] qs; WorkQueue q;
1748 <        int n = ((qs = queues) == null) ? 0 : qs.length;
1749 <        for (int i = 0; i < n; i += 2) {
1750 <            if ((q = qs[i]) != null && (q.access > 0 || q.top - q.base > 0))
1751 <                return true;
1746 >    private boolean hasTasks(boolean submissionsOnly) {
1747 >        int step = submissionsOnly ? 2 : 1;
1748 >        for (int checkSum = 0;;) { // repeat until stable (normally twice)
1749 >            U.loadFence();
1750 >            WorkQueue[] qs = queues;
1751 >            int n = (qs == null) ? 0 : qs.length, sum = 0;
1752 >            for (int i = 0; i < n; i += step) {
1753 >                WorkQueue q; int s;
1754 >                if ((q = qs[i]) != null) {
1755 >                    if (q.access > 0 || (s = q.top) != q.base)
1756 >                        return true;
1757 >                    sum += (s << 16) + i + 1;
1758 >                }
1759 >            }
1760 >            if (checkSum == (checkSum = sum))
1761 >                return false;
1762          }
1753        return false;
1763      }
1764  
1765      /**
# Line 1826 | Line 1835 | public class ForkJoinPool extends Abstra
1835              w.stackPred = (int)pc;               // set ctl stack link
1836          } while (pc != (pc = compareAndExchangeCtl(
1837                              pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp)));
1838 <        if ((qc & RC_MASK) <= 0L)
1838 >        if ((qc & RC_MASK) <= 0L) {
1839 >            if (hasTasks(true) && (w.phase >= 0 || reactivate() == w))
1840 >                return 0;                        // check for stragglers
1841 >            if (runState != 0 && tryTerminate(false, false))
1842 >                return -1;                       // quiescent termination
1843              idle = true;
1844 <        WorkQueue[] qs = queues; // to spin for expected #accesses in scan+signal
1845 <        int spins = ((qs == null) ? 0 : ((qs.length & SMASK) << 1)) + 4, rs;
1846 <        if (idle && hasSubmissions() && w.phase < 0)
1834 <            reactivate();                        // check for stragglers
1835 <        if ((rs = runState) < 0 ||
1836 <            (rs != 0 && idle && tryTerminate(false, false)))
1837 <            return -1;                           // quiescent termination
1844 >        }
1845 >        WorkQueue[] qs = queues; // spin for expected #accesses in scan+signal
1846 >        int spins = ((qs == null) ? 0 : ((qs.length & SMASK) << 1)) | 0xf;
1847          while ((p = w.phase) < 0 && --spins > 0)
1848 <            Thread.onSpinWait();                 // spin before block
1849 <        if (p < 0) {                             // await signal
1850 <            long deadline = (idle) ? keepAlive + System.currentTimeMillis() : 0L;
1848 >            Thread.onSpinWait();
1849 >        if (p < 0) {
1850 >            long deadline = idle ? keepAlive + System.currentTimeMillis() : 0L;
1851              LockSupport.setCurrentBlocker(this);
1852 <            for (;;) {
1852 >            for (;;) {                           // await signal or termination
1853 >                if (runState < 0)
1854 >                    return -1;
1855                  w.access = PARKED;               // enable unpark
1856                  if (w.phase < 0) {
1857                      if (idle)
# Line 1864 | Line 1875 | public class ForkJoinPool extends Abstra
1875                  }
1876              }
1877          }
1878 <        return (runState < 0) ? -1 : 0;
1878 >        return 0;
1879      }
1880  
1881      /**
# Line 1876 | Line 1887 | public class ForkJoinPool extends Abstra
1887          do {
1888              if (runState < 0)
1889                  break;
1890 <            if ((c & RC_MASK) > 0L || hasSubmissions())
1890 >            if ((c & RC_MASK) > 0L || hasTasks(false))
1891                  return false;
1892          } while (c != (c = ctl));  // validate
1893          return true;
# Line 1986 | Line 1997 | public class ForkJoinPool extends Abstra
1997              int s; WorkQueue[] qs;
1998              if ((s = task.status) < 0)
1999                  return s;
2000 <            if (!rescan && sctl == (sctl = ctl) &&
2001 <                (s = tryCompensate(sctl, timed)) >= 0)
2002 <                return s;                              // block
2000 >            if (!rescan && sctl == (sctl = ctl)) {
2001 >                if ((s = tryCompensate(sctl, timed)) >= 0)
2002 >                    return s;                              // block
2003 >                if (runState < 0)
2004 >                    return 0;
2005 >            }
2006              rescan = false;
1993            if (runState < 0)
1994                return 0;
2007              int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
2008              scan: for (int i = n >>> 1; i > 0; --i, r += 2) {
2009                  int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
# Line 2039 | Line 2051 | public class ForkJoinPool extends Abstra
2051                  }
2052              }
2053          }
2054 <     }
2054 >    }
2055  
2056      /**
2057       * Version of helpJoin for CountedCompleters.
# Line 2065 | Line 2077 | public class ForkJoinPool extends Abstra
2077                      return 0;
2078                  if ((s = tryCompensate(sctl, timed)) >= 0)
2079                      return s;
2080 +                if (runState < 0)
2081 +                    return 0;
2082              }
2083              rescan = false;
2070            if (runState < 0)
2071                return 0;
2084              int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
2085              scan: for (int i = n; i > 0; --i, ++r) {
2086                  int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
# Line 2317 | Line 2329 | public class ForkJoinPool extends Abstra
2329      private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty,
2330                                             ForkJoinTask<T> task) {
2331          WorkQueue q; Thread t; ForkJoinWorkerThread wt;
2332 <        U.storeFence();  // ensure safely publishable
2332 >        U.storeStoreFence();  // ensure safely publishable
2333          if (task == null) throw new NullPointerException();
2334          if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2335              (wt = (ForkJoinWorkerThread)t).pool == this)
# Line 2472 | Line 2484 | public class ForkJoinPool extends Abstra
2484                  WorkQueue q; Thread thread;
2485                  if ((q = qs[(r + i) & (n - 1)]) != null &&
2486                      (thread = q.owner) != current && q.access != STOP) {
2487 +                    for (ForkJoinTask<?> t; (t = q.poll(null)) != null; )
2488 +                        ForkJoinTask.cancelIgnoringExceptions(t);
2489                      if (thread != null && !thread.isInterrupted()) {
2490                          q.forcePhaseActive();      // for awaitWork
2491                          try {
# Line 2479 | Line 2493 | public class ForkJoinPool extends Abstra
2493                          } catch (Throwable ignore) {
2494                          }
2495                      }
2482                    for (ForkJoinTask<?> t; (t = q.poll(null)) != null; )
2483                        ForkJoinTask.cancelIgnoringExceptions(t);
2496                  }
2497              }
2498          }
# Line 3262 | Line 3274 | public class ForkJoinPool extends Abstra
3274       * @return {@code true} if there are any queued submissions
3275       */
3276      public boolean hasQueuedSubmissions() {
3277 <        return (runState & TERMINATED) == 0 && hasSubmissions();
3277 >        return hasTasks(true);
3278      }
3279  
3280      /**
# Line 3455 | Line 3467 | public class ForkJoinPool extends Abstra
3467              terminated = false;
3468          }
3469          else if (!(terminated = ((runState & TERMINATED) != 0))) {
3470 <            tryTerminate(false, false); // reduce transient blocking
3470 >            tryTerminate(false, false);  // reduce transient blocking
3471              if ((lock = registrationLock) != null &&
3472                  !(terminated = (((runState & TERMINATED) != 0)))) {
3473                  lock.lock();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines