155 |
|
* functionality and control for a set of worker threads: |
156 |
|
* Submissions from non-FJ threads enter into submission queues. |
157 |
|
* Workers take these tasks and typically split them into subtasks |
158 |
< |
* that may be stolen by other workers. Preference rules give |
159 |
< |
* first priority to processing tasks from their own queues (LIFO |
160 |
< |
* or FIFO, depending on mode), then to randomized FIFO steals of |
161 |
< |
* tasks in other queues. This framework began as vehicle for |
162 |
< |
* supporting tree-structured parallelism using work-stealing. |
163 |
< |
* Over time, its scalability advantages led to extensions and |
164 |
< |
* changes to better support more diverse usage contexts. Because |
165 |
< |
* most internal methods and nested classes are interrelated, |
166 |
< |
* their main rationale and descriptions are presented here; |
167 |
< |
* individual methods and nested classes contain only brief |
168 |
< |
* comments about details. |
158 |
> |
* that may be stolen by other workers. Work-stealing based on |
159 |
> |
* randomized scans generally leads to better throughput than |
160 |
> |
* "work dealing" in which producers assign tasks to idle threads, |
161 |
> |
* in part because threads that have finished other tasks before |
162 |
> |
* the signalled thread wakes up (which can be a long time) can |
163 |
> |
* take the task instead. Preference rules give first priority to |
164 |
> |
* processing tasks from their own queues (LIFO or FIFO, depending |
165 |
> |
* on mode), then to randomized FIFO steals of tasks in other |
166 |
> |
* queues. This framework began as a vehicle for supporting |
167 |
> |
* tree-structured parallelism using work-stealing. Over time, |
168 |
> |
* its scalability advantages led to extensions and changes to |
169 |
> |
* better support more diverse usage contexts. Because most |
170 |
> |
* internal methods and nested classes are interrelated, their |
171 |
> |
* main rationale and descriptions are presented here; individual |
172 |
> |
* methods and nested classes contain only brief comments about |
173 |
> |
* details. |
174 |
|
* |
175 |
|
* WorkQueues |
176 |
|
* ========== |
203 |
|
* |
204 |
|
* (The actual code needs to null-check and size-check the array, |
205 |
|
* uses masking, not mod, for indexing a power-of-two-sized array, |
206 |
< |
* properly fences accesses, and possibly signals waiting workers |
207 |
< |
* to start scanning -- see below.) Both a successful pop and |
208 |
< |
* poll mainly entail a CAS of a slot from non-null to null. |
206 |
> |
* adds a release fence for publication, and possibly signals |
207 |
> |
* waiting workers to start scanning -- see below.) Both a |
208 |
> |
* successful pop and poll mainly entail a CAS of a slot from |
209 |
> |
* non-null to null. |
210 |
|
* |
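* As noted, indexing uses masking rather than mod. For a
* power-of-two capacity, (cap - 1) is an all-ones bit pattern, so
* i & (cap - 1) equals i % cap for non-negative i, and wraps
* negative indices non-negatively. A small illustration (class
* name hypothetical):

```java
// Why masking replaces % for power-of-two capacities:
// cap - 1 is an all-ones low-bit pattern, so AND keeps i's low bits.
final class MaskDemo {
    static int maskIndex(int i, int cap) { // cap must be a power of two
        return i & (cap - 1);
    }
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++)
            if (maskIndex(i, 8) != i % 8) throw new AssertionError();
        // Unlike %, masking never yields a negative index:
        if (maskIndex(-1, 8) != 7) throw new AssertionError();
    }
}
```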
211 |
|
* The pop operation (always performed by owner) is: |
212 |
|
* if ((the task at top slot is not null) and |
218 |
|
* (CAS slot to null)) |
219 |
|
* increment base and return task; |
220 |
|
* |
221 |
< |
* There are several variants of each of these. In particular, |
222 |
< |
* almost all uses of poll occur within scan operations that also |
223 |
< |
* interleave contention tracking (with associated code sprawl.) |
221 |
> |
* There are several variants of each of these. Most uses occur |
222 |
> |
* within operations that also interleave contention or emptiness |
223 |
> |
* tracking or inspection of elements before extracting them, so |
224 |
> |
* must interleave these with the above code. When performed by |
225 |
> |
* owner, getAndSet is used instead of CAS (see for example method |
226 |
> |
* nextLocalTask) which is usually more efficient, and possible |
227 |
> |
* because the top index cannot independently change during the |
228 |
> |
* operation. |
229 |
|
* |
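* The push/pop/poll protocol above can be illustrated with a
* deliberately simplified deque (hypothetical class; omits
* resizing, fence tuning, and signalling, and uses CAS rather
* than getAndSet throughout):

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Simplified sketch of the owner-push/owner-pop/thief-poll protocol.
// Hypothetical class: no resizing, signalling, or fence tuning.
final class MiniWorkQueue {
    final AtomicReferenceArray<Runnable> array = new AtomicReferenceArray<>(64);
    volatile int base;        // index of next slot for poll (any thread)
    int top;                  // index of next slot for push (owner only)

    void push(Runnable task) {                    // owner only
        array.set((array.length() - 1) & top, task);
        top += 1;
    }
    Runnable pop() {                              // owner only; LIFO
        int s = top - 1, i = (array.length() - 1) & s;
        Runnable t;
        if (base - top < 0 && (t = array.get(i)) != null &&
            array.compareAndSet(i, t, null)) {    // CAS slot to null
            top = s;
            return t;
        }
        return null;
    }
    Runnable poll() {                             // any thread; FIFO steal
        int b = base, i = (array.length() - 1) & b;
        Runnable t;
        if (b - top < 0 && (t = array.get(i)) != null &&
            array.compareAndSet(i, t, null)) {    // CAS slot to null
            base = b + 1;
            return t;
        }
        return null;
    }
    public static void main(String[] args) {
        MiniWorkQueue q = new MiniWorkQueue();
        Runnable a = () -> {}, b = () -> {};
        q.push(a); q.push(b);
        if (q.pop() != b || q.poll() != a || q.poll() != null)
            throw new AssertionError();
    }
}
```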
230 |
|
* Memory ordering. See "Correct and Efficient Work-Stealing for |
231 |
|
* Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 |
234 |
|
* algorithms similar to (but different from) the one used here. |
235 |
|
* Extracting tasks in array slots via (fully fenced) CAS provides |
236 |
|
* primary synchronization. The base and top indices imprecisely |
237 |
< |
* guide where to extract from. We do not always require strict |
238 |
< |
* orderings of array and index updates, so sometimes let them be |
239 |
< |
* subject to compiler and processor reorderings. However, the |
240 |
< |
* volatile "base" index also serves as a basis for memory |
241 |
< |
* ordering: Slot accesses are preceded by a read of base, |
242 |
< |
* ensuring happens-before ordering with respect to stealers (so |
243 |
< |
* the slots themselves can be read via plain array reads.) The |
244 |
< |
* only other memory orderings relied on are maintained in the |
245 |
< |
* course of signalling and activation (see below). A check that |
246 |
< |
* base == top indicates (momentary) emptiness, but otherwise may |
247 |
< |
* err on the side of possibly making the queue appear nonempty |
248 |
< |
* when a push, pop, or poll have not fully committed, or making |
249 |
< |
* it appear empty when an update of top has not yet been visibly |
250 |
< |
* written. (Method isEmpty() checks the case of a partially |
251 |
< |
* completed removal of the last element.) Because of this, the |
252 |
< |
* poll operation, considered individually, is not wait-free. One |
253 |
< |
* thief cannot successfully continue until another in-progress |
254 |
< |
* one (or, if previously empty, a push) visibly completes. |
255 |
< |
* However, in the aggregate, we ensure at least probabilistic |
237 |
> |
* guide where to extract from. We do not usually require strict |
238 |
> |
* orderings of array and index updates. Many index accesses use |
239 |
> |
* plain mode, with ordering constrained by surrounding context |
240 |
> |
* (usually with respect to element CASes or the two WorkQueue |
241 |
> |
* volatile fields source and phase). When not otherwise already |
242 |
> |
* constrained, reads of "base" by queue owners use acquire-mode, |
243 |
> |
* and some externally callable methods preface accesses with |
244 |
> |
* acquire fences. Additionally, to ensure that index update |
245 |
> |
* writes are not coalesced or postponed in loops etc, "opaque" |
246 |
> |
* mode is used in a few cases where timely writes are not |
247 |
> |
* otherwise ensured. The "locked" versions of push- and pop- |
248 |
> |
* based methods for shared queues differ from owned versions |
249 |
> |
* because locking already forces some of the ordering. |
250 |
> |
* |
251 |
> |
* Because indices and slot contents cannot always be consistent, |
252 |
> |
* a check that base == top indicates (momentary) emptiness, but |
253 |
> |
* otherwise may err on the side of possibly making the queue |
254 |
> |
* appear nonempty when a push, pop, or poll has not fully |
255 |
> |
* committed, or making it appear empty when an update of top has |
256 |
> |
* not yet been visibly written. (Method isEmpty() checks the |
257 |
> |
* case of a partially completed removal of the last element.) |
258 |
> |
* Because of this, the poll operation, considered individually, |
259 |
> |
* is not wait-free. One thief cannot successfully continue until |
260 |
> |
* another in-progress one (or, if previously empty, a push) |
261 |
> |
* visibly completes. This can stall threads when required to |
262 |
> |
* consume from a given queue (see method poll()). However, in |
263 |
> |
* the aggregate, we ensure at least probabilistic |
264 |
|
* non-blockingness. If an attempted steal fails, a scanning |
265 |
|
* thief chooses a different random victim target to try next. So, |
266 |
|
* in order for one thief to progress, it suffices for any |
267 |
< |
* in-progress poll or new push on any empty queue to |
249 |
< |
* complete. |
267 |
> |
* in-progress poll or new push on any empty queue to complete. |
268 |
|
* |
269 |
|
* This approach also enables support of a user mode in which |
270 |
|
* local task processing is in FIFO, not LIFO order, simply by |
285 |
|
* different position to use or create other queues -- they block |
286 |
|
* only when creating and registering new queues. Because it is |
287 |
|
* used only as a spinlock, unlocking requires only a "releasing" |
288 |
< |
* store (using setRelease). |
288 |
> |
* store (using setRelease) unless otherwise signalling. |
289 |
|
* |
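* The spinlock idiom described (acquire by CAS, unlock with only a
* releasing store) can be sketched as follows (hypothetical class;
* the real pool locks via a phase field and VarHandles):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the spinlock idiom: acquire by CASing 0 -> 1; because
// it is used only as a spinlock, unlock needs just a releasing store.
final class SpinLock {
    private final AtomicInteger phase = new AtomicInteger();

    void lock() {
        while (!phase.compareAndSet(0, 1))
            Thread.onSpinWait();      // busy-wait hint; contention is rare
    }
    void unlock() {
        phase.setRelease(0);          // release-mode store suffices
    }
    boolean isLocked() {
        return phase.get() != 0;
    }
    public static void main(String[] args) {
        SpinLock l = new SpinLock();
        l.lock();
        if (!l.isLocked()) throw new AssertionError();
        l.unlock();
        if (l.isLocked()) throw new AssertionError();
    }
}
```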
290 |
|
* Management |
291 |
|
* ========== |
306 |
|
* |
307 |
|
* Field "ctl" contains 64 bits holding information needed to |
308 |
|
* atomically decide to add, enqueue (on an event queue), and |
309 |
< |
* dequeue (and release)-activate workers. To enable this |
310 |
< |
* packing, we restrict maximum parallelism to (1<<15)-1 (which is |
311 |
< |
* far in excess of normal operating range) to allow ids, counts, |
312 |
< |
* and their negations (used for thresholding) to fit into 16bit |
309 |
> |
* dequeue and release workers. To enable this packing, we |
310 |
> |
* restrict maximum parallelism to (1<<15)-1 (which is far in |
311 |
> |
* excess of normal operating range) to allow ids, counts, and |
312 |
> |
* their negations (used for thresholding) to fit into 16bit |
313 |
|
* subfields. |
314 |
|
* |
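* The 16-bit subfield packing can be illustrated as below (layout
* and names hypothetical; the real ctl packs four subfields and
* differs in detail). Casting through short on extraction restores
* the sign, which is what lets negated counts serve as thresholds:

```java
// Illustration of packing signed 16-bit counts into one word.
// Hypothetical layout; the real ctl packs four 16-bit subfields.
final class CtlDemo {
    static long pack(int releaseCount, int totalCount) {
        return ((long)(releaseCount & 0xffff) << 16) | (totalCount & 0xffff);
    }
    static int releaseCount(long ctl) { return (short)(ctl >>> 16); }
    static int totalCount(long ctl)   { return (short) ctl; }

    public static void main(String[] args) {
        long c = pack(-3, 7);          // negative counts survive round-trip
        if (releaseCount(c) != -3 || totalCount(c) != 7)
            throw new AssertionError();
    }
}
```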
315 |
|
* Field "mode" holds configuration parameters as well as lifetime |
321 |
|
* lock (using field workerNamePrefix as lock), but is otherwise |
322 |
|
* concurrently readable, and accessed directly. We also ensure |
323 |
|
* that uses of the array reference itself never become too stale |
324 |
< |
* in case of resizing. To simplify index-based operations, the |
325 |
< |
* array size is always a power of two, and all readers must |
326 |
< |
* tolerate null slots. Worker queues are at odd indices. Shared |
327 |
< |
* (submission) queues are at even indices, up to a maximum of 64 |
328 |
< |
* slots, to limit growth even if array needs to expand to add |
329 |
< |
* more workers. Grouping them together in this way simplifies and |
330 |
< |
* speeds up task scanning. |
324 |
> |
* in case of resizing, by arranging that (re-)reads are separated |
325 |
> |
* by at least one acquiring read access. To simplify index-based |
326 |
> |
* operations, the array size is always a power of two, and all |
327 |
> |
* readers must tolerate null slots. Worker queues are at odd |
328 |
> |
* indices. Shared (submission) queues are at even indices, up to |
329 |
> |
* a maximum of 64 slots, to limit growth even if array needs to |
330 |
> |
* expand to add more workers. Grouping them together in this way |
331 |
> |
* simplifies and speeds up task scanning. |
332 |
|
* |
333 |
|
* All worker thread creation is on-demand, triggered by task |
334 |
|
* submissions, replacement of terminated workers, and/or |
406 |
|
* releases so usage requires care -- seeing a negative phase does |
407 |
|
* not guarantee that the worker is available. When queued, the |
408 |
|
* lower 16 bits of scanState must hold its pool index. So we |
409 |
< |
* place the index there upon initialization (see registerWorker) |
410 |
< |
* and otherwise keep it there or restore it when necessary. |
409 |
> |
* place the index there upon initialization and otherwise keep it |
410 |
> |
* there or restore it when necessary. |
411 |
|
* |
412 |
|
* The ctl field also serves as the basis for memory |
413 |
|
* synchronization surrounding activation. This uses a more |
415 |
|
* consumers sync with each other by both writing/CASing ctl (even |
416 |
|
* if to its current value). This would be extremely costly. So |
417 |
|
* we relax it in several ways: (1) Producers only signal when |
418 |
< |
* their queue is empty. Other workers propagate this signal (in |
419 |
< |
* method scan) when they find tasks; to further reduce flailing, |
420 |
< |
* each worker signals only one other per activation. (2) Workers |
421 |
< |
* only enqueue after scanning (see below) and not finding any |
422 |
< |
* tasks. (3) Rather than CASing ctl to its current value in the |
423 |
< |
* common case where no action is required, we reduce write |
424 |
< |
* contention by equivalently prefacing signalWork when called by |
425 |
< |
* an external task producer using a memory access with |
407 |
< |
* full-volatile semantics or a "fullFence". |
418 |
> |
* their queue is seen as empty at some point during a push |
419 |
> |
* operation. (2) Other workers propagate this signal when they |
420 |
> |
* find tasks. (3) Workers only enqueue after scanning (see below) |
421 |
> |
* and not finding any tasks. (4) Rather than CASing ctl to its |
422 |
> |
* current value in the common case where no action is required, |
423 |
> |
* we reduce write contention by equivalently prefacing signalWork |
424 |
> |
* when called by an external task producer using a memory access |
425 |
> |
* with full-volatile semantics or a "fullFence". |
426 |
|
* |
427 |
|
* Almost always, too many signals are issued. A task producer |
428 |
|
* cannot in general tell if some existing worker is in the midst |
434 |
|
* and bookkeeping bottlenecks during ramp-up, ramp-down, and small |
435 |
|
* computations involving only a few workers. |
436 |
|
* |
437 |
< |
* Scanning. Method runWorker performs top-level scanning for |
438 |
< |
* tasks. Each scan traverses and tries to poll from each queue |
439 |
< |
* starting at a random index and circularly stepping. Scans are |
440 |
< |
* not performed in ideal random permutation order, to reduce |
441 |
< |
* cacheline contention. The pseudorandom generator need not have |
437 |
> |
* Scanning. Method scan (from runWorker) performs top-level |
438 |
> |
* scanning for tasks. (Similar scans appear in helpQuiesce and |
439 |
> |
* pollScan.) Each scan traverses and tries to poll from each |
440 |
> |
* queue starting at a random index. Scans are not performed in |
441 |
> |
* ideal random permutation order, to reduce cacheline |
442 |
> |
* contention. The pseudorandom generator need not have |
443 |
|
* high-quality statistical properties in the long term, but just |
444 |
|
* within computations. We use Marsaglia XorShifts (often via |
445 |
|
* ThreadLocalRandom.nextSecondarySeed), which are cheap and |
446 |
< |
* suffice. Scanning also employs contention reduction: When |
446 |
> |
* suffice. Scanning also includes contention reduction: When |
447 |
|
* scanning workers fail to extract an apparently existing task, |
448 |
< |
* they soon restart at a different pseudorandom index. This |
449 |
< |
* improves throughput when many threads are trying to take tasks |
450 |
< |
* from few queues, which can be common in some usages. Scans do |
451 |
< |
* not otherwise explicitly take into account core affinities, |
452 |
< |
* loads, cache localities, etc, However, they do exploit temporal |
453 |
< |
* locality (which usually approximates these) by preferring to |
454 |
< |
* re-poll (at most #workers times) from the same queue after a |
455 |
< |
* successful poll before trying others. |
448 |
> |
* they soon restart at a different pseudorandom index. This form |
449 |
> |
* of backoff improves throughput when many threads are trying to |
450 |
> |
* take tasks from few queues, which can be common in some usages. |
451 |
> |
* Scans do not otherwise explicitly take into account core |
452 |
> |
* affinities, loads, cache localities, etc. However, they do |
453 |
> |
* exploit temporal locality (which usually approximates these) by |
454 |
> |
* preferring to re-poll from the same queue after a successful |
455 |
> |
* poll before trying others (see method topLevelExec). |
456 |
|
* |
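* The Marsaglia XorShift step referred to above looks like this
* (shown with the common 13/17/5 shift triple; the actual seeds
* come from ThreadLocalRandom):

```java
// Marsaglia xorshift: a cheap per-scan pseudorandom step used to
// pick victim queue indices. Each step is an invertible linear map,
// so a nonzero seed can never reach zero.
final class XorShiftDemo {
    static int step(int r) {          // r must be nonzero
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }
    public static void main(String[] args) {
        int r = 0x9e3779b9;           // any nonzero seed works
        for (int i = 0; i < 1000; i++)
            if ((r = step(r)) == 0) throw new AssertionError();
    }
}
```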
457 |
|
* Trimming workers. To release resources after periods of lack of |
458 |
|
* use, a worker starting to wait when the pool is quiescent will |
459 |
< |
* time out and terminate (see method scan) if the pool has |
459 |
> |
* time out and terminate (see method runWorker) if the pool has |
460 |
|
* remained quiescent for period given by field keepAlive. |
461 |
|
* |
462 |
|
* Shutdown and Termination. A call to shutdownNow invokes |
524 |
|
* time. Some previous versions of this class employed immediate |
525 |
|
* compensations for any blocked join. However, in practice, the |
526 |
|
* vast majority of blockages are transient byproducts of GC and |
527 |
< |
* other JVM or OS activities that are made worse by replacement. |
528 |
< |
* Rather than impose arbitrary policies, we allow users to |
529 |
< |
* override the default of only adding threads upon apparent |
530 |
< |
* starvation. The compensation mechanism may also be bounded. |
531 |
< |
* Bounds for the commonPool (see COMMON_MAX_SPARES) better enable |
532 |
< |
* JVMs to cope with programming errors and abuse before running |
533 |
< |
* out of resources to do so. |
527 |
> |
* other JVM or OS activities that are made worse by replacement |
528 |
> |
* when they cause longer-term oversubscription. Rather than |
529 |
> |
* impose arbitrary policies, we allow users to override the |
530 |
> |
* default of only adding threads upon apparent starvation. The |
531 |
> |
* compensation mechanism may also be bounded. Bounds for the |
532 |
> |
* commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope |
533 |
> |
* with programming errors and abuse before running out of |
534 |
> |
* resources to do so. |
535 |
|
* |
536 |
|
* Common Pool |
537 |
|
* =========== |
564 |
|
* in ForkJoinWorkerThread) may be JVM-dependent and must access |
565 |
|
* particular Thread class fields to achieve this effect. |
566 |
|
* |
567 |
+ |
* Memory placement |
568 |
+ |
* ================ |
569 |
+ |
* |
570 |
+ |
* Performance can be very sensitive to placement of instances of |
571 |
+ |
* ForkJoinPool and WorkQueues and their queue arrays. To reduce |
572 |
+ |
* false-sharing impact, the @Contended annotation isolates |
573 |
+ |
* adjacent WorkQueue instances, as well as the ForkJoinPool.ctl |
574 |
+ |
* field. WorkQueue arrays are allocated (by their threads) with |
575 |
+ |
* larger initial sizes than most ever need, mostly to reduce |
576 |
+ |
* false sharing current garbage collectors that use cardmark |
577 |
+ |
* tables. |
578 |
+ |
* |
579 |
|
* Style notes |
580 |
|
* =========== |
581 |
|
* |
583 |
|
* awkward and ugly, but also reflects the need to control |
584 |
|
* outcomes across the unusual cases that arise in very racy code |
585 |
|
* with very few invariants. All fields are read into locals |
586 |
< |
* before use, and null-checked if they are references. This is |
587 |
< |
* usually done in a "C"-like style of listing declarations at the |
588 |
< |
* heads of methods or blocks, and using inline assignments on |
589 |
< |
* first encounter. Nearly all explicit checks lead to |
590 |
< |
* bypass/return, not exception throws, because they may |
591 |
< |
* legitimately arise due to cancellation/revocation during |
592 |
< |
* shutdown. |
586 |
> |
* before use, and null-checked if they are references. Array |
587 |
> |
* accesses using masked indices include checks (that are always |
588 |
> |
* true) that the array length is non-zero to avoid compilers |
589 |
> |
* inserting more expensive traps. This is usually done in a |
590 |
> |
* "C"-like style of listing declarations at the heads of methods |
591 |
> |
* or blocks, and using inline assignments on first encounter. |
592 |
> |
* Nearly all explicit checks lead to bypass/return, not exception |
593 |
> |
* throws, because they may legitimately arise due to |
594 |
> |
* cancellation/revocation during shutdown. |
595 |
|
* |
596 |
|
* There is a lot of representation-level coupling among classes |
597 |
|
* ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The |
601 |
|
* representations will need to be accompanied by algorithmic |
602 |
|
* changes anyway. Several methods intrinsically sprawl because |
603 |
|
* they must accumulate sets of consistent reads of fields held in |
604 |
< |
* local variables. There are also other coding oddities |
605 |
< |
* (including several unnecessary-looking hoisted null checks) |
606 |
< |
* that help some methods perform reasonably even when interpreted |
607 |
< |
* (not compiled). |
604 |
> |
* local variables. Some others are artificially broken up to |
605 |
> |
* reduce producer/consumer imbalances due to dynamic compilation. |
606 |
> |
* There are also other coding oddities (including several |
607 |
> |
* unnecessary-looking hoisted null checks) that help some methods |
608 |
> |
* perform reasonably even when interpreted (not compiled). |
609 |
|
* |
610 |
|
* The order of declarations in this file is (with a few exceptions): |
611 |
|
* (1) Static utility functions |
709 |
|
static final int DORMANT = QUIET | UNSIGNALLED; |
710 |
|
|
711 |
|
/** |
712 |
< |
* The maximum number of local polls from the same queue before |
712 |
> |
* Initial capacity of work-stealing queue array. |
713 |
> |
* Must be a power of two, at least 2. |
714 |
> |
*/ |
715 |
> |
static final int INITIAL_QUEUE_CAPACITY = 1 << 13; |
716 |
> |
|
717 |
> |
/** |
718 |
> |
* Maximum capacity for queue arrays. Must be a power of two less |
719 |
> |
* than or equal to 1 << (31 - width of array entry) to ensure |
720 |
> |
* lack of wraparound of index calculations, but defined to a |
721 |
> |
* value a bit less than this to help users trap runaway programs |
722 |
> |
* before saturating systems. |
723 |
> |
*/ |
724 |
> |
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M |
725 |
> |
|
726 |
> |
/** |
727 |
> |
* The maximum number of top-level polls from the local queue before |
728 |
|
* checking others. This is a safeguard against infinitely unfair |
729 |
|
* looping under unbounded user task recursion, and must be larger |
730 |
|
* than plausible cases of intentional bounded task recursion. |
731 |
|
*/ |
732 |
< |
static final int POLL_LIMIT = 1 << 10; |
732 |
> |
static final int SELF_CONSUME_LIMIT = 1 << 10; |
733 |
|
|
734 |
|
/** |
735 |
|
* Queues supporting work-stealing as well as external task |
736 |
|
* submission. See above for descriptions and algorithms. |
687 |
– |
* Performance on most platforms is very sensitive to placement of |
688 |
– |
* instances of both WorkQueues and their arrays -- we absolutely |
689 |
– |
* do not want multiple WorkQueue instances or multiple queue |
690 |
– |
* arrays sharing cache lines. The @Contended annotation alerts |
691 |
– |
* JVMs to try to keep instances apart. |
737 |
|
*/ |
738 |
|
@jdk.internal.vm.annotation.Contended |
739 |
|
static final class WorkQueue { |
740 |
< |
|
741 |
< |
/** |
742 |
< |
* Capacity of work-stealing queue array upon initialization. |
743 |
< |
* Must be a power of two; at least 4, but should be larger to |
699 |
< |
* reduce or eliminate cacheline sharing among queues. |
700 |
< |
* Currently, it is much larger, as a partial workaround for |
701 |
< |
* the fact that JVMs often place arrays in locations that |
702 |
< |
* share GC bookkeeping (especially cardmarks) such that |
703 |
< |
* per-write accesses encounter serious memory contention. |
704 |
< |
*/ |
705 |
< |
static final int INITIAL_QUEUE_CAPACITY = 1 << 13; |
706 |
< |
|
707 |
< |
/** |
708 |
< |
* Maximum size for queue arrays. Must be a power of two less |
709 |
< |
* than or equal to 1 << (31 - width of array entry) to ensure |
710 |
< |
* lack of wraparound of index calculations, but defined to a |
711 |
< |
* value a bit less than this to help users trap runaway |
712 |
< |
* programs before saturating systems. |
713 |
< |
*/ |
714 |
< |
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M |
715 |
< |
|
716 |
< |
// Instance fields |
740 |
> |
volatile int source; // source queue id, or sentinel |
741 |
> |
int id; // pool index, mode, tag |
742 |
> |
int base; // index of next slot for poll |
743 |
> |
int top; // index of next slot for push |
744 |
|
volatile int phase; // versioned, negative: queued, 1: locked |
745 |
|
int stackPred; // pool stack (ctl) predecessor link |
746 |
|
int nsteals; // number of steals |
747 |
< |
int id; // index, mode, tag |
721 |
< |
volatile int source; // source queue id, or sentinel |
722 |
< |
volatile int base; // index of next slot for poll |
723 |
< |
int top; // index of next slot for push |
724 |
< |
ForkJoinTask<?>[] array; // the elements (initially unallocated) |
747 |
> |
ForkJoinTask<?>[] array; // the queued tasks; power of 2 size |
748 |
|
final ForkJoinPool pool; // the containing pool (may be null) |
749 |
|
final ForkJoinWorkerThread owner; // owning thread or null if shared |
750 |
|
|
756 |
|
} |
757 |
|
|
758 |
|
/** |
759 |
+ |
* Tries to lock shared queue by CASing phase field. |
760 |
+ |
*/ |
761 |
+ |
final boolean tryLockPhase() { |
762 |
+ |
return PHASE.compareAndSet(this, 0, 1); |
763 |
+ |
} |
764 |
+ |
|
765 |
+ |
final void releasePhaseLock() { |
766 |
+ |
PHASE.setRelease(this, 0); |
767 |
+ |
} |
768 |
+ |
|
769 |
+ |
/** |
770 |
|
* Returns an exportable index (used by ForkJoinWorkerThread). |
771 |
|
*/ |
772 |
|
final int getPoolIndex() { |
777 |
|
* Returns the approximate number of tasks in the queue. |
778 |
|
*/ |
779 |
|
final int queueSize() { |
780 |
< |
int n = base - top; // read base first |
780 |
> |
int n = (int)BASE.getAcquire(this) - top; |
781 |
|
return (n >= 0) ? 0 : -n; // ignore transient negative |
782 |
|
} |
783 |
|
|
787 |
|
* near-empty queue has at least one unclaimed task. |
788 |
|
*/ |
789 |
|
final boolean isEmpty() { |
790 |
< |
ForkJoinTask<?>[] a; int n, al, b; |
790 |
> |
ForkJoinTask<?>[] a; int n, cap, b; |
791 |
> |
VarHandle.acquireFence(); // needed by external callers |
792 |
|
return ((n = (b = base) - top) >= 0 || // possibly one task |
793 |
|
(n == -1 && ((a = array) == null || |
794 |
< |
(al = a.length) == 0 || |
795 |
< |
a[(al - 1) & b] == null))); |
794 |
> |
(cap = a.length) == 0 || |
795 |
> |
a[(cap - 1) & b] == null))); |
796 |
|
} |
797 |
|
|
763 |
– |
|
798 |
|
/** |
799 |
|
* Pushes a task. Call only by owner in unshared queues. |
800 |
|
* |
802 |
|
* @throws RejectedExecutionException if array cannot be resized |
803 |
|
*/ |
804 |
|
final void push(ForkJoinTask<?> task) { |
805 |
< |
int s = top; ForkJoinTask<?>[] a; int al, d; |
806 |
< |
if ((a = array) != null && (al = a.length) > 0) { |
807 |
< |
int index = (al - 1) & s; |
808 |
< |
ForkJoinPool p = pool; |
805 |
> |
ForkJoinTask<?>[] a; |
806 |
> |
int s = top, d, cap; |
807 |
> |
ForkJoinPool p = pool; |
808 |
> |
if ((a = array) != null && (cap = a.length) > 0) { |
809 |
> |
QA.setRelease(a, (cap - 1) & s, task); |
810 |
|
top = s + 1; |
811 |
< |
QA.setRelease(a, index, task); |
777 |
< |
if ((d = base - s) == 0 && p != null) { |
811 |
> |
if ((d = (int)BASE.getAcquire(this) - s) == 0 && p != null) { |
812 |
|
VarHandle.fullFence(); |
813 |
|
p.signalWork(); |
814 |
|
} |
815 |
< |
else if (d + al == 1) |
816 |
< |
growArray(); |
815 |
> |
else if (d + cap - 1 == 0) |
816 |
> |
growArray(false); |
817 |
|
} |
818 |
|
} |
819 |
|
|
820 |
|
/** |
821 |
< |
* Initializes or doubles the capacity of array. Call either |
822 |
< |
* by owner or with lock held -- it is OK for base, but not |
789 |
< |
* top, to move while resizings are in progress. |
821 |
> |
* Version of push for shared queues. Call only with phase lock held. |
822 |
> |
* @return true if should signal work |
823 |
|
*/ |
824 |
< |
final ForkJoinTask<?>[] growArray() { |
825 |
< |
ForkJoinTask<?>[] oldA = array; |
826 |
< |
int oldSize = oldA != null ? oldA.length : 0; |
827 |
< |
int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY; |
828 |
< |
if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY) |
829 |
< |
throw new RejectedExecutionException("Queue capacity exceeded"); |
830 |
< |
int oldMask, t, b; |
831 |
< |
ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; |
832 |
< |
if (oldA != null && (oldMask = oldSize - 1) > 0 && |
833 |
< |
(t = top) - (b = base) > 0) { |
834 |
< |
int mask = size - 1; |
835 |
< |
do { // emulate poll from old array, push to new array |
836 |
< |
int index = b & oldMask; |
837 |
< |
ForkJoinTask<?> x = (ForkJoinTask<?>) |
838 |
< |
QA.getAcquire(oldA, index); |
839 |
< |
if (x != null && |
807 |
< |
QA.compareAndSet(oldA, index, x, null)) |
808 |
< |
a[b & mask] = x; |
809 |
< |
} while (++b != t); |
810 |
< |
VarHandle.releaseFence(); |
811 |
< |
} |
812 |
< |
return a; |
824 |
> |
final boolean lockedPush(ForkJoinTask<?> task) { |
825 |
> |
ForkJoinTask<?>[] a; |
826 |
> |
boolean signal = false; |
827 |
> |
int s = top, b = base, cap; |
828 |
> |
if ((a = array) != null && (cap = a.length) > 0) { |
829 |
> |
a[(cap - 1) & s] = task; |
830 |
> |
top = s + 1; |
831 |
> |
if (b - s + cap - 1 == 0) |
832 |
> |
growArray(true); |
833 |
> |
else { |
834 |
> |
phase = 0; // full volatile unlock |
835 |
> |
if (base == s) |
836 |
> |
signal = true; |
837 |
> |
} |
838 |
> |
} |
839 |
> |
return signal; |
840 |
|
} |
841 |
|
|
842 |
|
/** |
843 |
< |
* Takes next task, if one exists, in LIFO order. Call only |
844 |
< |
* by owner in unshared queues. |
845 |
< |
*/ |
846 |
< |
final ForkJoinTask<?> pop() { |
847 |
< |
int b = base, s = top, al, i; ForkJoinTask<?>[] a; |
848 |
< |
if ((a = array) != null && b != s && (al = a.length) > 0) { |
849 |
< |
int index = (al - 1) & --s; |
850 |
< |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
851 |
< |
QA.get(a, index); |
852 |
< |
if (t != null && |
853 |
< |
QA.compareAndSet(a, index, t, null)) { |
854 |
< |
top = s; |
855 |
< |
VarHandle.releaseFence(); |
856 |
< |
return t; |
843 |
> |
* Doubles the capacity of array. Call either by owner or with |
844 |
> |
* lock held -- it is OK for base, but not top, to move while |
845 |
> |
* resizings are in progress. |
846 |
> |
*/ |
847 |
> |
final void growArray(boolean locked) { |
848 |
> |
ForkJoinTask<?>[] newA = null; |
849 |
> |
try { |
850 |
> |
ForkJoinTask<?>[] oldA; int oldSize, newSize; |
851 |
> |
if ((oldA = array) != null && (oldSize = oldA.length) > 0 && |
852 |
> |
(newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY && |
853 |
> |
newSize > 0) { |
854 |
> |
try { |
855 |
> |
newA = new ForkJoinTask<?>[newSize]; |
856 |
> |
} catch (OutOfMemoryError ex) { |
857 |
> |
} |
858 |
> |
if (newA != null) { // poll from old array, push to new |
859 |
> |
int oldMask = oldSize - 1, newMask = newSize - 1; |
860 |
> |
for (int s = top - 1, k = oldMask; k >= 0; --k) { |
861 |
> |
ForkJoinTask<?> x = (ForkJoinTask<?>) |
862 |
> |
QA.getAndSet(oldA, s & oldMask, null); |
863 |
> |
if (x != null) |
864 |
> |
newA[s-- & newMask] = x; |
865 |
> |
else |
866 |
> |
break; |
867 |
> |
} |
868 |
> |
array = newA; |
869 |
> |
VarHandle.releaseFence(); |
870 |
> |
} |
871 |
|
} |
872 |
+ |
} finally { |
873 |
+ |
if (locked) |
874 |
+ |
phase = 0; |
875 |
|
} |
876 |
< |
return null; |
876 |
> |
if (newA == null) |
877 |
> |
throw new RejectedExecutionException("Queue capacity exceeded"); |
878 |
|
} |

         /**
          * Takes next task, if one exists, in FIFO order.
          */
         final ForkJoinTask<?> poll() {
-            for (;;) {
-                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
-                if ((a = array) != null && (d = b - s) < 0 &&
-                    (al = a.length) > 0) {
-                    int index = (al - 1) & b;
-                    ForkJoinTask<?> t = (ForkJoinTask<?>)
-                        QA.getAcquire(a, index);
-                    if (b++ == base) {
-                        if (t != null) {
-                            if (QA.compareAndSet(a, index, t, null)) {
-                                base = b;
-                                return t;
-                            }
-                        }
-                        else if (d == -1)
-                            break; // now empty
+            int b, k, cap; ForkJoinTask<?>[] a;
+            while ((a = array) != null && (cap = a.length) > 0 &&
+                   (b = base) != top) {
+                ForkJoinTask<?> t = (ForkJoinTask<?>)
+                    QA.getAcquire(a, k = (cap - 1) & b);
+                if (base == b++) {
+                    if (t == null)
+                        Thread.yield(); // await index advance
+                    else if (QA.compareAndSet(a, k, t, null)) {
+                        BASE.setOpaque(this, b);
+                        return t;
                     }
                 }
-                else
-                    break;
             }
             return null;
         }
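The base-CAS claim in poll() above can be illustrated in isolation. The following is a minimal sketch under stated assumptions, not the JDK implementation: a bounded single-producer queue (the class name `SimpleDeque` and its fields are hypothetical) where any thread may claim the slot at `base` with a compare-and-set, mirroring the claim-then-advance order used by WorkQueue.poll().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Sketch only: single producer pushes at 'top'; any consumer CASes the
// slot at 'base' to null before publishing the advanced base index.
final class SimpleDeque<T> {
    private final AtomicReferenceArray<T> array = new AtomicReferenceArray<>(64);
    private final AtomicInteger base = new AtomicInteger();
    private int top; // owner-only, like WorkQueue.top

    void push(T task) {
        array.set((array.length() - 1) & top++, task);
    }

    T poll() {
        for (;;) {
            int b = base.get();
            if (b == top)
                return null;                       // empty
            int k = (array.length() - 1) & b;
            T t = array.get(k);
            if (t != null && array.compareAndSet(k, t, null)) {
                base.compareAndSet(b, b + 1);      // publish new base
                return t;                          // we own the task
            }
            // else: lost the race to another poller; retry
        }
    }
}
```

The key property, as in the real code, is that the task slot is claimed by CAS before the index is advanced, so a task is handed to exactly one thread.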
          * Takes next task, if one exists, in order specified by mode.
          */
         final ForkJoinTask<?> nextLocalTask() {
-            return ((id & FIFO) != 0) ? poll() : pop();
+            ForkJoinTask<?> t = null;
+            int md = id, b, s, d, cap; ForkJoinTask<?>[] a;
+            if ((a = array) != null && (cap = a.length) > 0 &&
+                (d = (s = top) - (b = base)) > 0) {
+                if ((md & FIFO) == 0 || d == 1) {
+                    if ((t = (ForkJoinTask<?>)
+                         QA.getAndSet(a, (cap - 1) & --s, null)) != null)
+                        TOP.setOpaque(this, s);
+                }
+                else if ((t = (ForkJoinTask<?>)
+                          QA.getAndSet(a, (cap - 1) & b++, null)) != null) {
+                    BASE.setOpaque(this, b);
+                }
+                else // on contention in FIFO mode, use regular poll
+                    t = poll();
+            }
+            return t;
         }

         /**
          * Returns next task, if one exists, in order specified by mode.
          */
         final ForkJoinTask<?> peek() {
-            int al; ForkJoinTask<?>[] a;
-            return ((a = array) != null && (al = a.length) > 0) ?
-                a[(al - 1) &
-                  ((id & FIFO) != 0 ? base : top - 1)] : null;
+            int cap; ForkJoinTask<?>[] a;
+            return ((a = array) != null && (cap = a.length) > 0) ?
+                a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
         }

         /**
          * Pops the given task only if it is at the current top.
          */
         final boolean tryUnpush(ForkJoinTask<?> task) {
-            int b = base, s = top, al; ForkJoinTask<?>[] a;
-            if ((a = array) != null && b != s && (al = a.length) > 0) {
-                int index = (al - 1) & --s;
-                if (QA.compareAndSet(a, index, task, null)) {
+            boolean popped = false;
+            int s, cap; ForkJoinTask<?>[] a;
+            if ((a = array) != null && (cap = a.length) > 0 &&
+                (s = top) != base &&
+                (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
+                TOP.setOpaque(this, s);
+            return popped;
+        }
+
+        /**
+         * Shared version of tryUnpush.
+         */
+        final boolean tryLockedUnpush(ForkJoinTask<?> task) {
+            boolean popped = false;
+            int s = top - 1, k, cap; ForkJoinTask<?>[] a;
+            if ((a = array) != null && (cap = a.length) > 0 &&
+                a[k = (cap - 1) & s] == task && tryLockPhase()) {
+                if (top == s + 1 && array == a &&
+                    (popped = QA.compareAndSet(a, k, task, null)))
                     top = s;
-                    VarHandle.releaseFence();
-                    return true;
-                }
+                releasePhaseLock();
             }
-            return false;
+            return popped;
         }

         /**

         // Specialized execution methods

         /**
-         * Pops and executes up to limit consecutive tasks or until empty.
-         *
-         * @param limit max runs, or zero for no limit
-         */
-        final void localPopAndExec(int limit) {
-            for (;;) {
-                int b = base, s = top, al; ForkJoinTask<?>[] a;
-                if ((a = array) != null && b != s && (al = a.length) > 0) {
-                    int index = (al - 1) & --s;
-                    ForkJoinTask<?> t = (ForkJoinTask<?>)
-                        QA.getAndSet(a, index, null);
-                    if (t != null) {
-                        top = s;
-                        VarHandle.releaseFence();
-                        t.doExec();
-                        if (limit != 0 && --limit == 0)
-                            break;
-                    }
-                    else
-                        break;
-                }
-                else
-                    break;
-            }
-        }
-
-        /**
-         * Polls and executes up to limit consecutive tasks or until empty.
-         *
-         * @param limit, or zero for no limit
-         */
-        final void localPollAndExec(int limit) {
-            for (int polls = 0;;) {
-                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
-                if ((a = array) != null && (d = b - s) < 0 &&
-                    (al = a.length) > 0) {
-                    int index = (al - 1) & b++;
-                    ForkJoinTask<?> t = (ForkJoinTask<?>)
-                        QA.getAndSet(a, index, null);
-                    if (t != null) {
-                        base = b;
-                        t.doExec();
-                        if (limit != 0 && ++polls == limit)
-                            break;
-                    }
-                    else if (d == -1)
-                        break; // now empty
-                    else
-                        polls = 0; // stolen; reset
-                }
+         * Runs the given (stolen) task, as well as remaining local
+         * tasks (up to limit) and others available from the given
+         * queue.
+         */
+        final void topLevelExec(ForkJoinTask<?> t, WorkQueue q) {
+            int nstolen = 1;
+            for (int nlocal = 0;;) {
+                if (t != null)
+                    t.doExec();
+                if (nlocal >= SELF_CONSUME_LIMIT)
+                    break; // avoid infinite self-consume loops
+                else if ((t = nextLocalTask()) != null)
+                    ++nlocal;
+                else if (q != null && (t = q.poll()) != null)
+                    ++nstolen;
                 else
                     break;
             }
+            ForkJoinWorkerThread thread = owner;
+            nsteals += nstolen;
+            source = 0;
+            if (thread != null)
+                thread.afterTopLevelExec();
         }

         /**
          * If present, removes task from queue and executes it.
          */
         final void tryRemoveAndExec(ForkJoinTask<?> task) {
-            ForkJoinTask<?>[] wa; int s, wal;
-            if (base - (s = top) < 0 && // traverse from top
-                (wa = array) != null && (wal = wa.length) > 0) {
-                for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
+            ForkJoinTask<?>[] a; int s, cap;
+            if ((a = array) != null && (cap = a.length) > 0 &&
+                base - (s = top) < 0) { // traverse from top
+                for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
                     int index = i & m;
-                    ForkJoinTask<?> t = (ForkJoinTask<?>)
-                        QA.get(wa, index);
+                    ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index);
                     if (t == null)
                         break;
                     else if (t == task) {
-                        if (QA.compareAndSet(wa, index, t, null)) {
+                        if (QA.compareAndSet(a, index, t, null)) {
                             top = ns;   // safely shift down
                             for (int j = i; j != ns; ++j) {
                                 ForkJoinTask<?> f;
                                 int pindex = (j + 1) & m;
-                                f = (ForkJoinTask<?>)QA.get(wa, pindex);
-                                QA.setVolatile(wa, pindex, null);
+                                f = (ForkJoinTask<?>)QA.get(a, pindex);
+                                QA.setVolatile(a, pindex, null);
                                 int jindex = j & m;
-                                QA.setRelease(wa, jindex, f);
+                                QA.setRelease(a, jindex, f);
                             }
                             VarHandle.releaseFence();
                             t.doExec();
         }

         /**
-         * Tries to steal and run tasks within the target's
-         * computation until done, not found, or limit exceeded.
+         * Tries to pop and run tasks within the target's computation
+         * until done, not found, or limit exceeded.
          *
          * @param task root of CountedCompleter computation
          * @param limit max runs, or zero for no limit
+         * @param shared true if must lock to extract task
          * @return task status on exit
          */
-        final int localHelpCC(CountedCompleter<?> task, int limit) {
+        final int helpCC(CountedCompleter<?> task, int limit, boolean shared) {
             int status = 0;
             if (task != null && (status = task.status) >= 0) {
-                for (;;) {
-                    boolean help = false;
-                    int b = base, s = top, al; ForkJoinTask<?>[] a;
-                    if ((a = array) != null && b != s && (al = a.length) > 0) {
-                        int index = (al - 1) & (s - 1);
-                        ForkJoinTask<?> o = (ForkJoinTask<?>)
-                            QA.get(a, index);
-                        if (o instanceof CountedCompleter) {
-                            CountedCompleter<?> t = (CountedCompleter<?>)o;
-                            for (CountedCompleter<?> f = t;;) {
-                                if (f != task) {
-                                    if ((f = f.completer) == null) // try parent
-                                        break;
-                                }
-                                else {
-                                    if (QA.compareAndSet(a, index, t, null)) {
+                int s, k, cap; ForkJoinTask<?>[] a;
+                while ((a = array) != null && (cap = a.length) > 0 &&
+                       (s = top) != base) {
+                    CountedCompleter<?> v = null;
+                    ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)];
+                    if (o instanceof CountedCompleter) {
+                        CountedCompleter<?> t = (CountedCompleter<?>)o;
+                        for (CountedCompleter<?> f = t;;) {
+                            if (f != task) {
+                                if ((f = f.completer) == null)
+                                    break;
+                            }
+                            else if (shared) {
+                                if (tryLockPhase()) {
+                                    if (top == s && array == a &&
+                                        QA.compareAndSet(a, k, t, null)) {
                                         top = s - 1;
-                                        VarHandle.releaseFence();
-                                        t.doExec();
-                                        help = true;
+                                        v = t;
                                     }
-                                    break;
+                                    releasePhaseLock();
                                 }
+                                break;
+                            }
+                            else {
+                                if (QA.compareAndSet(a, k, t, null)) {
+                                    top = s - 1;
+                                    v = t;
+                                }
+                                break;
                             }
                         }
                     }
-                    if ((status = task.status) < 0 || !help ||
+                    if (v != null)
+                        v.doExec();
+                    if ((status = task.status) < 0 || v == null ||
                         (limit != 0 && --limit == 0))
                         break;
                 }
             return status;
         }

-        // Operations on shared queues
-
-        /**
-         * Tries to lock shared queue by CASing phase field.
-         */
-        final boolean tryLockSharedQueue() {
-            return PHASE.compareAndSet(this, 0, QLOCK);
-        }
-
-        /**
-         * Shared version of tryUnpush.
-         */
-        final boolean trySharedUnpush(ForkJoinTask<?> task) {
-            boolean popped = false;
-            int s = top - 1, al; ForkJoinTask<?>[] a;
-            if ((a = array) != null && (al = a.length) > 0) {
-                int index = (al - 1) & s;
-                ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
-                if (t == task &&
-                    PHASE.compareAndSet(this, 0, QLOCK)) {
-                    if (top == s + 1 && array == a &&
-                        QA.compareAndSet(a, index, task, null)) {
-                        popped = true;
-                        top = s;
-                    }
-                    PHASE.setRelease(this, 0);
-                }
-            }
-            return popped;
-        }
-
         /**
-         * Shared version of localHelpCC.
+         * Tries to poll and run AsynchronousCompletionTasks until
+         * none found or blocker is released
+         *
+         * @param blocker the blocker
          */
-        final int sharedHelpCC(CountedCompleter<?> task, int limit) {
-            int status = 0;
-            if (task != null && (status = task.status) >= 0) {
-                for (;;) {
-                    boolean help = false;
-                    int b = base, s = top, al; ForkJoinTask<?>[] a;
-                    if ((a = array) != null && b != s && (al = a.length) > 0) {
-                        int index = (al - 1) & (s - 1);
-                        ForkJoinTask<?> o = (ForkJoinTask<?>)
-                            QA.get(a, index);
-                        if (o instanceof CountedCompleter) {
-                            CountedCompleter<?> t = (CountedCompleter<?>)o;
-                            for (CountedCompleter<?> f = t;;) {
-                                if (f != task) {
-                                    if ((f = f.completer) == null)
-                                        break;
-                                }
-                                else {
-                                    if (PHASE.compareAndSet(this, 0, QLOCK)) {
-                                        if (top == s && array == a &&
-                                            QA.compareAndSet(a, index, t, null)) {
-                                            help = true;
-                                            top = s - 1;
-                                        }
-                                        PHASE.setRelease(this, 0);
-                                        if (help)
-                                            t.doExec();
-                                    }
-                                    break;
-                                }
-                            }
+        final void helpAsyncBlocker(ManagedBlocker blocker) {
+            if (blocker != null) {
+                int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
+                while ((a = array) != null && (cap = a.length) > 0 &&
+                       (b = base) != top) {
+                    t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
+                    if (blocker.isReleasable())
+                        break;
+                    else if (base == b++ && t != null) {
+                        if (!(t instanceof CompletableFuture.
+                              AsynchronousCompletionTask))
+                            break;
+                        else if (QA.compareAndSet(a, k, t, null)) {
+                            BASE.setOpaque(this, b);
+                            t.doExec();
                         }
                     }
-                    if ((status = task.status) < 0 || !help ||
-                        (limit != 0 && --limit == 0))
-                        break;
                 }
             }
-            return status;
         }

         /**
         }

         // VarHandle mechanics.
-        private static final VarHandle PHASE;
+        static final VarHandle PHASE;
+        static final VarHandle BASE;
+        static final VarHandle TOP;
         static {
             try {
                 MethodHandles.Lookup l = MethodHandles.lookup();
                 PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
+                BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
+                TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
             } catch (ReflectiveOperationException e) {
                 throw new Error(e);
             }
         wt.setDaemon(true);                             // configure thread
         if ((handler = ueh) != null)
             wt.setUncaughtExceptionHandler(handler);
-        WorkQueue w = new WorkQueue(this, wt);
         int tid = 0;                                    // for thread name
-        int fifo = mode & FIFO;
+        int idbits = mode & FIFO;
         String prefix = workerNamePrefix;
+        WorkQueue w = new WorkQueue(this, wt);
         if (prefix != null) {
             synchronized (prefix) {
                 WorkQueue[] ws = workQueues; int n;
                 int s = indexSeed += SEED_INCREMENT;
+                idbits |= (s & ~(SMASK | FIFO | DORMANT));
                 if (ws != null && (n = ws.length) > 1) {
                     int m = n - 1;
-                    tid = s & m;
-                    int i = m & ((s << 1) | 1);         // odd-numbered indices
+                    tid = m & ((s << 1) | 1);           // odd-numbered indices
                     for (int probes = n >>> 1;;) {      // find empty slot
                         WorkQueue q;
-                        if ((q = ws[i]) == null || q.phase == QUIET)
+                        if ((q = ws[tid]) == null || q.phase == QUIET)
                             break;
                         else if (--probes == 0) {
-                            i = n | 1;                  // resize below
+                            tid = n | 1;                // resize below
                             break;
                         }
                         else
-                            i = (i + 2) & m;
+                            tid = (tid + 2) & m;
                     }
+                    w.phase = w.id = tid | idbits;      // now publishable

-                    int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
-                    w.phase = w.id = id;                // now publishable
-
-                    if (i < n)
-                        ws[i] = w;
+                    if (tid < n)
+                        ws[tid] = w;
                     else {                              // expand array
                         int an = n << 1;
                         WorkQueue[] as = new WorkQueue[an];
-                        as[i] = w;
+                        as[tid] = w;
                         int am = an - 1;
                         for (int j = 0; j < n; ++j) {
                             WorkQueue v;                // copy external queue
         int phase = 0;
         if (wt != null && (w = wt.workQueue) != null) {
             Object lock = workerNamePrefix;
+            int wid = w.id;
             long ns = (long)w.nsteals & 0xffffffffL;
-            int idx = w.id & SMASK;
             if (lock != null) {
-                WorkQueue[] ws;                       // remove index from array
                 synchronized (lock) {
-                    if ((ws = workQueues) != null && ws.length > idx &&
-                        ws[idx] == w)
-                        ws[idx] = null;
+                    WorkQueue[] ws; int n, i;         // remove index from array
+                    if ((ws = workQueues) != null && (n = ws.length) > 0 &&
+                        ws[i = wid & (n - 1)] == w)
+                        ws[i] = null;
                     stealCount += ns;
                 }
             }
                 Thread vt = v.owner;
                 if (sp == vp && CTL.compareAndSet(this, c, nc)) {
                     v.phase = np;
-                    if (v.source < 0)
+                    if (vt != null && v.source < 0)
                         LockSupport.unpark(vt);
                     break;
                 }
                 long nc = ((long)v.stackPred & SP_MASK) | uc;
                 if (vp == sp && CTL.compareAndSet(this, c, nc)) {
                     v.phase = np;
-                    if (v.source < 0)
+                    if (vt != null && v.source < 0)
                         LockSupport.unpark(vt);
                     return (wp < 0) ? -1 : 1;
                 }
      * See above for explanation.
      */
     final void runWorker(WorkQueue w) {
-        WorkQueue[] ws;
-        w.growArray();                                  // allocate queue
-        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
-        if (r == 0)                                     // initial nonzero seed
-            r = 1;
-        int lastSignalId = 0;                           // avoid unneeded signals
-        while ((ws = workQueues) != null) {
-            boolean nonempty = false;                   // scan
-            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
-                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
-                if ((i = r & m) >= 0 && i < n &&        // always true
-                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
-                    (a = q.array) != null && (al = a.length) > 0) {
-                    int qid = q.id;                     // (never zero)
-                    int index = (al - 1) & b;
-                    ForkJoinTask<?> t = (ForkJoinTask<?>)
-                        QA.getAcquire(a, index);
-                    if (t != null && b++ == q.base &&
-                        QA.compareAndSet(a, index, t, null)) {
-                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
-                            signalWork();               // propagate signal
-                        w.source = lastSignalId = qid;
-                        t.doExec();
-                        if ((w.id & FIFO) != 0)         // run remaining locals
-                            w.localPollAndExec(POLL_LIMIT);
-                        else
-                            w.localPopAndExec(POLL_LIMIT);
-                        ForkJoinWorkerThread thread = w.owner;
-                        ++w.nsteals;
-                        w.source = 0;                   // now idle
-                        if (thread != null)
-                            thread.afterTopLevelExec();
+        int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
+        w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
+        for (;;) {
+            int phase;
+            if (scan(w, r)) {                     // scan until apparently empty
+                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
+            }
+            else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
+                long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
+                long c, nc;
+                do {
+                    w.stackPred = (int)(c = ctl);
+                    nc = ((c - RC_UNIT) & UC_MASK) | np;
+                } while (!CTL.weakCompareAndSet(this, c, nc));
+            }
+            else {                                // already queued
+                int pred = w.stackPred;
+                Thread.interrupted();             // clear before park
+                w.source = DORMANT;               // enable signal
+                long c = ctl;
+                int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
+                if (md < 0)                       // terminating
+                    break;
+                else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
+                         tryTerminate(false, false))
+                    break;                        // quiescent shutdown
+                else if (rc <= 0 && pred != 0 && phase == (int)c) {
+                    long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
+                    long d = keepAlive + System.currentTimeMillis();
+                    LockSupport.parkUntil(this, d);
+                    if (ctl == c &&               // drop on timeout if all idle
+                        d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
+                        CTL.compareAndSet(this, c, nc)) {
+                        w.phase = QUIET;
+                        break;
                     }
-                    nonempty = true;
                 }
-                else if (nonempty)
-                    break;
-                else
-                    ++r;
+                else if (w.phase < 0)
+                    LockSupport.park(this);       // OK if spuriously woken
+                w.source = 0;                     // disable signal
             }
+        }
+    }

-            if (nonempty) {                             // move (xorshift)
-                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
-            }
-            else {
-                int phase;
-                lastSignalId = 0;                       // clear for next scan
-                if ((phase = w.phase) >= 0) {           // enqueue
-                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
-                    long c, nc;
-                    do {
-                        w.stackPred = (int)(c = ctl);
-                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
-                    } while (!CTL.weakCompareAndSet(this, c, nc));
-                }
-                else {                                  // already queued
-                    int pred = w.stackPred;
-                    w.source = DORMANT;                 // enable signal
-                    for (int steps = 0;;) {
-                        int md, rc; long c;
-                        if (w.phase >= 0) {
-                            w.source = 0;
-                            break;
-                        }
-                        else if ((md = mode) < 0)       // shutting down
-                            return;
-                        else if ((rc = ((md & SMASK) +  // possibly quiescent
-                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
-                                 (md & SHUTDOWN) != 0 &&
-                                 tryTerminate(false, false))
-                            return;                     // help terminate
-                        else if ((++steps & 1) == 0)
-                            Thread.interrupted();       // clear between parks
-                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
-                            long d = keepAlive + System.currentTimeMillis();
-                            LockSupport.parkUntil(this, d);
-                            if (ctl == c &&
-                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
-                                long nc = ((UC_MASK & (c - TC_UNIT)) |
-                                           (SP_MASK & pred));
-                                if (CTL.compareAndSet(this, c, nc)) {
-                                    w.phase = QUIET;
-                                    return;             // drop on timeout
-                                }
-                            }
-                        }
-                        else
-                            LockSupport.park(this);
-                    }
-                }
-            }
-        }
-    }
+    /**
+     * Scans for and executes top-level task
+     * @return true if found an apparently non-empty queue (and
+     * possibly ran task)
+     */
+    private boolean scan(WorkQueue w, int r) {
+        WorkQueue[] ws; int n;
+        if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
+            for (int m = n - 1, j = r & m;;) {
+                WorkQueue q; int b, d;
+                if ((q = ws[j]) != null && (d = (b = q.base) - q.top) < 0) {
+                    int qid = q.id;
+                    ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
+                    if ((a = q.array) != null && (cap = a.length) > 0) {
+                        t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
+                        if (q.base == b++ && t != null &&
+                            QA.compareAndSet(a, k, t, null)) {
+                            q.base = b;
+                            w.source = qid;
+                            if (d != -1)
+                                signalWork();
+                            w.topLevelExec(t, q);
+                        }
+                    }
+                    return true;
+                }
+                else if (--n > 0)
+                    j = (j + 1) & m;
+                else
+                    break;
+            }
+        }
+        return false;
+    }

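The `r ^= r << 13; r ^= r >>> 17; r ^= r << 5;` move step in runWorker is a Marsaglia xorshift generator: given a nonzero seed it never produces zero, so a worker's scan origin keeps moving. A standalone sketch (the class name `XorShift` is illustrative):

```java
// Marsaglia xorshift, as used to advance the scan origin in runWorker.
// The zero state maps to itself, so the seed is forced nonzero.
final class XorShift {
    private int r;
    XorShift(int seed) { r = (seed == 0) ? 1 : seed; }
    int next() {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }
}
```

The sequence is deterministic per seed and cheap (three shifts and xors), which is why it is preferred here over calling a ThreadLocalRandom on every rescan.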
     /**
      */
     final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
         int s = 0;
+        int seed = ThreadLocalRandom.nextSecondarySeed();
         if (w != null && task != null &&
             (!(task instanceof CountedCompleter) ||
-             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
+             (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {
             w.tryRemoveAndExec(task);
             int src = w.source, id = w.id;
+            int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
             s = task.status;
             while (s >= 0) {
                 WorkQueue[] ws;
-                boolean nonempty = false;
-                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
-                if ((ws = workQueues) != null) {      // scan for matching id
-                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
-                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
-                        if ((i = (r + j) & m) >= 0 && i < n &&
-                            (q = ws[i]) != null && q.source == id &&
-                            (b = q.base) - q.top < 0 &&
-                            (a = q.array) != null && (al = a.length) > 0) {
-                            int qid = q.id;
-                            int index = (al - 1) & b;
+                int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;
+                while (n > 0) {
+                    WorkQueue q; int b;
+                    if ((q = ws[r & m]) != null && q.source == id &&
+                        (b = q.base) - q.top < 0) {
+                        ForkJoinTask<?>[] a; int cap, k;
+                        int qid = q.id;
+                        if ((a = q.array) != null && (cap = a.length) > 0) {
                             ForkJoinTask<?> t = (ForkJoinTask<?>)
-                                QA.getAcquire(a, index);
-                            if (t != null && b++ == q.base && id == q.source &&
-                                QA.compareAndSet(a, index, t, null)) {
+                                QA.getAcquire(a, k = (cap - 1) & b);
+                            if (q.source == id && q.base == b++ &&
+                                t != null && QA.compareAndSet(a, k, t, null)) {
                                 q.base = b;
                                 w.source = qid;
                                 t.doExec();
                                 w.source = src;
                             }
-                            nonempty = true;
-                            break;
                         }
+                        break;
+                    }
+                    else {
+                        r += step;
+                        --n;
                     }
                 }
                 if ((s = task.status) < 0)
                     break;
-                else if (!nonempty) {
+                else if (n == 0) { // empty scan
                     long ms, ns; int block;
                     if (deadline == 0L)
                         ms = 0L;   // untimed
      * find tasks either.
      */
     final void helpQuiescePool(WorkQueue w) {
-        int prevSrc = w.source, fifo = w.id & FIFO;
+        int prevSrc = w.source;
+        int seed = ThreadLocalRandom.nextSecondarySeed();
+        int r = seed >>> 16, step = r | 1;
         for (int source = prevSrc, released = -1;;) { // -1 until known
-            WorkQueue[] ws;
-            if (fifo != 0)
-                w.localPollAndExec(0);
-            else
-                w.localPopAndExec(0);
-            if (released == -1 && w.phase >= 0)
+            ForkJoinTask<?> localTask; WorkQueue[] ws;
+            while ((localTask = w.nextLocalTask()) != null)
+                localTask.doExec();
+            if (w.phase >= 0 && released == -1)
                 released = 1;
             boolean quiet = true, empty = true;
-            int r = ThreadLocalRandom.nextSecondarySeed();
-            if ((ws = workQueues) != null) {
-                for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
-                    WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
-                    if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
-                        if ((b = q.base) - q.top < 0 &&
-                            (a = q.array) != null && (al = a.length) > 0) {
-                            int qid = q.id;
+            int n = (ws = workQueues) == null ? 0 : ws.length;
+            for (int m = n - 1; n > 0; r += step, --n) {
+                WorkQueue q; int b;
+                if ((q = ws[r & m]) != null) {
+                    int qs = q.source;
+                    if ((b = q.base) - q.top < 0) {
+                        quiet = empty = false;
+                        ForkJoinTask<?>[] a; int cap, k;
+                        int qid = q.id;
+                        if ((a = q.array) != null && (cap = a.length) > 0) {
                             if (released == 0) {    // increment
                                 released = 1;
                                 CTL.getAndAdd(this, RC_UNIT);
                             }
-                            int index = (al - 1) & b;
                             ForkJoinTask<?> t = (ForkJoinTask<?>)
-                                QA.getAcquire(a, index);
-                            if (t != null && b++ == q.base &&
-                                QA.compareAndSet(a, index, t, null)) {
+                                QA.getAcquire(a, k = (cap - 1) & b);
+                            if (q.base == b++ && t != null &&
+                                QA.compareAndSet(a, k, t, null)) {
                                 q.base = b;
-                                w.source = source = q.id;
+                                w.source = qid;
                                 t.doExec();
                                 w.source = source = prevSrc;
                             }
-                            quiet = empty = false;
-                            break;
                         }
-                        else if ((q.source & QUIET) == 0)
-                            quiet = false;
+                        break;
                     }
+                    else if ((qs & QUIET) == 0)
+                        quiet = false;
                 }
             }
             if (quiet) {
                 origin = r & m;
                 step = h | 1;
             }
-            for (int k = origin, oldSum = 0, checkSum = 0;;) {
-                WorkQueue q; int b, al; ForkJoinTask<?>[] a;
-                if ((q = ws[k]) != null) {
-                    checkSum += b = q.base;
-                    if (b - q.top < 0 &&
-                        (a = q.array) != null && (al = a.length) > 0) {
-                        int index = (al - 1) & b;
-                        ForkJoinTask<?> t = (ForkJoinTask<?>)
-                            QA.getAcquire(a, index);
-                        if (t != null && b++ == q.base &&
-                            QA.compareAndSet(a, index, t, null)) {
-                            q.base = b;
+            boolean nonempty = false;
+            for (int i = origin, oldSum = 0, checkSum = 0;;) {
+                WorkQueue q;
+                if ((q = ws[i]) != null) {
+                    int b; ForkJoinTask<?> t;
+                    if ((b = q.base) - q.top < 0) {
+                        nonempty = true;
+                        if ((t = q.poll()) != null)
                             return t;
-                        }
-                        else
-                            break; // restart
                     }
+                    else
+                        checkSum += b + q.id;
                 }
-                if ((k = (k + step) & m) == origin) {
-                    if (oldSum == (oldSum = checkSum))
+                if ((i = (i + step) & m) == origin) {
+                    if (!nonempty && oldSum == (oldSum = checkSum))
                         break rescan;
                     checkSum = 0;
+                    nonempty = false;
                 }
             }
         }
      */
     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
         ForkJoinTask<?> t;
-        if (w != null &&
-            (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
-            return t;
-        else
-            return pollScan(false);
+        if (w == null || (t = w.nextLocalTask()) == null)
+            t = pollScan(false);
+        return t;
     }

     // External operations
             r = ThreadLocalRandom.getProbe();
         }
         for (;;) {
+            WorkQueue q;
             int md = mode, n;
             WorkQueue[] ws = workQueues;
             if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
                 throw new RejectedExecutionException();
-            else {
-                WorkQueue q;
-                boolean push = false, grow = false;
-                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
-                    Object lock = workerNamePrefix;
-                    int qid = (r | QUIET) & ~(FIFO | OWNED);
-                    q = new WorkQueue(this, null);
-                    q.id = qid;
-                    q.source = QUIET;
-                    q.phase = QLOCK;              // lock queue
-                    if (lock != null) {
-                        synchronized (lock) {     // lock pool to install
-                            int i;
-                            if ((ws = workQueues) != null &&
-                                (n = ws.length) > 0 &&
-                                ws[i = qid & (n - 1) & SQMASK] == null) {
-                                ws[i] = q;
-                                push = grow = true;
-                            }
-                        }
+            else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue
+                int qid = (r | QUIET) & ~(FIFO | OWNED);
+                Object lock = workerNamePrefix;
+                ForkJoinTask<?>[] qa =
+                    new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
+                q = new WorkQueue(this, null);
+                q.array = qa;
+                q.id = qid;
+                q.source = QUIET;
+                if (lock != null) {     // unless disabled, lock pool to install
+                    synchronized (lock) {
+                        WorkQueue[] vs; int i, vn;
+                        if ((vs = workQueues) != null && (vn = vs.length) > 0 &&
+                            vs[i = qid & (vn - 1) & SQMASK] == null)
+                            vs[i] = q;  // else another thread already installed
                     }
                 }
-                else if (q.tryLockSharedQueue()) {
-                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
-                    if ((a = q.array) != null && (al = a.length) > 0 &&
-                        al - 1 + (d = b - s) > 0) {
-                        a[(al - 1) & s] = task;
-                        q.top = s + 1;            // relaxed writes OK here
-                        q.phase = 0;
-                        if (d < 0 && q.base - s < -1)
-                            break;                // no signal needed
-                    }
-                    else
-                        grow = true;
-                    push = true;
-                }
-                if (push) {
-                    if (grow) {
-                        try {
-                            q.growArray();
-                            int s = q.top, al; ForkJoinTask<?>[] a;
-                            if ((a = q.array) != null && (al = a.length) > 0) {
-                                a[(al - 1) & s] = task;
-                                q.top = s + 1;
-                            }
-                        } finally {
-                            q.phase = 0;
-                        }
-                    }
+            }
+            else if (!q.tryLockPhase()) // move if busy
+                r = ThreadLocalRandom.advanceProbe(r);
+            else {
+                if (q.lockedPush(task))
                     signalWork();
-                    break;
-                }
-                else                              // move if busy
1912 |
< |
r = ThreadLocalRandom.advanceProbe(r); |
1862 |
> |
return; |
1863 |
|
} |
1864 |
|
} |
1865 |
|
} |
1901        return ((ws = workQueues) != null &&
1902                (n = ws.length) > 0 &&
1903                (w = ws[(n - 1) & r & SQMASK]) != null &&
1904 <              w.trySharedUnpush(task));
1904 >              w.tryLockedUnpush(task));
1905    }
1906
1907    /**
1912        WorkQueue[] ws; WorkQueue w; int n;
1913        return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1914                (w = ws[(n - 1) & r & SQMASK]) != null) ?
1915 <          w.sharedHelpCC(task, maxTasks) : 0;
1915 >          w.helpCC(task, maxTasks, true) : 0;
1916    }
1917
1918    /**
1927     */
1928    final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1929                           int maxTasks) {
1930 <      return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
1930 >      return (w == null) ? 0 : w.helpCC(task, maxTasks, false);
1931    }
1932
1933    /**
2057            } catch (Throwable ignore) {
2058            }
2059        }
2060 <      checkSum += w.base + w.id;
2060 >      checkSum += w.base + w.phase;
2061        }
2062    }
2063 }
2550     * @return the number of worker threads
2551     */
2552    public int getRunningThreadCount() {
2603 -      int rc = 0;
2553        WorkQueue[] ws; WorkQueue w;
2554 +      VarHandle.acquireFence();
2555 +      int rc = 0;
2556        if ((ws = workQueues) != null) {
2557            for (int i = 1; i < ws.length; i += 2) {
2558                if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2600        if ((ws = workQueues) != null) {
2601            for (int i = 1; i < ws.length; i += 2) {
2602                if ((v = ws[i]) != null) {
2603 <                  if ((v.source & QUIET) == 0)
2603 >                  if (v.source > 0)
2604                        return false;
2605                    --tc;
2606                }
2646     * @return the number of queued tasks
2647     */
2648    public long getQueuedTaskCount() {
2698 -      long count = 0;
2649        WorkQueue[] ws; WorkQueue w;
2650 +      VarHandle.acquireFence();
2651 +      int count = 0;
2652        if ((ws = workQueues) != null) {
2653            for (int i = 1; i < ws.length; i += 2) {
2654                if ((w = ws[i]) != null)
2666     * @return the number of queued submissions
2667     */
2668    public int getQueuedSubmissionCount() {
2717 -      int count = 0;
2669        WorkQueue[] ws; WorkQueue w;
2670 +      VarHandle.acquireFence();
2671 +      int count = 0;
2672        if ((ws = workQueues) != null) {
2673            for (int i = 0; i < ws.length; i += 2) {
2674                if ((w = ws[i]) != null)
2686     */
2687    public boolean hasQueuedSubmissions() {
2688        WorkQueue[] ws; WorkQueue w;
2689 +      VarHandle.acquireFence();
2690        if ((ws = workQueues) != null) {
2691            for (int i = 0; i < ws.length; i += 2) {
2692                if ((w = ws[i]) != null && !w.isEmpty())
2725     * @return the number of elements transferred
2726     */
2727    protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2774 -      int count = 0;
2728        WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2729 +      VarHandle.acquireFence();
2730 +      int count = 0;
2731        if ((ws = workQueues) != null) {
2732            for (int i = 0; i < ws.length; ++i) {
2733                if ((w = ws[i]) != null) {
2750     */
2751    public String toString() {
2752        // Use a single pass through workQueues to collect counts
2753 <      long qt = 0L, qs = 0L; int rc = 0;
2753 >      int md = mode;                      // read volatile fields first
2754 >      long c = ctl;
2755        long st = stealCount;
2756 +      long qt = 0L, qs = 0L; int rc = 0;
2757        WorkQueue[] ws; WorkQueue w;
2758        if ((ws = workQueues) != null) {
2759            for (int i = 0; i < ws.length; ++i) {
2771            }
2772        }
2773
2817 -      int md = mode;
2774        int pc = (md & SMASK);
2819 -      long c = ctl;
2775        int tc = pc + (short)(c >>> TC_SHIFT);
2776        int ac = pc + (int)(c >> RC_SHIFT);
2777        if (ac < 0)                         // ignore transient negative
3057     */
3058    public static void managedBlock(ManagedBlocker blocker)
3059        throws InterruptedException {
3060 +      if (blocker == null) throw new NullPointerException();
3061        ForkJoinPool p;
3062        ForkJoinWorkerThread wt;
3063        WorkQueue w;
3090     * available or blocker is released.
3091     */
3092    static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3093 <      if (blocker != null && (e instanceof ForkJoinPool)) {
3093 >      if (e instanceof ForkJoinPool) {
3094            WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3095            ForkJoinPool p = (ForkJoinPool)e;
3096            Thread thread = Thread.currentThread();
3102                w = ws[(n - 1) & r & SQMASK];
3103            else
3104                w = null;
3105 <          if (w != null) {
3106 <              for (;;) {
3151 <                  int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
3152 <                  if ((a = w.array) != null && (d = b - s) < 0 &&
3153 <                      (al = a.length) > 0) {
3154 <                      int index = (al - 1) & b;
3155 <                      ForkJoinTask<?> t = (ForkJoinTask<?>)
3156 <                          QA.getAcquire(a, index);
3157 <                      if (blocker.isReleasable())
3158 <                          break;
3159 <                      else if (b++ == w.base) {
3160 <                          if (t == null) {
3161 <                              if (d == -1)
3162 <                                  break;
3163 <                          }
3164 <                          else if (!(t instanceof CompletableFuture.
3165 <                                     AsynchronousCompletionTask))
3166 <                              break;
3167 <                          else if (QA.compareAndSet(a, index, t, null)) {
3168 <                              w.base = b;
3169 <                              t.doExec();
3170 <                          }
3171 <                      }
3172 <                  }
3173 <                  else
3174 <                      break;
3175 <              }
3176 <          }
3105 >          if (w != null)
3106 >              w.helpAsyncBlocker(blocker);
3107        }
3108    }
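`managedBlock` (whose explicit null check the hunk above adds) is the public entry point for blocking cooperatively inside the pool: the pool may activate a spare worker while the blocker waits, preserving parallelism. A small usage sketch against the real API; the `LatchBlocker` class is illustrative, not part of the JDK:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class ManagedBlockExample {
    // A ManagedBlocker that waits on a CountDownLatch. The pool polls
    // isReleasable() first and only calls block() if it returns false.
    static final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
        final CountDownLatch latch;
        LatchBlocker(CountDownLatch latch) { this.latch = latch; }
        public boolean block() throws InterruptedException {
            latch.await();                    // actually block
            return true;                      // no further blocking needed
        }
        public boolean isReleasable() {
            return latch.getCount() == 0;     // cheap pre-check
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        ForkJoinTask<String> task = ForkJoinPool.commonPool().submit(() -> {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
            return "released";
        });
        latch.countDown();                    // let the blocked task proceed
        System.out.println(task.get());       // prints "released"
    }
}
```

Wrapping the wait in `managedBlock` rather than calling `latch.await()` directly is what lets the common pool compensate for the blocked worker.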
3109
3122    // VarHandle mechanics
3123    private static final VarHandle CTL;
3124    private static final VarHandle MODE;
3125 <  private static final VarHandle QA;
3125 >  static final VarHandle QA;
3126
3127    static {
3128        try {
3185        }
3186    }
3187 }
3188 +