ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.372 by dl, Fri Feb 7 13:53:19 2020 UTC vs.
Revision 1.373 by dl, Tue Feb 11 23:59:56 2020 UTC

# Line 819 | Line 819 | public class ForkJoinPool extends Abstra
819      static final int SHUTDOWN     = 1 << 24;
820      static final int TERMINATED   = 1 << 25;
821      static final int STOP         = 1 << 31;       // must be negative
822 <    static final int ADJUST       = 1 << 16;       // tryCompensate return
822 >    static final int UNCOMPENSATE = 1 << 16;       // tryCompensate return
823  
824      /**
825       * Initial capacity of work-stealing queue array.  Must be a power
# Line 1002 | Line 1002 | public class ForkJoinPool extends Abstra
1002          /**
1003           * Pops the given task for owner only if it is at the current top.
1004           */
1005 <        final boolean tryUnpush(ForkJoinTask<?> task, boolean owned) {
1005 >        final boolean tryUnpush(ForkJoinTask<?> task) {
1006 >            int s = top, cap, k; ForkJoinTask<?>[] a;
1007 >            if ((a = array) != null && (cap = a.length) > 0 && base != s-- &&
1008 >                casSlotToNull(a, (cap - 1) & s, task)) {
1009 >                top = s;
1010 >                return true;
1011 >            }
1012 >            return false;
1013 >        }
1014 >
1015 >        /**
1016 >         * Locking version of tryUnpush.
1017 >         */
1018 >        final boolean externalTryUnpush(ForkJoinTask<?> task) {
1019              boolean taken = false;
1020              int s = top, cap, k; ForkJoinTask<?>[] a;
1021              if ((a = array) != null && (cap = a.length) > 0 &&
1022 <                a[k = (cap - 1) & (s - 1)] == task) {
1023 <                if (owned || tryLock()) {
1024 <                    if ((owned || (top == s && array == a)) &&
1025 <                        (taken = casSlotToNull(a, k, task)))
1026 <                        top = s - 1;
1014 <                    if (!owned)
1015 <                        source = 0; // release lock
1016 <                }
1022 >                a[k = (cap - 1) & (s - 1)] == task && tryLock()) {
1023 >                if (top == s && array == a &&
1024 >                    (taken = casSlotToNull(a, k, task)))
1025 >                    top = s - 1;
1026 >                source = 0; // release lock
1027              }
1028              return taken;
1029          }
# Line 1209 | Line 1219 | public class ForkJoinPool extends Abstra
1219          }
1220  
1221          /**
1222 <         * Returns true if owned and not known to be blocked.
1222 >         * Returns true if owned by a worker thread and not known to be blocked.
1223           */
1224          final boolean isApparentlyUnblocked() {
1225              Thread wt; Thread.State s;
# Line 1612 | Line 1622 | public class ForkJoinPool extends Abstra
1622              c = ((prevCtl - RC_UNIT) & UC_MASK) | (phase & SP_MASK);
1623          } while (prevCtl != (prevCtl = compareAndExchangeCtl(prevCtl, c)));
1624  
1625 +        Thread.interrupted();                // clear status
1626          LockSupport.setCurrentBlocker(this); // prepare to block (exit also OK)
1627          long deadline = 0L;                  // nonzero if possibly quiescent
1628          int ac = (int)(c >> RC_SHIFT), md;
# Line 1643 | Line 1654 | public class ForkJoinPool extends Abstra
1654                  return -1;                   // trigger quiescent termination
1655          }
1656  
1657 <        for (;;) {                           // await activation or termination
1658 <            if (w.phase >= 0) {
1659 <                LockSupport.setCurrentBlocker(null);
1649 <                return 0;
1650 <            }
1657 >        for (boolean alt = false;;) {        // await activation or termination
1658 >            if (w.phase >= 0)
1659 >                break;
1660              else if (mode < 0)
1661                  return -1;
1662 <            else if (deadline != 0L &&
1663 <                     deadline - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1664 <                     compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) |
1665 <                                          (w.stackPred & SP_MASK)))) {
1666 <                w.phase = QUIET;
1667 <                return -1;                   // drop on timeout
1668 <            }
1669 <            else if (!Thread.interrupted() && (int)(ctl >> RC_SHIFT) <= ac) {
1670 <                if (deadline != 0L)
1671 <                    LockSupport.parkUntil(deadline);
1672 <                else
1673 <                    LockSupport.park();
1674 <            }
1662 >            else if ((int)(ctl >> RC_SHIFT) > ac)
1663 >                Thread.onSpinWait();         // signal in progess
1664 >            else if (!(alt = !alt)) {        // check between park calls
1665 >                if (!Thread.interrupted() && deadline != 0L &&
1666 >                    deadline - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1667 >                    compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) |
1668 >                                         (w.stackPred & SP_MASK)))) {
1669 >                    w.phase = QUIET;
1670 >                    return -1;               // drop on timeout
1671 >                }
1672 >            }
1673 >            else if (deadline != 0L)
1674 >                LockSupport.parkUntil(deadline);
1675 >            else
1676 >                LockSupport.park();
1677          }
1678 +        LockSupport.setCurrentBlocker(null);
1679 +        return 0;
1680      }
1681  
1682      // Utilities used by ForkJoinTask
# Line 1688 | Line 1701 | public class ForkJoinPool extends Abstra
1701       */
1702      final boolean canStop() {
1703          outer: for (long oldSum = 0L;;) { // repeat until stable
1704 <            int md;
1705 <            WorkQueue[] qs = queues;
1693 <            long c = ctl, checkSum = c;
1694 <            if (((md = mode) & STOP) != 0 || qs == null)
1704 >            int md; WorkQueue[] qs;  long c;
1705 >            if ((qs = queues) == null || ((md = mode) & STOP) != 0)
1706                  return true;
1707 <            if ((md & SMASK) + (int)(c >> RC_SHIFT) > 0)
1707 >            if ((md & SMASK) + (int)((c = ctl) >> RC_SHIFT) > 0)
1708                  break;
1709 +            long checkSum = c;
1710              for (int i = 1; i < qs.length; i += 2) { // scan submitters
1711 <                WorkQueue q; ForkJoinTask<?>[] a; int s, cap;
1712 <                long u = ((long)i) << 32;
1713 <                if ((q = qs[i]) == null || (a = q.array) == null ||
1714 <                    (cap = a.length) <= 0)
1715 <                    checkSum += u;
1704 <                else if ((s = q.top) == q.base && a[(cap - 1) & s] == null &&
1705 <                         q.source == 0)
1706 <                    checkSum += u + s;
1707 <                else
1711 >                WorkQueue q; ForkJoinTask<?>[] a; int s = 0, cap;
1712 >                if ((q = qs[i]) != null && (a = q.array) != null &&
1713 >                    (cap = a.length) > 0 &&
1714 >                    ((s = q.top) != q.base || a[(cap - 1) & s] != null ||
1715 >                     q.source != 0))
1716                      break outer;
1717 +                checkSum += (((long)i) << 32) ^ s;
1718              }
1719              if (oldSum == (oldSum = checkSum) && queues == qs)
1720                  return true;
# Line 1722 | Line 1731 | public class ForkJoinPool extends Abstra
1731       * unblocked.
1732       *
1733       * @param c incoming ctl value
1734 <     * @return ADJUST: block then adjust, 0: block without adjust, -1 : retry
1734 >     * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
1735       */
1736      private int tryCompensate(long c) {
1737          Predicate<? super ForkJoinPool> sat;
# Line 1742 | Line 1751 | public class ForkJoinPool extends Abstra
1751                      if (compareAndSetCtl(c, nc)) {
1752                          v.phase = sp;
1753                          LockSupport.unpark(vt);
1754 <                        return ADJUST;
1754 >                        return UNCOMPENSATE;
1755                      }
1756                  }
1757                  return -1;                        // retry
1758              }
1759              else if (active > minActive) {        // reduce parallelism
1760                  long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1761 <                return compareAndSetCtl(c, nc) ? ADJUST : -1;
1761 >                return compareAndSetCtl(c, nc) ? UNCOMPENSATE : -1;
1762              }
1763          }
1764          if (total < maxTotal) {                   // expand pool
1765              long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1766 <            return !compareAndSetCtl(c, nc) ? -1 : !createWorker() ? 0 : ADJUST;
1766 >            return (!compareAndSetCtl(c, nc) ? -1 :
1767 >                    !createWorker() ? 0 : UNCOMPENSATE);
1768          }
1769          else if (!compareAndSetCtl(c, c))         // validate
1770              return -1;
# Line 1779 | Line 1789 | public class ForkJoinPool extends Abstra
1789       *
1790       * @param task the task
1791       * @param w caller's WorkQueue
1792 <     * @return task status on exit, or ADJUST for compensated blocking
1792 >     * @return task status on exit, or UNCOMPENSATE for compensated blocking
1793       */
1794      final int helpJoin(ForkJoinTask<?> task, WorkQueue w) {
1795          int s = 0;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines