3 |
|
* Expert Group and released to the public domain, as explained at |
4 |
|
* http://creativecommons.org/publicdomain/zero/1.0/ |
5 |
|
*/ |
6 |
– |
|
6 |
|
package java.util.concurrent; |
7 |
|
|
8 |
|
import java.lang.Thread.UncaughtExceptionHandler; |
14 |
|
import java.util.Collection; |
15 |
|
import java.util.Collections; |
16 |
|
import java.util.List; |
17 |
< |
import java.util.concurrent.locks.ReentrantLock; |
17 |
> |
import java.util.concurrent.TimeUnit; |
18 |
> |
import java.util.concurrent.CountedCompleter; |
19 |
> |
import java.util.concurrent.ForkJoinTask; |
20 |
> |
import java.util.concurrent.ForkJoinWorkerThread; |
21 |
|
import java.util.concurrent.locks.LockSupport; |
22 |
|
|
23 |
|
/** |
54 |
|
* However, no such adjustments are guaranteed in the face of blocked |
55 |
|
* I/O or other unmanaged synchronization. The nested {@link |
56 |
|
* ManagedBlocker} interface enables extension of the kinds of |
57 |
< |
* synchronization accommodated. |
57 |
> |
* synchronization accommodated. The default policies may be |
58 |
> |
* overridden using a constructor with parameters corresponding to |
59 |
> |
* those documented in class {@link ThreadPoolExecutor}. |
60 |
|
* |
61 |
|
* <p>In addition to execution and lifecycle control methods, this |
62 |
|
* class provides status check methods (for example |
137 |
|
* @since 1.7 |
138 |
|
* @author Doug Lea |
139 |
|
*/ |
136 |
– |
@jdk.internal.vm.annotation.Contended |
140 |
|
public class ForkJoinPool extends AbstractExecutorService { |
141 |
|
|
142 |
|
/* |
203 |
|
* (CAS slot to null)) |
204 |
|
* increment base and return task; |
205 |
|
* |
206 |
< |
* There are several variants of each of these; for example most |
207 |
< |
* versions of poll pre-screen the CAS by rechecking that the base |
208 |
< |
* has not changed since reading the slot, and most methods only |
206 |
< |
* attempt the CAS if base appears not to be equal to top. |
206 |
> |
* There are several variants of each of these. In particular, |
207 |
> |
* almost all uses of poll occur within scan operations that also |
208 |
> |
* interleave contention tracking (with associated code sprawl.) |
209 |
|
* |
210 |
|
* Memory ordering. See "Correct and Efficient Work-Stealing for |
211 |
|
* Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 |
237 |
|
* thief chooses a different random victim target to try next. So, |
238 |
|
* in order for one thief to progress, it suffices for any |
239 |
|
* in-progress poll or new push on any empty queue to |
240 |
< |
* complete. (This is why we normally use method pollAt and its |
239 |
< |
* variants that try once at the apparent base index, else |
240 |
< |
* consider alternative actions, rather than method poll, which |
241 |
< |
* retries.) |
240 |
> |
* complete. |
241 |
|
* |
242 |
|
* This approach also enables support of a user mode in which |
243 |
|
* local task processing is in FIFO, not LIFO order, simply by |
252 |
|
* choosing existing queues, and may be randomly repositioned upon |
253 |
|
* contention with other submitters. In essence, submitters act |
254 |
|
* like workers except that they are restricted to executing local |
255 |
< |
* tasks that they submitted (or in the case of CountedCompleters, |
256 |
< |
* others with the same root task). Insertion of tasks in shared |
257 |
< |
* mode requires a lock but we use only a simple spinlock (using |
258 |
< |
* field qlock), because submitters encountering a busy queue move |
259 |
< |
* on to try or create other queues -- they block only when |
260 |
< |
* creating and registering new queues. Because it is used only as |
261 |
< |
* a spinlock, unlocking requires only a "releasing" store (using |
263 |
< |
* putOrderedInt). The qlock is also used during termination |
264 |
< |
* detection, in which case it is forced to a negative |
265 |
< |
* non-lockable value. |
255 |
> |
* tasks that they submitted. Insertion of tasks in shared mode |
256 |
> |
* requires a lock but we use only a simple spinlock (using field |
257 |
> |
* phase), because submitters encountering a busy queue move on to |
258 |
> |
* try or create other queues -- they block only when creating and |
259 |
> |
* registering new queues. Because it is used only as a spinlock, |
260 |
> |
* unlocking requires only a "releasing" store (using |
261 |
> |
* putOrderedInt). |
262 |
|
* |
263 |
|
* Management |
264 |
|
* ========== |
272 |
|
* There are only a few properties that we can globally track or |
273 |
|
* maintain, so we pack them into a small number of variables, |
274 |
|
* often maintaining atomicity without blocking or locking. |
275 |
< |
* Nearly all essentially atomic control state is held in two |
275 |
> |
* Nearly all essentially atomic control state is held in a few |
276 |
|
* volatile variables that are by far most often read (not |
277 |
< |
* written) as status and consistency checks. (Also, field |
278 |
< |
* "config" holds unchanging configuration state.) |
277 |
> |
* written) as status and consistency checks. We pack as much |
278 |
> |
* information into them as we can. |
279 |
|
* |
280 |
|
* Field "ctl" contains 64 bits holding information needed to |
281 |
< |
* atomically decide to add, inactivate, enqueue (on an event |
282 |
< |
* queue), dequeue, and/or re-activate workers. To enable this |
281 |
> |
* atomically decide to add, enqueue (on an event queue), and |
282 |
> |
* dequeue (and release)-activate workers. To enable this |
283 |
|
* packing, we restrict maximum parallelism to (1<<15)-1 (which is |
284 |
|
* far in excess of normal operating range) to allow ids, counts, |
285 |
|
* and their negations (used for thresholding) to fit into 16bit |
286 |
|
* subfields. |
287 |
|
* |
288 |
< |
* Field "runState" holds lifetime status, atomically and |
289 |
< |
* monotonically setting STARTED, SHUTDOWN, STOP, and finally |
290 |
< |
* TERMINATED bits. |
295 |
< |
* |
296 |
< |
* Field "auxState" is a ReentrantLock subclass that also |
297 |
< |
* opportunistically holds some other bookkeeping fields accessed |
298 |
< |
* only when locked. It is mainly used to lock (infrequent) |
299 |
< |
* updates to workQueues. The auxState instance is itself lazily |
300 |
< |
* constructed (see tryInitialize), requiring a double-check-style |
301 |
< |
* bootstrapping use of field runState, and locking a private |
302 |
< |
* static. |
288 |
> |
* Field "mode" holds configuration parameters as well as lifetime |
289 |
> |
* status, atomically and monotonically setting SHUTDOWN, STOP, |
290 |
> |
* and finally TERMINATED bits. |
291 |
|
* |
292 |
|
* Field "workQueues" holds references to WorkQueues. It is |
293 |
< |
* updated (only during worker creation and termination) under the |
294 |
< |
* lock, but is otherwise concurrently readable, and accessed |
295 |
< |
* directly. We also ensure that reads of the array reference |
296 |
< |
* itself never become too stale (for example, re-reading before |
297 |
< |
* each scan). To simplify index-based operations, the array size |
298 |
< |
* is always a power of two, and all readers must tolerate null |
299 |
< |
* slots. Worker queues are at odd indices. Shared (submission) |
300 |
< |
* queues are at even indices, up to a maximum of 64 slots, to |
301 |
< |
* limit growth even if array needs to expand to add more |
302 |
< |
* workers. Grouping them together in this way simplifies and |
293 |
> |
* updated (only during worker creation and termination) under |
294 |
> |
* lock (using field workerNamePrefix as lock), but is otherwise |
295 |
> |
* concurrently readable, and accessed directly. We also ensure |
296 |
> |
* that uses of the array reference itself never become too stale |
297 |
> |
* in case of resizing. To simplify index-based operations, the |
298 |
> |
* array size is always a power of two, and all readers must |
299 |
> |
* tolerate null slots. Worker queues are at odd indices. Shared |
300 |
> |
* (submission) queues are at even indices, up to a maximum of 64 |
301 |
> |
* slots, to limit growth even if array needs to expand to add |
302 |
> |
* more workers. Grouping them together in this way simplifies and |
303 |
|
* speeds up task scanning. |
304 |
|
* |
305 |
|
* All worker thread creation is on-demand, triggered by task |
319 |
|
* workers unless there appear to be tasks available. On the |
320 |
|
* other hand, we must quickly prod them into action when new |
321 |
|
* tasks are submitted or generated. In many usages, ramp-up time |
322 |
< |
* to activate workers is the main limiting factor in overall |
323 |
< |
* performance, which is compounded at program start-up by JIT |
324 |
< |
* compilation and allocation. So we streamline this as much as |
325 |
< |
* possible. |
326 |
< |
* |
327 |
< |
* The "ctl" field atomically maintains active and total worker |
328 |
< |
* counts as well as a queue to place waiting threads so they can |
329 |
< |
* be located for signalling. Active counts also play the role of |
330 |
< |
* quiescence indicators, so are decremented when workers believe |
331 |
< |
* that there are no more tasks to execute. The "queue" is |
332 |
< |
* actually a form of Treiber stack. A stack is ideal for |
333 |
< |
* activating threads in most-recently used order. This improves |
322 |
> |
* is the main limiting factor in overall performance, which is |
323 |
> |
* compounded at program start-up by JIT compilation and |
324 |
> |
* allocation. So we streamline this as much as possible. |
325 |
> |
* |
326 |
> |
* The "ctl" field atomically maintains total worker and |
327 |
> |
* "released" worker counts, plus the head of the available worker |
328 |
> |
* queue (actually stack, represented by the lower 32bit subfield |
329 |
> |
* of ctl). Released workers are those known to be scanning for |
330 |
> |
* and/or running tasks. Unreleased ("available") workers are |
331 |
> |
* recorded in the ctl stack. These workers are made available for |
332 |
> |
* signalling by enqueuing in ctl (see method runWorker). The |
333 |
> |
* "queue" is a form of Treiber stack. This is ideal for |
334 |
> |
* activating threads in most-recently used order, and improves |
335 |
|
* performance and locality, outweighing the disadvantages of |
336 |
|
* being prone to contention and inability to release a worker |
337 |
< |
* unless it is topmost on stack. We block/unblock workers after |
338 |
< |
* pushing on the idle worker stack (represented by the lower |
339 |
< |
* 32bit subfield of ctl) when they cannot find work. The top |
340 |
< |
* stack state holds the value of the "scanState" field of the |
341 |
< |
* worker: its index and status, plus a version counter that, in |
342 |
< |
* addition to the count subfields (also serving as version |
343 |
< |
* stamps) provide protection against Treiber stack ABA effects. |
337 |
> |
* unless it is topmost on stack. To avoid missed signal problems |
338 |
> |
* inherent in any wait/signal design, available workers rescan |
339 |
> |
* for (and if found run) tasks after enqueuing. Normally their |
340 |
> |
* release status will be updated while doing so, but the released |
341 |
> |
* worker ctl count may underestimate the number of active |
342 |
> |
* threads. (However, it is still possible to determine quiescence |
343 |
> |
* via a validation traversal -- see isQuiescent). After an |
344 |
> |
* unsuccessful rescan, available workers are blocked until |
345 |
> |
* signalled (see signalWork). The top stack state holds the |
346 |
> |
* value of the "phase" field of the worker: its index and status, |
347 |
> |
* plus a version counter that, in addition to the count subfields |
348 |
> |
* (also serving as version stamps) provide protection against |
349 |
> |
* Treiber stack ABA effects. |
350 |
|
* |
351 |
< |
* Creating workers. To create a worker, we pre-increment total |
352 |
< |
* count (serving as a reservation), and attempt to construct a |
351 |
> |
* Creating workers. To create a worker, we pre-increment counts |
352 |
> |
* (serving as a reservation), and attempt to construct a |
353 |
|
* ForkJoinWorkerThread via its factory. Upon construction, the |
354 |
|
* new thread invokes registerWorker, where it constructs a |
355 |
|
* WorkQueue and is assigned an index in the workQueues array |
371 |
|
* submission queues for existing external threads (see |
372 |
|
* externalPush). |
373 |
|
* |
374 |
< |
* WorkQueue field scanState is used by both workers and the pool |
375 |
< |
* to manage and track whether a worker is UNSIGNALLED (possibly |
376 |
< |
* blocked waiting for a signal). When a worker is inactivated, |
377 |
< |
* its scanState field is set, and is prevented from executing |
378 |
< |
* tasks, even though it must scan once for them to avoid queuing |
379 |
< |
* races. Note that scanState updates lag queue CAS releases so |
380 |
< |
* usage requires care. When queued, the lower 16 bits of |
381 |
< |
* scanState must hold its pool index. So we place the index there |
382 |
< |
* upon initialization (see registerWorker) and otherwise keep it |
388 |
< |
* there or restore it when necessary. |
374 |
> |
* WorkQueue field "phase" is used by both workers and the pool to |
375 |
> |
* manage and track whether a worker is UNSIGNALLED (possibly |
376 |
> |
* blocked waiting for a signal). When a worker is enqueued its |
377 |
> |
* phase field is set. Note that phase field updates lag queue CAS |
378 |
> |
* releases so usage requires care -- seeing a negative phase does |
379 |
> |
* not guarantee that the worker is available. When queued, the |
380 |
> |
* lower 16 bits of scanState must hold its pool index. So we |
381 |
> |
* place the index there upon initialization (see registerWorker) |
382 |
> |
* and otherwise keep it there or restore it when necessary. |
383 |
|
* |
384 |
|
* The ctl field also serves as the basis for memory |
385 |
|
* synchronization surrounding activation. This uses a more |
388 |
|
* if to its current value). This would be extremely costly. So |
389 |
|
* we relax it in several ways: (1) Producers only signal when |
390 |
|
* their queue is empty. Other workers propagate this signal (in |
391 |
< |
* method scan) when they find tasks. (2) Workers only enqueue |
392 |
< |
* after scanning (see below) and not finding any tasks. (3) |
393 |
< |
* Rather than CASing ctl to its current value in the common case |
394 |
< |
* where no action is required, we reduce write contention by |
395 |
< |
* equivalently prefacing signalWork when called by an external |
396 |
< |
* task producer using a memory access with full-volatile |
397 |
< |
* semantics or a "fullFence". (4) For internal task producers we |
398 |
< |
* rely on the fact that even if no other workers awaken, the |
405 |
< |
* producer itself will eventually see the task and execute it. |
391 |
> |
* method scan) when they find tasks; to further reduce flailing, |
392 |
> |
* each worker signals only one other per activation. (2) Workers |
393 |
> |
* only enqueue after scanning (see below) and not finding any |
394 |
> |
* tasks. (3) Rather than CASing ctl to its current value in the |
395 |
> |
* common case where no action is required, we reduce write |
396 |
> |
* contention by equivalently prefacing signalWork when called by |
397 |
> |
* an external task producer using a memory access with |
398 |
> |
* full-volatile semantics or a "fullFence". |
399 |
|
* |
400 |
|
* Almost always, too many signals are issued. A task producer |
401 |
|
* cannot in general tell if some existing worker is in the midst |
407 |
|
* and bookkeeping bottlenecks during ramp-up, ramp-down, and small |
408 |
|
* computations involving only a few workers. |
409 |
|
* |
410 |
< |
* Scanning. Method scan() performs top-level scanning for tasks. |
411 |
< |
* Each scan traverses (and tries to poll from) each queue in |
412 |
< |
* pseudorandom permutation order by randomly selecting an origin |
413 |
< |
* index and a step value. (The pseudorandom generator need not |
414 |
< |
* have high-quality statistical properties in the long term, but |
415 |
< |
* just within computations; We use 64bit and 32bit Marsaglia |
416 |
< |
* XorShifts, which are cheap and suffice here.) Scanning also |
417 |
< |
* employs contention reduction: When scanning workers fail a CAS |
418 |
< |
* polling for work, they soon restart with a different |
419 |
< |
* pseudorandom scan order (thus likely retrying at different |
420 |
< |
* intervals). This improves throughput when many threads are |
421 |
< |
* trying to take tasks from few queues. Scans do not otherwise |
422 |
< |
* explicitly take into account core affinities, loads, cache |
423 |
< |
* localities, etc, However, they do exploit temporal locality |
424 |
< |
* (which usually approximates these) by preferring to re-poll (up |
425 |
< |
* to POLL_LIMIT times) from the same queue after a successful |
426 |
< |
* poll before trying others. Restricted forms of scanning occur |
427 |
< |
* in methods helpComplete and findNonEmptyStealQueue, and take |
435 |
< |
* similar but simpler forms. |
436 |
< |
* |
437 |
< |
* Deactivation and waiting. Queuing encounters several intrinsic |
438 |
< |
* races; most notably that an inactivating scanning worker can |
439 |
< |
* miss seeing a task produced during a scan. So when a worker |
440 |
< |
* cannot find a task to steal, it inactivates and enqueues, and |
441 |
< |
* then rescans to ensure that it didn't miss one, reactivating |
442 |
< |
* upon seeing one with probability approximately proportional to |
443 |
< |
* probability of a miss. (In most cases, the worker will be |
444 |
< |
* signalled before self-signalling, avoiding cascades of multiple |
445 |
< |
* signals for the same task). |
446 |
< |
* |
447 |
< |
* Workers block (in method awaitWork) using park/unpark; |
448 |
< |
* advertising the need for signallers to unpark by setting their |
449 |
< |
* "parker" fields. |
410 |
> |
* Scanning. Method runWorker performs top-level scanning for |
411 |
> |
* tasks. Each scan traverses and tries to poll from each queue |
412 |
> |
* starting at a random index and circularly stepping. Scans are |
413 |
> |
* not performed in ideal random permutation order, to reduce |
414 |
> |
* cacheline contention. The pseudorandom generator need not have |
415 |
> |
* high-quality statistical properties in the long term, but just |
416 |
> |
* within computations; We use Marsaglia XorShifts (often via |
417 |
> |
* ThreadLocalRandom.nextSecondarySeed), which are cheap and |
418 |
> |
* suffice. Scanning also employs contention reduction: When |
419 |
> |
* scanning workers fail to extract an apparently existing task, |
420 |
> |
* they soon restart at a different pseudorandom index. This |
421 |
> |
* improves throughput when many threads are trying to take tasks |
422 |
> |
* from few queues, which can be common in some usages. Scans do |
423 |
> |
* not otherwise explicitly take into account core affinities, |
424 |
> |
* loads, cache localities, etc, However, they do exploit temporal |
425 |
> |
* locality (which usually approximates these) by preferring to |
426 |
> |
* re-poll (at most #workers times) from the same queue after a |
427 |
> |
* successful poll before trying others. |
428 |
|
* |
429 |
|
* Trimming workers. To release resources after periods of lack of |
430 |
|
* use, a worker starting to wait when the pool is quiescent will |
431 |
< |
* time out and terminate (see awaitWork) if the pool has remained |
432 |
< |
* quiescent for period given by IDLE_TIMEOUT_MS, increasing the |
455 |
< |
* period as the number of threads decreases, eventually removing |
456 |
< |
* all workers. |
431 |
> |
* time out and terminate (see method scan) if the pool has |
432 |
> |
* remained quiescent for period given by field keepAlive. |
433 |
|
* |
434 |
|
* Shutdown and Termination. A call to shutdownNow invokes |
435 |
|
* tryTerminate to atomically set a runState bit. The calling |
436 |
|
* thread, as well as every other worker thereafter terminating, |
437 |
< |
* helps terminate others by setting their (qlock) status, |
438 |
< |
* cancelling their unprocessed tasks, and waking them up, doing |
439 |
< |
* so repeatedly until stable. Calls to non-abrupt shutdown() |
440 |
< |
* preface this by checking whether termination should commence. |
441 |
< |
* This relies primarily on the active count bits of "ctl" |
442 |
< |
* maintaining consensus -- tryTerminate is called from awaitWork |
443 |
< |
* whenever quiescent. However, external submitters do not take |
468 |
< |
* part in this consensus. So, tryTerminate sweeps through queues |
469 |
< |
* (until stable) to ensure lack of in-flight submissions and |
470 |
< |
* workers about to process them before triggering the "STOP" |
471 |
< |
* phase of termination. (Note: there is an intrinsic conflict if |
472 |
< |
* helpQuiescePool is called when shutdown is enabled. Both wait |
473 |
< |
* for quiescence, but tryTerminate is biased to not trigger until |
474 |
< |
* helpQuiescePool completes.) |
437 |
> |
* helps terminate others by cancelling their unprocessed tasks, |
438 |
> |
* and waking them up, doing so repeatedly until stable. Calls to |
439 |
> |
* non-abrupt shutdown() preface this by checking whether |
440 |
> |
* termination should commence by sweeping through queues (until |
441 |
> |
* stable) to ensure lack of in-flight submissions and workers |
442 |
> |
* about to process them before triggering the "STOP" phase of |
443 |
> |
* termination. |
444 |
|
* |
445 |
|
* Joining Tasks |
446 |
|
* ============= |
448 |
|
* Any of several actions may be taken when one worker is waiting |
449 |
|
* to join a task stolen (or always held) by another. Because we |
450 |
|
* are multiplexing many tasks on to a pool of workers, we can't |
451 |
< |
* just let them block (as in Thread.join). We also cannot just |
452 |
< |
* reassign the joiner's run-time stack with another and replace |
453 |
< |
* it later, which would be a form of "continuation", that even if |
454 |
< |
* possible is not necessarily a good idea since we may need both |
455 |
< |
* an unblocked task and its continuation to progress. Instead we |
456 |
< |
* combine two tactics: |
451 |
> |
* always just let them block (as in Thread.join). We also cannot |
452 |
> |
* just reassign the joiner's run-time stack with another and |
453 |
> |
* replace it later, which would be a form of "continuation", that |
454 |
> |
* even if possible is not necessarily a good idea since we may |
455 |
> |
* need both an unblocked task and its continuation to progress. |
456 |
> |
* Instead we combine two tactics: |
457 |
|
* |
458 |
|
* Helping: Arranging for the joiner to execute some task that it |
459 |
|
* would be running if the steal had not occurred. |
466 |
|
* helping a hypothetical compensator: If we can readily tell that |
467 |
|
* a possible action of a compensator is to steal and execute the |
468 |
|
* task being joined, the joining thread can do so directly, |
469 |
< |
* without the need for a compensation thread (although at the |
501 |
< |
* expense of larger run-time stacks, but the tradeoff is |
502 |
< |
* typically worthwhile). |
469 |
> |
* without the need for a compensation thread. |
470 |
|
* |
471 |
|
* The ManagedBlocker extension API can't use helping so relies |
472 |
|
* only on compensation in method awaitBlocker. |
473 |
|
* |
474 |
< |
* The algorithm in helpStealer entails a form of "linear |
475 |
< |
* helping". Each worker records (in field currentSteal) the most |
476 |
< |
* recent task it stole from some other worker (or a submission). |
477 |
< |
* It also records (in field currentJoin) the task it is currently |
478 |
< |
* actively joining. Method helpStealer uses these markers to try |
479 |
< |
* to find a worker to help (i.e., steal back a task from and |
480 |
< |
* execute it) that could hasten completion of the actively joined |
481 |
< |
* task. Thus, the joiner executes a task that would be on its |
482 |
< |
* own local deque had the to-be-joined task not been stolen. This |
483 |
< |
* is a conservative variant of the approach described in Wagner & |
484 |
< |
* Calder "Leapfrogging: a portable technique for implementing |
485 |
< |
* efficient futures" SIGPLAN Notices, 1993 |
486 |
< |
* (http://portal.acm.org/citation.cfm?id=155354). It differs in |
487 |
< |
* that: (1) We only maintain dependency links across workers upon |
488 |
< |
* steals, rather than use per-task bookkeeping. This sometimes |
489 |
< |
* requires a linear scan of workQueues array to locate stealers, |
490 |
< |
* but often doesn't because stealers leave hints (that may become |
491 |
< |
* stale/wrong) of where to locate them. It is only a hint |
492 |
< |
* because a worker might have had multiple steals and the hint |
526 |
< |
* records only one of them (usually the most current). Hinting |
527 |
< |
* isolates cost to when it is needed, rather than adding to |
528 |
< |
* per-task overhead. (2) It is "shallow", ignoring nesting and |
529 |
< |
* potentially cyclic mutual steals. (3) It is intentionally |
530 |
< |
* racy: field currentJoin is updated only while actively joining, |
531 |
< |
* which means that we miss links in the chain during long-lived |
532 |
< |
* tasks, GC stalls etc (which is OK since blocking in such cases |
533 |
< |
* is usually a good idea). (4) We bound the number of attempts |
534 |
< |
* to find work using checksums and fall back to suspending the |
535 |
< |
* worker and if necessary replacing it with another. |
536 |
< |
* |
537 |
< |
* Helping actions for CountedCompleters do not require tracking |
538 |
< |
* currentJoins: Method helpComplete takes and executes any task |
539 |
< |
* with the same root as the task being waited on (preferring |
540 |
< |
* local pops to non-local polls). However, this still entails |
541 |
< |
* some traversal of completer chains, so is less efficient than |
542 |
< |
* using CountedCompleters without explicit joins. |
474 |
> |
* The algorithm in awaitJoin entails a form of "linear helping". |
475 |
> |
* Each worker records (in field source) the id of the queue from |
476 |
> |
* which it last stole a task. The scan in method awaitJoin uses |
477 |
> |
* these markers to try to find a worker to help (i.e., steal back |
478 |
> |
* a task from and execute it) that could hasten completion of the |
479 |
> |
* actively joined task. Thus, the joiner executes a task that |
480 |
> |
* would be on its own local deque if the to-be-joined task had |
481 |
> |
* not been stolen. This is a conservative variant of the approach |
482 |
> |
* described in Wagner & Calder "Leapfrogging: a portable |
483 |
> |
* technique for implementing efficient futures" SIGPLAN Notices, |
484 |
> |
* 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs |
485 |
> |
* mainly in that we only record queue ids, not full dependency |
486 |
> |
* links. This requires a linear scan of the workQueues array to |
487 |
> |
* locate stealers, but isolates cost to when it is needed, rather |
488 |
> |
* than adding to per-task overhead. Searches can fail to locate |
489 |
> |
* stealers GC stalls and the like delay recording sources. |
490 |
> |
* Further, even when accurately identified, stealers might not |
491 |
> |
* ever produce a task that the joiner can in turn help with. So, |
492 |
> |
* compensation is tried upon failure to find tasks to run. |
493 |
|
* |
494 |
< |
* Compensation does not aim to keep exactly the target |
494 |
> |
* Compensation does not by default aim to keep exactly the target |
495 |
|
* parallelism number of unblocked threads running at any given |
496 |
|
* time. Some previous versions of this class employed immediate |
497 |
|
* compensations for any blocked join. However, in practice, the |
498 |
|
* vast majority of blockages are transient byproducts of GC and |
499 |
|
* other JVM or OS activities that are made worse by replacement. |
500 |
< |
* Currently, compensation is attempted only after validating that |
501 |
< |
* all purportedly active threads are processing tasks by checking |
502 |
< |
* field WorkQueue.scanState, which eliminates most false |
503 |
< |
* positives. Also, compensation is bypassed (tolerating fewer |
504 |
< |
* threads) in the most common case in which it is rarely |
505 |
< |
* beneficial: when a worker with an empty queue (thus no |
506 |
< |
* continuation tasks) blocks on a join and there still remain |
557 |
< |
* enough threads to ensure liveness. |
558 |
< |
* |
559 |
< |
* Spare threads are removed as soon as they notice that the |
560 |
< |
* target parallelism level has been exceeded, in method |
561 |
< |
* tryDropSpare. (Method scan arranges returns for rechecks upon |
562 |
< |
* each probe via the "bound" parameter.) |
563 |
< |
* |
564 |
< |
* The compensation mechanism may be bounded. Bounds for the |
565 |
< |
* commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope |
566 |
< |
* with programming errors and abuse before running out of |
567 |
< |
* resources to do so. In other cases, users may supply factories |
568 |
< |
* that limit thread construction. The effects of bounding in this |
569 |
< |
* pool (like all others) is imprecise. Total worker counts are |
570 |
< |
* decremented when threads deregister, not when they exit and |
571 |
< |
* resources are reclaimed by the JVM and OS. So the number of |
572 |
< |
* simultaneously live threads may transiently exceed bounds. |
573 |
< |
* |
500 |
> |
* Rather than impose arbitrary policies, we allow users to |
501 |
> |
* override the default of only adding threads upon apparent |
502 |
> |
* starvation. The compensation mechanism may also be bounded. |
503 |
> |
* Bounds for the commonPool (see COMMON_MAX_SPARES) better enable |
504 |
> |
* JVMs to cope with programming errors and abuse before running |
505 |
> |
* out of resources to do so. |
506 |
> |
|
507 |
|
* Common Pool |
508 |
|
* =========== |
509 |
|
* |
510 |
|
* The static common pool always exists after static |
511 |
|
* initialization. Since it (or any other created pool) need |
512 |
|
* never be used, we minimize initial construction overhead and |
513 |
< |
* footprint to the setup of about a dozen fields, with no nested |
581 |
< |
* allocation. Most bootstrapping occurs within method |
582 |
< |
* externalSubmit during the first submission to the pool. |
513 |
> |
* footprint to the setup of about a dozen fields. |
514 |
|
* |
515 |
|
* When external threads submit to the common pool, they can |
516 |
|
* perform subtask processing (see externalHelpComplete and |
530 |
|
* InnocuousForkJoinWorkerThread when there is a SecurityManager |
531 |
|
* present. These workers have no permissions set, do not belong |
532 |
|
* to any user-defined ThreadGroup, and erase all ThreadLocals |
533 |
< |
* after executing any top-level task (see WorkQueue.runTask). |
534 |
< |
* The associated mechanics (mainly in ForkJoinWorkerThread) may |
535 |
< |
* be JVM-dependent and must access particular Thread class fields |
536 |
< |
* to achieve this effect. |
533 |
> |
* after executing any top-level task (see |
534 |
> |
* WorkQueue.afterTopLevelExec). The associated mechanics (mainly |
535 |
> |
* in ForkJoinWorkerThread) may be JVM-dependent and must access |
536 |
> |
* particular Thread class fields to achieve this effect. |
537 |
|
* |
538 |
|
* Style notes |
539 |
|
* =========== |
603 |
|
public static interface ForkJoinWorkerThreadFactory { |
604 |
|
/** |
605 |
|
* Returns a new worker thread operating in the given pool. |
606 |
+ |
* Returning null or throwing an exception may result in tasks |
607 |
+ |
* never being executed. If this method throws an exception, |
608 |
+ |
* it is relayed to the caller of the method (for example |
609 |
+ |
* {@code execute}) causing attempted thread creation. If this |
610 |
+ |
* method returns null or throws an exception, it is not |
611 |
+ |
* retried until the next attempted creation (for example |
612 |
+ |
* another call to {@code execute}). |
613 |
|
* |
614 |
|
* @param pool the pool this thread works in |
615 |
|
* @return the new worker thread, or {@code null} if the request |
616 |
< |
* to create a thread is rejected |
616 |
> |
* to create a thread is rejected. |
617 |
|
* @throws NullPointerException if the pool is null |
618 |
|
*/ |
619 |
|
public ForkJoinWorkerThread newThread(ForkJoinPool pool); |
630 |
|
} |
631 |
|
} |
632 |
|
|
695 |
– |
/** |
696 |
– |
* Class for artificial tasks that are used to replace the target |
697 |
– |
* of local joins if they are removed from an interior queue slot |
698 |
– |
* in WorkQueue.tryRemoveAndExec. We don't need the proxy to |
699 |
– |
* actually do anything beyond having a unique identity. |
700 |
– |
*/ |
701 |
– |
private static final class EmptyTask extends ForkJoinTask<Void> { |
702 |
– |
private static final long serialVersionUID = -7721805057305804111L; |
703 |
– |
EmptyTask() { status = ForkJoinTask.NORMAL; } // force done |
704 |
– |
public final Void getRawResult() { return null; } |
705 |
– |
public final void setRawResult(Void x) {} |
706 |
– |
public final boolean exec() { return true; } |
707 |
– |
} |
708 |
– |
|
709 |
– |
/** |
710 |
– |
* Additional fields and lock created upon initialization. |
711 |
– |
*/ |
712 |
– |
private static final class AuxState extends ReentrantLock { |
713 |
– |
private static final long serialVersionUID = -6001602636862214147L; |
714 |
– |
volatile long stealCount; // cumulative steal count |
715 |
– |
long indexSeed; // index bits for registerWorker |
716 |
– |
AuxState() {} |
717 |
– |
} |
718 |
– |
|
633 |
|
// Constants shared across ForkJoinPool and WorkQueue |
634 |
|
|
635 |
|
// Bounds |
636 |
+ |
static final int SWIDTH = 16; // width of short |
637 |
|
static final int SMASK = 0xffff; // short bits == max index |
638 |
|
static final int MAX_CAP = 0x7fff; // max #workers - 1 |
724 |
– |
static final int EVENMASK = 0xfffe; // even short bits |
639 |
|
static final int SQMASK = 0x007e; // max 64 (even) slots |
640 |
|
|
641 |
< |
// Masks and units for WorkQueue.scanState and ctl sp subfield |
641 |
> |
// Masks and units for WorkQueue.phase and ctl sp subfield |
642 |
|
static final int UNSIGNALLED = 1 << 31; // must be negative |
643 |
|
static final int SS_SEQ = 1 << 16; // version count |
644 |
+ |
static final int QLOCK = 1; // must be 1 |
645 |
|
|
646 |
< |
// Mode bits for ForkJoinPool.config and WorkQueue.config |
647 |
< |
static final int MODE_MASK = 0xffff << 16; // top half of int |
648 |
< |
static final int SPARE_WORKER = 1 << 17; // set if tc > 0 on creation |
649 |
< |
static final int UNREGISTERED = 1 << 18; // to skip some of deregister |
650 |
< |
static final int FIFO_QUEUE = 1 << 31; // must be negative |
651 |
< |
static final int LIFO_QUEUE = 0; // for clarity |
652 |
< |
static final int IS_OWNED = 1; // low bit 0 if shared |
653 |
< |
|
654 |
< |
/** |
655 |
< |
* The maximum number of task executions from the same queue |
656 |
< |
* before checking other queues, bounding unfairness and impact of |
657 |
< |
* infinite user task recursion. Must be a power of two minus 1. |
646 |
> |
// Mode bits and sentinels, some also used in WorkQueue id and.source fields |
647 |
> |
static final int OWNED = 1; // queue has owner thread |
648 |
> |
static final int FIFO = 1 << 16; // fifo queue or access mode |
649 |
> |
static final int SATURATE = 1 << 17; // for tryCompensate |
650 |
> |
static final int SHUTDOWN = 1 << 18; |
651 |
> |
static final int TERMINATED = 1 << 19; |
652 |
> |
static final int STOP = 1 << 31; // must be negative |
653 |
> |
static final int QUIET = 1 << 30; // not scanning or working |
654 |
> |
static final int DORMANT = QUIET | UNSIGNALLED; |
655 |
> |
|
656 |
> |
/** |
657 |
> |
* The maximum number of local polls from the same queue before |
658 |
> |
* checking others. This is a safeguard against infinitely unfair |
659 |
> |
* looping under unbounded user task recursion, and must be larger |
660 |
> |
* than plausible cases of intentional bounded task recursion. |
661 |
|
*/ |
662 |
< |
static final int POLL_LIMIT = (1 << 10) - 1; |
662 |
> |
static final int POLL_LIMIT = 1 << 10; |
663 |
|
|
664 |
|
/** |
665 |
|
* Queues supporting work-stealing as well as external task |
670 |
|
* arrays sharing cache lines. The @Contended annotation alerts |
671 |
|
* JVMs to try to keep instances apart. |
672 |
|
*/ |
673 |
< |
@jdk.internal.vm.annotation.Contended |
673 |
> |
// For now, using manual padding. |
674 |
> |
// @jdk.internal.vm.annotation.Contended |
675 |
> |
// @sun.misc.Contended |
676 |
|
static final class WorkQueue { |
677 |
|
|
678 |
|
/** |
696 |
|
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M |
697 |
|
|
698 |
|
// Instance fields |
699 |
< |
|
700 |
< |
volatile int scanState; // versioned, negative if inactive |
701 |
< |
int stackPred; // pool stack (ctl) predecessor |
699 |
> |
volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07; |
700 |
> |
volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f; |
701 |
> |
volatile int phase; // versioned, negative: queued, 1: locked |
702 |
> |
int stackPred; // pool stack (ctl) predecessor link |
703 |
|
int nsteals; // number of steals |
704 |
< |
int hint; // randomization and stealer index hint |
705 |
< |
int config; // pool index and mode |
785 |
< |
volatile int qlock; // 1: locked, < 0: terminate; else 0 |
704 |
> |
int id; // index, mode, tag |
705 |
> |
volatile int source; // source queue id, or sentinel |
706 |
|
volatile int base; // index of next slot for poll |
707 |
|
int top; // index of next slot for push |
708 |
|
ForkJoinTask<?>[] array; // the elements (initially unallocated) |
709 |
|
final ForkJoinPool pool; // the containing pool (may be null) |
710 |
|
final ForkJoinWorkerThread owner; // owning thread or null if shared |
711 |
< |
volatile Thread parker; // == owner during call to park; else null |
712 |
< |
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin |
793 |
< |
|
794 |
< |
@jdk.internal.vm.annotation.Contended("group2") // segregate |
795 |
< |
volatile ForkJoinTask<?> currentSteal; // nonnull when running some task |
711 |
> |
volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17; |
712 |
> |
volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e, pad1f; |
713 |
|
|
714 |
|
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { |
715 |
|
this.pool = pool; |
722 |
|
* Returns an exportable index (used by ForkJoinWorkerThread). |
723 |
|
*/ |
724 |
|
final int getPoolIndex() { |
725 |
< |
return (config & 0xffff) >>> 1; // ignore odd/even tag bit |
725 |
> |
return (id & 0xffff) >>> 1; // ignore odd/even tag bit |
726 |
|
} |
727 |
|
|
728 |
|
/** |
739 |
|
* near-empty queue has at least one unclaimed task. |
740 |
|
*/ |
741 |
|
final boolean isEmpty() { |
742 |
< |
ForkJoinTask<?>[] a; int n, al, s; |
743 |
< |
return ((n = base - (s = top)) >= 0 || // possibly one task |
742 |
> |
ForkJoinTask<?>[] a; int n, al, b; |
743 |
> |
return ((n = (b = base) - top) >= 0 || // possibly one task |
744 |
|
(n == -1 && ((a = array) == null || |
745 |
|
(al = a.length) == 0 || |
746 |
< |
a[(al - 1) & (s - 1)] == null))); |
746 |
> |
a[(al - 1) & b] == null))); |
747 |
|
} |
748 |
|
|
749 |
+ |
|
750 |
|
/** |
751 |
|
* Pushes a task. Call only by owner in unshared queues. |
752 |
|
* |
754 |
|
* @throws RejectedExecutionException if array cannot be resized |
755 |
|
*/ |
756 |
|
final void push(ForkJoinTask<?> task) { |
757 |
< |
U.storeFence(); // ensure safe publication |
840 |
< |
int s = top, al, d; ForkJoinTask<?>[] a; |
757 |
> |
int s = top; ForkJoinTask<?>[] a; int al, d; |
758 |
|
if ((a = array) != null && (al = a.length) > 0) { |
759 |
< |
a[(al - 1) & s] = task; // relaxed writes OK |
760 |
< |
top = s + 1; |
759 |
> |
int index = (al - 1) & s; |
760 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
761 |
|
ForkJoinPool p = pool; |
762 |
+ |
top = s + 1; |
763 |
+ |
U.putOrderedObject(a, offset, task); |
764 |
|
if ((d = base - s) == 0 && p != null) { |
765 |
|
U.fullFence(); |
766 |
|
p.signalWork(); |
767 |
|
} |
768 |
< |
else if (al + d == 1) |
768 |
> |
else if (d + al == 1) |
769 |
|
growArray(); |
770 |
|
} |
771 |
|
} |
777 |
|
*/ |
778 |
|
final ForkJoinTask<?>[] growArray() { |
779 |
|
ForkJoinTask<?>[] oldA = array; |
780 |
< |
int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; |
780 |
> |
int oldSize = oldA != null ? oldA.length : 0; |
781 |
> |
int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY; |
782 |
|
if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY) |
783 |
|
throw new RejectedExecutionException("Queue capacity exceeded"); |
784 |
|
int oldMask, t, b; |
785 |
|
ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; |
786 |
< |
if (oldA != null && (oldMask = oldA.length - 1) > 0 && |
786 |
> |
if (oldA != null && (oldMask = oldSize - 1) > 0 && |
787 |
|
(t = top) - (b = base) > 0) { |
788 |
|
int mask = size - 1; |
789 |
|
do { // emulate poll from old array, push to new array |
814 |
|
if (t != null && |
815 |
|
U.compareAndSwapObject(a, offset, t, null)) { |
816 |
|
top = s; |
817 |
< |
return t; |
898 |
< |
} |
899 |
< |
} |
900 |
< |
return null; |
901 |
< |
} |
902 |
< |
|
903 |
< |
/** |
904 |
< |
* Takes a task in FIFO order if b is base of queue and a task |
905 |
< |
* can be claimed without contention. Specialized versions |
906 |
< |
* appear in ForkJoinPool methods scan and helpStealer. |
907 |
< |
*/ |
908 |
< |
final ForkJoinTask<?> pollAt(int b) { |
909 |
< |
ForkJoinTask<?>[] a; int al; |
910 |
< |
if ((a = array) != null && (al = a.length) > 0) { |
911 |
< |
int index = (al - 1) & b; |
912 |
< |
long offset = ((long)index << ASHIFT) + ABASE; |
913 |
< |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
914 |
< |
U.getObjectVolatile(a, offset); |
915 |
< |
if (t != null && b++ == base && |
916 |
< |
U.compareAndSwapObject(a, offset, t, null)) { |
917 |
< |
base = b; |
817 |
> |
U.storeFence(); |
818 |
|
return t; |
819 |
|
} |
820 |
|
} |
854 |
|
* Takes next task, if one exists, in order specified by mode. |
855 |
|
*/ |
856 |
|
final ForkJoinTask<?> nextLocalTask() { |
857 |
< |
return (config < 0) ? poll() : pop(); |
857 |
> |
return ((id & FIFO) != 0) ? poll() : pop(); |
858 |
|
} |
859 |
|
|
860 |
|
/** |
863 |
|
final ForkJoinTask<?> peek() { |
864 |
|
int al; ForkJoinTask<?>[] a; |
865 |
|
return ((a = array) != null && (al = a.length) > 0) ? |
866 |
< |
a[(al - 1) & (config < 0 ? base : top - 1)] : null; |
866 |
> |
a[(al - 1) & |
867 |
> |
((id & FIFO) != 0 ? base : top - 1)] : null; |
868 |
|
} |
869 |
|
|
870 |
|
/** |
877 |
|
long offset = ((long)index << ASHIFT) + ABASE; |
878 |
|
if (U.compareAndSwapObject(a, offset, task, null)) { |
879 |
|
top = s; |
880 |
+ |
U.storeFence(); |
881 |
|
return true; |
882 |
|
} |
883 |
|
} |
885 |
|
} |
886 |
|
|
887 |
|
/** |
986 |
– |
* Shared version of push. Fails if already locked. |
987 |
– |
* |
988 |
– |
* @return status: > 0 locked, 0 possibly was empty, < 0 was nonempty |
989 |
– |
*/ |
990 |
– |
final int sharedPush(ForkJoinTask<?> task) { |
991 |
– |
int stat; |
992 |
– |
if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { |
993 |
– |
int b = base, s = top, al, d; ForkJoinTask<?>[] a; |
994 |
– |
if ((a = array) != null && (al = a.length) > 0 && |
995 |
– |
al - 1 + (d = b - s) > 0) { |
996 |
– |
a[(al - 1) & s] = task; |
997 |
– |
top = s + 1; // relaxed writes OK here |
998 |
– |
qlock = 0; |
999 |
– |
stat = (d < 0 && b == base) ? d : 0; |
1000 |
– |
} |
1001 |
– |
else { |
1002 |
– |
growAndSharedPush(task); |
1003 |
– |
stat = 0; |
1004 |
– |
} |
1005 |
– |
} |
1006 |
– |
else |
1007 |
– |
stat = 1; |
1008 |
– |
return stat; |
1009 |
– |
} |
1010 |
– |
|
1011 |
– |
/** |
1012 |
– |
* Helper for sharedPush; called only when locked and resize |
1013 |
– |
* needed. |
1014 |
– |
*/ |
1015 |
– |
private void growAndSharedPush(ForkJoinTask<?> task) { |
1016 |
– |
try { |
1017 |
– |
growArray(); |
1018 |
– |
int s = top, al; ForkJoinTask<?>[] a; |
1019 |
– |
if ((a = array) != null && (al = a.length) > 0) { |
1020 |
– |
a[(al - 1) & s] = task; |
1021 |
– |
top = s + 1; |
1022 |
– |
} |
1023 |
– |
} finally { |
1024 |
– |
qlock = 0; |
1025 |
– |
} |
1026 |
– |
} |
1027 |
– |
|
1028 |
– |
/** |
1029 |
– |
* Shared version of tryUnpush. |
1030 |
– |
*/ |
1031 |
– |
final boolean trySharedUnpush(ForkJoinTask<?> task) { |
1032 |
– |
boolean popped = false; |
1033 |
– |
int s = top - 1, al; ForkJoinTask<?>[] a; |
1034 |
– |
if ((a = array) != null && (al = a.length) > 0) { |
1035 |
– |
int index = (al - 1) & s; |
1036 |
– |
long offset = ((long)index << ASHIFT) + ABASE; |
1037 |
– |
ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset); |
1038 |
– |
if (t == task && |
1039 |
– |
U.compareAndSwapInt(this, QLOCK, 0, 1)) { |
1040 |
– |
if (top == s + 1 && array == a && |
1041 |
– |
U.compareAndSwapObject(a, offset, task, null)) { |
1042 |
– |
popped = true; |
1043 |
– |
top = s; |
1044 |
– |
} |
1045 |
– |
U.putOrderedInt(this, QLOCK, 0); |
1046 |
– |
} |
1047 |
– |
} |
1048 |
– |
return popped; |
1049 |
– |
} |
1050 |
– |
|
1051 |
– |
/** |
888 |
|
* Removes and cancels all known tasks, ignoring any exceptions. |
889 |
|
*/ |
890 |
|
final void cancelAll() { |
891 |
< |
ForkJoinTask<?> t; |
1056 |
< |
if ((t = currentJoin) != null) { |
1057 |
< |
currentJoin = null; |
1058 |
< |
ForkJoinTask.cancelIgnoringExceptions(t); |
1059 |
< |
} |
1060 |
< |
if ((t = currentSteal) != null) { |
1061 |
< |
currentSteal = null; |
1062 |
< |
ForkJoinTask.cancelIgnoringExceptions(t); |
1063 |
< |
} |
1064 |
< |
while ((t = poll()) != null) |
891 |
> |
for (ForkJoinTask<?> t; (t = poll()) != null; ) |
892 |
|
ForkJoinTask.cancelIgnoringExceptions(t); |
893 |
|
} |
894 |
|
|
895 |
|
// Specialized execution methods |
896 |
|
|
897 |
|
/** |
898 |
< |
* Pops and executes up to POLL_LIMIT tasks or until empty. |
898 |
> |
* Pops and executes up to limit consecutive tasks or until empty. |
899 |
> |
* |
900 |
> |
* @param limit max runs, or zero for no limit |
901 |
|
*/ |
902 |
< |
final void localPopAndExec() { |
903 |
< |
for (int nexec = 0;;) { |
902 |
> |
final void localPopAndExec(int limit) { |
903 |
> |
for (;;) { |
904 |
|
int b = base, s = top, al; ForkJoinTask<?>[] a; |
905 |
|
if ((a = array) != null && b != s && (al = a.length) > 0) { |
906 |
|
int index = (al - 1) & --s; |
909 |
|
U.getAndSetObject(a, offset, null); |
910 |
|
if (t != null) { |
911 |
|
top = s; |
912 |
< |
(currentSteal = t).doExec(); |
913 |
< |
if (++nexec > POLL_LIMIT) |
912 |
> |
U.storeFence(); |
913 |
> |
t.doExec(); |
914 |
> |
if (limit != 0 && --limit == 0) |
915 |
|
break; |
916 |
|
} |
917 |
|
else |
923 |
|
} |
924 |
|
|
925 |
|
/** |
926 |
< |
* Polls and executes up to POLL_LIMIT tasks or until empty. |
926 |
> |
* Polls and executes up to limit consecutive tasks or until empty. |
927 |
> |
* |
928 |
> |
* @param limit, or zero for no limit |
929 |
|
*/ |
930 |
< |
final void localPollAndExec() { |
931 |
< |
for (int nexec = 0;;) { |
932 |
< |
int b = base, s = top, al; ForkJoinTask<?>[] a; |
933 |
< |
if ((a = array) != null && b != s && (al = a.length) > 0) { |
930 |
> |
final void localPollAndExec(int limit) { |
931 |
> |
for (int polls = 0;;) { |
932 |
> |
int b = base, s = top, d, al; ForkJoinTask<?>[] a; |
933 |
> |
if ((a = array) != null && (d = b - s) < 0 && |
934 |
> |
(al = a.length) > 0) { |
935 |
|
int index = (al - 1) & b++; |
936 |
|
long offset = ((long)index << ASHIFT) + ABASE; |
937 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>) |
939 |
|
if (t != null) { |
940 |
|
base = b; |
941 |
|
t.doExec(); |
942 |
< |
if (++nexec > POLL_LIMIT) |
942 |
> |
if (limit != 0 && ++polls == limit) |
943 |
|
break; |
944 |
|
} |
945 |
+ |
else if (d == -1) |
946 |
+ |
break; // now empty |
947 |
+ |
else |
948 |
+ |
polls = 0; // stolen; reset |
949 |
|
} |
950 |
|
else |
951 |
|
break; |
953 |
|
} |
954 |
|
|
955 |
|
/** |
956 |
< |
* Executes the given task and (some) remaining local tasks. |
956 |
> |
* If present, removes task from queue and executes |
957 |
|
*/ |
958 |
< |
final void runTask(ForkJoinTask<?> task) { |
959 |
< |
if (task != null) { |
960 |
< |
task.doExec(); |
961 |
< |
if (config < 0) |
962 |
< |
localPollAndExec(); |
963 |
< |
else |
964 |
< |
localPopAndExec(); |
965 |
< |
int ns = ++nsteals; |
966 |
< |
ForkJoinWorkerThread thread = owner; |
967 |
< |
currentSteal = null; |
968 |
< |
if (ns < 0) // collect on overflow |
969 |
< |
transferStealCount(pool); |
970 |
< |
if (thread != null) |
971 |
< |
thread.afterTopLevelExec(); |
972 |
< |
} |
973 |
< |
} |
974 |
< |
|
975 |
< |
/** |
976 |
< |
* Adds steal count to pool steal count if it exists, and resets. |
977 |
< |
*/ |
978 |
< |
final void transferStealCount(ForkJoinPool p) { |
979 |
< |
AuxState aux; |
980 |
< |
if (p != null && (aux = p.auxState) != null) { |
981 |
< |
long s = nsteals; |
982 |
< |
nsteals = 0; // if negative, correct for overflow |
983 |
< |
if (s < 0) s = Integer.MAX_VALUE; |
984 |
< |
aux.lock(); |
985 |
< |
try { |
986 |
< |
aux.stealCount += s; |
987 |
< |
} finally { |
1151 |
< |
aux.unlock(); |
958 |
> |
final void tryRemoveAndExec(ForkJoinTask<?> task) { |
959 |
> |
ForkJoinTask<?>[] wa; int s, wal; |
960 |
> |
if (base - (s = top) < 0 && // traverse from top |
961 |
> |
(wa = array) != null && (wal = wa.length) > 0) { |
962 |
> |
for (int m = wal - 1, ns = s - 1, i = ns; ; --i) { |
963 |
> |
int index = i & m; |
964 |
> |
long offset = (index << ASHIFT) + ABASE; |
965 |
> |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
966 |
> |
U.getObject(wa, offset); |
967 |
> |
if (t == null) |
968 |
> |
break; |
969 |
> |
else if (t == task) { |
970 |
> |
if (U.compareAndSwapObject(wa, offset, t, null)) { |
971 |
> |
top = ns; // safely shift down |
972 |
> |
for (int j = i; j != ns; ++j) { |
973 |
> |
ForkJoinTask<?> f; |
974 |
> |
int pindex = (j + 1) & m; |
975 |
> |
long pOffset = (pindex << ASHIFT) + ABASE; |
976 |
> |
f = (ForkJoinTask<?>)U.getObject(wa, pOffset); |
977 |
> |
U.putObjectVolatile(wa, pOffset, null); |
978 |
> |
|
979 |
> |
int jindex = j & m; |
980 |
> |
long jOffset = (jindex << ASHIFT) + ABASE; |
981 |
> |
U.putOrderedObject(wa, jOffset, f); |
982 |
> |
} |
983 |
> |
U.storeFence(); |
984 |
> |
t.doExec(); |
985 |
> |
} |
986 |
> |
break; |
987 |
> |
} |
988 |
|
} |
989 |
|
} |
990 |
|
} |
991 |
|
|
992 |
|
/** |
993 |
< |
* If present, removes from queue and executes the given task, |
994 |
< |
* or any other cancelled task. Used only by awaitJoin. |
993 |
> |
* Tries to steal and run tasks within the target's |
994 |
> |
* computation until done, not found, or limit exceeded |
995 |
|
* |
996 |
< |
* @return true if queue empty and task not known to be done |
997 |
< |
*/ |
998 |
< |
final boolean tryRemoveAndExec(ForkJoinTask<?> task) { |
999 |
< |
if (task != null && task.status >= 0) { |
1000 |
< |
int b, s, d, al; ForkJoinTask<?>[] a; |
1001 |
< |
while ((d = (b = base) - (s = top)) < 0 && |
1002 |
< |
(a = array) != null && (al = a.length) > 0) { |
1003 |
< |
for (;;) { // traverse from s to b |
1004 |
< |
int index = --s & (al - 1); |
1005 |
< |
long offset = (index << ASHIFT) + ABASE; |
1006 |
< |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1007 |
< |
U.getObjectVolatile(a, offset); |
1008 |
< |
if (t == null) |
1009 |
< |
break; // restart |
1010 |
< |
else if (t == task) { |
1011 |
< |
boolean removed = false; |
1012 |
< |
if (s + 1 == top) { // pop |
1013 |
< |
if (U.compareAndSwapObject(a, offset, t, null)) { |
1014 |
< |
top = s; |
1015 |
< |
removed = true; |
996 |
> |
* @param task root of CountedCompleter computation |
997 |
> |
* @param limit max runs, or zero for no limit |
998 |
> |
* @return task status on exit |
999 |
> |
*/ |
1000 |
> |
final int localHelpCC(CountedCompleter<?> task, int limit) { |
1001 |
> |
int status = 0; |
1002 |
> |
if (task != null && (status = task.status) >= 0) { |
1003 |
> |
for (;;) { |
1004 |
> |
boolean help = false; |
1005 |
> |
int b = base, s = top, al; ForkJoinTask<?>[] a; |
1006 |
> |
if ((a = array) != null && b != s && (al = a.length) > 0) { |
1007 |
> |
int index = (al - 1) & (s - 1); |
1008 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
1009 |
> |
ForkJoinTask<?> o = (ForkJoinTask<?>) |
1010 |
> |
U.getObject(a, offset); |
1011 |
> |
if (o instanceof CountedCompleter) { |
1012 |
> |
CountedCompleter<?> t = (CountedCompleter<?>)o; |
1013 |
> |
for (CountedCompleter<?> f = t;;) { |
1014 |
> |
if (f != task) { |
1015 |
> |
if ((f = f.completer) == null) // try parent |
1016 |
> |
break; |
1017 |
> |
} |
1018 |
> |
else { |
1019 |
> |
if (U.compareAndSwapObject(a, offset, |
1020 |
> |
t, null)) { |
1021 |
> |
top = s - 1; |
1022 |
> |
U.storeFence(); |
1023 |
> |
t.doExec(); |
1024 |
> |
help = true; |
1025 |
> |
} |
1026 |
> |
break; |
1027 |
|
} |
1028 |
|
} |
1182 |
– |
else if (base == b) // replace with proxy |
1183 |
– |
removed = U.compareAndSwapObject(a, offset, t, |
1184 |
– |
new EmptyTask()); |
1185 |
– |
if (removed) { |
1186 |
– |
ForkJoinTask<?> ps = currentSteal; |
1187 |
– |
(currentSteal = task).doExec(); |
1188 |
– |
currentSteal = ps; |
1189 |
– |
} |
1190 |
– |
break; |
1191 |
– |
} |
1192 |
– |
else if (t.status < 0 && s + 1 == top) { |
1193 |
– |
if (U.compareAndSwapObject(a, offset, t, null)) { |
1194 |
– |
top = s; |
1195 |
– |
} |
1196 |
– |
break; // was cancelled |
1197 |
– |
} |
1198 |
– |
else if (++d == 0) { |
1199 |
– |
if (base != b) // rescan |
1200 |
– |
break; |
1201 |
– |
return false; |
1029 |
|
} |
1030 |
|
} |
1031 |
< |
if (task.status < 0) |
1032 |
< |
return false; |
1031 |
> |
if ((status = task.status) < 0 || !help || |
1032 |
> |
(limit != 0 && --limit == 0)) |
1033 |
> |
break; |
1034 |
|
} |
1035 |
|
} |
1036 |
< |
return true; |
1036 |
> |
return status; |
1037 |
|
} |
1038 |
|
|
1039 |
+ |
// Operations on shared queues |
1040 |
+ |
|
1041 |
|
/** |
1042 |
< |
* Pops task if in the same CC computation as the given task, |
1213 |
< |
* in either shared or owned mode. Used only by helpComplete. |
1042 |
> |
* Tries to lock shared queue by CASing phase field |
1043 |
|
*/ |
1044 |
< |
final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) { |
1045 |
< |
int b = base, s = top, al; ForkJoinTask<?>[] a; |
1046 |
< |
if ((a = array) != null && b != s && (al = a.length) > 0) { |
1047 |
< |
int index = (al - 1) & (s - 1); |
1044 |
> |
final boolean tryLockSharedQueue() { |
1045 |
> |
return U.compareAndSwapInt(this, PHASE, 0, QLOCK); |
1046 |
> |
} |
1047 |
> |
|
1048 |
> |
/** |
1049 |
> |
* Shared version of tryUnpush. |
1050 |
> |
*/ |
1051 |
> |
final boolean trySharedUnpush(ForkJoinTask<?> task) { |
1052 |
> |
boolean popped = false; |
1053 |
> |
int s = top - 1, al; ForkJoinTask<?>[] a; |
1054 |
> |
if ((a = array) != null && (al = a.length) > 0) { |
1055 |
> |
int index = (al - 1) & s; |
1056 |
|
long offset = ((long)index << ASHIFT) + ABASE; |
1057 |
< |
ForkJoinTask<?> o = (ForkJoinTask<?>) |
1058 |
< |
U.getObjectVolatile(a, offset); |
1059 |
< |
if (o instanceof CountedCompleter) { |
1060 |
< |
CountedCompleter<?> t = (CountedCompleter<?>)o; |
1061 |
< |
for (CountedCompleter<?> r = t;;) { |
1062 |
< |
if (r == task) { |
1063 |
< |
if ((mode & IS_OWNED) == 0) { |
1227 |
< |
boolean popped = false; |
1228 |
< |
if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { |
1229 |
< |
if (top == s && array == a && |
1230 |
< |
U.compareAndSwapObject(a, offset, |
1231 |
< |
t, null)) { |
1232 |
< |
popped = true; |
1233 |
< |
top = s - 1; |
1234 |
< |
} |
1235 |
< |
U.putOrderedInt(this, QLOCK, 0); |
1236 |
< |
if (popped) |
1237 |
< |
return t; |
1238 |
< |
} |
1239 |
< |
} |
1240 |
< |
else if (U.compareAndSwapObject(a, offset, |
1241 |
< |
t, null)) { |
1242 |
< |
top = s - 1; |
1243 |
< |
return t; |
1244 |
< |
} |
1245 |
< |
break; |
1246 |
< |
} |
1247 |
< |
else if ((r = r.completer) == null) // try parent |
1248 |
< |
break; |
1057 |
> |
ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset); |
1058 |
> |
if (t == task && |
1059 |
> |
U.compareAndSwapInt(this, PHASE, 0, QLOCK)) { |
1060 |
> |
if (top == s + 1 && array == a && |
1061 |
> |
U.compareAndSwapObject(a, offset, task, null)) { |
1062 |
> |
popped = true; |
1063 |
> |
top = s; |
1064 |
|
} |
1065 |
+ |
U.putOrderedInt(this, PHASE, 0); |
1066 |
|
} |
1067 |
|
} |
1068 |
< |
return null; |
1068 |
> |
return popped; |
1069 |
|
} |
1070 |
|
|
1071 |
|
/** |
1072 |
< |
* Steals and runs a task in the same CC computation as the |
1073 |
< |
* given task if one exists and can be taken without |
1074 |
< |
* contention. Otherwise returns a checksum/control value for |
1075 |
< |
* use by method helpComplete. |
1076 |
< |
* |
1077 |
< |
* @return 1 if successful, 2 if retryable (lost to another |
1078 |
< |
* stealer), -1 if non-empty but no matching task found, else |
1079 |
< |
* the base index, forced negative. |
1080 |
< |
*/ |
1081 |
< |
final int pollAndExecCC(CountedCompleter<?> task) { |
1082 |
< |
ForkJoinTask<?>[] a; |
1083 |
< |
int b = base, s = top, al, h; |
1084 |
< |
if ((a = array) != null && b != s && (al = a.length) > 0) { |
1085 |
< |
int index = (al - 1) & b; |
1086 |
< |
long offset = ((long)index << ASHIFT) + ABASE; |
1087 |
< |
ForkJoinTask<?> o = (ForkJoinTask<?>) |
1088 |
< |
U.getObjectVolatile(a, offset); |
1089 |
< |
if (o == null) |
1090 |
< |
h = 2; // retryable |
1091 |
< |
else if (!(o instanceof CountedCompleter)) |
1092 |
< |
h = -1; // unmatchable |
1093 |
< |
else { |
1094 |
< |
CountedCompleter<?> t = (CountedCompleter<?>)o; |
1095 |
< |
for (CountedCompleter<?> r = t;;) { |
1096 |
< |
if (r == task) { |
1097 |
< |
if (b++ == base && |
1098 |
< |
U.compareAndSwapObject(a, offset, t, null)) { |
1099 |
< |
base = b; |
1100 |
< |
t.doExec(); |
1101 |
< |
h = 1; // success |
1072 |
> |
* Shared version of localHelpCC. |
1073 |
> |
*/ |
1074 |
> |
final int sharedHelpCC(CountedCompleter<?> task, int limit) { |
1075 |
> |
int status = 0; |
1076 |
> |
if (task != null && (status = task.status) >= 0) { |
1077 |
> |
for (;;) { |
1078 |
> |
boolean help = false; |
1079 |
> |
int b = base, s = top, al; ForkJoinTask<?>[] a; |
1080 |
> |
if ((a = array) != null && b != s && (al = a.length) > 0) { |
1081 |
> |
int index = (al - 1) & (s - 1); |
1082 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
1083 |
> |
ForkJoinTask<?> o = (ForkJoinTask<?>) |
1084 |
> |
U.getObject(a, offset); |
1085 |
> |
if (o instanceof CountedCompleter) { |
1086 |
> |
CountedCompleter<?> t = (CountedCompleter<?>)o; |
1087 |
> |
for (CountedCompleter<?> f = t;;) { |
1088 |
> |
if (f != task) { |
1089 |
> |
if ((f = f.completer) == null) |
1090 |
> |
break; |
1091 |
> |
} |
1092 |
> |
else { |
1093 |
> |
if (U.compareAndSwapInt(this, PHASE, |
1094 |
> |
0, QLOCK)) { |
1095 |
> |
if (top == s && array == a && |
1096 |
> |
U.compareAndSwapObject(a, offset, |
1097 |
> |
t, null)) { |
1098 |
> |
help = true; |
1099 |
> |
top = s - 1; |
1100 |
> |
} |
1101 |
> |
U.putOrderedInt(this, PHASE, 0); |
1102 |
> |
if (help) |
1103 |
> |
t.doExec(); |
1104 |
> |
} |
1105 |
> |
break; |
1106 |
> |
} |
1107 |
|
} |
1287 |
– |
else |
1288 |
– |
h = 2; // lost CAS |
1289 |
– |
break; |
1290 |
– |
} |
1291 |
– |
else if ((r = r.completer) == null) { |
1292 |
– |
h = -1; // unmatched |
1293 |
– |
break; |
1108 |
|
} |
1109 |
|
} |
1110 |
+ |
if ((status = task.status) < 0 || !help || |
1111 |
+ |
(limit != 0 && --limit == 0)) |
1112 |
+ |
break; |
1113 |
|
} |
1114 |
|
} |
1115 |
< |
else |
1299 |
< |
h = b | Integer.MIN_VALUE; // to sense movement on re-poll |
1300 |
< |
return h; |
1115 |
> |
return status; |
1116 |
|
} |
1117 |
|
|
1118 |
|
/** |
1120 |
|
*/ |
1121 |
|
final boolean isApparentlyUnblocked() { |
1122 |
|
Thread wt; Thread.State s; |
1123 |
< |
return (scanState >= 0 && |
1309 |
< |
(wt = owner) != null && |
1123 |
> |
return ((wt = owner) != null && |
1124 |
|
(s = wt.getState()) != Thread.State.BLOCKED && |
1125 |
|
s != Thread.State.WAITING && |
1126 |
|
s != Thread.State.TIMED_WAITING); |
1128 |
|
|
1129 |
|
// Unsafe mechanics. Note that some are (and must be) the same as in FJP |
1130 |
|
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); |
1131 |
< |
private static final long QLOCK; |
1131 |
> |
private static final long PHASE; |
1132 |
|
private static final int ABASE; |
1133 |
|
private static final int ASHIFT; |
1134 |
|
static { |
1135 |
|
try { |
1136 |
< |
QLOCK = U.objectFieldOffset |
1137 |
< |
(WorkQueue.class.getDeclaredField("qlock")); |
1136 |
> |
PHASE = U.objectFieldOffset |
1137 |
> |
(WorkQueue.class.getDeclaredField("phase")); |
1138 |
|
ABASE = U.arrayBaseOffset(ForkJoinTask[].class); |
1139 |
|
int scale = U.arrayIndexScale(ForkJoinTask[].class); |
1140 |
|
if ((scale & (scale - 1)) != 0) |
1157 |
|
|
1158 |
|
/** |
1159 |
|
* Permission required for callers of methods that may start or |
1160 |
< |
* kill threads. Also used as a static lock in tryInitialize. |
1160 |
> |
* kill threads. |
1161 |
|
*/ |
1162 |
|
static final RuntimePermission modifyThreadPermission; |
1163 |
|
|
1198 |
|
// static configuration constants |
1199 |
|
|
1200 |
|
/** |
1201 |
< |
* Initial timeout value (in milliseconds) for the thread |
1202 |
< |
* triggering quiescence to park waiting for new work. On timeout, |
1389 |
< |
* the thread will instead try to shrink the number of workers. |
1390 |
< |
* The value should be large enough to avoid overly aggressive |
1391 |
< |
* shrinkage during most transient stalls (long GCs etc). |
1201 |
> |
* Default idle timeout value (in milliseconds) for the thread |
1202 |
> |
* triggering quiescence to park waiting for new work |
1203 |
|
*/ |
1204 |
< |
private static final long IDLE_TIMEOUT_MS = 2000L; // 2sec |
1204 |
> |
private static final long DEFAULT_KEEPALIVE = 60000L; |
1205 |
|
|
1206 |
|
/** |
1207 |
< |
* Tolerance for idle timeouts, to cope with timer undershoots. |
1207 |
> |
* Undershoot tolerance for idle timeouts |
1208 |
|
*/ |
1209 |
< |
private static final long TIMEOUT_SLOP_MS = 20L; // 20ms |
1209 |
> |
private static final long TIMEOUT_SLOP = 20L; |
1210 |
|
|
1211 |
|
/** |
1212 |
|
* The default value for COMMON_MAX_SPARES. Overridable using the |
1226 |
|
|
1227 |
|
/* |
1228 |
|
* Bits and masks for field ctl, packed with 4 16 bit subfields: |
1229 |
< |
* AC: Number of active running workers minus target parallelism |
1229 |
> |
* RC: Number of released (unqueued) workers minus target parallelism |
1230 |
|
* TC: Number of total workers minus target parallelism |
1231 |
|
* SS: version count and status of top waiting thread |
1232 |
|
* ID: poolIndex of top of Treiber stack of waiters |
1235 |
|
* (including version bits) as sp=(int)ctl. The offsets of counts |
1236 |
|
* by the target parallelism and the positionings of fields makes |
1237 |
|
* it possible to perform the most common checks via sign tests of |
1238 |
< |
* fields: When ac is negative, there are not enough active |
1238 |
> |
* fields: When ac is negative, there are not enough unqueued |
1239 |
|
* workers, when tc is negative, there are not enough total |
1240 |
|
* workers. When sp is non-zero, there are waiting workers. To |
1241 |
|
* deal with possibly negative fields, we use casts in and out of |
1242 |
|
* "short" and/or signed shifts to maintain signedness. |
1243 |
|
* |
1244 |
< |
* Because it occupies uppermost bits, we can add one active count |
1245 |
< |
* using getAndAddLong of AC_UNIT, rather than CAS, when returning |
1244 |
> |
* Because it occupies uppermost bits, we can add one release count |
1245 |
> |
* using getAndAddLong of RC_UNIT, rather than CAS, when returning |
1246 |
|
* from a blocked join. Other updates entail multiple subfields |
1247 |
|
* and masking, requiring CAS. |
1248 |
+ |
* |
1249 |
+ |
* The limits packed in field "bounds" are also offset by the |
1250 |
+ |
* parallelism level to make them comparable to the ctl rc and tc |
1251 |
+ |
* fields. |
1252 |
|
*/ |
1253 |
|
|
1254 |
|
// Lower and upper word masks |
1255 |
|
private static final long SP_MASK = 0xffffffffL; |
1256 |
|
private static final long UC_MASK = ~SP_MASK; |
1257 |
|
|
1258 |
< |
// Active counts |
1259 |
< |
private static final int AC_SHIFT = 48; |
1260 |
< |
private static final long AC_UNIT = 0x0001L << AC_SHIFT; |
1261 |
< |
private static final long AC_MASK = 0xffffL << AC_SHIFT; |
1258 |
> |
// Release counts |
1259 |
> |
private static final int RC_SHIFT = 48; |
1260 |
> |
private static final long RC_UNIT = 0x0001L << RC_SHIFT; |
1261 |
> |
private static final long RC_MASK = 0xffffL << RC_SHIFT; |
1262 |
|
|
1263 |
|
// Total counts |
1264 |
|
private static final int TC_SHIFT = 32; |
1266 |
|
private static final long TC_MASK = 0xffffL << TC_SHIFT; |
1267 |
|
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign |
1268 |
|
|
1454 |
– |
// runState bits: SHUTDOWN must be negative, others arbitrary powers of two |
1455 |
– |
private static final int STARTED = 1; |
1456 |
– |
private static final int STOP = 1 << 1; |
1457 |
– |
private static final int TERMINATED = 1 << 2; |
1458 |
– |
private static final int SHUTDOWN = 1 << 31; |
1459 |
– |
|
1269 |
|
// Instance fields |
1270 |
+ |
|
1271 |
+ |
// Segregate ctl field, For now using padding vs @Contended |
1272 |
+ |
// @jdk.internal.vm.annotation.Contended("fjpctl") |
1273 |
+ |
// @sun.misc.Contended("fjpctl") |
1274 |
+ |
volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07; |
1275 |
+ |
volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f; |
1276 |
|
volatile long ctl; // main pool control |
1277 |
< |
volatile int runState; |
1278 |
< |
final int config; // parallelism, mode |
1279 |
< |
AuxState auxState; // lock, steal counts |
1280 |
< |
volatile WorkQueue[] workQueues; // main registry |
1281 |
< |
final String workerNamePrefix; // to create worker name string |
1277 |
> |
volatile long pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17; |
1278 |
> |
volatile long pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e; |
1279 |
> |
|
1280 |
> |
volatile long stealCount; // collects worker nsteals |
1281 |
> |
final long keepAlive; // milliseconds before dropping if idle |
1282 |
> |
int indexSeed; // next worker index |
1283 |
> |
final int bounds; // min, max threads packed as shorts |
1284 |
> |
volatile int mode; // parallelism, runstate, queue mode |
1285 |
> |
WorkQueue[] workQueues; // main registry |
1286 |
> |
final String workerNamePrefix; // for worker thread string; sync lock |
1287 |
|
final ForkJoinWorkerThreadFactory factory; |
1288 |
|
final UncaughtExceptionHandler ueh; // per-worker UEH |
1289 |
|
|
1470 |
– |
/** |
1471 |
– |
* Instantiates fields upon first submission, or upon shutdown if |
1472 |
– |
* no submissions. If checkTermination true, also responds to |
1473 |
– |
* termination by external calls submitting tasks. |
1474 |
– |
*/ |
1475 |
– |
private void tryInitialize(boolean checkTermination) { |
1476 |
– |
if (runState == 0) { // bootstrap by locking static field |
1477 |
– |
int p = config & SMASK; |
1478 |
– |
int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots |
1479 |
– |
n |= n >>> 1; // create workQueues array with size a power of two |
1480 |
– |
n |= n >>> 2; |
1481 |
– |
n |= n >>> 4; |
1482 |
– |
n |= n >>> 8; |
1483 |
– |
n |= n >>> 16; |
1484 |
– |
n = ((n + 1) << 1) & SMASK; |
1485 |
– |
AuxState aux = new AuxState(); |
1486 |
– |
WorkQueue[] ws = new WorkQueue[n]; |
1487 |
– |
synchronized (modifyThreadPermission) { // double-check |
1488 |
– |
if (runState == 0) { |
1489 |
– |
workQueues = ws; |
1490 |
– |
auxState = aux; |
1491 |
– |
runState = STARTED; |
1492 |
– |
} |
1493 |
– |
} |
1494 |
– |
} |
1495 |
– |
if (checkTermination && runState < 0) { |
1496 |
– |
tryTerminate(false, false); // help terminate |
1497 |
– |
throw new RejectedExecutionException(); |
1498 |
– |
} |
1499 |
– |
} |
1500 |
– |
|
1290 |
|
// Creating, registering and deregistering workers |
1291 |
|
|
1292 |
|
/** |
1294 |
|
* count has already been incremented as a reservation. Invokes |
1295 |
|
* deregisterWorker on any failure. |
1296 |
|
* |
1508 |
– |
* @param isSpare true if this is a spare thread |
1297 |
|
* @return true if successful |
1298 |
|
*/ |
1299 |
< |
private boolean createWorker(boolean isSpare) { |
1299 |
> |
private boolean createWorker() { |
1300 |
|
ForkJoinWorkerThreadFactory fac = factory; |
1301 |
|
Throwable ex = null; |
1302 |
|
ForkJoinWorkerThread wt = null; |
1515 |
– |
WorkQueue q; |
1303 |
|
try { |
1304 |
|
if (fac != null && (wt = fac.newThread(this)) != null) { |
1518 |
– |
if (isSpare && (q = wt.workQueue) != null) |
1519 |
– |
q.config |= SPARE_WORKER; |
1305 |
|
wt.start(); |
1306 |
|
return true; |
1307 |
|
} |
1322 |
|
*/ |
1323 |
|
private void tryAddWorker(long c) { |
1324 |
|
do { |
1325 |
< |
long nc = ((AC_MASK & (c + AC_UNIT)) | |
1325 |
> |
long nc = ((RC_MASK & (c + RC_UNIT)) | |
1326 |
|
(TC_MASK & (c + TC_UNIT))); |
1327 |
|
if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) { |
1328 |
< |
createWorker(false); |
1328 |
> |
createWorker(); |
1329 |
|
break; |
1330 |
|
} |
1331 |
|
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); |
1340 |
|
*/ |
1341 |
|
final WorkQueue registerWorker(ForkJoinWorkerThread wt) { |
1342 |
|
UncaughtExceptionHandler handler; |
1343 |
< |
AuxState aux; |
1559 |
< |
wt.setDaemon(true); // configure thread |
1343 |
> |
wt.setDaemon(true); // configure thread |
1344 |
|
if ((handler = ueh) != null) |
1345 |
|
wt.setUncaughtExceptionHandler(handler); |
1346 |
|
WorkQueue w = new WorkQueue(this, wt); |
1347 |
< |
int i = 0; // assign a pool index |
1348 |
< |
int mode = config & MODE_MASK; |
1349 |
< |
if ((aux = auxState) != null) { |
1350 |
< |
aux.lock(); |
1351 |
< |
try { |
1352 |
< |
int s = (int)(aux.indexSeed += SEED_INCREMENT), n, m; |
1353 |
< |
WorkQueue[] ws = workQueues; |
1354 |
< |
if (ws != null && (n = ws.length) > 0) { |
1355 |
< |
i = (m = n - 1) & ((s << 1) | 1); // odd-numbered indices |
1356 |
< |
if (ws[i] != null) { // collision |
1357 |
< |
int probes = 0; // step by approx half n |
1358 |
< |
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; |
1359 |
< |
while (ws[i = (i + step) & m] != null) { |
1360 |
< |
if (++probes >= n) { |
1361 |
< |
workQueues = ws = Arrays.copyOf(ws, n <<= 1); |
1362 |
< |
m = n - 1; |
1363 |
< |
probes = 0; |
1364 |
< |
} |
1347 |
> |
int tid = 0; // for thread name |
1348 |
> |
int fifo = mode & FIFO; |
1349 |
> |
String prefix = workerNamePrefix; |
1350 |
> |
if (prefix != null) { |
1351 |
> |
synchronized(prefix) { |
1352 |
> |
WorkQueue[] ws = workQueues; int n; |
1353 |
> |
int s = indexSeed += SEED_INCREMENT; |
1354 |
> |
if (ws != null && (n = ws.length) > 1) { |
1355 |
> |
int m = n - 1; |
1356 |
> |
tid = s & m; |
1357 |
> |
int i = m & ((s << 1) | 1); // odd-numbered indices |
1358 |
> |
for (int probes = n >>> 1;;) { // find empty slot |
1359 |
> |
WorkQueue q; |
1360 |
> |
if ((q = ws[i]) == null || q.phase == QUIET) |
1361 |
> |
break; |
1362 |
> |
else if (--probes == 0) { |
1363 |
> |
i = n | 1; // resize below |
1364 |
> |
break; |
1365 |
> |
} |
1366 |
> |
else |
1367 |
> |
i = (i + 2) & m; |
1368 |
> |
} |
1369 |
> |
|
1370 |
> |
int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT)); |
1371 |
> |
w.phase = w.id = id; // now publishable |
1372 |
> |
|
1373 |
> |
if (i < n) |
1374 |
> |
ws[i] = w; |
1375 |
> |
else { // expand array |
1376 |
> |
int an = n << 1; |
1377 |
> |
WorkQueue[] as = new WorkQueue[an]; |
1378 |
> |
as[i] = w; |
1379 |
> |
int am = an - 1; |
1380 |
> |
for (int j = 0; j < n; ++j) { |
1381 |
> |
WorkQueue v; // copy external queue |
1382 |
> |
if ((v = ws[j]) != null) // position may change |
1383 |
> |
as[v.id & am & SQMASK] = v; |
1384 |
> |
if (++j >= n) |
1385 |
> |
break; |
1386 |
> |
as[j] = ws[j]; // copy worker |
1387 |
|
} |
1388 |
+ |
workQueues = as; |
1389 |
|
} |
1583 |
– |
w.hint = s; // use as random seed |
1584 |
– |
w.config = i | mode; |
1585 |
– |
w.scanState = i | (s & 0x7fff0000); // random seq bits |
1586 |
– |
ws[i] = w; |
1390 |
|
} |
1588 |
– |
} finally { |
1589 |
– |
aux.unlock(); |
1391 |
|
} |
1392 |
+ |
wt.setName(prefix.concat(Integer.toString(tid))); |
1393 |
|
} |
1592 |
– |
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); |
1394 |
|
return w; |
1395 |
|
} |
1396 |
|
|
1405 |
|
*/ |
1406 |
|
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { |
1407 |
|
WorkQueue w = null; |
1408 |
+ |
int phase = 0; |
1409 |
|
if (wt != null && (w = wt.workQueue) != null) { |
1410 |
< |
AuxState aux; WorkQueue[] ws; // remove index from array |
1411 |
< |
int idx = w.config & SMASK; |
1412 |
< |
int ns = w.nsteals; |
1413 |
< |
if ((aux = auxState) != null) { |
1414 |
< |
aux.lock(); |
1415 |
< |
try { |
1410 |
> |
Object lock = workerNamePrefix; |
1411 |
> |
long ns = (long)w.nsteals & 0xffffffffL; |
1412 |
> |
int idx = w.id & SMASK; |
1413 |
> |
if (lock != null) { |
1414 |
> |
WorkQueue[] ws; // remove index from array |
1415 |
> |
synchronized(lock) { |
1416 |
|
if ((ws = workQueues) != null && ws.length > idx && |
1417 |
|
ws[idx] == w) |
1418 |
|
ws[idx] = null; |
1419 |
< |
aux.stealCount += ns; |
1618 |
< |
} finally { |
1619 |
< |
aux.unlock(); |
1419 |
> |
stealCount += ns; |
1420 |
|
} |
1421 |
|
} |
1422 |
+ |
phase = w.phase; |
1423 |
|
} |
1424 |
< |
if (w == null || (w.config & UNREGISTERED) == 0) { // else pre-adjusted |
1424 |
> |
if (phase != QUIET) { // else pre-adjusted |
1425 |
|
long c; // decrement counts |
1426 |
|
do {} while (!U.compareAndSwapLong |
1427 |
< |
(this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | |
1427 |
> |
(this, CTL, c = ctl, ((RC_MASK & (c - RC_UNIT)) | |
1428 |
|
(TC_MASK & (c - TC_UNIT)) | |
1429 |
|
(SP_MASK & c)))); |
1430 |
|
} |
1431 |
< |
if (w != null) { |
1631 |
< |
w.currentSteal = null; |
1632 |
< |
w.qlock = -1; // ensure set |
1431 |
> |
if (w != null) |
1432 |
|
w.cancelAll(); // cancel remaining tasks |
1433 |
< |
} |
1434 |
< |
while (tryTerminate(false, false) >= 0) { // possibly replace |
1435 |
< |
WorkQueue[] ws; int wl, sp; long c; |
1436 |
< |
if (w == null || w.array == null || |
1437 |
< |
(ws = workQueues) == null || (wl = ws.length) <= 0) |
1639 |
< |
break; |
1640 |
< |
else if ((sp = (int)(c = ctl)) != 0) { // wake up replacement |
1641 |
< |
if (tryRelease(c, ws[(wl - 1) & sp], AC_UNIT)) |
1642 |
< |
break; |
1643 |
< |
} |
1644 |
< |
else if (ex != null && (c & ADD_WORKER) != 0L) { |
1645 |
< |
tryAddWorker(c); // create replacement |
1646 |
< |
break; |
1647 |
< |
} |
1648 |
< |
else // don't need replacement |
1649 |
< |
break; |
1650 |
< |
} |
1433 |
> |
|
1434 |
> |
if (!tryTerminate(false, false) && // possibly replace worker |
1435 |
> |
w != null && w.array != null) // avoid repeated failures |
1436 |
> |
signalWork(); |
1437 |
> |
|
1438 |
|
if (ex == null) // help clean on way out |
1439 |
|
ForkJoinTask.helpExpungeStaleExceptions(); |
1440 |
|
else // rethrow |
1441 |
|
ForkJoinTask.rethrow(ex); |
1442 |
|
} |
1443 |
|
|
1657 |
– |
// Signalling |
1658 |
– |
|
1444 |
|
/** |
1445 |
< |
* Tries to create or activate a worker if too few are active. |
1445 |
> |
* Tries to create or release a worker if too few are running. |
1446 |
|
*/ |
1447 |
|
final void signalWork() { |
1448 |
|
for (;;) { |
1449 |
< |
long c; int sp, i; WorkQueue v; WorkQueue[] ws; |
1449 |
> |
long c; int sp; WorkQueue[] ws; int i; WorkQueue v; |
1450 |
|
if ((c = ctl) >= 0L) // enough workers |
1451 |
|
break; |
1452 |
|
else if ((sp = (int)c) == 0) { // no idle workers |
1461 |
|
else if ((v = ws[i]) == null) |
1462 |
|
break; // terminating |
1463 |
|
else { |
1464 |
< |
int ns = sp & ~UNSIGNALLED; |
1465 |
< |
int vs = v.scanState; |
1466 |
< |
long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + AC_UNIT)); |
1467 |
< |
if (sp == vs && U.compareAndSwapLong(this, CTL, c, nc)) { |
1468 |
< |
v.scanState = ns; |
1469 |
< |
LockSupport.unpark(v.parker); |
1464 |
> |
int np = sp & ~UNSIGNALLED; |
1465 |
> |
int vp = v.phase; |
1466 |
> |
long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT)); |
1467 |
> |
Thread vt = v.owner; |
1468 |
> |
if (sp == vp && U.compareAndSwapLong(this, CTL, c, nc)) { |
1469 |
> |
v.phase = np; |
1470 |
> |
if (v.source < 0) |
1471 |
> |
LockSupport.unpark(vt); |
1472 |
|
break; |
1473 |
|
} |
1474 |
|
} |
1476 |
|
} |
1477 |
|
|
1478 |
|
/** |
1479 |
< |
* Signals and releases worker v if it is top of idle worker |
1480 |
< |
* stack. This performs a one-shot version of signalWork only if |
1481 |
< |
* there is (apparently) at least one idle worker. |
1482 |
< |
* |
1483 |
< |
* @param c incoming ctl value |
1484 |
< |
* @param v if non-null, a worker |
1485 |
< |
* @param inc the increment to active count (zero when compensating) |
1486 |
< |
* @return true if successful |
1487 |
< |
*/ |
1488 |
< |
private boolean tryRelease(long c, WorkQueue v, long inc) { |
1489 |
< |
int sp = (int)c, ns = sp & ~UNSIGNALLED; |
1703 |
< |
if (v != null) { |
1704 |
< |
int vs = v.scanState; |
1705 |
< |
long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + inc)); |
1706 |
< |
if (sp == vs && U.compareAndSwapLong(this, CTL, c, nc)) { |
1707 |
< |
v.scanState = ns; |
1708 |
< |
LockSupport.unpark(v.parker); |
1709 |
< |
return true; |
1710 |
< |
} |
1711 |
< |
} |
1712 |
< |
return false; |
1713 |
< |
} |
1714 |
< |
|
1715 |
< |
/** |
1716 |
< |
* With approx probability of a missed signal, tries (once) to |
1717 |
< |
* reactivate worker w (or some other worker), failing if stale or |
1718 |
< |
* known to be already active. |
1719 |
< |
* |
1720 |
< |
* @param w the worker |
1721 |
< |
* @param ws the workQueue array to use |
1722 |
< |
* @param r random seed |
1723 |
< |
*/ |
1724 |
< |
private void tryReactivate(WorkQueue w, WorkQueue[] ws, int r) { |
1725 |
< |
long c; int sp, wl; WorkQueue v; |
1726 |
< |
if ((sp = (int)(c = ctl)) != 0 && w != null && |
1727 |
< |
ws != null && (wl = ws.length) > 0 && |
1728 |
< |
((sp ^ r) & SS_SEQ) == 0 && |
1729 |
< |
(v = ws[(wl - 1) & sp]) != null) { |
1730 |
< |
long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + AC_UNIT)); |
1731 |
< |
int ns = sp & ~UNSIGNALLED; |
1732 |
< |
if (w.scanState < 0 && |
1733 |
< |
v.scanState == sp && |
1734 |
< |
U.compareAndSwapLong(this, CTL, c, nc)) { |
1735 |
< |
v.scanState = ns; |
1736 |
< |
LockSupport.unpark(v.parker); |
1737 |
< |
} |
1738 |
< |
} |
1739 |
< |
} |
1740 |
< |
|
1741 |
< |
/** |
1742 |
< |
* If worker w exists and is active, enqueues and sets status to inactive. |
1743 |
< |
* |
1744 |
< |
* @param w the worker |
1745 |
< |
* @param ss current (non-negative) scanState |
1746 |
< |
*/ |
1747 |
< |
private void inactivate(WorkQueue w, int ss) { |
1748 |
< |
int ns = (ss + SS_SEQ) | UNSIGNALLED; |
1749 |
< |
long lc = ns & SP_MASK, nc, c; |
1750 |
< |
if (w != null) { |
1751 |
< |
w.scanState = ns; |
1752 |
< |
do { |
1753 |
< |
nc = lc | (UC_MASK & ((c = ctl) - AC_UNIT)); |
1754 |
< |
w.stackPred = (int)c; |
1755 |
< |
} while (!U.compareAndSwapLong(this, CTL, c, nc)); |
1756 |
< |
} |
1757 |
< |
} |
1758 |
< |
|
1759 |
< |
/** |
1760 |
< |
* Possibly blocks worker w waiting for signal, or returns |
1761 |
< |
* negative status if the worker should terminate. May return |
1762 |
< |
* without status change if multiple stale unparks and/or |
1763 |
< |
* interrupts occur. |
1479 |
> |
* Tries to decrement counts (sometimes implicitly) and possibly |
1480 |
> |
* arrange for a compensating worker in preparation for blocking: |
1481 |
> |
* If not all core workers yet exist, creates one, else if any are |
1482 |
> |
* unreleased (possibly including caller) releases one, else if |
1483 |
> |
* fewer than the minimum allowed number of workers running, |
1484 |
> |
* checks to see that they are all active, and if so creates an |
1485 |
> |
* extra worker unless over maximum limit and policy is to |
1486 |
> |
* saturate. Most of these steps can fail due to interference, in |
1487 |
> |
* which case 0 is returned so caller will retry. A negative |
1488 |
> |
* return value indicates that the caller doesn't need to |
1489 |
> |
* re-adjust counts when later unblocked. |
1490 |
|
* |
1491 |
< |
* @param w the calling worker |
1766 |
< |
* @return negative if w should terminate |
1491 |
> |
* @return 1: block then adjust, -1: block without adjust, 0 : retry |
1492 |
|
*/ |
1493 |
< |
private int awaitWork(WorkQueue w) { |
1494 |
< |
int stat = 0; |
1495 |
< |
if (w != null && w.scanState < 0) { |
1496 |
< |
long c = ctl; |
1497 |
< |
if ((int)(c >> AC_SHIFT) + (config & SMASK) <= 0) |
1498 |
< |
stat = timedAwaitWork(w, c); // possibly quiescent |
1499 |
< |
else if ((runState & STOP) != 0) |
1500 |
< |
stat = w.qlock = -1; // pool terminating |
1501 |
< |
else if (w.scanState < 0) { |
1502 |
< |
w.parker = Thread.currentThread(); |
1503 |
< |
if (w.scanState < 0) // recheck after write |
1504 |
< |
LockSupport.park(this); |
1505 |
< |
w.parker = null; |
1506 |
< |
if ((runState & STOP) != 0) |
1507 |
< |
stat = w.qlock = -1; // recheck |
1508 |
< |
else if (w.scanState < 0) |
1509 |
< |
Thread.interrupted(); // clear status |
1510 |
< |
} |
1511 |
< |
} |
1512 |
< |
return stat; |
1513 |
< |
} |
1514 |
< |
|
1515 |
< |
/** |
1516 |
< |
* Possibly triggers shutdown and tries (once) to block worker |
1517 |
< |
* when pool is (or may be) quiescent. Waits up to a duration |
1518 |
< |
* determined by number of workers. On timeout, if ctl has not |
1519 |
< |
* changed, terminates the worker, which will in turn wake up |
1520 |
< |
* another worker to possibly repeat this process. |
1521 |
< |
* |
1522 |
< |
* @param w the calling worker |
1523 |
< |
* @return negative if w should terminate |
1524 |
< |
*/ |
1525 |
< |
private int timedAwaitWork(WorkQueue w, long c) { |
1526 |
< |
int stat = 0; |
1527 |
< |
int scale = 1 - (short)(c >>> TC_SHIFT); |
1528 |
< |
long deadline = (((scale <= 0) ? 1 : scale) * IDLE_TIMEOUT_MS + |
1529 |
< |
System.currentTimeMillis()); |
1530 |
< |
if ((runState >= 0 || (stat = tryTerminate(false, false)) > 0) && |
1531 |
< |
w != null && w.scanState < 0) { |
1532 |
< |
int ss; AuxState aux; |
1533 |
< |
w.parker = Thread.currentThread(); |
1534 |
< |
if (w.scanState < 0) |
1535 |
< |
LockSupport.parkUntil(this, deadline); |
1536 |
< |
w.parker = null; |
1537 |
< |
if ((runState & STOP) != 0) |
1538 |
< |
stat = w.qlock = -1; // pool terminating |
1539 |
< |
else if ((ss = w.scanState) < 0 && !Thread.interrupted() && |
1815 |
< |
(int)c == ss && (aux = auxState) != null && ctl == c && |
1816 |
< |
deadline - System.currentTimeMillis() <= TIMEOUT_SLOP_MS) { |
1817 |
< |
aux.lock(); |
1818 |
< |
try { // pre-deregister |
1819 |
< |
WorkQueue[] ws; |
1820 |
< |
int cfg = w.config, idx = cfg & SMASK; |
1821 |
< |
long nc = ((UC_MASK & (c - TC_UNIT)) | |
1822 |
< |
(SP_MASK & w.stackPred)); |
1823 |
< |
if ((runState & STOP) == 0 && |
1824 |
< |
(ws = workQueues) != null && |
1825 |
< |
idx < ws.length && idx >= 0 && ws[idx] == w && |
1826 |
< |
U.compareAndSwapLong(this, CTL, c, nc)) { |
1827 |
< |
ws[idx] = null; |
1828 |
< |
w.config = cfg | UNREGISTERED; |
1829 |
< |
stat = w.qlock = -1; |
1493 |
> |
private int tryCompensate(WorkQueue w) { |
1494 |
> |
int t, n, sp; |
1495 |
> |
long c = ctl; |
1496 |
> |
WorkQueue[] ws = workQueues; |
1497 |
> |
if ((t = (short)(c >> TC_SHIFT)) >= 0) { |
1498 |
> |
if (ws == null || (n = ws.length) <= 0 || w == null) |
1499 |
> |
return 0; // disabled |
1500 |
> |
else if ((sp = (int)c) != 0) { // replace or release |
1501 |
> |
WorkQueue v = ws[sp & (n - 1)]; |
1502 |
> |
int wp = w.phase; |
1503 |
> |
long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c); |
1504 |
> |
int np = sp & ~UNSIGNALLED; |
1505 |
> |
if (v != null) { |
1506 |
> |
int vp = v.phase; |
1507 |
> |
Thread vt = v.owner; |
1508 |
> |
long nc = ((long)v.stackPred & SP_MASK) | uc; |
1509 |
> |
if (vp == sp && U.compareAndSwapLong(this, CTL, c, nc)) { |
1510 |
> |
v.phase = np; |
1511 |
> |
if (v.source < 0) |
1512 |
> |
LockSupport.unpark(vt); |
1513 |
> |
return (wp < 0) ? -1 : 1; |
1514 |
> |
} |
1515 |
> |
} |
1516 |
> |
return 0; |
1517 |
> |
} |
1518 |
> |
else if ((int)(c >> RC_SHIFT) - // reduce parallelism |
1519 |
> |
(short)(bounds & SMASK) > 0) { |
1520 |
> |
long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c)); |
1521 |
> |
return U.compareAndSwapLong(this, CTL, c, nc) ? 1 : 0; |
1522 |
> |
} |
1523 |
> |
else { // validate |
1524 |
> |
int md = mode, pc = md & SMASK, tc = pc + t, bc = 0; |
1525 |
> |
boolean unstable = false; |
1526 |
> |
for (int i = 1; i < n; i += 2) { |
1527 |
> |
WorkQueue q; Thread wt; Thread.State ts; |
1528 |
> |
if ((q = ws[i]) != null) { |
1529 |
> |
if (q.source == 0) { |
1530 |
> |
unstable = true; |
1531 |
> |
break; |
1532 |
> |
} |
1533 |
> |
else { |
1534 |
> |
--tc; |
1535 |
> |
if ((wt = q.owner) != null && |
1536 |
> |
((ts = wt.getState()) == Thread.State.BLOCKED || |
1537 |
> |
ts == Thread.State.WAITING)) |
1538 |
> |
++bc; // worker is blocking |
1539 |
> |
} |
1540 |
|
} |
1831 |
– |
} finally { |
1832 |
– |
aux.unlock(); |
1541 |
|
} |
1542 |
< |
} |
1543 |
< |
} |
1544 |
< |
return stat; |
1545 |
< |
} |
1546 |
< |
|
1547 |
< |
/** |
1548 |
< |
* If the given worker is a spare with no queued tasks, and there |
1549 |
< |
* are enough existing workers, drops it from ctl counts and sets |
1842 |
< |
* its state to terminated. |
1843 |
< |
* |
1844 |
< |
* @param w the calling worker -- must be a spare |
1845 |
< |
* @return true if dropped (in which case it must not process more tasks) |
1846 |
< |
*/ |
1847 |
< |
private boolean tryDropSpare(WorkQueue w) { |
1848 |
< |
if (w != null && w.isEmpty()) { // no local tasks |
1849 |
< |
long c; int sp, wl; WorkQueue[] ws; WorkQueue v; |
1850 |
< |
while ((short)((c = ctl) >> TC_SHIFT) > 0 && |
1851 |
< |
((sp = (int)c) != 0 || (int)(c >> AC_SHIFT) > 0) && |
1852 |
< |
(ws = workQueues) != null && (wl = ws.length) > 0) { |
1853 |
< |
boolean dropped, canDrop; |
1854 |
< |
if (sp == 0) { // no queued workers |
1855 |
< |
long nc = ((AC_MASK & (c - AC_UNIT)) | |
1856 |
< |
(TC_MASK & (c - TC_UNIT)) | (SP_MASK & c)); |
1857 |
< |
dropped = U.compareAndSwapLong(this, CTL, c, nc); |
1858 |
< |
} |
1859 |
< |
else if ( |
1860 |
< |
(v = ws[(wl - 1) & sp]) == null || v.scanState != sp) |
1861 |
< |
dropped = false; // stale; retry |
1862 |
< |
else { |
1863 |
< |
long nc = v.stackPred & SP_MASK; |
1864 |
< |
if (w == v || w.scanState >= 0) { |
1865 |
< |
canDrop = true; // w unqueued or topmost |
1866 |
< |
nc |= ((AC_MASK & c) | // ensure replacement |
1867 |
< |
(TC_MASK & (c - TC_UNIT))); |
1868 |
< |
} |
1869 |
< |
else { // w may be queued |
1870 |
< |
canDrop = false; // help uncover |
1871 |
< |
nc |= ((AC_MASK & (c + AC_UNIT)) | |
1872 |
< |
(TC_MASK & c)); |
1873 |
< |
} |
1874 |
< |
if (U.compareAndSwapLong(this, CTL, c, nc)) { |
1875 |
< |
v.scanState = sp & ~UNSIGNALLED; |
1876 |
< |
LockSupport.unpark(v.parker); |
1877 |
< |
dropped = canDrop; |
1542 |
> |
if (unstable || tc != 0 || ctl != c) |
1543 |
> |
return 0; // inconsistent |
1544 |
> |
else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) { |
1545 |
> |
if ((md & SATURATE) != 0) |
1546 |
> |
return -1; |
1547 |
> |
else if (bc < pc) { // lagging |
1548 |
> |
Thread.yield(); // for retry spins |
1549 |
> |
return 0; |
1550 |
|
} |
1551 |
|
else |
1552 |
< |
dropped = false; |
1553 |
< |
} |
1882 |
< |
if (dropped) { // pre-deregister |
1883 |
< |
int cfg = w.config, idx = cfg & SMASK; |
1884 |
< |
if (idx >= 0 && idx < ws.length && ws[idx] == w) |
1885 |
< |
ws[idx] = null; |
1886 |
< |
w.config = cfg | UNREGISTERED; |
1887 |
< |
w.qlock = -1; |
1888 |
< |
return true; |
1552 |
> |
throw new RejectedExecutionException( |
1553 |
> |
"Thread limit exceeded replacing blocked worker"); |
1554 |
|
} |
1555 |
|
} |
1556 |
|
} |
1557 |
< |
return false; |
1557 |
> |
|
1558 |
> |
long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool |
1559 |
> |
return U.compareAndSwapLong(this, CTL, c, nc) && createWorker() ? 1 : 0; |
1560 |
|
} |
1561 |
|
|
1562 |
|
/** |
1563 |
|
* Top-level runloop for workers, called by ForkJoinWorkerThread.run. |
1564 |
+ |
* See above for explanation. |
1565 |
|
*/ |
1566 |
|
final void runWorker(WorkQueue w) { |
1567 |
+ |
WorkQueue[] ws; |
1568 |
|
w.growArray(); // allocate queue |
1569 |
< |
int bound = (w.config & SPARE_WORKER) != 0 ? 0 : POLL_LIMIT; |
1570 |
< |
long seed = w.hint * 0xdaba0b6eb09322e3L; // initial random seed |
1571 |
< |
if ((runState & STOP) == 0) { |
1572 |
< |
for (long r = (seed == 0L) ? 1L : seed;;) { // ensure nonzero |
1573 |
< |
if (bound == 0 && tryDropSpare(w)) |
1574 |
< |
break; |
1575 |
< |
// high bits of prev seed for step; current low bits for idx |
1576 |
< |
int step = (int)(r >>> 48) | 1; |
1577 |
< |
r ^= r >>> 12; r ^= r << 25; r ^= r >>> 27; // xorshift |
1578 |
< |
if (scan(w, bound, step, (int)r) < 0 && awaitWork(w) < 0) |
1910 |
< |
break; |
1911 |
< |
} |
1912 |
< |
} |
1913 |
< |
} |
1914 |
< |
|
1915 |
< |
// Scanning for tasks |
1916 |
< |
|
1917 |
< |
/** |
1918 |
< |
* Repeatedly scans for and tries to steal and execute (via |
1919 |
< |
* workQueue.runTask) a queued task. Each scan traverses queues in |
1920 |
< |
* pseudorandom permutation. Upon finding a non-empty queue, makes |
1921 |
< |
* at most the given bound attempts to re-poll (fewer if |
1922 |
< |
* contended) on the same queue before returning (impossible |
1923 |
< |
* scanState value) 0 to restart scan. Else returns after at least |
1924 |
< |
* 1 and at most 32 full scans. |
1925 |
< |
* |
1926 |
< |
* @param w the worker (via its WorkQueue) |
1927 |
< |
* @param bound repoll bound as bitmask (0 if spare) |
1928 |
< |
* @param step (circular) index increment per iteration (must be odd) |
1929 |
< |
* @param r a random seed for origin index |
1930 |
< |
* @return negative if should await signal |
1931 |
< |
*/ |
1932 |
< |
private int scan(WorkQueue w, int bound, int step, int r) { |
1933 |
< |
int stat = 0, wl; WorkQueue[] ws; |
1934 |
< |
if ((ws = workQueues) != null && w != null && (wl = ws.length) > 0) { |
1935 |
< |
for (int m = wl - 1, |
1936 |
< |
origin = m & r, idx = origin, |
1937 |
< |
npolls = 0, |
1938 |
< |
ss = w.scanState;;) { // negative if inactive |
1939 |
< |
WorkQueue q; ForkJoinTask<?>[] a; int b, al; |
1940 |
< |
if ((q = ws[idx]) != null && (b = q.base) - q.top < 0 && |
1569 |
> |
int r = w.id ^ ThreadLocalRandom.nextSecondarySeed(); |
1570 |
> |
if (r == 0) // initial nonzero seed |
1571 |
> |
r = 1; |
1572 |
> |
int lastSignalId = 0; // avoid unneeded signals |
1573 |
> |
while ((ws = workQueues) != null) { |
1574 |
> |
boolean nonempty = false; // scan |
1575 |
> |
for (int n = ws.length, j = n, m = n - 1; j > 0; --j) { |
1576 |
> |
WorkQueue q; int i, b, al; ForkJoinTask<?>[] a; |
1577 |
> |
if ((i = r & m) >= 0 && i < n && // always true |
1578 |
> |
(q = ws[i]) != null && (b = q.base) - q.top < 0 && |
1579 |
|
(a = q.array) != null && (al = a.length) > 0) { |
1580 |
+ |
int qid = q.id; // (never zero) |
1581 |
|
int index = (al - 1) & b; |
1582 |
|
long offset = ((long)index << ASHIFT) + ABASE; |
1583 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1584 |
|
U.getObjectVolatile(a, offset); |
1585 |
< |
if (t == null) |
1586 |
< |
break; // empty or busy |
1587 |
< |
else if (b++ != q.base) |
1588 |
< |
break; // busy |
1589 |
< |
else if (ss < 0) { |
1590 |
< |
tryReactivate(w, ws, r); |
1591 |
< |
break; // retry upon rescan |
1592 |
< |
} |
1593 |
< |
else if (!U.compareAndSwapObject(a, offset, t, null)) |
1594 |
< |
break; // contended |
1595 |
< |
else { |
1596 |
< |
q.base = b; |
1597 |
< |
w.currentSteal = t; |
1598 |
< |
if (b != q.top) // propagate signal |
1599 |
< |
signalWork(); |
1961 |
< |
w.runTask(t); |
1962 |
< |
if (++npolls > bound) |
1963 |
< |
break; |
1585 |
> |
if (t != null && b++ == q.base && |
1586 |
> |
U.compareAndSwapObject(a, offset, t, null)) { |
1587 |
> |
if ((q.base = b) - q.top < 0 && qid != lastSignalId) |
1588 |
> |
signalWork(); // propagate signal |
1589 |
> |
w.source = lastSignalId = qid; |
1590 |
> |
t.doExec(); |
1591 |
> |
if ((w.id & FIFO) != 0) // run remaining locals |
1592 |
> |
w.localPollAndExec(POLL_LIMIT); |
1593 |
> |
else |
1594 |
> |
w.localPopAndExec(POLL_LIMIT); |
1595 |
> |
ForkJoinWorkerThread thread = w.owner; |
1596 |
> |
++w.nsteals; |
1597 |
> |
w.source = 0; // now idle |
1598 |
> |
if (thread != null) |
1599 |
> |
thread.afterTopLevelExec(); |
1600 |
|
} |
1601 |
+ |
nonempty = true; |
1602 |
|
} |
1603 |
< |
else if (npolls != 0) // rescan |
1603 |
> |
else if (nonempty) |
1604 |
|
break; |
1605 |
< |
else if ((idx = (idx + step) & m) == origin) { |
1606 |
< |
if (ss < 0) { // await signal |
1607 |
< |
stat = ss; |
1608 |
< |
break; |
1609 |
< |
} |
1610 |
< |
else if (r >= 0) { |
1611 |
< |
inactivate(w, ss); |
1612 |
< |
break; |
1605 |
> |
else |
1606 |
> |
++r; |
1607 |
> |
} |
1608 |
> |
|
1609 |
> |
if (nonempty) { // move (xorshift) |
1610 |
> |
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; |
1611 |
> |
} |
1612 |
> |
else { |
1613 |
> |
int phase; |
1614 |
> |
lastSignalId = 0; // clear for next scan |
1615 |
> |
if ((phase = w.phase) >= 0) { // enqueue |
1616 |
> |
int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED; |
1617 |
> |
long c, nc; |
1618 |
> |
do { |
1619 |
> |
w.stackPred = (int)(c = ctl); |
1620 |
> |
nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np); |
1621 |
> |
} while (!U.compareAndSwapLong(this, CTL, c, nc)); |
1622 |
> |
} |
1623 |
> |
else { // already queued |
1624 |
> |
int pred = w.stackPred; |
1625 |
> |
w.source = DORMANT; // enable signal |
1626 |
> |
for (int steps = 0;;) { |
1627 |
> |
int md, rc; long c; |
1628 |
> |
if (w.phase >= 0) { |
1629 |
> |
w.source = 0; |
1630 |
> |
break; |
1631 |
> |
} |
1632 |
> |
else if ((md = mode) < 0) // shutting down |
1633 |
> |
return; |
1634 |
> |
else if ((rc = ((md & SMASK) + // possibly quiescent |
1635 |
> |
(int)((c = ctl) >> RC_SHIFT))) <= 0 && |
1636 |
> |
(md & SHUTDOWN) != 0 && |
1637 |
> |
tryTerminate(false, false)) |
1638 |
> |
return; // help terminate |
1639 |
> |
else if ((++steps & 1) == 0) |
1640 |
> |
Thread.interrupted(); // clear between parks |
1641 |
> |
else if (rc <= 0 && pred != 0 && phase == (int)c) { |
1642 |
> |
long d = keepAlive + System.currentTimeMillis(); |
1643 |
> |
LockSupport.parkUntil(this, d); |
1644 |
> |
if (ctl == c && |
1645 |
> |
d - System.currentTimeMillis() <= TIMEOUT_SLOP) { |
1646 |
> |
long nc = ((UC_MASK & (c - TC_UNIT)) | |
1647 |
> |
(SP_MASK & pred)); |
1648 |
> |
if (U.compareAndSwapLong(this, CTL, c, nc)) { |
1649 |
> |
w.phase = QUIET; |
1650 |
> |
return; // drop on timeout |
1651 |
> |
} |
1652 |
> |
} |
1653 |
> |
} |
1654 |
> |
else |
1655 |
> |
LockSupport.park(this); |
1656 |
|
} |
1977 |
– |
else |
1978 |
– |
r <<= 1; // at most 31 rescans |
1657 |
|
} |
1658 |
|
} |
1659 |
|
} |
1982 |
– |
return stat; |
1660 |
|
} |
1661 |
|
|
1985 |
– |
// Joining tasks |
1986 |
– |
|
1662 |
|
/** |
1663 |
< |
* Tries to steal and run tasks within the target's computation. |
1664 |
< |
* Uses a variant of the top-level algorithm, restricted to tasks |
1665 |
< |
* with the given task as ancestor: It prefers taking and running |
1666 |
< |
* eligible tasks popped from the worker's own queue (via |
1992 |
< |
* popCC). Otherwise it scans others, randomly moving on |
1993 |
< |
* contention or execution, deciding to give up based on a |
1994 |
< |
* checksum (via return codes from pollAndExecCC). The maxTasks |
1995 |
< |
* argument supports external usages; internal calls use zero, |
1996 |
< |
* allowing unbounded steps (external calls trap non-positive |
1997 |
< |
* values). |
1663 |
> |
* Helps and/or blocks until the given task is done or timeout. |
1664 |
> |
* First tries locally helping, then scans other queues for a task |
1665 |
> |
* produced by one of w's stealers; compensating and blocking if |
1666 |
> |
* none are found (rescanning if tryCompensate fails). |
1667 |
|
* |
1668 |
|
* @param w caller |
1669 |
< |
* @param maxTasks if non-zero, the maximum number of other tasks to run |
1669 |
> |
* @param task the task |
1670 |
> |
* @param deadline for timed waits, if nonzero |
1671 |
|
* @return task status on exit |
1672 |
|
*/ |
1673 |
< |
final int helpComplete(WorkQueue w, CountedCompleter<?> task, |
1674 |
< |
int maxTasks) { |
1675 |
< |
WorkQueue[] ws; int s = 0, wl; |
1676 |
< |
if ((ws = workQueues) != null && (wl = ws.length) > 1 && |
1677 |
< |
task != null && w != null) { |
1678 |
< |
for (int m = wl - 1, |
1679 |
< |
mode = w.config, |
1680 |
< |
r = ~mode, // scanning seed |
1681 |
< |
origin = r & m, k = origin, // first queue to scan |
1682 |
< |
step = 3, // first scan step |
1683 |
< |
h = 1, // 1:ran, >1:contended, <0:hash |
1684 |
< |
oldSum = 0, checkSum = 0;;) { |
1685 |
< |
CountedCompleter<?> p; WorkQueue q; int i; |
1673 |
> |
final int xawaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) { |
1674 |
> |
int s = 0; |
1675 |
> |
if (w != null && task != null && |
1676 |
> |
(!(task instanceof CountedCompleter) || |
1677 |
> |
(s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) { |
1678 |
> |
w.tryRemoveAndExec(task); |
1679 |
> |
int src = w.source, id = w.id, block = 0; |
1680 |
> |
s = task.status; |
1681 |
> |
while (s >= 0) { |
1682 |
> |
WorkQueue[] ws; |
1683 |
> |
boolean nonempty = false; |
1684 |
> |
int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices |
1685 |
> |
if ((ws = workQueues) != null) { // scan for matching id |
1686 |
> |
for (int n = ws.length, j = n, m = n - 1; j > 0; --j) { |
1687 |
> |
WorkQueue q; int i, b, al; ForkJoinTask<?>[] a; |
1688 |
> |
if ((i = r & m) >= 0 && i < n && |
1689 |
> |
(q = ws[i]) != null && q.source == id && |
1690 |
> |
(b = q.base) - q.top < 0 && |
1691 |
> |
(a = q.array) != null && (al = a.length) > 0) { |
1692 |
> |
int qid = q.id; |
1693 |
> |
if (block > 0) |
1694 |
> |
U.getAndAddLong(this, CTL, RC_UNIT); |
1695 |
> |
block = 0; |
1696 |
> |
int index = (al - 1) & b; |
1697 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
1698 |
> |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1699 |
> |
U.getObjectVolatile(a, offset); |
1700 |
> |
if (t != null && b++ == q.base && id == q.source && |
1701 |
> |
U.compareAndSwapObject(a, offset, t, null)) { |
1702 |
> |
q.base = b; |
1703 |
> |
w.source = qid; |
1704 |
> |
t.doExec(); |
1705 |
> |
w.source = src; |
1706 |
> |
} |
1707 |
> |
nonempty = true; |
1708 |
> |
break; |
1709 |
> |
} |
1710 |
> |
else |
1711 |
> |
r += 2; |
1712 |
> |
} |
1713 |
> |
} |
1714 |
|
if ((s = task.status) < 0) |
1715 |
|
break; |
1716 |
< |
if (h == 1 && (p = w.popCC(task, mode)) != null) { |
1717 |
< |
p.doExec(); // run local task |
1718 |
< |
if (maxTasks != 0 && --maxTasks == 0) |
1719 |
< |
break; |
1720 |
< |
origin = k; // reset |
1721 |
< |
oldSum = checkSum = 0; |
1716 |
> |
else if (!nonempty) { |
1717 |
> |
long ms, ns; |
1718 |
> |
if (deadline == 0L) |
1719 |
> |
ms = 0L; // untimed |
1720 |
> |
else if ((ns = deadline - System.nanoTime()) <= 0L) |
1721 |
> |
break; // timeout |
1722 |
> |
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) |
1723 |
> |
ms = 1L; // avoid 0 for timed wait |
1724 |
> |
if (block == 0) |
1725 |
> |
block = tryCompensate(w); |
1726 |
> |
else |
1727 |
> |
task.internalWait(ms); |
1728 |
> |
s = task.status; |
1729 |
|
} |
1730 |
< |
else { // poll other worker queues |
1731 |
< |
if ((i = k | 1) < 0 || i > m || (q = ws[i]) == null) |
1732 |
< |
h = 0; |
1733 |
< |
else if ((h = q.pollAndExecCC(task)) < 0) |
1734 |
< |
checkSum += h; |
1735 |
< |
if (h > 0) { |
1736 |
< |
if (h == 1 && maxTasks != 0 && --maxTasks == 0) |
1730 |
> |
} |
1731 |
> |
if (block > 0) |
1732 |
> |
U.getAndAddLong(this, CTL, RC_UNIT); |
1733 |
> |
} |
1734 |
> |
return s; |
1735 |
> |
} |
1736 |
> |
|
1737 |
> |
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) { |
1738 |
> |
int s = 0; |
1739 |
> |
if (w != null && task != null && |
1740 |
> |
(!(task instanceof CountedCompleter) || |
1741 |
> |
(s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) { |
1742 |
> |
w.tryRemoveAndExec(task); |
1743 |
> |
int src = w.source, id = w.id; |
1744 |
> |
s = task.status; |
1745 |
> |
while (s >= 0) { |
1746 |
> |
WorkQueue[] ws; |
1747 |
> |
boolean nonempty = false; |
1748 |
> |
int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices |
1749 |
> |
if ((ws = workQueues) != null) { // scan for matching id |
1750 |
> |
for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) { |
1751 |
> |
WorkQueue q; int i, b, al; ForkJoinTask<?>[] a; |
1752 |
> |
if ((i = (r + j) & m) >= 0 && i < n && |
1753 |
> |
(q = ws[i]) != null && q.source == id && |
1754 |
> |
(b = q.base) - q.top < 0 && |
1755 |
> |
(a = q.array) != null && (al = a.length) > 0) { |
1756 |
> |
int qid = q.id; |
1757 |
> |
int index = (al - 1) & b; |
1758 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
1759 |
> |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1760 |
> |
U.getObjectVolatile(a, offset); |
1761 |
> |
if (t != null && b++ == q.base && id == q.source && |
1762 |
> |
U.compareAndSwapObject(a, offset, t, null)) { |
1763 |
> |
q.base = b; |
1764 |
> |
w.source = qid; |
1765 |
> |
t.doExec(); |
1766 |
> |
w.source = src; |
1767 |
> |
} |
1768 |
> |
nonempty = true; |
1769 |
|
break; |
1770 |
< |
step = (r >>> 16) | 3; |
2034 |
< |
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift |
2035 |
< |
k = origin = r & m; // move and restart |
2036 |
< |
oldSum = checkSum = 0; |
1770 |
> |
} |
1771 |
|
} |
1772 |
< |
else if ((k = (k + step) & m) == origin) { |
1773 |
< |
if (oldSum == (oldSum = checkSum)) |
1774 |
< |
break; |
1775 |
< |
checkSum = 0; |
1772 |
> |
} |
1773 |
> |
if ((s = task.status) < 0) |
1774 |
> |
break; |
1775 |
> |
else if (!nonempty) { |
1776 |
> |
long ms, ns; int block; |
1777 |
> |
if (deadline == 0L) |
1778 |
> |
ms = 0L; // untimed |
1779 |
> |
else if ((ns = deadline - System.nanoTime()) <= 0L) |
1780 |
> |
break; // timeout |
1781 |
> |
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) |
1782 |
> |
ms = 1L; // avoid 0 for timed wait |
1783 |
> |
if ((block = tryCompensate(w)) != 0) { |
1784 |
> |
task.internalWait(ms); |
1785 |
> |
U.getAndAddLong(this, CTL, (block > 0) ? RC_UNIT : 0L); |
1786 |
|
} |
1787 |
+ |
s = task.status; |
1788 |
|
} |
1789 |
|
} |
1790 |
|
} |
1792 |
|
} |
1793 |
|
|
1794 |
|
/** |
1795 |
< |
* Tries to locate and execute tasks for a stealer of the given |
1796 |
< |
* task, or in turn one of its stealers. Traces currentSteal -> |
1797 |
< |
* currentJoin links looking for a thread working on a descendant |
2053 |
< |
* of the given task and with a non-empty queue to steal back and |
2054 |
< |
* execute tasks from. The first call to this method upon a |
2055 |
< |
* waiting join will often entail scanning/search, (which is OK |
2056 |
< |
* because the joiner has nothing better to do), but this method |
2057 |
< |
* leaves hints in workers to speed up subsequent calls. |
2058 |
< |
* |
2059 |
< |
* @param w caller |
2060 |
< |
* @param task the task to join |
1795 |
> |
* Runs tasks until {@code isQuiescent()}. Rather than blocking |
1796 |
> |
* when tasks cannot be found, rescans until all others cannot |
1797 |
> |
* find tasks either. |
1798 |
|
*/ |
1799 |
< |
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) { |
1800 |
< |
if (task != null && w != null) { |
1801 |
< |
ForkJoinTask<?> ps = w.currentSteal; |
1802 |
< |
WorkQueue[] ws; int wl, oldSum = 0; |
1803 |
< |
outer: while (w.tryRemoveAndExec(task) && task.status >= 0 && |
1804 |
< |
(ws = workQueues) != null && (wl = ws.length) > 0) { |
1805 |
< |
ForkJoinTask<?> subtask; |
1806 |
< |
int m = wl - 1, checkSum = 0; // for stability check |
1807 |
< |
WorkQueue j = w, v; // v is subtask stealer |
1808 |
< |
descent: for (subtask = task; subtask.status >= 0; ) { |
1809 |
< |
for (int h = j.hint | 1, k = 0, i;;) { |
1810 |
< |
if ((v = ws[i = (h + (k << 1)) & m]) != null) { |
1811 |
< |
if (v.currentSteal == subtask) { |
1812 |
< |
j.hint = i; |
1813 |
< |
break; |
1799 |
> |
final void helpQuiescePool(WorkQueue w) { |
1800 |
> |
int prevSrc = w.source, fifo = w.id & FIFO; |
1801 |
> |
for (int source = prevSrc, released = -1;;) { // -1 until known |
1802 |
> |
WorkQueue[] ws; |
1803 |
> |
if (fifo != 0) |
1804 |
> |
w.localPollAndExec(0); |
1805 |
> |
else |
1806 |
> |
w.localPopAndExec(0); |
1807 |
> |
if (released == -1 && w.phase >= 0) |
1808 |
> |
released = 1; |
1809 |
> |
boolean quiet = true, empty = true; |
1810 |
> |
int r = ThreadLocalRandom.nextSecondarySeed(); |
1811 |
> |
if ((ws = workQueues) != null) { |
1812 |
> |
for (int n = ws.length, j = n, m = n - 1; j > 0; --j) { |
1813 |
> |
WorkQueue q; int i, b, al; ForkJoinTask<?>[] a; |
1814 |
> |
if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) { |
1815 |
> |
if ((b = q.base) - q.top < 0 && |
1816 |
> |
(a = q.array) != null && (al = a.length) > 0) { |
1817 |
> |
int qid = q.id; |
1818 |
> |
if (released == 0) { // increment |
1819 |
> |
released = 1; |
1820 |
> |
U.getAndAddLong(this, CTL, RC_UNIT); |
1821 |
|
} |
2078 |
– |
checkSum += v.base; |
2079 |
– |
} |
2080 |
– |
if (++k > m) // can't find stealer |
2081 |
– |
break outer; |
2082 |
– |
} |
2083 |
– |
|
2084 |
– |
for (;;) { // help v or descend |
2085 |
– |
ForkJoinTask<?>[] a; int b, al; |
2086 |
– |
if (subtask.status < 0) // too late to help |
2087 |
– |
break descent; |
2088 |
– |
checkSum += (b = v.base); |
2089 |
– |
ForkJoinTask<?> next = v.currentJoin; |
2090 |
– |
ForkJoinTask<?> t = null; |
2091 |
– |
if ((a = v.array) != null && (al = a.length) > 0) { |
1822 |
|
int index = (al - 1) & b; |
1823 |
|
long offset = ((long)index << ASHIFT) + ABASE; |
1824 |
< |
t = (ForkJoinTask<?>) |
1824 |
> |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1825 |
|
U.getObjectVolatile(a, offset); |
1826 |
< |
if (t != null && b++ == v.base) { |
1827 |
< |
if (j.currentJoin != subtask || |
1828 |
< |
v.currentSteal != subtask || |
1829 |
< |
subtask.status < 0) |
1830 |
< |
break descent; // stale |
1831 |
< |
if (U.compareAndSwapObject(a, offset, t, null)) { |
2102 |
< |
v.base = b; |
2103 |
< |
w.currentSteal = t; |
2104 |
< |
for (int top = w.top;;) { |
2105 |
< |
t.doExec(); // help |
2106 |
< |
w.currentSteal = ps; |
2107 |
< |
if (task.status < 0) |
2108 |
< |
break outer; |
2109 |
< |
if (w.top == top) |
2110 |
< |
break; // run local tasks |
2111 |
< |
if ((t = w.pop()) == null) |
2112 |
< |
break descent; |
2113 |
< |
w.currentSteal = t; |
2114 |
< |
} |
2115 |
< |
} |
2116 |
< |
} |
2117 |
< |
} |
2118 |
< |
if (t == null && b == v.base && b - v.top >= 0) { |
2119 |
< |
if ((subtask = next) == null) { // try to descend |
2120 |
< |
if (next == v.currentJoin && |
2121 |
< |
oldSum == (oldSum = checkSum)) |
2122 |
< |
break outer; |
2123 |
< |
break descent; |
1826 |
> |
if (t != null && b++ == q.base && |
1827 |
> |
U.compareAndSwapObject(a, offset, t, null)) { |
1828 |
> |
q.base = b; |
1829 |
> |
w.source = source = q.id; |
1830 |
> |
t.doExec(); |
1831 |
> |
w.source = source = prevSrc; |
1832 |
|
} |
1833 |
< |
j = v; |
1833 |
> |
quiet = empty = false; |
1834 |
|
break; |
1835 |
|
} |
1836 |
+ |
else if ((q.source & QUIET) == 0) |
1837 |
+ |
quiet = false; |
1838 |
|
} |
1839 |
|
} |
1840 |
|
} |
1841 |
+ |
if (quiet) { |
1842 |
+ |
if (released == 0) |
1843 |
+ |
U.getAndAddLong(this, CTL, RC_UNIT); |
1844 |
+ |
w.source = prevSrc; |
1845 |
+ |
break; |
1846 |
+ |
} |
1847 |
+ |
else if (empty) { |
1848 |
+ |
if (source != QUIET) |
1849 |
+ |
w.source = source = QUIET; |
1850 |
+ |
if (released == 1) { // decrement |
1851 |
+ |
released = 0; |
1852 |
+ |
U.getAndAddLong(this, CTL, RC_MASK & -RC_UNIT); |
1853 |
+ |
} |
1854 |
+ |
} |
1855 |
|
} |
1856 |
|
} |
1857 |
|
|
1858 |
|
/** |
1859 |
< |
* Tries to decrement active count (sometimes implicitly) and |
1860 |
< |
* possibly release or create a compensating worker in preparation |
2137 |
< |
* for blocking. Returns false (retryable by caller), on |
2138 |
< |
* contention, detected staleness, instability, or termination. |
1859 |
> |
* Scans for and returns a polled task, if available. |
1860 |
> |
* Used only for untracked polls. |
1861 |
|
* |
1862 |
< |
* @param w caller |
1862 |
> |
* @param submissionsOnly if true, only scan submission queues |
1863 |
|
*/ |
1864 |
< |
private boolean tryCompensate(WorkQueue w) { |
1865 |
< |
boolean canBlock; int wl; |
1866 |
< |
long c = ctl; |
1867 |
< |
WorkQueue[] ws = workQueues; |
1868 |
< |
int pc = config & SMASK; |
1869 |
< |
int ac = pc + (int)(c >> AC_SHIFT); |
1870 |
< |
int tc = pc + (short)(c >> TC_SHIFT); |
1871 |
< |
if (w == null || w.qlock < 0 || pc == 0 || // terminating or disabled |
1872 |
< |
ws == null || (wl = ws.length) <= 0) |
1873 |
< |
canBlock = false; |
1874 |
< |
else { |
2153 |
< |
int m = wl - 1, sp; |
2154 |
< |
boolean busy = true; // validate ac |
2155 |
< |
for (int i = 0; i <= m; ++i) { |
2156 |
< |
int k; WorkQueue v; |
2157 |
< |
if ((k = (i << 1) | 1) <= m && k >= 0 && (v = ws[k]) != null && |
2158 |
< |
v.scanState >= 0 && v.currentSteal == null) { |
2159 |
< |
busy = false; |
2160 |
< |
break; |
2161 |
< |
} |
1864 |
> |
private ForkJoinTask<?> pollScan(boolean submissionsOnly) { |
1865 |
> |
WorkQueue[] ws; int n; |
1866 |
> |
rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null && |
1867 |
> |
(n = ws.length) > 0) { |
1868 |
> |
int m = n - 1; |
1869 |
> |
int r = ThreadLocalRandom.nextSecondarySeed(); |
1870 |
> |
int h = r >>> 16; |
1871 |
> |
int origin, step; |
1872 |
> |
if (submissionsOnly) { |
1873 |
> |
origin = (r & ~1) & m; // even indices and steps |
1874 |
> |
step = (h & ~1) | 2; |
1875 |
|
} |
1876 |
< |
if (!busy || ctl != c) |
1877 |
< |
canBlock = false; // unstable or stale |
1878 |
< |
else if ((sp = (int)c) != 0) // release idle worker |
1879 |
< |
canBlock = tryRelease(c, ws[m & sp], 0L); |
1880 |
< |
else if (tc >= pc && ac > 1 && w.isEmpty()) { |
1881 |
< |
long nc = ((AC_MASK & (c - AC_UNIT)) | |
1882 |
< |
(~AC_MASK & c)); // uncompensated |
1883 |
< |
canBlock = U.compareAndSwapLong(this, CTL, c, nc); |
1884 |
< |
} |
1885 |
< |
else if (tc >= MAX_CAP || |
1886 |
< |
(this == common && tc >= pc + COMMON_MAX_SPARES)) |
1887 |
< |
throw new RejectedExecutionException( |
1888 |
< |
"Thread limit exceeded replacing blocked worker"); |
1889 |
< |
else { // similar to tryAddWorker |
1890 |
< |
boolean isSpare = (tc >= pc); |
1891 |
< |
long nc = (AC_MASK & c) | (TC_MASK & (c + TC_UNIT)); |
1892 |
< |
canBlock = (U.compareAndSwapLong(this, CTL, c, nc) && |
1893 |
< |
createWorker(isSpare)); // throws on exception |
1876 |
> |
else { |
1877 |
> |
origin = r & m; |
1878 |
> |
step = h | 1; |
1879 |
> |
} |
1880 |
> |
for (int k = origin, oldSum = 0, checkSum = 0;;) { |
1881 |
> |
WorkQueue q; int b, al; ForkJoinTask<?>[] a; |
1882 |
> |
if ((q = ws[k]) != null) { |
1883 |
> |
checkSum += b = q.base; |
1884 |
> |
if (b - q.top < 0 && |
1885 |
> |
(a = q.array) != null && (al = a.length) > 0) { |
1886 |
> |
int index = (al - 1) & b; |
1887 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
1888 |
> |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1889 |
> |
U.getObjectVolatile(a, offset); |
1890 |
> |
if (t != null && b++ == q.base && |
1891 |
> |
U.compareAndSwapObject(a, offset, t, null)) { |
1892 |
> |
q.base = b; |
1893 |
> |
return t; |
1894 |
> |
} |
1895 |
> |
else |
1896 |
> |
break; // restart |
1897 |
> |
} |
1898 |
> |
} |
1899 |
> |
if ((k = (k + step) & m) == origin) { |
1900 |
> |
if (oldSum == (oldSum = checkSum)) |
1901 |
> |
break rescan; |
1902 |
> |
checkSum = 0; |
1903 |
> |
} |
1904 |
|
} |
1905 |
|
} |
1906 |
< |
return canBlock; |
1906 |
> |
return null; |
1907 |
|
} |
1908 |
|
|
1909 |
|
/** |
1910 |
< |
* Helps and/or blocks until the given task is done or timeout. |
1910 |
> |
* Gets and removes a local or stolen task for the given worker. |
1911 |
|
* |
1912 |
< |
* @param w caller |
2190 |
< |
* @param task the task |
2191 |
< |
* @param deadline for timed waits, if nonzero |
2192 |
< |
* @return task status on exit |
1912 |
> |
* @return a task, if available |
1913 |
|
*/ |
1914 |
< |
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) { |
1915 |
< |
int s = 0; |
1916 |
< |
if (w != null) { |
1917 |
< |
ForkJoinTask<?> prevJoin = w.currentJoin; |
1918 |
< |
if (task != null && (s = task.status) >= 0) { |
1919 |
< |
w.currentJoin = task; |
1920 |
< |
CountedCompleter<?> cc = (task instanceof CountedCompleter) ? |
1921 |
< |
(CountedCompleter<?>)task : null; |
1922 |
< |
for (;;) { |
1923 |
< |
if (cc != null) |
1924 |
< |
helpComplete(w, cc, 0); |
1914 |
> |
final ForkJoinTask<?> nextTaskFor(WorkQueue w) { |
1915 |
> |
ForkJoinTask<?> t; |
1916 |
> |
if (w != null && |
1917 |
> |
(t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null) |
1918 |
> |
return t; |
1919 |
> |
else |
1920 |
> |
return pollScan(false); |
1921 |
> |
} |
1922 |
> |
|
1923 |
> |
// External operations |
1924 |
> |
|
1925 |
> |
/** |
1926 |
> |
* Adds the given task to a submission queue at submitter's |
1927 |
> |
* current queue, creating one if null or contended. |
1928 |
> |
* |
1929 |
> |
* @param task the task. Caller must ensure non-null. |
1930 |
> |
*/ |
1931 |
> |
final void externalPush(ForkJoinTask<?> task) { |
1932 |
> |
int r; // initialize caller's probe |
1933 |
> |
if ((r = ThreadLocalRandom.getProbe()) == 0) { |
1934 |
> |
ThreadLocalRandom.localInit(); |
1935 |
> |
r = ThreadLocalRandom.getProbe(); |
1936 |
> |
} |
1937 |
> |
for (;;) { |
1938 |
> |
int md = mode, n; |
1939 |
> |
WorkQueue[] ws = workQueues; |
1940 |
> |
if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0) |
1941 |
> |
throw new RejectedExecutionException(); |
1942 |
> |
else { |
1943 |
> |
WorkQueue q; |
1944 |
> |
boolean push = false, grow = false; |
1945 |
> |
if ((q = ws[(n - 1) & r & SQMASK]) == null) { |
1946 |
> |
Object lock = workerNamePrefix; |
1947 |
> |
int qid = (r | QUIET) & ~(FIFO | OWNED); |
1948 |
> |
q = new WorkQueue(this, null); |
1949 |
> |
q.id = qid; |
1950 |
> |
q.source = QUIET; |
1951 |
> |
q.phase = QLOCK; // lock queue |
1952 |
> |
if (lock != null) { |
1953 |
> |
synchronized(lock) { // lock pool to install |
1954 |
> |
int i; |
1955 |
> |
if ((ws = workQueues) != null && |
1956 |
> |
(n = ws.length) > 0 && |
1957 |
> |
ws[i = qid & (n - 1) & SQMASK] == null) { |
1958 |
> |
ws[i] = q; |
1959 |
> |
push = grow = true; |
1960 |
> |
} |
1961 |
> |
} |
1962 |
> |
} |
1963 |
> |
} |
1964 |
> |
else if (q.tryLockSharedQueue()) { |
1965 |
> |
int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a; |
1966 |
> |
if ((a = q.array) != null && (al = a.length) > 0 && |
1967 |
> |
al - 1 + (d = b - s) > 0) { |
1968 |
> |
a[(al - 1) & s] = task; |
1969 |
> |
q.top = s + 1; // relaxed writes OK here |
1970 |
> |
q.phase = 0; |
1971 |
> |
if (d < 0 && q.base - s < 0) |
1972 |
> |
break; // no signal needed |
1973 |
> |
} |
1974 |
|
else |
1975 |
< |
helpStealer(w, task); |
1976 |
< |
if ((s = task.status) < 0) |
1977 |
< |
break; |
1978 |
< |
long ms, ns; |
1979 |
< |
if (deadline == 0L) |
1980 |
< |
ms = 0L; |
1981 |
< |
else if ((ns = deadline - System.nanoTime()) <= 0L) |
1982 |
< |
break; |
1983 |
< |
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) |
1984 |
< |
ms = 1L; |
1985 |
< |
if (tryCompensate(w)) { |
1986 |
< |
task.internalWait(ms); |
1987 |
< |
U.getAndAddLong(this, CTL, AC_UNIT); |
1975 |
> |
grow = true; |
1976 |
> |
push = true; |
1977 |
> |
} |
1978 |
> |
if (push) { |
1979 |
> |
if (grow) { |
1980 |
> |
try { |
1981 |
> |
q.growArray(); |
1982 |
> |
int s = q.top, al; ForkJoinTask<?>[] a; |
1983 |
> |
if ((a = q.array) != null && (al = a.length) > 0) { |
1984 |
> |
a[(al - 1) & s] = task; |
1985 |
> |
q.top = s + 1; |
1986 |
> |
} |
1987 |
> |
} finally { |
1988 |
> |
q.phase = 0; |
1989 |
> |
} |
1990 |
|
} |
1991 |
< |
if ((s = task.status) < 0) |
1992 |
< |
break; |
1991 |
> |
signalWork(); |
1992 |
> |
break; |
1993 |
|
} |
1994 |
< |
w.currentJoin = prevJoin; |
1994 |
> |
else // move if busy |
1995 |
> |
r = ThreadLocalRandom.advanceProbe(r); |
1996 |
|
} |
1997 |
|
} |
2226 |
– |
return s; |
1998 |
|
} |
1999 |
|
|
2000 |
< |
// Specialized scanning |
2000 |
> |
/** |
2001 |
> |
* Pushes a possibly-external submission. |
2002 |
> |
*/ |
2003 |
> |
private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) { |
2004 |
> |
Thread t; ForkJoinWorkerThread w; WorkQueue q; |
2005 |
> |
if (task == null) |
2006 |
> |
throw new NullPointerException(); |
2007 |
> |
if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && |
2008 |
> |
(w = (ForkJoinWorkerThread)t).pool == this && |
2009 |
> |
(q = w.workQueue) != null) |
2010 |
> |
q.push(task); |
2011 |
> |
else |
2012 |
> |
externalPush(task); |
2013 |
> |
return task; |
2014 |
> |
} |
2015 |
|
|
2016 |
|
/** |
2017 |
< |
* Returns a (probably) non-empty steal queue, if one is found |
2018 |
< |
* during a scan, else null. This method must be retried by |
2019 |
< |
* caller if, by the time it tries to use the queue, it is empty. |
2020 |
< |
*/ |
2021 |
< |
private WorkQueue findNonEmptyStealQueue() { |
2022 |
< |
WorkQueue[] ws; int wl; // one-shot version of scan loop |
2023 |
< |
int r = ThreadLocalRandom.nextSecondarySeed(); |
2024 |
< |
if ((ws = workQueues) != null && (wl = ws.length) > 0) { |
2025 |
< |
int m = wl - 1, origin = r & m; |
2241 |
< |
for (int k = origin, oldSum = 0, checkSum = 0;;) { |
2242 |
< |
WorkQueue q; int b; |
2243 |
< |
if ((q = ws[k]) != null) { |
2244 |
< |
if ((b = q.base) - q.top < 0) |
2245 |
< |
return q; |
2246 |
< |
checkSum += b; |
2247 |
< |
} |
2248 |
< |
if ((k = (k + 1) & m) == origin) { |
2249 |
< |
if (oldSum == (oldSum = checkSum)) |
2250 |
< |
break; |
2251 |
< |
checkSum = 0; |
2252 |
< |
} |
2253 |
< |
} |
2254 |
< |
} |
2255 |
< |
return null; |
2017 |
> |
* Returns common pool queue for an external thread. |
2018 |
> |
*/ |
2019 |
> |
static WorkQueue commonSubmitterQueue() { |
2020 |
> |
ForkJoinPool p = common; |
2021 |
> |
int r = ThreadLocalRandom.getProbe(); |
2022 |
> |
WorkQueue[] ws; int n; |
2023 |
> |
return (p != null && (ws = p.workQueues) != null && |
2024 |
> |
(n = ws.length) > 0) ? |
2025 |
> |
ws[(n - 1) & r & SQMASK] : null; |
2026 |
|
} |
2027 |
|
|
2028 |
|
/** |
2029 |
< |
* Runs tasks until {@code isQuiescent()}. We piggyback on |
2260 |
< |
* active count ctl maintenance, but rather than blocking |
2261 |
< |
* when tasks cannot be found, we rescan until all others cannot |
2262 |
< |
* find tasks either. |
2029 |
> |
* Performs tryUnpush for an external submitter. |
2030 |
|
*/ |
2031 |
< |
final void helpQuiescePool(WorkQueue w) { |
2032 |
< |
ForkJoinTask<?> ps = w.currentSteal; // save context |
2033 |
< |
int wc = w.config; |
2034 |
< |
for (boolean active = true;;) { |
2035 |
< |
long c; WorkQueue q; ForkJoinTask<?> t; |
2036 |
< |
if (wc >= 0 && (t = w.pop()) != null) { // run locals if LIFO |
2037 |
< |
(w.currentSteal = t).doExec(); |
2271 |
< |
w.currentSteal = ps; |
2272 |
< |
} |
2273 |
< |
else if ((q = findNonEmptyStealQueue()) != null) { |
2274 |
< |
if (!active) { // re-establish active count |
2275 |
< |
active = true; |
2276 |
< |
U.getAndAddLong(this, CTL, AC_UNIT); |
2277 |
< |
} |
2278 |
< |
if ((t = q.pollAt(q.base)) != null) { |
2279 |
< |
(w.currentSteal = t).doExec(); |
2280 |
< |
w.currentSteal = ps; |
2281 |
< |
if (++w.nsteals < 0) |
2282 |
< |
w.transferStealCount(this); |
2283 |
< |
} |
2284 |
< |
} |
2285 |
< |
else if (active) { // decrement active count without queuing |
2286 |
< |
long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c); |
2287 |
< |
if (U.compareAndSwapLong(this, CTL, c, nc)) |
2288 |
< |
active = false; |
2289 |
< |
} |
2290 |
< |
else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 && |
2291 |
< |
U.compareAndSwapLong(this, CTL, c, c + AC_UNIT)) |
2292 |
< |
break; |
2293 |
< |
} |
2031 |
> |
final boolean tryExternalUnpush(ForkJoinTask<?> task) { |
2032 |
> |
int r = ThreadLocalRandom.getProbe(); |
2033 |
> |
WorkQueue[] ws; WorkQueue w; int n; |
2034 |
> |
return ((ws = workQueues) != null && |
2035 |
> |
(n = ws.length) > 0 && |
2036 |
> |
(w = ws[(n - 1) & r & SQMASK]) != null && |
2037 |
> |
w.trySharedUnpush(task)); |
2038 |
|
} |
2039 |
|
|
2040 |
|
/** |
2041 |
< |
* Gets and removes a local or stolen task for the given worker. |
2041 |
> |
* Performs helpComplete for an external submitter. |
2042 |
> |
*/ |
2043 |
> |
final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) { |
2044 |
> |
int r = ThreadLocalRandom.getProbe(); |
2045 |
> |
WorkQueue[] ws; WorkQueue w; int n; |
2046 |
> |
return ((ws = workQueues) != null && (n = ws.length) > 0 && |
2047 |
> |
(w = ws[(n - 1) & r & SQMASK]) != null) ? |
2048 |
> |
w.sharedHelpCC(task, maxTasks) : 0; |
2049 |
> |
} |
2050 |
> |
|
2051 |
> |
/** |
2052 |
> |
* Tries to steal and run tasks within the target's computation. |
2053 |
> |
* The maxTasks argument supports external usages; internal calls |
2054 |
> |
* use zero, allowing unbounded steps (external calls trap |
2055 |
> |
* non-positive values). |
2056 |
|
* |
2057 |
< |
* @return a task, if available |
2057 |
> |
* @param w caller |
2058 |
> |
* @param maxTasks if non-zero, the maximum number of other tasks to run |
2059 |
> |
* @return task status on exit |
2060 |
|
*/ |
2061 |
< |
final ForkJoinTask<?> nextTaskFor(WorkQueue w) { |
2062 |
< |
for (ForkJoinTask<?> t;;) { |
2063 |
< |
WorkQueue q; |
2304 |
< |
if ((t = w.nextLocalTask()) != null) |
2305 |
< |
return t; |
2306 |
< |
if ((q = findNonEmptyStealQueue()) == null) |
2307 |
< |
return null; |
2308 |
< |
if ((t = q.pollAt(q.base)) != null) |
2309 |
< |
return t; |
2310 |
< |
} |
2061 |
> |
final int helpComplete(WorkQueue w, CountedCompleter<?> task, |
2062 |
> |
int maxTasks) { |
2063 |
> |
return (w == null) ? 0 : w.localHelpCC(task, maxTasks); |
2064 |
|
} |
2065 |
|
|
2066 |
|
/** |
2107 |
|
*/ |
2108 |
|
static int getSurplusQueuedTaskCount() { |
2109 |
|
Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q; |
2110 |
< |
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { |
2111 |
< |
int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK; |
2112 |
< |
int n = (q = wt.workQueue).top - q.base; |
2113 |
< |
int a = (int)(pool.ctl >> AC_SHIFT) + p; |
2110 |
> |
if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && |
2111 |
> |
(pool = (wt = (ForkJoinWorkerThread)t).pool) != null && |
2112 |
> |
(q = wt.workQueue) != null) { |
2113 |
> |
int p = pool.mode & SMASK; |
2114 |
> |
int a = p + (int)(pool.ctl >> RC_SHIFT); |
2115 |
> |
int n = q.top - q.base; |
2116 |
|
return n - (a > (p >>>= 1) ? 0 : |
2117 |
|
a > (p >>>= 1) ? 1 : |
2118 |
|
a > (p >>>= 1) ? 2 : |
2122 |
|
return 0; |
2123 |
|
} |
2124 |
|
|
2125 |
< |
// Termination |
2125 |
> |
// Termination |
2126 |
|
|
2127 |
|
/** |
2128 |
|
* Possibly initiates and/or completes termination. |
2130 |
|
* @param now if true, unconditionally terminate, else only |
2131 |
|
* if no work and no active workers |
2132 |
|
* @param enable if true, terminate when next possible |
2133 |
< |
* @return -1: terminating/terminated, 0: retry if internal caller, else 1 |
2133 |
> |
* @return true if terminating or terminated |
2134 |
|
*/ |
2135 |
< |
private int tryTerminate(boolean now, boolean enable) { |
2136 |
< |
int rs; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED |
2135 |
> |
private boolean tryTerminate(boolean now, boolean enable) { |
2136 |
> |
int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED |
2137 |
|
|
2138 |
< |
while ((rs = runState) >= 0) { |
2138 |
> |
while (((md = mode) & SHUTDOWN) == 0) { |
2139 |
|
if (!enable || this == common) // cannot shutdown |
2140 |
< |
return 1; |
2386 |
< |
else if (rs == 0) |
2387 |
< |
tryInitialize(false); // ensure initialized |
2140 |
> |
return false; |
2141 |
|
else |
2142 |
< |
U.compareAndSwapInt(this, RUNSTATE, rs, rs | SHUTDOWN); |
2142 |
> |
U.compareAndSwapInt(this, MODE, md, md | SHUTDOWN); |
2143 |
|
} |
2144 |
|
|
2145 |
< |
if ((rs & STOP) == 0) { // try to initiate termination |
2146 |
< |
if (!now) { // check quiescence |
2145 |
> |
while (((md = mode) & STOP) == 0) { // try to initiate termination |
2146 |
> |
if (!now) { // check if quiescent & empty |
2147 |
|
for (long oldSum = 0L;;) { // repeat until stable |
2148 |
< |
WorkQueue[] ws; WorkQueue w; int b; |
2148 |
> |
boolean running = false; |
2149 |
|
long checkSum = ctl; |
2150 |
< |
if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0) |
2151 |
< |
return 0; // still active workers |
2152 |
< |
if ((ws = workQueues) != null) { |
2150 |
> |
WorkQueue[] ws = workQueues; |
2151 |
> |
if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0) |
2152 |
> |
running = true; |
2153 |
> |
else if (ws != null) { |
2154 |
> |
WorkQueue w; int b; |
2155 |
|
for (int i = 0; i < ws.length; ++i) { |
2156 |
|
if ((w = ws[i]) != null) { |
2157 |
< |
checkSum += (b = w.base); |
2158 |
< |
if (w.currentSteal != null || b != w.top) |
2159 |
< |
return 0; // retry if internal caller |
2157 |
> |
checkSum += (b = w.base) + w.id; |
2158 |
> |
if (b != w.top || |
2159 |
> |
((i & 1) == 1 && w.source >= 0)) { |
2160 |
> |
running = true; |
2161 |
> |
break; |
2162 |
> |
} |
2163 |
|
} |
2164 |
|
} |
2165 |
|
} |
2166 |
< |
if (oldSum == (oldSum = checkSum)) |
2166 |
> |
if (((md = mode) & STOP) != 0) |
2167 |
> |
break; // already triggered |
2168 |
> |
else if (running) |
2169 |
> |
return false; |
2170 |
> |
else if (workQueues == ws && oldSum == (oldSum = checkSum)) |
2171 |
|
break; |
2172 |
|
} |
2173 |
|
} |
2174 |
< |
do {} while (!U.compareAndSwapInt(this, RUNSTATE, |
2175 |
< |
rs = runState, rs | STOP)); |
2174 |
> |
if ((md & STOP) == 0) |
2175 |
> |
U.compareAndSwapInt(this, MODE, md, md | STOP); |
2176 |
|
} |
2177 |
|
|
2178 |
< |
for (long oldSum = 0L;;) { // repeat until stable |
2179 |
< |
WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; |
2180 |
< |
long checkSum = ctl; |
2181 |
< |
if ((ws = workQueues) != null) { // help terminate others |
2182 |
< |
for (int i = 0; i < ws.length; ++i) { |
2183 |
< |
if ((w = ws[i]) != null) { |
2184 |
< |
w.cancelAll(); // clear queues |
2185 |
< |
checkSum += w.base; |
2186 |
< |
if (w.qlock >= 0) { |
2187 |
< |
w.qlock = -1; // racy set OK |
2426 |
< |
if ((wt = w.owner) != null) { |
2178 |
> |
while (((md = mode) & TERMINATED) == 0) { // help terminate others |
2179 |
> |
for (long oldSum = 0L;;) { // repeat until stable |
2180 |
> |
WorkQueue[] ws; WorkQueue w; |
2181 |
> |
long checkSum = ctl; |
2182 |
> |
if ((ws = workQueues) != null) { |
2183 |
> |
for (int i = 0; i < ws.length; ++i) { |
2184 |
> |
if ((w = ws[i]) != null) { |
2185 |
> |
ForkJoinWorkerThread wt = w.owner; |
2186 |
> |
w.cancelAll(); // clear queues |
2187 |
> |
if (wt != null) { |
2188 |
|
try { // unblock join or park |
2189 |
|
wt.interrupt(); |
2190 |
|
} catch (Throwable ignore) { |
2191 |
|
} |
2192 |
|
} |
2193 |
+ |
checkSum += w.base + w.id; |
2194 |
|
} |
2195 |
|
} |
2196 |
|
} |
2197 |
+ |
if (((md = mode) & TERMINATED) != 0 || |
2198 |
+ |
(workQueues == ws && oldSum == (oldSum = checkSum))) |
2199 |
+ |
break; |
2200 |
|
} |
2201 |
< |
if (oldSum == (oldSum = checkSum)) |
2201 |
> |
if ((md & TERMINATED) != 0) |
2202 |
|
break; |
2203 |
< |
} |
2439 |
< |
|
2440 |
< |
if ((short)(ctl >>> TC_SHIFT) + (config & SMASK) <= 0) { |
2441 |
< |
runState = (STARTED | SHUTDOWN | STOP | TERMINATED); // final write |
2442 |
< |
synchronized (this) { |
2443 |
< |
notifyAll(); // for awaitTermination |
2444 |
< |
} |
2445 |
< |
} |
2446 |
< |
|
2447 |
< |
return -1; |
2448 |
< |
} |
2449 |
< |
|
2450 |
< |
// External operations |
2451 |
< |
|
2452 |
< |
/** |
2453 |
< |
* Constructs and tries to install a new external queue, |
2454 |
< |
* failing if the workQueues array already has a queue at |
2455 |
< |
* the given index. |
2456 |
< |
* |
2457 |
< |
* @param index the index of the new queue |
2458 |
< |
*/ |
2459 |
< |
private void tryCreateExternalQueue(int index) { |
2460 |
< |
AuxState aux; |
2461 |
< |
if ((aux = auxState) != null && index >= 0) { |
2462 |
< |
WorkQueue q = new WorkQueue(this, null); |
2463 |
< |
q.config = index; |
2464 |
< |
q.scanState = ~UNSIGNALLED; |
2465 |
< |
q.qlock = 1; // lock queue |
2466 |
< |
boolean installed = false; |
2467 |
< |
aux.lock(); |
2468 |
< |
try { // lock pool to install |
2469 |
< |
WorkQueue[] ws; |
2470 |
< |
if ((ws = workQueues) != null && index < ws.length && |
2471 |
< |
ws[index] == null) { |
2472 |
< |
ws[index] = q; // else throw away |
2473 |
< |
installed = true; |
2474 |
< |
} |
2475 |
< |
} finally { |
2476 |
< |
aux.unlock(); |
2477 |
< |
} |
2478 |
< |
if (installed) { |
2479 |
< |
try { |
2480 |
< |
q.growArray(); |
2481 |
< |
} finally { |
2482 |
< |
q.qlock = 0; |
2483 |
< |
} |
2484 |
< |
} |
2485 |
< |
} |
2486 |
< |
} |
2487 |
< |
|
2488 |
< |
/** |
2489 |
< |
* Adds the given task to a submission queue at submitter's |
2490 |
< |
* current queue. Also performs secondary initialization upon the |
2491 |
< |
* first submission of the first task to the pool, and detects |
2492 |
< |
* first submission by an external thread and creates a new shared |
2493 |
< |
* queue if the one at index if empty or contended. |
2494 |
< |
* |
2495 |
< |
* @param task the task. Caller must ensure non-null. |
2496 |
< |
*/ |
2497 |
< |
final void externalPush(ForkJoinTask<?> task) { |
2498 |
< |
int r; // initialize caller's probe |
2499 |
< |
if ((r = ThreadLocalRandom.getProbe()) == 0) { |
2500 |
< |
ThreadLocalRandom.localInit(); |
2501 |
< |
r = ThreadLocalRandom.getProbe(); |
2502 |
< |
} |
2503 |
< |
for (;;) { |
2504 |
< |
WorkQueue q; int wl, k, stat; |
2505 |
< |
int rs = runState; |
2506 |
< |
WorkQueue[] ws = workQueues; |
2507 |
< |
if (rs <= 0 || ws == null || (wl = ws.length) <= 0) |
2508 |
< |
tryInitialize(true); |
2509 |
< |
else if ((q = ws[k = (wl - 1) & r & SQMASK]) == null) |
2510 |
< |
tryCreateExternalQueue(k); |
2511 |
< |
else if ((stat = q.sharedPush(task)) < 0) |
2203 |
> |
else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0) |
2204 |
|
break; |
2205 |
< |
else if (stat == 0) { |
2206 |
< |
signalWork(); |
2205 |
> |
else if (U.compareAndSwapInt(this, MODE, md, md | TERMINATED)) { |
2206 |
> |
synchronized (this) { |
2207 |
> |
notifyAll(); // for awaitTermination |
2208 |
> |
} |
2209 |
|
break; |
2210 |
|
} |
2517 |
– |
else // move if busy |
2518 |
– |
r = ThreadLocalRandom.advanceProbe(r); |
2211 |
|
} |
2212 |
< |
} |
2521 |
< |
|
2522 |
< |
/** |
2523 |
< |
* Pushes a possibly-external submission. |
2524 |
< |
*/ |
2525 |
< |
private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) { |
2526 |
< |
Thread t; ForkJoinWorkerThread w; WorkQueue q; |
2527 |
< |
if (task == null) |
2528 |
< |
throw new NullPointerException(); |
2529 |
< |
if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && |
2530 |
< |
(w = (ForkJoinWorkerThread)t).pool == this && |
2531 |
< |
(q = w.workQueue) != null) |
2532 |
< |
q.push(task); |
2533 |
< |
else |
2534 |
< |
externalPush(task); |
2535 |
< |
return task; |
2536 |
< |
} |
2537 |
< |
|
2538 |
< |
/** |
2539 |
< |
* Returns common pool queue for an external thread. |
2540 |
< |
*/ |
2541 |
< |
static WorkQueue commonSubmitterQueue() { |
2542 |
< |
ForkJoinPool p = common; |
2543 |
< |
int r = ThreadLocalRandom.getProbe(); |
2544 |
< |
WorkQueue[] ws; int wl; |
2545 |
< |
return (p != null && (ws = p.workQueues) != null && |
2546 |
< |
(wl = ws.length) > 0) ? |
2547 |
< |
ws[(wl - 1) & r & SQMASK] : null; |
2548 |
< |
} |
2549 |
< |
|
2550 |
< |
/** |
2551 |
< |
* Performs tryUnpush for an external submitter. |
2552 |
< |
*/ |
2553 |
< |
final boolean tryExternalUnpush(ForkJoinTask<?> task) { |
2554 |
< |
int r = ThreadLocalRandom.getProbe(); |
2555 |
< |
WorkQueue[] ws; WorkQueue w; int wl; |
2556 |
< |
return ((ws = workQueues) != null && |
2557 |
< |
(wl = ws.length) > 0 && |
2558 |
< |
(w = ws[(wl - 1) & r & SQMASK]) != null && |
2559 |
< |
w.trySharedUnpush(task)); |
2560 |
< |
} |
2561 |
< |
|
2562 |
< |
/** |
2563 |
< |
* Performs helpComplete for an external submitter. |
2564 |
< |
*/ |
2565 |
< |
final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) { |
2566 |
< |
WorkQueue[] ws; int wl; |
2567 |
< |
int r = ThreadLocalRandom.getProbe(); |
2568 |
< |
return ((ws = workQueues) != null && (wl = ws.length) > 0) ? |
2569 |
< |
helpComplete(ws[(wl - 1) & r & SQMASK], task, maxTasks) : 0; |
2212 |
> |
return true; |
2213 |
|
} |
2214 |
|
|
2215 |
|
// Exported methods |
2218 |
|
|
2219 |
|
/** |
2220 |
|
* Creates a {@code ForkJoinPool} with parallelism equal to {@link |
2221 |
< |
* java.lang.Runtime#availableProcessors}, using the {@linkplain |
2222 |
< |
* #defaultForkJoinWorkerThreadFactory default thread factory}, |
2580 |
< |
* no UncaughtExceptionHandler, and non-async LIFO processing mode. |
2221 |
> |
* java.lang.Runtime#availableProcessors}, using defaults for all |
2222 |
> |
* other parameters. |
2223 |
|
* |
2224 |
|
* @throws SecurityException if a security manager exists and |
2225 |
|
* the caller is not permitted to modify threads |
2228 |
|
*/ |
2229 |
|
public ForkJoinPool() { |
2230 |
|
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), |
2231 |
< |
defaultForkJoinWorkerThreadFactory, null, false); |
2231 |
> |
defaultForkJoinWorkerThreadFactory, null, false, |
2232 |
> |
0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); |
2233 |
|
} |
2234 |
|
|
2235 |
|
/** |
2236 |
|
* Creates a {@code ForkJoinPool} with the indicated parallelism |
2237 |
< |
* level, the {@linkplain |
2595 |
< |
* #defaultForkJoinWorkerThreadFactory default thread factory}, |
2596 |
< |
* no UncaughtExceptionHandler, and non-async LIFO processing mode. |
2237 |
> |
* level, using defaults for all other parameters. |
2238 |
|
* |
2239 |
|
* @param parallelism the parallelism level |
2240 |
|
* @throws IllegalArgumentException if parallelism less than or |
2245 |
|
* java.lang.RuntimePermission}{@code ("modifyThread")} |
2246 |
|
*/ |
2247 |
|
public ForkJoinPool(int parallelism) { |
2248 |
< |
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); |
2248 |
> |
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false, |
2249 |
> |
0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); |
2250 |
|
} |
2251 |
|
|
2252 |
|
/** |
2253 |
< |
* Creates a {@code ForkJoinPool} with the given parameters. |
2253 |
> |
* Creates a {@code ForkJoinPool} with the given parameters (using |
2254 |
> |
* defaults for others). |
2255 |
|
* |
2256 |
|
* @param parallelism the parallelism level. For default value, |
2257 |
|
* use {@link java.lang.Runtime#availableProcessors}. |
2278 |
|
ForkJoinWorkerThreadFactory factory, |
2279 |
|
UncaughtExceptionHandler handler, |
2280 |
|
boolean asyncMode) { |
2281 |
< |
this(checkParallelism(parallelism), |
2282 |
< |
checkFactory(factory), |
2640 |
< |
handler, |
2641 |
< |
asyncMode ? FIFO_QUEUE : LIFO_QUEUE, |
2642 |
< |
"ForkJoinPool-" + nextPoolId() + "-worker-"); |
2643 |
< |
checkPermission(); |
2281 |
> |
this(parallelism, factory, handler, asyncMode, |
2282 |
> |
0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); |
2283 |
|
} |
2284 |
|
|
2285 |
< |
private static int checkParallelism(int parallelism) { |
2286 |
< |
if (parallelism <= 0 || parallelism > MAX_CAP) |
2285 |
> |
/** |
2286 |
> |
* Creates a {@code ForkJoinPool} with the given parameters. |
2287 |
> |
* |
2288 |
> |
* @param parallelism the parallelism level. For default value, |
2289 |
> |
* use {@link java.lang.Runtime#availableProcessors}. |
2290 |
> |
* |
2291 |
> |
* @param factory the factory for creating new threads. For |
2292 |
> |
* default value, use {@link #defaultForkJoinWorkerThreadFactory}. |
2293 |
> |
* |
2294 |
> |
* @param handler the handler for internal worker threads that |
2295 |
> |
* terminate due to unrecoverable errors encountered while |
2296 |
> |
* executing tasks. For default value, use {@code null}. |
2297 |
> |
* |
2298 |
> |
* @param asyncMode if true, establishes local first-in-first-out |
2299 |
> |
* scheduling mode for forked tasks that are never joined. This |
2300 |
> |
* mode may be more appropriate than default locally stack-based |
2301 |
> |
* mode in applications in which worker threads only process |
2302 |
> |
* event-style asynchronous tasks. For default value, use {@code |
2303 |
> |
* false}. |
2304 |
> |
* |
2305 |
> |
* @param corePoolSize the number of threads to keep in the pool |
2306 |
> |
* (unless timed out after an elapsed keep-alive). Normally (and |
2307 |
> |
* by default) this is the same value as the parallelism level, |
2308 |
> |
* but may be set to a larger value to reduce dynamic overhead if |
2309 |
> |
* tasks regularly block. Using a smaller value (for example |
2310 |
> |
* {@code 0}) has the same effect as the default. |
2311 |
> |
* |
2312 |
> |
* @param maximumPoolSize the maximum number of threads allowed. |
2313 |
> |
* When the maximum is reached, attempts to replace blocked |
2314 |
> |
* threads fail. (However, because creation and termination of |
2315 |
> |
* different threads may overlap, and may be managed by the given |
2316 |
> |
* thread factory, this value may be transiently exceeded.) The |
2317 |
> |
* default for the common pool is {@code 256} plus the parallelism |
2318 |
> |
* level. Using a value (for example {@code Integer.MAX_VALUE}) |
2319 |
> |
* larger than the implementation's total thread limit has the |
2320 |
> |
* same effect as using this limit. |
2321 |
> |
* |
2322 |
> |
* @param minimumRunnable the minimum allowed number of core |
2323 |
> |
* threads not blocked by a join or {@link ManagedBlocker}. To |
2324 |
> |
* ensure progress, when too few unblocked threads exist and |
2325 |
> |
* unexecuted tasks may exist, new threads are constructed, up to |
2326 |
> |
* the given maximumPoolSize. For the default value, use {@code |
2327 |
> |
* 1}, that ensures liveness. A larger value might improve |
2328 |
> |
* throughput in the presence of blocked activities, but might |
2329 |
> |
* not, due to increased overhead. A value of zero may be |
2330 |
> |
* acceptable when submitted tasks cannot have dependencies |
2331 |
> |
* requiring additional threads. |
2332 |
> |
* |
2333 |
> |
* @param rejectOnSaturation if true, attempts to create more than |
2334 |
> |
* the maximum total allowed threads throw {@link |
2335 |
> |
* RejectedExecutionException}. Otherwise, the pool continues to |
2336 |
> |
* operate, but with fewer than the target number of runnable |
2337 |
> |
* threads, so might not ensure progress. For default value, use |
2338 |
> |
* {@code true}. |
2339 |
> |
* |
2340 |
> |
* @param keepAliveTime the elapsed time since last use before |
2341 |
> |
* a thread is terminated (and then later replaced if needed). |
2342 |
> |
* For the default value, use {@code 60, TimeUnit.SECONDS}. |
2343 |
> |
* |
2344 |
> |
* @param unit the time unit for the {@code keepAliveTime} argument |
2345 |
> |
* |
2346 |
> |
* @throws IllegalArgumentException if parallelism is less than or |
2347 |
> |
* equal to zero, or is greater than implementation limit, |
2348 |
> |
* or if maximumPoolSize is less than parallelism, |
2349 |
> |
* of if the keepAliveTime is less than or equal to zero. |
2350 |
> |
* @throws NullPointerException if the factory is null |
2351 |
> |
* @throws SecurityException if a security manager exists and |
2352 |
> |
* the caller is not permitted to modify threads |
2353 |
> |
* because it does not hold {@link |
2354 |
> |
* java.lang.RuntimePermission}{@code ("modifyThread")} |
2355 |
> |
*/ |
2356 |
> |
public ForkJoinPool(int parallelism, |
2357 |
> |
ForkJoinWorkerThreadFactory factory, |
2358 |
> |
UncaughtExceptionHandler handler, |
2359 |
> |
boolean asyncMode, |
2360 |
> |
int corePoolSize, |
2361 |
> |
int maximumPoolSize, |
2362 |
> |
int minimumRunnable, |
2363 |
> |
boolean rejectOnSaturation, |
2364 |
> |
long keepAliveTime, |
2365 |
> |
TimeUnit unit) { |
2366 |
> |
// check, encode, pack parameters |
2367 |
> |
if (parallelism <= 0 || parallelism > MAX_CAP || |
2368 |
> |
maximumPoolSize < parallelism || keepAliveTime <= 0L) |
2369 |
|
throw new IllegalArgumentException(); |
2649 |
– |
return parallelism; |
2650 |
– |
} |
2651 |
– |
|
2652 |
– |
private static ForkJoinWorkerThreadFactory checkFactory |
2653 |
– |
(ForkJoinWorkerThreadFactory factory) { |
2370 |
|
if (factory == null) |
2371 |
|
throw new NullPointerException(); |
2372 |
< |
return factory; |
2372 |
> |
long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP); |
2373 |
> |
|
2374 |
> |
String prefix = "ForkJoinPool-" + nextPoolId() + "-worker-"; |
2375 |
> |
int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP); |
2376 |
> |
long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) | |
2377 |
> |
(((long)(-parallelism) << RC_SHIFT) & RC_MASK)); |
2378 |
> |
int m = (parallelism | |
2379 |
> |
(asyncMode ? FIFO : 0) | |
2380 |
> |
(rejectOnSaturation ? 0 : SATURATE)); |
2381 |
> |
int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism; |
2382 |
> |
int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP); |
2383 |
> |
int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH); |
2384 |
> |
int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots |
2385 |
> |
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; |
2386 |
> |
n = (n + 1) << 1; // power of two, including space for submission queues |
2387 |
> |
|
2388 |
> |
this.workQueues = new WorkQueue[n]; |
2389 |
> |
this.workerNamePrefix = prefix; |
2390 |
> |
this.factory = factory; |
2391 |
> |
this.ueh = handler; |
2392 |
> |
this.keepAlive = ms; |
2393 |
> |
this.bounds = b; |
2394 |
> |
this.mode = m; |
2395 |
> |
this.ctl = c; |
2396 |
> |
checkPermission(); |
2397 |
|
} |
2398 |
|
|
2399 |
|
/** |
2400 |
< |
* Creates a {@code ForkJoinPool} with the given parameters, without |
2401 |
< |
* any security checks or parameter validation. Invoked directly by |
2402 |
< |
* makeCommonPool. |
2403 |
< |
*/ |
2404 |
< |
private ForkJoinPool(int parallelism, |
2405 |
< |
ForkJoinWorkerThreadFactory factory, |
2406 |
< |
UncaughtExceptionHandler handler, |
2407 |
< |
int mode, |
2408 |
< |
String workerNamePrefix) { |
2409 |
< |
this.workerNamePrefix = workerNamePrefix; |
2410 |
< |
this.factory = factory; |
2400 |
> |
* Constructor for common pool using parameters possibly |
2401 |
> |
* overridden by system properties |
2402 |
> |
*/ |
2403 |
> |
private ForkJoinPool(byte forCommonPoolOnly) { |
2404 |
> |
int parallelism = -1; |
2405 |
> |
ForkJoinWorkerThreadFactory fac = null; |
2406 |
> |
UncaughtExceptionHandler handler = null; |
2407 |
> |
try { // ignore exceptions in accessing/parsing properties |
2408 |
> |
String pp = System.getProperty |
2409 |
> |
("java.util.concurrent.ForkJoinPool.common.parallelism"); |
2410 |
> |
String fp = System.getProperty |
2411 |
> |
("java.util.concurrent.ForkJoinPool.common.threadFactory"); |
2412 |
> |
String hp = System.getProperty |
2413 |
> |
("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); |
2414 |
> |
if (pp != null) |
2415 |
> |
parallelism = Integer.parseInt(pp); |
2416 |
> |
if (fp != null) |
2417 |
> |
fac = ((ForkJoinWorkerThreadFactory)ClassLoader. |
2418 |
> |
getSystemClassLoader().loadClass(fp).newInstance()); |
2419 |
> |
if (hp != null) |
2420 |
> |
handler = ((UncaughtExceptionHandler)ClassLoader. |
2421 |
> |
getSystemClassLoader().loadClass(hp).newInstance()); |
2422 |
> |
} catch (Exception ignore) { |
2423 |
> |
} |
2424 |
> |
|
2425 |
> |
if (fac == null) { |
2426 |
> |
if (System.getSecurityManager() == null) |
2427 |
> |
fac = defaultForkJoinWorkerThreadFactory; |
2428 |
> |
else // use security-managed default |
2429 |
> |
fac = new InnocuousForkJoinWorkerThreadFactory(); |
2430 |
> |
} |
2431 |
> |
if (parallelism < 0 && // default 1 less than #cores |
2432 |
> |
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) |
2433 |
> |
parallelism = 1; |
2434 |
> |
if (parallelism > MAX_CAP) |
2435 |
> |
parallelism = MAX_CAP; |
2436 |
> |
|
2437 |
> |
long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) | |
2438 |
> |
(((long)(-parallelism) << RC_SHIFT) & RC_MASK)); |
2439 |
> |
int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); |
2440 |
> |
int m = (parallelism < 1) ? 1 : parallelism; |
2441 |
> |
int n = (parallelism > 1) ? parallelism - 1 : 1; |
2442 |
> |
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; |
2443 |
> |
n = (n + 1) << 1; |
2444 |
> |
|
2445 |
> |
this.workQueues = new WorkQueue[n]; |
2446 |
> |
this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; |
2447 |
> |
this.factory = fac; |
2448 |
|
this.ueh = handler; |
2449 |
< |
this.config = (parallelism & SMASK) | mode; |
2450 |
< |
long np = (long)(-parallelism); // offset ctl counts |
2451 |
< |
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); |
2449 |
> |
this.keepAlive = DEFAULT_KEEPALIVE; |
2450 |
> |
this.bounds = b; |
2451 |
> |
this.mode = m; |
2452 |
> |
this.ctl = c; |
2453 |
|
} |
2454 |
|
|
2455 |
|
/** |
2625 |
|
* @return the targeted parallelism level of this pool |
2626 |
|
*/ |
2627 |
|
public int getParallelism() { |
2628 |
< |
int par; |
2851 |
< |
return ((par = config & SMASK) > 0) ? par : 1; |
2628 |
> |
return mode & SMASK; |
2629 |
|
} |
2630 |
|
|
2631 |
|
/** |
2647 |
|
* @return the number of worker threads |
2648 |
|
*/ |
2649 |
|
public int getPoolSize() { |
2650 |
< |
return (config & SMASK) + (short)(ctl >>> TC_SHIFT); |
2650 |
> |
return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT)); |
2651 |
|
} |
2652 |
|
|
2653 |
|
/** |
2657 |
|
* @return {@code true} if this pool uses async mode |
2658 |
|
*/ |
2659 |
|
public boolean getAsyncMode() { |
2660 |
< |
return (config & FIFO_QUEUE) != 0; |
2660 |
> |
return (mode & FIFO) != 0; |
2661 |
|
} |
2662 |
|
|
2663 |
|
/** |
2688 |
|
* @return the number of active threads |
2689 |
|
*/ |
2690 |
|
public int getActiveThreadCount() { |
2691 |
< |
int r = (config & SMASK) + (int)(ctl >> AC_SHIFT); |
2691 |
> |
int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT); |
2692 |
|
return (r <= 0) ? 0 : r; // suppress momentarily negative values |
2693 |
|
} |
2694 |
|
|
2704 |
|
* @return {@code true} if all threads are currently idle |
2705 |
|
*/ |
2706 |
|
public boolean isQuiescent() { |
2707 |
< |
return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0; |
2707 |
> |
for (;;) { |
2708 |
> |
long c = ctl; |
2709 |
> |
int md = mode, pc = md & SMASK; |
2710 |
> |
int tc = pc + (short)(c >> TC_SHIFT); |
2711 |
> |
int rc = pc + (int)(c >> RC_SHIFT); |
2712 |
> |
if ((md & (STOP | TERMINATED)) != 0) |
2713 |
> |
return true; |
2714 |
> |
else if (rc > 0) |
2715 |
> |
return false; |
2716 |
> |
else { |
2717 |
> |
WorkQueue[] ws; WorkQueue v; |
2718 |
> |
if ((ws = workQueues) != null) { |
2719 |
> |
for (int i = 1; i < ws.length; i += 2) { |
2720 |
> |
if ((v = ws[i]) != null) { |
2721 |
> |
if ((v.source & QUIET) == 0) |
2722 |
> |
return false; |
2723 |
> |
--tc; |
2724 |
> |
} |
2725 |
> |
} |
2726 |
> |
} |
2727 |
> |
if (tc == 0 && ctl == c) |
2728 |
> |
return true; |
2729 |
> |
} |
2730 |
> |
} |
2731 |
|
} |
2732 |
|
|
2733 |
|
/** |
2742 |
|
* @return the number of steals |
2743 |
|
*/ |
2744 |
|
public long getStealCount() { |
2745 |
< |
AuxState sc = auxState; |
2946 |
< |
long count = (sc == null) ? 0L : sc.stealCount; |
2745 |
> |
long count = stealCount; |
2746 |
|
WorkQueue[] ws; WorkQueue w; |
2747 |
|
if ((ws = workQueues) != null) { |
2748 |
|
for (int i = 1; i < ws.length; i += 2) { |
2749 |
|
if ((w = ws[i]) != null) |
2750 |
< |
count += w.nsteals; |
2750 |
> |
count += (long)w.nsteals & 0xffffffffL; |
2751 |
|
} |
2752 |
|
} |
2753 |
|
return count; |
2819 |
|
* @return the next submission, or {@code null} if none |
2820 |
|
*/ |
2821 |
|
protected ForkJoinTask<?> pollSubmission() { |
2822 |
< |
WorkQueue[] ws; int wl; WorkQueue w; ForkJoinTask<?> t; |
3024 |
< |
int r = ThreadLocalRandom.nextSecondarySeed(); |
3025 |
< |
if ((ws = workQueues) != null && (wl = ws.length) > 0) { |
3026 |
< |
for (int m = wl - 1, i = 0; i < wl; ++i) { |
3027 |
< |
if ((w = ws[(i << 1) & m]) != null && (t = w.poll()) != null) |
3028 |
< |
return t; |
3029 |
< |
} |
3030 |
< |
} |
3031 |
< |
return null; |
2822 |
> |
return pollScan(true); |
2823 |
|
} |
2824 |
|
|
2825 |
|
/** |
2865 |
|
public String toString() { |
2866 |
|
// Use a single pass through workQueues to collect counts |
2867 |
|
long qt = 0L, qs = 0L; int rc = 0; |
2868 |
< |
AuxState sc = auxState; |
3078 |
< |
long st = (sc == null) ? 0L : sc.stealCount; |
3079 |
< |
long c = ctl; |
2868 |
> |
long st = stealCount; |
2869 |
|
WorkQueue[] ws; WorkQueue w; |
2870 |
|
if ((ws = workQueues) != null) { |
2871 |
|
for (int i = 0; i < ws.length; ++i) { |
2875 |
|
qs += size; |
2876 |
|
else { |
2877 |
|
qt += size; |
2878 |
< |
st += w.nsteals; |
2878 |
> |
st += (long)w.nsteals & 0xffffffffL; |
2879 |
|
if (w.isApparentlyUnblocked()) |
2880 |
|
++rc; |
2881 |
|
} |
2882 |
|
} |
2883 |
|
} |
2884 |
|
} |
2885 |
< |
int pc = (config & SMASK); |
2885 |
> |
|
2886 |
> |
int md = mode; |
2887 |
> |
int pc = (md & SMASK); |
2888 |
> |
long c = ctl; |
2889 |
|
int tc = pc + (short)(c >>> TC_SHIFT); |
2890 |
< |
int ac = pc + (int)(c >> AC_SHIFT); |
2890 |
> |
int ac = pc + (int)(c >> RC_SHIFT); |
2891 |
|
if (ac < 0) // ignore transient negative |
2892 |
|
ac = 0; |
2893 |
< |
int rs = runState; |
2894 |
< |
String level = ((rs & TERMINATED) != 0 ? "Terminated" : |
2895 |
< |
(rs & STOP) != 0 ? "Terminating" : |
3104 |
< |
(rs & SHUTDOWN) != 0 ? "Shutting down" : |
2893 |
> |
String level = ((md & TERMINATED) != 0 ? "Terminated" : |
2894 |
> |
(md & STOP) != 0 ? "Terminating" : |
2895 |
> |
(md & SHUTDOWN) != 0 ? "Shutting down" : |
2896 |
|
"Running"); |
2897 |
|
return super.toString() + |
2898 |
|
"[" + level + |
2955 |
|
* @return {@code true} if all tasks have completed following shut down |
2956 |
|
*/ |
2957 |
|
public boolean isTerminated() { |
2958 |
< |
return (runState & TERMINATED) != 0; |
2958 |
> |
return (mode & TERMINATED) != 0; |
2959 |
|
} |
2960 |
|
|
2961 |
|
/** |
2972 |
|
* @return {@code true} if terminating but not yet terminated |
2973 |
|
*/ |
2974 |
|
public boolean isTerminating() { |
2975 |
< |
int rs = runState; |
2976 |
< |
return (rs & STOP) != 0 && (rs & TERMINATED) == 0; |
2975 |
> |
int md = mode; |
2976 |
> |
return (md & STOP) != 0 && (md & TERMINATED) == 0; |
2977 |
|
} |
2978 |
|
|
2979 |
|
/** |
2982 |
|
* @return {@code true} if this pool has been shut down |
2983 |
|
*/ |
2984 |
|
public boolean isShutdown() { |
2985 |
< |
return (runState & SHUTDOWN) != 0; |
2985 |
> |
return (mode & SHUTDOWN) != 0; |
2986 |
|
} |
2987 |
|
|
2988 |
|
/** |
3046 |
|
helpQuiescePool(wt.workQueue); |
3047 |
|
return true; |
3048 |
|
} |
3049 |
< |
long startTime = System.nanoTime(); |
3050 |
< |
WorkQueue[] ws; |
3051 |
< |
int r = 0, wl; |
3052 |
< |
boolean found = true; |
3053 |
< |
while (!isQuiescent() && (ws = workQueues) != null && |
3054 |
< |
(wl = ws.length) > 0) { |
3055 |
< |
if (!found) { |
3056 |
< |
if ((System.nanoTime() - startTime) > nanos) |
3049 |
> |
else { |
3050 |
> |
for (long startTime = System.nanoTime();;) { |
3051 |
> |
ForkJoinTask<?> t; |
3052 |
> |
if ((t = pollScan(false)) != null) |
3053 |
> |
t.doExec(); |
3054 |
> |
else if (isQuiescent()) |
3055 |
> |
return true; |
3056 |
> |
else if ((System.nanoTime() - startTime) > nanos) |
3057 |
|
return false; |
3058 |
< |
Thread.yield(); // cannot block |
3059 |
< |
} |
3269 |
< |
found = false; |
3270 |
< |
for (int m = wl - 1, j = (m + 1) << 2; j >= 0; --j) { |
3271 |
< |
ForkJoinTask<?> t; WorkQueue q; int b, k; |
3272 |
< |
if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null && |
3273 |
< |
(b = q.base) - q.top < 0) { |
3274 |
< |
found = true; |
3275 |
< |
if ((t = q.pollAt(b)) != null) |
3276 |
< |
t.doExec(); |
3277 |
< |
break; |
3278 |
< |
} |
3058 |
> |
else |
3059 |
> |
Thread.yield(); // cannot block |
3060 |
|
} |
3061 |
|
} |
3281 |
– |
return true; |
3062 |
|
} |
3063 |
|
|
3064 |
|
/** |
3173 |
|
throws InterruptedException { |
3174 |
|
ForkJoinPool p; |
3175 |
|
ForkJoinWorkerThread wt; |
3176 |
+ |
WorkQueue w; |
3177 |
|
Thread t = Thread.currentThread(); |
3178 |
|
if ((t instanceof ForkJoinWorkerThread) && |
3179 |
< |
(p = (wt = (ForkJoinWorkerThread)t).pool) != null) { |
3180 |
< |
WorkQueue w = wt.workQueue; |
3179 |
> |
(p = (wt = (ForkJoinWorkerThread)t).pool) != null && |
3180 |
> |
(w = wt.workQueue) != null) { |
3181 |
> |
int block; |
3182 |
|
while (!blocker.isReleasable()) { |
3183 |
< |
if (p.tryCompensate(w)) { |
3183 |
> |
if ((block = p.tryCompensate(w)) != 0) { |
3184 |
|
try { |
3185 |
|
do {} while (!blocker.isReleasable() && |
3186 |
|
!blocker.block()); |
3187 |
|
} finally { |
3188 |
< |
U.getAndAddLong(p, CTL, AC_UNIT); |
3188 |
> |
U.getAndAddLong(p, CTL, (block > 0) ? RC_UNIT : 0L); |
3189 |
|
} |
3190 |
|
break; |
3191 |
|
} |
3212 |
|
// Unsafe mechanics |
3213 |
|
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); |
3214 |
|
private static final long CTL; |
3215 |
< |
private static final long RUNSTATE; |
3215 |
> |
private static final long MODE; |
3216 |
|
private static final int ABASE; |
3217 |
|
private static final int ASHIFT; |
3218 |
|
|
3220 |
|
try { |
3221 |
|
CTL = U.objectFieldOffset |
3222 |
|
(ForkJoinPool.class.getDeclaredField("ctl")); |
3223 |
< |
RUNSTATE = U.objectFieldOffset |
3224 |
< |
(ForkJoinPool.class.getDeclaredField("runState")); |
3223 |
> |
MODE = U.objectFieldOffset |
3224 |
> |
(ForkJoinPool.class.getDeclaredField("mode")); |
3225 |
|
ABASE = U.arrayBaseOffset(ForkJoinTask[].class); |
3226 |
|
int scale = U.arrayIndexScale(ForkJoinTask[].class); |
3227 |
|
if ((scale & (scale - 1)) != 0) |
3250 |
|
|
3251 |
|
common = java.security.AccessController.doPrivileged |
3252 |
|
(new java.security.PrivilegedAction<ForkJoinPool>() { |
3253 |
< |
public ForkJoinPool run() { return makeCommonPool(); }}); |
3253 |
> |
public ForkJoinPool run() { |
3254 |
> |
return new ForkJoinPool((byte)0); }}); |
3255 |
|
|
3256 |
< |
// report 1 even if threads disabled |
3474 |
< |
COMMON_PARALLELISM = Math.max(common.config & SMASK, 1); |
3475 |
< |
} |
3476 |
< |
|
3477 |
< |
/** |
3478 |
< |
* Creates and returns the common pool, respecting user settings |
3479 |
< |
* specified via system properties. |
3480 |
< |
*/ |
3481 |
< |
static ForkJoinPool makeCommonPool() { |
3482 |
< |
int parallelism = -1; |
3483 |
< |
ForkJoinWorkerThreadFactory factory = null; |
3484 |
< |
UncaughtExceptionHandler handler = null; |
3485 |
< |
try { // ignore exceptions in accessing/parsing properties |
3486 |
< |
String pp = System.getProperty |
3487 |
< |
("java.util.concurrent.ForkJoinPool.common.parallelism"); |
3488 |
< |
String fp = System.getProperty |
3489 |
< |
("java.util.concurrent.ForkJoinPool.common.threadFactory"); |
3490 |
< |
String hp = System.getProperty |
3491 |
< |
("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); |
3492 |
< |
if (pp != null) |
3493 |
< |
parallelism = Integer.parseInt(pp); |
3494 |
< |
if (fp != null) |
3495 |
< |
factory = ((ForkJoinWorkerThreadFactory)ClassLoader. |
3496 |
< |
getSystemClassLoader().loadClass(fp).newInstance()); |
3497 |
< |
if (hp != null) |
3498 |
< |
handler = ((UncaughtExceptionHandler)ClassLoader. |
3499 |
< |
getSystemClassLoader().loadClass(hp).newInstance()); |
3500 |
< |
} catch (Exception ignore) { |
3501 |
< |
} |
3502 |
< |
if (factory == null) { |
3503 |
< |
if (System.getSecurityManager() == null) |
3504 |
< |
factory = defaultForkJoinWorkerThreadFactory; |
3505 |
< |
else // use security-managed default |
3506 |
< |
factory = new InnocuousForkJoinWorkerThreadFactory(); |
3507 |
< |
} |
3508 |
< |
if (parallelism < 0 && // default 1 less than #cores |
3509 |
< |
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) |
3510 |
< |
parallelism = 1; |
3511 |
< |
if (parallelism > MAX_CAP) |
3512 |
< |
parallelism = MAX_CAP; |
3513 |
< |
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, |
3514 |
< |
"ForkJoinPool.commonPool-worker-"); |
3256 |
> |
COMMON_PARALLELISM = common.mode & SMASK; |
3257 |
|
} |
3258 |
|
|
3259 |
|
/** |