--- jsr166/src/jsr166y/ForkJoinPool.java 2011/02/22 10:50:51 1.92 +++ jsr166/src/jsr166y/ForkJoinPool.java 2011/04/01 20:20:37 1.100 @@ -1,7 +1,7 @@ /* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain + * http://creativecommons.org/publicdomain/zero/1.0/ */ package jsr166y; @@ -151,7 +151,7 @@ public class ForkJoinPool extends Abstra * Updates tend not to contend with each other except during * bursts while submitted tasks begin or end. In some cases when * they do contend, threads can instead do something else - * (usually, scan for tesks) until contention subsides. + * (usually, scan for tasks) until contention subsides. * * To enable packing, we restrict maximum parallelism to (1<<15)-1 * (which is far in excess of normal operating range) to allow @@ -195,24 +195,28 @@ public class ForkJoinPool extends Abstra * shutdown schemes. * * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot - * let workers spin indefinitely scanning for tasks when none are - * can be immediately found, and we cannot start/resume workers - * unless there appear to be tasks available. On the other hand, - * we must quickly prod them into action when new tasks are - * submitted or generated. We park/unpark workers after placing - * in an event wait queue when they cannot find work. This "queue" - * is actually a simple Treiber stack, headed by the "id" field of - * ctl, plus a 15bit counter value to both wake up waiters (by - * advancing their count) and avoid ABA effects. Successors are - * held in worker field "nextWait". Queuing deals with several - * intrinsic races, mainly that a task-producing thread can miss - * seeing (and signalling) another thread that gave up looking for - * work but has not yet entered the wait queue. We solve this by - * requiring a full sweep of all workers both before (in scan()) - * and after (in awaitWork()) a newly waiting worker is added to - * the wait queue. During a rescan, the worker might release some - * other queued worker rather than itself, which has the same net - * effect. + * let workers spin indefinitely scanning for tasks when none can + * be found immediately, and we cannot start/resume workers unless + * there appear to be tasks available. On the other hand, we must + * quickly prod them into action when new tasks are submitted or + * generated. We park/unpark workers after placing in an event + * wait queue when they cannot find work. This "queue" is actually + * a simple Treiber stack, headed by the "id" field of ctl, plus a + * 15bit counter value to both wake up waiters (by advancing their + * count) and avoid ABA effects. Successors are held in worker + * field "nextWait". Queuing deals with several intrinsic races, + * mainly that a task-producing thread can miss seeing (and + * signalling) another thread that gave up looking for work but + * has not yet entered the wait queue. We solve this by requiring + * a full sweep of all workers both before (in scan()) and after + * (in tryAwaitWork()) a newly waiting worker is added to the wait + * queue. During a rescan, the worker might release some other + * queued worker rather than itself, which has the same net + * effect. Because enqueued workers may actually be rescanning + * rather than waiting, we set and clear the "parked" field of + * ForkJoinWorkerThread to reduce unnecessary calls to unpark. 
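The wait "queue" described in this hunk is a Treiber stack: a worker records the old head in its nextWait field and installs itself with one CAS, and a releaser pops the head the same way. Purely as an illustration of that push/pop discipline, here is a minimal node-based sketch (invented names; the pool itself keeps no node references, instead packing a worker index plus the 15-bit counter into ctl, which is also what defuses ABA):

import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch only; not the pool's representation.
final class TreiberStack<E> {
    private static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();

    // Push: link the new node to the current head, then CAS the head
    // to the new node; retry on contention.
    void push(E item) {
        Node<E> n = new Node<E>(item);
        Node<E> h;
        do {
            h = head.get();
            n.next = h;
        } while (!head.compareAndSet(h, n));
    }

    // Pop: CAS the head to its successor; returns null if empty.
    E pop() {
        Node<E> h;
        do {
            h = head.get();
            if (h == null)
                return null;
        } while (!head.compareAndSet(h, h.next));
        return h.item;
    }
}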
+ * (Use of the parked field requires a secondary recheck to avoid + * missed signals.) * * Signalling. We create or wake up workers only when there * appears to be at least one task they might be able to find and @@ -229,13 +233,15 @@ public class ForkJoinPool extends Abstra * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will * time out and terminate if the pool has remained quiescent for - * SHRINK_RATE nanosecs. + * SHRINK_RATE nanosecs. This will slowly propagate, eventually + * terminating all workers after long periods of non-use. * * Submissions. External submissions are maintained in an * array-based queue that is structured identically to - * ForkJoinWorkerThread queues (which see) except for the use of - * submissionLock in method addSubmission. Unlike worker queues, - * multiple external threads can add new submissions. + * ForkJoinWorkerThread queues except for the use of + * submissionLock in method addSubmission. Unlike the case for + * worker queues, multiple external threads can add new + * submissions, so adding requires a lock. * * Compensation. Beyond work-stealing support and lifecycle * control, the main responsibility of this framework is to take @@ -272,15 +278,14 @@ public class ForkJoinPool extends Abstra * if blocking would leave less than one active (non-waiting, * non-blocked) worker. Additionally, to avoid some false alarms * due to GC, lagging counters, system activity, etc, compensated - * blocking for joins is only attempted after a number of rechecks - * proportional to the current apparent deficit (where retries are - * interspersed with Thread.yield, for good citizenship). The - * variable blockedCount, incremented before blocking and - * decremented after, is sometimes needed to distinguish cases of - * waiting for work vs blocking on joins or other managed sync, - * but both the cases are equivalent for most pool control, so we - * can update non-atomically. (Additionally, contention on - * blockedCount alleviates some contention on ctl). + * blocking for joins is only attempted after rechecks stabilize + * (retries are interspersed with Thread.yield, for good + * citizenship). The variable blockedCount, incremented before + * blocking and decremented after, is sometimes needed to + * distinguish cases of waiting for work vs blocking on joins or + * other managed sync. Both cases are equivalent for most pool + * control, so we can update non-atomically. (Additionally, + * contention on blockedCount alleviates some contention on ctl). * * Shutdown and Termination. A call to shutdownNow atomically sets * the ctl stop bit and then (non-atomically) sets each workers @@ -478,9 +483,7 @@ public class ForkJoinPool extends Abstra * negative, there is at least one waiting worker, and when e is * negative, the pool is terminating. To deal with these possibly * negative fields, we use casts in and out of "short" and/or - * signed shifts to maintain signedness. Note: AC_SHIFT is - * redundantly declared in ForkJoinWorkerThread in order to - * integrate a surplus-threads check. + * signed shifts to maintain signedness. */ volatile long ctl; @@ -524,13 +527,17 @@ public class ForkJoinPool extends Abstra /** * Index (mod submission queue length) of next element to take - * from submission queue. + * from submission queue. Usage is identical to that for + * per-worker queues -- see ForkJoinWorkerThread internal + * documentation. 
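The remark above about keeping packed fields signed can be made concrete. The layout below is illustrative only (a 16-bit active-count field placed at an assumed shift of 48), not a claim about the exact ctl encoding; the point is that an arithmetic shift, or a cast through short, preserves negative values when the field is read back:

// Hypothetical packed word for illustration; constants are assumptions.
final class PackedCounts {
    static final int  AC_SHIFT = 48;                      // assumed field position
    static final long AC_MASK  = 0xffffL << AC_SHIFT;

    static long withActiveCount(long word, int ac) {
        return (word & ~AC_MASK) | (((long) ac << AC_SHIFT) & AC_MASK);
    }

    static int activeCount(long word) {
        return (int) (word >> AC_SHIFT);                  // signed shift keeps negative counts
    }

    public static void main(String[] args) {
        long w = withActiveCount(0L, -3);                 // e.g. three fewer active workers than target
        System.out.println(activeCount(w));               // prints -3
    }
}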
*/ volatile int queueBase; /** * Index (mod submission queue length) of next element to add - * in submission queue. + * in submission queue. Usage is identical to that for + * per-worker queues -- see ForkJoinWorkerThread internal + * documentation. */ int queueTop; @@ -568,8 +575,8 @@ public class ForkJoinPool extends Abstra private int nextWorkerIndex; /** - * SeqLock and index masking for for updates to workers array. - * Locked when SG_UNIT is set. Unlocking clears bit by adding + * SeqLock and index masking for updates to workers array. Locked + * when SG_UNIT is set. Unlocking clears bit by adding * SG_UNIT. Staleness of read-only operations can be checked by * comparing scanGuard to value before the reads. The low 16 bits * (i.e, anding with SMASK) hold (the smallest power of two @@ -707,7 +714,7 @@ public class ForkJoinPool extends Abstra */ private boolean scan(ForkJoinWorkerThread w, int a) { int g = scanGuard; // mask 0 avoids useless scans if only one active - int m = parallelism == 1 - a? 0 : g & SMASK; + int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws == null || ws.length <= m) // staleness check return false; @@ -754,10 +761,20 @@ public class ForkJoinPool extends Abstra } /** - * Tries to enqueue worker in wait queue and await change in - * worker's eventCount. Before blocking, rescans queues to avoid - * missed signals. If the pool is quiescent, possibly terminates - * worker upon exit. + * Tries to enqueue worker w in wait queue and await change in + * worker's eventCount. If the pool is quiescent and there is + * more than one worker, possibly terminates worker upon exit. + * Otherwise, before blocking, rescans queues to avoid missed + * signals. Upon finding work, releases at least one worker + * (which may be the current worker). Rescans restart upon + * detected staleness or failure to release due to + * contention. Note the unusual conventions about Thread.interrupt + * here and elsewhere: Because interrupts are used solely to alert + * threads to check termination, which is checked here anyway, we + * clear status (using Thread.interrupted) before any call to + * park, so that park does not immediately return due to status + * being set via some other unrelated call to interrupt in user + * code. 
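A stripped-down sketch of the interrupt convention just described, using invented single-waiter fields rather than the pool's state: clear any pending interrupt, recheck the condition, then park, and treat every return from park merely as a cue to recheck:

import java.util.concurrent.locks.LockSupport;

// Hypothetical single-waiter example; field names are invented.
final class ParkingExample {
    volatile boolean workAvailable;
    volatile Thread waiter;

    void awaitWork() {
        waiter = Thread.currentThread();
        while (!workAvailable) {
            Thread.interrupted();       // clear status so a stale interrupt can't make park return at once
            if (!workAvailable)         // recheck: a signal may have arrived in the meantime
                LockSupport.park(this);
            // fall through and recheck, whether woken by unpark, interrupt, or spuriously
        }
        waiter = null;
    }

    void signalWork() {
        workAvailable = true;
        Thread w = waiter;
        if (w != null)
            LockSupport.unpark(w);      // wake the (at most one) parked waiter
    }
}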
* * @param w the calling worker * @param c the ctl value on entry @@ -765,26 +782,26 @@ public class ForkJoinPool extends Abstra */ private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) { int v = w.eventCount; - w.nextWait = (int)c; // w's successor record + w.nextWait = (int)c; // w's successor record long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { - long d = ctl; // return true if lost to a deq, to force rescan + long d = ctl; // return true if lost to a deq, to force scan return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L; } - if (parallelism + (int)(c >> AC_SHIFT) == 1 && + for (int sc = w.stealCount; sc != 0;) { // accumulate stealCount + long s = stealCount; + if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc)) + sc = w.stealCount = 0; + else if (w.eventCount != v) + return true; // update next time + } + if ((int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 && blockedCount == 0 && quiescerCount == 0) - idleAwaitWork(w, v); // quiescent -- maybe shrink - - boolean rescanned = false; - for (int sc;;) { + idleAwaitWork(w, nc, c, v); // quiescent + for (boolean rescanned = false;;) { if (w.eventCount != v) return true; - if ((sc = w.stealCount) != 0) { - long s = stealCount; // accumulate stealCount - if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s+sc)) - w.stealCount = 0; - } - else if (!rescanned) { + if (!rescanned) { int g = scanGuard, m = g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws != null && m < ws.length) { @@ -821,37 +838,41 @@ public class ForkJoinPool extends Abstra } /** - * If pool is quiescent, checks for termination, and waits for - * event signal for up to SHRINK_RATE nanosecs. On timeout, if ctl - * has not changed, terminates the worker. Upon its termination - * (see deregisterWorker), it may wake up another worker to - * possibly repeat this process. + * If inactivating worker w has caused pool to become + * quiescent, check for pool termination, and wait for event + * for up to SHRINK_RATE nanosecs (rescans are unnecessary in + * this case because quiescence reflects consensus about lack + * of work). On timeout, if ctl has not changed, terminate the + * worker. Upon its termination (see deregisterWorker), it may + * wake up another worker to possibly repeat this process. 
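The timed wait described here parks for the shrink interval, treats early returns (interrupts, spurious wakeups) as noise via the timing slop, and acts only once a full quiet interval has really elapsed. A sketch of just that timing discipline, with an assumed 4-second interval and invented fields in place of ctl and the event count:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Illustrative only; the interval and fields are assumptions.
final class IdleTimeoutExample {
    static final long SHRINK_RATE = TimeUnit.SECONDS.toNanos(4);

    volatile boolean signalled;   // set by whoever finds new work (that side would also unpark; omitted here)

    // Returns true if the caller should terminate after a full quiet interval.
    boolean awaitOrTimeout() {
        while (!signalled) {
            long start = System.nanoTime();
            LockSupport.parkNanos(this, SHRINK_RATE);
            if (signalled)
                return false;                               // woken for work
            long elapsed = System.nanoTime() - start;
            if (elapsed < SHRINK_RATE - SHRINK_RATE / 10)
                Thread.interrupted();                       // early return: clear status and re-park
            else
                return true;                                // a genuinely quiet interval passed
        }
        return false;
    }
}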
* * @param w the calling worker - * @param v the eventCount w must wait until changed - */ - private void idleAwaitWork(ForkJoinWorkerThread w, int v) { - ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs - if (shutdown) - tryTerminate(false); - long c = ctl; - long nc = (((c & (AC_MASK|TC_MASK)) + AC_UNIT) | - (long)(w.nextWait & E_MASK)); // ctl value to release w - if (w.eventCount == v && - parallelism + (int)(c >> AC_SHIFT) == 0 && - blockedCount == 0 && quiescerCount == 0) { - long startTime = System.nanoTime(); - Thread.interrupted(); - if (w.eventCount == v) { + * @param currentCtl the ctl value after enqueuing w + * @param prevCtl the ctl value if w terminated + * @param v the eventCount w awaits change + */ + private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl, + long prevCtl, int v) { + if (w.eventCount == v) { + if (shutdown) + tryTerminate(false); + ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs + while (ctl == currentCtl) { + long startTime = System.nanoTime(); w.parked = true; - if (w.eventCount == v) + if (w.eventCount == v) // must recheck LockSupport.parkNanos(this, SHRINK_RATE); w.parked = false; - if (w.eventCount == v && ctl == c && - System.nanoTime() - startTime >= SHRINK_RATE && - UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { - w.terminate = true; - w.eventCount = ((int)c + EC_UNIT) & E_MASK; + if (w.eventCount != v) + break; + else if (System.nanoTime() - startTime < + SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop + Thread.interrupted(); // spurious wakeup + else if (UNSAFE.compareAndSwapLong(this, ctlOffset, + currentCtl, prevCtl)) { + w.terminate = true; // restore previous + w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK; + break; } } } @@ -887,7 +908,7 @@ public class ForkJoinPool extends Abstra /** * Creates or doubles submissionQueue array. - * Basically identical to ForkJoinWorkerThread version + * Basically identical to ForkJoinWorkerThread version. */ private void growSubmissionQueue() { ForkJoinTask[] oldQ = submissionQueue; @@ -992,7 +1013,7 @@ public class ForkJoinPool extends Abstra joinMe.tryAwaitDone(0L); postBlock(); } - if ((ctl & STOP_BIT) != 0L) + else if ((ctl & STOP_BIT) != 0L) joinMe.cancelIgnoringExceptions(); } } @@ -1127,7 +1148,7 @@ public class ForkJoinPool extends Abstra ws[k] = w; nextWorkerIndex = k + 1; int m = g & SMASK; - g = k >= m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); + g = k > m? 
((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); } } finally { scanGuard = g; @@ -1207,7 +1228,7 @@ public class ForkJoinPool extends Abstra if ((int)(c >> AC_SHIFT) != -parallelism) return false; if (!shutdown || blockedCount != 0 || quiescerCount != 0 || - queueTop - queueBase > 0) { + queueBase != queueTop) { if (ctl == c) // staleness check return false; continue; @@ -1216,18 +1237,22 @@ public class ForkJoinPool extends Abstra if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT)) startTerminating(); } - if ((short)(c >>> TC_SHIFT) == -parallelism) { - submissionLock.lock(); - termination.signalAll(); - submissionLock.unlock(); + if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers + final ReentrantLock lock = this.submissionLock; + lock.lock(); + try { + termination.signalAll(); + } finally { + lock.unlock(); + } } return true; } /** * Runs up to three passes through workers: (0) Setting - * termination status for each worker, followed by wakeups up - * queued workers (1) helping cancel tasks (2) interrupting + * termination status for each worker, followed by wakeups up to + * queued workers; (1) helping cancel tasks; (2) interrupting * lagging threads (likely in external tasks, but possibly also * blocked in joins). Each pass repeats previous steps because of * potential lagging thread creation. @@ -1273,7 +1298,7 @@ public class ForkJoinPool extends Abstra /** * Tries to set the termination status of waiting workers, and - * then wake them up (after which they will terminate). + * then wakes them up (after which they will terminate). */ private void terminateWaiters() { ForkJoinWorkerThread[] ws = workers; @@ -1729,7 +1754,7 @@ public class ForkJoinPool extends Abstra /** * Returns an estimate of the number of tasks submitted to this - * pool that have not yet begun executing. This meThod may take + * pool that have not yet begun executing. This method may take * time proportional to the number of submissions. * * @return the number of queued submissions @@ -1966,13 +1991,15 @@ public class ForkJoinPool extends Abstra * {@code isReleasable} must return {@code true} if blocking is * not necessary. Method {@code block} blocks the current thread * if necessary (perhaps internally invoking {@code isReleasable} - * before actually blocking). The unusual methods in this API - * accommodate synchronizers that may, but don't usually, block - * for long periods. Similarly, they allow more efficient internal - * handling of cases in which additional workers may be, but - * usually are not, needed to ensure sufficient parallelism. - * Toward this end, implementations of method {@code isReleasable} - * must be amenable to repeated invocation. + * before actually blocking). These actions are performed by any + * thread invoking {@link ForkJoinPool#managedBlock}. The + * unusual methods in this API accommodate synchronizers that may, + * but don't usually, block for long periods. Similarly, they + * allow more efficient internal handling of cases in which + * additional workers may be, but usually are not, needed to + * ensure sufficient parallelism. Toward this end, + * implementations of method {@code isReleasable} must be amenable + * to repeated invocation. * *
For example, here is a ManagedBlocker based on a * ReentrantLock:
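A sketch of the kind of ReentrantLock-based blocker this sentence introduces (the class name ManagedLocker is illustrative): isReleasable makes a cheap tryLock attempt and tolerates repeated invocation, as required above, while block acquires the lock outright. A thread would typically hand an instance to ForkJoinPool.managedBlock, which may throw InterruptedException.

import java.util.concurrent.locks.ReentrantLock;
import jsr166y.ForkJoinPool.ManagedBlocker;

// Illustrative blocker that acquires a ReentrantLock.
class ManagedLocker implements ManagedBlocker {
    final ReentrantLock lock;
    boolean hasLock = false;

    ManagedLocker(ReentrantLock lock) { this.lock = lock; }

    // Called only if isReleasable returned false; blocks until the
    // lock is held, then reports that no further blocking is needed.
    public boolean block() {
        if (!hasLock)
            lock.lock();
        return true;
    }

    // Cheap, repeatable check: done if we already hold the lock or
    // can take it without waiting.
    public boolean isReleasable() {
        return hasLock || (hasLock = lock.tryLock());
    }
}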