ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.171 by jsr166, Fri Mar 22 22:03:11 2013 UTC vs.
Revision 1.172 by dl, Mon Apr 8 15:26:54 2013 UTC

# Line 325 | Line 325 | public class ForkJoinPool extends Abstra
325       *
326       * Signalling.  We create or wake up workers only when there
327       * appears to be at least one task they might be able to find and
328 <     * execute. However, many other threads may notice the same task
329 <     * and each signal to wake up a thread that might take it. So in
330 <     * general, pools will be over-signalled.  When a submission is
331 <     * added or another worker adds a task to a queue that has fewer
332 <     * than two tasks, they signal waiting workers (or trigger
333 <     * creation of new ones if fewer than the given parallelism level
334 <     * -- signalWork), and may leave a hint to the unparked worker to
335 <     * help signal others upon wakeup).  These primary signals are
336 <     * buttressed by others (see method helpSignal) whenever other
337 <     * threads scan for work or do not have a task to process.  On
338 <     * most platforms, signalling (unpark) overhead time is noticeably
339 <     * long, and the time between signalling a thread and it actually
340 <     * making progress can be very noticeably long, so it is worth
341 <     * offloading these delays from critical paths as much as
342 <     * possible.
328 >     * execute.
329       *
330       * Trimming workers. To release resources after periods of lack of
331       * use, a worker starting to wait when the pool is quiescent will
# Line 694 | Line 680 | public class ForkJoinPool extends Abstra
680           */
681          final void push(ForkJoinTask<?> task) {
682              ForkJoinTask<?>[] a; ForkJoinPool p;
683 <            int s = top, m, n;
683 >            int s = top, m;
684              if ((a = array) != null) {    // ignore if queue removed
685                  int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
686                  U.putOrderedObject(a, j, task);
687 <                if ((n = (top = s + 1) - base) <= 2) {
702 <                    if ((p = pool) != null)
703 <                        p.signalWork(this);
704 <                }
705 <                else if (n >= m)
687 >                if ((top = s + 1) - base >= m)
688                      growArray();
689 +                else if ((p = pool) != null)
690 +                    p.signalWork(this);
691              }
692          }
693  
# Line 1195 | Line 1179 | public class ForkJoinPool extends Abstra
1179      static final int FIFO_QUEUE          =  1;
1180      static final int SHARED_QUEUE        = -1;
1181  
1182 <    // bounds for #steps in scan loop -- must be power 2 minus 1
1183 <    private static final int MIN_SCAN    = 0x7ff;   // cover estimation slop
1200 <    private static final int MAX_SCAN    = 0x1ffff; // 4 * max workers
1182 >    // Mininum number of scans before blocking in scan() and related methods
1183 >    private static final int MIN_RESCANS = 0x0fff;
1184  
1185      // Instance fields
1186      volatile long stealCount;                  // collects worker counts
# Line 1423 | Line 1406 | public class ForkJoinPool extends Abstra
1406              (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
1407              (q = ws[m & z & SQMASK]) != null &&
1408              U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1409 <            int b = q.base, s = q.top, n, an;
1410 <            if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
1411 <                int j = (((an - 1) & s) << ASHIFT) + ABASE;
1409 >            int s = q.top, am;
1410 >            if ((a = q.array) != null && (am = a.length - 1) > s - q.base) {
1411 >                int j = ((am & s) << ASHIFT) + ABASE;
1412                  U.putOrderedObject(a, j, task);
1413                  q.top = s + 1;                     // push on to deque
1414                  q.qlock = 0;
1415 <                if (n <= 2)
1433 <                    signalWork(q);
1415 >                signalWork(q);
1416                  return;
1417              }
1418              q.qlock = 0;
# Line 1537 | Line 1519 | public class ForkJoinPool extends Abstra
1519      /**
1520       * Tries to create or activate a worker if too few are active.
1521       *
1522 <     * @param q the (non-null) queue holding tasks to be signalled
1522 >     * @param q if non-null, the queue holding tasks to be processed
1523       */
1524      final void signalWork(WorkQueue q) {
1525 <        int hint = q.poolIndex;
1525 >        long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w;  Thread p;
1526 >        if ((u = (int)((c = ctl) >>> 32)) < 0) {
1527 >            if ((e = (int)c) > 0) {
1528 >                if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
1529 >                    (w = ws[i]) != null) {
1530 >                    long nc = (((long)(w.nextWait & E_MASK)) |
1531 >                               ((long)(u + UAC_UNIT) << 32));
1532 >                    if (w.eventCount == (e | INT_SIGN) &&
1533 >                        U.compareAndSwapLong(this, CTL, c, nc)) {
1534 >                        w.eventCount = (e + E_SEQ) & E_MASK;
1535 >                        if ((p = w.parker) != null)
1536 >                            U.unpark(p);
1537 >                    }
1538 >                    else
1539 >                        retrySignalWork(q);
1540 >                }
1541 >            }
1542 >            else if ((short)u < 0)
1543 >                tryAddWorker();
1544 >        }
1545 >    }
1546 >
1547 >    /**
1548 >     * Fallback version of signalWork, triggered if release fails
1549 >     * and the calling queue is non-empty;
1550 >     */
1551 >    final void retrySignalWork(WorkQueue q) {
1552          long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p;
1553 <        while ((u = (int)((c = ctl) >>> 32)) < 0) {
1553 >        while ((q != null && !q.isEmpty()) &&
1554 >               (u = (int)((c = ctl) >>> 32)) < 0) {
1555              if ((e = (int)c) > 0) {
1556                  if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
1557 <                    (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
1557 >                    (w = ws[i]) != null) {
1558                      long nc = (((long)(w.nextWait & E_MASK)) |
1559                                 ((long)(u + UAC_UNIT) << 32));
1560 <                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1561 <                        w.hint = hint;
1560 >                    if (w.eventCount == (e | INT_SIGN) &&
1561 >                        U.compareAndSwapLong(this, CTL, c, nc)) {
1562                          w.eventCount = (e + E_SEQ) & E_MASK;
1563                          if ((p = w.parker) != null)
1564                              U.unpark(p);
1565                          break;
1566                      }
1558                    if (q.top - q.base <= 0)
1559                        break;
1567                  }
1568                  else
1569                      break;
# Line 1622 | Line 1629 | public class ForkJoinPool extends Abstra
1629          if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1630              int ec = w.eventCount;               // ec is negative if inactive
1631              int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1632 <            w.hint = -1;                         // update seed and clear hint
1626 <            int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
1632 >            int j = (m << 1) | (ec < 0 ? MIN_RESCANS : 1);
1633              do {
1634 <                WorkQueue q; ForkJoinTask<?>[] a; int b;
1635 <                if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
1636 <                    (a = q.array) != null) {     // probably nonempty
1637 <                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1638 <                    if (ec >= 0) {
1639 <                        ForkJoinTask<?> t = (ForkJoinTask<?>)
1640 <                            U.getObjectVolatile(a, i);
1634 >                WorkQueue q; int b;
1635 >                if ((q = ws[(r - j) & m]) != null &&
1636 >                    (b = q.base) - q.top < 0) {  // probably nonempty
1637 >                    ForkJoinTask<?>[] a = q.array;
1638 >                    if ((ec >= 0 || (ec = w.eventCount) >= 0) && a != null) {
1639 >                        long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1640 >                        ForkJoinTask<?> t =
1641 >                            (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1642                          if (q.base == b && t != null &&
1643                              U.compareAndSwapObject(a, i, t, null)) {
1644 <                            U.putOrderedInt(q, QBASE, ++b);
1638 <                            if (b - q.top < 0)
1639 <                                signalWork(q);
1644 >                            U.putOrderedInt(q, QBASE, b + 1);
1645                              return t;            // taken
1646                          }
1647                      }
1648 <                    if ((ec < 0 || j < m) && (int)(ctl >> AC_SHIFT) <= 0) {
1649 <                        w.hint = (r + j) & m;    // help signal below
1645 <                        break;                   // cannot take
1646 <                    }
1648 >                    if (j < m)                   // must restart to revisit
1649 >                        break;
1650                  }
1651              } while (--j >= 0);
1652  
1653 <            int h, e, ns; long c, sc; WorkQueue q;
1654 <            if ((ns = w.nsteals) != 0) {
1653 >            int e, ns; long c, sc;
1654 >            if (j >= 0 || plock != ps) {         // incomplete scan
1655 >                if (w.eventCount < 0)            // help activate for next time
1656 >                    signalWork(null);
1657 >            }
1658 >            else if ((e = (int)(c = ctl)) < 0)
1659 >                w.qlock = -1;                    // pool is terminating
1660 >            else if (ec >= 0) {                  // try to enqueue/inactivate
1661 >                long nc = (((long)ec |
1662 >                            ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1663 >                w.nextWait = e;                  // link and mark inactive
1664 >                w.eventCount = ec | INT_SIGN;
1665 >                if (!U.compareAndSwapLong(this, CTL, c, nc))
1666 >                    w.eventCount = ec;           // unmark on CAS failure
1667 >                else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1668 >                    idleAwaitWork(w, nc, c);
1669 >            }
1670 >            else if ((ns = w.nsteals) != 0) {
1671                  if (U.compareAndSwapLong(this, STEALCOUNT,
1672                                           sc = stealCount, sc + ns))
1673                      w.nsteals = 0;               // collect steals and rescan
1674              }
1675 <            else if (plock != ps)                // consistency check
1676 <                ;                                // skip
1677 <            else if ((e = (int)(c = ctl)) < 0)
1678 <                w.qlock = -1;                    // pool is terminating
1679 <            else {
1680 <                if ((h = w.hint) < 0) {
1681 <                    if (ec >= 0) {               // try to enqueue/inactivate
1682 <                        long nc = (((long)ec |
1683 <                                    ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1665 <                        w.nextWait = e;          // link and mark inactive
1666 <                        w.eventCount = ec | INT_SIGN;
1667 <                        if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1668 <                            w.eventCount = ec;   // unmark on CAS failure
1669 <                        else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1670 <                            idleAwaitWork(w, nc, c);
1671 <                    }
1672 <                    else if (w.eventCount < 0 && ctl == c) {
1673 <                        Thread wt = Thread.currentThread();
1674 <                        Thread.interrupted();    // clear status
1675 <                        U.putObject(wt, PARKBLOCKER, this);
1676 <                        w.parker = wt;           // emulate LockSupport.park
1677 <                        if (w.eventCount < 0)    // recheck
1678 <                            U.park(false, 0L);   // block
1679 <                        w.parker = null;
1680 <                        U.putObject(wt, PARKBLOCKER, null);
1681 <                    }
1682 <                }
1683 <                if ((h >= 0 || (h = w.hint) >= 0) &&
1684 <                    (ws = workQueues) != null && h < ws.length &&
1685 <                    (q = ws[h]) != null) {      // signal others before retry
1686 <                    WorkQueue v; Thread p; int u, i, s;
1687 <                    for (int n = (config & SMASK) - 1;;) {
1688 <                        int idleCount = (w.eventCount < 0) ? 0 : -1;
1689 <                        if (((s = idleCount - q.base + q.top) <= n &&
1690 <                             (n = s) <= 0) ||
1691 <                            (u = (int)((c = ctl) >>> 32)) >= 0 ||
1692 <                            (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1693 <                            (v = ws[i]) == null)
1694 <                            break;
1695 <                        long nc = (((long)(v.nextWait & E_MASK)) |
1696 <                                   ((long)(u + UAC_UNIT) << 32));
1697 <                        if (v.eventCount != (e | INT_SIGN) ||
1698 <                            !U.compareAndSwapLong(this, CTL, c, nc))
1699 <                            break;
1700 <                        v.hint = h;
1701 <                        v.eventCount = (e + E_SEQ) & E_MASK;
1702 <                        if ((p = v.parker) != null)
1703 <                            U.unpark(p);
1704 <                        if (--n <= 0)
1705 <                            break;
1706 <                    }
1707 <                }
1675 >            else if (w.eventCount < 0 && ctl == c) {
1676 >                Thread wt = Thread.currentThread();
1677 >                Thread.interrupted();            // clear status
1678 >                U.putObject(wt, PARKBLOCKER, this);
1679 >                w.parker = wt;                   // emulate LockSupport.park
1680 >                if (w.eventCount < 0 && ctl == c)// recheck
1681 >                    U.park(false, 0L);           // block
1682 >                w.parker = null;
1683 >                U.putObject(wt, PARKBLOCKER, null);
1684              }
1685          }
1686          return null;
# Line 1723 | Line 1699 | public class ForkJoinPool extends Abstra
1699       * @param prevCtl the ctl value to restore if thread is terminated
1700       */
1701      private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1702 <        if (w != null && w.eventCount < 0 &&
1703 <            !tryTerminate(false, false) && (int)prevCtl != 0 &&
1704 <            ctl == currentCtl) {
1702 >        if (w != null && w.eventCount < 0 && !tryTerminate(false, false) &&
1703 >            (int)prevCtl != 0 && ctl == currentCtl) {
1704 >            int ns = w.nsteals;
1705 >            if (ns != 0) {
1706 >                w.nsteals = 0;
1707 >                long sc;
1708 >                do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1709 >                                                   sc = stealCount, sc + ns));
1710 >            }
1711              int dc = -(short)(currentCtl >>> TC_SHIFT);
1712              long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1713              long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1714              Thread wt = Thread.currentThread();
1715 +            int spins = MIN_RESCANS; // poll before blocking
1716              while (ctl == currentCtl) {
1717 <                Thread.interrupted();  // timed variant of version in scan()
1718 <                U.putObject(wt, PARKBLOCKER, this);
1719 <                w.parker = wt;
1737 <                if (ctl == currentCtl)
1738 <                    U.park(false, parkTime);
1739 <                w.parker = null;
1740 <                U.putObject(wt, PARKBLOCKER, null);
1741 <                if (ctl != currentCtl)
1742 <                    break;
1743 <                if (deadline - System.nanoTime() <= 0L &&
1744 <                    U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1745 <                    w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1746 <                    w.hint = -1;
1747 <                    w.qlock = -1;   // shrink
1748 <                    break;
1717 >                if (spins >= 0) {
1718 >                    if (w.nextSeed() < 0)
1719 >                        --spins;
1720                  }
1721 <            }
1722 <        }
1723 <    }
1724 <
1725 <    /**
1726 <     * Scans through queues looking for work while joining a task; if
1727 <     * any present, signals. May return early if more signalling is
1728 <     * detectably unneeded.
1729 <     *
1730 <     * @param task return early if done
1731 <     * @param origin an index to start scan
1732 <     */
1733 <    private void helpSignal(ForkJoinTask<?> task, int origin) {
1734 <        WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
1764 <        if (task != null && task.status >= 0 &&
1765 <            (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1766 <            (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1767 <            outer: for (int k = origin, j = m; j >= 0; --j) {
1768 <                WorkQueue q = ws[k++ & m];
1769 <                for (int n = m;;) { // limit to at most m signals
1770 <                    if (task.status < 0)
1771 <                        break outer;
1772 <                    if (q == null ||
1773 <                        ((s = -q.base + q.top) <= n && (n = s) <= 0))
1721 >                else {
1722 >                    Thread.interrupted();  // timed variant of version in scan()
1723 >                    U.putObject(wt, PARKBLOCKER, this);
1724 >                    w.parker = wt;
1725 >                    if (ctl == currentCtl)
1726 >                        U.park(false, parkTime);
1727 >                    w.parker = null;
1728 >                    U.putObject(wt, PARKBLOCKER, null);
1729 >                    if (ctl != currentCtl)
1730 >                        break;
1731 >                    if (deadline - System.nanoTime() <= 0L &&
1732 >                        U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1733 >                        w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1734 >                        w.qlock = -1;   // shrink
1735                          break;
1775                    if ((u = (int)((c = ctl) >>> 32)) >= 0 ||
1776                        (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1777                        (w = ws[i]) == null)
1778                        break outer;
1779                    long nc = (((long)(w.nextWait & E_MASK)) |
1780                               ((long)(u + UAC_UNIT) << 32));
1781                    if (w.eventCount != (e | INT_SIGN))
1782                        break outer;
1783                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1784                        w.eventCount = (e + E_SEQ) & E_MASK;
1785                        if ((p = w.parker) != null)
1786                            U.unpark(p);
1787                        if (--n <= 0)
1788                            break;
1736                      }
1737                  }
1738              }
# Line 1888 | Line 1835 | public class ForkJoinPool extends Abstra
1835          WorkQueue[] ws; int m;
1836          if (task != null && (ws = workQueues) != null &&
1837              (m = ws.length - 1) >= 0) {
1838 <            for (int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;;) {
1838 >            int scans = m | MIN_RESCANS;
1839 >            for (int j = 1, k = scans;;) {
1840                  WorkQueue q; int s;
1841                  if ((s = task.status) < 0)
1842                      return s;
1843 <                if (((q = ws[j & m]) == null || !q.pollAndExecCC(task)) &&
1844 <                    (j -= 2) <= 0)
1843 >                if ((q = ws[j & m]) != null && q.pollAndExecCC(task))
1844 >                    k = scans;
1845 >                else if (--k >= 0)
1846 >                    j += 2;
1847 >                else
1848                      break;
1849              }
1850          }
# Line 1964 | Line 1915 | public class ForkJoinPool extends Abstra
1915              joiner.currentJoin = task;
1916              do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
1917                           joiner.tryRemoveAndExec(task)); // process local tasks
1918 <            if (s >= 0 && (s = task.status) >= 0) {
1919 <                helpSignal(task, joiner.poolIndex);
1920 <                if ((s = task.status) >= 0 &&
1970 <                    (task instanceof CountedCompleter))
1971 <                    s = helpComplete(task);
1972 <            }
1918 >            if (s >= 0 && (s = task.status) >= 0 &&
1919 >                (task instanceof CountedCompleter))
1920 >                s = helpComplete(task);
1921              while (s >= 0 && (s = task.status) >= 0) {
1922                  if ((!joiner.isEmpty() ||           // try helping
1923                       (s = tryHelpStealer(joiner, task)) == 0) &&
1924                      (s = task.status) >= 0) {
1925 <                    helpSignal(task, joiner.poolIndex);
1978 <                    if ((s = task.status) >= 0 && tryCompensate()) {
1925 >                    if (tryCompensate()) {
1926                          if (task.trySetSignal() && (s = task.status) >= 0) {
1927                              synchronized (task) {
1928                                  if (task.status >= 0) {
# Line 1988 | Line 1935 | public class ForkJoinPool extends Abstra
1935                                      task.notifyAll();
1936                              }
1937                          }
1938 <                        long c;                          // re-activate
1939 <                        do {} while (!U.compareAndSwapLong
1940 <                                     (this, CTL, c = ctl, c + AC_UNIT));
1938 >                        // reactivate
1939 >                        if (false) { // possible hotspot bug?
1940 >                            U.getAndAddLong(this, CTL, AC_UNIT);
1941 >                        }
1942 >                        else {
1943 >                            long c;
1944 >                            do {} while (!U.compareAndSwapLong
1945 >                                         (this, CTL, c = ctl, c + AC_UNIT));
1946 >                        }
1947                      }
1948                  }
1949              }
# Line 2014 | Line 1967 | public class ForkJoinPool extends Abstra
1967              joiner.currentJoin = task;
1968              do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
1969                           joiner.tryRemoveAndExec(task));
1970 <            if (s >= 0 && (s = task.status) >= 0) {
1971 <                helpSignal(task, joiner.poolIndex);
1972 <                if ((s = task.status) >= 0 &&
2020 <                    (task instanceof CountedCompleter))
2021 <                    s = helpComplete(task);
2022 <            }
1970 >            if (s >= 0 && (s = task.status) >= 0 &&
1971 >                (task instanceof CountedCompleter))
1972 >                s = helpComplete(task);
1973              if (s >= 0 && joiner.isEmpty()) {
1974                  do {} while (task.status >= 0 &&
1975                               tryHelpStealer(joiner, task) > 0);
# Line 2058 | Line 2008 | public class ForkJoinPool extends Abstra
2008      final void helpQuiescePool(WorkQueue w) {
2009          for (boolean active = true;;) {
2010              long c; WorkQueue q; ForkJoinTask<?> t; int b;
2011 <            while ((t = w.nextLocalTask()) != null) {
2062 <                if (w.base - w.top < 0)
2063 <                    signalWork(w);
2011 >            while ((t = w.nextLocalTask()) != null)
2012                  t.doExec();
2065            }
2013              if ((q = findNonEmptyStealQueue(w.nextSeed())) != null) {
2014                  if (!active) {      // re-establish active count
2015                      active = true;
2016                      do {} while (!U.compareAndSwapLong
2017                                   (this, CTL, c = ctl, c + AC_UNIT));
2018                  }
2019 <                if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2073 <                    if (q.base - q.top < 0)
2074 <                        signalWork(q);
2019 >                if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2020                      w.runSubtask(t);
2076                }
2021              }
2022              else if (active) {       // decrement active count without queuing
2023                  long nc = (c = ctl) - AC_UNIT;
# Line 2100 | Line 2044 | public class ForkJoinPool extends Abstra
2044                  return t;
2045              if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
2046                  return null;
2047 <            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2104 <                if (q.base - q.top < 0)
2105 <                    signalWork(q);
2047 >            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2048                  return t;
2107            }
2049          }
2050      }
2051  
# Line 2216 | Line 2157 | public class ForkJoinPool extends Abstra
2157                      for (int i = 0; i < ws.length; ++i) {
2158                          if ((w = ws[i]) != null) {
2159                              if (!w.isEmpty()) {    // signal unprocessed tasks
2160 <                                signalWork(w);
2160 >                                signalWork(null);
2161                                  return false;
2162                              }
2163                              if ((i & 1) != 0 && w.eventCount >= 0)
# Line 2372 | Line 2313 | public class ForkJoinPool extends Abstra
2313                  else
2314                      break;
2315              }
2316 <            helpComplete(root);
2316 >            if (root.status >= 0)
2317 >                helpComplete(root);
2318          }
2319      }
2320  
# Line 2406 | Line 2348 | public class ForkJoinPool extends Abstra
2348                          q.qlock = 0;
2349                  }
2350              }
2351 <            if (t.status >= 0) {
2352 <                if (t instanceof CountedCompleter)
2411 <                    p.externalHelpComplete(q, t);
2412 <                else
2413 <                    p.helpSignal(t, q.poolIndex);
2414 <            }
2351 >            if (t.status >= 0 && (t instanceof CountedCompleter))
2352 >                p.externalHelpComplete(q, t);
2353          }
2354      }
2355  
# Line 3126 | Line 3064 | public class ForkJoinPool extends Abstra
3064                  ForkJoinTask<?> t; WorkQueue q; int b;
3065                  if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
3066                      found = true;
3067 <                    if ((t = q.pollAt(b)) != null) {
3130 <                        if (q.base - q.top < 0)
3131 <                            signalWork(q);
3067 >                    if ((t = q.pollAt(b)) != null)
3068                          t.doExec();
3133                    }
3069                      break;
3070                  }
3071              }
# Line 3245 | Line 3180 | public class ForkJoinPool extends Abstra
3180          Thread t = Thread.currentThread();
3181          if (t instanceof ForkJoinWorkerThread) {
3182              ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
3183 <            while (!blocker.isReleasable()) { // variant of helpSignal
3249 <                WorkQueue[] ws; WorkQueue q; int m, u;
3250 <                if ((ws = p.workQueues) != null && (m = ws.length - 1) >= 0) {
3251 <                    for (int i = 0; i <= m; ++i) {
3252 <                        if (blocker.isReleasable())
3253 <                            return;
3254 <                        if ((q = ws[i]) != null && q.base - q.top < 0) {
3255 <                            p.signalWork(q);
3256 <                            if ((u = (int)(p.ctl >>> 32)) >= 0 ||
3257 <                                (u >> UAC_SHIFT) >= 0)
3258 <                                break;
3259 <                        }
3260 <                    }
3261 <                }
3183 >            while (!blocker.isReleasable()) {
3184                  if (p.tryCompensate()) {
3185                      try {
3186                          do {} while (!blocker.isReleasable() &&

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines