ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.338 by jsr166, Wed Aug 23 20:17:53 2017 UTC vs.
Revision 1.339 by dl, Tue Jan 16 17:36:32 2018 UTC

# Line 155 | Line 155 | public class ForkJoinPool extends Abstra
155       * functionality and control for a set of worker threads:
156       * Submissions from non-FJ threads enter into submission queues.
157       * Workers take these tasks and typically split them into subtasks
158 <     * that may be stolen by other workers.  Preference rules give
159 <     * first priority to processing tasks from their own queues (LIFO
160 <     * or FIFO, depending on mode), then to randomized FIFO steals of
161 <     * tasks in other queues.  This framework began as vehicle for
162 <     * supporting tree-structured parallelism using work-stealing.
163 <     * Over time, its scalability advantages led to extensions and
164 <     * changes to better support more diverse usage contexts.  Because
165 <     * most internal methods and nested classes are interrelated,
166 <     * their main rationale and descriptions are presented here;
167 <     * individual methods and nested classes contain only brief
168 <     * comments about details.
158 >     * that may be stolen by other workers. Work-stealing based on
159 >     * randomized scans generally leads to better throughput than
160 >     * "work dealing" in which producers assign tasks to idle threads,
161 >     * in part because threads that have finished other tasks before
162 >     * the signalled thread wakes up (which can be a long time) can
163 >     * take the task instead.  Preference rules give first priority to
164 >     * processing tasks from their own queues (LIFO or FIFO, depending
165 >     * on mode), then to randomized FIFO steals of tasks in other
166 >     * queues.  This framework began as vehicle for supporting
167 >     * tree-structured parallelism using work-stealing.  Over time,
168 >     * its scalability advantages led to extensions and changes to
169 >     * better support more diverse usage contexts.  Because most
170 >     * internal methods and nested classes are interrelated, their
171 >     * main rationale and descriptions are presented here; individual
172 >     * methods and nested classes contain only brief comments about
173 >     * details.
174       *
175       * WorkQueues
176       * ==========
# Line 198 | Line 203 | public class ForkJoinPool extends Abstra
203       *
204       * (The actual code needs to null-check and size-check the array,
205       * uses masking, not mod, for indexing a power-of-two-sized array,
206 <     * properly fences accesses, and possibly signals waiting workers
207 <     * to start scanning -- see below.)  Both a successful pop and
208 <     * poll mainly entail a CAS of a slot from non-null to null.
206 >     * adds a release fence for publication,, and possibly signals
207 >     * waiting workers to start scanning -- see below.)  Both a
208 >     * successful pop and poll mainly entail a CAS of a slot from
209 >     * non-null to null.
210       *
211       * The pop operation (always performed by owner) is:
212       *   if ((the task at top slot is not null) and
# Line 212 | Line 218 | public class ForkJoinPool extends Abstra
218       *        (CAS slot to null))
219       *           increment base and return task;
220       *
221 <     * There are several variants of each of these. In particular,
222 <     * almost all uses of poll occur within scan operations that also
223 <     * interleave contention tracking (with associated code sprawl.)
221 >     * There are several variants of each of these. Most uses occur
222 >     * within operations that also interleave contention or emptiness
223 >     * tracking or inspection of elements before extracting them, so
224 >     * must interleave these with the above code. When performed by
225 >     * owner, getAndSet is used instead of CAS (see for example method
226 >     * nextLocalTask) which is usually more efficient, and possible
227 >     * because the top index cannot independently change during the
228 >     * operation.
229       *
230       * Memory ordering.  See "Correct and Efficient Work-Stealing for
231       * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
# Line 223 | Line 234 | public class ForkJoinPool extends Abstra
234       * algorithms similar to (but different than) the one used here.
235       * Extracting tasks in array slots via (fully fenced) CAS provides
236       * primary synchronization. The base and top indices imprecisely
237 <     * guide where to extract from. We do not always require strict
238 <     * orderings of array and index updates, so sometimes let them be
239 <     * subject to compiler and processor reorderings. However, the
240 <     * volatile "base" index also serves as a basis for memory
241 <     * ordering: Slot accesses are preceded by a read of base,
242 <     * ensuring happens-before ordering with respect to stealers (so
243 <     * the slots themselves can be read via plain array reads.)  The
244 <     * only other memory orderings relied on are maintained in the
245 <     * course of signalling and activation (see below).  A check that
246 <     * base == top indicates (momentary) emptiness, but otherwise may
247 <     * err on the side of possibly making the queue appear nonempty
248 <     * when a push, pop, or poll have not fully committed, or making
249 <     * it appear empty when an update of top has not yet been visibly
250 <     * written.  (Method isEmpty() checks the case of a partially
251 <     * completed removal of the last element.)  Because of this, the
252 <     * poll operation, considered individually, is not wait-free. One
253 <     * thief cannot successfully continue until another in-progress
254 <     * one (or, if previously empty, a push) visibly completes.
255 <     * However, in the aggregate, we ensure at least probabilistic
237 >     * guide where to extract from. We do not usually require strict
238 >     * orderings of array and index updates. Many index accesses use
239 >     * plain mode, with ordering constrained by surrounding context
240 >     * (usually with respect to element CASes or the two WorkQueue
241 >     * volatile fields source and phase). When not otherwise already
242 >     * constrained, reads of "base" by queue owners use acquire-mode,
243 >     * and some externally callable methods preface accesses with
244 >     * acquire fences.  Additionally, to ensure that index update
245 >     * writes are not coalesced or postponed in loops etc, "opaque"
246 >     * mode is used in a few cases where timely writes are not
247 >     * otherwise ensured. The "locked" versions of push- and pop-
248 >     * based methods for shared queues differ from owned versions
249 >     * because locking already forces some of the ordering.
250 >     *
251 >     * Because indices and slot contents cannot always be consistent,
252 >     * a check that base == top indicates (momentary) emptiness, but
253 >     * otherwise may err on the side of possibly making the queue
254 >     * appear nonempty when a push, pop, or poll have not fully
255 >     * committed, or making it appear empty when an update of top has
256 >     * not yet been visibly written.  (Method isEmpty() checks the
257 >     * case of a partially completed removal of the last element.)
258 >     * Because of this, the poll operation, considered individually,
259 >     * is not wait-free. One thief cannot successfully continue until
260 >     * another in-progress one (or, if previously empty, a push)
261 >     * visibly completes.  This can stall threads when required to
262 >     * consume from a given queue (see method poll()).  However, in
263 >     * the aggregate, we ensure at least probabilistic
264       * non-blockingness.  If an attempted steal fails, a scanning
265       * thief chooses a different random victim target to try next. So,
266       * in order for one thief to progress, it suffices for any
267 <     * in-progress poll or new push on any empty queue to
249 <     * complete.
267 >     * in-progress poll or new push on any empty queue to complete.
268       *
269       * This approach also enables support of a user mode in which
270       * local task processing is in FIFO, not LIFO order, simply by
# Line 267 | Line 285 | public class ForkJoinPool extends Abstra
285       * different position to use or create other queues -- they block
286       * only when creating and registering new queues. Because it is
287       * used only as a spinlock, unlocking requires only a "releasing"
288 <     * store (using setRelease).
288 >     * store (using setRelease) unless otherwise signalling.
289       *
290       * Management
291       * ==========
# Line 288 | Line 306 | public class ForkJoinPool extends Abstra
306       *
307       * Field "ctl" contains 64 bits holding information needed to
308       * atomically decide to add, enqueue (on an event queue), and
309 <     * dequeue (and release)-activate workers.  To enable this
310 <     * packing, we restrict maximum parallelism to (1<<15)-1 (which is
311 <     * far in excess of normal operating range) to allow ids, counts,
312 <     * and their negations (used for thresholding) to fit into 16bit
309 >     * dequeue and release workers.  To enable this packing, we
310 >     * restrict maximum parallelism to (1<<15)-1 (which is far in
311 >     * excess of normal operating range) to allow ids, counts, and
312 >     * their negations (used for thresholding) to fit into 16bit
313       * subfields.
314       *
315       * Field "mode" holds configuration parameters as well as lifetime
# Line 303 | Line 321 | public class ForkJoinPool extends Abstra
321       * lock (using field workerNamePrefix as lock), but is otherwise
322       * concurrently readable, and accessed directly. We also ensure
323       * that uses of the array reference itself never become too stale
324 <     * in case of resizing.  To simplify index-based operations, the
325 <     * array size is always a power of two, and all readers must
326 <     * tolerate null slots. Worker queues are at odd indices. Shared
327 <     * (submission) queues are at even indices, up to a maximum of 64
328 <     * slots, to limit growth even if array needs to expand to add
329 <     * more workers. Grouping them together in this way simplifies and
330 <     * speeds up task scanning.
324 >     * in case of resizing, by arranging that (re-)reads are separated
325 >     * by at least one acquiring read access.  To simplify index-based
326 >     * operations, the array size is always a power of two, and all
327 >     * readers must tolerate null slots. Worker queues are at odd
328 >     * indices. Shared (submission) queues are at even indices, up to
329 >     * a maximum of 64 slots, to limit growth even if array needs to
330 >     * expand to add more workers. Grouping them together in this way
331 >     * simplifies and speeds up task scanning.
332       *
333       * All worker thread creation is on-demand, triggered by task
334       * submissions, replacement of terminated workers, and/or
# Line 387 | Line 406 | public class ForkJoinPool extends Abstra
406       * releases so usage requires care -- seeing a negative phase does
407       * not guarantee that the worker is available. When queued, the
408       * lower 16 bits of scanState must hold its pool index. So we
409 <     * place the index there upon initialization (see registerWorker)
410 <     * and otherwise keep it there or restore it when necessary.
409 >     * place the index there upon initialization and otherwise keep it
410 >     * there or restore it when necessary.
411       *
412       * The ctl field also serves as the basis for memory
413       * synchronization surrounding activation. This uses a more
# Line 396 | Line 415 | public class ForkJoinPool extends Abstra
415       * consumers sync with each other by both writing/CASing ctl (even
416       * if to its current value).  This would be extremely costly. So
417       * we relax it in several ways: (1) Producers only signal when
418 <     * their queue is empty. Other workers propagate this signal (in
419 <     * method scan) when they find tasks; to further reduce flailing,
420 <     * each worker signals only one other per activation. (2) Workers
421 <     * only enqueue after scanning (see below) and not finding any
422 <     * tasks.  (3) Rather than CASing ctl to its current value in the
423 <     * common case where no action is required, we reduce write
424 <     * contention by equivalently prefacing signalWork when called by
425 <     * an external task producer using a memory access with
407 <     * full-volatile semantics or a "fullFence".
418 >     * their queue is seen as empty at some point during a push
419 >     * operation. (2) Other workers propagate this signal when they
420 >     * find tasks. (3) Workers only enqueue after scanning (see below)
421 >     * and not finding any tasks.  (4) Rather than CASing ctl to its
422 >     * current value in the common case where no action is required,
423 >     * we reduce write contention by equivalently prefacing signalWork
424 >     * when called by an external task producer using a memory access
425 >     * with full-volatile semantics or a "fullFence".
426       *
427       * Almost always, too many signals are issued. A task producer
428       * cannot in general tell if some existing worker is in the midst
# Line 416 | Line 434 | public class ForkJoinPool extends Abstra
434       * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
435       * computations involving only a few workers.
436       *
437 <     * Scanning. Method runWorker performs top-level scanning for
438 <     * tasks.  Each scan traverses and tries to poll from each queue
439 <     * starting at a random index and circularly stepping. Scans are
440 <     * not performed in ideal random permutation order, to reduce
441 <     * cacheline contention.  The pseudorandom generator need not have
437 >     * Scanning. Method scan (from runWorker) performs top-level
438 >     * scanning for tasks. (Similar scans appear in helpQuiesce and
439 >     * pollScan.)  Each scan traverses and tries to poll from each
440 >     * queue starting at a random index. Scans are not performed in
441 >     * ideal random permutation order, to reduce cacheline
442 >     * contention. The pseudorandom generator need not have
443       * high-quality statistical properties in the long term, but just
444       * within computations; We use Marsaglia XorShifts (often via
445       * ThreadLocalRandom.nextSecondarySeed), which are cheap and
446 <     * suffice. Scanning also employs contention reduction: When
446 >     * suffice. Scanning also includes contention reduction: When
447       * scanning workers fail to extract an apparently existing task,
448 <     * they soon restart at a different pseudorandom index.  This
449 <     * improves throughput when many threads are trying to take tasks
450 <     * from few queues, which can be common in some usages.  Scans do
451 <     * not otherwise explicitly take into account core affinities,
452 <     * loads, cache localities, etc, However, they do exploit temporal
453 <     * locality (which usually approximates these) by preferring to
454 <     * re-poll (at most #workers times) from the same queue after a
455 <     * successful poll before trying others.
448 >     * they soon restart at a different pseudorandom index.  This form
449 >     * of backoff improves throughput when many threads are trying to
450 >     * take tasks from few queues, which can be common in some usages.
451 >     * Scans do not otherwise explicitly take into account core
452 >     * affinities, loads, cache localities, etc, However, they do
453 >     * exploit temporal locality (which usually approximates these) by
454 >     * preferring to re-poll from the same queue after a successful
455 >     * poll before trying others (see method topLevelExec).
456       *
457       * Trimming workers. To release resources after periods of lack of
458       * use, a worker starting to wait when the pool is quiescent will
459 <     * time out and terminate (see method scan) if the pool has
459 >     * time out and terminate (see method runWorker) if the pool has
460       * remained quiescent for period given by field keepAlive.
461       *
462       * Shutdown and Termination. A call to shutdownNow invokes
# Line 505 | Line 524 | public class ForkJoinPool extends Abstra
524       * time. Some previous versions of this class employed immediate
525       * compensations for any blocked join. However, in practice, the
526       * vast majority of blockages are transient byproducts of GC and
527 <     * other JVM or OS activities that are made worse by replacement.
528 <     * Rather than impose arbitrary policies, we allow users to
529 <     * override the default of only adding threads upon apparent
530 <     * starvation.  The compensation mechanism may also be bounded.
531 <     * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
532 <     * JVMs to cope with programming errors and abuse before running
533 <     * out of resources to do so.
527 >     * other JVM or OS activities that are made worse by replacement
528 >     * when they cause longer-term oversubscription.  Rather than
529 >     * impose arbitrary policies, we allow users to override the
530 >     * default of only adding threads upon apparent starvation.  The
531 >     * compensation mechanism may also be bounded.  Bounds for the
532 >     * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
533 >     * with programming errors and abuse before running out of
534 >     * resources to do so.
535       *
536       * Common Pool
537       * ===========
# Line 544 | Line 564 | public class ForkJoinPool extends Abstra
564       * in ForkJoinWorkerThread) may be JVM-dependent and must access
565       * particular Thread class fields to achieve this effect.
566       *
567 +     * Memory placement
568 +     * ================
569 +     *
570 +     * Performance can be very sensitive to placement of instances of
571 +     * ForkJoinPool and WorkQueues and their queue arrays. To reduce
572 +     * false-sharing impact, the @Contended annotation isolates
573 +     * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl
574 +     * field. WorkQueue arrays are allocated (by their threads) with
575 +     * larger initial sizes than most ever need, mostly to reduce
576 +     * false sharing current garbage collectors that use cardmark
577 +     * tables.
578 +     *
579       * Style notes
580       * ===========
581       *
# Line 551 | Line 583 | public class ForkJoinPool extends Abstra
583       * awkward and ugly, but also reflects the need to control
584       * outcomes across the unusual cases that arise in very racy code
585       * with very few invariants. All fields are read into locals
586 <     * before use, and null-checked if they are references.  This is
587 <     * usually done in a "C"-like style of listing declarations at the
588 <     * heads of methods or blocks, and using inline assignments on
589 <     * first encounter.  Nearly all explicit checks lead to
590 <     * bypass/return, not exception throws, because they may
591 <     * legitimately arise due to cancellation/revocation during
592 <     * shutdown.
586 >     * before use, and null-checked if they are references.  Array
587 >     * accesses using masked indices include checks (that are always
588 >     * true) that the array length is non-zero to avoid compilers
589 >     * inserting more expensive traps.  This is usually done in a
590 >     * "C"-like style of listing declarations at the heads of methods
591 >     * or blocks, and using inline assignments on first encounter.
592 >     * Nearly all explicit checks lead to bypass/return, not exception
593 >     * throws, because they may legitimately arise due to
594 >     * cancellation/revocation during shutdown.
595       *
596       * There is a lot of representation-level coupling among classes
597       * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
# Line 567 | Line 601 | public class ForkJoinPool extends Abstra
601       * representations will need to be accompanied by algorithmic
602       * changes anyway. Several methods intrinsically sprawl because
603       * they must accumulate sets of consistent reads of fields held in
604 <     * local variables.  There are also other coding oddities
605 <     * (including several unnecessary-looking hoisted null checks)
606 <     * that help some methods perform reasonably even when interpreted
607 <     * (not compiled).
604 >     * local variables. Some others are artificially broken up to
605 >     * reduce producer/consumer imbalances due to dynamic compilation.
606 >     * There are also other coding oddities (including several
607 >     * unnecessary-looking hoisted null checks) that help some methods
608 >     * perform reasonably even when interpreted (not compiled).
609       *
610       * The order of declarations in this file is (with a few exceptions):
611       * (1) Static utility functions
# Line 674 | Line 709 | public class ForkJoinPool extends Abstra
709      static final int DORMANT      = QUIET | UNSIGNALLED;
710  
711      /**
712 <     * The maximum number of local polls from the same queue before
712 >     * Initial capacity of work-stealing queue array.
713 >     * Must be a power of two, at least 2.
714 >     */
715 >    static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
716 >
717 >    /**
718 >     * Maximum capacity for queue arrays. Must be a power of two less
719 >     * than or equal to 1 << (31 - width of array entry) to ensure
720 >     * lack of wraparound of index calculations, but defined to a
721 >     * value a bit less than this to help users trap runway programs
722 >     * before saturating systems.
723 >     */
724 >    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
725 >
726 >    /**
727 >     * The maximum number of top-level polls from local queue before
728       * checking others. This is a safeguard against infinitely unfair
729       * looping under unbounded user task recursion, and must be larger
730       * than plausible cases of intentional bounded task recursion.
731       */
732 <    static final int POLL_LIMIT = 1 << 10;
732 >    static final int SELF_CONSUME_LIMIT = 1 << 10;
733  
734      /**
735       * Queues supporting work-stealing as well as external task
736       * submission. See above for descriptions and algorithms.
687     * Performance on most platforms is very sensitive to placement of
688     * instances of both WorkQueues and their arrays -- we absolutely
689     * do not want multiple WorkQueue instances or multiple queue
690     * arrays sharing cache lines. The @Contended annotation alerts
691     * JVMs to try to keep instances apart.
737       */
738      @jdk.internal.vm.annotation.Contended
739      static final class WorkQueue {
740 <
741 <        /**
742 <         * Capacity of work-stealing queue array upon initialization.
743 <         * Must be a power of two; at least 4, but should be larger to
699 <         * reduce or eliminate cacheline sharing among queues.
700 <         * Currently, it is much larger, as a partial workaround for
701 <         * the fact that JVMs often place arrays in locations that
702 <         * share GC bookkeeping (especially cardmarks) such that
703 <         * per-write accesses encounter serious memory contention.
704 <         */
705 <        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
706 <
707 <        /**
708 <         * Maximum size for queue arrays. Must be a power of two less
709 <         * than or equal to 1 << (31 - width of array entry) to ensure
710 <         * lack of wraparound of index calculations, but defined to a
711 <         * value a bit less than this to help users trap runaway
712 <         * programs before saturating systems.
713 <         */
714 <        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
715 <
716 <        // Instance fields
740 >        volatile int source;       // source queue id, or sentinel
741 >        int id;                    // pool index, mode, tag
742 >        int base;                  // index of next slot for poll
743 >        int top;                   // index of next slot for push
744          volatile int phase;        // versioned, negative: queued, 1: locked
745          int stackPred;             // pool stack (ctl) predecessor link
746          int nsteals;               // number of steals
747 <        int id;                    // index, mode, tag
721 <        volatile int source;       // source queue id, or sentinel
722 <        volatile int base;         // index of next slot for poll
723 <        int top;                   // index of next slot for push
724 <        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
747 >        ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
748          final ForkJoinPool pool;   // the containing pool (may be null)
749          final ForkJoinWorkerThread owner; // owning thread or null if shared
750  
# Line 733 | Line 756 | public class ForkJoinPool extends Abstra
756          }
757  
758          /**
759 +         * Tries to lock shared queue by CASing phase field.
760 +         */
761 +        final boolean tryLockPhase() {
762 +            return PHASE.compareAndSet(this, 0, 1);
763 +        }
764 +
765 +        final void releasePhaseLock() {
766 +            PHASE.setRelease(this, 0);
767 +        }
768 +
769 +        /**
770           * Returns an exportable index (used by ForkJoinWorkerThread).
771           */
772          final int getPoolIndex() {
# Line 743 | Line 777 | public class ForkJoinPool extends Abstra
777           * Returns the approximate number of tasks in the queue.
778           */
779          final int queueSize() {
780 <            int n = base - top;       // read base first
780 >            int n = (int)BASE.getAcquire(this) - top;
781              return (n >= 0) ? 0 : -n; // ignore transient negative
782          }
783  
# Line 753 | Line 787 | public class ForkJoinPool extends Abstra
787           * near-empty queue has at least one unclaimed task.
788           */
789          final boolean isEmpty() {
790 <            ForkJoinTask<?>[] a; int n, al, b;
790 >            ForkJoinTask<?>[] a; int n, cap, b;
791 >            VarHandle.acquireFence(); // needed by external callers
792              return ((n = (b = base) - top) >= 0 || // possibly one task
793                      (n == -1 && ((a = array) == null ||
794 <                                 (al = a.length) == 0 ||
795 <                                 a[(al - 1) & b] == null)));
794 >                                 (cap = a.length) == 0 ||
795 >                                 a[(cap - 1) & b] == null)));
796          }
797  
763
798          /**
799           * Pushes a task. Call only by owner in unshared queues.
800           *
# Line 768 | Line 802 | public class ForkJoinPool extends Abstra
802           * @throws RejectedExecutionException if array cannot be resized
803           */
804          final void push(ForkJoinTask<?> task) {
805 <            int s = top; ForkJoinTask<?>[] a; int al, d;
806 <            if ((a = array) != null && (al = a.length) > 0) {
807 <                int index = (al - 1) & s;
808 <                ForkJoinPool p = pool;
805 >            ForkJoinTask<?>[] a;
806 >            int s = top, d, cap;
807 >            ForkJoinPool p = pool;
808 >            if ((a = array) != null && (cap = a.length) > 0) {
809 >                QA.setRelease(a, (cap - 1) & s, task);
810                  top = s + 1;
811 <                QA.setRelease(a, index, task);
777 <                if ((d = base - s) == 0 && p != null) {
811 >                if ((d = (int)BASE.getAcquire(this) - s) == 0 && p != null) {
812                      VarHandle.fullFence();
813                      p.signalWork();
814                  }
815 <                else if (d + al == 1)
816 <                    growArray();
815 >                else if (d + cap - 1 == 0)
816 >                    growArray(false);
817              }
818          }
819  
820          /**
821 <         * Initializes or doubles the capacity of array. Call either
822 <         * by owner or with lock held -- it is OK for base, but not
789 <         * top, to move while resizings are in progress.
821 >         * Version of push for shared queues. Call only with phase lock held.
822 >         * @return true if should signal work
823           */
824 <        final ForkJoinTask<?>[] growArray() {
825 <            ForkJoinTask<?>[] oldA = array;
826 <            int oldSize = oldA != null ? oldA.length : 0;
827 <            int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
828 <            if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
829 <                throw new RejectedExecutionException("Queue capacity exceeded");
830 <            int oldMask, t, b;
831 <            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
832 <            if (oldA != null && (oldMask = oldSize - 1) > 0 &&
833 <                (t = top) - (b = base) > 0) {
834 <                int mask = size - 1;
835 <                do { // emulate poll from old array, push to new array
836 <                    int index = b & oldMask;
837 <                    ForkJoinTask<?> x = (ForkJoinTask<?>)
838 <                        QA.getAcquire(oldA, index);
839 <                    if (x != null &&
807 <                        QA.compareAndSet(oldA, index, x, null))
808 <                        a[b & mask] = x;
809 <                } while (++b != t);
810 <                VarHandle.releaseFence();
811 <            }
812 <            return a;
824 >        final boolean lockedPush(ForkJoinTask<?> task) {
825 >            ForkJoinTask<?>[] a;
826 >            boolean signal = false;
827 >            int s = top, b = base, cap;
828 >            if ((a = array) != null && (cap = a.length) > 0) {
829 >                a[(cap - 1) & s] = task;
830 >                top = s + 1;
831 >                if (b - s + cap - 1 == 0)
832 >                    growArray(true);
833 >                else {
834 >                    phase = 0; // full volatile unlock
835 >                    if (base == s)
836 >                        signal = true;
837 >                }
838 >            }
839 >            return signal;
840          }
841  
842          /**
843 <         * Takes next task, if one exists, in LIFO order.  Call only
844 <         * by owner in unshared queues.
845 <         */
846 <        final ForkJoinTask<?> pop() {
847 <            int b = base, s = top, al, i; ForkJoinTask<?>[] a;
848 <            if ((a = array) != null && b != s && (al = a.length) > 0) {
849 <                int index = (al - 1) & --s;
850 <                ForkJoinTask<?> t = (ForkJoinTask<?>)
851 <                    QA.get(a, index);
852 <                if (t != null &&
853 <                    QA.compareAndSet(a, index, t, null)) {
854 <                    top = s;
855 <                    VarHandle.releaseFence();
856 <                    return t;
843 >         * Doubles the capacity of array. Call either by owner or with
844 >         * lock held -- it is OK for base, but not top, to move while
845 >         * resizings are in progress.
846 >         */
847 >        final void growArray(boolean locked) {
848 >            ForkJoinTask<?>[] newA = null;
849 >            try {
850 >                ForkJoinTask<?>[] oldA; int oldSize, newSize;
851 >                if ((oldA = array) != null && (oldSize = oldA.length) > 0 &&
852 >                    (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
853 >                    newSize > 0) {
854 >                    try {
855 >                        newA = new ForkJoinTask<?>[newSize];
856 >                    } catch (OutOfMemoryError ex) {
857 >                    }
858 >                    if (newA != null) { // poll from old array, push to new
859 >                        int oldMask = oldSize - 1, newMask = newSize - 1;
860 >                        for (int s = top - 1, k = oldMask; k >= 0; --k) {
861 >                            ForkJoinTask<?> x = (ForkJoinTask<?>)
862 >                                QA.getAndSet(oldA, s & oldMask, null);
863 >                            if (x != null)
864 >                                newA[s-- & newMask] = x;
865 >                            else
866 >                                break;
867 >                        }
868 >                        array = newA;
869 >                        VarHandle.releaseFence();
870 >                    }
871                  }
872 +            } finally {
873 +                if (locked)
874 +                    phase = 0;
875              }
876 <            return null;
876 >            if (newA == null)
877 >                throw new RejectedExecutionException("Queue capacity exceeded");
878          }
879  
880          /**
881           * Takes next task, if one exists, in FIFO order.
882           */
883          final ForkJoinTask<?> poll() {
884 <            for (;;) {
885 <                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
886 <                if ((a = array) != null && (d = b - s) < 0 &&
887 <                    (al = a.length) > 0) {
888 <                    int index = (al - 1) & b;
889 <                    ForkJoinTask<?> t = (ForkJoinTask<?>)
890 <                        QA.getAcquire(a, index);
891 <                    if (b++ == base) {
892 <                        if (t != null) {
893 <                            if (QA.compareAndSet(a, index, t, null)) {
894 <                                base = b;
850 <                                return t;
851 <                            }
852 <                        }
853 <                        else if (d == -1)
854 <                            break; // now empty
884 >            int b, k, cap; ForkJoinTask<?>[] a;
885 >            while ((a = array) != null && (cap = a.length) > 0 &&
886 >                   (b = base) != top) {
887 >                ForkJoinTask<?> t = (ForkJoinTask<?>)
888 >                    QA.getAcquire(a, k = (cap - 1) & b);
889 >                if (base == b++) {
890 >                    if (t == null)
891 >                        Thread.yield(); // await index advance
892 >                    else if (QA.compareAndSet(a, k, t, null)) {
893 >                        BASE.setOpaque(this, b);
894 >                        return t;
895                      }
896                  }
857                else
858                    break;
897              }
898              return null;
899          }
# Line 864 | Line 902 | public class ForkJoinPool extends Abstra
902           * Takes next task, if one exists, in order specified by mode.
903           */
904          final ForkJoinTask<?> nextLocalTask() {
905 <            return ((id & FIFO) != 0) ? poll() : pop();
905 >            ForkJoinTask<?> t = null;
906 >            int md = id, b, s, d, cap; ForkJoinTask<?>[] a;
907 >            if ((a = array) != null && (cap = a.length) > 0 &&
908 >                (d = (s = top) - (b = base)) > 0) {
909 >                if ((md & FIFO) == 0 || d == 1) {
910 >                    if ((t = (ForkJoinTask<?>)
911 >                         QA.getAndSet(a, (cap - 1) & --s, null)) != null)
912 >                        TOP.setOpaque(this, s);
913 >                }
914 >                else if ((t = (ForkJoinTask<?>)
915 >                          QA.getAndSet(a, (cap - 1) & b++, null)) != null) {
916 >                    BASE.setOpaque(this, b);
917 >                }
918 >                else // on contention in FIFO mode, use regular poll
919 >                    t = poll();
920 >            }
921 >            return t;
922          }
923  
924          /**
925           * Returns next task, if one exists, in order specified by mode.
926           */
927          final ForkJoinTask<?> peek() {
928 <            int al; ForkJoinTask<?>[] a;
929 <            return ((a = array) != null && (al = a.length) > 0) ?
930 <                a[(al - 1) &
877 <                  ((id & FIFO) != 0 ? base : top - 1)] : null;
928 >            int cap; ForkJoinTask<?>[] a;
929 >            return ((a = array) != null && (cap = a.length) > 0) ?
930 >                a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
931          }
932  
933          /**
934           * Pops the given task only if it is at the current top.
935           */
936          final boolean tryUnpush(ForkJoinTask<?> task) {
937 <            int b = base, s = top, al; ForkJoinTask<?>[] a;
938 <            if ((a = array) != null && b != s && (al = a.length) > 0) {
939 <                int index = (al - 1) & --s;
940 <                if (QA.compareAndSet(a, index, task, null)) {
937 >            boolean popped = false;
938 >            int s, cap; ForkJoinTask<?>[] a;
939 >            if ((a = array) != null && (cap = a.length) > 0 &&
940 >                (s = top) != base &&
941 >                (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
942 >                TOP.setOpaque(this, s);
943 >            return popped;
944 >        }
945 >
946 >        /**
947 >         * Shared version of tryUnpush.
948 >         */
949 >        final boolean tryLockedUnpush(ForkJoinTask<?> task) {
950 >            boolean popped = false;
951 >            int s = top - 1, k, cap; ForkJoinTask<?>[] a;
952 >            if ((a = array) != null && (cap = a.length) > 0 &&
953 >                a[k = (cap - 1) & s] == task && tryLockPhase()) {
954 >                if (top == s + 1 && array == a &&
955 >                    (popped = QA.compareAndSet(a, k, task, null)))
956                      top = s;
957 <                    VarHandle.releaseFence();
890 <                    return true;
891 <                }
957 >                releasePhaseLock();
958              }
959 <            return false;
959 >            return popped;
960          }
961  
962          /**
# Line 904 | Line 970 | public class ForkJoinPool extends Abstra
970          // Specialized execution methods
971  
972          /**
973 <         * Pops and executes up to limit consecutive tasks or until empty.
974 <         *
975 <         * @param limit max runs, or zero for no limit
976 <         */
977 <        final void localPopAndExec(int limit) {
978 <            for (;;) {
979 <                int b = base, s = top, al; ForkJoinTask<?>[] a;
980 <                if ((a = array) != null && b != s && (al = a.length) > 0) {
981 <                    int index = (al - 1) & --s;
982 <                    ForkJoinTask<?> t = (ForkJoinTask<?>)
983 <                        QA.getAndSet(a, index, null);
984 <                    if (t != null) {
985 <                        top = s;
986 <                        VarHandle.releaseFence();
987 <                        t.doExec();
922 <                        if (limit != 0 && --limit == 0)
923 <                            break;
924 <                    }
925 <                    else
926 <                        break;
927 <                }
928 <                else
929 <                    break;
930 <            }
931 <        }
932 <
933 <        /**
934 <         * Polls and executes up to limit consecutive tasks or until empty.
935 <         *
936 <         * @param limit, or zero for no limit
937 <         */
938 <        final void localPollAndExec(int limit) {
939 <            for (int polls = 0;;) {
940 <                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
941 <                if ((a = array) != null && (d = b - s) < 0 &&
942 <                    (al = a.length) > 0) {
943 <                    int index = (al - 1) & b++;
944 <                    ForkJoinTask<?> t = (ForkJoinTask<?>)
945 <                        QA.getAndSet(a, index, null);
946 <                    if (t != null) {
947 <                        base = b;
948 <                        t.doExec();
949 <                        if (limit != 0 && ++polls == limit)
950 <                            break;
951 <                    }
952 <                    else if (d == -1)
953 <                        break;     // now empty
954 <                    else
955 <                        polls = 0; // stolen; reset
956 <                }
973 >         * Runs the given (stolen) task, as well as remaining local
974 >         * tasks (up to limit) and others available from the given
975 >         * queue.
976 >         */
977 >        final void topLevelExec(ForkJoinTask<?> t, WorkQueue q) {
978 >            int nstolen = 1;
979 >            for (int nlocal = 0;;) {
980 >                if (t != null)
981 >                    t.doExec();
982 >                if (nlocal >= SELF_CONSUME_LIMIT)
983 >                    break; // avoid infinite self-consume loops
984 >                else if ((t = nextLocalTask()) != null)
985 >                    ++nlocal;
986 >                else if (q != null && (t = q.poll()) != null)
987 >                    ++nstolen;
988                  else
989                      break;
990              }
991 +            ForkJoinWorkerThread thread = owner;
992 +            nsteals += nstolen;
993 +            source = 0;
994 +            if (thread != null)
995 +                thread.afterTopLevelExec();
996          }
997  
998          /**
999           * If present, removes task from queue and executes it.
1000           */
1001          final void tryRemoveAndExec(ForkJoinTask<?> task) {
1002 <            ForkJoinTask<?>[] wa; int s, wal;
1003 <            if (base - (s = top) < 0 && // traverse from top
1004 <                (wa = array) != null && (wal = wa.length) > 0) {
1005 <                for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
1002 >            ForkJoinTask<?>[] a; int s, cap;
1003 >            if ((a = array) != null && (cap = a.length) > 0 &&
1004 >                base - (s = top) < 0) { // traverse from top
1005 >                for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
1006                      int index = i & m;
1007 <                    ForkJoinTask<?> t = (ForkJoinTask<?>)
972 <                        QA.get(wa, index);
1007 >                    ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index);
1008                      if (t == null)
1009                          break;
1010                      else if (t == task) {
1011 <                        if (QA.compareAndSet(wa, index, t, null)) {
1011 >                        if (QA.compareAndSet(a, index, t, null)) {
1012                              top = ns;   // safely shift down
1013                              for (int j = i; j != ns; ++j) {
1014                                  ForkJoinTask<?> f;
1015                                  int pindex = (j + 1) & m;
1016 <                                f = (ForkJoinTask<?>)QA.get(wa, pindex);
1017 <                                QA.setVolatile(wa, pindex, null);
1016 >                                f = (ForkJoinTask<?>)QA.get(a, pindex);
1017 >                                QA.setVolatile(a, pindex, null);
1018                                  int jindex = j & m;
1019 <                                QA.setRelease(wa, jindex, f);
1019 >                                QA.setRelease(a, jindex, f);
1020                              }
1021                              VarHandle.releaseFence();
1022                              t.doExec();
# Line 993 | Line 1028 | public class ForkJoinPool extends Abstra
1028          }
1029  
1030          /**
1031 <         * Tries to steal and run tasks within the target's
1032 <         * computation until done, not found, or limit exceeded.
1031 >         * Tries to pop and run tasks within the target's computation
1032 >         * until done, not found, or limit exceeded.
1033           *
1034           * @param task root of CountedCompleter computation
1035           * @param limit max runs, or zero for no limit
1036 +         * @param shared true if must lock to extract task
1037           * @return task status on exit
1038           */
1039 <        final int localHelpCC(CountedCompleter<?> task, int limit) {
1039 >        final int helpCC(CountedCompleter<?> task, int limit, boolean shared) {
1040              int status = 0;
1041              if (task != null && (status = task.status) >= 0) {
1042 <                for (;;) {
1043 <                    boolean help = false;
1044 <                    int b = base, s = top, al; ForkJoinTask<?>[] a;
1045 <                    if ((a = array) != null && b != s && (al = a.length) > 0) {
1046 <                        int index = (al - 1) & (s - 1);
1047 <                        ForkJoinTask<?> o = (ForkJoinTask<?>)
1048 <                            QA.get(a, index);
1049 <                        if (o instanceof CountedCompleter) {
1050 <                            CountedCompleter<?> t = (CountedCompleter<?>)o;
1051 <                            for (CountedCompleter<?> f = t;;) {
1052 <                                if (f != task) {
1053 <                                    if ((f = f.completer) == null) // try parent
1054 <                                        break;
1055 <                                }
1056 <                                else {
1057 <                                    if (QA.compareAndSet(a, index, t, null)) {
1042 >                int s, k, cap; ForkJoinTask<?>[] a;
1043 >                while ((a = array) != null && (cap = a.length) > 0 &&
1044 >                       (s = top) != base) {
1045 >                    CountedCompleter<?> v = null;
1046 >                    ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)];
1047 >                    if (o instanceof CountedCompleter) {
1048 >                        CountedCompleter<?> t = (CountedCompleter<?>)o;
1049 >                        for (CountedCompleter<?> f = t;;) {
1050 >                            if (f != task) {
1051 >                                if ((f = f.completer) == null)
1052 >                                    break;
1053 >                            }
1054 >                            else if (shared) {
1055 >                                if (tryLockPhase()) {
1056 >                                    if (top == s && array == a &&
1057 >                                        QA.compareAndSet(a, k, t, null)) {
1058                                          top = s - 1;
1059 <                                        VarHandle.releaseFence();
1024 <                                        t.doExec();
1025 <                                        help = true;
1059 >                                        v = t;
1060                                      }
1061 <                                    break;
1061 >                                    releasePhaseLock();
1062                                  }
1063 +                                break;
1064 +                            }
1065 +                            else {
1066 +                                if (QA.compareAndSet(a, k, t, null)) {
1067 +                                    top = s - 1;
1068 +                                    v = t;
1069 +                                }
1070 +                                break;
1071                              }
1072                          }
1073                      }
1074 <                    if ((status = task.status) < 0 || !help ||
1074 >                    if (v != null)
1075 >                        v.doExec();
1076 >                    if ((status = task.status) < 0 || v == null ||
1077                          (limit != 0 && --limit == 0))
1078                          break;
1079                  }
# Line 1037 | Line 1081 | public class ForkJoinPool extends Abstra
1081              return status;
1082          }
1083  
1040        // Operations on shared queues
1041
1042        /**
1043         * Tries to lock shared queue by CASing phase field.
1044         */
1045        final boolean tryLockSharedQueue() {
1046            return PHASE.compareAndSet(this, 0, QLOCK);
1047        }
1048
1049        /**
1050         * Shared version of tryUnpush.
1051         */
1052        final boolean trySharedUnpush(ForkJoinTask<?> task) {
1053            boolean popped = false;
1054            int s = top - 1, al; ForkJoinTask<?>[] a;
1055            if ((a = array) != null && (al = a.length) > 0) {
1056                int index = (al - 1) & s;
1057                ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
1058                if (t == task &&
1059                    PHASE.compareAndSet(this, 0, QLOCK)) {
1060                    if (top == s + 1 && array == a &&
1061                        QA.compareAndSet(a, index, task, null)) {
1062                        popped = true;
1063                        top = s;
1064                    }
1065                    PHASE.setRelease(this, 0);
1066                }
1067            }
1068            return popped;
1069        }
1070
1084          /**
1085 <         * Shared version of localHelpCC.
1085 >         * Tries to poll and run AsynchronousCompletionTasks until
1086 >         * none found or blocker is released
1087 >         *
1088 >         * @param blocker the blocker
1089           */
1090 <        final int sharedHelpCC(CountedCompleter<?> task, int limit) {
1091 <            int status = 0;
1092 <            if (task != null && (status = task.status) >= 0) {
1093 <                for (;;) {
1094 <                    boolean help = false;
1095 <                    int b = base, s = top, al; ForkJoinTask<?>[] a;
1096 <                    if ((a = array) != null && b != s && (al = a.length) > 0) {
1097 <                        int index = (al - 1) & (s - 1);
1098 <                        ForkJoinTask<?> o = (ForkJoinTask<?>)
1099 <                            QA.get(a, index);
1100 <                        if (o instanceof CountedCompleter) {
1101 <                            CountedCompleter<?> t = (CountedCompleter<?>)o;
1102 <                            for (CountedCompleter<?> f = t;;) {
1103 <                                if (f != task) {
1104 <                                    if ((f = f.completer) == null)
1089 <                                        break;
1090 <                                }
1091 <                                else {
1092 <                                    if (PHASE.compareAndSet(this, 0, QLOCK)) {
1093 <                                        if (top == s && array == a &&
1094 <                                            QA.compareAndSet(a, index, t, null)) {
1095 <                                            help = true;
1096 <                                            top = s - 1;
1097 <                                        }
1098 <                                        PHASE.setRelease(this, 0);
1099 <                                        if (help)
1100 <                                            t.doExec();
1101 <                                    }
1102 <                                    break;
1103 <                                }
1104 <                            }
1090 >        final void helpAsyncBlocker(ManagedBlocker blocker) {
1091 >            if (blocker != null) {
1092 >                int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1093 >                while ((a = array) != null && (cap = a.length) > 0 &&
1094 >                       (b = base) != top) {
1095 >                    t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1096 >                    if (blocker.isReleasable())
1097 >                        break;
1098 >                    else if (base == b++ && t != null) {
1099 >                        if (!(t instanceof CompletableFuture.
1100 >                              AsynchronousCompletionTask))
1101 >                            break;
1102 >                        else if (QA.compareAndSet(a, k, t, null)) {
1103 >                            BASE.setOpaque(this, b);
1104 >                            t.doExec();
1105                          }
1106                      }
1107                    if ((status = task.status) < 0 || !help ||
1108                        (limit != 0 && --limit == 0))
1109                        break;
1107                  }
1108              }
1112            return status;
1109          }
1110  
1111          /**
# Line 1124 | Line 1120 | public class ForkJoinPool extends Abstra
1120          }
1121  
1122          // VarHandle mechanics.
1123 <        private static final VarHandle PHASE;
1123 >        static final VarHandle PHASE;
1124 >        static final VarHandle BASE;
1125 >        static final VarHandle TOP;
1126          static {
1127              try {
1128                  MethodHandles.Lookup l = MethodHandles.lookup();
1129                  PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
1130 +                BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
1131 +                TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
1132              } catch (ReflectiveOperationException e) {
1133                  throw new Error(e);
1134              }
# Line 1327 | Line 1327 | public class ForkJoinPool extends Abstra
1327          wt.setDaemon(true);                             // configure thread
1328          if ((handler = ueh) != null)
1329              wt.setUncaughtExceptionHandler(handler);
1330        WorkQueue w = new WorkQueue(this, wt);
1330          int tid = 0;                                    // for thread name
1331 <        int fifo = mode & FIFO;
1331 >        int idbits = mode & FIFO;
1332          String prefix = workerNamePrefix;
1333 +        WorkQueue w = new WorkQueue(this, wt);
1334          if (prefix != null) {
1335              synchronized (prefix) {
1336                  WorkQueue[] ws = workQueues; int n;
1337                  int s = indexSeed += SEED_INCREMENT;
1338 +                idbits |= (s & ~(SMASK | FIFO | DORMANT));
1339                  if (ws != null && (n = ws.length) > 1) {
1340                      int m = n - 1;
1341 <                    tid = s & m;
1341 <                    int i = m & ((s << 1) | 1);         // odd-numbered indices
1341 >                    tid = m & ((s << 1) | 1);           // odd-numbered indices
1342                      for (int probes = n >>> 1;;) {      // find empty slot
1343                          WorkQueue q;
1344 <                        if ((q = ws[i]) == null || q.phase == QUIET)
1344 >                        if ((q = ws[tid]) == null || q.phase == QUIET)
1345                              break;
1346                          else if (--probes == 0) {
1347 <                            i = n | 1;                  // resize below
1347 >                            tid = n | 1;                // resize below
1348                              break;
1349                          }
1350                          else
1351 <                            i = (i + 2) & m;
1351 >                            tid = (tid + 2) & m;
1352                      }
1353 +                    w.phase = w.id = tid | idbits;      // now publishable
1354  
1355 <                    int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
1356 <                    w.phase = w.id = id;                // now publishable
1356 <
1357 <                    if (i < n)
1358 <                        ws[i] = w;
1355 >                    if (tid < n)
1356 >                        ws[tid] = w;
1357                      else {                              // expand array
1358                          int an = n << 1;
1359                          WorkQueue[] as = new WorkQueue[an];
1360 <                        as[i] = w;
1360 >                        as[tid] = w;
1361                          int am = an - 1;
1362                          for (int j = 0; j < n; ++j) {
1363                              WorkQueue v;                // copy external queue
# Line 1392 | Line 1390 | public class ForkJoinPool extends Abstra
1390          int phase = 0;
1391          if (wt != null && (w = wt.workQueue) != null) {
1392              Object lock = workerNamePrefix;
1393 +            int wid = w.id;
1394              long ns = (long)w.nsteals & 0xffffffffL;
1396            int idx = w.id & SMASK;
1395              if (lock != null) {
1398                WorkQueue[] ws;                       // remove index from array
1396                  synchronized (lock) {
1397 <                    if ((ws = workQueues) != null && ws.length > idx &&
1398 <                        ws[idx] == w)
1399 <                        ws[idx] = null;
1397 >                    WorkQueue[] ws; int n, i;         // remove index from array
1398 >                    if ((ws = workQueues) != null && (n = ws.length) > 0 &&
1399 >                        ws[i = wid & (n - 1)] == w)
1400 >                        ws[i] = null;
1401                      stealCount += ns;
1402                  }
1403              }
# Line 1451 | Line 1449 | public class ForkJoinPool extends Abstra
1449                  Thread vt = v.owner;
1450                  if (sp == vp && CTL.compareAndSet(this, c, nc)) {
1451                      v.phase = np;
1452 <                    if (v.source < 0)
1452 >                    if (vt != null && v.source < 0)
1453                          LockSupport.unpark(vt);
1454                      break;
1455                  }
# Line 1492 | Line 1490 | public class ForkJoinPool extends Abstra
1490                      long nc = ((long)v.stackPred & SP_MASK) | uc;
1491                      if (vp == sp && CTL.compareAndSet(this, c, nc)) {
1492                          v.phase = np;
1493 <                        if (v.source < 0)
1493 >                        if (vt != null && v.source < 0)
1494                              LockSupport.unpark(vt);
1495                          return (wp < 0) ? -1 : 1;
1496                      }
# Line 1549 | Line 1547 | public class ForkJoinPool extends Abstra
1547       * See above for explanation.
1548       */
1549      final void runWorker(WorkQueue w) {
1550 <        WorkQueue[] ws;
1551 <        w.growArray();                                  // allocate queue
1552 <        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
1553 <        if (r == 0)                                     // initial nonzero seed
1554 <            r = 1;
1555 <        int lastSignalId = 0;                           // avoid unneeded signals
1556 <        while ((ws = workQueues) != null) {
1557 <            boolean nonempty = false;                   // scan
1558 <            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1559 <                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1560 <                if ((i = r & m) >= 0 && i < n &&        // always true
1561 <                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
1562 <                    (a = q.array) != null && (al = a.length) > 0) {
1563 <                    int qid = q.id;                     // (never zero)
1564 <                    int index = (al - 1) & b;
1565 <                    ForkJoinTask<?> t = (ForkJoinTask<?>)
1566 <                        QA.getAcquire(a, index);
1567 <                    if (t != null && b++ == q.base &&
1568 <                        QA.compareAndSet(a, index, t, null)) {
1569 <                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
1570 <                            signalWork();               // propagate signal
1571 <                        w.source = lastSignalId = qid;
1572 <                        t.doExec();
1573 <                        if ((w.id & FIFO) != 0)         // run remaining locals
1574 <                            w.localPollAndExec(POLL_LIMIT);
1575 <                        else
1576 <                            w.localPopAndExec(POLL_LIMIT);
1577 <                        ForkJoinWorkerThread thread = w.owner;
1578 <                        ++w.nsteals;
1579 <                        w.source = 0;                   // now idle
1580 <                        if (thread != null)
1581 <                            thread.afterTopLevelExec();
1550 >        int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
1551 >        w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
1552 >        for (;;) {
1553 >            int phase;
1554 >            if (scan(w, r)) {                     // scan until apparently empty
1555 >                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
1556 >            }
1557 >            else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
1558 >                long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
1559 >                long c, nc;
1560 >                do {
1561 >                    w.stackPred = (int)(c = ctl);
1562 >                    nc = ((c - RC_UNIT) & UC_MASK) | np;
1563 >                } while (!CTL.weakCompareAndSet(this, c, nc));
1564 >            }
1565 >            else {                                // already queued
1566 >                int pred = w.stackPred;
1567 >                Thread.interrupted();             // clear before park
1568 >                w.source = DORMANT;               // enable signal
1569 >                long c = ctl;
1570 >                int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
1571 >                if (md < 0)                       // terminating
1572 >                    break;
1573 >                else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
1574 >                         tryTerminate(false, false))
1575 >                    break;                        // quiescent shutdown
1576 >                else if (rc <= 0 && pred != 0 && phase == (int)c) {
1577 >                    long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1578 >                    long d = keepAlive + System.currentTimeMillis();
1579 >                    LockSupport.parkUntil(this, d);
1580 >                    if (ctl == c &&               // drop on timeout if all idle
1581 >                        d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1582 >                        CTL.compareAndSet(this, c, nc)) {
1583 >                        w.phase = QUIET;
1584 >                        break;
1585                      }
1585                    nonempty = true;
1586                  }
1587 <                else if (nonempty)
1588 <                    break;
1589 <                else
1590 <                    ++r;
1587 >                else if (w.phase < 0)
1588 >                    LockSupport.park(this);       // OK if spuriously woken
1589 >                w.source = 0;                     // disable signal
1590              }
1591 +        }
1592 +    }
1593  
1594 <            if (nonempty) {                             // move (xorshift)
1595 <                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
1596 <            }
1597 <            else {
1598 <                int phase;
1599 <                lastSignalId = 0;                       // clear for next scan
1600 <                if ((phase = w.phase) >= 0) {           // enqueue
1601 <                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
1602 <                    long c, nc;
1603 <                    do {
1604 <                        w.stackPred = (int)(c = ctl);
1605 <                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
1606 <                    } while (!CTL.weakCompareAndSet(this, c, nc));
1607 <                }
1608 <                else {                                  // already queued
1609 <                    int pred = w.stackPred;
1610 <                    w.source = DORMANT;                 // enable signal
1611 <                    for (int steps = 0;;) {
1612 <                        int md, rc; long c;
1613 <                        if (w.phase >= 0) {
1614 <                            w.source = 0;
1615 <                            break;
1615 <                        }
1616 <                        else if ((md = mode) < 0)       // shutting down
1617 <                            return;
1618 <                        else if ((rc = ((md & SMASK) +  // possibly quiescent
1619 <                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
1620 <                                 (md & SHUTDOWN) != 0 &&
1621 <                                 tryTerminate(false, false))
1622 <                            return;                     // help terminate
1623 <                        else if ((++steps & 1) == 0)
1624 <                            Thread.interrupted();       // clear between parks
1625 <                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
1626 <                            long d = keepAlive + System.currentTimeMillis();
1627 <                            LockSupport.parkUntil(this, d);
1628 <                            if (ctl == c &&
1629 <                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
1630 <                                long nc = ((UC_MASK & (c - TC_UNIT)) |
1631 <                                           (SP_MASK & pred));
1632 <                                if (CTL.compareAndSet(this, c, nc)) {
1633 <                                    w.phase = QUIET;
1634 <                                    return;             // drop on timeout
1635 <                                }
1636 <                            }
1594 >    /**
1595 >     * Scans for and executes top-level task
1596 >     * @return true if found an apparently non-empty queue (and
1597 >     * possibly ran task)
1598 >     */
1599 >    private boolean scan(WorkQueue w, int r) {
1600 >        WorkQueue[] ws; int n;
1601 >        if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
1602 >            for (int m = n - 1, j = r & m;;) {
1603 >                WorkQueue q; int b, d;
1604 >                if ((q = ws[j]) != null && (d = (b = q.base) - q.top) < 0) {
1605 >                    int qid = q.id;
1606 >                    ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
1607 >                    if ((a = q.array) != null && (cap = a.length) > 0) {
1608 >                        t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1609 >                        if (q.base == b++ && t != null &&
1610 >                            QA.compareAndSet(a, k, t, null)) {
1611 >                            q.base = b;
1612 >                            w.source = qid;
1613 >                            if (d != -1)
1614 >                                signalWork();
1615 >                            w.topLevelExec(t, q);
1616                          }
1638                        else
1639                            LockSupport.park(this);
1617                      }
1618 +                    return true;
1619                  }
1620 +                else if (--n > 0)
1621 +                    j = (j + 1) & m;
1622 +                else
1623 +                    break;
1624              }
1625          }
1626 +        return false;
1627      }
1628  
1629      /**
# Line 1656 | Line 1639 | public class ForkJoinPool extends Abstra
1639       */
1640      final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1641          int s = 0;
1642 +        int seed = ThreadLocalRandom.nextSecondarySeed();
1643          if (w != null && task != null &&
1644              (!(task instanceof CountedCompleter) ||
1645 <             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
1645 >             (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {
1646              w.tryRemoveAndExec(task);
1647              int src = w.source, id = w.id;
1648 +            int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
1649              s = task.status;
1650              while (s >= 0) {
1651                  WorkQueue[] ws;
1652 <                boolean nonempty = false;
1653 <                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
1654 <                if ((ws = workQueues) != null) {       // scan for matching id
1655 <                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
1656 <                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1657 <                        if ((i = (r + j) & m) >= 0 && i < n &&
1658 <                            (q = ws[i]) != null && q.source == id &&
1659 <                            (b = q.base) - q.top < 0 &&
1675 <                            (a = q.array) != null && (al = a.length) > 0) {
1676 <                            int qid = q.id;
1677 <                            int index = (al - 1) & b;
1652 >                int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;
1653 >                while (n > 0) {
1654 >                    WorkQueue q; int b;
1655 >                    if ((q = ws[r & m]) != null && q.source == id &&
1656 >                        (b = q.base) - q.top < 0) {
1657 >                        ForkJoinTask<?>[] a; int cap, k;
1658 >                        int qid = q.id;
1659 >                        if ((a = q.array) != null && (cap = a.length) > 0) {
1660                              ForkJoinTask<?> t = (ForkJoinTask<?>)
1661 <                                QA.getAcquire(a, index);
1662 <                            if (t != null && b++ == q.base && id == q.source &&
1663 <                                QA.compareAndSet(a, index, t, null)) {
1661 >                                QA.getAcquire(a, k = (cap - 1) & b);
1662 >                            if (q.source == id && q.base == b++ &&
1663 >                                t != null && QA.compareAndSet(a, k, t, null)) {
1664                                  q.base = b;
1665                                  w.source = qid;
1666                                  t.doExec();
1667                                  w.source = src;
1668                              }
1687                            nonempty = true;
1688                            break;
1669                          }
1670 +                        break;
1671 +                    }
1672 +                    else {
1673 +                        r += step;
1674 +                        --n;
1675                      }
1676                  }
1677                  if ((s = task.status) < 0)
1678                      break;
1679 <                else if (!nonempty) {
1679 >                else if (n == 0) { // empty scan
1680                      long ms, ns; int block;
1681                      if (deadline == 0L)
1682                          ms = 0L;                       // untimed
# Line 1716 | Line 1701 | public class ForkJoinPool extends Abstra
1701       * find tasks either.
1702       */
1703      final void helpQuiescePool(WorkQueue w) {
1704 <        int prevSrc = w.source, fifo = w.id & FIFO;
1704 >        int prevSrc = w.source;
1705 >        int seed = ThreadLocalRandom.nextSecondarySeed();
1706 >        int r = seed >>> 16, step = r | 1;
1707          for (int source = prevSrc, released = -1;;) { // -1 until known
1708 <            WorkQueue[] ws;
1709 <            if (fifo != 0)
1710 <                w.localPollAndExec(0);
1711 <            else
1725 <                w.localPopAndExec(0);
1726 <            if (released == -1 && w.phase >= 0)
1708 >            ForkJoinTask<?> localTask; WorkQueue[] ws;
1709 >            while ((localTask = w.nextLocalTask()) != null)
1710 >                localTask.doExec();
1711 >            if (w.phase >= 0 && released == -1)
1712                  released = 1;
1713              boolean quiet = true, empty = true;
1714 <            int r = ThreadLocalRandom.nextSecondarySeed();
1715 <            if ((ws = workQueues) != null) {
1716 <                for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1717 <                    WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1718 <                    if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
1719 <                        if ((b = q.base) - q.top < 0 &&
1720 <                            (a = q.array) != null && (al = a.length) > 0) {
1721 <                            int qid = q.id;
1714 >            int n = (ws = workQueues) == null ? 0 : ws.length;
1715 >            for (int m = n - 1; n > 0; r += step, --n) {
1716 >                WorkQueue q; int b;
1717 >                if ((q = ws[r & m]) != null) {
1718 >                    int qs = q.source;
1719 >                    if ((b = q.base) - q.top < 0) {
1720 >                        quiet = empty = false;
1721 >                        ForkJoinTask<?>[] a; int cap, k;
1722 >                        int qid = q.id;
1723 >                        if ((a = q.array) != null && (cap = a.length) > 0) {
1724                              if (released == 0) {    // increment
1725                                  released = 1;
1726                                  CTL.getAndAdd(this, RC_UNIT);
1727                              }
1741                            int index = (al - 1) & b;
1728                              ForkJoinTask<?> t = (ForkJoinTask<?>)
1729 <                                QA.getAcquire(a, index);
1730 <                            if (t != null && b++ == q.base &&
1731 <                                QA.compareAndSet(a, index, t, null)) {
1729 >                                QA.getAcquire(a, k = (cap - 1) & b);
1730 >                            if (q.base == b++ && t != null &&
1731 >                                QA.compareAndSet(a, k, t, null)) {
1732                                  q.base = b;
1733 <                                w.source = source = q.id;
1733 >                                w.source = qid;
1734                                  t.doExec();
1735                                  w.source = source = prevSrc;
1736                              }
1751                            quiet = empty = false;
1752                            break;
1737                          }
1738 <                        else if ((q.source & QUIET) == 0)
1755 <                            quiet = false;
1738 >                        break;
1739                      }
1740 +                    else if ((qs & QUIET) == 0)
1741 +                        quiet = false;
1742                  }
1743              }
1744              if (quiet) {
# Line 1795 | Line 1780 | public class ForkJoinPool extends Abstra
1780                  origin = r & m;
1781                  step = h | 1;
1782              }
1783 <            for (int k = origin, oldSum = 0, checkSum = 0;;) {
1784 <                WorkQueue q; int b, al; ForkJoinTask<?>[] a;
1785 <                if ((q = ws[k]) != null) {
1786 <                    checkSum += b = q.base;
1787 <                    if (b - q.top < 0 &&
1788 <                        (a = q.array) != null && (al = a.length) > 0) {
1789 <                        int index = (al - 1) & b;
1790 <                        ForkJoinTask<?> t = (ForkJoinTask<?>)
1806 <                            QA.getAcquire(a, index);
1807 <                        if (t != null && b++ == q.base &&
1808 <                            QA.compareAndSet(a, index, t, null)) {
1809 <                            q.base = b;
1783 >            boolean nonempty = false;
1784 >            for (int i = origin, oldSum = 0, checkSum = 0;;) {
1785 >                WorkQueue q;
1786 >                if ((q = ws[i]) != null) {
1787 >                    int b; ForkJoinTask<?> t;
1788 >                    if ((b = q.base) - q.top < 0) {
1789 >                        nonempty = true;
1790 >                        if ((t = q.poll()) != null)
1791                              return t;
1811                        }
1812                        else
1813                            break; // restart
1792                      }
1793 +                    else
1794 +                        checkSum += b + q.id;
1795                  }
1796 <                if ((k = (k + step) & m) == origin) {
1797 <                    if (oldSum == (oldSum = checkSum))
1796 >                if ((i = (i + step) & m) == origin) {
1797 >                    if (!nonempty && oldSum == (oldSum = checkSum))
1798                          break rescan;
1799                      checkSum = 0;
1800 +                    nonempty = false;
1801                  }
1802              }
1803          }
# Line 1830 | Line 1811 | public class ForkJoinPool extends Abstra
1811       */
1812      final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1813          ForkJoinTask<?> t;
1814 <        if (w != null &&
1815 <            (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1816 <            return t;
1836 <        else
1837 <            return pollScan(false);
1814 >        if (w == null || (t = w.nextLocalTask()) == null)
1815 >            t = pollScan(false);
1816 >        return t;
1817      }
1818  
1819      // External operations
# Line 1852 | Line 1831 | public class ForkJoinPool extends Abstra
1831              r = ThreadLocalRandom.getProbe();
1832          }
1833          for (;;) {
1834 +            WorkQueue q;
1835              int md = mode, n;
1836              WorkQueue[] ws = workQueues;
1837              if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
1838                  throw new RejectedExecutionException();
1839 <            else {
1840 <                WorkQueue q;
1841 <                boolean push = false, grow = false;
1842 <                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
1843 <                    Object lock = workerNamePrefix;
1844 <                    int qid = (r | QUIET) & ~(FIFO | OWNED);
1845 <                    q = new WorkQueue(this, null);
1846 <                    q.id = qid;
1847 <                    q.source = QUIET;
1848 <                    q.phase = QLOCK;          // lock queue
1849 <                    if (lock != null) {
1850 <                        synchronized (lock) { // lock pool to install
1851 <                            int i;
1852 <                            if ((ws = workQueues) != null &&
1853 <                                (n = ws.length) > 0 &&
1874 <                                ws[i = qid & (n - 1) & SQMASK] == null) {
1875 <                                ws[i] = q;
1876 <                                push = grow = true;
1877 <                            }
1878 <                        }
1839 >            else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue
1840 >                int qid = (r | QUIET) & ~(FIFO | OWNED);
1841 >                Object lock = workerNamePrefix;
1842 >                ForkJoinTask<?>[] qa =
1843 >                    new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1844 >                q = new WorkQueue(this, null);
1845 >                q.array = qa;
1846 >                q.id = qid;
1847 >                q.source = QUIET;
1848 >                if (lock != null) {     // unless disabled, lock pool to install
1849 >                    synchronized (lock) {
1850 >                        WorkQueue[] vs; int i, vn;
1851 >                        if ((vs = workQueues) != null && (vn = vs.length) > 0 &&
1852 >                            vs[i = qid & (vn - 1) & SQMASK] == null)
1853 >                            vs[i] = q;  // else another thread already installed
1854                      }
1855                  }
1856 <                else if (q.tryLockSharedQueue()) {
1857 <                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
1858 <                    if ((a = q.array) != null && (al = a.length) > 0 &&
1859 <                        al - 1 + (d = b - s) > 0) {
1860 <                        a[(al - 1) & s] = task;
1886 <                        q.top = s + 1;        // relaxed writes OK here
1887 <                        q.phase = 0;
1888 <                        if (d < 0 && q.base - s < -1)
1889 <                            break;            // no signal needed
1890 <                    }
1891 <                    else
1892 <                        grow = true;
1893 <                    push = true;
1894 <                }
1895 <                if (push) {
1896 <                    if (grow) {
1897 <                        try {
1898 <                            q.growArray();
1899 <                            int s = q.top, al; ForkJoinTask<?>[] a;
1900 <                            if ((a = q.array) != null && (al = a.length) > 0) {
1901 <                                a[(al - 1) & s] = task;
1902 <                                q.top = s + 1;
1903 <                            }
1904 <                        } finally {
1905 <                            q.phase = 0;
1906 <                        }
1907 <                    }
1856 >            }
1857 >            else if (!q.tryLockPhase()) // move if busy
1858 >                r = ThreadLocalRandom.advanceProbe(r);
1859 >            else {
1860 >                if (q.lockedPush(task))
1861                      signalWork();
1862 <                    break;
1910 <                }
1911 <                else                          // move if busy
1912 <                    r = ThreadLocalRandom.advanceProbe(r);
1862 >                return;
1863              }
1864          }
1865      }
# Line 1951 | Line 1901 | public class ForkJoinPool extends Abstra
1901          return ((ws = workQueues) != null &&
1902                  (n = ws.length) > 0 &&
1903                  (w = ws[(n - 1) & r & SQMASK]) != null &&
1904 <                w.trySharedUnpush(task));
1904 >                w.tryLockedUnpush(task));
1905      }
1906  
1907      /**
# Line 1962 | Line 1912 | public class ForkJoinPool extends Abstra
1912          WorkQueue[] ws; WorkQueue w; int n;
1913          return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1914                  (w = ws[(n - 1) & r & SQMASK]) != null) ?
1915 <            w.sharedHelpCC(task, maxTasks) : 0;
1915 >            w.helpCC(task, maxTasks, true) : 0;
1916      }
1917  
1918      /**
# Line 1977 | Line 1927 | public class ForkJoinPool extends Abstra
1927       */
1928      final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1929                             int maxTasks) {
1930 <        return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
1930 >        return (w == null) ? 0 : w.helpCC(task, maxTasks, false);
1931      }
1932  
1933      /**
# Line 2107 | Line 2057 | public class ForkJoinPool extends Abstra
2057                                  } catch (Throwable ignore) {
2058                                  }
2059                              }
2060 <                            checkSum += w.base + w.id;
2060 >                            checkSum += w.base + w.phase;
2061                          }
2062                      }
2063                  }
# Line 2600 | Line 2550 | public class ForkJoinPool extends Abstra
2550       * @return the number of worker threads
2551       */
2552      public int getRunningThreadCount() {
2603        int rc = 0;
2553          WorkQueue[] ws; WorkQueue w;
2554 +        VarHandle.acquireFence();
2555 +        int rc = 0;
2556          if ((ws = workQueues) != null) {
2557              for (int i = 1; i < ws.length; i += 2) {
2558                  if ((w = ws[i]) != null && w.isApparentlyUnblocked())
# Line 2649 | Line 2600 | public class ForkJoinPool extends Abstra
2600                  if ((ws = workQueues) != null) {
2601                      for (int i = 1; i < ws.length; i += 2) {
2602                          if ((v = ws[i]) != null) {
2603 <                            if ((v.source & QUIET) == 0)
2603 >                            if (v.source > 0)
2604                                  return false;
2605                              --tc;
2606                          }
# Line 2695 | Line 2646 | public class ForkJoinPool extends Abstra
2646       * @return the number of queued tasks
2647       */
2648      public long getQueuedTaskCount() {
2698        long count = 0;
2649          WorkQueue[] ws; WorkQueue w;
2650 +        VarHandle.acquireFence();
2651 +        int count = 0;
2652          if ((ws = workQueues) != null) {
2653              for (int i = 1; i < ws.length; i += 2) {
2654                  if ((w = ws[i]) != null)
# Line 2714 | Line 2666 | public class ForkJoinPool extends Abstra
2666       * @return the number of queued submissions
2667       */
2668      public int getQueuedSubmissionCount() {
2717        int count = 0;
2669          WorkQueue[] ws; WorkQueue w;
2670 +        VarHandle.acquireFence();
2671 +        int count = 0;
2672          if ((ws = workQueues) != null) {
2673              for (int i = 0; i < ws.length; i += 2) {
2674                  if ((w = ws[i]) != null)
# Line 2733 | Line 2686 | public class ForkJoinPool extends Abstra
2686       */
2687      public boolean hasQueuedSubmissions() {
2688          WorkQueue[] ws; WorkQueue w;
2689 +        VarHandle.acquireFence();
2690          if ((ws = workQueues) != null) {
2691              for (int i = 0; i < ws.length; i += 2) {
2692                  if ((w = ws[i]) != null && !w.isEmpty())
# Line 2771 | Line 2725 | public class ForkJoinPool extends Abstra
2725       * @return the number of elements transferred
2726       */
2727      protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2774        int count = 0;
2728          WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2729 +        VarHandle.acquireFence();
2730 +        int count = 0;
2731          if ((ws = workQueues) != null) {
2732              for (int i = 0; i < ws.length; ++i) {
2733                  if ((w = ws[i]) != null) {
# Line 2795 | Line 2750 | public class ForkJoinPool extends Abstra
2750       */
2751      public String toString() {
2752          // Use a single pass through workQueues to collect counts
2753 <        long qt = 0L, qs = 0L; int rc = 0;
2753 >        int md = mode; // read volatile fields first
2754 >        long c = ctl;
2755          long st = stealCount;
2756 +        long qt = 0L, qs = 0L; int rc = 0;
2757          WorkQueue[] ws; WorkQueue w;
2758          if ((ws = workQueues) != null) {
2759              for (int i = 0; i < ws.length; ++i) {
# Line 2814 | Line 2771 | public class ForkJoinPool extends Abstra
2771              }
2772          }
2773  
2817        int md = mode;
2774          int pc = (md & SMASK);
2819        long c = ctl;
2775          int tc = pc + (short)(c >>> TC_SHIFT);
2776          int ac = pc + (int)(c >> RC_SHIFT);
2777          if (ac < 0) // ignore transient negative
# Line 3102 | Line 3057 | public class ForkJoinPool extends Abstra
3057       */
3058      public static void managedBlock(ManagedBlocker blocker)
3059          throws InterruptedException {
3060 +        if (blocker == null) throw new NullPointerException();
3061          ForkJoinPool p;
3062          ForkJoinWorkerThread wt;
3063          WorkQueue w;
# Line 3134 | Line 3090 | public class ForkJoinPool extends Abstra
3090       * available or blocker is released.
3091       */
3092      static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3093 <        if (blocker != null && (e instanceof ForkJoinPool)) {
3093 >        if (e instanceof ForkJoinPool) {
3094              WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3095              ForkJoinPool p = (ForkJoinPool)e;
3096              Thread thread = Thread.currentThread();
# Line 3146 | Line 3102 | public class ForkJoinPool extends Abstra
3102                  w = ws[(n - 1) & r & SQMASK];
3103              else
3104                  w = null;
3105 <            if (w != null) {
3106 <                for (;;) {
3151 <                    int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
3152 <                    if ((a = w.array) != null && (d = b - s) < 0 &&
3153 <                        (al = a.length) > 0) {
3154 <                        int index = (al - 1) & b;
3155 <                        ForkJoinTask<?> t = (ForkJoinTask<?>)
3156 <                            QA.getAcquire(a, index);
3157 <                        if (blocker.isReleasable())
3158 <                            break;
3159 <                        else if (b++ == w.base) {
3160 <                            if (t == null) {
3161 <                                if (d == -1)
3162 <                                    break;
3163 <                            }
3164 <                            else if (!(t instanceof CompletableFuture.
3165 <                                  AsynchronousCompletionTask))
3166 <                                break;
3167 <                            else if (QA.compareAndSet(a, index, t, null)) {
3168 <                                w.base = b;
3169 <                                t.doExec();
3170 <                            }
3171 <                        }
3172 <                    }
3173 <                    else
3174 <                        break;
3175 <                }
3176 <            }
3105 >            if (w != null)
3106 >                w.helpAsyncBlocker(blocker);
3107          }
3108      }
3109  
# Line 3192 | Line 3122 | public class ForkJoinPool extends Abstra
3122      // VarHandle mechanics
3123      private static final VarHandle CTL;
3124      private static final VarHandle MODE;
3125 <    private static final VarHandle QA;
3125 >    static final VarHandle QA;
3126  
3127      static {
3128          try {
# Line 3255 | Line 3185 | public class ForkJoinPool extends Abstra
3185          }
3186      }
3187   }
3188 +

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines