--- jsr166/src/jsr166y/ForkJoinPool.java 2011/02/22 10:50:51 1.92 +++ jsr166/src/jsr166y/ForkJoinPool.java 2011/02/23 12:48:43 1.93 @@ -754,10 +754,13 @@ public class ForkJoinPool extends Abstra } /** - * Tries to enqueue worker in wait queue and await change in - * worker's eventCount. Before blocking, rescans queues to avoid - * missed signals. If the pool is quiescent, possibly terminates - * worker upon exit. + * Tries to enqueue worker w in wait queue and await change in + * worker's eventCount. If the pool is quiescent, possibly + * terminates worker upon exit. Otherwise, before blocking, + * rescans queues to avoid missed signals. Upon finding work, + * releases at least one worker (which may be the current + * worker). Rescans restart upon detected staleness or failure to + * release due to contention. * * @param w the calling worker * @param c the ctl value on entry @@ -765,26 +768,26 @@ public class ForkJoinPool extends Abstra */ private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) { int v = w.eventCount; - w.nextWait = (int)c; // w's successor record + w.nextWait = (int)c; // w's successor record long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { - long d = ctl; // return true if lost to a deq, to force rescan + long d = ctl; // return true if lost to a deq, to force scan return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L; } - if (parallelism + (int)(c >> AC_SHIFT) == 1 && + for (int sc = w.stealCount; sc != 0;) { // accumulate stealCount + long s = stealCount; + if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc)) + sc = w.stealCount = 0; + else if (w.eventCount != v) + return true; // update next time + } + if (parallelism + (int)(nc >> AC_SHIFT) == 0 && blockedCount == 0 && quiescerCount == 0) - idleAwaitWork(w, v); // quiescent -- maybe shrink - - boolean rescanned = false; - for (int sc;;) { + idleAwaitWork(w, nc, c, v); // quiescent + for (boolean rescanned = false;;) { if (w.eventCount != v) return true; - if ((sc = w.stealCount) != 0) { - long s = stealCount; // accumulate stealCount - if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s+sc)) - w.stealCount = 0; - } - else if (!rescanned) { + if (!rescanned) { int g = scanGuard, m = g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws != null && m < ws.length) { @@ -821,37 +824,40 @@ public class ForkJoinPool extends Abstra } /** - * If pool is quiescent, checks for termination, and waits for - * event signal for up to SHRINK_RATE nanosecs. On timeout, if ctl - * has not changed, terminates the worker. Upon its termination - * (see deregisterWorker), it may wake up another worker to - * possibly repeat this process. + * If inactivating worker w has caused pool to become + * quiescent, check for pool termination, and wait for event + * for up to SHRINK_RATE nanosecs (rescans are unnecessary in + * this case because quiescence reflects consensus about lack + * of work). On timeout, if ctl has not changed, terminate the + * worker. Upon its termination (see deregisterWorker), it may + * wake up another worker to possibly repeat this process. * * @param w the calling worker - * @param v the eventCount w must wait until changed - */ - private void idleAwaitWork(ForkJoinWorkerThread w, int v) { - ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs - if (shutdown) - tryTerminate(false); - long c = ctl; - long nc = (((c & (AC_MASK|TC_MASK)) + AC_UNIT) | - (long)(w.nextWait & E_MASK)); // ctl value to release w - if (w.eventCount == v && - parallelism + (int)(c >> AC_SHIFT) == 0 && - blockedCount == 0 && quiescerCount == 0) { - long startTime = System.nanoTime(); - Thread.interrupted(); - if (w.eventCount == v) { + * @param currentCtl the ctl value after enqueuing w + * @param prevCtl the ctl value if w terminated + * @param v the eventCount w awaits change + */ + private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl, + long prevCtl, int v) { + if (w.eventCount == v) { + if (shutdown) + tryTerminate(false); + ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs + while (ctl == currentCtl) { + long startTime = System.nanoTime(); w.parked = true; - if (w.eventCount == v) + if (w.eventCount == v) // must recheck LockSupport.parkNanos(this, SHRINK_RATE); w.parked = false; - if (w.eventCount == v && ctl == c && - System.nanoTime() - startTime >= SHRINK_RATE && - UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { - w.terminate = true; - w.eventCount = ((int)c + EC_UNIT) & E_MASK; + if (w.eventCount != v) + break; + else if (System.nanoTime() - startTime < SHRINK_RATE) + Thread.interrupted(); // spurious wakeup + else if (UNSAFE.compareAndSwapLong(this, ctlOffset, + currentCtl, prevCtl)) { + w.terminate = true; // restore previous + w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK; + break; } } } @@ -1966,13 +1972,15 @@ public class ForkJoinPool extends Abstra * {@code isReleasable} must return {@code true} if blocking is * not necessary. Method {@code block} blocks the current thread * if necessary (perhaps internally invoking {@code isReleasable} - * before actually blocking). The unusual methods in this API - * accommodate synchronizers that may, but don't usually, block - * for long periods. Similarly, they allow more efficient internal - * handling of cases in which additional workers may be, but - * usually are not, needed to ensure sufficient parallelism. - * Toward this end, implementations of method {@code isReleasable} - * must be amenable to repeated invocation. + * before actually blocking). These actions are performed by any + * thread invoking {@link ForkJoinPool#managedBlock}. The + * unusual methods in this API accommodate synchronizers that may, + * but don't usually, block for long periods. Similarly, they + * allow more efficient internal handling of cases in which + * additional workers may be, but usually are not, needed to + * ensure sufficient parallelism. Toward this end, + * implementations of method {@code isReleasable} must be amenable + * to repeated invocation. * *

For example, here is a ManagedBlocker based on a * ReentrantLock: