151       * Updates tend not to contend with each other except during
152       * bursts while submitted tasks begin or end. In some cases when
153       * they do contend, threads can instead do something else
154  <    * (usually, scan for tesks) until contention subsides.
154  >    * (usually, scan for tasks) until contention subsides.
155       *
156       * To enable packing, we restrict maximum parallelism to (1<<15)-1
157       * (which is far in excess of normal operating range) to allow
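As an aside on the packing described above: bounding parallelism at (1<<15)-1 lets worker counts live in 16-bit fields of the single long ctl word, so related counts can be read and CAS-updated together. The diff later references AC_SHIFT and TC_SHIFT; the sketch below uses those names with illustrative shift values, not ForkJoinPool's actual layout.

```java
// Illustrative sketch (not ForkJoinPool's actual constants) of packing two
// signed 16-bit counts into one long, so both change under a single CAS.
public class CtlPackDemo {
    static final int  AC_SHIFT = 48;          // active worker count field
    static final int  TC_SHIFT = 32;          // total worker count field
    static final long AC_UNIT  = 1L << AC_SHIFT;

    // Counts are kept as deltas from parallelism, so they may be negative;
    // they fit in 16 bits whenever parallelism <= (1<<15)-1.
    static long pack(int active, int total) {
        return ((long)(short)active << AC_SHIFT) & (0xffffL << AC_SHIFT)
             | ((long)(short)total  << TC_SHIFT) & (0xffffL << TC_SHIFT);
    }

    static int activeCount(long ctl) { return (short)(ctl >> AC_SHIFT); }
    static int totalCount(long ctl)  { return (short)(ctl >>> TC_SHIFT); }

    public static void main(String[] args) {
        long c = pack(-3, 7);                 // 3 fewer active than target
        System.out.println(activeCount(c));   // -3
        System.out.println(totalCount(c));    // 7
        System.out.println(activeCount(c + AC_UNIT)); // -2: one worker woke
    }
}
```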
195       * shutdown schemes.
196       *
197       * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
198  <    * let workers spin indefinitely scanning for tasks when none are
199  <    * can be immediately found, and we cannot start/resume workers
200  <    * unless there appear to be tasks available. On the other hand,
201  <    * we must quickly prod them into action when new tasks are
202  <    * submitted or generated. We park/unpark workers after placing
203  <    * in an event wait queue when they cannot find work. This "queue"
204  <    * is actually a simple Treiber stack, headed by the "id" field of
205  <    * ctl, plus a 15bit counter value to both wake up waiters (by
206  <    * advancing their count) and avoid ABA effects. Successors are
207  <    * held in worker field "nextWait". Queuing deals with several
208  <    * intrinsic races, mainly that a task-producing thread can miss
209  <    * seeing (and signalling) another thread that gave up looking for
210  <    * work but has not yet entered the wait queue. We solve this by
211  <    * requiring a full sweep of all workers both before (in scan())
212  <    * and after (in awaitWork()) a newly waiting worker is added to
213  <    * the wait queue. During a rescan, the worker might release some
214  <    * other queued worker rather than itself, which has the same net
215  <    * effect.
198  >    * let workers spin indefinitely scanning for tasks when none can
199  >    * be found immediately, and we cannot start/resume workers unless
200  >    * there appear to be tasks available. On the other hand, we must
201  >    * quickly prod them into action when new tasks are submitted or
202  >    * generated. We park/unpark workers after placing in an event
203  >    * wait queue when they cannot find work. This "queue" is actually
204  >    * a simple Treiber stack, headed by the "id" field of ctl, plus a
205  >    * 15bit counter value to both wake up waiters (by advancing their
206  >    * count) and avoid ABA effects. Successors are held in worker
207  >    * field "nextWait". Queuing deals with several intrinsic races,
208  >    * mainly that a task-producing thread can miss seeing (and
209  >    * signalling) another thread that gave up looking for work but
210  >    * has not yet entered the wait queue. We solve this by requiring
211  >    * a full sweep of all workers both before (in scan()) and after
212  >    * (in tryAwaitWork()) a newly waiting worker is added to the wait
213  >    * queue. During a rescan, the worker might release some other
214  >    * queued worker rather than itself, which has the same net
215  >    * effect. Because enqueued workers may actually be rescanning
216  >    * rather than waiting, we set and clear the "parked" field of
217  >    * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
218  >    * (Use of the parked field requires a secondary recheck to avoid
219  >    * missed signals.)
220       *
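The comment above describes the event wait queue as a Treiber stack whose successors are threaded through the worker "nextWait" field. Here is a self-contained sketch of a Treiber stack using an AtomicReference head rather than the packed index-plus-counter encoding in ctl; with plain Java references and GC, plain CAS is already ABA-safe, so the 15-bit counter has no analogue here.

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal Treiber stack: a lock-free LIFO, the shape of the event "wait
// queue" described above. Nodes link through "next", playing the role the
// "nextWait" field plays for queued workers.
public class TreiberStack<E> {
    private static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    public void push(E item) {
        Node<E> n = new Node<>(item);
        do {
            n.next = head.get();                      // link to current top
        } while (!head.compareAndSet(n.next, n));     // retry on contention
    }

    public E pop() {                                  // returns null if empty
        Node<E> h;
        do {
            h = head.get();
            if (h == null)
                return null;
        } while (!head.compareAndSet(h, h.next));
        return h.item;
    }

    public static void main(String[] args) {
        TreiberStack<String> s = new TreiberStack<>();
        s.push("w1");
        s.push("w2");
        System.out.println(s.pop());                  // w2 (LIFO order)
        System.out.println(s.pop());                  // w1
    }
}
```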
221       * Signalling. We create or wake up workers only when there
222       * appears to be at least one task they might be able to find and
233       * Trimming workers. To release resources after periods of lack of
234       * use, a worker starting to wait when the pool is quiescent will
235       * time out and terminate if the pool has remained quiescent for
236  <    * SHRINK_RATE nanosecs.
236  >    * SHRINK_RATE nanosecs. This will slowly propagate, eventually
237  >    * terminating all workers after long periods of non-use.
238       *
239       * Submissions. External submissions are maintained in an
240       * array-based queue that is structured identically to
241  <    * ForkJoinWorkerThread queues (which see) except for the use of
242  <    * submissionLock in method addSubmission. Unlike worker queues,
243  <    * multiple external threads can add new submissions.
241  >    * ForkJoinWorkerThread queues except for the use of
242  >    * submissionLock in method addSubmission. Unlike the case for
243  >    * worker queues, multiple external threads can add new
244  >    * submissions, so adding requires a lock.
245       *
246       * Compensation. Beyond work-stealing support and lifecycle
247       * control, the main responsibility of this framework is to take
278       * if blocking would leave less than one active (non-waiting,
279       * non-blocked) worker. Additionally, to avoid some false alarms
280       * due to GC, lagging counters, system activity, etc, compensated
281  <    * blocking for joins is only attempted after a number of rechecks
282  <    * proportional to the current apparent deficit (where retries are
283  <    * interspersed with Thread.yield, for good citizenship). The
284  <    * variable blockedCount, incremented before blocking and
285  <    * decremented after, is sometimes needed to distinguish cases of
286  <    * waiting for work vs blocking on joins or other managed sync,
287  <    * but both the cases are equivalent for most pool control, so we
288  <    * can update non-atomically. (Additionally, contention on
289  <    * blockedCount alleviates some contention on ctl).
281  >    * blocking for joins is only attempted after rechecks stabilize
282  >    * (retries are interspersed with Thread.yield, for good
283  >    * citizenship). The variable blockedCount, incremented before
284  >    * blocking and decremented after, is sometimes needed to
285  >    * distinguish cases of waiting for work vs blocking on joins or
286  >    * other managed sync. Both cases are equivalent for most pool
287  >    * control, so we can update non-atomically. (Additionally,
288  >    * contention on blockedCount alleviates some contention on ctl).
289       *
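The revised wording above says compensated blocking is "only attempted after rechecks stabilize", with retries interspersed with Thread.yield. A hypothetical illustration of that pattern follows: re-read shared state until two consecutive reads agree, yielding between retries, and only then decide whether to pay for blocking. All names in this sketch are invented for illustration; none are ForkJoinPool's.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical recheck-until-stable loop, yielding between retries, as a
// guard before an expensive compensated-blocking step.
public class RecheckDemo {
    static final AtomicLong state = new AtomicLong();  // stand-in pool state

    // Stand-in predicate: negative state means blocking would be needed.
    static boolean shouldBlock(long s) { return s < 0; }

    static boolean awaitStableThenBlock() {
        long prev = state.get();
        for (;;) {
            Thread.yield();                   // good citizenship between rechecks
            long cur = state.get();
            if (cur == prev)
                break;                        // two consecutive reads agree
            prev = cur;
        }
        return shouldBlock(prev);             // caller blocks only if still needed
    }

    public static void main(String[] args) {
        state.set(-1);
        System.out.println(awaitStableThenBlock()); // true: still a deficit
    }
}
```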
290       * Shutdown and Termination. A call to shutdownNow atomically sets
291       * the ctl stop bit and then (non-atomically) sets each workers
527
528      /**
529       * Index (mod submission queue length) of next element to take
530  <    * from submission queue.
530  >    * from submission queue. Usage is identical to that for
531  >    * per-worker queues -- see ForkJoinWorkerThread internal
532  >    * documentation.
533       */
534      volatile int queueBase;
535
536      /**
537       * Index (mod submission queue length) of next element to add
538  <    * in submission queue.
538  >    * in submission queue. Usage is identical to that for
539  >    * per-worker queues -- see ForkJoinWorkerThread internal
540  >    * documentation.
541       */
542      int queueTop;
543
575      private int nextWorkerIndex;
576
577      /**
578  <    * SeqLock and index masking for for updates to workers array.
579  <    * Locked when SG_UNIT is set. Unlocking clears bit by adding
578  >    * SeqLock and index masking for updates to workers array. Locked
579  >    * when SG_UNIT is set. Unlocking clears bit by adding
580       * SG_UNIT. Staleness of read-only operations can be checked by
581       * comparing scanGuard to value before the reads. The low 16 bits
582       * (i.e, anding with SMASK) hold (the smallest power of two
767       * rescans queues to avoid missed signals. Upon finding work,
768       * releases at least one worker (which may be the current
769       * worker). Rescans restart upon detected staleness or failure to
770  <    * release due to contention.
770  >    * release due to contention. Note the unusual conventions about
771  >    * Thread.interrupt here and elsewhere: Because interrupts are
772  >    * used solely to alert threads to check termination, which is
773  >    * checked here anyway, we clear status (using Thread.interrupted)
774  >    * before any call to park, so that park does not immediately
775  >    * return due to status being set via some other unrelated call to
776  >    * interrupt in user code.
777       *
778       * @param w the calling worker
779       * @param c the ctl value on entry
906
907      /**
908       * Creates or doubles submissionQueue array.
909  <    * Basically identical to ForkJoinWorkerThread version
909  >    * Basically identical to ForkJoinWorkerThread version.
910       */
911      private void growSubmissionQueue() {
912          ForkJoinTask<?>[] oldQ = submissionQueue;
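growSubmissionQueue doubles an array-based queue indexed by the queueBase/queueTop counters documented earlier (ever-increasing, taken mod a power-of-two array length). A simplified, hedged sketch of that doubling scheme, without the volatile and ordered-write machinery of the real code:

```java
// Simplified sketch of a growable circular queue with ever-increasing
// base/top indices masked by (power of two) array length. Not the real
// submission queue: single-threaded, no memory-ordering controls.
public class GrowableQueue {
    Object[] array = new Object[4];           // capacity must be a power of two
    int queueBase;                            // index of next element to take
    int queueTop;                             // index of next element to add

    void push(Object x) {
        if (queueTop - queueBase == array.length)
            grow();                           // full: double before adding
        array[queueTop++ & (array.length - 1)] = x;
    }

    Object poll() {
        if (queueBase == queueTop)
            return null;                      // empty
        int i = queueBase++ & (array.length - 1);
        Object x = array[i];
        array[i] = null;                      // allow GC of taken element
        return x;
    }

    void grow() {                             // double, re-placing each element
        Object[] oldQ = array;
        Object[] newQ = new Object[oldQ.length << 1];
        for (int i = queueBase; i != queueTop; ++i)
            newQ[i & (newQ.length - 1)] = oldQ[i & (oldQ.length - 1)];
        array = newQ;
    }

    public static void main(String[] args) {
        GrowableQueue q = new GrowableQueue();
        for (int i = 0; i < 10; ++i)          // forces two doublings
            q.push(i);
        System.out.println(q.poll());         // 0 (FIFO when taken from base)
    }
}
```

Because the indices grow monotonically and only their masked values address the array, elements keep their logical positions across a doubling; the copy loop just re-places each one under the new, wider mask.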
1011                 joinMe.tryAwaitDone(0L);
1012                 postBlock();
1013             }
1014 <           if ((ctl & STOP_BIT) != 0L)
1014 >           else if ((ctl & STOP_BIT) != 0L)
1015                 joinMe.cancelIgnoringExceptions();
1016         }
1017     }
1226                 if ((int)(c >> AC_SHIFT) != -parallelism)
1227                     return false;
1228                 if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
1229 <               queueTop - queueBase > 0) {
1229 >               queueBase != queueTop) {
1230                     if (ctl == c) // staleness check
1231                         return false;
1232                     continue;
1235             if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
1236                 startTerminating();
1237         }
1238 <       if ((short)(c >>> TC_SHIFT) == -parallelism) {
1239 <           submissionLock.lock();
1240 <           termination.signalAll();
1241 <           submissionLock.unlock();
1238 >       if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
1239 >           final ReentrantLock lock = this.submissionLock;
1240 >           lock.lock();
1241 >           try {
1242 >               termination.signalAll();
1243 >           } finally {
1244 >               lock.unlock();
1245 >           }
1246         }
1247         return true;
1248     }
1249
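The rewritten hunk above replaces bare lock()/unlock() calls around termination.signalAll() with the standard try/finally idiom, so the lock is released even if an exception propagates. A self-contained sketch of that pattern with a ReentrantLock, a Condition, and a termination flag (class and method names here are illustrative):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Standard lock/try/finally signalling pattern, as introduced by the hunk
// above: the signaller sets state and wakes all awaiters; waiters loop on
// the predicate to tolerate spurious wakeups.
public class TerminationDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition termination = lock.newCondition();
    private boolean terminated;

    void signalTermination() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            terminated = true;
            termination.signalAll();          // wake all awaiting threads
        } finally {
            lock.unlock();                    // released even on exception
        }
    }

    boolean isTerminated() {
        lock.lock();
        try {
            return terminated;
        } finally {
            lock.unlock();
        }
    }

    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!terminated) {             // guard against spurious wakeups
                if (nanos <= 0L)
                    return false;             // timed out
                nanos = termination.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TerminationDemo d = new TerminationDemo();
        new Thread(d::signalTermination).start();
        System.out.println(d.awaitTermination(2, TimeUnit.SECONDS)); // true
    }
}
```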
1250     /**
1251      * Runs up to three passes through workers: (0) Setting
1252 <    * termination status for each worker, followed by wakeups up
1253 <    * queued workers (1) helping cancel tasks (2) interrupting
1252 >    * termination status for each worker, followed by wakeups up to
1253 >    * queued workers; (1) helping cancel tasks; (2) interrupting
1254      * lagging threads (likely in external tasks, but possibly also
1255      * blocked in joins). Each pass repeats previous steps because of
1256      * potential lagging thread creation.
1296
1297     /**
1298      * Tries to set the termination status of waiting workers, and
1299 <    * then wake them up (after which they will terminate).
1299 >    * then wakes them up (after which they will terminate).
1300      */
1301     private void terminateWaiters() {
1302         ForkJoinWorkerThread[] ws = workers;