ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.344 by jsr166, Thu Feb 1 01:31:11 2018 UTC vs.
Revision 1.345 by dl, Mon Feb 12 20:01:40 2018 UTC

# Line 155 | Line 155 | public class ForkJoinPool extends Abstra
155       * functionality and control for a set of worker threads:
156       * Submissions from non-FJ threads enter into submission queues.
157       * Workers take these tasks and typically split them into subtasks
158 <     * that may be stolen by other workers.  Preference rules give
159 <     * first priority to processing tasks from their own queues (LIFO
160 <     * or FIFO, depending on mode), then to randomized FIFO steals of
161 <     * tasks in other queues.  This framework began as a vehicle for
162 <     * supporting tree-structured parallelism using work-stealing.
163 <     * Over time, its scalability advantages led to extensions and
164 <     * changes to better support more diverse usage contexts.  Because
165 <     * most internal methods and nested classes are interrelated,
166 <     * their main rationale and descriptions are presented here;
167 <     * individual methods and nested classes contain only brief
168 <     * comments about details.
158 >     * that may be stolen by other workers. Work-stealing based on
159 >     * randomized scans generally leads to better throughput than
160 >     * "work dealing" in which producers assign tasks to idle threads,
161 >     * in part because threads that have finished other tasks before
162 >     * the signalled thread wakes up (which can be a long time) can
163 >     * take the task instead.  Preference rules give first priority to
164 >     * processing tasks from their own queues (LIFO or FIFO, depending
165 >     * on mode), then to randomized FIFO steals of tasks in other
166 >     * queues.  This framework began as a vehicle for supporting
167 >     * tree-structured parallelism using work-stealing.  Over time,
168 >     * its scalability advantages led to extensions and changes to
169 >     * better support more diverse usage contexts.  Because most
170 >     * internal methods and nested classes are interrelated, their
171 >     * main rationale and descriptions are presented here; individual
172 >     * methods and nested classes contain only brief comments about
173 >     * details.
174       *
175       * WorkQueues
176       * ==========
# Line 198 | Line 203 | public class ForkJoinPool extends Abstra
203       *
204       * (The actual code needs to null-check and size-check the array,
205       * uses masking, not mod, for indexing a power-of-two-sized array,
206 <     * properly fences accesses, and possibly signals waiting workers
207 <     * to start scanning -- see below.)  Both a successful pop and
208 <     * poll mainly entail a CAS of a slot from non-null to null.
206 >     * adds a release fence for publication, and possibly signals
207 >     * waiting workers to start scanning -- see below.)  Both a
208 >     * successful pop and poll mainly entail a CAS of a slot from
209 >     * non-null to null.
210       *
211       * The pop operation (always performed by owner) is:
212       *   if ((the task at top slot is not null) and
# Line 212 | Line 218 | public class ForkJoinPool extends Abstra
218       *        (CAS slot to null))
219       *           increment base and return task;
220       *
221 <     * There are several variants of each of these. In particular,
222 <     * almost all uses of poll occur within scan operations that also
223 <     * interleave contention tracking (with associated code sprawl.)
221 >     * There are several variants of each of these. Most uses occur
222 >     * within operations that also interleave contention or emptiness
223 >     * tracking or inspection of elements before extracting them, so
224 >     * must interleave these with the above code. When performed by
225 >     * owner, getAndSet is used instead of CAS (see for example method
226 >     * nextLocalTask) which is usually more efficient, and possible
227 >     * because the top index cannot independently change during the
228 >     * operation.
229       *
230       * Memory ordering.  See "Correct and Efficient Work-Stealing for
231       * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
# Line 223 | Line 234 | public class ForkJoinPool extends Abstra
234       * algorithms similar to (but different than) the one used here.
235       * Extracting tasks in array slots via (fully fenced) CAS provides
236       * primary synchronization. The base and top indices imprecisely
237 <     * guide where to extract from. We do not always require strict
238 <     * orderings of array and index updates, so sometimes let them be
239 <     * subject to compiler and processor reorderings. However, the
240 <     * volatile "base" index also serves as a basis for memory
241 <     * ordering: Slot accesses are preceded by a read of base,
242 <     * ensuring happens-before ordering with respect to stealers (so
243 <     * the slots themselves can be read via plain array reads.)  The
244 <     * only other memory orderings relied on are maintained in the
245 <     * course of signalling and activation (see below).  A check that
246 <     * base == top indicates (momentary) emptiness, but otherwise may
247 <     * err on the side of possibly making the queue appear nonempty
248 <     * when a push, pop, or poll have not fully committed, or making
249 <     * it appear empty when an update of top has not yet been visibly
250 <     * written.  (Method isEmpty() checks the case of a partially
251 <     * completed removal of the last element.)  Because of this, the
252 <     * poll operation, considered individually, is not wait-free. One
253 <     * thief cannot successfully continue until another in-progress
254 <     * one (or, if previously empty, a push) visibly completes.
255 <     * However, in the aggregate, we ensure at least probabilistic
237 >     * guide where to extract from. We do not usually require strict
238 >     * orderings of array and index updates. Many index accesses use
239 >     * plain mode, with ordering constrained by surrounding context
240 >     * (usually with respect to element CASes or the two WorkQueue
241 >     * volatile fields source and phase). When not otherwise already
242 >     * constrained, reads of "base" by queue owners use acquire-mode,
243 >     * and some externally callable methods preface accesses with
244 >     * acquire fences.  Additionally, to ensure that index update
245 >     * writes are not coalesced or postponed in loops etc, "opaque"
246 >     * mode is used in a few cases where timely writes are not
247 >     * otherwise ensured. The "locked" versions of push- and pop-
248 >     * based methods for shared queues differ from owned versions
249 >     * because locking already forces some of the ordering.
250 >     *
251 >     * Because indices and slot contents cannot always be consistent,
252 >     * a check that base == top indicates (momentary) emptiness, but
253 >     * otherwise may err on the side of possibly making the queue
254 >     * appear nonempty when a push, pop, or poll have not fully
255 >     * committed, or making it appear empty when an update of top has
256 >     * not yet been visibly written.  (Method isEmpty() checks the
257 >     * case of a partially completed removal of the last element.)
258 >     * Because of this, the poll operation, considered individually,
259 >     * is not wait-free. One thief cannot successfully continue until
260 >     * another in-progress one (or, if previously empty, a push)
261 >     * visibly completes.  This can stall threads when required to
262 >     * consume from a given queue (see method poll()).  However, in
263 >     * the aggregate, we ensure at least probabilistic
264       * non-blockingness.  If an attempted steal fails, a scanning
265       * thief chooses a different random victim target to try next. So,
266       * in order for one thief to progress, it suffices for any
267 <     * in-progress poll or new push on any empty queue to
249 <     * complete.
267 >     * in-progress poll or new push on any empty queue to complete.
268       *
269       * This approach also enables support of a user mode in which
270       * local task processing is in FIFO, not LIFO order, simply by
# Line 267 | Line 285 | public class ForkJoinPool extends Abstra
285       * different position to use or create other queues -- they block
286       * only when creating and registering new queues. Because it is
287       * used only as a spinlock, unlocking requires only a "releasing"
288 <     * store (using setRelease).
288 >     * store (using setRelease) unless otherwise signalling.
289       *
290       * Management
291       * ==========
# Line 288 | Line 306 | public class ForkJoinPool extends Abstra
306       *
307       * Field "ctl" contains 64 bits holding information needed to
308       * atomically decide to add, enqueue (on an event queue), and
309 <     * dequeue (and release)-activate workers.  To enable this
310 <     * packing, we restrict maximum parallelism to (1<<15)-1 (which is
311 <     * far in excess of normal operating range) to allow ids, counts,
312 <     * and their negations (used for thresholding) to fit into 16bit
309 >     * dequeue and release workers.  To enable this packing, we
310 >     * restrict maximum parallelism to (1<<15)-1 (which is far in
311 >     * excess of normal operating range) to allow ids, counts, and
312 >     * their negations (used for thresholding) to fit into 16bit
313       * subfields.
314       *
315       * Field "mode" holds configuration parameters as well as lifetime
# Line 303 | Line 321 | public class ForkJoinPool extends Abstra
321       * lock (using field workerNamePrefix as lock), but is otherwise
322       * concurrently readable, and accessed directly. We also ensure
323       * that uses of the array reference itself never become too stale
324 <     * in case of resizing.  To simplify index-based operations, the
325 <     * array size is always a power of two, and all readers must
326 <     * tolerate null slots. Worker queues are at odd indices. Shared
327 <     * (submission) queues are at even indices, up to a maximum of 64
328 <     * slots, to limit growth even if array needs to expand to add
329 <     * more workers. Grouping them together in this way simplifies and
330 <     * speeds up task scanning.
324 >     * in case of resizing, by arranging that (re-)reads are separated
325 >     * by at least one acquiring read access.  To simplify index-based
326 >     * operations, the array size is always a power of two, and all
327 >     * readers must tolerate null slots. Worker queues are at odd
328 >     * indices. Shared (submission) queues are at even indices, up to
329 >     * a maximum of 64 slots, to limit growth even if array needs to
330 >     * expand to add more workers. Grouping them together in this way
331 >     * simplifies and speeds up task scanning.
332       *
333       * All worker thread creation is on-demand, triggered by task
334       * submissions, replacement of terminated workers, and/or
# Line 387 | Line 406 | public class ForkJoinPool extends Abstra
406       * releases so usage requires care -- seeing a negative phase does
407       * not guarantee that the worker is available. When queued, the
408       * lower 16 bits of scanState must hold its pool index. So we
409 <     * place the index there upon initialization (see registerWorker)
410 <     * and otherwise keep it there or restore it when necessary.
409 >     * place the index there upon initialization and otherwise keep it
410 >     * there or restore it when necessary.
411       *
412       * The ctl field also serves as the basis for memory
413       * synchronization surrounding activation. This uses a more
# Line 396 | Line 415 | public class ForkJoinPool extends Abstra
415       * consumers sync with each other by both writing/CASing ctl (even
416       * if to its current value).  This would be extremely costly. So
417       * we relax it in several ways: (1) Producers only signal when
418 <     * their queue is empty. Other workers propagate this signal (in
419 <     * method scan) when they find tasks; to further reduce flailing,
420 <     * each worker signals only one other per activation. (2) Workers
421 <     * only enqueue after scanning (see below) and not finding any
422 <     * tasks.  (3) Rather than CASing ctl to its current value in the
423 <     * common case where no action is required, we reduce write
424 <     * contention by equivalently prefacing signalWork when called by
425 <     * an external task producer using a memory access with
407 <     * full-volatile semantics or a "fullFence".
418 >     * their queue is possibly empty at some point during a push
419 >     * operation. (2) Other workers propagate this signal when they
420 >     * find tasks. (3) Workers only enqueue after scanning (see below)
421 >     * and not finding any tasks.  (4) Rather than CASing ctl to its
422 >     * current value in the common case where no action is required,
423 >     * we reduce write contention by equivalently prefacing signalWork
424 >     * when called by an external task producer using a memory access
425 >     * with full-volatile semantics or a "fullFence".
426       *
427       * Almost always, too many signals are issued. A task producer
428       * cannot in general tell if some existing worker is in the midst
# Line 416 | Line 434 | public class ForkJoinPool extends Abstra
434       * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
435       * computations involving only a few workers.
436       *
437 <     * Scanning. Method runWorker performs top-level scanning for
438 <     * tasks.  Each scan traverses and tries to poll from each queue
439 <     * starting at a random index and circularly stepping. Scans are
440 <     * not performed in ideal random permutation order, to reduce
441 <     * cacheline contention.  The pseudorandom generator need not have
437 >     * Scanning. Method scan (from runWorker) performs top-level
438 >     * scanning for tasks. (Similar scans appear in helpQuiesce and
439 >     * pollScan.)  Each scan traverses and tries to poll from each
440 >     * queue starting at a random index. Scans are not performed in
441 >     * ideal random permutation order, to reduce cacheline
442 >     * contention. The pseudorandom generator need not have
443       * high-quality statistical properties in the long term, but just
444       * within computations; we use Marsaglia XorShifts (often via
445       * ThreadLocalRandom.nextSecondarySeed), which are cheap and
446 <     * suffice. Scanning also employs contention reduction: When
446 >     * suffice. Scanning also includes contention reduction: When
447       * scanning workers fail to extract an apparently existing task,
448 <     * they soon restart at a different pseudorandom index.  This
449 <     * improves throughput when many threads are trying to take tasks
450 <     * from few queues, which can be common in some usages.  Scans do
451 <     * not otherwise explicitly take into account core affinities,
452 <     * loads, cache localities, etc. However, they do exploit temporal
453 <     * locality (which usually approximates these) by preferring to
454 <     * re-poll (at most #workers times) from the same queue after a
455 <     * successful poll before trying others.
448 >     * they soon restart at a different pseudorandom index.  This form
449 >     * of backoff improves throughput when many threads are trying to
450 >     * take tasks from few queues, which can be common in some usages.
451 >     * Scans do not otherwise explicitly take into account core
452 >     * affinities, loads, cache localities, etc. However, they do
453 >     * exploit temporal locality (which usually approximates these) by
454 >     * preferring to re-poll from the same queue after a successful
455 >     * poll before trying others (see method topLevelExec). However
456 >     * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard
457 >     * against infinitely unfair looping under unbounded user task
458 >     * recursion, and also to reduce long-term contention when many
459 >     * threads poll few queues holding many small tasks. The bound is
460 >     * high enough to avoid much impact on locality and scheduling
461 >     * overhead.
462       *
463       * Trimming workers. To release resources after periods of lack of
464       * use, a worker starting to wait when the pool is quiescent will
465 <     * time out and terminate (see method scan) if the pool has
465 >     * time out and terminate (see method runWorker) if the pool has
466       * remained quiescent for period given by field keepAlive.
467       *
468       * Shutdown and Termination. A call to shutdownNow invokes
# Line 505 | Line 530 | public class ForkJoinPool extends Abstra
530       * time. Some previous versions of this class employed immediate
531       * compensations for any blocked join. However, in practice, the
532       * vast majority of blockages are transient byproducts of GC and
533 <     * other JVM or OS activities that are made worse by replacement.
534 <     * Rather than impose arbitrary policies, we allow users to
535 <     * override the default of only adding threads upon apparent
536 <     * starvation.  The compensation mechanism may also be bounded.
537 <     * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
538 <     * JVMs to cope with programming errors and abuse before running
539 <     * out of resources to do so.
533 >     * other JVM or OS activities that are made worse by replacement
534 >     * when they cause longer-term oversubscription.  Rather than
535 >     * impose arbitrary policies, we allow users to override the
536 >     * default of only adding threads upon apparent starvation.  The
537 >     * compensation mechanism may also be bounded.  Bounds for the
538 >     * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
539 >     * with programming errors and abuse before running out of
540 >     * resources to do so.
541       *
542       * Common Pool
543       * ===========
# Line 544 | Line 570 | public class ForkJoinPool extends Abstra
570       * in ForkJoinWorkerThread) may be JVM-dependent and must access
571       * particular Thread class fields to achieve this effect.
572       *
573 +     * Memory placement
574 +     * ================
575 +     *
576 +     * Performance can be very sensitive to placement of instances of
577 +     * ForkJoinPool and WorkQueues and their queue arrays. To reduce
578 +     * false-sharing impact, the @Contended annotation isolates
579 +     * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl
580 +     * field. WorkQueue arrays are allocated (by their threads) with
581 +     * larger initial sizes than most ever need, mostly to reduce
582 +     * false sharing with current garbage collectors that use cardmark
583 +     * tables.
584 +     *
585       * Style notes
586       * ===========
587       *
# Line 551 | Line 589 | public class ForkJoinPool extends Abstra
589       * awkward and ugly, but also reflects the need to control
590       * outcomes across the unusual cases that arise in very racy code
591       * with very few invariants. All fields are read into locals
592 <     * before use, and null-checked if they are references.  This is
593 <     * usually done in a "C"-like style of listing declarations at the
594 <     * heads of methods or blocks, and using inline assignments on
595 <     * first encounter.  Nearly all explicit checks lead to
596 <     * bypass/return, not exception throws, because they may
597 <     * legitimately arise due to cancellation/revocation during
598 <     * shutdown.
592 >     * before use, and null-checked if they are references.  Array
593 >     * accesses using masked indices include checks (that are always
594 >     * true) that the array length is non-zero to avoid compilers
595 >     * inserting more expensive traps.  This is usually done in a
596 >     * "C"-like style of listing declarations at the heads of methods
597 >     * or blocks, and using inline assignments on first encounter.
598 >     * Nearly all explicit checks lead to bypass/return, not exception
599 >     * throws, because they may legitimately arise due to
600 >     * cancellation/revocation during shutdown.
601       *
602       * There is a lot of representation-level coupling among classes
603       * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
# Line 567 | Line 607 | public class ForkJoinPool extends Abstra
607       * representations will need to be accompanied by algorithmic
608       * changes anyway. Several methods intrinsically sprawl because
609       * they must accumulate sets of consistent reads of fields held in
610 <     * local variables.  There are also other coding oddities
611 <     * (including several unnecessary-looking hoisted null checks)
612 <     * that help some methods perform reasonably even when interpreted
613 <     * (not compiled).
610 >     * local variables. Some others are artificially broken up to
611 >     * reduce producer/consumer imbalances due to dynamic compilation.
612 >     * There are also other coding oddities (including several
613 >     * unnecessary-looking hoisted null checks) that help some methods
614 >     * perform reasonably even when interpreted (not compiled).
615       *
616       * The order of declarations in this file is (with a few exceptions):
617       * (1) Static utility functions
# Line 674 | Line 715 | public class ForkJoinPool extends Abstra
715      static final int DORMANT      = QUIET | UNSIGNALLED;
716  
717      /**
718 <     * The maximum number of local polls from the same queue before
719 <     * checking others. This is a safeguard against infinitely unfair
679 <     * looping under unbounded user task recursion, and must be larger
680 <     * than plausible cases of intentional bounded task recursion.
718 >     * Initial capacity of work-stealing queue array.
719 >     * Must be a power of two, at least 2.
720       */
721 <    static final int POLL_LIMIT = 1 << 10;
721 >    static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
722 >
723 >    /**
724 >     * Maximum capacity for queue arrays. Must be a power of two less
725 >     * than or equal to 1 << (31 - width of array entry) to ensure
726 >     * lack of wraparound of index calculations, but defined to a
727 >     * value a bit less than this to help users trap runaway programs
728 >     * before saturating systems.
729 >     */
730 >    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
731 >
732 >    /**
733 >     * The maximum number of top-level polls per worker before
734 >     * checking other queues, expressed as a bit shift to, in effect,
735 >     * multiply by pool size, and then use as random value mask, so
736 >     * average bound is about poolSize*(1<<TOP_BOUND_SHIFT).  See
737 >     * above for rationale.
738 >     */
739 >    static final int TOP_BOUND_SHIFT = 10;
740  
741      /**
742       * Queues supporting work-stealing as well as external task
743       * submission. See above for descriptions and algorithms.
687     * Performance on most platforms is very sensitive to placement of
688     * instances of both WorkQueues and their arrays -- we absolutely
689     * do not want multiple WorkQueue instances or multiple queue
690     * arrays sharing cache lines. The @Contended annotation alerts
691     * JVMs to try to keep instances apart.
744       */
745      @jdk.internal.vm.annotation.Contended
746      static final class WorkQueue {
747 <
748 <        /**
749 <         * Capacity of work-stealing queue array upon initialization.
750 <         * Must be a power of two; at least 4, but should be larger to
699 <         * reduce or eliminate cacheline sharing among queues.
700 <         * Currently, it is much larger, as a partial workaround for
701 <         * the fact that JVMs often place arrays in locations that
702 <         * share GC bookkeeping (especially cardmarks) such that
703 <         * per-write accesses encounter serious memory contention.
704 <         */
705 <        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
706 <
707 <        /**
708 <         * Maximum size for queue arrays. Must be a power of two less
709 <         * than or equal to 1 << (31 - width of array entry) to ensure
710 <         * lack of wraparound of index calculations, but defined to a
711 <         * value a bit less than this to help users trap runaway
712 <         * programs before saturating systems.
713 <         */
714 <        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
715 <
716 <        // Instance fields
747 >        volatile int source;       // source queue id, or sentinel
748 >        int id;                    // pool index, mode, tag
749 >        int base;                  // index of next slot for poll
750 >        int top;                   // index of next slot for push
751          volatile int phase;        // versioned, negative: queued, 1: locked
752          int stackPred;             // pool stack (ctl) predecessor link
753          int nsteals;               // number of steals
754 <        int id;                    // index, mode, tag
721 <        volatile int source;       // source queue id, or sentinel
722 <        volatile int base;         // index of next slot for poll
723 <        int top;                   // index of next slot for push
724 <        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
754 >        ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
755          final ForkJoinPool pool;   // the containing pool (may be null)
756          final ForkJoinWorkerThread owner; // owning thread or null if shared
757  
# Line 733 | Line 763 | public class ForkJoinPool extends Abstra
763          }
764  
765          /**
766 +         * Tries to lock shared queue by CASing phase field.
767 +         */
768 +        final boolean tryLockPhase() {
769 +            return PHASE.compareAndSet(this, 0, 1);
770 +        }
771 +
772 +        final void releasePhaseLock() {
773 +            PHASE.setRelease(this, 0);
774 +        }
775 +
776 +        /**
777           * Returns an exportable index (used by ForkJoinWorkerThread).
778           */
779          final int getPoolIndex() {
# Line 743 | Line 784 | public class ForkJoinPool extends Abstra
784           * Returns the approximate number of tasks in the queue.
785           */
786          final int queueSize() {
787 <            int n = base - top;       // read base first
787 >            int n = (int)BASE.getAcquire(this) - top;
788              return (n >= 0) ? 0 : -n; // ignore transient negative
789          }
790  
# Line 753 | Line 794 | public class ForkJoinPool extends Abstra
794           * near-empty queue has at least one unclaimed task.
795           */
796          final boolean isEmpty() {
797 <            ForkJoinTask<?>[] a; int n, al, b;
797 >            ForkJoinTask<?>[] a; int n, cap, b;
798 >            VarHandle.acquireFence(); // needed by external callers
799              return ((n = (b = base) - top) >= 0 || // possibly one task
800                      (n == -1 && ((a = array) == null ||
801 <                                 (al = a.length) == 0 ||
802 <                                 a[(al - 1) & b] == null)));
801 >                                 (cap = a.length) == 0 ||
802 >                                 a[(cap - 1) & b] == null)));
803          }
804  
763
805          /**
806           * Pushes a task. Call only by owner in unshared queues.
807           *
# Line 768 | Line 809 | public class ForkJoinPool extends Abstra
809           * @throws RejectedExecutionException if array cannot be resized
810           */
811          final void push(ForkJoinTask<?> task) {
812 <            int s = top; ForkJoinTask<?>[] a; int al, d;
813 <            if ((a = array) != null && (al = a.length) > 0) {
814 <                int index = (al - 1) & s;
815 <                ForkJoinPool p = pool;
812 >            ForkJoinTask<?>[] a;
813 >            int s = top, d, cap;
814 >            ForkJoinPool p = pool;
815 >            if ((a = array) != null && (cap = a.length) > 0) {
816 >                QA.setRelease(a, (cap - 1) & s, task);
817                  top = s + 1;
818 <                QA.setRelease(a, index, task);
777 <                if ((d = base - s) == 0 && p != null) {
818 >                if ((d = (int)BASE.getAcquire(this) - s) == 0 && p != null) {
819                      VarHandle.fullFence();
820                      p.signalWork();
821                  }
822 <                else if (d + al == 1)
823 <                    growArray();
822 >                else if (d + cap - 1 == 0)
823 >                    growArray(false);
824              }
825          }
826  
827          /**
828 <         * Initializes or doubles the capacity of array. Call either
829 <         * by owner or with lock held -- it is OK for base, but not
789 <         * top, to move while resizings are in progress.
828 >         * Version of push for shared queues. Call only with phase lock held.
829 >         * @return true if should signal work
830           */
831 <        final ForkJoinTask<?>[] growArray() {
832 <            ForkJoinTask<?>[] oldA = array;
833 <            int oldSize = oldA != null ? oldA.length : 0;
834 <            int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
835 <            if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
836 <                throw new RejectedExecutionException("Queue capacity exceeded");
837 <            int oldMask, t, b;
838 <            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
839 <            if (oldA != null && (oldMask = oldSize - 1) > 0 &&
840 <                (t = top) - (b = base) > 0) {
841 <                int mask = size - 1;
842 <                do { // emulate poll from old array, push to new array
843 <                    int index = b & oldMask;
844 <                    ForkJoinTask<?> x = (ForkJoinTask<?>)
845 <                        QA.getAcquire(oldA, index);
846 <                    if (x != null &&
807 <                        QA.compareAndSet(oldA, index, x, null))
808 <                        a[b & mask] = x;
809 <                } while (++b != t);
810 <                VarHandle.releaseFence();
811 <            }
812 <            return a;
831 >        final boolean lockedPush(ForkJoinTask<?> task) {
832 >            ForkJoinTask<?>[] a;
833 >            boolean signal = false;
834 >            int s = top, b = base, cap;
835 >            if ((a = array) != null && (cap = a.length) > 0) {
836 >                a[(cap - 1) & s] = task;
837 >                top = s + 1;
838 >                if (b - s + cap - 1 == 0)
839 >                    growArray(true);
840 >                else {
841 >                    phase = 0; // full volatile unlock
842 >                    if (base == s)
843 >                        signal = true;
844 >                }
845 >            }
846 >            return signal;
847          }
848  
849          /**
850 <         * Takes next task, if one exists, in LIFO order.  Call only
851 <         * by owner in unshared queues.
852 <         */
853 <        final ForkJoinTask<?> pop() {
854 <            int b = base, s = top, al, i; ForkJoinTask<?>[] a;
855 <            if ((a = array) != null && b != s && (al = a.length) > 0) {
856 <                int index = (al - 1) & --s;
857 <                ForkJoinTask<?> t = (ForkJoinTask<?>)
858 <                    QA.get(a, index);
859 <                if (t != null &&
860 <                    QA.compareAndSet(a, index, t, null)) {
861 <                    top = s;
862 <                    VarHandle.releaseFence();
863 <                    return t;
850 >         * Doubles the capacity of array. Call either by owner or with
851 >         * lock held -- it is OK for base, but not top, to move while
852 >         * resizings are in progress.
853 >         */
854 >        final void growArray(boolean locked) {
855 >            ForkJoinTask<?>[] newA = null;
856 >            try {
857 >                ForkJoinTask<?>[] oldA; int oldSize, newSize;
858 >                if ((oldA = array) != null && (oldSize = oldA.length) > 0 &&
859 >                    (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
860 >                    newSize > 0) {
861 >                    try {
862 >                        newA = new ForkJoinTask<?>[newSize];
863 >                    } catch (OutOfMemoryError ex) {
864 >                    }
865 >                    if (newA != null) { // poll from old array, push to new
866 >                        int oldMask = oldSize - 1, newMask = newSize - 1;
867 >                        for (int s = top - 1, k = oldMask; k >= 0; --k) {
868 >                            ForkJoinTask<?> x = (ForkJoinTask<?>)
869 >                                QA.getAndSet(oldA, s & oldMask, null);
870 >                            if (x != null)
871 >                                newA[s-- & newMask] = x;
872 >                            else
873 >                                break;
874 >                        }
875 >                        array = newA;
876 >                        VarHandle.releaseFence();
877 >                    }
878                  }
879 +            } finally {
880 +                if (locked)
881 +                    phase = 0;
882              }
883 <            return null;
883 >            if (newA == null)
884 >                throw new RejectedExecutionException("Queue capacity exceeded");
885          }
886  
887          /**
888           * Takes next task, if one exists, in FIFO order.
889           */
890          final ForkJoinTask<?> poll() {
891 <            for (;;) {
892 <                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
893 <                if ((a = array) != null && (d = b - s) < 0 &&
894 <                    (al = a.length) > 0) {
895 <                    int index = (al - 1) & b;
896 <                    ForkJoinTask<?> t = (ForkJoinTask<?>)
897 <                        QA.getAcquire(a, index);
898 <                    if (b++ == base) {
899 <                        if (t != null) {
900 <                            if (QA.compareAndSet(a, index, t, null)) {
901 <                                base = b;
850 <                                return t;
851 <                            }
852 <                        }
853 <                        else if (d == -1)
854 <                            break; // now empty
891 >            int b, k, cap; ForkJoinTask<?>[] a;
892 >            while ((a = array) != null && (cap = a.length) > 0 &&
893 >                   (b = base) != top) {
894 >                ForkJoinTask<?> t = (ForkJoinTask<?>)
895 >                    QA.getAcquire(a, k = (cap - 1) & b);
896 >                if (base == b++) {
897 >                    if (t == null)
898 >                        Thread.yield(); // await index advance
899 >                    else if (QA.compareAndSet(a, k, t, null)) {
900 >                        BASE.setOpaque(this, b);
901 >                        return t;
902                      }
903                  }
857                else
858                    break;
904              }
905              return null;
906          }
# Line 864 | Line 909 | public class ForkJoinPool extends Abstra
909           * Takes next task, if one exists, in order specified by mode.
910           */
911          final ForkJoinTask<?> nextLocalTask() {
912 <            return ((id & FIFO) != 0) ? poll() : pop();
912 >            ForkJoinTask<?> t = null;
913 >            int md = id, b, s, d, cap; ForkJoinTask<?>[] a;
914 >            if ((a = array) != null && (cap = a.length) > 0 &&
915 >                (d = (s = top) - (b = base)) > 0) {
916 >                if ((md & FIFO) == 0 || d == 1) {
917 >                    if ((t = (ForkJoinTask<?>)
918 >                         QA.getAndSet(a, (cap - 1) & --s, null)) != null)
919 >                        TOP.setOpaque(this, s);
920 >                }
921 >                else if ((t = (ForkJoinTask<?>)
922 >                          QA.getAndSet(a, (cap - 1) & b++, null)) != null) {
923 >                    BASE.setOpaque(this, b);
924 >                }
925 >                else // on contention in FIFO mode, use regular poll
926 >                    t = poll();
927 >            }
928 >            return t;
929          }
930  
931          /**
932           * Returns next task, if one exists, in order specified by mode.
933           */
934          final ForkJoinTask<?> peek() {
935 <            int al; ForkJoinTask<?>[] a;
936 <            return ((a = array) != null && (al = a.length) > 0) ?
937 <                a[(al - 1) &
877 <                  ((id & FIFO) != 0 ? base : top - 1)] : null;
935 >            int cap; ForkJoinTask<?>[] a;
936 >            return ((a = array) != null && (cap = a.length) > 0) ?
937 >                a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
938          }
939  
940          /**
941           * Pops the given task only if it is at the current top.
942           */
943          final boolean tryUnpush(ForkJoinTask<?> task) {
944 <            int b = base, s = top, al; ForkJoinTask<?>[] a;
945 <            if ((a = array) != null && b != s && (al = a.length) > 0) {
946 <                int index = (al - 1) & --s;
947 <                if (QA.compareAndSet(a, index, task, null)) {
944 >            boolean popped = false;
945 >            int s, cap; ForkJoinTask<?>[] a;
946 >            if ((a = array) != null && (cap = a.length) > 0 &&
947 >                (s = top) != base &&
948 >                (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
949 >                TOP.setOpaque(this, s);
950 >            return popped;
951 >        }
952 >
953 >        /**
954 >         * Shared version of tryUnpush.
955 >         */
956 >        final boolean tryLockedUnpush(ForkJoinTask<?> task) {
957 >            boolean popped = false;
958 >            int s = top - 1, k, cap; ForkJoinTask<?>[] a;
959 >            if ((a = array) != null && (cap = a.length) > 0 &&
960 >                a[k = (cap - 1) & s] == task && tryLockPhase()) {
961 >                if (top == s + 1 && array == a &&
962 >                    (popped = QA.compareAndSet(a, k, task, null)))
963                      top = s;
964 <                    VarHandle.releaseFence();
890 <                    return true;
891 <                }
964 >                releasePhaseLock();
965              }
966 <            return false;
966 >            return popped;
967          }
968  
969          /**
# Line 904 | Line 977 | public class ForkJoinPool extends Abstra
977          // Specialized execution methods
978  
979          /**
980 <         * Pops and executes up to limit consecutive tasks or until empty.
981 <         *
982 <         * @param limit max runs, or zero for no limit
983 <         */
984 <        final void localPopAndExec(int limit) {
985 <            for (;;) {
986 <                int b = base, s = top, al; ForkJoinTask<?>[] a;
987 <                if ((a = array) != null && b != s && (al = a.length) > 0) {
988 <                    int index = (al - 1) & --s;
989 <                    ForkJoinTask<?> t = (ForkJoinTask<?>)
917 <                        QA.getAndSet(a, index, null);
918 <                    if (t != null) {
919 <                        top = s;
920 <                        VarHandle.releaseFence();
921 <                        t.doExec();
922 <                        if (limit != 0 && --limit == 0)
923 <                            break;
924 <                    }
925 <                    else
980 >         * Runs the given (stolen) task if nonnull, as well as
981 >         * remaining local tasks and others available from the given
982 >         * queue, up to bound n (to avoid infinite unfairness).
983 >         */
984 >        final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
985 >            if (t != null) {
986 >                int nstolen = 1;
987 >                for (;;) {
988 >                    t.doExec();
989 >                    if (n-- < 0)
990                          break;
991 <                }
992 <                else
993 <                    break;
994 <            }
931 <        }
932 <
933 <        /**
934 <         * Polls and executes up to limit consecutive tasks or until empty.
935 <         *
936 <         * @param limit, or zero for no limit
937 <         */
938 <        final void localPollAndExec(int limit) {
939 <            for (int polls = 0;;) {
940 <                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
941 <                if ((a = array) != null && (d = b - s) < 0 &&
942 <                    (al = a.length) > 0) {
943 <                    int index = (al - 1) & b++;
944 <                    ForkJoinTask<?> t = (ForkJoinTask<?>)
945 <                        QA.getAndSet(a, index, null);
946 <                    if (t != null) {
947 <                        base = b;
948 <                        t.doExec();
949 <                        if (limit != 0 && ++polls == limit)
991 >                    else if ((t = nextLocalTask()) == null) {
992 >                        if (q != null && (t = q.poll()) != null)
993 >                            ++nstolen;
994 >                        else
995                              break;
996                      }
952                    else if (d == -1)
953                        break;     // now empty
954                    else
955                        polls = 0; // stolen; reset
997                  }
998 <                else
999 <                    break;
998 >                ForkJoinWorkerThread thread = owner;
999 >                nsteals += nstolen;
1000 >                source = 0;
1001 >                if (thread != null)
1002 >                    thread.afterTopLevelExec();
1003              }
1004          }
1005  
# Line 963 | Line 1007 | public class ForkJoinPool extends Abstra
1007           * If present, removes task from queue and executes it.
1008           */
1009          final void tryRemoveAndExec(ForkJoinTask<?> task) {
1010 <            ForkJoinTask<?>[] wa; int s, wal;
1011 <            if (base - (s = top) < 0 && // traverse from top
1012 <                (wa = array) != null && (wal = wa.length) > 0) {
1013 <                for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
1010 >            ForkJoinTask<?>[] a; int s, cap;
1011 >            if ((a = array) != null && (cap = a.length) > 0 &&
1012 >                base - (s = top) < 0) { // traverse from top
1013 >                for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
1014                      int index = i & m;
1015 <                    ForkJoinTask<?> t = (ForkJoinTask<?>)
972 <                        QA.get(wa, index);
1015 >                    ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index);
1016                      if (t == null)
1017                          break;
1018                      else if (t == task) {
1019 <                        if (QA.compareAndSet(wa, index, t, null)) {
1019 >                        if (QA.compareAndSet(a, index, t, null)) {
1020                              top = ns;   // safely shift down
1021                              for (int j = i; j != ns; ++j) {
1022                                  ForkJoinTask<?> f;
1023                                  int pindex = (j + 1) & m;
1024 <                                f = (ForkJoinTask<?>)QA.get(wa, pindex);
1025 <                                QA.setVolatile(wa, pindex, null);
1024 >                                f = (ForkJoinTask<?>)QA.get(a, pindex);
1025 >                                QA.setVolatile(a, pindex, null);
1026                                  int jindex = j & m;
1027 <                                QA.setRelease(wa, jindex, f);
1027 >                                QA.setRelease(a, jindex, f);
1028                              }
1029                              VarHandle.releaseFence();
1030                              t.doExec();
# Line 993 | Line 1036 | public class ForkJoinPool extends Abstra
1036          }
1037  
1038          /**
1039 <         * Tries to steal and run tasks within the target's
1040 <         * computation until done, not found, or limit exceeded.
1039 >         * Tries to pop and run tasks within the target's computation
1040 >         * until done, not found, or limit exceeded.
1041           *
1042           * @param task root of CountedCompleter computation
1043           * @param limit max runs, or zero for no limit
1044 +         * @param shared true if must lock to extract task
1045           * @return task status on exit
1046           */
1047 <        final int localHelpCC(CountedCompleter<?> task, int limit) {
1047 >        final int helpCC(CountedCompleter<?> task, int limit, boolean shared) {
1048              int status = 0;
1049              if (task != null && (status = task.status) >= 0) {
1050 <                for (;;) {
1051 <                    boolean help = false;
1052 <                    int b = base, s = top, al; ForkJoinTask<?>[] a;
1053 <                    if ((a = array) != null && b != s && (al = a.length) > 0) {
1054 <                        int index = (al - 1) & (s - 1);
1055 <                        ForkJoinTask<?> o = (ForkJoinTask<?>)
1056 <                            QA.get(a, index);
1057 <                        if (o instanceof CountedCompleter) {
1058 <                            CountedCompleter<?> t = (CountedCompleter<?>)o;
1059 <                            for (CountedCompleter<?> f = t;;) {
1060 <                                if (f != task) {
1061 <                                    if ((f = f.completer) == null) // try parent
1062 <                                        break;
1063 <                                }
1064 <                                else {
1065 <                                    if (QA.compareAndSet(a, index, t, null)) {
1050 >                int s, k, cap; ForkJoinTask<?>[] a;
1051 >                while ((a = array) != null && (cap = a.length) > 0 &&
1052 >                       (s = top) != base) {
1053 >                    CountedCompleter<?> v = null;
1054 >                    ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)];
1055 >                    if (o instanceof CountedCompleter) {
1056 >                        CountedCompleter<?> t = (CountedCompleter<?>)o;
1057 >                        for (CountedCompleter<?> f = t;;) {
1058 >                            if (f != task) {
1059 >                                if ((f = f.completer) == null)
1060 >                                    break;
1061 >                            }
1062 >                            else if (shared) {
1063 >                                if (tryLockPhase()) {
1064 >                                    if (top == s && array == a &&
1065 >                                        QA.compareAndSet(a, k, t, null)) {
1066                                          top = s - 1;
1067 <                                        VarHandle.releaseFence();
1024 <                                        t.doExec();
1025 <                                        help = true;
1067 >                                        v = t;
1068                                      }
1069 <                                    break;
1069 >                                    releasePhaseLock();
1070 >                                }
1071 >                                break;
1072 >                            }
1073 >                            else {
1074 >                                if (QA.compareAndSet(a, k, t, null)) {
1075 >                                    top = s - 1;
1076 >                                    v = t;
1077                                  }
1078 +                                break;
1079                              }
1080                          }
1081                      }
1082 <                    if ((status = task.status) < 0 || !help ||
1082 >                    if (v != null)
1083 >                        v.doExec();
1084 >                    if ((status = task.status) < 0 || v == null ||
1085                          (limit != 0 && --limit == 0))
1086                          break;
1087                  }
# Line 1037 | Line 1089 | public class ForkJoinPool extends Abstra
1089              return status;
1090          }
1091  
1040        // Operations on shared queues
1041
1092          /**
1093 <         * Tries to lock shared queue by CASing phase field.
1094 <         */
1095 <        final boolean tryLockSharedQueue() {
1096 <            return PHASE.compareAndSet(this, 0, QLOCK);
1047 <        }
1048 <
1049 <        /**
1050 <         * Shared version of tryUnpush.
1051 <         */
1052 <        final boolean trySharedUnpush(ForkJoinTask<?> task) {
1053 <            boolean popped = false;
1054 <            int s = top - 1, al; ForkJoinTask<?>[] a;
1055 <            if ((a = array) != null && (al = a.length) > 0) {
1056 <                int index = (al - 1) & s;
1057 <                ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
1058 <                if (t == task &&
1059 <                    PHASE.compareAndSet(this, 0, QLOCK)) {
1060 <                    if (top == s + 1 && array == a &&
1061 <                        QA.compareAndSet(a, index, task, null)) {
1062 <                        popped = true;
1063 <                        top = s;
1064 <                    }
1065 <                    PHASE.setRelease(this, 0);
1066 <                }
1067 <            }
1068 <            return popped;
1069 <        }
1070 <
1071 <        /**
1072 <         * Shared version of localHelpCC.
1093 >         * Tries to poll and run AsynchronousCompletionTasks until
1094 >         * none found or blocker is released
1095 >         *
1096 >         * @param blocker the blocker
1097           */
1098 <        final int sharedHelpCC(CountedCompleter<?> task, int limit) {
1099 <            int status = 0;
1100 <            if (task != null && (status = task.status) >= 0) {
1101 <                for (;;) {
1102 <                    boolean help = false;
1103 <                    int b = base, s = top, al; ForkJoinTask<?>[] a;
1104 <                    if ((a = array) != null && b != s && (al = a.length) > 0) {
1105 <                        int index = (al - 1) & (s - 1);
1106 <                        ForkJoinTask<?> o = (ForkJoinTask<?>)
1107 <                            QA.get(a, index);
1108 <                        if (o instanceof CountedCompleter) {
1109 <                            CountedCompleter<?> t = (CountedCompleter<?>)o;
1110 <                            for (CountedCompleter<?> f = t;;) {
1111 <                                if (f != task) {
1112 <                                    if ((f = f.completer) == null)
1089 <                                        break;
1090 <                                }
1091 <                                else {
1092 <                                    if (PHASE.compareAndSet(this, 0, QLOCK)) {
1093 <                                        if (top == s && array == a &&
1094 <                                            QA.compareAndSet(a, index, t, null)) {
1095 <                                            help = true;
1096 <                                            top = s - 1;
1097 <                                        }
1098 <                                        PHASE.setRelease(this, 0);
1099 <                                        if (help)
1100 <                                            t.doExec();
1101 <                                    }
1102 <                                    break;
1103 <                                }
1104 <                            }
1098 >        final void helpAsyncBlocker(ManagedBlocker blocker) {
1099 >            if (blocker != null) {
1100 >                int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1101 >                while ((a = array) != null && (cap = a.length) > 0 &&
1102 >                       (b = base) != top) {
1103 >                    t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1104 >                    if (blocker.isReleasable())
1105 >                        break;
1106 >                    else if (base == b++ && t != null) {
1107 >                        if (!(t instanceof CompletableFuture.
1108 >                              AsynchronousCompletionTask))
1109 >                            break;
1110 >                        else if (QA.compareAndSet(a, k, t, null)) {
1111 >                            BASE.setOpaque(this, b);
1112 >                            t.doExec();
1113                          }
1114                      }
1107                    if ((status = task.status) < 0 || !help ||
1108                        (limit != 0 && --limit == 0))
1109                        break;
1115                  }
1116              }
1112            return status;
1117          }
1118  
1119          /**
# Line 1124 | Line 1128 | public class ForkJoinPool extends Abstra
1128          }
1129  
1130          // VarHandle mechanics.
1131 <        private static final VarHandle PHASE;
1131 >        static final VarHandle PHASE;
1132 >        static final VarHandle BASE;
1133 >        static final VarHandle TOP;
1134          static {
1135              try {
1136                  MethodHandles.Lookup l = MethodHandles.lookup();
1137                  PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
1138 +                BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
1139 +                TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
1140              } catch (ReflectiveOperationException e) {
1141                  throw new Error(e);
1142              }
# Line 1327 | Line 1335 | public class ForkJoinPool extends Abstra
1335          wt.setDaemon(true);                             // configure thread
1336          if ((handler = ueh) != null)
1337              wt.setUncaughtExceptionHandler(handler);
1330        WorkQueue w = new WorkQueue(this, wt);
1338          int tid = 0;                                    // for thread name
1339 <        int fifo = mode & FIFO;
1339 >        int idbits = mode & FIFO;
1340          String prefix = workerNamePrefix;
1341 +        WorkQueue w = new WorkQueue(this, wt);
1342          if (prefix != null) {
1343              synchronized (prefix) {
1344                  WorkQueue[] ws = workQueues; int n;
1345                  int s = indexSeed += SEED_INCREMENT;
1346 +                idbits |= (s & ~(SMASK | FIFO | DORMANT));
1347                  if (ws != null && (n = ws.length) > 1) {
1348                      int m = n - 1;
1349 <                    tid = s & m;
1341 <                    int i = m & ((s << 1) | 1);         // odd-numbered indices
1349 >                    tid = m & ((s << 1) | 1);           // odd-numbered indices
1350                      for (int probes = n >>> 1;;) {      // find empty slot
1351                          WorkQueue q;
1352 <                        if ((q = ws[i]) == null || q.phase == QUIET)
1352 >                        if ((q = ws[tid]) == null || q.phase == QUIET)
1353                              break;
1354                          else if (--probes == 0) {
1355 <                            i = n | 1;                  // resize below
1355 >                            tid = n | 1;                // resize below
1356                              break;
1357                          }
1358                          else
1359 <                            i = (i + 2) & m;
1359 >                            tid = (tid + 2) & m;
1360                      }
1361 +                    w.phase = w.id = tid | idbits;      // now publishable
1362  
1363 <                    int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
1364 <                    w.phase = w.id = id;                // now publishable
1356 <
1357 <                    if (i < n)
1358 <                        ws[i] = w;
1363 >                    if (tid < n)
1364 >                        ws[tid] = w;
1365                      else {                              // expand array
1366                          int an = n << 1;
1367                          WorkQueue[] as = new WorkQueue[an];
1368 <                        as[i] = w;
1368 >                        as[tid] = w;
1369                          int am = an - 1;
1370                          for (int j = 0; j < n; ++j) {
1371                              WorkQueue v;                // copy external queue
# Line 1392 | Line 1398 | public class ForkJoinPool extends Abstra
1398          int phase = 0;
1399          if (wt != null && (w = wt.workQueue) != null) {
1400              Object lock = workerNamePrefix;
1401 +            int wid = w.id;
1402              long ns = (long)w.nsteals & 0xffffffffL;
1396            int idx = w.id & SMASK;
1403              if (lock != null) {
1398                WorkQueue[] ws;                       // remove index from array
1404                  synchronized (lock) {
1405 <                    if ((ws = workQueues) != null && ws.length > idx &&
1406 <                        ws[idx] == w)
1407 <                        ws[idx] = null;
1405 >                    WorkQueue[] ws; int n, i;         // remove index from array
1406 >                    if ((ws = workQueues) != null && (n = ws.length) > 0 &&
1407 >                        ws[i = wid & (n - 1)] == w)
1408 >                        ws[i] = null;
1409                      stealCount += ns;
1410                  }
1411              }
# Line 1451 | Line 1457 | public class ForkJoinPool extends Abstra
1457                  Thread vt = v.owner;
1458                  if (sp == vp && CTL.compareAndSet(this, c, nc)) {
1459                      v.phase = np;
1460 <                    if (v.source < 0)
1460 >                    if (vt != null && v.source < 0)
1461                          LockSupport.unpark(vt);
1462                      break;
1463                  }
# Line 1492 | Line 1498 | public class ForkJoinPool extends Abstra
1498                      long nc = ((long)v.stackPred & SP_MASK) | uc;
1499                      if (vp == sp && CTL.compareAndSet(this, c, nc)) {
1500                          v.phase = np;
1501 <                        if (v.source < 0)
1501 >                        if (vt != null && v.source < 0)
1502                              LockSupport.unpark(vt);
1503                          return (wp < 0) ? -1 : 1;
1504                      }
# Line 1549 | Line 1555 | public class ForkJoinPool extends Abstra
1555       * See above for explanation.
1556       */
1557      final void runWorker(WorkQueue w) {
1558 <        WorkQueue[] ws;
1559 <        w.growArray();                                  // allocate queue
1560 <        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
1561 <        if (r == 0)                                     // initial nonzero seed
1562 <            r = 1;
1563 <        int lastSignalId = 0;                           // avoid unneeded signals
1564 <        while ((ws = workQueues) != null) {
1565 <            boolean nonempty = false;                   // scan
1566 <            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1567 <                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1568 <                if ((i = r & m) >= 0 && i < n &&        // always true
1569 <                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
1570 <                    (a = q.array) != null && (al = a.length) > 0) {
1571 <                    int qid = q.id;                     // (never zero)
1572 <                    int index = (al - 1) & b;
1573 <                    ForkJoinTask<?> t = (ForkJoinTask<?>)
1574 <                        QA.getAcquire(a, index);
1575 <                    if (t != null && b++ == q.base &&
1576 <                        QA.compareAndSet(a, index, t, null)) {
1577 <                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
1578 <                            signalWork();               // propagate signal
1579 <                        w.source = lastSignalId = qid;
1580 <                        t.doExec();
1581 <                        if ((w.id & FIFO) != 0)         // run remaining locals
1582 <                            w.localPollAndExec(POLL_LIMIT);
1583 <                        else
1584 <                            w.localPopAndExec(POLL_LIMIT);
1585 <                        ForkJoinWorkerThread thread = w.owner;
1586 <                        ++w.nsteals;
1587 <                        w.source = 0;                   // now idle
1588 <                        if (thread != null)
1589 <                            thread.afterTopLevelExec();
1558 >        int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
1559 >        w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
1560 >        for (;;) {
1561 >            int phase;
1562 >            if (scan(w, r)) {                     // scan until apparently empty
1563 >                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
1564 >            }
1565 >            else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
1566 >                long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
1567 >                long c, nc;
1568 >                do {
1569 >                    w.stackPred = (int)(c = ctl);
1570 >                    nc = ((c - RC_UNIT) & UC_MASK) | np;
1571 >                } while (!CTL.weakCompareAndSet(this, c, nc));
1572 >            }
1573 >            else {                                // already queued
1574 >                int pred = w.stackPred;
1575 >                Thread.interrupted();             // clear before park
1576 >                w.source = DORMANT;               // enable signal
1577 >                long c = ctl;
1578 >                int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
1579 >                if (md < 0)                       // terminating
1580 >                    break;
1581 >                else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
1582 >                         tryTerminate(false, false))
1583 >                    break;                        // quiescent shutdown
1584 >                else if (rc <= 0 && pred != 0 && phase == (int)c) {
1585 >                    long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1586 >                    long d = keepAlive + System.currentTimeMillis();
1587 >                    LockSupport.parkUntil(this, d);
1588 >                    if (ctl == c &&               // drop on timeout if all idle
1589 >                        d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1590 >                        CTL.compareAndSet(this, c, nc)) {
1591 >                        w.phase = QUIET;
1592 >                        break;
1593                      }
1585                    nonempty = true;
1594                  }
1595 <                else if (nonempty)
1596 <                    break;
1597 <                else
1590 <                    ++r;
1595 >                else if (w.phase < 0)
1596 >                    LockSupport.park(this);       // OK if spuriously woken
1597 >                w.source = 0;                     // disable signal
1598              }
1599 +        }
1600 +    }
1601  
1602 <            if (nonempty) {                             // move (xorshift)
1603 <                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
1604 <            }
1605 <            else {
1606 <                int phase;
1607 <                lastSignalId = 0;                       // clear for next scan
1608 <                if ((phase = w.phase) >= 0) {           // enqueue
1609 <                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
1610 <                    long c, nc;
1611 <                    do {
1612 <                        w.stackPred = (int)(c = ctl);
1613 <                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
1614 <                    } while (!CTL.weakCompareAndSet(this, c, nc));
1615 <                }
1616 <                else {                                  // already queued
1617 <                    int pred = w.stackPred;
1618 <                    w.source = DORMANT;                 // enable signal
1619 <                    for (int steps = 0;;) {
1620 <                        int md, rc; long c;
1621 <                        if (w.phase >= 0) {
1622 <                            w.source = 0;
1623 <                            break;
1624 <                        }
1625 <                        else if ((md = mode) < 0)       // shutting down
1617 <                            return;
1618 <                        else if ((rc = ((md & SMASK) +  // possibly quiescent
1619 <                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
1620 <                                 (md & SHUTDOWN) != 0 &&
1621 <                                 tryTerminate(false, false))
1622 <                            return;                     // help terminate
1623 <                        else if ((++steps & 1) == 0)
1624 <                            Thread.interrupted();       // clear between parks
1625 <                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
1626 <                            long d = keepAlive + System.currentTimeMillis();
1627 <                            LockSupport.parkUntil(this, d);
1628 <                            if (ctl == c &&
1629 <                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
1630 <                                long nc = ((UC_MASK & (c - TC_UNIT)) |
1631 <                                           (SP_MASK & pred));
1632 <                                if (CTL.compareAndSet(this, c, nc)) {
1633 <                                    w.phase = QUIET;
1634 <                                    return;             // drop on timeout
1635 <                                }
1636 <                            }
1602 >    /**
1603 >     * Scans for and if found executes one or more top-level tasks from a queue.
1604 >     *
1605 >     * @return true if found an apparently non-empty queue, and
1606 >     * possibly ran task(s).
1607 >     */
1608 >    private boolean scan(WorkQueue w, int r) {
1609 >        WorkQueue[] ws; int n;
1610 >        if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
1611 >            for (int m = n - 1, j = r & m;;) {
1612 >                WorkQueue q; int b, d;
1613 >                if ((q = ws[j]) != null && (d = (b = q.base) - q.top) != 0) {
1614 >                    int qid = q.id;
1615 >                    ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
1616 >                    if ((a = q.array) != null && (cap = a.length) > 0) {
1617 >                        t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1618 >                        if (q.base == b++ && t != null &&
1619 >                            QA.compareAndSet(a, k, t, null)) {
1620 >                            q.base = b;
1621 >                            w.source = qid;
1622 >                            if (d != -1 || b != q.top)
1623 >                                signalWork();
1624 >                            w.topLevelExec(t, q,  // random fairness bound
1625 >                                           r & ((n << TOP_BOUND_SHIFT) - 1));
1626                          }
1638                        else
1639                            LockSupport.park(this);
1627                      }
1628 +                    return true;
1629                  }
1630 +                else if (--n > 0)
1631 +                    j = (j + 1) & m;
1632 +                else
1633 +                    break;
1634              }
1635          }
1636 +        return false;
1637      }
1638  
1639      /**
# Line 1656 | Line 1649 | public class ForkJoinPool extends Abstra
1649       */
1650      final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1651          int s = 0;
1652 +        int seed = ThreadLocalRandom.nextSecondarySeed();
1653          if (w != null && task != null &&
1654              (!(task instanceof CountedCompleter) ||
1655 <             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
1655 >             (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {
1656              w.tryRemoveAndExec(task);
1657              int src = w.source, id = w.id;
1658 +            int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
1659              s = task.status;
1660              while (s >= 0) {
1661                  WorkQueue[] ws;
1662 <                boolean nonempty = false;
1663 <                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
1664 <                if ((ws = workQueues) != null) {       // scan for matching id
1665 <                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
1666 <                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1667 <                        if ((i = (r + j) & m) >= 0 && i < n &&
1668 <                            (q = ws[i]) != null && q.source == id &&
1669 <                            (b = q.base) - q.top < 0 &&
1675 <                            (a = q.array) != null && (al = a.length) > 0) {
1676 <                            int qid = q.id;
1677 <                            int index = (al - 1) & b;
1662 >                int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;
1663 >                while (n > 0) {
1664 >                    WorkQueue q; int b;
1665 >                    if ((q = ws[r & m]) != null && q.source == id &&
1666 >                        (b = q.base) != q.top) {
1667 >                        ForkJoinTask<?>[] a; int cap, k;
1668 >                        int qid = q.id;
1669 >                        if ((a = q.array) != null && (cap = a.length) > 0) {
1670                              ForkJoinTask<?> t = (ForkJoinTask<?>)
1671 <                                QA.getAcquire(a, index);
1672 <                            if (t != null && b++ == q.base && id == q.source &&
1673 <                                QA.compareAndSet(a, index, t, null)) {
1671 >                                QA.getAcquire(a, k = (cap - 1) & b);
1672 >                            if (q.source == id && q.base == b++ &&
1673 >                                t != null && QA.compareAndSet(a, k, t, null)) {
1674                                  q.base = b;
1675                                  w.source = qid;
1676                                  t.doExec();
1677                                  w.source = src;
1678                              }
1687                            nonempty = true;
1688                            break;
1679                          }
1680 +                        break;
1681 +                    }
1682 +                    else {
1683 +                        r += step;
1684 +                        --n;
1685                      }
1686                  }
1687                  if ((s = task.status) < 0)
1688                      break;
1689 <                else if (!nonempty) {
1689 >                else if (n == 0) { // empty scan
1690                      long ms, ns; int block;
1691                      if (deadline == 0L)
1692                          ms = 0L;                       // untimed
# Line 1716 | Line 1711 | public class ForkJoinPool extends Abstra
1711       * find tasks either.
1712       */
1713      final void helpQuiescePool(WorkQueue w) {
1714 <        int prevSrc = w.source, fifo = w.id & FIFO;
1714 >        int prevSrc = w.source;
1715 >        int seed = ThreadLocalRandom.nextSecondarySeed();
1716 >        int r = seed >>> 16, step = r | 1;
1717          for (int source = prevSrc, released = -1;;) { // -1 until known
1718 <            WorkQueue[] ws;
1719 <            if (fifo != 0)
1720 <                w.localPollAndExec(0);
1721 <            else
1725 <                w.localPopAndExec(0);
1726 <            if (released == -1 && w.phase >= 0)
1718 >            ForkJoinTask<?> localTask; WorkQueue[] ws;
1719 >            while ((localTask = w.nextLocalTask()) != null)
1720 >                localTask.doExec();
1721 >            if (w.phase >= 0 && released == -1)
1722                  released = 1;
1723              boolean quiet = true, empty = true;
1724 <            int r = ThreadLocalRandom.nextSecondarySeed();
1725 <            if ((ws = workQueues) != null) {
1726 <                for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1727 <                    WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1728 <                    if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
1729 <                        if ((b = q.base) - q.top < 0 &&
1730 <                            (a = q.array) != null && (al = a.length) > 0) {
1731 <                            int qid = q.id;
1724 >            int n = (ws = workQueues) == null ? 0 : ws.length;
1725 >            for (int m = n - 1; n > 0; r += step, --n) {
1726 >                WorkQueue q; int b;
1727 >                if ((q = ws[r & m]) != null) {
1728 >                    int qs = q.source;
1729 >                    if ((b = q.base) != q.top) {
1730 >                        quiet = empty = false;
1731 >                        ForkJoinTask<?>[] a; int cap, k;
1732 >                        int qid = q.id;
1733 >                        if ((a = q.array) != null && (cap = a.length) > 0) {
1734                              if (released == 0) {    // increment
1735                                  released = 1;
1736                                  CTL.getAndAdd(this, RC_UNIT);
1737                              }
1741                            int index = (al - 1) & b;
1738                              ForkJoinTask<?> t = (ForkJoinTask<?>)
1739 <                                QA.getAcquire(a, index);
1740 <                            if (t != null && b++ == q.base &&
1741 <                                QA.compareAndSet(a, index, t, null)) {
1739 >                                QA.getAcquire(a, k = (cap - 1) & b);
1740 >                            if (q.base == b++ && t != null &&
1741 >                                QA.compareAndSet(a, k, t, null)) {
1742                                  q.base = b;
1743 <                                w.source = source = q.id;
1743 >                                w.source = qid;
1744                                  t.doExec();
1745                                  w.source = source = prevSrc;
1746                              }
1751                            quiet = empty = false;
1752                            break;
1747                          }
1748 <                        else if ((q.source & QUIET) == 0)
1755 <                            quiet = false;
1748 >                        break;
1749                      }
1750 +                    else if ((qs & QUIET) == 0)
1751 +                        quiet = false;
1752                  }
1753              }
1754              if (quiet) {
# Line 1795 | Line 1790 | public class ForkJoinPool extends Abstra
1790                  origin = r & m;
1791                  step = h | 1;
1792              }
1793 <            for (int k = origin, oldSum = 0, checkSum = 0;;) {
1794 <                WorkQueue q; int b, al; ForkJoinTask<?>[] a;
1795 <                if ((q = ws[k]) != null) {
1796 <                    checkSum += b = q.base;
1797 <                    if (b - q.top < 0 &&
1798 <                        (a = q.array) != null && (al = a.length) > 0) {
1799 <                        int index = (al - 1) & b;
1800 <                        ForkJoinTask<?> t = (ForkJoinTask<?>)
1806 <                            QA.getAcquire(a, index);
1807 <                        if (t != null && b++ == q.base &&
1808 <                            QA.compareAndSet(a, index, t, null)) {
1809 <                            q.base = b;
1793 >            boolean nonempty = false;
1794 >            for (int i = origin, oldSum = 0, checkSum = 0;;) {
1795 >                WorkQueue q;
1796 >                if ((q = ws[i]) != null) {
1797 >                    int b; ForkJoinTask<?> t;
1798 >                    if ((b = q.base) != q.top) {
1799 >                        nonempty = true;
1800 >                        if ((t = q.poll()) != null)
1801                              return t;
1811                        }
1812                        else
1813                            break; // restart
1802                      }
1803 +                    else
1804 +                        checkSum += b + q.id;
1805                  }
1806 <                if ((k = (k + step) & m) == origin) {
1807 <                    if (oldSum == (oldSum = checkSum))
1806 >                if ((i = (i + step) & m) == origin) {
1807 >                    if (!nonempty && oldSum == (oldSum = checkSum))
1808                          break rescan;
1809                      checkSum = 0;
1810 +                    nonempty = false;
1811                  }
1812              }
1813          }
# Line 1830 | Line 1821 | public class ForkJoinPool extends Abstra
1821       */
1822      final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1823          ForkJoinTask<?> t;
1824 <        if (w != null &&
1825 <            (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1826 <            return t;
1836 <        else
1837 <            return pollScan(false);
1824 >        if (w == null || (t = w.nextLocalTask()) == null)
1825 >            t = pollScan(false);
1826 >        return t;
1827      }
1828  
1829      // External operations
# Line 1852 | Line 1841 | public class ForkJoinPool extends Abstra
1841              r = ThreadLocalRandom.getProbe();
1842          }
1843          for (;;) {
1844 +            WorkQueue q;
1845              int md = mode, n;
1846              WorkQueue[] ws = workQueues;
1847              if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
1848                  throw new RejectedExecutionException();
1849 <            else {
1850 <                WorkQueue q;
1851 <                boolean push = false, grow = false;
1852 <                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
1853 <                    Object lock = workerNamePrefix;
1854 <                    int qid = (r | QUIET) & ~(FIFO | OWNED);
1855 <                    q = new WorkQueue(this, null);
1856 <                    q.id = qid;
1857 <                    q.source = QUIET;
1858 <                    q.phase = QLOCK;          // lock queue
1859 <                    if (lock != null) {
1860 <                        synchronized (lock) { // lock pool to install
1861 <                            int i;
1862 <                            if ((ws = workQueues) != null &&
1863 <                                (n = ws.length) > 0 &&
1874 <                                ws[i = qid & (n - 1) & SQMASK] == null) {
1875 <                                ws[i] = q;
1876 <                                push = grow = true;
1877 <                            }
1878 <                        }
1849 >            else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue
1850 >                int qid = (r | QUIET) & ~(FIFO | OWNED);
1851 >                Object lock = workerNamePrefix;
1852 >                ForkJoinTask<?>[] qa =
1853 >                    new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1854 >                q = new WorkQueue(this, null);
1855 >                q.array = qa;
1856 >                q.id = qid;
1857 >                q.source = QUIET;
1858 >                if (lock != null) {     // unless disabled, lock pool to install
1859 >                    synchronized (lock) {
1860 >                        WorkQueue[] vs; int i, vn;
1861 >                        if ((vs = workQueues) != null && (vn = vs.length) > 0 &&
1862 >                            vs[i = qid & (vn - 1) & SQMASK] == null)
1863 >                            vs[i] = q;  // else another thread already installed
1864                      }
1865                  }
1866 <                else if (q.tryLockSharedQueue()) {
1867 <                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
1868 <                    if ((a = q.array) != null && (al = a.length) > 0 &&
1869 <                        al - 1 + (d = b - s) > 0) {
1870 <                        a[(al - 1) & s] = task;
1886 <                        q.top = s + 1;        // relaxed writes OK here
1887 <                        q.phase = 0;
1888 <                        if (d < 0 && q.base - s < -1)
1889 <                            break;            // no signal needed
1890 <                    }
1891 <                    else
1892 <                        grow = true;
1893 <                    push = true;
1894 <                }
1895 <                if (push) {
1896 <                    if (grow) {
1897 <                        try {
1898 <                            q.growArray();
1899 <                            int s = q.top, al; ForkJoinTask<?>[] a;
1900 <                            if ((a = q.array) != null && (al = a.length) > 0) {
1901 <                                a[(al - 1) & s] = task;
1902 <                                q.top = s + 1;
1903 <                            }
1904 <                        } finally {
1905 <                            q.phase = 0;
1906 <                        }
1907 <                    }
1866 >            }
1867 >            else if (!q.tryLockPhase()) // move if busy
1868 >                r = ThreadLocalRandom.advanceProbe(r);
1869 >            else {
1870 >                if (q.lockedPush(task))
1871                      signalWork();
1872 <                    break;
1910 <                }
1911 <                else                          // move if busy
1912 <                    r = ThreadLocalRandom.advanceProbe(r);
1872 >                return;
1873              }
1874          }
1875      }
# Line 1951 | Line 1911 | public class ForkJoinPool extends Abstra
1911          return ((ws = workQueues) != null &&
1912                  (n = ws.length) > 0 &&
1913                  (w = ws[(n - 1) & r & SQMASK]) != null &&
1914 <                w.trySharedUnpush(task));
1914 >                w.tryLockedUnpush(task));
1915      }
1916  
1917      /**
# Line 1962 | Line 1922 | public class ForkJoinPool extends Abstra
1922          WorkQueue[] ws; WorkQueue w; int n;
1923          return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1924                  (w = ws[(n - 1) & r & SQMASK]) != null) ?
1925 <            w.sharedHelpCC(task, maxTasks) : 0;
1925 >            w.helpCC(task, maxTasks, true) : 0;
1926      }
1927  
1928      /**
# Line 1977 | Line 1937 | public class ForkJoinPool extends Abstra
1937       */
1938      final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1939                             int maxTasks) {
1940 <        return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
1940 >        return (w == null) ? 0 : w.helpCC(task, maxTasks, false);
1941      }
1942  
1943      /**
# Line 2107 | Line 2067 | public class ForkJoinPool extends Abstra
2067                                  } catch (Throwable ignore) {
2068                                  }
2069                              }
2070 <                            checkSum += w.base + w.id;
2070 >                            checkSum += w.base + w.phase;
2071                          }
2072                      }
2073                  }
# Line 2600 | Line 2560 | public class ForkJoinPool extends Abstra
2560       * @return the number of worker threads
2561       */
2562      public int getRunningThreadCount() {
2603        int rc = 0;
2563          WorkQueue[] ws; WorkQueue w;
2564 +        VarHandle.acquireFence();
2565 +        int rc = 0;
2566          if ((ws = workQueues) != null) {
2567              for (int i = 1; i < ws.length; i += 2) {
2568                  if ((w = ws[i]) != null && w.isApparentlyUnblocked())
# Line 2649 | Line 2610 | public class ForkJoinPool extends Abstra
2610                  if ((ws = workQueues) != null) {
2611                      for (int i = 1; i < ws.length; i += 2) {
2612                          if ((v = ws[i]) != null) {
2613 <                            if ((v.source & QUIET) == 0)
2613 >                            if (v.source > 0)
2614                                  return false;
2615                              --tc;
2616                          }
# Line 2695 | Line 2656 | public class ForkJoinPool extends Abstra
2656       * @return the number of queued tasks
2657       */
2658      public long getQueuedTaskCount() {
2698        long count = 0;
2659          WorkQueue[] ws; WorkQueue w;
2660 +        VarHandle.acquireFence();
2661 +        int count = 0;
2662          if ((ws = workQueues) != null) {
2663              for (int i = 1; i < ws.length; i += 2) {
2664                  if ((w = ws[i]) != null)
# Line 2714 | Line 2676 | public class ForkJoinPool extends Abstra
2676       * @return the number of queued submissions
2677       */
2678      public int getQueuedSubmissionCount() {
2717        int count = 0;
2679          WorkQueue[] ws; WorkQueue w;
2680 +        VarHandle.acquireFence();
2681 +        int count = 0;
2682          if ((ws = workQueues) != null) {
2683              for (int i = 0; i < ws.length; i += 2) {
2684                  if ((w = ws[i]) != null)
# Line 2733 | Line 2696 | public class ForkJoinPool extends Abstra
2696       */
2697      public boolean hasQueuedSubmissions() {
2698          WorkQueue[] ws; WorkQueue w;
2699 +        VarHandle.acquireFence();
2700          if ((ws = workQueues) != null) {
2701              for (int i = 0; i < ws.length; i += 2) {
2702                  if ((w = ws[i]) != null && !w.isEmpty())
# Line 2771 | Line 2735 | public class ForkJoinPool extends Abstra
2735       * @return the number of elements transferred
2736       */
2737      protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2774        int count = 0;
2738          WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2739 +        VarHandle.acquireFence();
2740 +        int count = 0;
2741          if ((ws = workQueues) != null) {
2742              for (int i = 0; i < ws.length; ++i) {
2743                  if ((w = ws[i]) != null) {
# Line 2795 | Line 2760 | public class ForkJoinPool extends Abstra
2760       */
2761      public String toString() {
2762          // Use a single pass through workQueues to collect counts
2763 <        long qt = 0L, qs = 0L; int rc = 0;
2763 >        int md = mode; // read volatile fields first
2764 >        long c = ctl;
2765          long st = stealCount;
2766 +        long qt = 0L, qs = 0L; int rc = 0;
2767          WorkQueue[] ws; WorkQueue w;
2768          if ((ws = workQueues) != null) {
2769              for (int i = 0; i < ws.length; ++i) {
# Line 2814 | Line 2781 | public class ForkJoinPool extends Abstra
2781              }
2782          }
2783  
2817        int md = mode;
2784          int pc = (md & SMASK);
2819        long c = ctl;
2785          int tc = pc + (short)(c >>> TC_SHIFT);
2786          int ac = pc + (int)(c >> RC_SHIFT);
2787          if (ac < 0) // ignore transient negative
# Line 3102 | Line 3067 | public class ForkJoinPool extends Abstra
3067       */
3068      public static void managedBlock(ManagedBlocker blocker)
3069          throws InterruptedException {
3070 +        if (blocker == null) throw new NullPointerException();
3071          ForkJoinPool p;
3072          ForkJoinWorkerThread wt;
3073          WorkQueue w;
# Line 3134 | Line 3100 | public class ForkJoinPool extends Abstra
3100       * available or blocker is released.
3101       */
3102      static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3103 <        if (blocker != null && (e instanceof ForkJoinPool)) {
3103 >        if (e instanceof ForkJoinPool) {
3104              WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3105              ForkJoinPool p = (ForkJoinPool)e;
3106              Thread thread = Thread.currentThread();
# Line 3146 | Line 3112 | public class ForkJoinPool extends Abstra
3112                  w = ws[(n - 1) & r & SQMASK];
3113              else
3114                  w = null;
3115 <            if (w != null) {
3116 <                for (;;) {
3151 <                    int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
3152 <                    if ((a = w.array) != null && (d = b - s) < 0 &&
3153 <                        (al = a.length) > 0) {
3154 <                        int index = (al - 1) & b;
3155 <                        ForkJoinTask<?> t = (ForkJoinTask<?>)
3156 <                            QA.getAcquire(a, index);
3157 <                        if (blocker.isReleasable())
3158 <                            break;
3159 <                        else if (b++ == w.base) {
3160 <                            if (t == null) {
3161 <                                if (d == -1)
3162 <                                    break;
3163 <                            }
3164 <                            else if (!(t instanceof CompletableFuture.
3165 <                                  AsynchronousCompletionTask))
3166 <                                break;
3167 <                            else if (QA.compareAndSet(a, index, t, null)) {
3168 <                                w.base = b;
3169 <                                t.doExec();
3170 <                            }
3171 <                        }
3172 <                    }
3173 <                    else
3174 <                        break;
3175 <                }
3176 <            }
3115 >            if (w != null)
3116 >                w.helpAsyncBlocker(blocker);
3117          }
3118      }
3119  
# Line 3192 | Line 3132 | public class ForkJoinPool extends Abstra
3132      // VarHandle mechanics
3133      private static final VarHandle CTL;
3134      private static final VarHandle MODE;
3135 <    private static final VarHandle QA;
3135 >    static final VarHandle QA;
3136  
3137      static {
3138          try {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines