ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.49 by jsr166, Mon Nov 16 04:57:09 2009 UTC vs.
Revision 1.50 by dl, Fri Dec 4 12:09:46 2009 UTC

# Line 526 | Line 526 | public class ForkJoinPool extends Abstra
526                  ws = workers;
527                  if (ws == null) {
528                      int ps = parallelism;
529 +                    updateWorkerCount(ps);
530                      ws = ensureWorkerArrayCapacity(ps);
531                      for (int i = 0; i < ps; ++i) {
532                          ForkJoinWorkerThread w = createWorker(i);
533                          if (w != null) {
534                              ws[i] = w;
535                              w.start();
535                            updateWorkerCount(1);
536                          }
537 +                        else
538 +                            updateWorkerCount(-1);
539                      }
540                  }
541              } finally {
# Line 799 | Line 801 | public class ForkJoinPool extends Abstra
801              if (isProcessingTasks()) {
802                  int p = this.parallelism;
803                  this.parallelism = parallelism;
804 <                if (parallelism > p)
805 <                    createAndStartAddedWorkers();
806 <                else
807 <                    trimSpares();
804 >                if (workers != null) {
805 >                    if (parallelism > p)
806 >                        createAndStartAddedWorkers();
807 >                    else
808 >                        trimSpares();
809 >                }
810              }
811          } finally {
812              lock.unlock();
# Line 1376 | Line 1380 | public class ForkJoinPool extends Abstra
1380          }
1381      }
1382  
1379
1383      /*
1384       * Nodes for event barrier to manage idle threads.  Queue nodes
1385       * are basic Treiber stack nodes, also used for spare stack.
# Line 1400 | Line 1403 | public class ForkJoinPool extends Abstra
1403       * handling: Method signalWork returns without advancing count if
1404       * the queue appears to be empty.  This would ordinarily result in
1405       * races causing some queued waiters not to be woken up. To avoid
1406 <     * this, the first worker enqueued in method sync (see
1407 <     * syncIsReleasable) rescans for tasks after being enqueued, and
1408 <     * helps signal if any are found. This works well because the
1409 <     * worker has nothing better to do, and so might as well help
1410 <     * alleviate the overhead and contention on the threads actually
1411 <     * doing work.  Also, since event counts increments on task
1412 <     * availability exist to maintain liveness (rather than to force
1413 <     * refreshes etc), it is OK for callers to exit early if
1411 <     * contending with another signaller.
1406 >     * this, the first worker enqueued in method sync rescans for
1407 >     * tasks after being enqueued, and helps signal if any are
1408 >     * found. This works well because the worker has nothing better to
1409 >     * do, and so might as well help alleviate the overhead and
1410 >     * contention on the threads actually doing work.  Also, since
1411 >     * event counts increments on task availability exist to maintain
1412 >     * liveness (rather than to force refreshes etc), it is OK for
1413 >     * callers to exit early if contending with another signaller.
1414       */
1415      static final class WaitQueueNode {
1416          WaitQueueNode next; // only written before enqueued
# Line 1421 | Line 1423 | public class ForkJoinPool extends Abstra
1423          }
1424  
1425          /**
1426 <         * Wakes up waiter, returning false if known to already
1426 >         * Wakes up waiter, returning false if known to already be awake
1427           */
1428          boolean signal() {
1429              ForkJoinWorkerThread t = thread;
# Line 1431 | Line 1433 | public class ForkJoinPool extends Abstra
1433              LockSupport.unpark(t);
1434              return true;
1435          }
1434
1435        /**
1436         * Awaits release on sync.
1437         */
1438        void awaitSyncRelease(ForkJoinPool p) {
1439            while (thread != null && !p.syncIsReleasable(this))
1440                LockSupport.park(this);
1441        }
1442
1443        /**
1444         * Awaits resumption as spare.
1445         */
1446        void awaitSpareRelease() {
1447            while (thread != null) {
1448                if (!Thread.interrupted())
1449                    LockSupport.park(this);
1450            }
1451        }
1436      }
1437  
1438      /**
1439       * Ensures that no thread is waiting for count to advance from the
1440       * current value of eventCount read on entry to this method, by
1441       * releasing waiting threads if necessary.
1458     *
1459     * @return the count
1442       */
1443 <    final long ensureSync() {
1443 >    final void ensureSync() {
1444          long c = eventCount;
1445          WaitQueueNode q;
1446          while ((q = syncStack) != null && q.count < c) {
# Line 1469 | Line 1451 | public class ForkJoinPool extends Abstra
1451                  break;
1452              }
1453          }
1472        return c;
1454      }
1455  
1456      /**
# Line 1484 | Line 1465 | public class ForkJoinPool extends Abstra
1465      /**
1466       * Signals threads waiting to poll a task. Because method sync
1467       * rechecks availability, it is OK to only proceed if queue
1468 <     * appears to be non-empty, and OK to skip under contention to
1469 <     * increment count (since some other thread succeeded).
1468 >     * appears to be non-empty, and OK if CAS to increment count
1469 >     * fails (since some other thread succeeded).
1470       */
1471      final void signalWork() {
1472 <        long c;
1473 <        WaitQueueNode q;
1474 <        if (syncStack != null &&
1475 <            casEventCount(c = eventCount, c+1) &&
1476 <            (((q = syncStack) != null && q.count <= c) &&
1477 <             (!casBarrierStack(q, q.next) || !q.signal())))
1478 <            ensureSync();
1472 >        if (syncStack != null) {
1473 >            long c;
1474 >            casEventCount(c = eventCount, c+1);
1475 >            WaitQueueNode q = syncStack;
1476 >            if (q != null && q.count <= c &&
1477 >                (!casBarrierStack(q, q.next) || !q.signal()))
1478 >                ensureSync();
1479 >        }
1480      }
1481  
1482      /**
# Line 1507 | Line 1489 | public class ForkJoinPool extends Abstra
1489      final void sync(ForkJoinWorkerThread w) {
1490          updateStealCount(w); // Transfer w's count while it is idle
1491  
1492 <        while (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1492 >        if (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1493              long prev = w.lastEventCount;
1494              WaitQueueNode node = null;
1495              WaitQueueNode h;
1496 +            boolean helpSignal = false;
1497              while (eventCount == prev &&
1498                     ((h = syncStack) == null || h.count == prev)) {
1499                  if (node == null)
1500                      node = new WaitQueueNode(prev, w);
1501                  if (casBarrierStack(node.next = h, node)) {
1502 <                    node.awaitSyncRelease(this);
1502 >                    if (!Thread.interrupted() && node.thread != null &&
1503 >                        eventCount == prev) {
1504 >                        if (h == null && // cover signalWork race
1505 >                            ForkJoinWorkerThread.hasQueuedTasks(workers))
1506 >                            helpSignal = true;
1507 >                        else
1508 >                            LockSupport.park(this);
1509 >                    }
1510 >                    if (node.thread != null)
1511 >                        node.thread = null;
1512                      break;
1513                  }
1514              }
1515 <            long ec = ensureSync();
1516 <            if (ec != prev) {
1515 >            long ec = eventCount;
1516 >            if (ec != prev)
1517                  w.lastEventCount = ec;
1518 <                break;
1519 <            }
1518 >            else if (helpSignal)
1519 >                casEventCount(ec, ec + 1);
1520 >            ensureSync();
1521          }
1522      }
1523  
1531    /**
1532     * Returns {@code true} if worker waiting on sync can proceed:
1533     *  - on signal (thread == null)
1534     *  - on event count advance (winning race to notify vs signaller)
1535     *  - on interrupt
1536     *  - if the first queued node, we find work available
1537     * If node was not signalled and event count not advanced on exit,
1538     * then we also help advance event count.
1539     *
1540     * @return {@code true} if node can be released
1541     */
1542    final boolean syncIsReleasable(WaitQueueNode node) {
1543        long prev = node.count;
1544        if (!Thread.interrupted() && node.thread != null &&
1545            (node.next != null ||
1546             !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1547            eventCount == prev)
1548            return false;
1549        if (node.thread != null) {
1550            node.thread = null;
1551            long ec = eventCount;
1552            if (prev <= ec) // help signal
1553                casEventCount(ec, ec+1);
1554        }
1555        return true;
1556    }
1524  
1525      /**
1526       * Returns {@code true} if a new sync event occurred since last
# Line 1561 | Line 1528 | public class ForkJoinPool extends Abstra
1528       */
1529      final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1530          long lc = w.lastEventCount;
1531 <        long ec = ensureSync();
1532 <        if (ec == lc)
1533 <            return false;
1534 <        w.lastEventCount = ec;
1535 <        return true;
1531 >        long ec = eventCount;
1532 >        if (lc != ec)
1533 >            w.lastEventCount = ec;
1534 >        ensureSync();
1535 >        return lc != ec || lc != eventCount;
1536      }
1537  
1538      //  Parallelism maintenance
# Line 1731 | Line 1698 | public class ForkJoinPool extends Abstra
1698       */
1699      private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1700          WaitQueueNode node = null;
1701 <        int s;
1702 <        while (parallelism < runningCountOf(s = workerCounts)) {
1701 >        for (;;) {
1702 >            int p = parallelism;
1703 >            int s = workerCounts;
1704 >            int r = runningCountOf(s);
1705 >            int t = totalCountOf(s);
1706 >            // use t as bound if r transiently out of sync
1707 >            if (t <= p || r <= p)
1708 >                return false; // not a spare
1709              if (node == null)
1710                  node = new WaitQueueNode(0, w);
1711 <            if (casWorkerCounts(s, s-1)) { // representation-dependent
1712 <                // push onto stack
1740 <                do {} while (!casSpareStack(node.next = spareStack, node));
1741 <                // block until released by resumeSpare
1742 <                node.awaitSpareRelease();
1743 <                return true;
1744 <            }
1711 >            if (casWorkerCounts(s, workerCountsFor(t, r - 1)))
1712 >                break;
1713          }
1714 <        return false;
1714 >        // push onto stack
1715 >        do {} while (!casSpareStack(node.next = spareStack, node));
1716 >        // block until released by resumeSpare
1717 >        while (!Thread.interrupted() && node.thread != null)
1718 >            LockSupport.park(this);
1719 >        return true;
1720      }
1721  
1722      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines