ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.51 by dl, Fri Dec 4 15:46:38 2009 UTC vs.
Revision 1.52 by dl, Sat Dec 5 11:39:03 2009 UTC

# Line 1423 | Line 1423 | public class ForkJoinPool extends Abstra
1423          }
1424  
1425          /**
1426 <         * Wakes up waiter, returning false if known to already be awake
1426 >         * Wakes up waiter, also clearing thread field
1427           */
1428 <        boolean signal() {
1428 >        void signal() {
1429              ForkJoinWorkerThread t = thread;
1430 <            if (t == null)
1431 <                return false;
1432 <            thread = null;
1433 <            LockSupport.unpark(t);
1434 <            return true;
1430 >            if (t != null) {
1431 >                thread = null;
1432 >                LockSupport.unpark(t);
1433 >            }
1434          }
1435      }
1436  
# Line 1470 | Line 1469 | public class ForkJoinPool extends Abstra
1469       */
1470      final void signalWork() {
1471          if (syncStack != null) {
1472 <            long c;
1473 <            casEventCount(c = eventCount, c+1);
1472 >            long c = eventCount;
1473 >            casEventCount(c, c+1);
1474              WaitQueueNode q = syncStack;
1475 <            if (q != null && q.count <= c &&
1476 <                (!casBarrierStack(q, q.next) || !q.signal()))
1477 <                ensureSync();
1475 >            if (q != null && q.count <= c) {
1476 >                if (casBarrierStack(q, q.next))
1477 >                    q.signal();
1478 >                else
1479 >                    ensureSync(); // awaken all on contention
1480 >            }
1481          }
1482      }
1483  
1484      /**
1485 <     * Waits until event count advances from last value held by
1486 <     * caller, or if excess threads, caller is resumed as spare, or
1485 >     * Possibly blocks until event count advances from last value held
1486 >     * by caller, or if excess threads, caller is resumed as spare, or
1487       * caller or pool is terminating. Updates caller's event on exit.
1488       *
1489       * @param w the calling worker thread
# Line 1493 | Line 1495 | public class ForkJoinPool extends Abstra
1495              long prev = w.lastEventCount;
1496              WaitQueueNode node = null;
1497              WaitQueueNode h;
1498 <            long ec;
1499 <            while ((ec = eventCount) == prev &&
1498 >            long c;
1499 >            while ((c = eventCount) == prev &&
1500                     ((h = syncStack) == null || h.count == prev)) {
1501                  if (node == null)
1502                      node = new WaitQueueNode(prev, w);
1503                  if (casBarrierStack(node.next = h, node)) {
1504 <                    if (!Thread.interrupted() &&
1504 >                    if (!Thread.interrupted() &&
1505                          node.thread != null &&
1506                          eventCount == prev &&
1507                          (h != null || // cover signalWork race
1508                           (!ForkJoinWorkerThread.hasQueuedTasks(workers) &&
1509                            eventCount == prev)))
1510                          LockSupport.park(this);
1511 <                    ec = eventCount;
1512 <                    if (node.thread != null) {
1511 >                    c = eventCount;
1512 >                    if (node.thread != null) { // help signal if not unparked
1513                          node.thread = null;
1514 <                        if (ec == prev)
1515 <                            casEventCount(prev, prev + 1); // help signal
1514 >                        if (c == prev)
1515 >                            casEventCount(prev, prev + 1);
1516                      }
1517                      break;
1518                  }
1519              }
1520 <            w.lastEventCount = ec;
1520 >            w.lastEventCount = c;
1521              ensureSync();
1522          }
1523      }
# Line 1525 | Line 1527 | public class ForkJoinPool extends Abstra
1527       * call to sync or this method, if so, updating caller's count.
1528       */
1529      final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1530 <        long lc = w.lastEventCount;
1531 <        long ec = eventCount;
1532 <        if (lc != ec)
1533 <            w.lastEventCount = ec;
1530 >        long wc = w.lastEventCount;
1531 >        long c = eventCount;
1532 >        if (wc != c)
1533 >            w.lastEventCount = c;
1534          ensureSync();
1535 <        return lc != ec || lc != eventCount;
1535 >        return wc != c || wc != eventCount;
1536      }
1537  
1538      //  Parallelism maintenance
# Line 1697 | Line 1699 | public class ForkJoinPool extends Abstra
1699      private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1700          WaitQueueNode node = null;
1701          for (;;) {
1700            int p = parallelism;
1702              int s = workerCounts;
1703 <            int r = runningCountOf(s);
1704 <            int t = totalCountOf(s);
1705 <            // use t as bound if r transiently out of sync
1706 <            if (t <= p || r <= p)
1703 >            int rc = runningCountOf(s);
1704 >            int tc = totalCountOf(s);
1705 >            int ps = parallelism;
1706 >            // use tc as bound if rc transiently out of sync
1707 >            if (tc <= ps || rc <= ps)
1708                  return false; // not a spare
1709              if (node == null)
1710                  node = new WaitQueueNode(0, w);
1711 <            if (casWorkerCounts(s, workerCountsFor(t, r - 1)))
1711 >            if (casWorkerCounts(s, workerCountsFor(tc, rc - 1)))
1712                  break;
1713          }
1714          // push onto stack

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines