155    * functionality and control for a set of worker threads:
156    * Submissions from non-FJ threads enter into submission queues.
157    * Workers take these tasks and typically split them into subtasks
158 <  * that may be stolen by other workers. Preference rules give
159 <  * first priority to processing tasks from their own queues (LIFO
160 <  * or FIFO, depending on mode), then to randomized FIFO steals of
161 <  * tasks in other queues. This framework began as vehicle for
162 <  * supporting tree-structured parallelism using work-stealing.
163 <  * Over time, its scalability advantages led to extensions and
164 <  * changes to better support more diverse usage contexts. Because
165 <  * most internal methods and nested classes are interrelated,
166 <  * their main rationale and descriptions are presented here;
167 <  * individual methods and nested classes contain only brief
168 <  * comments about details.
158 >  * that may be stolen by other workers. Work-stealing based on
159 >  * randomized scans generally leads to better throughput than
160 >  * "work dealing" in which producers assign tasks to idle threads,
161 >  * in part because threads that have finished other tasks before
162 >  * the signalled thread wakes up (which can be a long time) can
163 >  * take the task instead. Preference rules give first priority to
164 >  * processing tasks from their own queues (LIFO or FIFO, depending
165 >  * on mode), then to randomized FIFO steals of tasks in other
166 >  * queues. This framework began as vehicle for supporting
167 >  * tree-structured parallelism using work-stealing. Over time,
168 >  * its scalability advantages led to extensions and changes to
169 >  * better support more diverse usage contexts. Because most
170 >  * internal methods and nested classes are interrelated, their
171 >  * main rationale and descriptions are presented here; individual
172 >  * methods and nested classes contain only brief comments about
173 >  * details.
174    *
175    * WorkQueues
176    * ==========
203    *
204    * (The actual code needs to null-check and size-check the array,
205    * uses masking, not mod, for indexing a power-of-two-sized array,
206 <  * properly fences accesses, and possibly signals waiting workers
207 <  * to start scanning -- see below.) Both a successful pop and
208 <  * poll mainly entail a CAS of a slot from non-null to null.
206 >  * adds a release fence for publication, and possibly signals
207 >  * waiting workers to start scanning -- see below.) Both a
208 >  * successful pop and poll mainly entail a CAS of a slot from
209 >  * non-null to null.
210    *
211    * The pop operation (always performed by owner) is:
212    *   if ((the task at top slot is not null) and
218    *        (CAS slot to null))
219    *           increment base and return task;
220    *
221 <  * There are several variants of each of these. In particular,
222 <  * almost all uses of poll occur within scan operations that also
223 <  * interleave contention tracking (with associated code sprawl.)
221 >  * There are several variants of each of these. Most uses occur
222 >  * within operations that also interleave contention or emptiness
223 >  * tracking or inspection of elements before extracting them, so
224 >  * must interleave these with the above code. When performed by
225 >  * owner, getAndSet is used instead of CAS (see for example method
226 >  * nextLocalTask) which is usually more efficient, and possible
227 >  * because the top index cannot independently change during the
228 >  * operation.
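The slot protocol these comments describe can be seen in miniature below. This is a hypothetical, heavily simplified sketch for illustration only, not the JDK code: it uses AtomicReferenceArray in place of the VarHandle-based array access, and omits resizing, fencing, and signalling. The class and field names are invented.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Sketch (not the JDK code): push publishes at top, the owner's pop
// CASes the top slot, a stealer's poll CASes the base slot; every
// successful extraction is a CAS of a slot from non-null to null.
public class MiniDeque {
    final AtomicReferenceArray<Object> slots = new AtomicReferenceArray<>(64);
    volatile int base, top;

    void push(Object task) {
        slots.set((slots.length() - 1) & top, task); // mask, not mod
        top++;
    }

    Object pop() { // owner end (LIFO)
        int s = top - 1, i = (slots.length() - 1) & s;
        Object t = slots.get(i);
        if (t != null && slots.compareAndSet(i, t, null)) { top = s; return t; }
        return null;
    }

    Object poll() { // stealer end (FIFO)
        int b = base, i = (slots.length() - 1) & b;
        Object t = slots.get(i);
        if (b != top && t != null && slots.compareAndSet(i, t, null)) { base = b + 1; return t; }
        return null;
    }
}
```

The masked indexing relies on the array length being a power of two, matching the comment above about masking rather than mod.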
229    *
230    * Memory ordering. See "Correct and Efficient Work-Stealing for
231    * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
234    * algorithms similar to (but different than) the one used here.
235    * Extracting tasks in array slots via (fully fenced) CAS provides
236    * primary synchronization. The base and top indices imprecisely
237 <  * guide where to extract from. We do not always require strict
238 <  * orderings of array and index updates, so sometimes let them be
239 <  * subject to compiler and processor reorderings. However, the
240 <  * volatile "base" index also serves as a basis for memory
241 <  * ordering: Slot accesses are preceded by a read of base,
242 <  * ensuring happens-before ordering with respect to stealers (so
243 <  * the slots themselves can be read via plain array reads.) The
244 <  * only other memory orderings relied on are maintained in the
245 <  * course of signalling and activation (see below). A check that
246 <  * base == top indicates (momentary) emptiness, but otherwise may
247 <  * err on the side of possibly making the queue appear nonempty
248 <  * when a push, pop, or poll have not fully committed, or making
249 <  * it appear empty when an update of top has not yet been visibly
250 <  * written. (Method isEmpty() checks the case of a partially
251 <  * completed removal of the last element.) Because of this, the
252 <  * poll operation, considered individually, is not wait-free. One
253 <  * thief cannot successfully continue until another in-progress
254 <  * one (or, if previously empty, a push) visibly completes.
255 <  * However, in the aggregate, we ensure at least probabilistic
237 >  * guide where to extract from. We do not usually require strict
238 >  * orderings of array and index updates. Many index accesses use
239 >  * plain mode, with ordering constrained by surrounding context
240 >  * (usually with respect to element CASes or the two WorkQueue
241 >  * volatile fields source and phase). When not otherwise already
242 >  * constrained, reads of "base" by queue owners use acquire-mode,
243 >  * and some externally callable methods preface accesses with
244 >  * acquire fences. Additionally, to ensure that index update
245 >  * writes are not coalesced or postponed in loops etc, "opaque"
246 >  * mode is used in a few cases where timely writes are not
247 >  * otherwise ensured. The "locked" versions of push- and pop-
248 >  * based methods for shared queues differ from owned versions
249 >  * because locking already forces some of the ordering.
250 >  *
251 >  * Because indices and slot contents cannot always be consistent,
252 >  * a check that base == top indicates (momentary) emptiness, but
253 >  * otherwise may err on the side of possibly making the queue
254 >  * appear nonempty when a push, pop, or poll have not fully
255 >  * committed, or making it appear empty when an update of top has
256 >  * not yet been visibly written. (Method isEmpty() checks the
257 >  * case of a partially completed removal of the last element.)
258 >  * Because of this, the poll operation, considered individually,
259 >  * is not wait-free. One thief cannot successfully continue until
260 >  * another in-progress one (or, if previously empty, a push)
261 >  * visibly completes. This can stall threads when required to
262 >  * consume from a given queue (see method poll()). However, in
263 >  * the aggregate, we ensure at least probabilistic
264    * non-blockingness. If an attempted steal fails, a scanning
265    * thief chooses a different random victim target to try next. So,
266    * in order for one thief to progress, it suffices for any
267 <  * in-progress poll or new push on any empty queue to
249 <  * complete.
267 >  * in-progress poll or new push on any empty queue to complete.
268    *
269    * This approach also enables support of a user mode in which
270    * local task processing is in FIFO, not LIFO order, simply by
285    * different position to use or create other queues -- they block
286    * only when creating and registering new queues. Because it is
287    * used only as a spinlock, unlocking requires only a "releasing"
288 <  * store (using setRelease).
288 >  * store (using setRelease) unless otherwise signalling.
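The spinlock idiom mentioned above (acquire by CAS, unlock with a plain releasing store) can be sketched with VarHandles. This is a hypothetical minimal example, not the pool's actual lock; the class, field, and method names are invented.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Sketch of the spinlock idiom: lock by CASing an int field from 0 to 1,
// unlock with VarHandle.setRelease, which suffices when no waiting
// thread must be signalled.
public class SpinLock {
    static final VarHandle LOCK;
    static {
        try {
            LOCK = MethodHandles.lookup()
                .findVarHandle(SpinLock.class, "lock", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    volatile int lock; // 0 = free, 1 = held

    boolean tryLock() { return LOCK.compareAndSet(this, 0, 1); }
    void unlock()     { LOCK.setRelease(this, 0); }
}
```

The release-mode store publishes all writes made under the lock to the next thread that CAS-acquires it, without the cost of a full fence.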
289    *
290    * Management
291    * ==========
306    *
307    * Field "ctl" contains 64 bits holding information needed to
308    * atomically decide to add, enqueue (on an event queue), and
309 <  * dequeue (and release)-activate workers. To enable this
310 <  * packing, we restrict maximum parallelism to (1<<15)-1 (which is
311 <  * far in excess of normal operating range) to allow ids, counts,
312 <  * and their negations (used for thresholding) to fit into 16bit
309 >  * dequeue and release workers. To enable this packing, we
310 >  * restrict maximum parallelism to (1<<15)-1 (which is far in
311 >  * excess of normal operating range) to allow ids, counts, and
312 >  * their negations (used for thresholding) to fit into 16bit
313    * subfields.
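The 16-bit subfield packing can be illustrated with a small sketch. The layout here is hypothetical and for demonstration only; the actual ctl encoding and field positions differ.

```java
// Illustration of packing four 16-bit subfields into one long, in the
// spirit of the ctl field. Sign-extending on extraction is what lets
// negated counts be used directly for threshold comparisons.
public class Pack16 {
    static long pack(int a, int b, int c, int d) {
        return ((long)(a & 0xffff) << 48) | ((long)(b & 0xffff) << 32)
             | ((long)(c & 0xffff) << 16) |  (long)(d & 0xffff);
    }
    static int field(long packed, int n) { // n = 0..3, 0 is lowest
        return (short)(packed >>> (n * 16)); // (short) cast sign-extends
    }
}
```

Because each subfield is sign-extended back to an int, a stored negative count compares correctly against zero, which is the "negations used for thresholding" trick.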
314    *
315    * Field "mode" holds configuration parameters as well as lifetime
321    * lock (using field workerNamePrefix as lock), but is otherwise
322    * concurrently readable, and accessed directly. We also ensure
323    * that uses of the array reference itself never become too stale
324 <  * in case of resizing. To simplify index-based operations, the
325 <  * array size is always a power of two, and all readers must
326 <  * tolerate null slots. Worker queues are at odd indices. Shared
327 <  * (submission) queues are at even indices, up to a maximum of 64
328 <  * slots, to limit growth even if array needs to expand to add
329 <  * more workers. Grouping them together in this way simplifies and
330 <  * speeds up task scanning.
324 >  * in case of resizing, by arranging that (re-)reads are separated
325 >  * by at least one acquiring read access. To simplify index-based
326 >  * operations, the array size is always a power of two, and all
327 >  * readers must tolerate null slots. Worker queues are at odd
328 >  * indices. Shared (submission) queues are at even indices, up to
329 >  * a maximum of 64 slots, to limit growth even if array needs to
330 >  * expand to add more workers. Grouping them together in this way
331 >  * simplifies and speeds up task scanning.
332    *
333    * All worker thread creation is on-demand, triggered by task
334    * submissions, replacement of terminated workers, and/or
406    * releases so usage requires care -- seeing a negative phase does
407    * not guarantee that the worker is available. When queued, the
408    * lower 16 bits of scanState must hold its pool index. So we
409 <  * place the index there upon initialization (see registerWorker)
410 <  * and otherwise keep it there or restore it when necessary.
409 >  * place the index there upon initialization and otherwise keep it
410 >  * there or restore it when necessary.
411    *
412    * The ctl field also serves as the basis for memory
413    * synchronization surrounding activation. This uses a more
415    * consumers sync with each other by both writing/CASing ctl (even
416    * if to its current value). This would be extremely costly. So
417    * we relax it in several ways: (1) Producers only signal when
418 <  * their queue is empty. Other workers propagate this signal (in
419 <  * method scan) when they find tasks; to further reduce flailing,
420 <  * each worker signals only one other per activation. (2) Workers
421 <  * only enqueue after scanning (see below) and not finding any
422 <  * tasks. (3) Rather than CASing ctl to its current value in the
423 <  * common case where no action is required, we reduce write
424 <  * contention by equivalently prefacing signalWork when called by
425 <  * an external task producer using a memory access with
407 <  * full-volatile semantics or a "fullFence".
418 >  * their queue is possibly empty at some point during a push
419 >  * operation. (2) Other workers propagate this signal when they
420 >  * find tasks. (3) Workers only enqueue after scanning (see below)
421 >  * and not finding any tasks. (4) Rather than CASing ctl to its
422 >  * current value in the common case where no action is required,
423 >  * we reduce write contention by equivalently prefacing signalWork
424 >  * when called by an external task producer using a memory access
425 >  * with full-volatile semantics or a "fullFence".
426    *
427    * Almost always, too many signals are issued. A task producer
428    * cannot in general tell if some existing worker is in the midst
434    * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
435    * computations involving only a few workers.
436    *
437 <  * Scanning. Method runWorker performs top-level scanning for
438 <  * tasks. Each scan traverses and tries to poll from each queue
439 <  * starting at a random index and circularly stepping. Scans are
440 <  * not performed in ideal random permutation order, to reduce
441 <  * cacheline contention. The pseudorandom generator need not have
437 >  * Scanning. Method scan (from runWorker) performs top-level
438 >  * scanning for tasks. (Similar scans appear in helpQuiesce and
439 >  * pollScan.) Each scan traverses and tries to poll from each
440 >  * queue starting at a random index. Scans are not performed in
441 >  * ideal random permutation order, to reduce cacheline
442 >  * contention. The pseudorandom generator need not have
443    * high-quality statistical properties in the long term, but just
444    * within computations; We use Marsaglia XorShifts (often via
445    * ThreadLocalRandom.nextSecondarySeed), which are cheap and
446 <  * suffice. Scanning also employs contention reduction: When
446 >  * suffice. Scanning also includes contention reduction: When
447    * scanning workers fail to extract an apparently existing task,
448 <  * they soon restart at a different pseudorandom index. This
449 <  * improves throughput when many threads are trying to take tasks
450 <  * from few queues, which can be common in some usages. Scans do
451 <  * not otherwise explicitly take into account core affinities,
452 <  * loads, cache localities, etc, However, they do exploit temporal
453 <  * locality (which usually approximates these) by preferring to
454 <  * re-poll (at most #workers times) from the same queue after a
455 <  * successful poll before trying others.
448 >  * they soon restart at a different pseudorandom index. This form
449 >  * of backoff improves throughput when many threads are trying to
450 >  * take tasks from few queues, which can be common in some usages.
451 >  * Scans do not otherwise explicitly take into account core
452 >  * affinities, loads, cache localities, etc, However, they do
453 >  * exploit temporal locality (which usually approximates these) by
454 >  * preferring to re-poll from the same queue after a successful
455 >  * poll before trying others (see method topLevelExec). However
456 >  * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard
457 >  * against infinitely unfair looping under unbounded user task
458 >  * recursion, and also to reduce long-term contention when many
459 >  * threads poll few queues holding many small tasks. The bound is
460 >  * high enough to avoid much impact on locality and scheduling
461 >  * overhead.
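A Marsaglia XorShift step of the kind referred to above looks as follows. The (13, 17, 5) shift triple is one standard choice for 32-bit state; this is an illustrative sketch, and ThreadLocalRandom's internal constants and seeding are not reproduced here.

```java
// One XorShift step: three cheap shift-and-xor operations advance a
// nonzero 32-bit state through a long-period pseudorandom sequence --
// adequate for short-term scan randomization, not for statistics.
public class XorShift {
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }
}
```

A zero seed is a fixed point, so callers must start from a nonzero value.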
462    *
463    * Trimming workers. To release resources after periods of lack of
464    * use, a worker starting to wait when the pool is quiescent will
465 <  * time out and terminate (see method scan) if the pool has
465 >  * time out and terminate (see method runWorker) if the pool has
466    * remained quiescent for period given by field keepAlive.
467    *
468    * Shutdown and Termination. A call to shutdownNow invokes
530    * time. Some previous versions of this class employed immediate
531    * compensations for any blocked join. However, in practice, the
532    * vast majority of blockages are transient byproducts of GC and
533 <  * other JVM or OS activities that are made worse by replacement.
534 <  * Rather than impose arbitrary policies, we allow users to
535 <  * override the default of only adding threads upon apparent
536 <  * starvation. The compensation mechanism may also be bounded.
537 <  * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
538 <  * JVMs to cope with programming errors and abuse before running
539 <  * out of resources to do so.
533 >  * other JVM or OS activities that are made worse by replacement
534 >  * when they cause longer-term oversubscription. Rather than
535 >  * impose arbitrary policies, we allow users to override the
536 >  * default of only adding threads upon apparent starvation. The
537 >  * compensation mechanism may also be bounded. Bounds for the
538 >  * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
539 >  * with programming errors and abuse before running out of
540 >  * resources to do so.
541    *
542    * Common Pool
543    * ===========
570    * in ForkJoinWorkerThread) may be JVM-dependent and must access
571    * particular Thread class fields to achieve this effect.
572    *
573 +  * Memory placement
574 +  * ================
575 +  *
576 +  * Performance can be very sensitive to placement of instances of
577 +  * ForkJoinPool and WorkQueues and their queue arrays. To reduce
578 +  * false-sharing impact, the @Contended annotation isolates
579 +  * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl
580 +  * field. WorkQueue arrays are allocated (by their threads) with
581 +  * larger initial sizes than most ever need, mostly to reduce
582 +  * false sharing with current garbage collectors that use cardmark
583 +  * tables.
584 +  *
585    * Style notes
586    * ===========
587    *
589    * awkward and ugly, but also reflects the need to control
590    * outcomes across the unusual cases that arise in very racy code
591    * with very few invariants. All fields are read into locals
592 <  * before use, and null-checked if they are references. This is
593 <  * usually done in a "C"-like style of listing declarations at the
594 <  * heads of methods or blocks, and using inline assignments on
595 <  * first encounter. Nearly all explicit checks lead to
596 <  * bypass/return, not exception throws, because they may
597 <  * legitimately arise due to cancellation/revocation during
598 <  * shutdown.
592 >  * before use, and null-checked if they are references. Array
593 >  * accesses using masked indices include checks (that are always
594 >  * true) that the array length is non-zero to avoid compilers
595 >  * inserting more expensive traps. This is usually done in a
596 >  * "C"-like style of listing declarations at the heads of methods
597 >  * or blocks, and using inline assignments on first encounter.
598 >  * Nearly all explicit checks lead to bypass/return, not exception
599 >  * throws, because they may legitimately arise due to
600 >  * cancellation/revocation during shutdown.
601    *
602    * There is a lot of representation-level coupling among classes
603    * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
607    * representations will need to be accompanied by algorithmic
608    * changes anyway. Several methods intrinsically sprawl because
609    * they must accumulate sets of consistent reads of fields held in
610 <  * local variables. There are also other coding oddities
611 <  * (including several unnecessary-looking hoisted null checks)
612 <  * that help some methods perform reasonably even when interpreted
613 <  * (not compiled).
610 >  * local variables. Some others are artificially broken up to
611 >  * reduce producer/consumer imbalances due to dynamic compilation.
612 >  * There are also other coding oddities (including several
613 >  * unnecessary-looking hoisted null checks) that help some methods
614 >  * perform reasonably even when interpreted (not compiled).
615    *
616    * The order of declarations in this file is (with a few exceptions):
617    * (1) Static utility functions
715    static final int DORMANT = QUIET | UNSIGNALLED;
716
717    /**
718 <  * The maximum number of local polls from the same queue before
719 <  * checking others. This is a safeguard against infinitely unfair
679 <  * looping under unbounded user task recursion, and must be larger
680 <  * than plausible cases of intentional bounded task recursion.
718 >  * Initial capacity of work-stealing queue array.
719 >  * Must be a power of two, at least 2.
720    */
721 <  static final int POLL_LIMIT = 1 << 10;
721 >  static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
722 >
723 >  /**
724 >  * Maximum capacity for queue arrays. Must be a power of two less
725 >  * than or equal to 1 << (31 - width of array entry) to ensure
726 >  * lack of wraparound of index calculations, but defined to a
727 >  * value a bit less than this to help users trap runaway programs
728 >  * before saturating systems.
729 >  */
730 >  static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
731 >
732 >  /**
733 >  * The maximum number of top-level polls per worker before
734 >  * checking other queues, expressed as a bit shift to, in effect,
735 >  * multiply by pool size, and then use as random value mask, so
736 >  * average bound is about poolSize*(1<<TOP_BOUND_SHIFT). See
737 >  * above for rationale.
738 >  */
739 >  static final int TOP_BOUND_SHIFT = 10;
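The magnitude implied by the new constant's comment can be checked with simple arithmetic. This helper is hypothetical and for illustration only; it shows the order of magnitude described in the TOP_BOUND_SHIFT javadoc, not the exact masking expression used in topLevelExec.

```java
// Illustration: shifting pool size by TOP_BOUND_SHIFT gives the scale
// of the repoll bound described above, about poolSize * 1024 polls
// from the same queue before other queues are checked.
public class TopBound {
    static final int TOP_BOUND_SHIFT = 10;

    static int boundScale(int poolSize) {
        return poolSize << TOP_BOUND_SHIFT; // poolSize * (1 << 10)
    }
}
```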
740
741    /**
742    * Queues supporting work-stealing as well as external task
743    * submission. See above for descriptions and algorithms.
687 -  * Performance on most platforms is very sensitive to placement of
688 -  * instances of both WorkQueues and their arrays -- we absolutely
689 -  * do not want multiple WorkQueue instances or multiple queue
690 -  * arrays sharing cache lines. The @Contended annotation alerts
691 -  * JVMs to try to keep instances apart.
744    */
745    @jdk.internal.vm.annotation.Contended
746    static final class WorkQueue {
747 <
748 <  /**
749 <  * Capacity of work-stealing queue array upon initialization.
750 <  * Must be a power of two; at least 4, but should be larger to
699 <  * reduce or eliminate cacheline sharing among queues.
700 <  * Currently, it is much larger, as a partial workaround for
701 <  * the fact that JVMs often place arrays in locations that
702 <  * share GC bookkeeping (especially cardmarks) such that
703 <  * per-write accesses encounter serious memory contention.
704 <  */
705 <  static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
706 <
707 <  /**
708 <  * Maximum size for queue arrays. Must be a power of two less
709 <  * than or equal to 1 << (31 - width of array entry) to ensure
710 <  * lack of wraparound of index calculations, but defined to a
711 <  * value a bit less than this to help users trap runaway
712 <  * programs before saturating systems.
713 <  */
714 <  static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
715 <
716 <  // Instance fields
747 >  volatile int source; // source queue id, or sentinel
748 >  int id; // pool index, mode, tag
749 >  int base; // index of next slot for poll
750 >  int top; // index of next slot for push
751    volatile int phase; // versioned, negative: queued, 1: locked
752    int stackPred; // pool stack (ctl) predecessor link
753    int nsteals; // number of steals
754 <  int id; // index, mode, tag
721 <  volatile int source; // source queue id, or sentinel
722 <  volatile int base; // index of next slot for poll
723 <  int top; // index of next slot for push
724 <  ForkJoinTask<?>[] array; // the elements (initially unallocated)
754 >  ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
755    final ForkJoinPool pool; // the containing pool (may be null)
756    final ForkJoinWorkerThread owner; // owning thread or null if shared
757
763    }
764
765    /**
766 +  * Tries to lock shared queue by CASing phase field.
767 +  */
768 +  final boolean tryLockPhase() {
769 +  return PHASE.compareAndSet(this, 0, 1);
770 +  }
771 +
772 +  final void releasePhaseLock() {
773 +  PHASE.setRelease(this, 0);
774 +  }
775 +
776 +  /**
777    * Returns an exportable index (used by ForkJoinWorkerThread).
778    */
779    final int getPoolIndex() {
784    * Returns the approximate number of tasks in the queue.
785    */
786    final int queueSize() {
787 <  int n = base - top; // read base first
787 >  int n = (int)BASE.getAcquire(this) - top;
788    return (n >= 0) ? 0 : -n; // ignore transient negative
789    }
790
794    * near-empty queue has at least one unclaimed task.
795    */
796    final boolean isEmpty() {
797 <  ForkJoinTask<?>[] a; int n, al, b;
797 >  ForkJoinTask<?>[] a; int n, cap, b;
798 >  VarHandle.acquireFence(); // needed by external callers
799    return ((n = (b = base) - top) >= 0 || // possibly one task
800    (n == -1 && ((a = array) == null ||
801 <  (al = a.length) == 0 ||
802 <  a[(al - 1) & b] == null)));
801 >  (cap = a.length) == 0 ||
802 >  a[(cap - 1) & b] == null)));
803    }
804
763 -
805    /**
806    * Pushes a task. Call only by owner in unshared queues.
807    *
809    * @throws RejectedExecutionException if array cannot be resized
810    */
811    final void push(ForkJoinTask<?> task) {
812 <  int s = top; ForkJoinTask<?>[] a; int al, d;
813 <  if ((a = array) != null && (al = a.length) > 0) {
814 <  int index = (al - 1) & s;
815 <  ForkJoinPool p = pool;
812 >  ForkJoinTask<?>[] a;
813 >  int s = top, d, cap;
814 >  ForkJoinPool p = pool;
815 >  if ((a = array) != null && (cap = a.length) > 0) {
816 >  QA.setRelease(a, (cap - 1) & s, task);
817    top = s + 1;
818 <  QA.setRelease(a, index, task);
777 <  if ((d = base - s) == 0 && p != null) {
818 >  if ((d = (int)BASE.getAcquire(this) - s) == 0 && p != null) {
819    VarHandle.fullFence();
820    p.signalWork();
821    }
822 <  else if (d + al == 1)
823 <  growArray();
822 >  else if (d + cap - 1 == 0)
823 >  growArray(false);
824    }
825    }
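The resize test in the new push (`d + cap - 1 == 0`) can be checked arithmetically: with d = base - s and s the pre-increment top, it holds exactly when the queue held cap - 1 elements before the push, i.e. this push fills the array. The helper below is a hypothetical illustration, not JDK code.

```java
// Arithmetic behind the resize check in push: d = base - s, and
// d + cap - 1 == 0 is equivalent to (s + 1) - base == cap, meaning
// the array is full once the pushed task is counted.
public class FullCheck {
    static boolean fullAfterPush(int base, int s, int cap) {
        int d = base - s;
        return d + cap - 1 == 0;
    }
}
```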
826 |
|
|
827 |
|
/** |
828 |
< |
* Initializes or doubles the capacity of array. Call either |
829 |
< |
* by owner or with lock held -- it is OK for base, but not |
789 |
< |
* top, to move while resizings are in progress. |
828 |
> |
* Version of push for shared queues. Call only with phase lock held. |
829 |
> |
* @return true if should signal work |
830 |
|
*/ |
831 |
< |
final ForkJoinTask<?>[] growArray() { |
832 |
< |
ForkJoinTask<?>[] oldA = array; |
833 |
< |
int oldSize = oldA != null ? oldA.length : 0; |
834 |
< |
int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY; |
835 |
< |
if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY) |
836 |
< |
throw new RejectedExecutionException("Queue capacity exceeded"); |
837 |
< |
int oldMask, t, b; |
838 |
< |
ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; |
839 |
< |
if (oldA != null && (oldMask = oldSize - 1) > 0 && |
840 |
< |
(t = top) - (b = base) > 0) { |
841 |
< |
int mask = size - 1; |
842 |
< |
do { // emulate poll from old array, push to new array |
843 |
< |
int index = b & oldMask; |
844 |
< |
ForkJoinTask<?> x = (ForkJoinTask<?>) |
845 |
< |
QA.getAcquire(oldA, index); |
846 |
< |
if (x != null && |
807 |
< |
QA.compareAndSet(oldA, index, x, null)) |
808 |
< |
a[b & mask] = x; |
809 |
< |
} while (++b != t); |
810 |
< |
VarHandle.releaseFence(); |
811 |
< |
} |
812 |
< |
return a; |
831 |
> |
final boolean lockedPush(ForkJoinTask<?> task) { |
832 |
> |
ForkJoinTask<?>[] a; |
833 |
> |
boolean signal = false; |
834 |
> |
int s = top, b = base, cap; |
835 |
> |
if ((a = array) != null && (cap = a.length) > 0) { |
836 |
> |
a[(cap - 1) & s] = task; |
837 |
> |
top = s + 1; |
838 |
> |
if (b - s + cap - 1 == 0) |
839 |
> |
growArray(true); |
840 |
> |
else { |
841 |
> |
phase = 0; // full volatile unlock |
842 |
> |
if (base == s) |
843 |
> |
signal = true; |
844 |
> |
} |
845 |
> |
} |
846 |
> |
return signal; |
847 |
|
} |
848 |
|
|
849 |
|
/** |
850 |
< |
* Takes next task, if one exists, in LIFO order. Call only |
851 |
< |
* by owner in unshared queues. |
852 |
< |
*/ |
853 |
< |
final ForkJoinTask<?> pop() { |
854 |
< |
int b = base, s = top, al, i; ForkJoinTask<?>[] a; |
855 |
< |
if ((a = array) != null && b != s && (al = a.length) > 0) { |
856 |
< |
int index = (al - 1) & --s; |
857 |
< |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
858 |
< |
QA.get(a, index); |
859 |
< |
if (t != null && |
860 |
< |
QA.compareAndSet(a, index, t, null)) { |
861 |
< |
top = s; |
862 |
< |
VarHandle.releaseFence(); |
863 |
< |
return t; |
850 |
> |
* Doubles the capacity of array. Call either by owner or with |
851 |
> |
* lock held -- it is OK for base, but not top, to move while |
852 |
> |
* resizings are in progress. |
853 |
> |
*/ |
854 |
> |
final void growArray(boolean locked) { |
855 |
> |
ForkJoinTask<?>[] newA = null; |
856 |
> |
try { |
857 |
> |
ForkJoinTask<?>[] oldA; int oldSize, newSize; |
858 |
> |
if ((oldA = array) != null && (oldSize = oldA.length) > 0 && |
859 |
> |
(newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY && |
860 |
> |
newSize > 0) { |
861 |
> |
try { |
862 |
> |
newA = new ForkJoinTask<?>[newSize]; |
863 |
> |
} catch (OutOfMemoryError ex) { |
864 |
> |
} |
865 |
> |
if (newA != null) { // poll from old array, push to new |
866 |
> |
int oldMask = oldSize - 1, newMask = newSize - 1; |
867 |
> |
for (int s = top - 1, k = oldMask; k >= 0; --k) { |
868 |
> |
ForkJoinTask<?> x = (ForkJoinTask<?>) |
869 |
> |
QA.getAndSet(oldA, s & oldMask, null); |
870 |
> |
if (x != null) |
871 |
> |
newA[s-- & newMask] = x; |
872 |
> |
else |
873 |
> |
break; |
874 |
> |
} |
875 |
> |
array = newA; |
876 |
> |
VarHandle.releaseFence(); |
877 |
> |
} |
878 |
|
} |
879 |
+ |
} finally { |
880 |
+ |
if (locked) |
881 |
+ |
phase = 0; |
882 |
|
} |
883 |
< |
return null; |
883 |
> |
if (newA == null) |
884 |
> |
throw new RejectedExecutionException("Queue capacity exceeded"); |
885 |
|
} |
 886
 887           /**
 888            * Takes next task, if one exists, in FIFO order.
 889            */
 890           final ForkJoinTask<?> poll() {
 891 <             for (;;) {
 892 <                 int b = base, s = top, d, al; ForkJoinTask<?>[] a;
 893 <                 if ((a = array) != null && (d = b - s) < 0 &&
 894 <                     (al = a.length) > 0) {
 895 <                     int index = (al - 1) & b;
 896 <                     ForkJoinTask<?> t = (ForkJoinTask<?>)
 897 <                         QA.getAcquire(a, index);
 898 <                     if (b++ == base) {
 899 <                         if (t != null) {
 900 <                             if (QA.compareAndSet(a, index, t, null)) {
 901 <                                 base = b;
 850 <                                 return t;
 851 <                             }
 852 <                         }
 853 <                         else if (d == -1)
 854 <                             break; // now empty
 891 >             int b, k, cap; ForkJoinTask<?>[] a;
 892 >             while ((a = array) != null && (cap = a.length) > 0 &&
 893 >                    (b = base) != top) {
 894 >                 ForkJoinTask<?> t = (ForkJoinTask<?>)
 895 >                     QA.getAcquire(a, k = (cap - 1) & b);
 896 >                 if (base == b++) {
 897 >                     if (t == null)
 898 >                         Thread.yield(); // await index advance
 899 >                     else if (QA.compareAndSet(a, k, t, null)) {
 900 >                         BASE.setOpaque(this, b);
 901 >                         return t;
 902                       }
 903                   }
 857 -                 else
 858 -                     break;
 904               }
 905               return null;
 906           }
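Aside (not part of the diff): the new poll() above follows a read-recheck-CAS protocol for FIFO takes: read the slot at base, confirm base has not moved, CAS the slot to null, and only then publish the advanced index. A simplified single-queue sketch with hypothetical names, using AtomicReferenceArray in place of the VarHandle plumbing:

```java
// Single-threaded sketch of the take protocol in poll() above:
// slot read, base recheck, CAS-to-null, then publish the new base.
import java.util.concurrent.atomic.AtomicReferenceArray;

public class PollSketch {
    final AtomicReferenceArray<Runnable> slots = new AtomicReferenceArray<>(8);
    volatile int base, top;

    void push(Runnable t) { slots.set((slots.length() - 1) & top++, t); }

    Runnable poll() {
        int cap = slots.length();
        while (base != top) {
            int b = base, k = (cap - 1) & b;
            Runnable t = slots.get(k);                 // read slot at base
            if (base == b++ && t != null
                && slots.compareAndSet(k, t, null)) {  // claim the task
                base = b;                              // publish advance last
                return t;
            }
        }
        return null;                                   // apparently empty
    }

    public static void main(String[] args) {
        PollSketch q = new PollSketch();
        q.push(() -> {}); q.push(() -> {});
        if (q.poll() == null || q.poll() == null || q.poll() != null)
            throw new AssertionError();
        System.out.println("ok");
    }
}
```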
 909            * Takes next task, if one exists, in order specified by mode.
 910            */
 911           final ForkJoinTask<?> nextLocalTask() {
 912 <             return ((id & FIFO) != 0) ? poll() : pop();
 912 >             ForkJoinTask<?> t = null;
 913 >             int md = id, b, s, d, cap; ForkJoinTask<?>[] a;
 914 >             if ((a = array) != null && (cap = a.length) > 0 &&
 915 >                 (d = (s = top) - (b = base)) > 0) {
 916 >                 if ((md & FIFO) == 0 || d == 1) {
 917 >                     if ((t = (ForkJoinTask<?>)
 918 >                          QA.getAndSet(a, (cap - 1) & --s, null)) != null)
 919 >                         TOP.setOpaque(this, s);
 920 >                 }
 921 >                 else if ((t = (ForkJoinTask<?>)
 922 >                           QA.getAndSet(a, (cap - 1) & b++, null)) != null) {
 923 >                     BASE.setOpaque(this, b);
 924 >                 }
 925 >                 else // on contention in FIFO mode, use regular poll
 926 >                     t = poll();
 927 >             }
 928 >             return t;
 929           }
 930
 931           /**
 932            * Returns next task, if one exists, in order specified by mode.
 933            */
 934           final ForkJoinTask<?> peek() {
 935 <             int al; ForkJoinTask<?>[] a;
 936 <             return ((a = array) != null && (al = a.length) > 0) ?
 937 <                 a[(al - 1) &
 877 <                   ((id & FIFO) != 0 ? base : top - 1)] : null;
 935 >             int cap; ForkJoinTask<?>[] a;
 936 >             return ((a = array) != null && (cap = a.length) > 0) ?
 937 >                 a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
 938           }
 939
 940           /**
 941            * Pops the given task only if it is at the current top.
 942            */
 943           final boolean tryUnpush(ForkJoinTask<?> task) {
 944 <             int b = base, s = top, al; ForkJoinTask<?>[] a;
 945 <             if ((a = array) != null && b != s && (al = a.length) > 0) {
 946 <                 int index = (al - 1) & --s;
 947 <                 if (QA.compareAndSet(a, index, task, null)) {
 944 >             boolean popped = false;
 945 >             int s, cap; ForkJoinTask<?>[] a;
 946 >             if ((a = array) != null && (cap = a.length) > 0 &&
 947 >                 (s = top) != base &&
 948 >                 (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
 949 >                 TOP.setOpaque(this, s);
 950 >             return popped;
 951 >         }
 952 >
 953 >         /**
 954 >          * Shared version of tryUnpush.
 955 >          */
 956 >         final boolean tryLockedUnpush(ForkJoinTask<?> task) {
 957 >             boolean popped = false;
 958 >             int s = top - 1, k, cap; ForkJoinTask<?>[] a;
 959 >             if ((a = array) != null && (cap = a.length) > 0 &&
 960 >                 a[k = (cap - 1) & s] == task && tryLockPhase()) {
 961 >                 if (top == s + 1 && array == a &&
 962 >                     (popped = QA.compareAndSet(a, k, task, null)))
 963                       top = s;
 964 <                 VarHandle.releaseFence();
 890 <                 return true;
 891 <             }
 964 >                 releasePhaseLock();
 965               }
 966 <             return false;
 966 >             return popped;
 967           }
 968
 969           /**
 977           // Specialized execution methods
 978
 979           /**
 980 <          * Pops and executes up to limit consecutive tasks or until empty.
 981 <          *
 982 <          * @param limit max runs, or zero for no limit
 983 <          */
 984 <         final void localPopAndExec(int limit) {
 985 <             for (;;) {
 986 <                 int b = base, s = top, al; ForkJoinTask<?>[] a;
 987 <                 if ((a = array) != null && b != s && (al = a.length) > 0) {
 988 <                     int index = (al - 1) & --s;
 989 <                     ForkJoinTask<?> t = (ForkJoinTask<?>)
 917 <                         QA.getAndSet(a, index, null);
 918 <                     if (t != null) {
 919 <                         top = s;
 920 <                         VarHandle.releaseFence();
 921 <                         t.doExec();
 922 <                         if (limit != 0 && --limit == 0)
 923 <                             break;
 924 <                     }
 925 <                     else
 980 >          * Runs the given (stolen) task if nonnull, as well as
 981 >          * remaining local tasks and others available from the given
 982 >          * queue, up to bound n (to avoid infinite unfairness).
 983 >          */
 984 >         final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
 985 >             if (t != null) {
 986 >                 int nstolen = 1;
 987 >                 for (;;) {
 988 >                     t.doExec();
 989 >                     if (n-- < 0)
 990                           break;
 991 <                 }
 992 <                 else
 993 <                     break;
 994 <             }
 931 <         }
 932 <
 933 <         /**
 934 <          * Polls and executes up to limit consecutive tasks or until empty.
 935 <          *
 936 <          * @param limit, or zero for no limit
 937 <          */
 938 <         final void localPollAndExec(int limit) {
 939 <             for (int polls = 0;;) {
 940 <                 int b = base, s = top, d, al; ForkJoinTask<?>[] a;
 941 <                 if ((a = array) != null && (d = b - s) < 0 &&
 942 <                     (al = a.length) > 0) {
 943 <                     int index = (al - 1) & b++;
 944 <                     ForkJoinTask<?> t = (ForkJoinTask<?>)
 945 <                         QA.getAndSet(a, index, null);
 946 <                     if (t != null) {
 947 <                         base = b;
 948 <                         t.doExec();
 949 <                         if (limit != 0 && ++polls == limit)
 991 >                     else if ((t = nextLocalTask()) == null) {
 992 >                         if (q != null && (t = q.poll()) != null)
 993 >                             ++nstolen;
 994 >                         else
 995                               break;
 996                       }
 952 -                 else if (d == -1)
 953 -                     break;              // now empty
 954 -                 else
 955 -                     polls = 0;          // stolen; reset
 997                   }
 998 <                 else
 999 <                     break;
 998 >                 ForkJoinWorkerThread thread = owner;
 999 >                 nsteals += nstolen;
1000 >                 source = 0;
1001 >                 if (thread != null)
1002 >                     thread.afterTopLevelExec();
1003               }
1004           }
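Aside (not part of the diff): the new topLevelExec above bounds how long a worker keeps re-polling its victim, so one worker cannot monopolize a busy queue indefinitely. A plain-collections sketch with hypothetical names:

```java
// Sketch of the bounded top-level loop in topLevelExec above: run the
// stolen task, prefer local tasks, and re-poll the victim only until
// the fairness bound n is exhausted. Returns the number of steals.
import java.util.ArrayDeque;

public class TopLevelSketch {
    static int run(Runnable stolen, ArrayDeque<Runnable> local,
                   ArrayDeque<Runnable> victim, int n) {
        int nstolen = 1;
        for (Runnable t = stolen; t != null; ) {
            t.run();
            if (n-- < 0)
                break;                       // fairness bound reached
            if ((t = local.poll()) == null && (t = victim.poll()) != null)
                ++nstolen;                   // took another one from victim
        }
        return nstolen;
    }
    public static void main(String[] args) {
        ArrayDeque<Runnable> local = new ArrayDeque<>();
        ArrayDeque<Runnable> victim = new ArrayDeque<>();
        victim.add(() -> {}); victim.add(() -> {});
        int ns = run(() -> {}, local, victim, 8);
        if (ns != 3 || !victim.isEmpty())
            throw new AssertionError();
        System.out.println("ok");
    }
}
```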
1005
1007            * If present, removes task from queue and executes it.
1008            */
1009           final void tryRemoveAndExec(ForkJoinTask<?> task) {
1010 <             ForkJoinTask<?>[] wa; int s, wal;
1011 <             if (base - (s = top) < 0 &&  // traverse from top
1012 <                 (wa = array) != null && (wal = wa.length) > 0) {
1013 <                 for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
1010 >             ForkJoinTask<?>[] a; int s, cap;
1011 >             if ((a = array) != null && (cap = a.length) > 0 &&
1012 >                 base - (s = top) < 0) { // traverse from top
1013 >                 for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
1014                       int index = i & m;
1015 <                     ForkJoinTask<?> t = (ForkJoinTask<?>)
 972 <                         QA.get(wa, index);
1015 >                     ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index);
1016                       if (t == null)
1017                           break;
1018                       else if (t == task) {
1019 <                         if (QA.compareAndSet(wa, index, t, null)) {
1019 >                         if (QA.compareAndSet(a, index, t, null)) {
1020                               top = ns;   // safely shift down
1021                               for (int j = i; j != ns; ++j) {
1022                                   ForkJoinTask<?> f;
1023                                   int pindex = (j + 1) & m;
1024 <                                 f = (ForkJoinTask<?>)QA.get(wa, pindex);
1025 <                                 QA.setVolatile(wa, pindex, null);
1024 >                                 f = (ForkJoinTask<?>)QA.get(a, pindex);
1025 >                                 QA.setVolatile(a, pindex, null);
1026                                   int jindex = j & m;
1027 <                                 QA.setRelease(wa, jindex, f);
1027 >                                 QA.setRelease(a, jindex, f);
1028                               }
1029                               VarHandle.releaseFence();
1030                               t.doExec();
1036           }
1037
1038           /**
1039 <          * Tries to steal and run tasks within the target's
1040 <          * computation until done, not found, or limit exceeded.
1039 >          * Tries to pop and run tasks within the target's computation
1040 >          * until done, not found, or limit exceeded.
1041            *
1042            * @param task root of CountedCompleter computation
1043            * @param limit max runs, or zero for no limit
1044 +          * @param shared true if must lock to extract task
1045            * @return task status on exit
1046            */
1047 <         final int localHelpCC(CountedCompleter<?> task, int limit) {
1047 >         final int helpCC(CountedCompleter<?> task, int limit, boolean shared) {
1048               int status = 0;
1049               if (task != null && (status = task.status) >= 0) {
1050 <                 for (;;) {
1051 <                     boolean help = false;
1052 <                     int b = base, s = top, al; ForkJoinTask<?>[] a;
1053 <                     if ((a = array) != null && b != s && (al = a.length) > 0) {
1054 <                         int index = (al - 1) & (s - 1);
1055 <                         ForkJoinTask<?> o = (ForkJoinTask<?>)
1056 <                             QA.get(a, index);
1057 <                         if (o instanceof CountedCompleter) {
1058 <                             CountedCompleter<?> t = (CountedCompleter<?>)o;
1059 <                             for (CountedCompleter<?> f = t;;) {
1060 <                                 if (f != task) {
1061 <                                     if ((f = f.completer) == null) // try parent
1062 <                                         break;
1063 <                                 }
1064 <                                 else {
1065 <                                     if (QA.compareAndSet(a, index, t, null)) {
1050 >                 int s, k, cap; ForkJoinTask<?>[] a;
1051 >                 while ((a = array) != null && (cap = a.length) > 0 &&
1052 >                        (s = top) != base) {
1053 >                     CountedCompleter<?> v = null;
1054 >                     ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)];
1055 >                     if (o instanceof CountedCompleter) {
1056 >                         CountedCompleter<?> t = (CountedCompleter<?>)o;
1057 >                         for (CountedCompleter<?> f = t;;) {
1058 >                             if (f != task) {
1059 >                                 if ((f = f.completer) == null)
1060 >                                     break;
1061 >                             }
1062 >                             else if (shared) {
1063 >                                 if (tryLockPhase()) {
1064 >                                     if (top == s && array == a &&
1065 >                                         QA.compareAndSet(a, k, t, null)) {
1066                                           top = s - 1;
1067 <                                     VarHandle.releaseFence();
1024 <                                     t.doExec();
1025 <                                     help = true;
1067 >                                         v = t;
1068                                       }
1069 <                                     break;
1069 >                                     releasePhaseLock();
1070 >                                 }
1071 >                                 break;
1072 >                             }
1073 >                             else {
1074 >                                 if (QA.compareAndSet(a, k, t, null)) {
1075 >                                     top = s - 1;
1076 >                                     v = t;
1077                                   }
1078 +                                 break;
1079                               }
1080                           }
1081                       }
1082 <                     if ((status = task.status) < 0 || !help ||
1082 >                     if (v != null)
1083 >                         v.doExec();
1084 >                     if ((status = task.status) < 0 || v == null ||
1085                           (limit != 0 && --limit == 0))
1086                           break;
1087                   }
1089               return status;
1090           }
1091
1040 -         // Operations on shared queues
1041 -
1092           /**
1093 <          * Tries to lock shared queue by CASing phase field.
1094 <          */
1095 <         final boolean tryLockSharedQueue() {
1096 <             return PHASE.compareAndSet(this, 0, QLOCK);
1047 <         }
1048 <
1049 <         /**
1050 <          * Shared version of tryUnpush.
1051 <          */
1052 <         final boolean trySharedUnpush(ForkJoinTask<?> task) {
1053 <             boolean popped = false;
1054 <             int s = top - 1, al; ForkJoinTask<?>[] a;
1055 <             if ((a = array) != null && (al = a.length) > 0) {
1056 <                 int index = (al - 1) & s;
1057 <                 ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
1058 <                 if (t == task &&
1059 <                     PHASE.compareAndSet(this, 0, QLOCK)) {
1060 <                     if (top == s + 1 && array == a &&
1061 <                         QA.compareAndSet(a, index, task, null)) {
1062 <                         popped = true;
1063 <                         top = s;
1064 <                     }
1065 <                     PHASE.setRelease(this, 0);
1066 <                 }
1067 <             }
1068 <             return popped;
1069 <         }
1070 <
1071 <         /**
1072 <          * Shared version of localHelpCC.
1093 >          * Tries to poll and run AsynchronousCompletionTasks until
1094 >          * none found or blocker is released
1095 >          *
1096 >          * @param blocker the blocker
1097            */
1098 <         final int sharedHelpCC(CountedCompleter<?> task, int limit) {
1099 <             int status = 0;
1100 <             if (task != null && (status = task.status) >= 0) {
1101 <                 for (;;) {
1102 <                     boolean help = false;
1103 <                     int b = base, s = top, al; ForkJoinTask<?>[] a;
1104 <                     if ((a = array) != null && b != s && (al = a.length) > 0) {
1105 <                         int index = (al - 1) & (s - 1);
1106 <                         ForkJoinTask<?> o = (ForkJoinTask<?>)
1107 <                             QA.get(a, index);
1108 <                         if (o instanceof CountedCompleter) {
1109 <                             CountedCompleter<?> t = (CountedCompleter<?>)o;
1110 <                             for (CountedCompleter<?> f = t;;) {
1111 <                                 if (f != task) {
1112 <                                     if ((f = f.completer) == null)
1089 <                                         break;
1090 <                                 }
1091 <                                 else {
1092 <                                     if (PHASE.compareAndSet(this, 0, QLOCK)) {
1093 <                                         if (top == s && array == a &&
1094 <                                             QA.compareAndSet(a, index, t, null)) {
1095 <                                             help = true;
1096 <                                             top = s - 1;
1097 <                                         }
1098 <                                         PHASE.setRelease(this, 0);
1099 <                                         if (help)
1100 <                                             t.doExec();
1101 <                                     }
1102 <                                     break;
1103 <                                 }
1104 <                             }
1098 >         final void helpAsyncBlocker(ManagedBlocker blocker) {
1099 >             if (blocker != null) {
1100 >                 int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1101 >                 while ((a = array) != null && (cap = a.length) > 0 &&
1102 >                        (b = base) != top) {
1103 >                     t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1104 >                     if (blocker.isReleasable())
1105 >                         break;
1106 >                     else if (base == b++ && t != null) {
1107 >                         if (!(t instanceof CompletableFuture.
1108 >                               AsynchronousCompletionTask))
1109 >                             break;
1110 >                         else if (QA.compareAndSet(a, k, t, null)) {
1111 >                             BASE.setOpaque(this, b);
1112 >                             t.doExec();
1113                           }
1114                       }
1107 -                     if ((status = task.status) < 0 || !help ||
1108 -                         (limit != 0 && --limit == 0))
1109 -                         break;
1115                   }
1116               }
1112 -             return status;
1117           }
1118
1119           /**
1128           }
1129
1130           // VarHandle mechanics.
1131 <         private static final VarHandle PHASE;
1131 >         static final VarHandle PHASE;
1132 >         static final VarHandle BASE;
1133 >         static final VarHandle TOP;
1134           static {
1135               try {
1136                   MethodHandles.Lookup l = MethodHandles.lookup();
1137                   PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
1138 +                 BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
1139 +                 TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
1140               } catch (ReflectiveOperationException e) {
1141                   throw new Error(e);
1142               }
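Aside (not part of the diff): the new BASE/TOP handles let the code above replace full volatile writes with relaxed access modes such as setOpaque. A self-contained illustration of the same setup pattern, with hypothetical names:

```java
// Illustration of the VarHandle setup used above: obtain a handle to a
// field through a Lookup, then use a relaxed access mode (setOpaque)
// instead of a full volatile store.
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class HandleDemo {
    volatile int base;
    static final VarHandle BASE;
    static {
        try {
            BASE = MethodHandles.lookup()
                .findVarHandle(HandleDemo.class, "base", int.class);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }
    public static void main(String[] args) {
        HandleDemo d = new HandleDemo();
        BASE.setOpaque(d, 42);         // coherent but unordered write
        if ((int) BASE.getOpaque(d) != 42)
            throw new AssertionError();
        System.out.println("ok");
    }
}
```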
1335           wt.setDaemon(true);                           // configure thread
1336           if ((handler = ueh) != null)
1337               wt.setUncaughtExceptionHandler(handler);
1330 -         WorkQueue w = new WorkQueue(this, wt);
1338           int tid = 0;                                  // for thread name
1339 <         int fifo = mode & FIFO;
1339 >         int idbits = mode & FIFO;
1340           String prefix = workerNamePrefix;
1341 +         WorkQueue w = new WorkQueue(this, wt);
1342           if (prefix != null) {
1343               synchronized (prefix) {
1344                   WorkQueue[] ws = workQueues; int n;
1345                   int s = indexSeed += SEED_INCREMENT;
1346 +                 idbits |= (s & ~(SMASK | FIFO | DORMANT));
1347                   if (ws != null && (n = ws.length) > 1) {
1348                       int m = n - 1;
1349 <                     tid = s & m;
1341 <                     int i = m & ((s << 1) | 1);       // odd-numbered indices
1349 >                     tid = m & ((s << 1) | 1);         // odd-numbered indices
1350                       for (int probes = n >>> 1;;) {    // find empty slot
1351                           WorkQueue q;
1352 <                         if ((q = ws[i]) == null || q.phase == QUIET)
1352 >                         if ((q = ws[tid]) == null || q.phase == QUIET)
1353                               break;
1354                           else if (--probes == 0) {
1355 <                             i = n | 1;                // resize below
1355 >                             tid = n | 1;              // resize below
1356                               break;
1357                           }
1358                           else
1359 <                             i = (i + 2) & m;
1359 >                             tid = (tid + 2) & m;
1360                       }
1361 +                     w.phase = w.id = tid | idbits;    // now publishable
1362
1363 <                     int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
1364 <                     w.phase = w.id = id;              // now publishable
1356 <
1357 <                     if (i < n)
1358 <                         ws[i] = w;
1363 >                     if (tid < n)
1364 >                         ws[tid] = w;
1365                       else {                            // expand array
1366                           int an = n << 1;
1367                           WorkQueue[] as = new WorkQueue[an];
1368 <                         as[i] = w;
1368 >                         as[tid] = w;
1369                           int am = an - 1;
1370                           for (int j = 0; j < n; ++j) {
1371                               WorkQueue v;              // copy external queue
1398           int phase = 0;
1399           if (wt != null && (w = wt.workQueue) != null) {
1400               Object lock = workerNamePrefix;
1401 +             int wid = w.id;
1402               long ns = (long)w.nsteals & 0xffffffffL;
1396 -             int idx = w.id & SMASK;
1403               if (lock != null) {
1398 -                 WorkQueue[] ws;                       // remove index from array
1404                   synchronized (lock) {
1405 <                     if ((ws = workQueues) != null && ws.length > idx &&
1406 <                         ws[idx] == w)
1407 <                         ws[idx] = null;
1405 >                     WorkQueue[] ws; int n, i;         // remove index from array
1406 >                     if ((ws = workQueues) != null && (n = ws.length) > 0 &&
1407 >                         ws[i = wid & (n - 1)] == w)
1408 >                         ws[i] = null;
1409                       stealCount += ns;
1410                   }
1411               }
1457               Thread vt = v.owner;
1458               if (sp == vp && CTL.compareAndSet(this, c, nc)) {
1459                   v.phase = np;
1460 <                 if (v.source < 0)
1460 >                 if (vt != null && v.source < 0)
1461                       LockSupport.unpark(vt);
1462                   break;
1463               }
1498               long nc = ((long)v.stackPred & SP_MASK) | uc;
1499               if (vp == sp && CTL.compareAndSet(this, c, nc)) {
1500                   v.phase = np;
1501 <                 if (v.source < 0)
1501 >                 if (vt != null && v.source < 0)
1502                       LockSupport.unpark(vt);
1503                   return (wp < 0) ? -1 : 1;
1504               }
1555        * See above for explanation.
1556        */
1557       final void runWorker(WorkQueue w) {
1558 <         WorkQueue[] ws;
1559 <         w.growArray();                                // allocate queue
1560 <         int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
1561 <         if (r == 0)                                   // initial nonzero seed
1562 <             r = 1;
1563 <         int lastSignalId = 0;                         // avoid unneeded signals
1564 <         while ((ws = workQueues) != null) {
1565 <             boolean nonempty = false;                 // scan
1566 <             for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1567 <                 WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1568 <                 if ((i = r & m) >= 0 && i < n &&      // always true
1569 <                     (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
1570 <                     (a = q.array) != null && (al = a.length) > 0) {
1571 <                     int qid = q.id;                   // (never zero)
1572 <                     int index = (al - 1) & b;
1573 <                     ForkJoinTask<?> t = (ForkJoinTask<?>)
1574 <                         QA.getAcquire(a, index);
1575 <                     if (t != null && b++ == q.base &&
1576 <                         QA.compareAndSet(a, index, t, null)) {
1577 <                         if ((q.base = b) - q.top < 0 && qid != lastSignalId)
1578 <                             signalWork();             // propagate signal
1579 <                         w.source = lastSignalId = qid;
1580 <                         t.doExec();
1581 <                         if ((w.id & FIFO) != 0)       // run remaining locals
1582 <                             w.localPollAndExec(POLL_LIMIT);
1583 <                         else
1584 <                             w.localPopAndExec(POLL_LIMIT);
1585 <                         ForkJoinWorkerThread thread = w.owner;
1586 <                         ++w.nsteals;
1587 <                         w.source = 0;                 // now idle
1588 <                         if (thread != null)
1589 <                             thread.afterTopLevelExec();
1558 >         int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
1559 >         w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
1560 >         for (;;) {
1561 >             int phase;
1562 >             if (scan(w, r)) {                     // scan until apparently empty
1563 >                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
1564 >             }
1565 >             else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
1566 >                 long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
1567 >                 long c, nc;
1568 >                 do {
1569 >                     w.stackPred = (int)(c = ctl);
1570 >                     nc = ((c - RC_UNIT) & UC_MASK) | np;
1571 >                 } while (!CTL.weakCompareAndSet(this, c, nc));
1572 >             }
1573 >             else {                                // already queued
1574 >                 int pred = w.stackPred;
1575 >                 Thread.interrupted();             // clear before park
1576 >                 w.source = DORMANT;               // enable signal
1577 >                 long c = ctl;
1578 >                 int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
1579 >                 if (md < 0)                       // terminating
1580 >                     break;
1581 >                 else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
1582 >                          tryTerminate(false, false))
1583 >                     break;                        // quiescent shutdown
1584 >                 else if (rc <= 0 && pred != 0 && phase == (int)c) {
1585 >                     long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1586 >                     long d = keepAlive + System.currentTimeMillis();
1587 >                     LockSupport.parkUntil(this, d);
1588 >                     if (ctl == c &&               // drop on timeout if all idle
1589 >                         d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1590 >                         CTL.compareAndSet(this, c, nc)) {
1591 >                         w.phase = QUIET;
1592 >                         break;
1593                       }
1585 -                     nonempty = true;
1594                   }
1595 <             else if (nonempty)
1596 <                 break;
1597 <             else
1590 <                 ++r;
1595 >             else if (w.phase < 0)
1596 >                 LockSupport.park(this);           // OK if spuriously woken
1597 >             w.source = 0;                         // disable signal
1598               }
1599 +         }
1600 +     }
1601
1602 <             if (nonempty) {                           // move (xorshift)
1603 <                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
1604 <             }
1605 <             else {
1606 <                 int phase;
1607 <                 lastSignalId = 0;                     // clear for next scan
1608 <                 if ((phase = w.phase) >= 0) {         // enqueue
1609 <                     int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
1610 <                     long c, nc;
1611 <                     do {
1612 <                         w.stackPred = (int)(c = ctl);
1613 <                         nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
1614 <                     } while (!CTL.weakCompareAndSet(this, c, nc));
1615 <                 }
1616 <                 else {                                // already queued
1617 <                     int pred = w.stackPred;
1618 <                     w.source = DORMANT;               // enable signal
1619 <                     for (int steps = 0;;) {
1620 <                         int md, rc; long c;
1621 <                         if (w.phase >= 0) {
1622 <                             w.source = 0;
1623 <                             break;
1624 <                         }
1625 <                         else if ((md = mode) < 0)     // shutting down
1617 <                             return;
1618 <                         else if ((rc = ((md & SMASK) + // possibly quiescent
1619 <                                         (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
1620 <                                  (md & SHUTDOWN) != 0 &&
1621 <                                  tryTerminate(false, false))
1622 <                             return;                   // help terminate
1623 <                         else if ((++steps & 1) == 0)
1624 <                             Thread.interrupted();     // clear between parks
1625 <                         else if (rc <= 0 && pred != 0 && phase == (int)c) {
1626 <                             long d = keepAlive + System.currentTimeMillis();
1627 <                             LockSupport.parkUntil(this, d);
1628 <                             if (ctl == c &&
1629 <                                 d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
1630 <                                 long nc = ((UC_MASK & (c - TC_UNIT)) |
1631 <                                            (SP_MASK & pred));
1632 <                                 if (CTL.compareAndSet(this, c, nc)) {
1633 <                                     w.phase = QUIET;
1634 <                                     return;           // drop on timeout
1635 <                                 }
1636 <                             }
1602 >     /**
1603 >      * Scans for and if found executes one or more top-level tasks from a queue.
1604 >      *
1605 >      * @return true if found an apparently non-empty queue, and
1606 >      *         possibly ran task(s).
1607 >      */
1608 >     private boolean scan(WorkQueue w, int r) {
1609 >         WorkQueue[] ws; int n;
1610 >         if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
1611 >             for (int m = n - 1, j = r & m;;) {
1612 >                 WorkQueue q; int b, d;
1613 >                 if ((q = ws[j]) != null && (d = (b = q.base) - q.top) != 0) {
1614 >                     int qid = q.id;
1615 >                     ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
1616 >                     if ((a = q.array) != null && (cap = a.length) > 0) {
1617 >                         t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1618 >                         if (q.base == b++ && t != null &&
1619 >                             QA.compareAndSet(a, k, t, null)) {
1620 >                             q.base = b;
1621 >                             w.source = qid;
1622 >                             if (d != -1 || b != q.top)
1623 >                                 signalWork();
1624 >                             w.topLevelExec(t, q,  // random fairness bound
1625 >                                            r & ((n << TOP_BOUND_SHIFT) - 1));
1626                           }
1638 -                     else
1639 -                         LockSupport.park(this);
1627                       }
1628 +                 return true;
1629               }
1630 +             else if (--n > 0)
1631 +                 j = (j + 1) & m;
1632 +             else
1633 +                 break;
1634           }
1635       }
1636 +     return false;
1637   }
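Aside (not part of the diff): the "move (xorshift)" step above is Marsaglia's 13/17/5 xorshift generator, which cycles through all nonzero 32-bit ints from any nonzero seed. A minimal check of those two properties:

```java
// Sketch of the xorshift step in runWorker above: from a nonzero seed
// it never produces zero and does not repeat within a short horizon.
public class XorShiftDemo {
    static int next(int r) {
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
        return r;
    }
    public static void main(String[] args) {
        int r = 0x9e3779b9;                  // any nonzero seed works
        java.util.HashSet<Integer> seen = new java.util.HashSet<>();
        for (int i = 0; i < 10_000; i++) {
            r = next(r);
            if (r == 0 || !seen.add(r))      // zero is a fixed point; avoided
                throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```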
1638
1639       /**
1649        */
1650       final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1651           int s = 0;
1652 +         int seed = ThreadLocalRandom.nextSecondarySeed();
1653           if (w != null && task != null &&
1654               (!(task instanceof CountedCompleter) ||
1655 <              (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
1655 >              (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {
1656               w.tryRemoveAndExec(task);
1657               int src = w.source, id = w.id;
1658 +             int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
1659               s = task.status;
1660               while (s >= 0) {
1661                   WorkQueue[] ws;
1662 <                 boolean nonempty = false;
1663 <                 int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
1664 <                 if ((ws = workQueues) != null) {      // scan for matching id
1665 <                     for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
1666 <                         WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1667 <                         if ((i = (r + j) & m) >= 0 && i < n &&
1668 <                             (q = ws[i]) != null && q.source == id &&
1669 <                             (b = q.base) - q.top < 0 &&
1675 <                             (a = q.array) != null && (al = a.length) > 0) {
1676 <                             int qid = q.id;
1677 <                             int index = (al - 1) & b;
1662 >                 int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;
1663 >                 while (n > 0) {
1664 >                     WorkQueue q; int b;
1665 >                     if ((q = ws[r & m]) != null && q.source == id &&
1666 >                         (b = q.base) != q.top) {
1667 >                         ForkJoinTask<?>[] a; int cap, k;
1668 >                         int qid = q.id;
1669 >                         if ((a = q.array) != null && (cap = a.length) > 0) {
1670                               ForkJoinTask<?> t = (ForkJoinTask<?>)
1671 <                                 QA.getAcquire(a, index);
1672 <                             if (t != null && b++ == q.base && id == q.source &&
1673 <                                 QA.compareAndSet(a, index, t, null)) {
1671 >                                 QA.getAcquire(a, k = (cap - 1) & b);
1672 >                             if (q.source == id && q.base == b++ &&
1673 >                                 t != null && QA.compareAndSet(a, k, t, null)) {
1674                                   q.base = b;
1675                                   w.source = qid;
1676                                   t.doExec();
1677                                   w.source = src;
1678                               }
1687 -                             nonempty = true;
1688 -                             break;
1679                           }
1680 +                         break;
1681 +                     }
1682 +                     else {
1683 +                         r += step;
1684 +                         --n;
1685                       }
1686                   }
1687                   if ((s = task.status) < 0)
1688                       break;
1689 <                 else if (!nonempty) {
1689 >                 else if (n == 0) { // empty scan
1690                       long ms, ns; int block;
1691                       if (deadline == 0L)
1692                           ms = 0L;          // untimed
|
* find tasks either. |
1712 |
|
*/ |
1713 |
|
final void helpQuiescePool(WorkQueue w) { |
1714 |
< |
int prevSrc = w.source, fifo = w.id & FIFO; |
1714 |
> |
int prevSrc = w.source; |
1715 |
> |
int seed = ThreadLocalRandom.nextSecondarySeed(); |
1716 |
> |
int r = seed >>> 16, step = r | 1; |
1717 |
|
for (int source = prevSrc, released = -1;;) { // -1 until known |
1718 |
< |
WorkQueue[] ws; |
1719 |
< |
if (fifo != 0) |
1720 |
< |
w.localPollAndExec(0); |
1721 |
< |
else |
1725 |
< |
w.localPopAndExec(0); |
1726 |
< |
if (released == -1 && w.phase >= 0) |
1718 |
> |
ForkJoinTask<?> localTask; WorkQueue[] ws; |
1719 |
> |
while ((localTask = w.nextLocalTask()) != null) |
1720 |
> |
localTask.doExec(); |
1721 |
> |
if (w.phase >= 0 && released == -1) |
1722 |
|
released = 1; |
1723 |
|
boolean quiet = true, empty = true; |
1724 |
< |
int r = ThreadLocalRandom.nextSecondarySeed(); |
1725 |
< |
if ((ws = workQueues) != null) { |
1726 |
< |
for (int n = ws.length, j = n, m = n - 1; j > 0; --j) { |
1727 |
< |
WorkQueue q; int i, b, al; ForkJoinTask<?>[] a; |
1728 |
< |
if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) { |
1729 |
< |
if ((b = q.base) - q.top < 0 && |
1730 |
< |
(a = q.array) != null && (al = a.length) > 0) { |
1731 |
< |
int qid = q.id; |
1724 |
> |
int n = (ws = workQueues) == null ? 0 : ws.length; |
1725 |
> |
for (int m = n - 1; n > 0; r += step, --n) { |
1726 |
> |
WorkQueue q; int b; |
1727 |
> |
if ((q = ws[r & m]) != null) { |
1728 |
> |
int qs = q.source; |
1729 |
> |
if ((b = q.base) != q.top) { |
1730 |
> |
quiet = empty = false; |
1731 |
> |
ForkJoinTask<?>[] a; int cap, k; |
1732 |
> |
int qid = q.id; |
1733 |
> |
if ((a = q.array) != null && (cap = a.length) > 0) { |
1734 |
|
if (released == 0) { // increment |
1735 |
|
released = 1; |
1736 |
|
CTL.getAndAdd(this, RC_UNIT); |
1737 |
|
} |
1741 |
– |
int index = (al - 1) & b; |
1738 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1739 |
< |
QA.getAcquire(a, index); |
1740 |
< |
if (t != null && b++ == q.base && |
1741 |
< |
QA.compareAndSet(a, index, t, null)) { |
1739 |
> |
QA.getAcquire(a, k = (cap - 1) & b); |
1740 |
> |
if (q.base == b++ && t != null && |
1741 |
> |
QA.compareAndSet(a, k, t, null)) { |
1742 |
|
q.base = b; |
1743 |
< |
w.source = source = q.id; |
1743 |
> |
w.source = qid; |
1744 |
|
t.doExec(); |
1745 |
|
w.source = source = prevSrc; |
1746 |
|
} |
1751 |
– |
quiet = empty = false; |
1752 |
– |
break; |
1747 |
|
} |
1748 |
< |
else if ((q.source & QUIET) == 0) |
1755 |
< |
quiet = false; |
1748 |
> |
break; |
1749 |
|
} |
1750 |
+ |
else if ((qs & QUIET) == 0) |
1751 |
+ |
quiet = false; |
1752 |
|
} |
1753 |
|
} |
1754 |
|
if (quiet) { |
1790                   origin = r & m;
1791                   step = h | 1;
1792               }
1793 <             for (int k = origin, oldSum = 0, checkSum = 0;;) {
1794 <                 WorkQueue q; int b, al; ForkJoinTask<?>[] a;
1795 <                 if ((q = ws[k]) != null) {
1796 <                     checkSum += b = q.base;
1797 <                     if (b - q.top < 0 &&
1798 <                         (a = q.array) != null && (al = a.length) > 0) {
1799 <                         int index = (al - 1) & b;
1800 <                         ForkJoinTask<?> t = (ForkJoinTask<?>)
1806 <                             QA.getAcquire(a, index);
1807 <                         if (t != null && b++ == q.base &&
1808 <                             QA.compareAndSet(a, index, t, null)) {
1809 <                             q.base = b;
1793 >             boolean nonempty = false;
1794 >             for (int i = origin, oldSum = 0, checkSum = 0;;) {
1795 >                 WorkQueue q;
1796 >                 if ((q = ws[i]) != null) {
1797 >                     int b; ForkJoinTask<?> t;
1798 >                     if ((b = q.base) != q.top) {
1799 >                         nonempty = true;
1800 >                         if ((t = q.poll()) != null)
1801                               return t;
1811 -                     }
1812 -                     else
1813 -                         break;                        // restart
1802                       }
1803 +                     else
1804 +                         checkSum += b + q.id;
1805                   }
1806 <                 if ((k = (k + step) & m) == origin) {
1807 <                     if (oldSum == (oldSum = checkSum))
1806 >                 if ((i = (i + step) & m) == origin) {
1807 >                     if (!nonempty && oldSum == (oldSum = checkSum))
1808                           break rescan;
1809                       checkSum = 0;
1810 +                     nonempty = false;
1811                   }
1812               }
1813           }
1821        */
1822       final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1823           ForkJoinTask<?> t;
1824 <         if (w != null &&
1825 <             (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1826 <             return t;
1836 <         else
1837 <             return pollScan(false);
1824 >         if (w == null || (t = w.nextLocalTask()) == null)
1825 >             t = pollScan(false);
1826 >         return t;
1827       }
1828
1829       // External operations
1841           r = ThreadLocalRandom.getProbe();
1842       }
1843       for (;;) {
1844 +         WorkQueue q;
1845           int md = mode, n;
1846           WorkQueue[] ws = workQueues;
1847           if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
1848               throw new RejectedExecutionException();
1849 <         else {
1850 <             WorkQueue q;
1851 <             boolean push = false, grow = false;
1852 <             if ((q = ws[(n - 1) & r & SQMASK]) == null) {
1853 <                 Object lock = workerNamePrefix;
1854 <                 int qid = (r | QUIET) & ~(FIFO | OWNED);
1855 <                 q = new WorkQueue(this, null);
1856 <                 q.id = qid;
1857 <                 q.source = QUIET;
1858 <                 q.phase = QLOCK;          // lock queue
1859 <                 if (lock != null) {
1860 <                     synchronized (lock) { // lock pool to install
1861 <                         int i;
1862 <                         if ((ws = workQueues) != null &&
1863 <                             (n = ws.length) > 0 &&
1874 <                             ws[i = qid & (n - 1) & SQMASK] == null) {
1875 <                             ws[i] = q;
1876 <                             push = grow = true;
1877 <                         }
1878 <                     }
1849 >         else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue
1850 >             int qid = (r | QUIET) & ~(FIFO | OWNED);
1851 >             Object lock = workerNamePrefix;
1852 >             ForkJoinTask<?>[] qa =
1853 >                 new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1854 >             q = new WorkQueue(this, null);
1855 >             q.array = qa;
1856 >             q.id = qid;
1857 >             q.source = QUIET;
1858 >             if (lock != null) {     // unless disabled, lock pool to install
1859 >                 synchronized (lock) {
1860 >                     WorkQueue[] vs; int i, vn;
1861 >                     if ((vs = workQueues) != null && (vn = vs.length) > 0 &&
1862 >                         vs[i = qid & (vn - 1) & SQMASK] == null)
1863 >                         vs[i] = q;  // else another thread already installed
1864                   }
1865               }
1866 <             else if (q.tryLockSharedQueue()) {
1867 <                 int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
1868 <                 if ((a = q.array) != null && (al = a.length) > 0 &&
1869 <                     al - 1 + (d = b - s) > 0) {
1870 <                     a[(al - 1) & s] = task;
1886 <                     q.top = s + 1;        // relaxed writes OK here
1887 <                     q.phase = 0;
1888 <                     if (d < 0 && q.base - s < -1)
1889 <                         break;            // no signal needed
1890 <                 }
1891 <                 else
1892 <                     grow = true;
1893 <                 push = true;
1894 <             }
1895 <             if (push) {
1896 <                 if (grow) {
1897 <                     try {
1898 <                         q.growArray();
1899 <                         int s = q.top, al; ForkJoinTask<?>[] a;
1900 <                         if ((a = q.array) != null && (al = a.length) > 0) {
1901 <                             a[(al - 1) & s] = task;
1902 <                             q.top = s + 1;
1903 <                         }
1904 <                     } finally {
1905 <                         q.phase = 0;
1906 <                     }
1907 <                 }
1866 >         }
1867 >         else if (!q.tryLockPhase()) // move if busy
1868 >             r = ThreadLocalRandom.advanceProbe(r);
1869 >         else {
1870 >             if (q.lockedPush(task))
1871                   signalWork();
1872 <             break;
1910 <         }
1911 <             else                      // move if busy
1912 <                 r = ThreadLocalRandom.advanceProbe(r);
1872 >             return;
1873           }
1874       }
1875   }
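This externalPush path is what runs when a thread that is not a ForkJoinWorkerThread submits work: the caller's ThreadLocalRandom probe selects an even-indexed submission queue (created lazily on first use), and the probe is advanced to a different slot if that queue's lock is contended. A caller-side sketch (the helper name runExternally is mine):

```java
import java.util.concurrent.ForkJoinPool;

public class ExternalSubmitDemo {
    /** Submits from a plain (non-FJ) thread, exercising externalPush. */
    static int runExternally(ForkJoinPool pool) {
        return pool.submit(() -> 6 * 7).join();   // Callable<Integer> lambda
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        System.out.println(runExternally(pool)); // prints 42
        pool.shutdown();
    }
}
```

Probe advancement on lock failure is a contention-avoidance move: rather than spin on one busy submission queue, the submitter rehashes to another slot.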
1911           return ((ws = workQueues) != null &&
1912                   (n = ws.length) > 0 &&
1913                   (w = ws[(n - 1) & r & SQMASK]) != null &&
1914 <                 w.trySharedUnpush(task));
1914 >                 w.tryLockedUnpush(task));
1915       }
1916
1917       /**
1922           WorkQueue[] ws; WorkQueue w; int n;
1923           return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1924                   (w = ws[(n - 1) & r & SQMASK]) != null) ?
1925 <             w.sharedHelpCC(task, maxTasks) : 0;
1925 >             w.helpCC(task, maxTasks, true) : 0;
1926       }
1927
1928       /**
1937        */
1938       final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1939                              int maxTasks) {
1940 <         return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
1940 >         return (w == null) ? 0 : w.helpCC(task, maxTasks, false);
1941       }
1942
1943       /**
2067                       } catch (Throwable ignore) {
2068                       }
2069                   }
2070 <                 checkSum += w.base + w.id;
2070 >                 checkSum += w.base + w.phase;
2071               }
2072           }
2073       }
2560        * @return the number of worker threads
2561        */
2562       public int getRunningThreadCount() {
2603 -         int rc = 0;
2563           WorkQueue[] ws; WorkQueue w;
2564 +         VarHandle.acquireFence();
2565 +         int rc = 0;
2566           if ((ws = workQueues) != null) {
2567               for (int i = 1; i < ws.length; i += 2) {
2568                   if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2610           if ((ws = workQueues) != null) {
2611               for (int i = 1; i < ws.length; i += 2) {
2612                   if ((v = ws[i]) != null) {
2613 <                     if ((v.source & QUIET) == 0)
2613 >                     if (v.source > 0)
2614                           return false;
2615                       --tc;
2616                   }
2656        * @return the number of queued tasks
2657        */
2658       public long getQueuedTaskCount() {
2698 -         long count = 0;
2659           WorkQueue[] ws; WorkQueue w;
2660 +         VarHandle.acquireFence();
2661 +         int count = 0;
2662           if ((ws = workQueues) != null) {
2663               for (int i = 1; i < ws.length; i += 2) {
2664                   if ((w = ws[i]) != null)
2676        * @return the number of queued submissions
2677        */
2678       public int getQueuedSubmissionCount() {
2717 -         int count = 0;
2679           WorkQueue[] ws; WorkQueue w;
2680 +         VarHandle.acquireFence();
2681 +         int count = 0;
2682           if ((ws = workQueues) != null) {
2683               for (int i = 0; i < ws.length; i += 2) {
2684                   if ((w = ws[i]) != null)
2696        */
2697       public boolean hasQueuedSubmissions() {
2698           WorkQueue[] ws; WorkQueue w;
2699 +         VarHandle.acquireFence();
2700           if ((ws = workQueues) != null) {
2701               for (int i = 0; i < ws.length; i += 2) {
2702                   if ((w = ws[i]) != null && !w.isEmpty())
2735        * @return the number of elements transferred
2736        */
2737       protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2774 -         int count = 0;
2738           WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2739 +         VarHandle.acquireFence();
2740 +         int count = 0;
2741           if ((ws = workQueues) != null) {
2742               for (int i = 0; i < ws.length; ++i) {
2743                   if ((w = ws[i]) != null) {
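The VarHandle.acquireFence() calls added to these snapshot methods order the subsequent plain reads of queue state against earlier releasing writes by other threads. A minimal illustration of the fence idiom (FenceDemo is my hypothetical class, not JDK internals); here the volatile read of ready already supplies acquire semantics, and the explicit fence mirrors the pattern used above, where the fields being read are plain:

```java
import java.lang.invoke.VarHandle;

public class FenceDemo {
    static int data;                    // plain (non-volatile) field
    static volatile boolean ready;

    static int readWhenReady() {
        while (!ready)
            Thread.onSpinWait();        // wait for releasing volatile write
        VarHandle.acquireFence();       // plain reads below cannot be
                                        // reordered above this point
        return data;
    }

    public static void main(String[] args) {
        new Thread(() -> { data = 42; ready = true; }).start();
        System.out.println(readWhenReady()); // prints 42
    }
}
```

In the pool's count methods there is no flag to spin on; a single fence up front is a cheap way to make one best-effort snapshot of many plain fields reasonably fresh.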
2760        */
2761       public String toString() {
2762           // Use a single pass through workQueues to collect counts
2763 <         long qt = 0L, qs = 0L; int rc = 0;
2763 >         int md = mode;                   // read volatile fields first
2764 >         long c = ctl;
2765           long st = stealCount;
2766 +         long qt = 0L, qs = 0L; int rc = 0;
2767           WorkQueue[] ws; WorkQueue w;
2768           if ((ws = workQueues) != null) {
2769               for (int i = 0; i < ws.length; ++i) {
2781               }
2782           }
2783
2817 -         int md = mode;
2784           int pc = (md & SMASK);
2819 -         long c = ctl;
2785           int tc = pc + (short)(c >>> TC_SHIFT);
2786           int ac = pc + (int)(c >> RC_SHIFT);
2787           if (ac < 0)                      // ignore transient negative
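The tc and ac computations above unpack signed 16-bit count deltas from the high half of the ctl long. A self-contained sketch of that packing scheme (CtlDemo is mine; the shift constants mirror the JDK's RC_SHIFT = 48 and TC_SHIFT = 32, and pc plays the role of the configured parallelism):

```java
public class CtlDemo {
    static final int RC_SHIFT = 48, TC_SHIFT = 32;

    /** Packs two signed 16-bit deltas into the top 32 bits of a long. */
    static long pack(int rcDelta, int tcDelta) {
        return ((long)(rcDelta & 0xffff) << RC_SHIFT) |
               ((long)(tcDelta & 0xffff) << TC_SHIFT);
    }

    /** Active count: signed shift sign-extends the top 16-bit field. */
    static int activeCount(long c, int pc) {
        return pc + (int)(c >> RC_SHIFT);
    }

    /** Total count: cast to short sign-extends the next 16-bit field. */
    static int totalCount(long c, int pc) {
        return pc + (short)(c >>> TC_SHIFT);
    }
}
```

Storing deltas rather than absolute counts lets one atomic CAS on ctl adjust both fields at once, which is why the decoded values can be transiently negative, as the comment on the line above notes.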
3067        */
3068       public static void managedBlock(ManagedBlocker blocker)
3069           throws InterruptedException {
3070 +         if (blocker == null) throw new NullPointerException();
3071           ForkJoinPool p;
3072           ForkJoinWorkerThread wt;
3073           WorkQueue w;
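managedBlock is the pool's cooperative-blocking entry point: while the caller waits, the pool may add a compensating worker so parallelism is preserved. A minimal ManagedBlocker wrapping a latch (LatchBlocker is my illustrative class, not from the JDK sources):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

public class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    final CountDownLatch latch;
    LatchBlocker(CountDownLatch latch) { this.latch = latch; }

    public boolean isReleasable() {      // checked first to avoid blocking
        return latch.getCount() == 0;
    }
    public boolean block() throws InterruptedException {
        latch.await();                   // really block; pool may compensate
        return true;                     // done; managedBlock can return
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(latch::countDown).start();
        ForkJoinPool.managedBlock(new LatchBlocker(latch));
        System.out.println(latch.getCount()); // prints 0
    }
}
```

The isReleasable/block pair is deliberately split: managedBlock retries isReleasable before and between block attempts, so a blocker should make it cheap and side-effect free.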
3100        * available or blocker is released.
3101        */
3102       static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3103 <         if (blocker != null && (e instanceof ForkJoinPool)) {
3103 >         if (e instanceof ForkJoinPool) {
3104               WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3105               ForkJoinPool p = (ForkJoinPool)e;
3106               Thread thread = Thread.currentThread();
3112                   w = ws[(n - 1) & r & SQMASK];
3113               else
3114                   w = null;
3115 <             if (w != null) {
3116 <                 for (;;) {
3151 <                     int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
3152 <                     if ((a = w.array) != null && (d = b - s) < 0 &&
3153 <                         (al = a.length) > 0) {
3154 <                         int index = (al - 1) & b;
3155 <                         ForkJoinTask<?> t = (ForkJoinTask<?>)
3156 <                             QA.getAcquire(a, index);
3157 <                         if (blocker.isReleasable())
3158 <                             break;
3159 <                         else if (b++ == w.base) {
3160 <                             if (t == null) {
3161 <                                 if (d == -1)
3162 <                                     break;
3163 <                             }
3164 <                             else if (!(t instanceof CompletableFuture.
3165 <                                        AsynchronousCompletionTask))
3166 <                                 break;
3167 <                             else if (QA.compareAndSet(a, index, t, null)) {
3168 <                                 w.base = b;
3169 <                                 t.doExec();
3170 <                             }
3171 <                         }
3172 <                     }
3173 <                     else
3174 <                         break;
3175 <                 }
3176 <             }
3115 >             if (w != null)
3116 >                 w.helpAsyncBlocker(blocker);
3117           }
3118       }
3119
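A common way this path is reached is through CompletableFuture: its internal waiter is a ManagedBlocker, and while a caller waits on a future backed by a ForkJoinPool it may run other queued AsynchronousCompletionTasks instead of idling. A caller-side sketch (computeAsync is my helper name; the internal routing through helpAsyncBlocker is not directly observable here):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class AsyncBlockDemo {
    /** Joins an async computation submitted to a ForkJoinPool executor. */
    static int computeAsync() {
        return CompletableFuture
            .supplyAsync(() -> 6 * 7, ForkJoinPool.commonPool())
            .join();
    }
}
```

Restricting the helping loop to AsynchronousCompletionTask instances matters: stealing an arbitrary task while nominally blocked could run unbounded or re-blocking work under the waiter.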
3132       // VarHandle mechanics
3133       private static final VarHandle CTL;
3134       private static final VarHandle MODE;
3135 <     private static final VarHandle QA;
3135 >     static final VarHandle QA;
3136
3137       static {
3138           try {
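The static initializer begun above uses the standard lookup idiom for obtaining VarHandles: findVarHandle for named fields and arrayElementVarHandle for slot access (the JDK's QA handle). A self-contained sketch of both (Holder and its ctl field are my stand-ins):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class Holder {
    volatile long ctl;                  // stand-in for a CASed control word

    static final VarHandle CTL;         // handle on the ctl field
    static final VarHandle QA;          // handle on Object[] elements
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            CTL = l.findVarHandle(Holder.class, "ctl", long.class);
            QA = MethodHandles.arrayElementVarHandle(Object[].class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
```

Widening QA from private to package access, as the diff line above does, lets WorkQueue (a nested/sibling class in the same file) reuse one cached handle for its task-array CAS operations instead of creating its own.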