--- jsr166/src/jsr166y/ForkJoinPool.java 2011/03/01 10:59:04 1.94 +++ jsr166/src/jsr166y/ForkJoinPool.java 2011/03/04 13:29:39 1.95 @@ -151,7 +151,7 @@ public class ForkJoinPool extends Abstra * Updates tend not to contend with each other except during * bursts while submitted tasks begin or end. In some cases when * they do contend, threads can instead do something else - * (usually, scan for tesks) until contention subsides. + * (usually, scan for tasks) until contention subsides. * * To enable packing, we restrict maximum parallelism to (1<<15)-1 * (which is far in excess of normal operating range) to allow @@ -195,24 +195,28 @@ public class ForkJoinPool extends Abstra * shutdown schemes. * * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot - * let workers spin indefinitely scanning for tasks when none are - * can be immediately found, and we cannot start/resume workers - * unless there appear to be tasks available. On the other hand, - * we must quickly prod them into action when new tasks are - * submitted or generated. We park/unpark workers after placing - * in an event wait queue when they cannot find work. This "queue" - * is actually a simple Treiber stack, headed by the "id" field of - * ctl, plus a 15bit counter value to both wake up waiters (by - * advancing their count) and avoid ABA effects. Successors are - * held in worker field "nextWait". Queuing deals with several - * intrinsic races, mainly that a task-producing thread can miss - * seeing (and signalling) another thread that gave up looking for - * work but has not yet entered the wait queue. We solve this by - * requiring a full sweep of all workers both before (in scan()) - * and after (in awaitWork()) a newly waiting worker is added to - * the wait queue. During a rescan, the worker might release some - * other queued worker rather than itself, which has the same net - * effect. 
+ * let workers spin indefinitely scanning for tasks when none can + * be found immediately, and we cannot start/resume workers unless + * there appear to be tasks available. On the other hand, we must + * quickly prod them into action when new tasks are submitted or + * generated. We park/unpark workers after placing them in an event + * wait queue when they cannot find work. This "queue" is actually + * a simple Treiber stack, headed by the "id" field of ctl, plus a + * 15-bit counter value to both wake up waiters (by advancing their + * count) and avoid ABA effects. Successors are held in worker + * field "nextWait". Queuing deals with several intrinsic races, + * mainly that a task-producing thread can miss seeing (and + * signalling) another thread that gave up looking for work but + * has not yet entered the wait queue. We solve this by requiring + * a full sweep of all workers both before (in scan()) and after + * (in tryAwaitWork()) a newly waiting worker is added to the wait + * queue. During a rescan, the worker might release some other + * queued worker rather than itself, which has the same net + * effect. Because enqueued workers may actually be rescanning + * rather than waiting, we set and clear the "parked" field of + * ForkJoinWorkerThread to reduce unnecessary calls to unpark. + * (Use of the parked field requires a secondary recheck to avoid + * missed signals.) * * Signalling. We create or wake up workers only when there * appears to be at least one task they might be able to find and @@ -229,13 +233,15 @@ public class ForkJoinPool extends Abstra * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will * time out and terminate if the pool has remained quiescent for - * SHRINK_RATE nanosecs. + * SHRINK_RATE nanosecs. This will slowly propagate, eventually + * terminating all workers after long periods of non-use. * * Submissions. 
External submissions are maintained in an * array-based queue that is structured identically to - * ForkJoinWorkerThread queues (which see) except for the use of - * submissionLock in method addSubmission. Unlike worker queues, - * multiple external threads can add new submissions. + * ForkJoinWorkerThread queues except for the use of + * submissionLock in method addSubmission. Unlike the case for + * worker queues, multiple external threads can add new + * submissions, so adding requires a lock. * * Compensation. Beyond work-stealing support and lifecycle * control, the main responsibility of this framework is to take @@ -272,15 +278,14 @@ public class ForkJoinPool extends Abstra * if blocking would leave less than one active (non-waiting, * non-blocked) worker. Additionally, to avoid some false alarms * due to GC, lagging counters, system activity, etc, compensated - * blocking for joins is only attempted after a number of rechecks - * proportional to the current apparent deficit (where retries are - * interspersed with Thread.yield, for good citizenship). The - * variable blockedCount, incremented before blocking and - * decremented after, is sometimes needed to distinguish cases of - * waiting for work vs blocking on joins or other managed sync, - * but both the cases are equivalent for most pool control, so we - * can update non-atomically. (Additionally, contention on - * blockedCount alleviates some contention on ctl). + * blocking for joins is only attempted after rechecks stabilize + * (retries are interspersed with Thread.yield, for good + * citizenship). The variable blockedCount, incremented before + * blocking and decremented after, is sometimes needed to + * distinguish cases of waiting for work vs blocking on joins or + * other managed sync. Both cases are equivalent for most pool + * control, so we can update non-atomically. (Additionally, + * contention on blockedCount alleviates some contention on ctl). * * Shutdown and Termination. 
A call to shutdownNow atomically sets * the ctl stop bit and then (non-atomically) sets each workers @@ -522,13 +527,17 @@ public class ForkJoinPool extends Abstra /** * Index (mod submission queue length) of next element to take - * from submission queue. + * from submission queue. Usage is identical to that for + * per-worker queues -- see ForkJoinWorkerThread internal + * documentation. */ volatile int queueBase; /** * Index (mod submission queue length) of next element to add - * in submission queue. + * in submission queue. Usage is identical to that for + * per-worker queues -- see ForkJoinWorkerThread internal + * documentation. */ int queueTop; @@ -566,8 +575,8 @@ public class ForkJoinPool extends Abstra private int nextWorkerIndex; /** - * SeqLock and index masking for for updates to workers array. - * Locked when SG_UNIT is set. Unlocking clears bit by adding + * SeqLock and index masking for updates to workers array. Locked + * when SG_UNIT is set. Unlocking clears bit by adding * SG_UNIT. Staleness of read-only operations can be checked by * comparing scanGuard to value before the reads. The low 16 bits * (i.e, anding with SMASK) hold (the smallest power of two @@ -758,7 +767,13 @@ public class ForkJoinPool extends Abstra * rescans queues to avoid missed signals. Upon finding work, * releases at least one worker (which may be the current * worker). Rescans restart upon detected staleness or failure to - * release due to contention. + * release due to contention. Note the unusual conventions about + * Thread.interrupt here and elsewhere: Because interrupts are + * used solely to alert threads to check termination, which is + * checked here anyway, we clear status (using Thread.interrupted) + * before any call to park, so that park does not immediately + * return due to status being set via some other unrelated call to + * interrupt in user code. 
* * @param w the calling worker * @param c the ctl value on entry @@ -891,7 +906,7 @@ public class ForkJoinPool extends Abstra /** * Creates or doubles submissionQueue array. - * Basically identical to ForkJoinWorkerThread version + * Basically identical to ForkJoinWorkerThread version. */ private void growSubmissionQueue() { ForkJoinTask[] oldQ = submissionQueue; @@ -996,7 +1011,7 @@ public class ForkJoinPool extends Abstra joinMe.tryAwaitDone(0L); postBlock(); } - if ((ctl & STOP_BIT) != 0L) + else if ((ctl & STOP_BIT) != 0L) joinMe.cancelIgnoringExceptions(); } } @@ -1211,7 +1226,7 @@ public class ForkJoinPool extends Abstra if ((int)(c >> AC_SHIFT) != -parallelism) return false; if (!shutdown || blockedCount != 0 || quiescerCount != 0 || - queueTop - queueBase > 0) { + queueBase != queueTop) { if (ctl == c) // staleness check return false; continue; @@ -1220,18 +1235,22 @@ public class ForkJoinPool extends Abstra if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT)) startTerminating(); } - if ((short)(c >>> TC_SHIFT) == -parallelism) { - submissionLock.lock(); - termination.signalAll(); - submissionLock.unlock(); + if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers + final ReentrantLock lock = this.submissionLock; + lock.lock(); + try { + termination.signalAll(); + } finally { + lock.unlock(); + } } return true; } /** * Runs up to three passes through workers: (0) Setting - * termination status for each worker, followed by wakeups up - * queued workers (1) helping cancel tasks (2) interrupting + * termination status for each worker, followed by wakeups of + * queued workers; (1) helping cancel tasks; (2) interrupting * lagging threads (likely in external tasks, but possibly also * blocked in joins). Each pass repeats previous steps because of * potential lagging thread creation. 
@@ -1277,7 +1296,7 @@ public class ForkJoinPool extends Abstra /** * Tries to set the termination status of waiting workers, and - * then wake them up (after which they will terminate). + * then wakes them up (after which they will terminate). */ private void terminateWaiters() { ForkJoinWorkerThread[] ws = workers;