--- jsr166/src/jsr166y/ForkJoinPool.java 2011/02/23 12:48:43 1.93 +++ jsr166/src/jsr166y/ForkJoinPool.java 2011/03/21 23:29:03 1.98 @@ -1,7 +1,7 @@ /* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain + * http://creativecommons.org/publicdomain/zero/1.0/ */ package jsr166y; @@ -151,7 +151,7 @@ public class ForkJoinPool extends Abstra * Updates tend not to contend with each other except during * bursts while submitted tasks begin or end. In some cases when * they do contend, threads can instead do something else - * (usually, scan for tesks) until contention subsides. + * (usually, scan for tasks) until contention subsides. * * To enable packing, we restrict maximum parallelism to (1<<15)-1 * (which is far in excess of normal operating range) to allow @@ -195,24 +195,28 @@ public class ForkJoinPool extends Abstra * shutdown schemes. * * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot - * let workers spin indefinitely scanning for tasks when none are - * can be immediately found, and we cannot start/resume workers - * unless there appear to be tasks available. On the other hand, - * we must quickly prod them into action when new tasks are - * submitted or generated. We park/unpark workers after placing - * in an event wait queue when they cannot find work. This "queue" - * is actually a simple Treiber stack, headed by the "id" field of - * ctl, plus a 15bit counter value to both wake up waiters (by - * advancing their count) and avoid ABA effects. Successors are - * held in worker field "nextWait". Queuing deals with several - * intrinsic races, mainly that a task-producing thread can miss - * seeing (and signalling) another thread that gave up looking for - * work but has not yet entered the wait queue. We solve this by - * requiring a full sweep of all workers both before (in scan()) - * and after (in awaitWork()) a newly waiting worker is added to - * the wait queue. During a rescan, the worker might release some - * other queued worker rather than itself, which has the same net - * effect. + * let workers spin indefinitely scanning for tasks when none can + * be found immediately, and we cannot start/resume workers unless + * there appear to be tasks available. On the other hand, we must + * quickly prod them into action when new tasks are submitted or + * generated. We park/unpark workers after placing in an event + * wait queue when they cannot find work. This "queue" is actually + * a simple Treiber stack, headed by the "id" field of ctl, plus a + * 15bit counter value to both wake up waiters (by advancing their + * count) and avoid ABA effects. Successors are held in worker + * field "nextWait". Queuing deals with several intrinsic races, + * mainly that a task-producing thread can miss seeing (and + * signalling) another thread that gave up looking for work but + * has not yet entered the wait queue. We solve this by requiring + * a full sweep of all workers both before (in scan()) and after + * (in tryAwaitWork()) a newly waiting worker is added to the wait + * queue. During a rescan, the worker might release some other + * queued worker rather than itself, which has the same net + * effect. Because enqueued workers may actually be rescanning + * rather than waiting, we set and clear the "parked" field of + * ForkJoinWorkerThread to reduce unnecessary calls to unpark. + * (Use of the parked field requires a secondary recheck to avoid + * missed signals.) * * Signalling. We create or wake up workers only when there * appears to be at least one task they might be able to find and @@ -229,13 +233,15 @@ public class ForkJoinPool extends Abstra * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will * time out and terminate if the pool has remained quiescent for - * SHRINK_RATE nanosecs. + * SHRINK_RATE nanosecs. This will slowly propagate, eventually + * terminating all workers after long periods of non-use. * * Submissions. External submissions are maintained in an * array-based queue that is structured identically to - * ForkJoinWorkerThread queues (which see) except for the use of - * submissionLock in method addSubmission. Unlike worker queues, - * multiple external threads can add new submissions. + * ForkJoinWorkerThread queues except for the use of + * submissionLock in method addSubmission. Unlike the case for + * worker queues, multiple external threads can add new + * submissions, so adding requires a lock. * * Compensation. Beyond work-stealing support and lifecycle * control, the main responsibility of this framework is to take @@ -272,15 +278,14 @@ public class ForkJoinPool extends Abstra * if blocking would leave less than one active (non-waiting, * non-blocked) worker. Additionally, to avoid some false alarms * due to GC, lagging counters, system activity, etc, compensated - * blocking for joins is only attempted after a number of rechecks - * proportional to the current apparent deficit (where retries are - * interspersed with Thread.yield, for good citizenship). The - * variable blockedCount, incremented before blocking and - * decremented after, is sometimes needed to distinguish cases of - * waiting for work vs blocking on joins or other managed sync, - * but both the cases are equivalent for most pool control, so we - * can update non-atomically. (Additionally, contention on - * blockedCount alleviates some contention on ctl). + * blocking for joins is only attempted after rechecks stabilize + * (retries are interspersed with Thread.yield, for good + * citizenship). The variable blockedCount, incremented before + * blocking and decremented after, is sometimes needed to + * distinguish cases of waiting for work vs blocking on joins or + * other managed sync. Both cases are equivalent for most pool + * control, so we can update non-atomically. (Additionally, + * contention on blockedCount alleviates some contention on ctl). * * Shutdown and Termination. A call to shutdownNow atomically sets * the ctl stop bit and then (non-atomically) sets each workers @@ -478,9 +483,7 @@ public class ForkJoinPool extends Abstra * negative, there is at least one waiting worker, and when e is * negative, the pool is terminating. To deal with these possibly * negative fields, we use casts in and out of "short" and/or - * signed shifts to maintain signedness. Note: AC_SHIFT is - * redundantly declared in ForkJoinWorkerThread in order to - * integrate a surplus-threads check. + * signed shifts to maintain signedness. */ volatile long ctl; @@ -524,13 +527,17 @@ public class ForkJoinPool extends Abstra /** * Index (mod submission queue length) of next element to take - * from submission queue. + * from submission queue. Usage is identical to that for + * per-worker queues -- see ForkJoinWorkerThread internal + * documentation. */ volatile int queueBase; /** * Index (mod submission queue length) of next element to add - * in submission queue. + * in submission queue. Usage is identical to that for + * per-worker queues -- see ForkJoinWorkerThread internal + * documentation. */ int queueTop; @@ -568,8 +575,8 @@ public class ForkJoinPool extends Abstra private int nextWorkerIndex; /** - * SeqLock and index masking for for updates to workers array. - * Locked when SG_UNIT is set. Unlocking clears bit by adding + * SeqLock and index masking for updates to workers array. Locked + * when SG_UNIT is set. Unlocking clears bit by adding * SG_UNIT. Staleness of read-only operations can be checked by * comparing scanGuard to value before the reads. The low 16 bits * (i.e, anding with SMASK) hold (the smallest power of two @@ -707,7 +714,7 @@ public class ForkJoinPool extends Abstra */ private boolean scan(ForkJoinWorkerThread w, int a) { int g = scanGuard; // mask 0 avoids useless scans if only one active - int m = parallelism == 1 - a? 0 : g & SMASK; + int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws == null || ws.length <= m) // staleness check return false; @@ -760,7 +767,13 @@ public class ForkJoinPool extends Abstra * rescans queues to avoid missed signals. Upon finding work, * releases at least one worker (which may be the current * worker). Rescans restart upon detected staleness or failure to - * release due to contention. + * release due to contention. Note the unusual conventions about + * Thread.interrupt here and elsewhere: Because interrupts are + * used solely to alert threads to check termination, which is + * checked here anyway, we clear status (using Thread.interrupted) + * before any call to park, so that park does not immediately + * return due to status being set via some other unrelated call to + * interrupt in user code. * * @param w the calling worker * @param c the ctl value on entry @@ -893,7 +906,7 @@ public class ForkJoinPool extends Abstra /** * Creates or doubles submissionQueue array. - * Basically identical to ForkJoinWorkerThread version + * Basically identical to ForkJoinWorkerThread version. */ private void growSubmissionQueue() { ForkJoinTask[] oldQ = submissionQueue; @@ -998,7 +1011,7 @@ public class ForkJoinPool extends Abstra joinMe.tryAwaitDone(0L); postBlock(); } - if ((ctl & STOP_BIT) != 0L) + else if ((ctl & STOP_BIT) != 0L) joinMe.cancelIgnoringExceptions(); } } @@ -1133,7 +1146,7 @@ public class ForkJoinPool extends Abstra ws[k] = w; nextWorkerIndex = k + 1; int m = g & SMASK; - g = k >= m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); + g = k > m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); } } finally { scanGuard = g; @@ -1213,7 +1226,7 @@ public class ForkJoinPool extends Abstra if ((int)(c >> AC_SHIFT) != -parallelism) return false; if (!shutdown || blockedCount != 0 || quiescerCount != 0 || - queueTop - queueBase > 0) { + queueBase != queueTop) { if (ctl == c) // staleness check return false; continue; @@ -1222,18 +1235,22 @@ public class ForkJoinPool extends Abstra if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT)) startTerminating(); } - if ((short)(c >>> TC_SHIFT) == -parallelism) { - submissionLock.lock(); - termination.signalAll(); - submissionLock.unlock(); + if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers + final ReentrantLock lock = this.submissionLock; + lock.lock(); + try { + termination.signalAll(); + } finally { + lock.unlock(); + } } return true; } /** * Runs up to three passes through workers: (0) Setting - * termination status for each worker, followed by wakeups up - * queued workers (1) helping cancel tasks (2) interrupting + * termination status for each worker, followed by wakeups up to + * queued workers; (1) helping cancel tasks; (2) interrupting * lagging threads (likely in external tasks, but possibly also * blocked in joins). Each pass repeats previous steps because of * potential lagging thread creation. @@ -1279,7 +1296,7 @@ public class ForkJoinPool extends Abstra /** * Tries to set the termination status of waiting workers, and - * then wake them up (after which they will terminate). + * then wakes them up (after which they will terminate). */ private void terminateWaiters() { ForkJoinWorkerThread[] ws = workers; @@ -1735,7 +1752,7 @@ public class ForkJoinPool extends Abstra /** * Returns an estimate of the number of tasks submitted to this - * pool that have not yet begun executing. This meThod may take + * pool that have not yet begun executing. This method may take * time proportional to the number of submissions. * * @return the number of queued submissions