ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.12 by jsr166, Mon Nov 16 04:57:09 2009 UTC vs.
Revision 1.13 by dl, Sat Dec 5 11:43:01 2009 UTC

# Line 524 | Line 524 | public class ForkJoinPool extends Abstra
524                  ws = workers;
525                  if (ws == null) {
526                      int ps = parallelism;
527 +                    updateWorkerCount(ps);
528                      ws = ensureWorkerArrayCapacity(ps);
529                      for (int i = 0; i < ps; ++i) {
530                          ForkJoinWorkerThread w = createWorker(i);
531                          if (w != null) {
532                              ws[i] = w;
533                              w.start();
533                            updateWorkerCount(1);
534                          }
535 +                        else
536 +                            updateWorkerCount(-1);
537                      }
538                  }
539              } finally {
# Line 797 | Line 799 | public class ForkJoinPool extends Abstra
799              if (isProcessingTasks()) {
800                  int p = this.parallelism;
801                  this.parallelism = parallelism;
802 <                if (parallelism > p)
803 <                    createAndStartAddedWorkers();
804 <                else
805 <                    trimSpares();
802 >                if (workers != null) {
803 >                    if (parallelism > p)
804 >                        createAndStartAddedWorkers();
805 >                    else
806 >                        trimSpares();
807 >                }
808              }
809          } finally {
810              lock.unlock();
# Line 1374 | Line 1378 | public class ForkJoinPool extends Abstra
1378          }
1379      }
1380  
1377
1381      /*
1382       * Nodes for event barrier to manage idle threads.  Queue nodes
1383       * are basic Treiber stack nodes, also used for spare stack.
# Line 1398 | Line 1401 | public class ForkJoinPool extends Abstra
1401       * handling: Method signalWork returns without advancing count if
1402       * the queue appears to be empty.  This would ordinarily result in
1403       * races causing some queued waiters not to be woken up. To avoid
1404 <     * this, the first worker enqueued in method sync (see
1405 <     * syncIsReleasable) rescans for tasks after being enqueued, and
1406 <     * helps signal if any are found. This works well because the
1407 <     * worker has nothing better to do, and so might as well help
1408 <     * alleviate the overhead and contention on the threads actually
1409 <     * doing work.  Also, since event counts increments on task
1410 <     * availability exist to maintain liveness (rather than to force
1411 <     * refreshes etc), it is OK for callers to exit early if
1409 <     * contending with another signaller.
1404 >     * this, the first worker enqueued in method sync rescans for
1405 >     * tasks after being enqueued, and helps signal if any are
1406 >     * found. This works well because the worker has nothing better to
1407 >     * do, and so might as well help alleviate the overhead and
1408 >     * contention on the threads actually doing work.  Also, since
1409 >     * event counts increments on task availability exist to maintain
1410 >     * liveness (rather than to force refreshes etc), it is OK for
1411 >     * callers to exit early if contending with another signaller.
1412       */
1413      static final class WaitQueueNode {
1414          WaitQueueNode next; // only written before enqueued
# Line 1419 | Line 1421 | public class ForkJoinPool extends Abstra
1421          }
1422  
1423          /**
1424 <         * Wakes up waiter, returning false if known to already
1424 >         * Wakes up waiter, also clearing thread field
1425           */
1426 <        boolean signal() {
1426 >        void signal() {
1427              ForkJoinWorkerThread t = thread;
1428 <            if (t == null)
1429 <                return false;
1430 <            thread = null;
1429 <            LockSupport.unpark(t);
1430 <            return true;
1431 <        }
1432 <
1433 <        /**
1434 <         * Awaits release on sync.
1435 <         */
1436 <        void awaitSyncRelease(ForkJoinPool p) {
1437 <            while (thread != null && !p.syncIsReleasable(this))
1438 <                LockSupport.park(this);
1439 <        }
1440 <
1441 <        /**
1442 <         * Awaits resumption as spare.
1443 <         */
1444 <        void awaitSpareRelease() {
1445 <            while (thread != null) {
1446 <                if (!Thread.interrupted())
1447 <                    LockSupport.park(this);
1428 >            if (t != null) {
1429 >                thread = null;
1430 >                LockSupport.unpark(t);
1431              }
1432          }
1433      }
# Line 1453 | Line 1436 | public class ForkJoinPool extends Abstra
1436       * Ensures that no thread is waiting for count to advance from the
1437       * current value of eventCount read on entry to this method, by
1438       * releasing waiting threads if necessary.
1456     *
1457     * @return the count
1439       */
1440 <    final long ensureSync() {
1440 >    final void ensureSync() {
1441          long c = eventCount;
1442          WaitQueueNode q;
1443          while ((q = syncStack) != null && q.count < c) {
# Line 1467 | Line 1448 | public class ForkJoinPool extends Abstra
1448                  break;
1449              }
1450          }
1470        return c;
1451      }
1452  
1453      /**
# Line 1482 | Line 1462 | public class ForkJoinPool extends Abstra
1462      /**
1463       * Signals threads waiting to poll a task. Because method sync
1464       * rechecks availability, it is OK to only proceed if queue
1465 <     * appears to be non-empty, and OK to skip under contention to
1466 <     * increment count (since some other thread succeeded).
1465 >     * appears to be non-empty, and OK if CAS to increment count
1466 >     * fails (since some other thread succeeded).
1467       */
1468      final void signalWork() {
1469 <        long c;
1470 <        WaitQueueNode q;
1471 <        if (syncStack != null &&
1472 <            casEventCount(c = eventCount, c+1) &&
1473 <            (((q = syncStack) != null && q.count <= c) &&
1474 <             (!casBarrierStack(q, q.next) || !q.signal())))
1475 <            ensureSync();
1469 >        if (syncStack != null) {
1470 >            long c = eventCount;
1471 >            casEventCount(c, c+1);
1472 >            WaitQueueNode q = syncStack;
1473 >            if (q != null && q.count <= c) {
1474 >                if (casBarrierStack(q, q.next))
1475 >                    q.signal();
1476 >                else
1477 >                    ensureSync(); // awaken all on contention
1478 >            }
1479 >        }
1480      }
1481  
1482      /**
1483 <     * Waits until event count advances from last value held by
1484 <     * caller, or if excess threads, caller is resumed as spare, or
1483 >     * Possibly blocks until event count advances from last value held
1484 >     * by caller, or if excess threads, caller is resumed as spare, or
1485       * caller or pool is terminating. Updates caller's event on exit.
1486       *
1487       * @param w the calling worker thread
# Line 1505 | Line 1489 | public class ForkJoinPool extends Abstra
1489      final void sync(ForkJoinWorkerThread w) {
1490          updateStealCount(w); // Transfer w's count while it is idle
1491  
1492 <        while (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1492 >        if (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1493              long prev = w.lastEventCount;
1494              WaitQueueNode node = null;
1495              WaitQueueNode h;
1496 <            while (eventCount == prev &&
1496 >            long c;
1497 >            while ((c = eventCount) == prev &&
1498                     ((h = syncStack) == null || h.count == prev)) {
1499                  if (node == null)
1500                      node = new WaitQueueNode(prev, w);
1501                  if (casBarrierStack(node.next = h, node)) {
1502 <                    node.awaitSyncRelease(this);
1502 >                    if (!Thread.interrupted() &&
1503 >                        node.thread != null &&
1504 >                        eventCount == prev &&
1505 >                        (h != null || // cover signalWork race
1506 >                         (!ForkJoinWorkerThread.hasQueuedTasks(workers) &&
1507 >                          eventCount == prev)))
1508 >                        LockSupport.park(this);
1509 >                    c = eventCount;
1510 >                    if (node.thread != null) { // help signal if not unparked
1511 >                        node.thread = null;
1512 >                        if (c == prev)
1513 >                            casEventCount(prev, prev + 1);
1514 >                    }
1515                      break;
1516                  }
1517              }
1518 <            long ec = ensureSync();
1519 <            if (ec != prev) {
1523 <                w.lastEventCount = ec;
1524 <                break;
1525 <            }
1526 <        }
1527 <    }
1528 <
1529 <    /**
1530 <     * Returns {@code true} if worker waiting on sync can proceed:
1531 <     *  - on signal (thread == null)
1532 <     *  - on event count advance (winning race to notify vs signaller)
1533 <     *  - on interrupt
1534 <     *  - if the first queued node, we find work available
1535 <     * If node was not signalled and event count not advanced on exit,
1536 <     * then we also help advance event count.
1537 <     *
1538 <     * @return {@code true} if node can be released
1539 <     */
1540 <    final boolean syncIsReleasable(WaitQueueNode node) {
1541 <        long prev = node.count;
1542 <        if (!Thread.interrupted() && node.thread != null &&
1543 <            (node.next != null ||
1544 <             !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1545 <            eventCount == prev)
1546 <            return false;
1547 <        if (node.thread != null) {
1548 <            node.thread = null;
1549 <            long ec = eventCount;
1550 <            if (prev <= ec) // help signal
1551 <                casEventCount(ec, ec+1);
1518 >            w.lastEventCount = c;
1519 >            ensureSync();
1520          }
1553        return true;
1521      }
1522  
1523      /**
# Line 1558 | Line 1525 | public class ForkJoinPool extends Abstra
1525       * call to sync or this method, if so, updating caller's count.
1526       */
1527      final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1528 <        long lc = w.lastEventCount;
1529 <        long ec = ensureSync();
1530 <        if (ec == lc)
1531 <            return false;
1532 <        w.lastEventCount = ec;
1533 <        return true;
1528 >        long wc = w.lastEventCount;
1529 >        long c = eventCount;
1530 >        if (wc != c)
1531 >            w.lastEventCount = c;
1532 >        ensureSync();
1533 >        return wc != c || wc != eventCount;
1534      }
1535  
1536      //  Parallelism maintenance
# Line 1729 | Line 1696 | public class ForkJoinPool extends Abstra
1696       */
1697      private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1698          WaitQueueNode node = null;
1699 <        int s;
1700 <        while (parallelism < runningCountOf(s = workerCounts)) {
1699 >        for (;;) {
1700 >            int s = workerCounts;
1701 >            int rc = runningCountOf(s);
1702 >            int tc = totalCountOf(s);
1703 >            int ps = parallelism;
1704 >            // use tc as bound if rc transiently out of sync
1705 >            if (tc <= ps || rc <= ps)
1706 >                return false; // not a spare
1707              if (node == null)
1708                  node = new WaitQueueNode(0, w);
1709 <            if (casWorkerCounts(s, s-1)) { // representation-dependent
1710 <                // push onto stack
1738 <                do {} while (!casSpareStack(node.next = spareStack, node));
1739 <                // block until released by resumeSpare
1740 <                node.awaitSpareRelease();
1741 <                return true;
1742 <            }
1709 >            if (casWorkerCounts(s, workerCountsFor(tc, rc - 1)))
1710 >                break;
1711          }
1712 <        return false;
1712 >        // push onto stack
1713 >        do {} while (!casSpareStack(node.next = spareStack, node));
1714 >        // block until released by resumeSpare
1715 >        while (!Thread.interrupted() && node.thread != null)
1716 >            LockSupport.park(this);
1717 >        return true;
1718      }
1719  
1720      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines