ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.49 by jsr166, Mon Nov 16 04:57:09 2009 UTC vs.
Revision 1.52 by dl, Sat Dec 5 11:39:03 2009 UTC

# Line 526 | Line 526 | public class ForkJoinPool extends Abstra
526                  ws = workers;
527                  if (ws == null) {
528                      int ps = parallelism;
529 +                    updateWorkerCount(ps);
530                      ws = ensureWorkerArrayCapacity(ps);
531                      for (int i = 0; i < ps; ++i) {
532                          ForkJoinWorkerThread w = createWorker(i);
533                          if (w != null) {
534                              ws[i] = w;
535                              w.start();
535                            updateWorkerCount(1);
536                          }
537 +                        else
538 +                            updateWorkerCount(-1);
539                      }
540                  }
541              } finally {
# Line 799 | Line 801 | public class ForkJoinPool extends Abstra
801              if (isProcessingTasks()) {
802                  int p = this.parallelism;
803                  this.parallelism = parallelism;
804 <                if (parallelism > p)
805 <                    createAndStartAddedWorkers();
806 <                else
807 <                    trimSpares();
804 >                if (workers != null) {
805 >                    if (parallelism > p)
806 >                        createAndStartAddedWorkers();
807 >                    else
808 >                        trimSpares();
809 >                }
810              }
811          } finally {
812              lock.unlock();
# Line 1376 | Line 1380 | public class ForkJoinPool extends Abstra
1380          }
1381      }
1382  
1379
1383      /*
1384       * Nodes for event barrier to manage idle threads.  Queue nodes
1385       * are basic Treiber stack nodes, also used for spare stack.
# Line 1400 | Line 1403 | public class ForkJoinPool extends Abstra
1403       * handling: Method signalWork returns without advancing count if
1404       * the queue appears to be empty.  This would ordinarily result in
1405       * races causing some queued waiters not to be woken up. To avoid
1406 <     * this, the first worker enqueued in method sync (see
1407 <     * syncIsReleasable) rescans for tasks after being enqueued, and
1408 <     * helps signal if any are found. This works well because the
1409 <     * worker has nothing better to do, and so might as well help
1410 <     * alleviate the overhead and contention on the threads actually
1411 <     * doing work.  Also, since event counts increments on task
1412 <     * availability exist to maintain liveness (rather than to force
1413 <     * refreshes etc), it is OK for callers to exit early if
1411 <     * contending with another signaller.
1406 >     * this, the first worker enqueued in method sync rescans for
1407 >     * tasks after being enqueued, and helps signal if any are
1408 >     * found. This works well because the worker has nothing better to
1409 >     * do, and so might as well help alleviate the overhead and
1410 >     * contention on the threads actually doing work.  Also, since
1411 >     * event counts increments on task availability exist to maintain
1412 >     * liveness (rather than to force refreshes etc), it is OK for
1413 >     * callers to exit early if contending with another signaller.
1414       */
1415      static final class WaitQueueNode {
1416          WaitQueueNode next; // only written before enqueued
# Line 1421 | Line 1423 | public class ForkJoinPool extends Abstra
1423          }
1424  
1425          /**
1426 <         * Wakes up waiter, returning false if known to already
1426 >         * Wakes up waiter, also clearing thread field
1427           */
1428 <        boolean signal() {
1428 >        void signal() {
1429              ForkJoinWorkerThread t = thread;
1430 <            if (t == null)
1431 <                return false;
1432 <            thread = null;
1431 <            LockSupport.unpark(t);
1432 <            return true;
1433 <        }
1434 <
1435 <        /**
1436 <         * Awaits release on sync.
1437 <         */
1438 <        void awaitSyncRelease(ForkJoinPool p) {
1439 <            while (thread != null && !p.syncIsReleasable(this))
1440 <                LockSupport.park(this);
1441 <        }
1442 <
1443 <        /**
1444 <         * Awaits resumption as spare.
1445 <         */
1446 <        void awaitSpareRelease() {
1447 <            while (thread != null) {
1448 <                if (!Thread.interrupted())
1449 <                    LockSupport.park(this);
1430 >            if (t != null) {
1431 >                thread = null;
1432 >                LockSupport.unpark(t);
1433              }
1434          }
1435      }
# Line 1455 | Line 1438 | public class ForkJoinPool extends Abstra
1438       * Ensures that no thread is waiting for count to advance from the
1439       * current value of eventCount read on entry to this method, by
1440       * releasing waiting threads if necessary.
1458     *
1459     * @return the count
1441       */
1442 <    final long ensureSync() {
1442 >    final void ensureSync() {
1443          long c = eventCount;
1444          WaitQueueNode q;
1445          while ((q = syncStack) != null && q.count < c) {
# Line 1469 | Line 1450 | public class ForkJoinPool extends Abstra
1450                  break;
1451              }
1452          }
1472        return c;
1453      }
1454  
1455      /**
# Line 1484 | Line 1464 | public class ForkJoinPool extends Abstra
1464      /**
1465       * Signals threads waiting to poll a task. Because method sync
1466       * rechecks availability, it is OK to only proceed if queue
1467 <     * appears to be non-empty, and OK to skip under contention to
1468 <     * increment count (since some other thread succeeded).
1467 >     * appears to be non-empty, and OK if CAS to increment count
1468 >     * fails (since some other thread succeeded).
1469       */
1470      final void signalWork() {
1471 <        long c;
1472 <        WaitQueueNode q;
1473 <        if (syncStack != null &&
1474 <            casEventCount(c = eventCount, c+1) &&
1475 <            (((q = syncStack) != null && q.count <= c) &&
1476 <             (!casBarrierStack(q, q.next) || !q.signal())))
1477 <            ensureSync();
1471 >        if (syncStack != null) {
1472 >            long c = eventCount;
1473 >            casEventCount(c, c+1);
1474 >            WaitQueueNode q = syncStack;
1475 >            if (q != null && q.count <= c) {
1476 >                if (casBarrierStack(q, q.next))
1477 >                    q.signal();
1478 >                else
1479 >                    ensureSync(); // awaken all on contention
1480 >            }
1481 >        }
1482      }
1483  
1484      /**
1485 <     * Waits until event count advances from last value held by
1486 <     * caller, or if excess threads, caller is resumed as spare, or
1485 >     * Possibly blocks until event count advances from last value held
1486 >     * by caller, or if excess threads, caller is resumed as spare, or
1487       * caller or pool is terminating. Updates caller's event on exit.
1488       *
1489       * @param w the calling worker thread
# Line 1507 | Line 1491 | public class ForkJoinPool extends Abstra
1491      final void sync(ForkJoinWorkerThread w) {
1492          updateStealCount(w); // Transfer w's count while it is idle
1493  
1494 <        while (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1494 >        if (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1495              long prev = w.lastEventCount;
1496              WaitQueueNode node = null;
1497              WaitQueueNode h;
1498 <            while (eventCount == prev &&
1498 >            long c;
1499 >            while ((c = eventCount) == prev &&
1500                     ((h = syncStack) == null || h.count == prev)) {
1501                  if (node == null)
1502                      node = new WaitQueueNode(prev, w);
1503                  if (casBarrierStack(node.next = h, node)) {
1504 <                    node.awaitSyncRelease(this);
1504 >                    if (!Thread.interrupted() &&
1505 >                        node.thread != null &&
1506 >                        eventCount == prev &&
1507 >                        (h != null || // cover signalWork race
1508 >                         (!ForkJoinWorkerThread.hasQueuedTasks(workers) &&
1509 >                          eventCount == prev)))
1510 >                        LockSupport.park(this);
1511 >                    c = eventCount;
1512 >                    if (node.thread != null) { // help signal if not unparked
1513 >                        node.thread = null;
1514 >                        if (c == prev)
1515 >                            casEventCount(prev, prev + 1);
1516 >                    }
1517                      break;
1518                  }
1519              }
1520 <            long ec = ensureSync();
1521 <            if (ec != prev) {
1525 <                w.lastEventCount = ec;
1526 <                break;
1527 <            }
1528 <        }
1529 <    }
1530 <
1531 <    /**
1532 <     * Returns {@code true} if worker waiting on sync can proceed:
1533 <     *  - on signal (thread == null)
1534 <     *  - on event count advance (winning race to notify vs signaller)
1535 <     *  - on interrupt
1536 <     *  - if the first queued node, we find work available
1537 <     * If node was not signalled and event count not advanced on exit,
1538 <     * then we also help advance event count.
1539 <     *
1540 <     * @return {@code true} if node can be released
1541 <     */
1542 <    final boolean syncIsReleasable(WaitQueueNode node) {
1543 <        long prev = node.count;
1544 <        if (!Thread.interrupted() && node.thread != null &&
1545 <            (node.next != null ||
1546 <             !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1547 <            eventCount == prev)
1548 <            return false;
1549 <        if (node.thread != null) {
1550 <            node.thread = null;
1551 <            long ec = eventCount;
1552 <            if (prev <= ec) // help signal
1553 <                casEventCount(ec, ec+1);
1520 >            w.lastEventCount = c;
1521 >            ensureSync();
1522          }
1555        return true;
1523      }
1524  
1525      /**
# Line 1560 | Line 1527 | public class ForkJoinPool extends Abstra
1527       * call to sync or this method, if so, updating caller's count.
1528       */
1529      final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1530 <        long lc = w.lastEventCount;
1531 <        long ec = ensureSync();
1532 <        if (ec == lc)
1533 <            return false;
1534 <        w.lastEventCount = ec;
1535 <        return true;
1530 >        long wc = w.lastEventCount;
1531 >        long c = eventCount;
1532 >        if (wc != c)
1533 >            w.lastEventCount = c;
1534 >        ensureSync();
1535 >        return wc != c || wc != eventCount;
1536      }
1537  
1538      //  Parallelism maintenance
# Line 1731 | Line 1698 | public class ForkJoinPool extends Abstra
1698       */
1699      private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1700          WaitQueueNode node = null;
1701 <        int s;
1702 <        while (parallelism < runningCountOf(s = workerCounts)) {
1701 >        for (;;) {
1702 >            int s = workerCounts;
1703 >            int rc = runningCountOf(s);
1704 >            int tc = totalCountOf(s);
1705 >            int ps = parallelism;
1706 >            // use tc as bound if rc transiently out of sync
1707 >            if (tc <= ps || rc <= ps)
1708 >                return false; // not a spare
1709              if (node == null)
1710                  node = new WaitQueueNode(0, w);
1711 <            if (casWorkerCounts(s, s-1)) { // representation-dependent
1712 <                // push onto stack
1740 <                do {} while (!casSpareStack(node.next = spareStack, node));
1741 <                // block until released by resumeSpare
1742 <                node.awaitSpareRelease();
1743 <                return true;
1744 <            }
1711 >            if (casWorkerCounts(s, workerCountsFor(tc, rc - 1)))
1712 >                break;
1713          }
1714 <        return false;
1714 >        // push onto stack
1715 >        do {} while (!casSpareStack(node.next = spareStack, node));
1716 >        // block until released by resumeSpare
1717 >        while (!Thread.interrupted() && node.thread != null)
1718 >            LockSupport.park(this);
1719 >        return true;
1720      }
1721  
1722      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines