--- jsr166/src/jsr166y/ForkJoinPool.java 2009/01/07 19:12:36 1.3 +++ jsr166/src/jsr166y/ForkJoinPool.java 2009/01/12 17:16:18 1.4 @@ -181,7 +181,7 @@ public class ForkJoinPool extends Abstra /** * Head of Treiber stack for barrier sync. See below for explanation */ - private volatile WaitQueueNode barrierStack; + private volatile WaitQueueNode syncStack; /** * The count for event barrier @@ -264,23 +264,29 @@ public class ForkJoinPool extends Abstra private static int runControlFor(int r, int a) { return (r << 16) + a; } /** - * Increment active count. Called by workers before/during - * executing tasks. + * Try incrementing active count; fail on contention. Called by + * workers before/during executing tasks. + * @return true on success; */ - final void incrementActiveCount() { - int c; - do;while (!casRunControl(c = runControl, c+1)); + final boolean tryIncrementActiveCount() { + int c = runControl; + return casRunControl(c, c+1); } /** - * Decrement active count; possibly trigger termination. + * Try decrementing active count; fail on contention. + * Possibly trigger termination on success * Called by workers when they can't find tasks. + * @return true on success */ - final void decrementActiveCount() { - int c, nextc; - do;while (!casRunControl(c = runControl, nextc = c-1)); + final boolean tryDecrementActiveCount() { + int c = runControl; + int nextc = c - 1; + if (!casRunControl(c, nextc)) + return false; if (canTerminateOnShutdown(nextc)) terminateOnShutdown(); + return true; } /** @@ -508,7 +514,7 @@ public class ForkJoinPool extends Abstra if (isShutdown()) throw new RejectedExecutionException(); submissionQueue.offer(task); - signalIdleWorkers(true); + signalIdleWorkers(); } /** @@ -717,7 +723,7 @@ public class ForkJoinPool extends Abstra } finally { lock.unlock(); } - signalIdleWorkers(false); + signalIdleWorkers(); } /** @@ -1067,7 +1073,7 @@ public class ForkJoinPool extends Abstra } finally { lock.unlock(); } - signalIdleWorkers(false); + signalIdleWorkers(); } /** @@ -1077,11 +1083,11 @@ public class ForkJoinPool extends Abstra if (transitionRunStateTo(TERMINATING)) { stopAllWorkers(); resumeAllSpares(); - signalIdleWorkers(true); + signalIdleWorkers(); cancelQueuedSubmissions(); cancelQueuedWorkerTasks(); interruptUnterminatedWorkers(); - signalIdleWorkers(true); // resignal after interrupt + signalIdleWorkers(); // resignal after interrupt } } @@ -1165,75 +1171,92 @@ public class ForkJoinPool extends Abstra /* - * Nodes for event barrier to manage idle threads. + * Nodes for event barrier to manage idle threads. Queue nodes + * are basic Treiber stack nodes, also used for spare stack. * * The event barrier has an event count and a wait queue (actually * a Treiber stack). Workers are enabled to look for work when - * the eventCount is incremented. If they fail to find some, - * they may wait for next count. Synchronization events occur only - * in enough contexts to maintain overall liveness: + * the eventCount is incremented. If they fail to find work, they + * may wait for next count. Upon release, threads help others wake + * up. + * + * Synchronization events occur only in enough contexts to + * maintain overall liveness: * * - Submission of a new task to the pool - * - Creation or termination of a worker + * - Resizes or other changes to the workers array * - pool termination * - A worker pushing a task on an empty queue * - * The last case (pushing a task) occurs often enough, and is - * heavy enough compared to simple stack pushes to require some - * special handling: Method signalNonEmptyWorkerQueue returns - * without advancing count if the queue appears to be empty. This - * would ordinarily result in races causing some queued waiters - * not to be woken up. To avoid this, a worker in sync - * rescans for tasks after being enqueued if it was the first to - * enqueue, and aborts the wait if finding one, also helping to - * signal others. This works well because the worker has nothing - * better to do anyway, and so might as well help alleviate the - * overhead and contention on the threads actually doing work. - * - * Queue nodes are basic Treiber stack nodes, also used for spare - * stack. + * The case of pushing a task occurs often enough, and is heavy + * enough compared to simple stack pushes, to require special + * handling: Method signalWork returns without advancing count if + * the queue appears to be empty. This would ordinarily result in + * races causing some queued waiters not to be woken up. To avoid + * this, the first worker enqueued in method sync (see + * syncIsReleasable) rescans for tasks after being enqueued, and + * helps signal if any are found. This works well because the + * worker has nothing better to do, and so might as well help + * alleviate the overhead and contention on the threads actually + * doing work. Also, since event counts increments on task + * availability exist to maintain liveness (rather than to force + * refreshes etc), it is OK for callers to exit early if + * contending with another signaller. */ static final class WaitQueueNode { WaitQueueNode next; // only written before enqueued volatile ForkJoinWorkerThread thread; // nulled to cancel wait final long count; // unused for spare stack - WaitQueueNode(ForkJoinWorkerThread w, long c) { + + WaitQueueNode(long c, ForkJoinWorkerThread w) { count = c; thread = w; } - final boolean signal() { + + /** + * Wake up waiter, returning false if known to already + */ + boolean signal() { ForkJoinWorkerThread t = thread; + if (t == null) + return false; thread = null; - if (t != null) { - LockSupport.unpark(t); - return true; + LockSupport.unpark(t); + return true; + } + + /** + * Await release on sync + */ + void awaitSyncRelease(ForkJoinPool p) { + while (thread != null && !p.syncIsReleasable(this)) + LockSupport.park(this); + } + + /** + * Await resumption as spare + */ + void awaitSpareRelease() { + while (thread != null) { + if (!Thread.interrupted()) + LockSupport.park(this); } - return false; } } /** - * Release at least one thread waiting for event count to advance, - * if one exists. If initial attempt fails, release all threads. - * @param all if false, at first try to only release one thread - * @return current event + * Ensures that no thread is waiting for count to advance from the + * current value of eventCount read on entry to this method, by + * releasing waiting threads if necessary. + * @return the count */ - private long releaseIdleWorkers(boolean all) { - long c; - for (;;) { - WaitQueueNode q = barrierStack; - c = eventCount; - long qc; - if (q == null || (qc = q.count) >= c) - break; - if (!all) { - if (casBarrierStack(q, q.next) && q.signal()) - break; - all = true; - } - else if (casBarrierStack(q, null)) { + final long ensureSync() { + long c = eventCount; + WaitQueueNode q; + while ((q = syncStack) != null && q.count < c) { + if (casBarrierStack(q, null)) { do { - q.signal(); + q.signal(); } while ((q = q.next) != null); break; } @@ -1242,82 +1265,97 @@ public class ForkJoinPool extends Abstra } /** - * Returns current barrier event count - * @return current barrier event count - */ - final long getEventCount() { - long ec = eventCount; - releaseIdleWorkers(true); // release to ensure accurate result - return ec; - } - - /** - * Increment event count and release at least one waiting thread, - * if one exists (released threads will in turn wake up others). - * @param all if true, try to wake up all + * Increments event count and releases waiting threads. */ - final void signalIdleWorkers(boolean all) { + private void signalIdleWorkers() { long c; do;while (!casEventCount(c = eventCount, c+1)); - releaseIdleWorkers(all); + ensureSync(); } /** - * Wake up threads waiting to steal a task. Because method - * sync rechecks availability, it is OK to only proceed if - * queue appears to be non-empty. + * Signal threads waiting to poll a task. Because method sync + * rechecks availability, it is OK to only proceed if queue + * appears to be non-empty, and OK to skip under contention to + * increment count (since some other thread succeeded). */ - final void signalNonEmptyWorkerQueue() { - // If CAS fails another signaller must have succeeded + final void signalWork() { long c; - if (barrierStack != null && casEventCount(c = eventCount, c+1)) - releaseIdleWorkers(false); + WaitQueueNode q; + if (syncStack != null && + casEventCount(c = eventCount, c+1) && + (((q = syncStack) != null && q.count <= c) && + (!casBarrierStack(q, q.next) || !q.signal()))) + ensureSync(); } /** - * Waits until event count advances from count, or some thread is - * waiting on a previous count, or there is stealable work - * available. Help wake up others on release. + * Waits until event count advances from last value held by + * caller, or if excess threads, caller is resumed as spare, or + * caller or pool is terminating. Updates caller's event on exit. * @param w the calling worker thread - * @param prev previous value returned by sync (or 0) - * @return current event count */ - final long sync(ForkJoinWorkerThread w, long prev) { - updateStealCount(w); + final void sync(ForkJoinWorkerThread w) { + updateStealCount(w); // Transfer w's count while it is idle - while (!w.isShutdown() && !isTerminating() && - (parallelism >= runningCountOf(workerCounts) || - !suspendIfSpare(w))) { // prefer suspend to waiting here + while (!w.isShutdown() && !isTerminating() && !suspendIfSpare(w)) { + long prev = w.lastEventCount; WaitQueueNode node = null; - boolean queued = false; - for (;;) { - if (!queued) { - if (eventCount != prev) - break; - WaitQueueNode h = barrierStack; - if (h != null && h.count != prev) - break; // release below and maybe retry - if (node == null) - node = new WaitQueueNode(w, prev); - queued = casBarrierStack(node.next = h, node); - } - else if (Thread.interrupted() || - node.thread == null || - (node.next == null && w.prescan()) || - eventCount != prev) { - node.thread = null; - if (eventCount == prev) // help trigger - casEventCount(prev, prev+1); + WaitQueueNode h; + while (eventCount == prev && + ((h = syncStack) == null || h.count == prev)) { + if (node == null) + node = new WaitQueueNode(prev, w); + if (casBarrierStack(node.next = h, node)) { + node.awaitSyncRelease(this); break; } - else - LockSupport.park(this); } + long ec = ensureSync(); + if (ec != prev) { + w.lastEventCount = ec; + break; + } + } + } + + /** + * Returns true if worker waiting on sync can proceed: + * - on signal (thread == null) + * - on event count advance (winning race to notify vs signaller) + * - on Interrupt + * - if the first queued node, we find work available + * If node was not signalled and event count not advanced on exit, + * then we also help advance event count. + * @return true if node can be released + */ + final boolean syncIsReleasable(WaitQueueNode node) { + long prev = node.count; + if (!Thread.interrupted() && node.thread != null && + (node.next != null || + !ForkJoinWorkerThread.hasQueuedTasks(workers)) && + eventCount == prev) + return false; + if (node.thread != null) { + node.thread = null; long ec = eventCount; - if (releaseIdleWorkers(false) != prev) - return ec; + if (prev <= ec) // help signal + casEventCount(ec, ec+1); } - return prev; // return old count if aborted + return true; + } + + /** + * Returns true if a new sync event occurred since last call to + * sync or this method, if so, updating caller's count. + */ + final boolean hasNewSyncEvent(ForkJoinWorkerThread w) { + long lc = w.lastEventCount; + long ec = ensureSync(); + if (ec == lc) + return false; + w.lastEventCount = ec; + return true; } // Parallelism maintenance @@ -1408,28 +1446,10 @@ public class ForkJoinPool extends Abstra return (tc < maxPoolSize && (rc == 0 || totalSurplus < 0 || (maintainParallelism && - runningDeficit > totalSurplus && mayHaveQueuedWork()))); + runningDeficit > totalSurplus && + ForkJoinWorkerThread.hasQueuedTasks(workers)))); } - - /** - * Returns true if at least one worker queue appears to be - * nonempty. This is expensive but not often called. It is not - * critical that this be accurate, but if not, more or fewer - * running threads than desired might be maintained. - */ - private boolean mayHaveQueuedWork() { - ForkJoinWorkerThread[] ws = workers; - int len = ws.length; - ForkJoinWorkerThread v; - for (int i = 0; i < len; ++i) { - if ((v = ws[i]) != null && v.getRawQueueSize() > 0) { - releaseIdleWorkers(false); // help wake up stragglers - return true; - } - } - return false; - } - + /** * Add a spare worker if lock available and no more than the * expected numbers of threads exist @@ -1483,7 +1503,7 @@ public class ForkJoinPool extends Abstra } else updateWorkerCount(-1); // adjust on failure - signalIdleWorkers(false); + signalIdleWorkers(); } /** @@ -1499,17 +1519,12 @@ public class ForkJoinPool extends Abstra int s; while (parallelism < runningCountOf(s = workerCounts)) { if (node == null) - node = new WaitQueueNode(w, 0); + node = new WaitQueueNode(0, w); if (casWorkerCounts(s, s-1)) { // representation-dependent // push onto stack do;while (!casSpareStack(node.next = spareStack, node)); - // block until released by resumeSpare - while (node.thread != null) { - if (!Thread.interrupted()) - LockSupport.park(this); - } - w.activate(); // help warm up + node.awaitSpareRelease(); return true; } } @@ -1535,8 +1550,7 @@ public class ForkJoinPool extends Abstra } /** - * Pop and resume all spare threads. Same idea as - * releaseIdleWorkers. + * Pop and resume all spare threads. Same idea as ensureSync. * @return true if any spares released */ private boolean resumeAllSpares() { @@ -1577,16 +1591,6 @@ public class ForkJoinPool extends Abstra } /** - * Returns approximate number of spares, just for diagnostics. - */ - private int countSpares() { - int sum = 0; - for (WaitQueueNode q = spareStack; q != null; q = q.next) - ++sum; - return sum; - } - - /** * Interface for extending managed parallelism for tasks running * in ForkJoinPools. A ManagedBlocker provides two methods. * Method isReleasable must return true if blocking is not @@ -1697,7 +1701,7 @@ public class ForkJoinPool extends Abstra static final long eventCountOffset; static final long workerCountsOffset; static final long runControlOffset; - static final long barrierStackOffset; + static final long syncStackOffset; static final long spareStackOffset; static { @@ -1715,8 +1719,8 @@ public class ForkJoinPool extends Abstra (ForkJoinPool.class.getDeclaredField("workerCounts")); runControlOffset = _unsafe.objectFieldOffset (ForkJoinPool.class.getDeclaredField("runControl")); - barrierStackOffset = _unsafe.objectFieldOffset - (ForkJoinPool.class.getDeclaredField("barrierStack")); + syncStackOffset = _unsafe.objectFieldOffset + (ForkJoinPool.class.getDeclaredField("syncStack")); spareStackOffset = _unsafe.objectFieldOffset (ForkJoinPool.class.getDeclaredField("spareStack")); } catch (Exception e) { @@ -1737,6 +1741,6 @@ public class ForkJoinPool extends Abstra return _unsafe.compareAndSwapObject(this, spareStackOffset, cmp, val); } private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) { - return _unsafe.compareAndSwapObject(this, barrierStackOffset, cmp, val); + return _unsafe.compareAndSwapObject(this, syncStackOffset, cmp, val); } }