ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.173 by jsr166, Tue Apr 16 05:53:51 2013 UTC vs.
Revision 1.174 by dl, Thu Apr 18 00:19:35 2013 UTC

# Line 309 | Line 309 | public class ForkJoinPool extends Abstra
309       * has not yet entered the wait queue. We solve this by requiring
310       * a full sweep of all workers (via repeated calls to method
311       * scan()) both before and after a newly waiting worker is added
312 <     * to the wait queue. During a rescan, the worker might release
313 <     * some other queued worker rather than itself, which has the same
314 <     * net effect. Because enqueued workers may actually be rescanning
315 <     * rather than waiting, we set and clear the "parker" field of
316 <     * WorkQueues to reduce unnecessary calls to unpark.  (This
317 <     * requires a secondary recheck to avoid missed signals.)  Note
318 <     * the unusual conventions about Thread.interrupts surrounding
319 <     * parking and other blocking: Because interrupts are used solely
320 <     * to alert threads to check termination, which is checked anyway
321 <     * upon blocking, we clear status (using Thread.interrupted)
322 <     * before any call to park, so that park does not immediately
323 <     * return due to status being set via some other unrelated call to
324 <     * interrupt in user code.
312 >     * to the wait queue.  Because enqueued workers may actually be
313 >     * rescanning rather than waiting, we set and clear the "parker"
314 >     * field of WorkQueues to reduce unnecessary calls to unpark.
315 >     * (This requires a secondary recheck to avoid missed signals.)
316 >     * Note the unusual conventions about Thread.interrupts
317 >     * surrounding parking and other blocking: Because interrupts are
318 >     * used solely to alert threads to check termination, which is
319 >     * checked anyway upon blocking, we clear status (using
320 >     * Thread.interrupted) before any call to park, so that park does
321 >     * not immediately return due to status being set via some other
322 >     * unrelated call to interrupt in user code.
323       *
324       * Signalling.  We create or wake up workers only when there
325       * appears to be at least one task they might be able to find and
326 <     * execute.
326 >     * execute.  When a submission is added or another worker adds a
327 >     * task to a queue that has fewer than two tasks, they signal
328 >     * waiting workers (or trigger creation of new ones if fewer than
329 >     * the given parallelism level -- signalWork).  These primary
330 >     * signals are buttressed by others whenever other threads remove
331 >     * a task from a queue a notice that there are other tasks there
332 >     * as well.  So in general, pools will be over-signalled. On most
333 >     * platforms, signalling (unpark) overhead time is noticeably
334 >     * long, and the time between signalling a thread and it actually
335 >     * making progress can be very noticeably long, so it is worth
336 >     * offloading these delays from critical paths as much as
337 >     * possible.
338       *
339       * Trimming workers. To release resources after periods of lack of
340       * use, a worker starting to wait when the pool is quiescent will
# Line 680 | Line 689 | public class ForkJoinPool extends Abstra
689           */
690          final void push(ForkJoinTask<?> task) {
691              ForkJoinTask<?>[] a; ForkJoinPool p;
692 <            int s = top, m;
692 >            int s = top, m, n;
693              if ((a = array) != null) {    // ignore if queue removed
694 <                int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
694 >                long j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
695                  U.putOrderedObject(a, j, task);
696 <                if ((top = s + 1) - base >= m)
696 >                if ((n = (top = s + 1) - base) <= 2) {
697 >                    if ((p = pool) != null)
698 >                        p.signalWork(this);
699 >                }
700 >                else if (n >= m)
701                      growArray();
689                else if ((p = pool) != null)
690                    p.signalWork(this);
702              }
703          }
704  
# Line 745 | Line 756 | public class ForkJoinPool extends Abstra
756           * appear in ForkJoinPool methods scan and tryHelpStealer.
757           */
758          final ForkJoinTask<?> pollAt(int b) {
759 <            ForkJoinTask<?> t; ForkJoinTask<?>[] a;
759 >            ForkJoinTask<?> t; ForkJoinTask<?>[] a; ForkJoinPool p;
760              if ((a = array) != null) {
761                  int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
762                  if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
763                      base == b &&
764                      U.compareAndSwapObject(a, j, t, null)) {
765                      U.putOrderedInt(this, QBASE, b + 1);
766 +                    if (top - b > 1 && (p = pool) != null)
767 +                        p.signalWork(this);
768                      return t;
769                  }
770              }
# Line 762 | Line 775 | public class ForkJoinPool extends Abstra
775           * Takes next task, if one exists, in FIFO order.
776           */
777          final ForkJoinTask<?> poll() {
778 <            ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
778 >            ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; ForkJoinPool p;
779              while ((b = base) - top < 0 && (a = array) != null) {
780                  int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
781                  t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
# Line 770 | Line 783 | public class ForkJoinPool extends Abstra
783                      if (base == b &&
784                          U.compareAndSwapObject(a, j, t, null)) {
785                          U.putOrderedInt(this, QBASE, b + 1);
786 +                        if (top - b > 1 && (p = pool) != null)
787 +                            p.signalWork(this);
788                          return t;
789                      }
790                  }
# Line 920 | Line 935 | public class ForkJoinPool extends Abstra
935           * Polls for and executes the given task or any other task in
936           * its CountedCompleter computation.
937           */
938 <        final boolean pollAndExecCC(ForkJoinTask<?> root) {
938 >        final boolean pollAndExecCC(ForkJoinTask<?> root, ForkJoinPool spool) {
939              ForkJoinTask<?>[] a; int b; Object o;
940              outer: while (root.status >= 0 && (b = base) - top < 0 &&
941                            (a = array) != null) {
# Line 933 | Line 948 | public class ForkJoinPool extends Abstra
948                          if (base == b &&
949                              U.compareAndSwapObject(a, j, t, null)) {
950                              U.putOrderedInt(this, QBASE, b + 1);
951 +                            if (spool != null && top - b > 1)
952 +                                spool.signalWork(this);
953                              t.doExec();
954                              return true;
955                          }
# Line 1406 | Line 1423 | public class ForkJoinPool extends Abstra
1423              (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
1424              (q = ws[m & z & SQMASK]) != null &&
1425              U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1426 <            int s = q.top, am;
1427 <            if ((a = q.array) != null && (am = a.length - 1) > s - q.base) {
1426 >            int s, am, n;
1427 >            if ((a = q.array) != null &&
1428 >                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
1429                  int j = ((am & s) << ASHIFT) + ABASE;
1430                  U.putOrderedObject(a, j, task);
1431                  q.top = s + 1;                     // push on to deque
1432                  q.qlock = 0;
1433 <                signalWork(q);
1433 >                if (n <= 1)
1434 >                    signalWork(q);
1435                  return;
1436              }
1437              q.qlock = 0;
# Line 1473 | Line 1492 | public class ForkJoinPool extends Abstra
1492                      try {                      // locked version of push
1493                          if ((a != null && a.length > s + 1 - q.base) ||
1494                              (a = q.growArray()) != null) {   // must presize
1495 <                            int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
1495 >                            long j = (((a.length - 1) & s) << ASHIFT) + ABASE;
1496                              U.putOrderedObject(a, j, task);
1497                              q.top = s + 1;
1498                              submitted = true;
# Line 1522 | Line 1541 | public class ForkJoinPool extends Abstra
1541       * @param q if non-null, the queue holding tasks to be processed
1542       */
1543      final void signalWork(WorkQueue q) {
1544 <        long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p;
1545 <        if ((u = (int)((c = ctl) >>> 32)) < 0) {
1546 <            if ((e = (int)c) > 0) {
1547 <                if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
1548 <                    (w = ws[i]) != null) {
1530 <                    long nc = (((long)(w.nextWait & E_MASK)) |
1531 <                               ((long)(u + UAC_UNIT) << 32));
1532 <                    if (w.eventCount == (e | INT_SIGN) &&
1533 <                        U.compareAndSwapLong(this, CTL, c, nc)) {
1534 <                        w.eventCount = (e + E_SEQ) & E_MASK;
1535 <                        if ((p = w.parker) != null)
1536 <                            U.unpark(p);
1537 <                    }
1538 <                    else
1539 <                        retrySignalWork(q);
1540 <                }
1541 <            }
1542 <            else if ((short)u < 0)
1543 <                tryAddWorker();
1544 <        }
1545 <    }
1546 <
1547 <    /**
1548 <     * Fallback version of signalWork, triggered if release fails
1549 <     * and the calling queue is non-empty;
1550 <     */
1551 <    final void retrySignalWork(WorkQueue q) {
1552 <        long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p;
1553 <        while ((q != null && !q.isEmpty()) &&
1554 <               (u = (int)((c = ctl) >>> 32)) < 0) {
1555 <            if ((e = (int)c) > 0) {
1556 <                if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
1557 <                    (w = ws[i]) != null) {
1558 <                    long nc = (((long)(w.nextWait & E_MASK)) |
1559 <                               ((long)(u + UAC_UNIT) << 32));
1560 <                    if (w.eventCount == (e | INT_SIGN) &&
1561 <                        U.compareAndSwapLong(this, CTL, c, nc)) {
1562 <                        w.eventCount = (e + E_SEQ) & E_MASK;
1563 <                        if ((p = w.parker) != null)
1564 <                            U.unpark(p);
1565 <                        break;
1566 <                    }
1567 <                }
1568 <                else
1569 <                    break;
1570 <            }
1571 <            else {
1544 >        for (;;) {
1545 >            long c; int e, u, i; WorkQueue[] ws; WorkQueue w; Thread p;
1546 >            if ((u = (int)((c = ctl) >>> 32)) >= 0)
1547 >                break;
1548 >            if ((e = (int)c) <= 0) {
1549                  if ((short)u < 0)
1550                      tryAddWorker();
1551                  break;
1552              }
1553 +            if ((ws = workQueues) == null || ws.length <= (i = e & SMASK) ||
1554 +                (w = ws[i]) == null)
1555 +                break;
1556 +            int wec = w.eventCount;
1557 +            long nc = (((long)(w.nextWait & E_MASK)) |
1558 +                       ((long)(u + UAC_UNIT) << 32));
1559 +            if (wec == (e | INT_SIGN) &&
1560 +                U.compareAndSwapLong(this, CTL, c, nc)) {
1561 +                w.eventCount = (e + E_SEQ) & E_MASK;
1562 +                if ((p = w.parker) != null)
1563 +                    U.unpark(p);
1564 +                break;
1565 +            }
1566 +            if (q == null || q.base - q.top >= 0) // quit if empty
1567 +                break;
1568          }
1569      }
1570  
# Line 1600 | Line 1592 | public class ForkJoinPool extends Abstra
1592       * The scan terminates upon either finding a non-empty queue, or
1593       * completing the sweep. If the worker is not inactivated, it
1594       * takes and returns a task from this queue. Otherwise, if not
1595 <     * activated, it signals workers (that may include itself) and
1596 <     * returns so caller can retry. Also returns for true if the
1597 <     * worker array may have changed during an empty scan.  On failure
1598 <     * to find a task, we take one of the following actions, after
1599 <     * which the caller will retry calling this method unless
1600 <     * terminated.
1609 <     *
1610 <     * * If pool is terminating, terminate the worker.
1611 <     *
1612 <     * * If not already enqueued, try to inactivate and enqueue the
1613 <     * worker on wait queue. Or, if inactivating has caused the pool
1614 <     * to be quiescent, relay to idleAwaitWork to possibly shrink
1615 <     * pool.
1616 <     *
1617 <     * * If already enqueued and none of the above apply, possibly
1618 <     * park awaiting signal, else lingering to help scan and signal.
1619 <     *
1620 <     * * If a non-empty queue discovered or left as a hint,
1621 <     * help wake up other workers before return.
1595 >     * activated, it tries to activate itself or some other worker by
1596 >     * signalling. Also indicates retry if the worker array may have
1597 >     * changed during an empty scan.  On failure to find a task, if
1598 >     * not already enqueued, tries to inactivate and enqueue the
1599 >     * worker on wait queue and then rescan. Otherwise relays to one
1600 >     * of the actions in onEmptyScan.
1601       *
1602       * @param w the worker (via its WorkQueue)
1603       * @return a task or null if none found
1604       */
1605      private final ForkJoinTask<?> scan(WorkQueue w) {
1606          WorkQueue[] ws; int m;
1607 <        int ps = plock;                          // read plock before ws
1607 >        int ps = plock;                              // read plock before ws
1608          if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1609 <            int ec = w.eventCount;               // ec is negative if inactive
1609 >            int ec = w.eventCount;                   // ec negative if inactive
1610              int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1611 <            int j = (m << 1) | (ec < 0 ? MIN_RESCANS : 1);
1612 <            do {
1613 <                WorkQueue q; int b;
1614 <                if ((q = ws[(r - j) & m]) != null &&
1615 <                    (b = q.base) - q.top < 0) {  // probably nonempty
1616 <                    ForkJoinTask<?>[] a = q.array;
1617 <                    if ((ec >= 0 || (ec = w.eventCount) >= 0) && a != null) {
1618 <                        long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1619 <                        ForkJoinTask<?> t =
1620 <                            (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1621 <                        if (q.base == b && t != null &&
1622 <                            U.compareAndSwapObject(a, i, t, null)) {
1623 <                            U.putOrderedInt(q, QBASE, b + 1);
1624 <                            return t;            // taken
1646 <                        }
1611 >            for (int j = (m << 1) | (ec < 0 ? MIN_RESCANS : 1);;) {
1612 >                WorkQueue q; int b, s; ForkJoinTask<?>[] a;
1613 >                if ((q = ws[(r - j) & m]) != null && // probably nonempty
1614 >                    (b = q.base) - (s = q.top) < 0 && (a = q.array) != null) {
1615 >                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1616 >                    ForkJoinTask<?> t =
1617 >                        (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1618 >                    if ((ec >= 0 || (ec = w.eventCount) >= 0) &&
1619 >                        q.base == b && t != null &&
1620 >                        U.compareAndSwapObject(a, i, t, null)) {
1621 >                        U.putOrderedInt(q, QBASE, b + 1);
1622 >                        if (q.top - b > 1)
1623 >                            signalWork(q);
1624 >                        return t;
1625                      }
1626 <                    if (j < m)                   // must restart to revisit
1626 >                    if (--j < m) {               // restart to revisit
1627 >                        if (ec < 0)              // help activate
1628 >                            signalWork(q);
1629                          break;
1630 +                    }
1631 +                }
1632 +                else if (--j < 0) {
1633 +                    long c = ctl;
1634 +                    long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
1635 +                    int e = (int)c;
1636 +                    if (plock == ps) {
1637 +                        if (e >= 0 && ec >= 0) {
1638 +                            w.nextWait = e;          // try to enqueue/inactivate
1639 +                            w.eventCount = ec | INT_SIGN;
1640 +                            if (!U.compareAndSwapLong(this, CTL, c, nc))
1641 +                                w.eventCount = ec;   // unmark on CAS failure
1642 +                        }
1643 +                        else
1644 +                            onEmptyScan(w, c);
1645 +                    }
1646 +                    break;
1647                  }
1651            } while (--j >= 0);
1652
1653            int e, ns; long c, sc;
1654            if (j >= 0 || plock != ps) {         // incomplete scan
1655                if (w.eventCount < 0)            // help activate for next time
1656                    signalWork(null);
1657            }
1658            else if ((e = (int)(c = ctl)) < 0)
1659                w.qlock = -1;                    // pool is terminating
1660            else if (ec >= 0) {                  // try to enqueue/inactivate
1661                long nc = (((long)ec |
1662                            ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1663                w.nextWait = e;                  // link and mark inactive
1664                w.eventCount = ec | INT_SIGN;
1665                if (!U.compareAndSwapLong(this, CTL, c, nc))
1666                    w.eventCount = ec;           // unmark on CAS failure
1667                else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1668                    idleAwaitWork(w, nc, c);
1669            }
1670            else if ((ns = w.nsteals) != 0) {
1671                if (U.compareAndSwapLong(this, STEALCOUNT,
1672                                         sc = stealCount, sc + ns))
1673                    w.nsteals = 0;               // collect steals and rescan
1674            }
1675            else if (w.eventCount < 0 && ctl == c) {
1676                Thread wt = Thread.currentThread();
1677                Thread.interrupted();            // clear status
1678                U.putObject(wt, PARKBLOCKER, this);
1679                w.parker = wt;                   // emulate LockSupport.park
1680                if (w.eventCount < 0 && ctl == c)// recheck
1681                    U.park(false, 0L);           // block
1682                w.parker = null;
1683                U.putObject(wt, PARKBLOCKER, null);
1648              }
1649          }
1650          return null;
1651      }
1652  
1653      /**
1654 <     * If inactivating worker w has caused the pool to become
1654 >     * A continuation of scan(), possibly blocking or terminating
1655 >     * worker w. ALso, if inactivating w has caused the pool to become
1656       * quiescent, checks for pool termination, and, so long as this is
1657       * not the only worker, waits for event for up to a given
1658       * duration.  On timeout, if ctl has not changed, terminates the
# Line 1695 | Line 1660 | public class ForkJoinPool extends Abstra
1660       * repeat this process.
1661       *
1662       * @param w the calling worker
1663 <     * @param currentCtl the ctl value triggering possible quiescence
1699 <     * @param prevCtl the ctl value to restore if thread is terminated
1663 >     * @param c the ctl value triggering possible quiescence
1664       */
1665 <    private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1666 <        if (w != null && w.eventCount < 0 && !tryTerminate(false, false) &&
1667 <            (int)prevCtl != 0 && ctl == currentCtl) {
1668 <            int ns = w.nsteals;
1669 <            if (ns != 0) {
1670 <                w.nsteals = 0;
1671 <                long sc;
1672 <                do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1673 <                                                   sc = stealCount, sc + ns));
1674 <            }
1675 <            int dc = -(short)(currentCtl >>> TC_SHIFT);
1676 <            long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1677 <            long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1678 <            Thread wt = Thread.currentThread();
1679 <            int spins = MIN_RESCANS; // poll before blocking
1680 <            while (ctl == currentCtl) {
1681 <                if (spins >= 0) {
1682 <                    if (w.nextSeed() < 0)
1683 <                        --spins;
1684 <                }
1685 <                else {
1686 <                    Thread.interrupted();  // timed variant of version in scan()
1665 >    private final void onEmptyScan(WorkQueue w, long c) {
1666 >        int ec, ns, e, d;
1667 >        if (w != null && ctl == c) {
1668 >            if ((e = (int)c) < 0)
1669 >                w.qlock = -1;                         // pool is terminating
1670 >            else if ((ec = w.eventCount) < 0 &&
1671 >                     ((d = (int)(c >> AC_SHIFT) + (config & SMASK)) != 0 ||
1672 >                      !tryTerminate(false, false))) {
1673 >                long pc = 0L, parkTime = 0L, deadline = 0L;
1674 >                if (d == 0 && ec == (e | INT_SIGN) &&
1675 >                    (pc = (((long)(w.nextWait & E_MASK)) |
1676 >                           ((long)(((int)(c >>> 32)) + UAC_UNIT) << 32))) != 0) {
1677 >                    int dc = -(short)(c >>> TC_SHIFT);
1678 >                    parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT:
1679 >                                (dc + 1) * IDLE_TIMEOUT);
1680 >                    deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1681 >                }
1682 >                if ((ns = w.nsteals) != 0) {
1683 >                    long sc = stealCount;
1684 >                    if (U.compareAndSwapLong(this, STEALCOUNT, sc, sc + ns))
1685 >                        w.nsteals = 0;                    // collect and rescan
1686 >                }
1687 >                else if (w.eventCount < 0 && ctl == c) {
1688 >                    Thread wt = Thread.currentThread();
1689 >                    Thread.interrupted();             // clear status
1690                      U.putObject(wt, PARKBLOCKER, this);
1691 <                    w.parker = wt;
1692 <                    if (ctl == currentCtl)
1693 <                        U.park(false, parkTime);
1691 >                    w.parker = wt;                    // emulate LockSupport.park
1692 >                    if (w.eventCount < 0 && ctl == c) // recheck
1693 >                        U.park(false, parkTime);      // block
1694                      w.parker = null;
1695                      U.putObject(wt, PARKBLOCKER, null);
1696 <                    if (ctl != currentCtl)
1697 <                        break;
1698 <                    if (deadline - System.nanoTime() <= 0L &&
1732 <                        U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1696 >                    if (parkTime != 0L && w.eventCount < 0 && ctl == c &&
1697 >                        deadline - System.nanoTime() <= 0L &&
1698 >                        U.compareAndSwapLong(this, CTL, c, pc)) {
1699                          w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1700                          w.qlock = -1;   // shrink
1735                        break;
1701                      }
1702                  }
1703              }
# Line 1800 | Line 1765 | public class ForkJoinPool extends Abstra
1765                              if (t != null && v.base == b &&
1766                                  U.compareAndSwapObject(a, i, t, null)) {
1767                                  U.putOrderedInt(v, QBASE, b + 1);
1768 +                                if (v.top - b > 1)
1769 +                                    signalWork(v);
1770                                  joiner.runSubtask(t);
1771                              }
1772                              else if (v.base == b && ++steps == MAX_HELP)
# Line 1831 | Line 1798 | public class ForkJoinPool extends Abstra
1798       *
1799       * @param task the task to join
1800       */
1801 <    private int helpComplete(ForkJoinTask<?> task) {
1801 >    private int helpComplete(ForkJoinTask<?> task, ForkJoinPool spool) {
1802          WorkQueue[] ws; int m;
1803          if (task != null && (ws = workQueues) != null &&
1804              (m = ws.length - 1) >= 0) {
1805 <            int scans = m | MIN_RESCANS;
1839 <            for (int j = 1, k = scans;;) {
1805 >            for (int j = 1, k = m;;) {
1806                  WorkQueue q; int s;
1807                  if ((s = task.status) < 0)
1808                      return s;
1809 <                if ((q = ws[j & m]) != null && q.pollAndExecCC(task))
1810 <                    k = scans;
1809 >                if ((q = ws[j & m]) != null && q.pollAndExecCC(task, spool))
1810 >                    k = m;
1811                  else if (--k >= 0)
1812                      j += 2;
1813                  else
# Line 1917 | Line 1883 | public class ForkJoinPool extends Abstra
1883                           joiner.tryRemoveAndExec(task)); // process local tasks
1884              if (s >= 0 && (s = task.status) >= 0 &&
1885                  (task instanceof CountedCompleter))
1886 <                s = helpComplete(task);
1886 >                s = helpComplete(task, this);
1887              while (s >= 0 && (s = task.status) >= 0) {
1888                  if ((!joiner.isEmpty() ||           // try helping
1889                       (s = tryHelpStealer(joiner, task)) == 0) &&
# Line 1936 | Line 1902 | public class ForkJoinPool extends Abstra
1902                              }
1903                          }
1904                          // reactivate
1905 <                        if (false) { // possible hotspot bug?
1906 <                            U.getAndAddLong(this, CTL, AC_UNIT);
1907 <                        }
1942 <                        else {
1943 <                            long c;
1944 <                            do {} while (!U.compareAndSwapLong
1945 <                                         (this, CTL, c = ctl, c + AC_UNIT));
1946 <                        }
1905 >                        long c;
1906 >                        do {} while (!U.compareAndSwapLong
1907 >                                     (this, CTL, c = ctl, c + AC_UNIT));
1908                      }
1909                  }
1910              }
# Line 1969 | Line 1930 | public class ForkJoinPool extends Abstra
1930                           joiner.tryRemoveAndExec(task));
1931              if (s >= 0 && (s = task.status) >= 0 &&
1932                  (task instanceof CountedCompleter))
1933 <                s = helpComplete(task);
1933 >                s = helpComplete(task, this);
1934              if (s >= 0 && joiner.isEmpty()) {
1935                  do {} while (task.status >= 0 &&
1936                               tryHelpStealer(joiner, task) > 0);
# Line 2157 | Line 2118 | public class ForkJoinPool extends Abstra
2118                      for (int i = 0; i < ws.length; ++i) {
2119                          if ((w = ws[i]) != null) {
2120                              if (!w.isEmpty()) {    // signal unprocessed tasks
2121 <                                signalWork(null);
2121 >                                signalWork(w);
2122                                  return false;
2123                              }
2124                              if ((i & 1) != 0 && w.eventCount >= 0)
# Line 2314 | Line 2275 | public class ForkJoinPool extends Abstra
2275                      break;
2276              }
2277              if (root.status >= 0)
2278 <                helpComplete(root);
2278 >                helpComplete(root, null);
2279          }
2280      }
2281  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines