ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.3 by dl, Wed Jan 7 19:12:36 2009 UTC vs.
Revision 1.4 by dl, Mon Jan 12 17:16:18 2009 UTC

# Line 181 | Line 181 | public class ForkJoinPool extends Abstra
181      /**
182       * Head of Treiber stack for barrier sync. See below for explanation
183       */
184 <    private volatile WaitQueueNode barrierStack;
184 >    private volatile WaitQueueNode syncStack;
185  
186      /**
187       * The count for event barrier
# Line 264 | Line 264 | public class ForkJoinPool extends Abstra
264      private static int runControlFor(int r, int a)   { return (r << 16) + a; }
265  
266      /**
267 <     * Increment active count. Called by workers before/during
268 <     * executing tasks.
267 >     * Try incrementing active count; fail on contention. Called by
268 >     * workers before/during executing tasks.
 269 >     * @return true on success
270       */
271 <    final void incrementActiveCount() {
272 <        int c;
273 <        do;while (!casRunControl(c = runControl, c+1));
271 >    final boolean tryIncrementActiveCount() {
272 >        int c = runControl;
273 >        return casRunControl(c, c+1);
274      }
275  
276      /**
277 <     * Decrement active count; possibly trigger termination.
277 >     * Try decrementing active count; fail on contention.
 278 >     * Possibly trigger termination on success.
279       * Called by workers when they can't find tasks.
280 +     * @return true on success
281       */
282 <    final void decrementActiveCount() {
283 <        int c, nextc;
284 <        do;while (!casRunControl(c = runControl, nextc = c-1));
282 >    final boolean tryDecrementActiveCount() {
283 >        int c = runControl;
284 >        int nextc = c - 1;
285 >        if (!casRunControl(c, nextc))
286 >            return false;
287          if (canTerminateOnShutdown(nextc))
288              terminateOnShutdown();
289 +        return true;
290      }
291  
292      /**
# Line 508 | Line 514 | public class ForkJoinPool extends Abstra
514          if (isShutdown())
515              throw new RejectedExecutionException();
516          submissionQueue.offer(task);
517 <        signalIdleWorkers(true);
517 >        signalIdleWorkers();
518      }
519  
520      /**
# Line 717 | Line 723 | public class ForkJoinPool extends Abstra
723          } finally {
724              lock.unlock();
725          }
726 <        signalIdleWorkers(false);
726 >        signalIdleWorkers();
727      }
728  
729      /**
# Line 1067 | Line 1073 | public class ForkJoinPool extends Abstra
1073          } finally {
1074              lock.unlock();
1075          }
1076 <        signalIdleWorkers(false);
1076 >        signalIdleWorkers();
1077      }
1078  
1079      /**
# Line 1077 | Line 1083 | public class ForkJoinPool extends Abstra
1083          if (transitionRunStateTo(TERMINATING)) {
1084              stopAllWorkers();
1085              resumeAllSpares();
1086 <            signalIdleWorkers(true);
1086 >            signalIdleWorkers();
1087              cancelQueuedSubmissions();
1088              cancelQueuedWorkerTasks();
1089              interruptUnterminatedWorkers();
1090 <            signalIdleWorkers(true); // resignal after interrupt
1090 >            signalIdleWorkers(); // resignal after interrupt
1091          }
1092      }
1093  
# Line 1165 | Line 1171 | public class ForkJoinPool extends Abstra
1171  
1172  
1173      /*
1174 <     * Nodes for event barrier to manage idle threads.
1174 >     * Nodes for event barrier to manage idle threads.  Queue nodes
1175 >     * are basic Treiber stack nodes, also used for spare stack.
1176       *
1177       * The event barrier has an event count and a wait queue (actually
1178       * a Treiber stack).  Workers are enabled to look for work when
1179 <     * the eventCount is incremented. If they fail to find some,
1180 <     * they may wait for next count. Synchronization events occur only
1181 <     * in enough contexts to maintain overall liveness:
1179 >     * the eventCount is incremented. If they fail to find work, they
1180 >     * may wait for next count. Upon release, threads help others wake
1181 >     * up.
1182 >     *
1183 >     * Synchronization events occur only in enough contexts to
1184 >     * maintain overall liveness:
1185       *
1186       *   - Submission of a new task to the pool
1187 <     *   - Creation or termination of a worker
1187 >     *   - Resizes or other changes to the workers array
1188       *   - pool termination
1189       *   - A worker pushing a task on an empty queue
1190       *
1191 <     * The last case (pushing a task) occurs often enough, and is
1192 <     * heavy enough compared to simple stack pushes to require some
1193 <     * special handling: Method signalNonEmptyWorkerQueue returns
1194 <     * without advancing count if the queue appears to be empty.  This
1195 <     * would ordinarily result in races causing some queued waiters
1196 <     * not to be woken up. To avoid this, a worker in sync
1197 <     * rescans for tasks after being enqueued if it was the first to
1198 <     * enqueue, and aborts the wait if finding one, also helping to
1199 <     * signal others. This works well because the worker has nothing
1200 <     * better to do anyway, and so might as well help alleviate the
1201 <     * overhead and contention on the threads actually doing work.
1202 <     *
1203 <     * Queue nodes are basic Treiber stack nodes, also used for spare
1204 <     * stack.
1191 >     * The case of pushing a task occurs often enough, and is heavy
1192 >     * enough compared to simple stack pushes, to require special
1193 >     * handling: Method signalWork returns without advancing count if
1194 >     * the queue appears to be empty.  This would ordinarily result in
1195 >     * races causing some queued waiters not to be woken up. To avoid
1196 >     * this, the first worker enqueued in method sync (see
1197 >     * syncIsReleasable) rescans for tasks after being enqueued, and
1198 >     * helps signal if any are found. This works well because the
1199 >     * worker has nothing better to do, and so might as well help
1200 >     * alleviate the overhead and contention on the threads actually
1201 >     * doing work.  Also, since event counts increments on task
1202 >     * availability exist to maintain liveness (rather than to force
1203 >     * refreshes etc), it is OK for callers to exit early if
1204 >     * contending with another signaller.
1205       */
1206      static final class WaitQueueNode {
1207          WaitQueueNode next; // only written before enqueued
1208          volatile ForkJoinWorkerThread thread; // nulled to cancel wait
1209          final long count; // unused for spare stack
1210 <        WaitQueueNode(ForkJoinWorkerThread w, long c) {
1210 >
1211 >        WaitQueueNode(long c, ForkJoinWorkerThread w) {
1212              count = c;
1213              thread = w;
1214          }
1215 <        final boolean signal() {
1215 >
1216 >        /**
 1217 >         * Wake up waiter, returning false if known to be already awake
1218 >         */
1219 >        boolean signal() {
1220              ForkJoinWorkerThread t = thread;
1221 +            if (t == null)
1222 +                return false;
1223              thread = null;
1224 <            if (t != null) {
1225 <                LockSupport.unpark(t);
1226 <                return true;
1224 >            LockSupport.unpark(t);
1225 >            return true;
1226 >        }
1227 >
1228 >        /**
1229 >         * Await release on sync
1230 >         */
1231 >        void awaitSyncRelease(ForkJoinPool p) {
1232 >            while (thread != null && !p.syncIsReleasable(this))
1233 >                LockSupport.park(this);
1234 >        }
1235 >
1236 >        /**
1237 >         * Await resumption as spare
1238 >         */
1239 >        void awaitSpareRelease() {
1240 >            while (thread != null) {
1241 >                if (!Thread.interrupted())
1242 >                    LockSupport.park(this);
1243              }
1211            return false;
1244          }
1245      }
1246  
1247      /**
1248 <     * Release at least one thread waiting for event count to advance,
1249 <     * if one exists. If initial attempt fails, release all threads.
1250 <     * @param all if false, at first try to only release one thread
1251 <     * @return current event
1248 >     * Ensures that no thread is waiting for count to advance from the
1249 >     * current value of eventCount read on entry to this method, by
1250 >     * releasing waiting threads if necessary.
1251 >     * @return the count
1252       */
1253 <    private long releaseIdleWorkers(boolean all) {
1254 <        long c;
1255 <        for (;;) {
1256 <            WaitQueueNode q = barrierStack;
1257 <            c = eventCount;
1226 <            long qc;
1227 <            if (q == null || (qc = q.count) >= c)
1228 <                break;
1229 <            if (!all) {
1230 <                if (casBarrierStack(q, q.next) && q.signal())
1231 <                    break;
1232 <                all = true;
1233 <            }
1234 <            else if (casBarrierStack(q, null)) {
1253 >    final long ensureSync() {
1254 >        long c = eventCount;
1255 >        WaitQueueNode q;
1256 >        while ((q = syncStack) != null && q.count < c) {
1257 >            if (casBarrierStack(q, null)) {
1258                  do {
1259 <                 q.signal();
1259 >                    q.signal();
1260                  } while ((q = q.next) != null);
1261                  break;
1262              }
# Line 1242 | Line 1265 | public class ForkJoinPool extends Abstra
1265      }
1266  
1267      /**
1268 <     * Returns current barrier event count
1246 <     * @return current barrier event count
1247 <     */
1248 <    final long getEventCount() {
1249 <        long ec = eventCount;
1250 <        releaseIdleWorkers(true); // release to ensure accurate result
1251 <        return ec;
1252 <    }
1253 <
1254 <    /**
1255 <     * Increment event count and release at least one waiting thread,
1256 <     * if one exists (released threads will in turn wake up others).
1257 <     * @param all if true, try to wake up all
1268 >     * Increments event count and releases waiting threads.
1269       */
1270 <    final void signalIdleWorkers(boolean all) {
1270 >    private void signalIdleWorkers() {
1271          long c;
1272          do;while (!casEventCount(c = eventCount, c+1));
1273 <        releaseIdleWorkers(all);
1273 >        ensureSync();
1274      }
1275  
1276      /**
1277 <     * Wake up threads waiting to steal a task. Because method
1278 <     * sync rechecks availability, it is OK to only proceed if
1279 <     * queue appears to be non-empty.
1277 >     * Signal threads waiting to poll a task. Because method sync
1278 >     * rechecks availability, it is OK to only proceed if queue
1279 >     * appears to be non-empty, and OK to skip under contention to
1280 >     * increment count (since some other thread succeeded).
1281       */
1282 <    final void signalNonEmptyWorkerQueue() {
1271 <        // If CAS fails another signaller must have succeeded
1282 >    final void signalWork() {
1283          long c;
1284 <        if (barrierStack != null && casEventCount(c = eventCount, c+1))
1285 <            releaseIdleWorkers(false);
1284 >        WaitQueueNode q;
1285 >        if (syncStack != null &&
1286 >            casEventCount(c = eventCount, c+1) &&
1287 >            (((q = syncStack) != null && q.count <= c) &&
1288 >             (!casBarrierStack(q, q.next) || !q.signal())))
1289 >            ensureSync();
1290      }
1291  
1292      /**
1293 <     * Waits until event count advances from count, or some thread is
1294 <     * waiting on a previous count, or there is stealable work
1295 <     * available. Help wake up others on release.
1293 >     * Waits until event count advances from last value held by
1294 >     * caller, or if excess threads, caller is resumed as spare, or
1295 >     * caller or pool is terminating. Updates caller's event on exit.
1296       * @param w the calling worker thread
1282     * @param prev previous value returned by sync (or 0)
1283     * @return current event count
1297       */
1298 <    final long sync(ForkJoinWorkerThread w, long prev) {
1299 <        updateStealCount(w);
1298 >    final void sync(ForkJoinWorkerThread w) {
1299 >        updateStealCount(w); // Transfer w's count while it is idle
1300  
1301 <        while (!w.isShutdown() && !isTerminating() &&
1302 <               (parallelism >= runningCountOf(workerCounts) ||
1290 <                !suspendIfSpare(w))) { // prefer suspend to waiting here
1301 >        while (!w.isShutdown() && !isTerminating() && !suspendIfSpare(w)) {
1302 >            long prev = w.lastEventCount;
1303              WaitQueueNode node = null;
1304 <            boolean queued = false;
1305 <            for (;;) {
1306 <                if (!queued) {
1307 <                    if (eventCount != prev)
1308 <                        break;
1309 <                    WaitQueueNode h = barrierStack;
1310 <                    if (h != null && h.count != prev)
1299 <                        break; // release below and maybe retry
1300 <                    if (node == null)
1301 <                        node = new WaitQueueNode(w, prev);
1302 <                    queued = casBarrierStack(node.next = h, node);
1303 <                }
1304 <                else if (Thread.interrupted() ||
1305 <                         node.thread == null ||
1306 <                         (node.next == null && w.prescan()) ||
1307 <                         eventCount != prev) {
1308 <                    node.thread = null;
1309 <                    if (eventCount == prev) // help trigger
1310 <                        casEventCount(prev, prev+1);
1304 >            WaitQueueNode h;
1305 >            while (eventCount == prev &&
1306 >                   ((h = syncStack) == null || h.count == prev)) {
1307 >                if (node == null)
1308 >                    node = new WaitQueueNode(prev, w);
1309 >                if (casBarrierStack(node.next = h, node)) {
1310 >                    node.awaitSyncRelease(this);
1311                      break;
1312                  }
1313                else
1314                    LockSupport.park(this);
1313              }
1314 +            long ec = ensureSync();
1315 +            if (ec != prev) {
1316 +                w.lastEventCount = ec;
1317 +                break;
1318 +            }
1319 +        }
1320 +    }
1321 +
1322 +    /**
1323 +     * Returns true if worker waiting on sync can proceed:
1324 +     *  - on signal (thread == null)
1325 +     *  - on event count advance (winning race to notify vs signaller)
1326 +     *  - on Interrupt
1327 +     *  - if the first queued node, we find work available
1328 +     * If node was not signalled and event count not advanced on exit,
1329 +     * then we also help advance event count.
1330 +     * @return true if node can be released
1331 +     */
1332 +    final boolean syncIsReleasable(WaitQueueNode node) {
1333 +        long prev = node.count;
1334 +        if (!Thread.interrupted() && node.thread != null &&
1335 +            (node.next != null ||
1336 +             !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1337 +            eventCount == prev)
1338 +            return false;
1339 +        if (node.thread != null) {
1340 +            node.thread = null;
1341              long ec = eventCount;
1342 <            if (releaseIdleWorkers(false) != prev)
1343 <                return ec;
1342 >            if (prev <= ec) // help signal
1343 >                casEventCount(ec, ec+1);
1344          }
1345 <        return prev; // return old count if aborted
1345 >        return true;
1346 >    }
1347 >
1348 >    /**
1349 >     * Returns true if a new sync event occurred since last call to
 1350 >     * sync or this method; if so, updates the caller's count.
1351 >     */
1352 >    final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1353 >        long lc = w.lastEventCount;
1354 >        long ec = ensureSync();
1355 >        if (ec == lc)
1356 >            return false;
1357 >        w.lastEventCount = ec;
1358 >        return true;
1359      }
1360  
1361      //  Parallelism maintenance
# Line 1408 | Line 1446 | public class ForkJoinPool extends Abstra
1446          return (tc < maxPoolSize &&
1447                  (rc == 0 || totalSurplus < 0 ||
1448                   (maintainParallelism &&
1449 <                  runningDeficit > totalSurplus && mayHaveQueuedWork())));
1449 >                  runningDeficit > totalSurplus &&
1450 >                  ForkJoinWorkerThread.hasQueuedTasks(workers))));
1451      }
1452 <
1414 <    /**
1415 <     * Returns true if at least one worker queue appears to be
1416 <     * nonempty. This is expensive but not often called. It is not
1417 <     * critical that this be accurate, but if not, more or fewer
1418 <     * running threads than desired might be maintained.
1419 <     */
1420 <    private boolean mayHaveQueuedWork() {
1421 <        ForkJoinWorkerThread[] ws = workers;
1422 <        int len = ws.length;
1423 <        ForkJoinWorkerThread v;
1424 <        for (int i = 0; i < len; ++i) {
1425 <            if ((v = ws[i]) != null && v.getRawQueueSize() > 0) {
1426 <                releaseIdleWorkers(false); // help wake up stragglers
1427 <                return true;
1428 <            }
1429 <        }
1430 <        return false;
1431 <    }
1432 <
1452 >    
1453      /**
1454       * Add a spare worker if lock available and no more than the
1455       * expected numbers of threads exist
# Line 1483 | Line 1503 | public class ForkJoinPool extends Abstra
1503          }
1504          else
1505              updateWorkerCount(-1); // adjust on failure
1506 <        signalIdleWorkers(false);
1506 >        signalIdleWorkers();
1507      }
1508  
1509      /**
# Line 1499 | Line 1519 | public class ForkJoinPool extends Abstra
1519          int s;
1520          while (parallelism < runningCountOf(s = workerCounts)) {
1521              if (node == null)
1522 <                node = new WaitQueueNode(w, 0);
1522 >                node = new WaitQueueNode(0, w);
1523              if (casWorkerCounts(s, s-1)) { // representation-dependent
1524                  // push onto stack
1525                  do;while (!casSpareStack(node.next = spareStack, node));
1506
1526                  // block until released by resumeSpare
1527 <                while (node.thread != null) {
1509 <                    if (!Thread.interrupted())
1510 <                        LockSupport.park(this);
1511 <                }
1512 <                w.activate(); // help warm up
1527 >                node.awaitSpareRelease();
1528                  return true;
1529              }
1530          }
# Line 1535 | Line 1550 | public class ForkJoinPool extends Abstra
1550      }
1551  
1552      /**
1553 <     * Pop and resume all spare threads. Same idea as
1539 <     * releaseIdleWorkers.
1553 >     * Pop and resume all spare threads. Same idea as ensureSync.
1554       * @return true if any spares released
1555       */
1556      private boolean resumeAllSpares() {
# Line 1577 | Line 1591 | public class ForkJoinPool extends Abstra
1591      }
1592  
1593      /**
1580     * Returns approximate number of spares, just for diagnostics.
1581     */
1582    private int countSpares() {
1583        int sum = 0;
1584        for (WaitQueueNode q = spareStack; q != null; q = q.next)
1585            ++sum;
1586        return sum;
1587    }
1588
1589    /**
1594       * Interface for extending managed parallelism for tasks running
1595       * in ForkJoinPools. A ManagedBlocker provides two methods.
1596       * Method <code>isReleasable</code> must return true if blocking is not
# Line 1697 | Line 1701 | public class ForkJoinPool extends Abstra
1701      static final long eventCountOffset;
1702      static final long workerCountsOffset;
1703      static final long runControlOffset;
1704 <    static final long barrierStackOffset;
1704 >    static final long syncStackOffset;
1705      static final long spareStackOffset;
1706  
1707      static {
# Line 1715 | Line 1719 | public class ForkJoinPool extends Abstra
1719                  (ForkJoinPool.class.getDeclaredField("workerCounts"));
1720              runControlOffset = _unsafe.objectFieldOffset
1721                  (ForkJoinPool.class.getDeclaredField("runControl"));
1722 <            barrierStackOffset = _unsafe.objectFieldOffset
1723 <                (ForkJoinPool.class.getDeclaredField("barrierStack"));
1722 >            syncStackOffset = _unsafe.objectFieldOffset
1723 >                (ForkJoinPool.class.getDeclaredField("syncStack"));
1724              spareStackOffset = _unsafe.objectFieldOffset
1725                  (ForkJoinPool.class.getDeclaredField("spareStack"));
1726          } catch (Exception e) {
# Line 1737 | Line 1741 | public class ForkJoinPool extends Abstra
1741          return _unsafe.compareAndSwapObject(this, spareStackOffset, cmp, val);
1742      }
1743      private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) {
1744 <        return _unsafe.compareAndSwapObject(this, barrierStackOffset, cmp, val);
1744 >        return _unsafe.compareAndSwapObject(this, syncStackOffset, cmp, val);
1745      }
1746   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines