ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.299 by jsr166, Wed Dec 16 02:29:06 2015 UTC vs.
Revision 1.300 by dl, Mon Mar 14 13:48:40 2016 UTC

# Line 3 | Line 3
3   * Expert Group and released to the public domain, as explained at
4   * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6
6   package java.util.concurrent;
7  
8   import java.lang.Thread.UncaughtExceptionHandler;
# Line 15 | Line 14 | import java.util.Arrays;
14   import java.util.Collection;
15   import java.util.Collections;
16   import java.util.List;
17 < import java.util.concurrent.locks.ReentrantLock;
17 > import java.util.concurrent.TimeUnit;
18 > import java.util.concurrent.CountedCompleter;
19 > import java.util.concurrent.ForkJoinTask;
20 > import java.util.concurrent.ForkJoinWorkerThread;
21   import java.util.concurrent.locks.LockSupport;
22  
23   /**
# Line 52 | Line 54 | import java.util.concurrent.locks.LockSu
54   * However, no such adjustments are guaranteed in the face of blocked
55   * I/O or other unmanaged synchronization. The nested {@link
56   * ManagedBlocker} interface enables extension of the kinds of
57 < * synchronization accommodated.
57 > * synchronization accommodated. The default policies may be
58 > * overridden using a constructor with parameters corresponding to
59 > * those documented in class {@link ThreadPoolExecutor}.
60   *
61   * <p>In addition to execution and lifecycle control methods, this
62   * class provides status check methods (for example
# Line 133 | Line 137 | import java.util.concurrent.locks.LockSu
137   * @since 1.7
138   * @author Doug Lea
139   */
136 @jdk.internal.vm.annotation.Contended
140   public class ForkJoinPool extends AbstractExecutorService {
141  
142      /*
# Line 200 | Line 203 | public class ForkJoinPool extends Abstra
203       *        (CAS slot to null))
204       *           increment base and return task;
205       *
206 <     * There are several variants of each of these; for example most
207 <     * versions of poll pre-screen the CAS by rechecking that the base
208 <     * has not changed since reading the slot, and most methods only
206 <     * attempt the CAS if base appears not to be equal to top.
206 >     * There are several variants of each of these. In particular,
207 >     * almost all uses of poll occur within scan operations that also
208 >     * interleave contention tracking (with associated code sprawl.)
209       *
210       * Memory ordering.  See "Correct and Efficient Work-Stealing for
211       * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
# Line 235 | Line 237 | public class ForkJoinPool extends Abstra
237       * thief chooses a different random victim target to try next. So,
238       * in order for one thief to progress, it suffices for any
239       * in-progress poll or new push on any empty queue to
240 <     * complete. (This is why we normally use method pollAt and its
239 <     * variants that try once at the apparent base index, else
240 <     * consider alternative actions, rather than method poll, which
241 <     * retries.)
240 >     * complete.
241       *
242       * This approach also enables support of a user mode in which
243       * local task processing is in FIFO, not LIFO order, simply by
# Line 253 | Line 252 | public class ForkJoinPool extends Abstra
252       * choosing existing queues, and may be randomly repositioned upon
253       * contention with other submitters.  In essence, submitters act
254       * like workers except that they are restricted to executing local
255 <     * tasks that they submitted (or in the case of CountedCompleters,
256 <     * others with the same root task).  Insertion of tasks in shared
257 <     * mode requires a lock but we use only a simple spinlock (using
258 <     * field qlock), because submitters encountering a busy queue move
259 <     * on to try or create other queues -- they block only when
260 <     * creating and registering new queues. Because it is used only as
261 <     * a spinlock, unlocking requires only a "releasing" store (using
263 <     * putOrderedInt).  The qlock is also used during termination
264 <     * detection, in which case it is forced to a negative
265 <     * non-lockable value.
255 >     * tasks that they submitted.  Insertion of tasks in shared mode
256 >     * requires a lock but we use only a simple spinlock (using field
257 >     * phase), because submitters encountering a busy queue move on to
258 >     * try or create other queues -- they block only when creating and
259 >     * registering new queues. Because it is used only as a spinlock,
260 >     * unlocking requires only a "releasing" store (using
261 >     * putOrderedInt).
262       *
263       * Management
264       * ==========
# Line 276 | Line 272 | public class ForkJoinPool extends Abstra
272       * There are only a few properties that we can globally track or
273       * maintain, so we pack them into a small number of variables,
274       * often maintaining atomicity without blocking or locking.
275 <     * Nearly all essentially atomic control state is held in two
275 >     * Nearly all essentially atomic control state is held in a few
276       * volatile variables that are by far most often read (not
277 <     * written) as status and consistency checks. (Also, field
278 <     * "config" holds unchanging configuration state.)
277 >     * written) as status and consistency checks. We pack as much
278 >     * information into them as we can.
279       *
280       * Field "ctl" contains 64 bits holding information needed to
281 <     * atomically decide to add, inactivate, enqueue (on an event
282 <     * queue), dequeue, and/or re-activate workers.  To enable this
281 >     * atomically decide to add, enqueue (on an event queue), and
282 >     * dequeue (and release)-activate workers.  To enable this
283       * packing, we restrict maximum parallelism to (1<<15)-1 (which is
284       * far in excess of normal operating range) to allow ids, counts,
285       * and their negations (used for thresholding) to fit into 16bit
286       * subfields.
287       *
288 <     * Field "runState" holds lifetime status, atomically and
289 <     * monotonically setting STARTED, SHUTDOWN, STOP, and finally
290 <     * TERMINATED bits.
295 <     *
296 <     * Field "auxState" is a ReentrantLock subclass that also
297 <     * opportunistically holds some other bookkeeping fields accessed
298 <     * only when locked.  It is mainly used to lock (infrequent)
299 <     * updates to workQueues.  The auxState instance is itself lazily
300 <     * constructed (see tryInitialize), requiring a double-check-style
301 <     * bootstrapping use of field runState, and locking a private
302 <     * static.
288 >     * Field "mode" holds configuration parameters as well as lifetime
289 >     * status, atomically and monotonically setting SHUTDOWN, STOP,
290 >     * and finally TERMINATED bits.
291       *
292       * Field "workQueues" holds references to WorkQueues.  It is
293 <     * updated (only during worker creation and termination) under the
294 <     * lock, but is otherwise concurrently readable, and accessed
295 <     * directly. We also ensure that reads of the array reference
296 <     * itself never become too stale (for example, re-reading before
297 <     * each scan). To simplify index-based operations, the array size
298 <     * is always a power of two, and all readers must tolerate null
299 <     * slots. Worker queues are at odd indices. Shared (submission)
300 <     * queues are at even indices, up to a maximum of 64 slots, to
301 <     * limit growth even if array needs to expand to add more
302 <     * workers. Grouping them together in this way simplifies and
293 >     * updated (only during worker creation and termination) under
294 >     * lock (using field workerNamePrefix as lock), but is otherwise
295 >     * concurrently readable, and accessed directly. We also ensure
296 >     * that uses of the array reference itself never become too stale
297 >     * in case of resizing.  To simplify index-based operations, the
298 >     * array size is always a power of two, and all readers must
299 >     * tolerate null slots. Worker queues are at odd indices. Shared
300 >     * (submission) queues are at even indices, up to a maximum of 64
301 >     * slots, to limit growth even if array needs to expand to add
302 >     * more workers. Grouping them together in this way simplifies and
303       * speeds up task scanning.
304       *
305       * All worker thread creation is on-demand, triggered by task
# Line 331 | Line 319 | public class ForkJoinPool extends Abstra
319       * workers unless there appear to be tasks available.  On the
320       * other hand, we must quickly prod them into action when new
321       * tasks are submitted or generated. In many usages, ramp-up time
322 <     * to activate workers is the main limiting factor in overall
323 <     * performance, which is compounded at program start-up by JIT
324 <     * compilation and allocation. So we streamline this as much as
325 <     * possible.
326 <     *
327 <     * The "ctl" field atomically maintains active and total worker
328 <     * counts as well as a queue to place waiting threads so they can
329 <     * be located for signalling. Active counts also play the role of
330 <     * quiescence indicators, so are decremented when workers believe
331 <     * that there are no more tasks to execute. The "queue" is
332 <     * actually a form of Treiber stack.  A stack is ideal for
333 <     * activating threads in most-recently used order. This improves
322 >     * is the main limiting factor in overall performance, which is
323 >     * compounded at program start-up by JIT compilation and
324 >     * allocation. So we streamline this as much as possible.
325 >     *
326 >     * The "ctl" field atomically maintains total worker and
327 >     * "released" worker counts, plus the head of the available worker
328 >     * queue (actually stack, represented by the lower 32bit subfield
329 >     * of ctl).  Released workers are those known to be scanning for
330 >     * and/or running tasks. Unreleased ("available") workers are
331 >     * recorded in the ctl stack. These workers are made available for
332 >     * signalling by enqueuing in ctl (see method runWorker).  The
333 >     * "queue" is a form of Treiber stack. This is ideal for
334 >     * activating threads in most-recently used order, and improves
335       * performance and locality, outweighing the disadvantages of
336       * being prone to contention and inability to release a worker
337 <     * unless it is topmost on stack.  We block/unblock workers after
338 <     * pushing on the idle worker stack (represented by the lower
339 <     * 32bit subfield of ctl) when they cannot find work.  The top
340 <     * stack state holds the value of the "scanState" field of the
341 <     * worker: its index and status, plus a version counter that, in
342 <     * addition to the count subfields (also serving as version
343 <     * stamps) provide protection against Treiber stack ABA effects.
337 >     * unless it is topmost on stack.  To avoid missed signal problems
338 >     * inherent in any wait/signal design, available workers rescan
339 >     * for (and if found run) tasks after enqueuing.  Normally their
340 >     * release status will be updated while doing so, but the released
341 >     * worker ctl count may underestimate the number of active
342 >     * threads. (However, it is still possible to determine quiescence
343 >     * via a validation traversal -- see isQuiescent).  After an
344 >     * unsuccessful rescan, available workers are blocked until
345 >     * signalled (see signalWork).  The top stack state holds the
346 >     * value of the "phase" field of the worker: its index and status,
347 >     * plus a version counter that, in addition to the count subfields
348 >     * (also serving as version stamps) provide protection against
349 >     * Treiber stack ABA effects.
350       *
351 <     * Creating workers. To create a worker, we pre-increment total
352 <     * count (serving as a reservation), and attempt to construct a
351 >     * Creating workers. To create a worker, we pre-increment counts
352 >     * (serving as a reservation), and attempt to construct a
353       * ForkJoinWorkerThread via its factory. Upon construction, the
354       * new thread invokes registerWorker, where it constructs a
355       * WorkQueue and is assigned an index in the workQueues array
# Line 376 | Line 371 | public class ForkJoinPool extends Abstra
371       * submission queues for existing external threads (see
372       * externalPush).
373       *
374 <     * WorkQueue field scanState is used by both workers and the pool
375 <     * to manage and track whether a worker is UNSIGNALLED (possibly
376 <     * blocked waiting for a signal).  When a worker is inactivated,
377 <     * its scanState field is set, and is prevented from executing
378 <     * tasks, even though it must scan once for them to avoid queuing
379 <     * races. Note that scanState updates lag queue CAS releases so
380 <     * usage requires care. When queued, the lower 16 bits of
381 <     * scanState must hold its pool index. So we place the index there
382 <     * upon initialization (see registerWorker) and otherwise keep it
388 <     * there or restore it when necessary.
374 >     * WorkQueue field "phase" is used by both workers and the pool to
375 >     * manage and track whether a worker is UNSIGNALLED (possibly
376 >     * blocked waiting for a signal).  When a worker is enqueued its
377 >     * phase field is set. Note that phase field updates lag queue CAS
378 >     * releases so usage requires care -- seeing a negative phase does
379 >     * not guarantee that the worker is available. When queued, the
380 >     * lower 16 bits of scanState must hold its pool index. So we
381 >     * place the index there upon initialization (see registerWorker)
382 >     * and otherwise keep it there or restore it when necessary.
383       *
384       * The ctl field also serves as the basis for memory
385       * synchronization surrounding activation. This uses a more
# Line 394 | Line 388 | public class ForkJoinPool extends Abstra
388       * if to its current value).  This would be extremely costly. So
389       * we relax it in several ways: (1) Producers only signal when
390       * their queue is empty. Other workers propagate this signal (in
391 <     * method scan) when they find tasks. (2) Workers only enqueue
392 <     * after scanning (see below) and not finding any tasks.  (3)
393 <     * Rather than CASing ctl to its current value in the common case
394 <     * where no action is required, we reduce write contention by
395 <     * equivalently prefacing signalWork when called by an external
396 <     * task producer using a memory access with full-volatile
397 <     * semantics or a "fullFence". (4) For internal task producers we
398 <     * rely on the fact that even if no other workers awaken, the
405 <     * producer itself will eventually see the task and execute it.
391 >     * method scan) when they find tasks; to further reduce flailing,
392 >     * each worker signals only one other per activation. (2) Workers
393 >     * only enqueue after scanning (see below) and not finding any
394 >     * tasks.  (3) Rather than CASing ctl to its current value in the
395 >     * common case where no action is required, we reduce write
396 >     * contention by equivalently prefacing signalWork when called by
397 >     * an external task producer using a memory access with
398 >     * full-volatile semantics or a "fullFence".
399       *
400       * Almost always, too many signals are issued. A task producer
401       * cannot in general tell if some existing worker is in the midst
# Line 414 | Line 407 | public class ForkJoinPool extends Abstra
407       * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
408       * computations involving only a few workers.
409       *
410 <     * Scanning. Method scan() performs top-level scanning for tasks.
411 <     * Each scan traverses (and tries to poll from) each queue in
412 <     * pseudorandom permutation order by randomly selecting an origin
413 <     * index and a step value.  (The pseudorandom generator need not
414 <     * have high-quality statistical properties in the long term, but
415 <     * just within computations; We use 64bit and 32bit Marsaglia
416 <     * XorShifts, which are cheap and suffice here.)  Scanning also
417 <     * employs contention reduction: When scanning workers fail a CAS
418 <     * polling for work, they soon restart with a different
419 <     * pseudorandom scan order (thus likely retrying at different
420 <     * intervals). This improves throughput when many threads are
421 <     * trying to take tasks from few queues.  Scans do not otherwise
422 <     * explicitly take into account core affinities, loads, cache
423 <     * localities, etc, However, they do exploit temporal locality
424 <     * (which usually approximates these) by preferring to re-poll (up
425 <     * to POLL_LIMIT times) from the same queue after a successful
426 <     * poll before trying others.  Restricted forms of scanning occur
427 <     * in methods helpComplete and findNonEmptyStealQueue, and take
435 <     * similar but simpler forms.
436 <     *
437 <     * Deactivation and waiting. Queuing encounters several intrinsic
438 <     * races; most notably that an inactivating scanning worker can
439 <     * miss seeing a task produced during a scan.  So when a worker
440 <     * cannot find a task to steal, it inactivates and enqueues, and
441 <     * then rescans to ensure that it didn't miss one, reactivating
442 <     * upon seeing one with probability approximately proportional to
443 <     * probability of a miss.  (In most cases, the worker will be
444 <     * signalled before self-signalling, avoiding cascades of multiple
445 <     * signals for the same task).
446 <     *
447 <     * Workers block (in method awaitWork) using park/unpark;
448 <     * advertising the need for signallers to unpark by setting their
449 <     * "parker" fields.
410 >     * Scanning. Method runWorker performs top-level scanning for
411 >     * tasks.  Each scan traverses and tries to poll from each queue
412 >     * starting at a random index and circularly stepping. Scans are
413 >     * not performed in ideal random permutation order, to reduce
414 >     * cacheline contention.  The pseudorandom generator need not have
415 >     * high-quality statistical properties in the long term, but just
416 >     * within computations; We use Marsaglia XorShifts (often via
417 >     * ThreadLocalRandom.nextSecondarySeed), which are cheap and
418 >     * suffice. Scanning also employs contention reduction: When
419 >     * scanning workers fail to extract an apparently existing task,
420 >     * they soon restart at a different pseudorandom index.  This
421 >     * improves throughput when many threads are trying to take tasks
422 >     * from few queues, which can be common in some usages.  Scans do
423 >     * not otherwise explicitly take into account core affinities,
424 >     * loads, cache localities, etc, However, they do exploit temporal
425 >     * locality (which usually approximates these) by preferring to
426 >     * re-poll (at most #workers times) from the same queue after a
427 >     * successful poll before trying others.
428       *
429       * Trimming workers. To release resources after periods of lack of
430       * use, a worker starting to wait when the pool is quiescent will
431 <     * time out and terminate (see awaitWork) if the pool has remained
432 <     * quiescent for period given by IDLE_TIMEOUT_MS, increasing the
455 <     * period as the number of threads decreases, eventually removing
456 <     * all workers.
431 >     * time out and terminate (see method scan) if the pool has
432 >     * remained quiescent for period given by field keepAlive.
433       *
434       * Shutdown and Termination. A call to shutdownNow invokes
435       * tryTerminate to atomically set a runState bit. The calling
436       * thread, as well as every other worker thereafter terminating,
437 <     * helps terminate others by setting their (qlock) status,
438 <     * cancelling their unprocessed tasks, and waking them up, doing
439 <     * so repeatedly until stable. Calls to non-abrupt shutdown()
440 <     * preface this by checking whether termination should commence.
441 <     * This relies primarily on the active count bits of "ctl"
442 <     * maintaining consensus -- tryTerminate is called from awaitWork
443 <     * whenever quiescent. However, external submitters do not take
468 <     * part in this consensus.  So, tryTerminate sweeps through queues
469 <     * (until stable) to ensure lack of in-flight submissions and
470 <     * workers about to process them before triggering the "STOP"
471 <     * phase of termination. (Note: there is an intrinsic conflict if
472 <     * helpQuiescePool is called when shutdown is enabled. Both wait
473 <     * for quiescence, but tryTerminate is biased to not trigger until
474 <     * helpQuiescePool completes.)
437 >     * helps terminate others by cancelling their unprocessed tasks,
438 >     * and waking them up, doing so repeatedly until stable. Calls to
439 >     * non-abrupt shutdown() preface this by checking whether
440 >     * termination should commence by sweeping through queues (until
441 >     * stable) to ensure lack of in-flight submissions and workers
442 >     * about to process them before triggering the "STOP" phase of
443 >     * termination.
444       *
445       * Joining Tasks
446       * =============
# Line 479 | Line 448 | public class ForkJoinPool extends Abstra
448       * Any of several actions may be taken when one worker is waiting
449       * to join a task stolen (or always held) by another.  Because we
450       * are multiplexing many tasks on to a pool of workers, we can't
451 <     * just let them block (as in Thread.join).  We also cannot just
452 <     * reassign the joiner's run-time stack with another and replace
453 <     * it later, which would be a form of "continuation", that even if
454 <     * possible is not necessarily a good idea since we may need both
455 <     * an unblocked task and its continuation to progress.  Instead we
456 <     * combine two tactics:
451 >     * always just let them block (as in Thread.join).  We also cannot
452 >     * just reassign the joiner's run-time stack with another and
453 >     * replace it later, which would be a form of "continuation", that
454 >     * even if possible is not necessarily a good idea since we may
455 >     * need both an unblocked task and its continuation to progress.
456 >     * Instead we combine two tactics:
457       *
458       *   Helping: Arranging for the joiner to execute some task that it
459       *      would be running if the steal had not occurred.
# Line 497 | Line 466 | public class ForkJoinPool extends Abstra
466       * helping a hypothetical compensator: If we can readily tell that
467       * a possible action of a compensator is to steal and execute the
468       * task being joined, the joining thread can do so directly,
469 <     * without the need for a compensation thread (although at the
501 <     * expense of larger run-time stacks, but the tradeoff is
502 <     * typically worthwhile).
469 >     * without the need for a compensation thread.
470       *
471       * The ManagedBlocker extension API can't use helping so relies
472       * only on compensation in method awaitBlocker.
473       *
474 <     * The algorithm in helpStealer entails a form of "linear
475 <     * helping".  Each worker records (in field currentSteal) the most
476 <     * recent task it stole from some other worker (or a submission).
477 <     * It also records (in field currentJoin) the task it is currently
478 <     * actively joining. Method helpStealer uses these markers to try
479 <     * to find a worker to help (i.e., steal back a task from and
480 <     * execute it) that could hasten completion of the actively joined
481 <     * task.  Thus, the joiner executes a task that would be on its
482 <     * own local deque had the to-be-joined task not been stolen. This
483 <     * is a conservative variant of the approach described in Wagner &
484 <     * Calder "Leapfrogging: a portable technique for implementing
485 <     * efficient futures" SIGPLAN Notices, 1993
486 <     * (http://portal.acm.org/citation.cfm?id=155354). It differs in
487 <     * that: (1) We only maintain dependency links across workers upon
488 <     * steals, rather than use per-task bookkeeping.  This sometimes
489 <     * requires a linear scan of workQueues array to locate stealers,
490 <     * but often doesn't because stealers leave hints (that may become
491 <     * stale/wrong) of where to locate them.  It is only a hint
492 <     * because a worker might have had multiple steals and the hint
526 <     * records only one of them (usually the most current).  Hinting
527 <     * isolates cost to when it is needed, rather than adding to
528 <     * per-task overhead.  (2) It is "shallow", ignoring nesting and
529 <     * potentially cyclic mutual steals.  (3) It is intentionally
530 <     * racy: field currentJoin is updated only while actively joining,
531 <     * which means that we miss links in the chain during long-lived
532 <     * tasks, GC stalls etc (which is OK since blocking in such cases
533 <     * is usually a good idea).  (4) We bound the number of attempts
534 <     * to find work using checksums and fall back to suspending the
535 <     * worker and if necessary replacing it with another.
536 <     *
537 <     * Helping actions for CountedCompleters do not require tracking
538 <     * currentJoins: Method helpComplete takes and executes any task
539 <     * with the same root as the task being waited on (preferring
540 <     * local pops to non-local polls). However, this still entails
541 <     * some traversal of completer chains, so is less efficient than
542 <     * using CountedCompleters without explicit joins.
474 >     * The algorithm in awaitJoin entails a form of "linear helping".
475 >     * Each worker records (in field source) the id of the queue from
476 >     * which it last stole a task.  The scan in method awaitJoin uses
477 >     * these markers to try to find a worker to help (i.e., steal back
478 >     * a task from and execute it) that could hasten completion of the
479 >     * actively joined task.  Thus, the joiner executes a task that
480 >     * would be on its own local deque if the to-be-joined task had
481 >     * not been stolen. This is a conservative variant of the approach
482 >     * described in Wagner & Calder "Leapfrogging: a portable
483 >     * technique for implementing efficient futures" SIGPLAN Notices,
484 >     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
485 >     * mainly in that we only record queue ids, not full dependency
486 >     * links.  This requires a linear scan of the workQueues array to
487 >     * locate stealers, but isolates cost to when it is needed, rather
488 >     * than adding to per-task overhead. Searches can fail to locate
489 >     * stealers GC stalls and the like delay recording sources.
490 >     * Further, even when accurately identified, stealers might not
491 >     * ever produce a task that the joiner can in turn help with. So,
492 >     * compensation is tried upon failure to find tasks to run.
493       *
494 <     * Compensation does not aim to keep exactly the target
494 >     * Compensation does not by default aim to keep exactly the target
495       * parallelism number of unblocked threads running at any given
496       * time. Some previous versions of this class employed immediate
497       * compensations for any blocked join. However, in practice, the
498       * vast majority of blockages are transient byproducts of GC and
499       * other JVM or OS activities that are made worse by replacement.
500 <     * Currently, compensation is attempted only after validating that
501 <     * all purportedly active threads are processing tasks by checking
502 <     * field WorkQueue.scanState, which eliminates most false
503 <     * positives.  Also, compensation is bypassed (tolerating fewer
504 <     * threads) in the most common case in which it is rarely
505 <     * beneficial: when a worker with an empty queue (thus no
506 <     * continuation tasks) blocks on a join and there still remain
557 <     * enough threads to ensure liveness.
558 <     *
559 <     * Spare threads are removed as soon as they notice that the
560 <     * target parallelism level has been exceeded, in method
561 <     * tryDropSpare. (Method scan arranges returns for rechecks upon
562 <     * each probe via the "bound" parameter.)
563 <     *
564 <     * The compensation mechanism may be bounded.  Bounds for the
565 <     * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
566 <     * with programming errors and abuse before running out of
567 <     * resources to do so. In other cases, users may supply factories
568 <     * that limit thread construction. The effects of bounding in this
569 <     * pool (like all others) is imprecise.  Total worker counts are
570 <     * decremented when threads deregister, not when they exit and
571 <     * resources are reclaimed by the JVM and OS. So the number of
572 <     * simultaneously live threads may transiently exceed bounds.
573 <     *
500 >     * Rather than impose arbitrary policies, we allow users to
501 >     * override the default of only adding threads upon apparent
502 >     * starvation.  The compensation mechanism may also be bounded.
503 >     * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
504 >     * JVMs to cope with programming errors and abuse before running
505 >     * out of resources to do so.
506 >
507       * Common Pool
508       * ===========
509       *
510       * The static common pool always exists after static
511       * initialization.  Since it (or any other created pool) need
512       * never be used, we minimize initial construction overhead and
513 <     * footprint to the setup of about a dozen fields, with no nested
581 <     * allocation. Most bootstrapping occurs within method
582 <     * externalSubmit during the first submission to the pool.
513 >     * footprint to the setup of about a dozen fields.
514       *
515       * When external threads submit to the common pool, they can
516       * perform subtask processing (see externalHelpComplete and
# Line 599 | Line 530 | public class ForkJoinPool extends Abstra
530       * InnocuousForkJoinWorkerThread when there is a SecurityManager
531       * present. These workers have no permissions set, do not belong
532       * to any user-defined ThreadGroup, and erase all ThreadLocals
533 <     * after executing any top-level task (see WorkQueue.runTask).
534 <     * The associated mechanics (mainly in ForkJoinWorkerThread) may
535 <     * be JVM-dependent and must access particular Thread class fields
536 <     * to achieve this effect.
533 >     * after executing any top-level task (see
534 >     * WorkQueue.afterTopLevelExec).  The associated mechanics (mainly
535 >     * in ForkJoinWorkerThread) may be JVM-dependent and must access
536 >     * particular Thread class fields to achieve this effect.
537       *
538       * Style notes
539       * ===========
# Line 672 | Line 603 | public class ForkJoinPool extends Abstra
603      public static interface ForkJoinWorkerThreadFactory {
604          /**
605           * Returns a new worker thread operating in the given pool.
606 +         * Returning null or throwing an exception may result in tasks
607 +         * never being executed.  If this method throws an exception,
608 +         * it is relayed to the caller of the method (for example
609 +         * {@code execute}) causing attempted thread creation. If this
610 +         * method returns null or throws an exception, it is not
611 +         * retried until the next attempted creation (for example
612 +         * another call to {@code execute}).
613           *
614           * @param pool the pool this thread works in
615           * @return the new worker thread, or {@code null} if the request
616 <         *         to create a thread is rejected
616 >         *         to create a thread is rejected.
617           * @throws NullPointerException if the pool is null
618           */
619          public ForkJoinWorkerThread newThread(ForkJoinPool pool);
# Line 692 | Line 630 | public class ForkJoinPool extends Abstra
630          }
631      }
632  
695    /**
696     * Class for artificial tasks that are used to replace the target
697     * of local joins if they are removed from an interior queue slot
698     * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
699     * actually do anything beyond having a unique identity.
700     */
701    private static final class EmptyTask extends ForkJoinTask<Void> {
702        private static final long serialVersionUID = -7721805057305804111L;
703        EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
704        public final Void getRawResult() { return null; }
705        public final void setRawResult(Void x) {}
706        public final boolean exec() { return true; }
707    }
708
709    /**
710     * Additional fields and lock created upon initialization.
711     */
712    private static final class AuxState extends ReentrantLock {
713        private static final long serialVersionUID = -6001602636862214147L;
714        volatile long stealCount;     // cumulative steal count
715        long indexSeed;               // index bits for registerWorker
716        AuxState() {}
717    }
718
633      // Constants shared across ForkJoinPool and WorkQueue
634  
635      // Bounds
636 +    static final int SWIDTH       = 16;            // width of short
637      static final int SMASK        = 0xffff;        // short bits == max index
638      static final int MAX_CAP      = 0x7fff;        // max #workers - 1
724    static final int EVENMASK     = 0xfffe;        // even short bits
639      static final int SQMASK       = 0x007e;        // max 64 (even) slots
640  
641 <    // Masks and units for WorkQueue.scanState and ctl sp subfield
641 >    // Masks and units for WorkQueue.phase and ctl sp subfield
642      static final int UNSIGNALLED  = 1 << 31;       // must be negative
643      static final int SS_SEQ       = 1 << 16;       // version count
644 +    static final int QLOCK        = 1;             // must be 1
645  
646 <    // Mode bits for ForkJoinPool.config and WorkQueue.config
647 <    static final int MODE_MASK    = 0xffff << 16;  // top half of int
648 <    static final int SPARE_WORKER = 1 << 17;       // set if tc > 0 on creation
649 <    static final int UNREGISTERED = 1 << 18;       // to skip some of deregister
650 <    static final int FIFO_QUEUE   = 1 << 31;       // must be negative
651 <    static final int LIFO_QUEUE   = 0;             // for clarity
652 <    static final int IS_OWNED     = 1;             // low bit 0 if shared
653 <
654 <    /**
655 <     * The maximum number of task executions from the same queue
656 <     * before checking other queues, bounding unfairness and impact of
657 <     * infinite user task recursion.  Must be a power of two minus 1.
646 >    // Mode bits and sentinels, some also used in WorkQueue id and.source fields
647 >    static final int OWNED        = 1;             // queue has owner thread
648 >    static final int FIFO         = 1 << 16;       // fifo queue or access mode
649 >    static final int SATURATE     = 1 << 17;       // for tryCompensate
650 >    static final int SHUTDOWN     = 1 << 18;
651 >    static final int TERMINATED   = 1 << 19;
652 >    static final int STOP         = 1 << 31;       // must be negative
653 >    static final int QUIET        = 1 << 30;       // not scanning or working
654 >    static final int DORMANT      = QUIET | UNSIGNALLED;
655 >
656 >    /**
657 >     * The maximum number of local polls from the same queue before
658 >     * checking others. This is a safeguard against infinitely unfair
659 >     * looping under unbounded user task recursion, and must be larger
660 >     * than plausible cases of intentional bounded task recursion.
661       */
662 <    static final int POLL_LIMIT = (1 << 10) - 1;
662 >    static final int POLL_LIMIT = 1 << 10;
663  
664      /**
665       * Queues supporting work-stealing as well as external task
# Line 752 | Line 670 | public class ForkJoinPool extends Abstra
670       * arrays sharing cache lines. The @Contended annotation alerts
671       * JVMs to try to keep instances apart.
672       */
673 <    @jdk.internal.vm.annotation.Contended
673 >    // For now, using manual padding.
674 >    //    @jdk.internal.vm.annotation.Contended
675 >    //    @sun.misc.Contended
676      static final class WorkQueue {
677  
678          /**
# Line 776 | Line 696 | public class ForkJoinPool extends Abstra
696          static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
697  
698          // Instance fields
699 <
700 <        volatile int scanState;    // versioned, negative if inactive
701 <        int stackPred;             // pool stack (ctl) predecessor
699 >        volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
700 >        volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
701 >        volatile int phase;        // versioned, negative: queued, 1: locked
702 >        int stackPred;             // pool stack (ctl) predecessor link
703          int nsteals;               // number of steals
704 <        int hint;                  // randomization and stealer index hint
705 <        int config;                // pool index and mode
785 <        volatile int qlock;        // 1: locked, < 0: terminate; else 0
704 >        int id;                    // index, mode, tag
705 >        volatile int source;       // source queue id, or sentinel
706          volatile int base;         // index of next slot for poll
707          int top;                   // index of next slot for push
708          ForkJoinTask<?>[] array;   // the elements (initially unallocated)
709          final ForkJoinPool pool;   // the containing pool (may be null)
710          final ForkJoinWorkerThread owner; // owning thread or null if shared
711 <        volatile Thread parker;    // == owner during call to park; else null
712 <        volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
793 <
794 <        @jdk.internal.vm.annotation.Contended("group2") // segregate
795 <        volatile ForkJoinTask<?> currentSteal; // nonnull when running some task
711 >        volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
712 >        volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e, pad1f;
713  
714          WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
715              this.pool = pool;
# Line 805 | Line 722 | public class ForkJoinPool extends Abstra
722           * Returns an exportable index (used by ForkJoinWorkerThread).
723           */
724          final int getPoolIndex() {
725 <            return (config & 0xffff) >>> 1; // ignore odd/even tag bit
725 >            return (id & 0xffff) >>> 1; // ignore odd/even tag bit
726          }
727  
728          /**
# Line 822 | Line 739 | public class ForkJoinPool extends Abstra
739           * near-empty queue has at least one unclaimed task.
740           */
741          final boolean isEmpty() {
742 <            ForkJoinTask<?>[] a; int n, al, s;
743 <            return ((n = base - (s = top)) >= 0 || // possibly one task
742 >            ForkJoinTask<?>[] a; int n, al, b;
743 >            return ((n = (b = base) - top) >= 0 || // possibly one task
744                      (n == -1 && ((a = array) == null ||
745                                   (al = a.length) == 0 ||
746 <                                 a[(al - 1) & (s - 1)] == null)));
746 >                                 a[(al - 1) & b] == null)));
747          }
748  
749 +
750          /**
751           * Pushes a task. Call only by owner in unshared queues.
752           *
# Line 836 | Line 754 | public class ForkJoinPool extends Abstra
754           * @throws RejectedExecutionException if array cannot be resized
755           */
756          final void push(ForkJoinTask<?> task) {
757 <            U.storeFence();              // ensure safe publication
840 <            int s = top, al, d; ForkJoinTask<?>[] a;
757 >            int s = top; ForkJoinTask<?>[] a; int al, d;
758              if ((a = array) != null && (al = a.length) > 0) {
759 <                a[(al - 1) & s] = task;  // relaxed writes OK
760 <                top = s + 1;
759 >                int index = (al - 1) & s;
760 >                long offset = ((long)index << ASHIFT) + ABASE;
761                  ForkJoinPool p = pool;
762 +                top = s + 1;
763 +                U.putOrderedObject(a, offset, task);
764                  if ((d = base - s) == 0 && p != null) {
765                      U.fullFence();
766                      p.signalWork();
767                  }
768 <                else if (al + d == 1)
768 >                else if (d + al == 1)
769                      growArray();
770              }
771          }
# Line 858 | Line 777 | public class ForkJoinPool extends Abstra
777           */
778          final ForkJoinTask<?>[] growArray() {
779              ForkJoinTask<?>[] oldA = array;
780 <            int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
780 >            int oldSize = oldA != null ? oldA.length : 0;
781 >            int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
782              if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
783                  throw new RejectedExecutionException("Queue capacity exceeded");
784              int oldMask, t, b;
785              ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
786 <            if (oldA != null && (oldMask = oldA.length - 1) > 0 &&
786 >            if (oldA != null && (oldMask = oldSize - 1) > 0 &&
787                  (t = top) - (b = base) > 0) {
788                  int mask = size - 1;
789                  do { // emulate poll from old array, push to new array
# Line 894 | Line 814 | public class ForkJoinPool extends Abstra
814                  if (t != null &&
815                      U.compareAndSwapObject(a, offset, t, null)) {
816                      top = s;
817 <                    return t;
898 <                }
899 <            }
900 <            return null;
901 <        }
902 <
903 <        /**
904 <         * Takes a task in FIFO order if b is base of queue and a task
905 <         * can be claimed without contention. Specialized versions
906 <         * appear in ForkJoinPool methods scan and helpStealer.
907 <         */
908 <        final ForkJoinTask<?> pollAt(int b) {
909 <            ForkJoinTask<?>[] a; int al;
910 <            if ((a = array) != null && (al = a.length) > 0) {
911 <                int index = (al - 1) & b;
912 <                long offset = ((long)index << ASHIFT) + ABASE;
913 <                ForkJoinTask<?> t = (ForkJoinTask<?>)
914 <                    U.getObjectVolatile(a, offset);
915 <                if (t != null && b++ == base &&
916 <                    U.compareAndSwapObject(a, offset, t, null)) {
917 <                    base = b;
817 >                    U.storeFence();
818                      return t;
819                  }
820              }
# Line 954 | Line 854 | public class ForkJoinPool extends Abstra
854           * Takes next task, if one exists, in order specified by mode.
855           */
856          final ForkJoinTask<?> nextLocalTask() {
857 <            return (config < 0) ? poll() : pop();
857 >            return ((id & FIFO) != 0) ? poll() : pop();
858          }
859  
860          /**
# Line 963 | Line 863 | public class ForkJoinPool extends Abstra
863          final ForkJoinTask<?> peek() {
864              int al; ForkJoinTask<?>[] a;
865              return ((a = array) != null && (al = a.length) > 0) ?
866 <                a[(al - 1) & (config < 0 ? base : top - 1)] : null;
866 >                a[(al - 1) &
867 >                  ((id & FIFO) != 0 ? base : top - 1)] : null;
868          }
869  
870          /**
# Line 976 | Line 877 | public class ForkJoinPool extends Abstra
877                  long offset = ((long)index << ASHIFT) + ABASE;
878                  if (U.compareAndSwapObject(a, offset, task, null)) {
879                      top = s;
880 +                    U.storeFence();
881                      return true;
882                  }
883              }
# Line 983 | Line 885 | public class ForkJoinPool extends Abstra
885          }
886  
887          /**
986         * Shared version of push. Fails if already locked.
987         *
988         * @return status: > 0 locked, 0 possibly was empty, < 0 was nonempty
989         */
990        final int sharedPush(ForkJoinTask<?> task) {
991            int stat;
992            if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
993                int b = base, s = top, al, d; ForkJoinTask<?>[] a;
994                if ((a = array) != null && (al = a.length) > 0 &&
995                    al - 1 + (d = b - s) > 0) {
996                    a[(al - 1) & s] = task;
997                    top = s + 1;                 // relaxed writes OK here
998                    qlock = 0;
999                    stat = (d < 0 && b == base) ? d : 0;
1000                }
1001                else {
1002                    growAndSharedPush(task);
1003                    stat = 0;
1004                }
1005            }
1006            else
1007                stat = 1;
1008            return stat;
1009        }
1010
1011        /**
1012         * Helper for sharedPush; called only when locked and resize
1013         * needed.
1014         */
1015        private void growAndSharedPush(ForkJoinTask<?> task) {
1016            try {
1017                growArray();
1018                int s = top, al; ForkJoinTask<?>[] a;
1019                if ((a = array) != null && (al = a.length) > 0) {
1020                    a[(al - 1) & s] = task;
1021                    top = s + 1;
1022                }
1023            } finally {
1024                qlock = 0;
1025            }
1026        }
1027
1028        /**
1029         * Shared version of tryUnpush.
1030         */
1031        final boolean trySharedUnpush(ForkJoinTask<?> task) {
1032            boolean popped = false;
1033            int s = top - 1, al; ForkJoinTask<?>[] a;
1034            if ((a = array) != null && (al = a.length) > 0) {
1035                int index = (al - 1) & s;
1036                long offset = ((long)index << ASHIFT) + ABASE;
1037                ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
1038                if (t == task &&
1039                    U.compareAndSwapInt(this, QLOCK, 0, 1)) {
1040                    if (top == s + 1 && array == a &&
1041                        U.compareAndSwapObject(a, offset, task, null)) {
1042                        popped = true;
1043                        top = s;
1044                    }
1045                    U.putOrderedInt(this, QLOCK, 0);
1046                }
1047            }
1048            return popped;
1049        }
1050
1051        /**
888           * Removes and cancels all known tasks, ignoring any exceptions.
889           */
890          final void cancelAll() {
891 <            ForkJoinTask<?> t;
1056 <            if ((t = currentJoin) != null) {
1057 <                currentJoin = null;
1058 <                ForkJoinTask.cancelIgnoringExceptions(t);
1059 <            }
1060 <            if ((t = currentSteal) != null) {
1061 <                currentSteal = null;
1062 <                ForkJoinTask.cancelIgnoringExceptions(t);
1063 <            }
1064 <            while ((t = poll()) != null)
891 >            for (ForkJoinTask<?> t; (t = poll()) != null; )
892                  ForkJoinTask.cancelIgnoringExceptions(t);
893          }
894  
895          // Specialized execution methods
896  
897          /**
898 <         * Pops and executes up to POLL_LIMIT tasks or until empty.
898 >         * Pops and executes up to limit consecutive tasks or until empty.
899 >         *
900 >         * @param limit max runs, or zero for no limit
901           */
902 <        final void localPopAndExec() {
903 <            for (int nexec = 0;;) {
902 >        final void localPopAndExec(int limit) {
903 >            for (;;) {
904                  int b = base, s = top, al; ForkJoinTask<?>[] a;
905                  if ((a = array) != null && b != s && (al = a.length) > 0) {
906                      int index = (al - 1) & --s;
# Line 1080 | Line 909 | public class ForkJoinPool extends Abstra
909                          U.getAndSetObject(a, offset, null);
910                      if (t != null) {
911                          top = s;
912 <                        (currentSteal = t).doExec();
913 <                        if (++nexec > POLL_LIMIT)
912 >                        U.storeFence();
913 >                        t.doExec();
914 >                        if (limit != 0 && --limit == 0)
915                              break;
916                      }
917                      else
# Line 1093 | Line 923 | public class ForkJoinPool extends Abstra
923          }
924  
925          /**
926 <         * Polls and executes up to POLL_LIMIT tasks or until empty.
926 >         * Polls and executes up to limit consecutive tasks or until empty.
927 >         *
928 >         * @param limit, or zero for no limit
929           */
930 <        final void localPollAndExec() {
931 <            for (int nexec = 0;;) {
932 <                int b = base, s = top, al; ForkJoinTask<?>[] a;
933 <                if ((a = array) != null && b != s && (al = a.length) > 0) {
930 >        final void localPollAndExec(int limit) {
931 >            for (int polls = 0;;) {
932 >                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
933 >                if ((a = array) != null && (d = b - s) < 0 &&
934 >                    (al = a.length) > 0) {
935                      int index = (al - 1) & b++;
936                      long offset = ((long)index << ASHIFT) + ABASE;
937                      ForkJoinTask<?> t = (ForkJoinTask<?>)
# Line 1106 | Line 939 | public class ForkJoinPool extends Abstra
939                      if (t != null) {
940                          base = b;
941                          t.doExec();
942 <                        if (++nexec > POLL_LIMIT)
942 >                        if (limit != 0 && ++polls == limit)
943                              break;
944                      }
945 +                    else if (d == -1)
946 +                        break;     // now empty
947 +                    else
948 +                        polls = 0; // stolen; reset
949                  }
950                  else
951                      break;
# Line 1116 | Line 953 | public class ForkJoinPool extends Abstra
953          }
954  
955          /**
956 <         * Executes the given task and (some) remaining local tasks.
956 >         * If present, removes task from queue and executes
957           */
958 <        final void runTask(ForkJoinTask<?> task) {
959 <            if (task != null) {
960 <                task.doExec();
961 <                if (config < 0)
962 <                    localPollAndExec();
963 <                else
964 <                    localPopAndExec();
965 <                int ns = ++nsteals;
966 <                ForkJoinWorkerThread thread = owner;
967 <                currentSteal = null;
968 <                if (ns < 0)           // collect on overflow
969 <                    transferStealCount(pool);
970 <                if (thread != null)
971 <                    thread.afterTopLevelExec();
972 <            }
973 <        }
974 <
975 <        /**
976 <         * Adds steal count to pool steal count if it exists, and resets.
977 <         */
978 <        final void transferStealCount(ForkJoinPool p) {
979 <            AuxState aux;
980 <            if (p != null && (aux = p.auxState) != null) {
981 <                long s = nsteals;
982 <                nsteals = 0;            // if negative, correct for overflow
983 <                if (s < 0) s = Integer.MAX_VALUE;
984 <                aux.lock();
985 <                try {
986 <                    aux.stealCount += s;
987 <                } finally {
1151 <                    aux.unlock();
958 >        final void tryRemoveAndExec(ForkJoinTask<?> task) {
959 >            ForkJoinTask<?>[] wa; int s, wal;
960 >            if (base - (s = top) < 0 && // traverse from top
961 >                (wa = array) != null && (wal = wa.length) > 0) {
962 >                for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
963 >                    int index = i & m;
964 >                    long offset = (index << ASHIFT) + ABASE;
965 >                    ForkJoinTask<?> t = (ForkJoinTask<?>)
966 >                        U.getObject(wa, offset);
967 >                    if (t == null)
968 >                        break;
969 >                    else if (t == task) {
970 >                        if (U.compareAndSwapObject(wa, offset, t, null)) {
971 >                            top = ns;   // safely shift down
972 >                            for (int j = i; j != ns; ++j) {
973 >                                ForkJoinTask<?> f;
974 >                                int pindex = (j + 1) & m;
975 >                                long pOffset = (pindex << ASHIFT) + ABASE;
976 >                                f = (ForkJoinTask<?>)U.getObject(wa, pOffset);
977 >                                U.putObjectVolatile(wa, pOffset, null);
978 >
979 >                                int jindex = j & m;
980 >                                long jOffset = (jindex << ASHIFT) + ABASE;
981 >                                U.putOrderedObject(wa, jOffset, f);
982 >                            }
983 >                            U.storeFence();
984 >                            t.doExec();
985 >                        }
986 >                        break;
987 >                    }
988                  }
989              }
990          }
991  
992          /**
993 <         * If present, removes from queue and executes the given task,
994 <         * or any other cancelled task. Used only by awaitJoin.
993 >         * Tries to steal and run tasks within the target's
994 >         * computation until done, not found, or limit exceeded
995           *
996 <         * @return true if queue empty and task not known to be done
997 <         */
998 <        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
999 <            if (task != null && task.status >= 0) {
1000 <                int b, s, d, al; ForkJoinTask<?>[] a;
1001 <                while ((d = (b = base) - (s = top)) < 0 &&
1002 <                       (a = array) != null && (al = a.length) > 0) {
1003 <                    for (;;) {      // traverse from s to b
1004 <                        int index = --s & (al - 1);
1005 <                        long offset = (index << ASHIFT) + ABASE;
1006 <                        ForkJoinTask<?> t = (ForkJoinTask<?>)
1007 <                            U.getObjectVolatile(a, offset);
1008 <                        if (t == null)
1009 <                            break;                   // restart
1010 <                        else if (t == task) {
1011 <                            boolean removed = false;
1012 <                            if (s + 1 == top) {      // pop
1013 <                                if (U.compareAndSwapObject(a, offset, t, null)) {
1014 <                                    top = s;
1015 <                                    removed = true;
996 >         * @param task root of CountedCompleter computation
997 >         * @param limit max runs, or zero for no limit
998 >         * @return task status on exit
999 >         */
1000 >        final int localHelpCC(CountedCompleter<?> task, int limit) {
1001 >            int status = 0;
1002 >            if (task != null && (status = task.status) >= 0) {
1003 >                for (;;) {
1004 >                    boolean help = false;
1005 >                    int b = base, s = top, al; ForkJoinTask<?>[] a;
1006 >                    if ((a = array) != null && b != s && (al = a.length) > 0) {
1007 >                        int index = (al - 1) & (s - 1);
1008 >                        long offset = ((long)index << ASHIFT) + ABASE;
1009 >                        ForkJoinTask<?> o = (ForkJoinTask<?>)
1010 >                            U.getObject(a, offset);
1011 >                        if (o instanceof CountedCompleter) {
1012 >                            CountedCompleter<?> t = (CountedCompleter<?>)o;
1013 >                            for (CountedCompleter<?> f = t;;) {
1014 >                                if (f != task) {
1015 >                                    if ((f = f.completer) == null) // try parent
1016 >                                        break;
1017 >                                }
1018 >                                else {
1019 >                                    if (U.compareAndSwapObject(a, offset,
1020 >                                                               t, null)) {
1021 >                                        top = s - 1;
1022 >                                        U.storeFence();
1023 >                                        t.doExec();
1024 >                                        help = true;
1025 >                                    }
1026 >                                    break;
1027                                  }
1028                              }
1182                            else if (base == b)      // replace with proxy
1183                                removed = U.compareAndSwapObject(a, offset, t,
1184                                                                 new EmptyTask());
1185                            if (removed) {
1186                                ForkJoinTask<?> ps = currentSteal;
1187                                (currentSteal = task).doExec();
1188                                currentSteal = ps;
1189                            }
1190                            break;
1191                        }
1192                        else if (t.status < 0 && s + 1 == top) {
1193                            if (U.compareAndSwapObject(a, offset, t, null)) {
1194                                top = s;
1195                            }
1196                            break;                  // was cancelled
1197                        }
1198                        else if (++d == 0) {
1199                            if (base != b)          // rescan
1200                                break;
1201                            return false;
1029                          }
1030                      }
1031 <                    if (task.status < 0)
1032 <                        return false;
1031 >                    if ((status = task.status) < 0 || !help ||
1032 >                        (limit != 0 && --limit == 0))
1033 >                        break;
1034                  }
1035              }
1036 <            return true;
1036 >            return status;
1037          }
1038  
1039 +        // Operations on shared queues
1040 +
1041          /**
1042 <         * Pops task if in the same CC computation as the given task,
1213 <         * in either shared or owned mode. Used only by helpComplete.
1042 >         * Tries to lock shared queue by CASing phase field
1043           */
1044 <        final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
1045 <            int b = base, s = top, al; ForkJoinTask<?>[] a;
1046 <            if ((a = array) != null && b != s && (al = a.length) > 0) {
1047 <                int index = (al - 1) & (s - 1);
1044 >        final boolean tryLockSharedQueue() {
1045 >            return U.compareAndSwapInt(this, PHASE, 0, QLOCK);
1046 >        }
1047 >
1048 >        /**
1049 >         * Shared version of tryUnpush.
1050 >         */
1051 >        final boolean trySharedUnpush(ForkJoinTask<?> task) {
1052 >            boolean popped = false;
1053 >            int s = top - 1, al; ForkJoinTask<?>[] a;
1054 >            if ((a = array) != null && (al = a.length) > 0) {
1055 >                int index = (al - 1) & s;
1056                  long offset = ((long)index << ASHIFT) + ABASE;
1057 <                ForkJoinTask<?> o = (ForkJoinTask<?>)
1058 <                    U.getObjectVolatile(a, offset);
1059 <                if (o instanceof CountedCompleter) {
1060 <                    CountedCompleter<?> t = (CountedCompleter<?>)o;
1061 <                    for (CountedCompleter<?> r = t;;) {
1062 <                        if (r == task) {
1063 <                            if ((mode & IS_OWNED) == 0) {
1227 <                                boolean popped = false;
1228 <                                if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
1229 <                                    if (top == s && array == a &&
1230 <                                        U.compareAndSwapObject(a, offset,
1231 <                                                               t, null)) {
1232 <                                        popped = true;
1233 <                                        top = s - 1;
1234 <                                    }
1235 <                                    U.putOrderedInt(this, QLOCK, 0);
1236 <                                    if (popped)
1237 <                                        return t;
1238 <                                }
1239 <                            }
1240 <                            else if (U.compareAndSwapObject(a, offset,
1241 <                                                            t, null)) {
1242 <                                top = s - 1;
1243 <                                return t;
1244 <                            }
1245 <                            break;
1246 <                        }
1247 <                        else if ((r = r.completer) == null) // try parent
1248 <                            break;
1057 >                ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
1058 >                if (t == task &&
1059 >                    U.compareAndSwapInt(this, PHASE, 0, QLOCK)) {
1060 >                    if (top == s + 1 && array == a &&
1061 >                        U.compareAndSwapObject(a, offset, task, null)) {
1062 >                        popped = true;
1063 >                        top = s;
1064                      }
1065 +                    U.putOrderedInt(this, PHASE, 0);
1066                  }
1067              }
1068 <            return null;
1068 >            return popped;
1069          }
1070  
1071          /**
1072 <         * Steals and runs a task in the same CC computation as the
1073 <         * given task if one exists and can be taken without
1074 <         * contention. Otherwise returns a checksum/control value for
1075 <         * use by method helpComplete.
1076 <         *
1077 <         * @return 1 if successful, 2 if retryable (lost to another
1078 <         * stealer), -1 if non-empty but no matching task found, else
1079 <         * the base index, forced negative.
1080 <         */
1081 <        final int pollAndExecCC(CountedCompleter<?> task) {
1082 <            ForkJoinTask<?>[] a;
1083 <            int b = base, s = top, al, h;
1084 <            if ((a = array) != null && b != s && (al = a.length) > 0) {
1085 <                int index = (al - 1) & b;
1086 <                long offset = ((long)index << ASHIFT) + ABASE;
1087 <                ForkJoinTask<?> o = (ForkJoinTask<?>)
1088 <                    U.getObjectVolatile(a, offset);
1089 <                if (o == null)
1090 <                    h = 2;                      // retryable
1091 <                else if (!(o instanceof CountedCompleter))
1092 <                    h = -1;                     // unmatchable
1093 <                else {
1094 <                    CountedCompleter<?> t = (CountedCompleter<?>)o;
1095 <                    for (CountedCompleter<?> r = t;;) {
1096 <                        if (r == task) {
1097 <                            if (b++ == base &&
1098 <                                U.compareAndSwapObject(a, offset, t, null)) {
1099 <                                base = b;
1100 <                                t.doExec();
1101 <                                h = 1;          // success
1072 >         * Shared version of localHelpCC.
1073 >         */
1074 >        final int sharedHelpCC(CountedCompleter<?> task, int limit) {
1075 >            int status = 0;
1076 >            if (task != null && (status = task.status) >= 0) {
1077 >                for (;;) {
1078 >                    boolean help = false;
1079 >                    int b = base, s = top, al; ForkJoinTask<?>[] a;
1080 >                    if ((a = array) != null && b != s && (al = a.length) > 0) {
1081 >                        int index = (al - 1) & (s - 1);
1082 >                        long offset = ((long)index << ASHIFT) + ABASE;
1083 >                        ForkJoinTask<?> o = (ForkJoinTask<?>)
1084 >                            U.getObject(a, offset);
1085 >                        if (o instanceof CountedCompleter) {
1086 >                            CountedCompleter<?> t = (CountedCompleter<?>)o;
1087 >                            for (CountedCompleter<?> f = t;;) {
1088 >                                if (f != task) {
1089 >                                    if ((f = f.completer) == null)
1090 >                                        break;
1091 >                                }
1092 >                                else {
1093 >                                    if (U.compareAndSwapInt(this, PHASE,
1094 >                                                            0, QLOCK)) {
1095 >                                        if (top == s && array == a &&
1096 >                                            U.compareAndSwapObject(a, offset,
1097 >                                                                   t, null)) {
1098 >                                            help = true;
1099 >                                            top = s - 1;
1100 >                                        }
1101 >                                        U.putOrderedInt(this, PHASE, 0);
1102 >                                        if (help)
1103 >                                            t.doExec();
1104 >                                    }
1105 >                                    break;
1106 >                                }
1107                              }
1287                            else
1288                                h = 2;          // lost CAS
1289                            break;
1290                        }
1291                        else if ((r = r.completer) == null) {
1292                            h = -1;             // unmatched
1293                            break;
1108                          }
1109                      }
1110 +                    if ((status = task.status) < 0 || !help ||
1111 +                        (limit != 0 && --limit == 0))
1112 +                        break;
1113                  }
1114              }
1115 <            else
1299 <                h = b | Integer.MIN_VALUE;      // to sense movement on re-poll
1300 <            return h;
1115 >            return status;
1116          }
1117  
1118          /**
# Line 1305 | Line 1120 | public class ForkJoinPool extends Abstra
1120           */
1121          final boolean isApparentlyUnblocked() {
1122              Thread wt; Thread.State s;
1123 <            return (scanState >= 0 &&
1309 <                    (wt = owner) != null &&
1123 >            return ((wt = owner) != null &&
1124                      (s = wt.getState()) != Thread.State.BLOCKED &&
1125                      s != Thread.State.WAITING &&
1126                      s != Thread.State.TIMED_WAITING);
# Line 1314 | Line 1128 | public class ForkJoinPool extends Abstra
1128  
1129          // Unsafe mechanics. Note that some are (and must be) the same as in FJP
1130          private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1131 <        private static final long QLOCK;
1131 >        private static final long PHASE;
1132          private static final int ABASE;
1133          private static final int ASHIFT;
1134          static {
1135              try {
1136 <                QLOCK = U.objectFieldOffset
1137 <                    (WorkQueue.class.getDeclaredField("qlock"));
1136 >                PHASE = U.objectFieldOffset
1137 >                    (WorkQueue.class.getDeclaredField("phase"));
1138                  ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
1139                  int scale = U.arrayIndexScale(ForkJoinTask[].class);
1140                  if ((scale & (scale - 1)) != 0)
# Line 1343 | Line 1157 | public class ForkJoinPool extends Abstra
1157  
1158      /**
1159       * Permission required for callers of methods that may start or
1160 <     * kill threads.  Also used as a static lock in tryInitialize.
1160 >     * kill threads.
1161       */
1162      static final RuntimePermission modifyThreadPermission;
1163  
# Line 1384 | Line 1198 | public class ForkJoinPool extends Abstra
1198      // static configuration constants
1199  
1200      /**
1201 <     * Initial timeout value (in milliseconds) for the thread
1202 <     * triggering quiescence to park waiting for new work. On timeout,
1389 <     * the thread will instead try to shrink the number of workers.
1390 <     * The value should be large enough to avoid overly aggressive
1391 <     * shrinkage during most transient stalls (long GCs etc).
1201 >     * Default idle timeout value (in milliseconds) for the thread
1202 >     * triggering quiescence to park waiting for new work
1203       */
1204 <    private static final long IDLE_TIMEOUT_MS = 2000L; // 2sec
1204 >    private static final long DEFAULT_KEEPALIVE = 60000L;
1205  
1206      /**
1207 <     * Tolerance for idle timeouts, to cope with timer undershoots.
1207 >     * Undershoot tolerance for idle timeouts
1208       */
1209 <    private static final long TIMEOUT_SLOP_MS =   20L; // 20ms
1209 >    private static final long TIMEOUT_SLOP = 20L;
1210  
1211      /**
1212       * The default value for COMMON_MAX_SPARES.  Overridable using the
# Line 1415 | Line 1226 | public class ForkJoinPool extends Abstra
1226  
1227      /*
1228       * Bits and masks for field ctl, packed with 4 16 bit subfields:
1229 <     * AC: Number of active running workers minus target parallelism
1229 >     * RC: Number of released (unqueued) workers minus target parallelism
1230       * TC: Number of total workers minus target parallelism
1231       * SS: version count and status of top waiting thread
1232       * ID: poolIndex of top of Treiber stack of waiters
# Line 1424 | Line 1235 | public class ForkJoinPool extends Abstra
1235       * (including version bits) as sp=(int)ctl.  The offsets of counts
1236       * by the target parallelism and the positionings of fields makes
1237       * it possible to perform the most common checks via sign tests of
1238 <     * fields: When ac is negative, there are not enough active
1238 >     * fields: When ac is negative, there are not enough unqueued
1239       * workers, when tc is negative, there are not enough total
1240       * workers.  When sp is non-zero, there are waiting workers.  To
1241       * deal with possibly negative fields, we use casts in and out of
1242       * "short" and/or signed shifts to maintain signedness.
1243       *
1244 <     * Because it occupies uppermost bits, we can add one active count
1245 <     * using getAndAddLong of AC_UNIT, rather than CAS, when returning
1244 >     * Because it occupies uppermost bits, we can add one release count
1245 >     * using getAndAddLong of RC_UNIT, rather than CAS, when returning
1246       * from a blocked join.  Other updates entail multiple subfields
1247       * and masking, requiring CAS.
1248 +     *
1249 +     * The limits packed in field "bounds" are also offset by the
1250 +     * parallelism level to make them comparable to the ctl rc and tc
1251 +     * fields.
1252       */
1253  
1254      // Lower and upper word masks
1255      private static final long SP_MASK    = 0xffffffffL;
1256      private static final long UC_MASK    = ~SP_MASK;
1257  
1258 <    // Active counts
1259 <    private static final int  AC_SHIFT   = 48;
1260 <    private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
1261 <    private static final long AC_MASK    = 0xffffL << AC_SHIFT;
1258 >    // Release counts
1259 >    private static final int  RC_SHIFT   = 48;
1260 >    private static final long RC_UNIT    = 0x0001L << RC_SHIFT;
1261 >    private static final long RC_MASK    = 0xffffL << RC_SHIFT;
1262  
1263      // Total counts
1264      private static final int  TC_SHIFT   = 32;
# Line 1451 | Line 1266 | public class ForkJoinPool extends Abstra
1266      private static final long TC_MASK    = 0xffffL << TC_SHIFT;
1267      private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1268  
1454    // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
1455    private static final int  STARTED    = 1;
1456    private static final int  STOP       = 1 << 1;
1457    private static final int  TERMINATED = 1 << 2;
1458    private static final int  SHUTDOWN   = 1 << 31;
1459
1269      // Instance fields
1270 +
1271 +    // Segregate ctl field, For now using padding vs @Contended
1272 +    //    @jdk.internal.vm.annotation.Contended("fjpctl")
1273 +    //    @sun.misc.Contended("fjpctl")
1274 +    volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
1275 +    volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
1276      volatile long ctl;                   // main pool control
1277 <    volatile int runState;
1278 <    final int config;                    // parallelism, mode
1279 <    AuxState auxState;                   // lock, steal counts
1280 <    volatile WorkQueue[] workQueues;     // main registry
1281 <    final String workerNamePrefix;       // to create worker name string
1277 >    volatile long pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
1278 >    volatile long pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e;
1279 >
1280 >    volatile long stealCount;            // collects worker nsteals
1281 >    final long keepAlive;                // milliseconds before dropping if idle
1282 >    int indexSeed;                       // next worker index
1283 >    final int bounds;                    // min, max threads packed as shorts
1284 >    volatile int mode;                   // parallelism, runstate, queue mode
1285 >    WorkQueue[] workQueues;              // main registry
1286 >    final String workerNamePrefix;       // for worker thread string; sync lock
1287      final ForkJoinWorkerThreadFactory factory;
1288      final UncaughtExceptionHandler ueh;  // per-worker UEH
1289  
1470    /**
1471     * Instantiates fields upon first submission, or upon shutdown if
1472     * no submissions. If checkTermination true, also responds to
1473     * termination by external calls submitting tasks.
1474     */
1475    private void tryInitialize(boolean checkTermination) {
1476        if (runState == 0) { // bootstrap by locking static field
1477            int p = config & SMASK;
1478            int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
1479            n |= n >>> 1;    // create workQueues array with size a power of two
1480            n |= n >>> 2;
1481            n |= n >>> 4;
1482            n |= n >>> 8;
1483            n |= n >>> 16;
1484            n = ((n + 1) << 1) & SMASK;
1485            AuxState aux = new AuxState();
1486            WorkQueue[] ws = new WorkQueue[n];
1487            synchronized (modifyThreadPermission) { // double-check
1488                if (runState == 0) {
1489                    workQueues = ws;
1490                    auxState = aux;
1491                    runState = STARTED;
1492                }
1493            }
1494        }
1495        if (checkTermination && runState < 0) {
1496            tryTerminate(false, false); // help terminate
1497            throw new RejectedExecutionException();
1498        }
1499    }
1500
1290      // Creating, registering and deregistering workers
1291  
1292      /**
# Line 1505 | Line 1294 | public class ForkJoinPool extends Abstra
1294       * count has already been incremented as a reservation.  Invokes
1295       * deregisterWorker on any failure.
1296       *
1508     * @param isSpare true if this is a spare thread
1297       * @return true if successful
1298       */
1299 <    private boolean createWorker(boolean isSpare) {
1299 >    private boolean createWorker() {
1300          ForkJoinWorkerThreadFactory fac = factory;
1301          Throwable ex = null;
1302          ForkJoinWorkerThread wt = null;
1515        WorkQueue q;
1303          try {
1304              if (fac != null && (wt = fac.newThread(this)) != null) {
1518                if (isSpare && (q = wt.workQueue) != null)
1519                    q.config |= SPARE_WORKER;
1305                  wt.start();
1306                  return true;
1307              }
# Line 1537 | Line 1322 | public class ForkJoinPool extends Abstra
1322       */
1323      private void tryAddWorker(long c) {
1324          do {
1325 <            long nc = ((AC_MASK & (c + AC_UNIT)) |
1325 >            long nc = ((RC_MASK & (c + RC_UNIT)) |
1326                         (TC_MASK & (c + TC_UNIT)));
1327              if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
1328 <                createWorker(false);
1328 >                createWorker();
1329                  break;
1330              }
1331          } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
# Line 1555 | Line 1340 | public class ForkJoinPool extends Abstra
1340       */
1341      final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1342          UncaughtExceptionHandler handler;
1343 <        AuxState aux;
1559 <        wt.setDaemon(true);                           // configure thread
1343 >        wt.setDaemon(true);                             // configure thread
1344          if ((handler = ueh) != null)
1345              wt.setUncaughtExceptionHandler(handler);
1346          WorkQueue w = new WorkQueue(this, wt);
1347 <        int i = 0;                                    // assign a pool index
1348 <        int mode = config & MODE_MASK;
1349 <        if ((aux = auxState) != null) {
1350 <            aux.lock();
1351 <            try {
1352 <                int s = (int)(aux.indexSeed += SEED_INCREMENT), n, m;
1353 <                WorkQueue[] ws = workQueues;
1354 <                if (ws != null && (n = ws.length) > 0) {
1355 <                    i = (m = n - 1) & ((s << 1) | 1); // odd-numbered indices
1356 <                    if (ws[i] != null) {              // collision
1357 <                        int probes = 0;               // step by approx half n
1358 <                        int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1359 <                        while (ws[i = (i + step) & m] != null) {
1360 <                            if (++probes >= n) {
1361 <                                workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1362 <                                m = n - 1;
1363 <                                probes = 0;
1364 <                            }
1347 >        int tid = 0;                                    // for thread name
1348 >        int fifo = mode & FIFO;
1349 >        String prefix = workerNamePrefix;
1350 >        if (prefix != null) {
1351 >            synchronized(prefix) {
1352 >                WorkQueue[] ws = workQueues; int n;
1353 >                int s = indexSeed += SEED_INCREMENT;
1354 >                if (ws != null && (n = ws.length) > 1) {
1355 >                    int m = n - 1;
1356 >                    tid = s & m;
1357 >                    int i = m & ((s << 1) | 1);         // odd-numbered indices
1358 >                    for (int probes = n >>> 1;;) {      // find empty slot
1359 >                        WorkQueue q;
1360 >                        if ((q = ws[i]) == null || q.phase == QUIET)
1361 >                            break;
1362 >                        else if (--probes == 0) {
1363 >                            i = n | 1;                  // resize below
1364 >                            break;
1365 >                        }
1366 >                        else
1367 >                            i = (i + 2) & m;
1368 >                    }
1369 >
1370 >                    int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
1371 >                    w.phase = w.id = id;                // now publishable
1372 >
1373 >                    if (i < n)
1374 >                        ws[i] = w;
1375 >                    else {                              // expand array
1376 >                        int an = n << 1;
1377 >                        WorkQueue[] as = new WorkQueue[an];
1378 >                        as[i] = w;
1379 >                        int am = an - 1;
1380 >                        for (int j = 0; j < n; ++j) {
1381 >                            WorkQueue v;                // copy external queue
1382 >                            if ((v = ws[j]) != null)    // position may change
1383 >                                as[v.id & am & SQMASK] = v;
1384 >                            if (++j >= n)
1385 >                                break;
1386 >                            as[j] = ws[j];              // copy worker
1387                          }
1388 +                        workQueues = as;
1389                      }
1583                    w.hint = s;                       // use as random seed
1584                    w.config = i | mode;
1585                    w.scanState = i | (s & 0x7fff0000); // random seq bits
1586                    ws[i] = w;
1390                  }
1588            } finally {
1589                aux.unlock();
1391              }
1392 +            wt.setName(prefix.concat(Integer.toString(tid)));
1393          }
1592        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
1394          return w;
1395      }
1396  
# Line 1604 | Line 1405 | public class ForkJoinPool extends Abstra
1405       */
1406      final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1407          WorkQueue w = null;
1408 +        int phase = 0;
1409          if (wt != null && (w = wt.workQueue) != null) {
1410 <            AuxState aux; WorkQueue[] ws;          // remove index from array
1411 <            int idx = w.config & SMASK;
1412 <            int ns = w.nsteals;
1413 <            if ((aux = auxState) != null) {
1414 <                aux.lock();
1415 <                try {
1410 >            Object lock = workerNamePrefix;
1411 >            long ns = (long)w.nsteals & 0xffffffffL;
1412 >            int idx = w.id & SMASK;
1413 >            if (lock != null) {
1414 >                WorkQueue[] ws;                       // remove index from array
1415 >                synchronized(lock) {
1416                      if ((ws = workQueues) != null && ws.length > idx &&
1417                          ws[idx] == w)
1418                          ws[idx] = null;
1419 <                    aux.stealCount += ns;
1618 <                } finally {
1619 <                    aux.unlock();
1419 >                    stealCount += ns;
1420                  }
1421              }
1422 +            phase = w.phase;
1423          }
1424 <        if (w == null || (w.config & UNREGISTERED) == 0) { // else pre-adjusted
1424 >        if (phase != QUIET) {                         // else pre-adjusted
1425              long c;                                   // decrement counts
1426              do {} while (!U.compareAndSwapLong
1427 <                         (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
1427 >                         (this, CTL, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
1428                                                 (TC_MASK & (c - TC_UNIT)) |
1429                                                 (SP_MASK & c))));
1430          }
1431 <        if (w != null) {
1631 <            w.currentSteal = null;
1632 <            w.qlock = -1;                             // ensure set
1431 >        if (w != null)
1432              w.cancelAll();                            // cancel remaining tasks
1433 <        }
1434 <        while (tryTerminate(false, false) >= 0) {     // possibly replace
1435 <            WorkQueue[] ws; int wl, sp; long c;
1436 <            if (w == null || w.array == null ||
1437 <                (ws = workQueues) == null || (wl = ws.length) <= 0)
1639 <                break;
1640 <            else if ((sp = (int)(c = ctl)) != 0) {    // wake up replacement
1641 <                if (tryRelease(c, ws[(wl - 1) & sp], AC_UNIT))
1642 <                    break;
1643 <            }
1644 <            else if (ex != null && (c & ADD_WORKER) != 0L) {
1645 <                tryAddWorker(c);                      // create replacement
1646 <                break;
1647 <            }
1648 <            else                                      // don't need replacement
1649 <                break;
1650 <        }
1433 >
1434 >        if (!tryTerminate(false, false) &&            // possibly replace worker
1435 >            w != null && w.array != null)             // avoid repeated failures
1436 >            signalWork();
1437 >
1438          if (ex == null)                               // help clean on way out
1439              ForkJoinTask.helpExpungeStaleExceptions();
1440          else                                          // rethrow
1441              ForkJoinTask.rethrow(ex);
1442      }
1443  
1657    // Signalling
1658
1444      /**
1445 <     * Tries to create or activate a worker if too few are active.
1445 >     * Tries to create or release a worker if too few are running.
1446       */
1447      final void signalWork() {
1448          for (;;) {
1449 <            long c; int sp, i; WorkQueue v; WorkQueue[] ws;
1449 >            long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1450              if ((c = ctl) >= 0L)                      // enough workers
1451                  break;
1452              else if ((sp = (int)c) == 0) {            // no idle workers
# Line 1676 | Line 1461 | public class ForkJoinPool extends Abstra
1461              else if ((v = ws[i]) == null)
1462                  break;                                // terminating
1463              else {
1464 <                int ns = sp & ~UNSIGNALLED;
1465 <                int vs = v.scanState;
1466 <                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + AC_UNIT));
1467 <                if (sp == vs && U.compareAndSwapLong(this, CTL, c, nc)) {
1468 <                    v.scanState = ns;
1469 <                    LockSupport.unpark(v.parker);
1464 >                int np = sp & ~UNSIGNALLED;
1465 >                int vp = v.phase;
1466 >                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1467 >                Thread vt = v.owner;
1468 >                if (sp == vp && U.compareAndSwapLong(this, CTL, c, nc)) {
1469 >                    v.phase = np;
1470 >                    if (v.source < 0)
1471 >                        LockSupport.unpark(vt);
1472                      break;
1473                  }
1474              }
# Line 1689 | Line 1476 | public class ForkJoinPool extends Abstra
1476      }
1477  
1478      /**
1479 <     * Signals and releases worker v if it is top of idle worker
1480 <     * stack.  This performs a one-shot version of signalWork only if
1481 <     * there is (apparently) at least one idle worker.
1482 <     *
1483 <     * @param c incoming ctl value
1484 <     * @param v if non-null, a worker
1485 <     * @param inc the increment to active count (zero when compensating)
1486 <     * @return true if successful
1487 <     */
1488 <    private boolean tryRelease(long c, WorkQueue v, long inc) {
1489 <        int sp = (int)c, ns = sp & ~UNSIGNALLED;
1703 <        if (v != null) {
1704 <            int vs = v.scanState;
1705 <            long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + inc));
1706 <            if (sp == vs && U.compareAndSwapLong(this, CTL, c, nc)) {
1707 <                v.scanState = ns;
1708 <                LockSupport.unpark(v.parker);
1709 <                return true;
1710 <            }
1711 <        }
1712 <        return false;
1713 <    }
1714 <
1715 <    /**
1716 <     * With approx probability of a missed signal, tries (once) to
1717 <     * reactivate worker w (or some other worker), failing if stale or
1718 <     * known to be already active.
1719 <     *
1720 <     * @param w the worker
1721 <     * @param ws the workQueue array to use
1722 <     * @param r random seed
1723 <     */
1724 <    private void tryReactivate(WorkQueue w, WorkQueue[] ws, int r) {
1725 <        long c; int sp, wl; WorkQueue v;
1726 <        if ((sp = (int)(c = ctl)) != 0 && w != null &&
1727 <            ws != null && (wl = ws.length) > 0 &&
1728 <            ((sp ^ r) & SS_SEQ) == 0 &&
1729 <            (v = ws[(wl - 1) & sp]) != null) {
1730 <            long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + AC_UNIT));
1731 <            int ns = sp & ~UNSIGNALLED;
1732 <            if (w.scanState < 0 &&
1733 <                v.scanState == sp &&
1734 <                U.compareAndSwapLong(this, CTL, c, nc)) {
1735 <                v.scanState = ns;
1736 <                LockSupport.unpark(v.parker);
1737 <            }
1738 <        }
1739 <    }
1740 <
1741 <    /**
1742 <     * If worker w exists and is active, enqueues and sets status to inactive.
1743 <     *
1744 <     * @param w the worker
1745 <     * @param ss current (non-negative) scanState
1746 <     */
1747 <    private void inactivate(WorkQueue w, int ss) {
1748 <        int ns = (ss + SS_SEQ) | UNSIGNALLED;
1749 <        long lc = ns & SP_MASK, nc, c;
1750 <        if (w != null) {
1751 <            w.scanState = ns;
1752 <            do {
1753 <                nc = lc | (UC_MASK & ((c = ctl) - AC_UNIT));
1754 <                w.stackPred = (int)c;
1755 <            } while (!U.compareAndSwapLong(this, CTL, c, nc));
1756 <        }
1757 <    }
1758 <
1759 <    /**
1760 <     * Possibly blocks worker w waiting for signal, or returns
1761 <     * negative status if the worker should terminate. May return
1762 <     * without status change if multiple stale unparks and/or
1763 <     * interrupts occur.
1479 >     * Tries to decrement counts (sometimes implicitly) and possibly
1480 >     * arrange for a compensating worker in preparation for blocking:
1481 >     * If not all core workers yet exist, creates one, else if any are
1482 >     * unreleased (possibly including caller) releases one, else if
1483 >     * fewer than the minimum allowed number of workers running,
1484 >     * checks to see that they are all active, and if so creates an
1485 >     * extra worker unless over maximum limit and policy is to
1486 >     * saturate.  Most of these steps can fail due to interference, in
1487 >     * which case 0 is returned so caller will retry. A negative
1488 >     * return value indicates that the caller doesn't need to
1489 >     * re-adjust counts when later unblocked.
1490       *
1491 <     * @param w the calling worker
1766 <     * @return negative if w should terminate
1491 >     * @return 1: block then adjust, -1: block without adjust, 0 : retry
1492       */
1493 <    private int awaitWork(WorkQueue w) {
1494 <        int stat = 0;
1495 <        if (w != null && w.scanState < 0) {
1496 <            long c = ctl;
1497 <            if ((int)(c >> AC_SHIFT) + (config & SMASK) <= 0)
1498 <                stat = timedAwaitWork(w, c);     // possibly quiescent
1499 <            else if ((runState & STOP) != 0)
1500 <                stat = w.qlock = -1;             // pool terminating
1501 <            else if (w.scanState < 0) {
1502 <                w.parker = Thread.currentThread();
1503 <                if (w.scanState < 0)             // recheck after write
1504 <                    LockSupport.park(this);
1505 <                w.parker = null;
1506 <                if ((runState & STOP) != 0)
1507 <                    stat = w.qlock = -1;         // recheck
1508 <                else if (w.scanState < 0)
1509 <                    Thread.interrupted();        // clear status
1510 <            }
1511 <        }
1512 <        return stat;
1513 <    }
1514 <
1515 <    /**
1516 <     * Possibly triggers shutdown and tries (once) to block worker
1517 <     * when pool is (or may be) quiescent. Waits up to a duration
1518 <     * determined by number of workers.  On timeout, if ctl has not
1519 <     * changed, terminates the worker, which will in turn wake up
1520 <     * another worker to possibly repeat this process.
1521 <     *
1522 <     * @param w the calling worker
1523 <     * @return negative if w should terminate
1524 <     */
1525 <    private int timedAwaitWork(WorkQueue w, long c) {
1526 <        int stat = 0;
1527 <        int scale = 1 - (short)(c >>> TC_SHIFT);
1528 <        long deadline = (((scale <= 0) ? 1 : scale) * IDLE_TIMEOUT_MS +
1529 <                         System.currentTimeMillis());
1530 <        if ((runState >= 0 || (stat = tryTerminate(false, false)) > 0) &&
1531 <            w != null && w.scanState < 0) {
1532 <            int ss; AuxState aux;
1533 <            w.parker = Thread.currentThread();
1534 <            if (w.scanState < 0)
1535 <                LockSupport.parkUntil(this, deadline);
1536 <            w.parker = null;
1537 <            if ((runState & STOP) != 0)
1538 <                stat = w.qlock = -1;         // pool terminating
1539 <            else if ((ss = w.scanState) < 0 && !Thread.interrupted() &&
1815 <                     (int)c == ss && (aux = auxState) != null && ctl == c &&
1816 <                     deadline - System.currentTimeMillis() <= TIMEOUT_SLOP_MS) {
1817 <                aux.lock();
1818 <                try {                        // pre-deregister
1819 <                    WorkQueue[] ws;
1820 <                    int cfg = w.config, idx = cfg & SMASK;
1821 <                    long nc = ((UC_MASK & (c - TC_UNIT)) |
1822 <                               (SP_MASK & w.stackPred));
1823 <                    if ((runState & STOP) == 0 &&
1824 <                        (ws = workQueues) != null &&
1825 <                        idx < ws.length && idx >= 0 && ws[idx] == w &&
1826 <                        U.compareAndSwapLong(this, CTL, c, nc)) {
1827 <                        ws[idx] = null;
1828 <                        w.config = cfg | UNREGISTERED;
1829 <                        stat = w.qlock = -1;
1493 >    private int tryCompensate(WorkQueue w) {
1494 >        int t, n, sp;
1495 >        long c = ctl;
1496 >        WorkQueue[] ws = workQueues;
1497 >        if ((t = (short)(c >> TC_SHIFT)) >= 0) {
1498 >            if (ws == null || (n = ws.length) <= 0 || w == null)
1499 >                return 0;                        // disabled
1500 >            else if ((sp = (int)c) != 0) {       // replace or release
1501 >                WorkQueue v = ws[sp & (n - 1)];
1502 >                int wp = w.phase;
1503 >                long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
1504 >                int np = sp & ~UNSIGNALLED;
1505 >                if (v != null) {
1506 >                    int vp = v.phase;
1507 >                    Thread vt = v.owner;
1508 >                    long nc = ((long)v.stackPred & SP_MASK) | uc;
1509 >                    if (vp == sp && U.compareAndSwapLong(this, CTL, c, nc)) {
1510 >                        v.phase = np;
1511 >                        if (v.source < 0)
1512 >                            LockSupport.unpark(vt);
1513 >                        return (wp < 0) ? -1 : 1;
1514 >                    }
1515 >                }
1516 >                return 0;
1517 >            }
1518 >            else if ((int)(c >> RC_SHIFT) -      // reduce parallelism
1519 >                     (short)(bounds & SMASK) > 0) {
1520 >                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1521 >                return U.compareAndSwapLong(this, CTL, c, nc) ? 1 : 0;
1522 >            }
1523 >            else {                               // validate
1524 >                int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
1525 >                boolean unstable = false;
1526 >                for (int i = 1; i < n; i += 2) {
1527 >                    WorkQueue q; Thread wt; Thread.State ts;
1528 >                    if ((q = ws[i]) != null) {
1529 >                        if (q.source == 0) {
1530 >                            unstable = true;
1531 >                            break;
1532 >                        }
1533 >                        else {
1534 >                            --tc;
1535 >                            if ((wt = q.owner) != null &&
1536 >                                ((ts = wt.getState()) == Thread.State.BLOCKED ||
1537 >                                 ts == Thread.State.WAITING))
1538 >                                ++bc;            // worker is blocking
1539 >                        }
1540                      }
1831                } finally {
1832                    aux.unlock();
1541                  }
1542 <            }
1543 <        }
1544 <        return stat;
1545 <    }
1546 <
1547 <    /**
1548 <     * If the given worker is a spare with no queued tasks, and there
1549 <     * are enough existing workers, drops it from ctl counts and sets
1842 <     * its state to terminated.
1843 <     *
1844 <     * @param w the calling worker -- must be a spare
1845 <     * @return true if dropped (in which case it must not process more tasks)
1846 <     */
1847 <    private boolean tryDropSpare(WorkQueue w) {
1848 <        if (w != null && w.isEmpty()) {           // no local tasks
1849 <            long c; int sp, wl; WorkQueue[] ws; WorkQueue v;
1850 <            while ((short)((c = ctl) >> TC_SHIFT) > 0 &&
1851 <                   ((sp = (int)c) != 0 || (int)(c >> AC_SHIFT) > 0) &&
1852 <                   (ws = workQueues) != null && (wl = ws.length) > 0) {
1853 <                boolean dropped, canDrop;
1854 <                if (sp == 0) {                    // no queued workers
1855 <                    long nc = ((AC_MASK & (c - AC_UNIT)) |
1856 <                               (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c));
1857 <                    dropped = U.compareAndSwapLong(this, CTL, c, nc);
1858 <                }
1859 <                else if (
1860 <                    (v = ws[(wl - 1) & sp]) == null || v.scanState != sp)
1861 <                    dropped = false;              // stale; retry
1862 <                else {
1863 <                    long nc = v.stackPred & SP_MASK;
1864 <                    if (w == v || w.scanState >= 0) {
1865 <                        canDrop = true;           // w unqueued or topmost
1866 <                        nc |= ((AC_MASK & c) |    // ensure replacement
1867 <                               (TC_MASK & (c - TC_UNIT)));
1868 <                    }
1869 <                    else {                        // w may be queued
1870 <                        canDrop = false;          // help uncover
1871 <                        nc |= ((AC_MASK & (c + AC_UNIT)) |
1872 <                               (TC_MASK & c));
1873 <                    }
1874 <                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1875 <                        v.scanState = sp & ~UNSIGNALLED;
1876 <                        LockSupport.unpark(v.parker);
1877 <                        dropped = canDrop;
1542 >                if (unstable || tc != 0 || ctl != c)
1543 >                    return 0;                    // inconsistent
1544 >                else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
1545 >                    if ((md & SATURATE) != 0)
1546 >                        return -1;
1547 >                    else if (bc < pc) {          // lagging
1548 >                        Thread.yield();          // for retry spins
1549 >                        return 0;
1550                      }
1551                      else
1552 <                        dropped = false;
1553 <                }
1882 <                if (dropped) {                    // pre-deregister
1883 <                    int cfg = w.config, idx = cfg & SMASK;
1884 <                    if (idx >= 0 && idx < ws.length && ws[idx] == w)
1885 <                        ws[idx] = null;
1886 <                    w.config = cfg | UNREGISTERED;
1887 <                    w.qlock = -1;
1888 <                    return true;
1552 >                        throw new RejectedExecutionException(
1553 >                            "Thread limit exceeded replacing blocked worker");
1554                  }
1555              }
1556          }
1557 <        return false;
1557 >
1558 >        long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1559 >        return U.compareAndSwapLong(this, CTL, c, nc) && createWorker() ? 1 : 0;
1560      }
1561  
1562      /**
1563       * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1564 +     * See above for explanation.
1565       */
1566      final void runWorker(WorkQueue w) {
1567 +        WorkQueue[] ws;
1568          w.growArray();                                  // allocate queue
1569 <        int bound = (w.config & SPARE_WORKER) != 0 ? 0 : POLL_LIMIT;
1570 <        long seed = w.hint * 0xdaba0b6eb09322e3L;       // initial random seed
1571 <        if ((runState & STOP) == 0) {
1572 <            for (long r = (seed == 0L) ? 1L : seed;;) { // ensure nonzero
1573 <                if (bound == 0 && tryDropSpare(w))
1574 <                    break;
1575 <                // high bits of prev seed for step; current low bits for idx
1576 <                int step = (int)(r >>> 48) | 1;
1577 <                r ^= r >>> 12; r ^= r << 25; r ^= r >>> 27; // xorshift
1578 <                if (scan(w, bound, step, (int)r) < 0 && awaitWork(w) < 0)
1910 <                    break;
1911 <            }
1912 <        }
1913 <    }
1914 <
1915 <    // Scanning for tasks
1916 <
1917 <    /**
1918 <     * Repeatedly scans for and tries to steal and execute (via
1919 <     * workQueue.runTask) a queued task. Each scan traverses queues in
1920 <     * pseudorandom permutation. Upon finding a non-empty queue, makes
1921 <     * at most the given bound attempts to re-poll (fewer if
1922 <     * contended) on the same queue before returning (impossible
1923 <     * scanState value) 0 to restart scan. Else returns after at least
1924 <     * 1 and at most 32 full scans.
1925 <     *
1926 <     * @param w the worker (via its WorkQueue)
1927 <     * @param bound repoll bound as bitmask (0 if spare)
1928 <     * @param step (circular) index increment per iteration (must be odd)
1929 <     * @param r a random seed for origin index
1930 <     * @return negative if should await signal
1931 <     */
1932 <    private int scan(WorkQueue w, int bound, int step, int r) {
1933 <        int stat = 0, wl; WorkQueue[] ws;
1934 <        if ((ws = workQueues) != null && w != null && (wl = ws.length) > 0) {
1935 <            for (int m = wl - 1,
1936 <                     origin = m & r, idx = origin,
1937 <                     npolls = 0,
1938 <                     ss = w.scanState;;) {         // negative if inactive
1939 <                WorkQueue q; ForkJoinTask<?>[] a; int b, al;
1940 <                if ((q = ws[idx]) != null && (b = q.base) - q.top < 0 &&
1569 >        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
1570 >        if (r == 0)                                     // initial nonzero seed
1571 >            r = 1;
1572 >        int lastSignalId = 0;                           // avoid unneeded signals
1573 >        while ((ws = workQueues) != null) {
1574 >            boolean nonempty = false;                   // scan
1575 >            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1576 >                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1577 >                if ((i = r & m) >= 0 && i < n &&        // always true
1578 >                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
1579                      (a = q.array) != null && (al = a.length) > 0) {
1580 +                    int qid = q.id;                     // (never zero)
1581                      int index = (al - 1) & b;
1582                      long offset = ((long)index << ASHIFT) + ABASE;
1583                      ForkJoinTask<?> t = (ForkJoinTask<?>)
1584                          U.getObjectVolatile(a, offset);
1585 <                    if (t == null)
1586 <                        break;                     // empty or busy
1587 <                    else if (b++ != q.base)
1588 <                        break;                     // busy
1589 <                    else if (ss < 0) {
1590 <                        tryReactivate(w, ws, r);
1591 <                        break;                     // retry upon rescan
1592 <                    }
1593 <                    else if (!U.compareAndSwapObject(a, offset, t, null))
1594 <                        break;                     // contended
1595 <                    else {
1596 <                        q.base = b;
1597 <                        w.currentSteal = t;
1598 <                        if (b != q.top)            // propagate signal
1599 <                            signalWork();
1961 <                        w.runTask(t);
1962 <                        if (++npolls > bound)
1963 <                            break;
1585 >                    if (t != null && b++ == q.base &&
1586 >                        U.compareAndSwapObject(a, offset, t, null)) {
1587 >                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
1588 >                            signalWork();               // propagate signal
1589 >                        w.source = lastSignalId = qid;
1590 >                        t.doExec();
1591 >                        if ((w.id & FIFO) != 0)         // run remaining locals
1592 >                            w.localPollAndExec(POLL_LIMIT);
1593 >                        else
1594 >                            w.localPopAndExec(POLL_LIMIT);
1595 >                        ForkJoinWorkerThread thread = w.owner;
1596 >                        ++w.nsteals;
1597 >                        w.source = 0;                   // now idle
1598 >                        if (thread != null)
1599 >                            thread.afterTopLevelExec();
1600                      }
1601 +                    nonempty = true;
1602                  }
1603 <                else if (npolls != 0)              // rescan
1603 >                else if (nonempty)
1604                      break;
1605 <                else if ((idx = (idx + step) & m) == origin) {
1606 <                    if (ss < 0) {                  // await signal
1607 <                        stat = ss;
1608 <                        break;
1609 <                    }
1610 <                    else if (r >= 0) {
1611 <                        inactivate(w, ss);
1612 <                        break;
1605 >                else
1606 >                    ++r;
1607 >            }
1608 >
1609 >            if (nonempty) {                             // move (xorshift)
1610 >                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
1611 >            }
1612 >            else {
1613 >                int phase;
1614 >                lastSignalId = 0;                       // clear for next scan
1615 >                if ((phase = w.phase) >= 0) {           // enqueue
1616 >                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
1617 >                    long c, nc;
1618 >                    do {
1619 >                        w.stackPred = (int)(c = ctl);
1620 >                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
1621 >                    } while (!U.compareAndSwapLong(this, CTL, c, nc));
1622 >                }
1623 >                else {                                  // already queued
1624 >                    int pred = w.stackPred;
1625 >                    w.source = DORMANT;                 // enable signal
1626 >                    for (int steps = 0;;) {
1627 >                        int md, rc; long c;
1628 >                        if (w.phase >= 0) {
1629 >                            w.source = 0;
1630 >                            break;
1631 >                        }
1632 >                        else if ((md = mode) < 0)       // shutting down
1633 >                            return;
1634 >                        else if ((rc = ((md & SMASK) +  // possibly quiescent
1635 >                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
1636 >                                 (md & SHUTDOWN) != 0 &&
1637 >                                 tryTerminate(false, false))
1638 >                            return;                     // help terminate
1639 >                        else if ((++steps & 1) == 0)
1640 >                            Thread.interrupted();       // clear between parks
1641 >                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
1642 >                            long d = keepAlive + System.currentTimeMillis();
1643 >                            LockSupport.parkUntil(this, d);
1644 >                            if (ctl == c &&
1645 >                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
1646 >                                long nc = ((UC_MASK & (c - TC_UNIT)) |
1647 >                                           (SP_MASK & pred));
1648 >                                if (U.compareAndSwapLong(this, CTL, c, nc)) {
1649 >                                    w.phase = QUIET;
1650 >                                    return;             // drop on timeout
1651 >                                }
1652 >                            }
1653 >                        }
1654 >                        else
1655 >                            LockSupport.park(this);
1656                      }
1977                    else
1978                        r <<= 1;                   // at most 31 rescans
1657                  }
1658              }
1659          }
1982        return stat;
1660      }
1661  
1985    // Joining tasks
1986
1662      /**
1663 <     * Tries to steal and run tasks within the target's computation.
1664 <     * Uses a variant of the top-level algorithm, restricted to tasks
1665 <     * with the given task as ancestor: It prefers taking and running
1666 <     * eligible tasks popped from the worker's own queue (via
1992 <     * popCC). Otherwise it scans others, randomly moving on
1993 <     * contention or execution, deciding to give up based on a
1994 <     * checksum (via return codes from pollAndExecCC). The maxTasks
1995 <     * argument supports external usages; internal calls use zero,
1996 <     * allowing unbounded steps (external calls trap non-positive
1997 <     * values).
1663 >     * Helps and/or blocks until the given task is done or timeout.
1664 >     * First tries locally helping, then scans other queues for a task
1665 >     * produced by one of w's stealers; compensating and blocking if
1666 >     * none are found (rescanning if tryCompensate fails).
1667       *
1668       * @param w caller
1669 <     * @param maxTasks if non-zero, the maximum number of other tasks to run
1669 >     * @param task the task
1670 >     * @param deadline for timed waits, if nonzero
1671       * @return task status on exit
1672       */
1673 <    final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1674 <                           int maxTasks) {
1675 <        WorkQueue[] ws; int s = 0, wl;
1676 <        if ((ws = workQueues) != null && (wl = ws.length) > 1 &&
1677 <            task != null && w != null) {
1678 <            for (int m = wl - 1,
1679 <                     mode = w.config,
1680 <                     r = ~mode,                  // scanning seed
1681 <                     origin = r & m, k = origin, // first queue to scan
1682 <                     step = 3,                   // first scan step
1683 <                     h = 1,                      // 1:ran, >1:contended, <0:hash
1684 <                     oldSum = 0, checkSum = 0;;) {
1685 <                CountedCompleter<?> p; WorkQueue q; int i;
1673 >    final int xawaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1674 >        int s = 0;
1675 >        if (w != null && task != null &&
1676 >            (!(task instanceof CountedCompleter) ||
1677 >             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
1678 >            w.tryRemoveAndExec(task);
1679 >            int src = w.source, id = w.id, block = 0;
1680 >            s = task.status;
1681 >            while (s >= 0) {
1682 >                WorkQueue[] ws;
1683 >                boolean nonempty = false;
1684 >                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
1685 >                if ((ws = workQueues) != null) {       // scan for matching id
1686 >                    for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1687 >                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1688 >                        if ((i = r & m) >= 0 && i < n &&
1689 >                            (q = ws[i]) != null && q.source == id &&
1690 >                            (b = q.base) - q.top < 0 &&
1691 >                            (a = q.array) != null && (al = a.length) > 0) {
1692 >                            int qid = q.id;
1693 >                            if (block > 0)
1694 >                                U.getAndAddLong(this, CTL, RC_UNIT);
1695 >                            block = 0;
1696 >                            int index = (al - 1) & b;
1697 >                            long offset = ((long)index << ASHIFT) + ABASE;
1698 >                            ForkJoinTask<?> t = (ForkJoinTask<?>)
1699 >                                U.getObjectVolatile(a, offset);
1700 >                            if (t != null && b++ == q.base && id == q.source &&
1701 >                                U.compareAndSwapObject(a, offset, t, null)) {
1702 >                                q.base = b;
1703 >                                w.source = qid;
1704 >                                t.doExec();
1705 >                                w.source = src;
1706 >                            }
1707 >                            nonempty = true;
1708 >                            break;
1709 >                        }
1710 >                        else
1711 >                            r += 2;
1712 >                    }
1713 >                }
1714                  if ((s = task.status) < 0)
1715                      break;
1716 <                if (h == 1 && (p = w.popCC(task, mode)) != null) {
1717 <                    p.doExec();                  // run local task
1718 <                    if (maxTasks != 0 && --maxTasks == 0)
1719 <                        break;
1720 <                    origin = k;                  // reset
1721 <                    oldSum = checkSum = 0;
1716 >                else if (!nonempty) {
1717 >                    long ms, ns;
1718 >                    if (deadline == 0L)
1719 >                        ms = 0L;                   // untimed
1720 >                    else if ((ns = deadline - System.nanoTime()) <= 0L)
1721 >                        break;                     // timeout
1722 >                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1723 >                        ms = 1L;                   // avoid 0 for timed wait
1724 >                    if (block == 0)
1725 >                        block = tryCompensate(w);
1726 >                    else
1727 >                        task.internalWait(ms);
1728 >                    s = task.status;
1729                  }
1730 <                else {                           // poll other worker queues
1731 <                    if ((i = k | 1) < 0 || i > m || (q = ws[i]) == null)
1732 <                        h = 0;
1733 <                    else if ((h = q.pollAndExecCC(task)) < 0)
1734 <                        checkSum += h;
1735 <                    if (h > 0) {
1736 <                        if (h == 1 && maxTasks != 0 && --maxTasks == 0)
1730 >            }
1731 >            if (block > 0)
1732 >                U.getAndAddLong(this, CTL, RC_UNIT);
1733 >        }
1734 >        return s;
1735 >    }
1736 >
1737 >    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1738 >        int s = 0;
1739 >        if (w != null && task != null &&
1740 >            (!(task instanceof CountedCompleter) ||
1741 >             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
1742 >            w.tryRemoveAndExec(task);
1743 >            int src = w.source, id = w.id;
1744 >            s = task.status;
1745 >            while (s >= 0) {
1746 >                WorkQueue[] ws;
1747 >                boolean nonempty = false;
1748 >                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
1749 >                if ((ws = workQueues) != null) {       // scan for matching id
1750 >                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
1751 >                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1752 >                        if ((i = (r + j) & m) >= 0 && i < n &&
1753 >                            (q = ws[i]) != null && q.source == id &&
1754 >                            (b = q.base) - q.top < 0 &&
1755 >                            (a = q.array) != null && (al = a.length) > 0) {
1756 >                            int qid = q.id;
1757 >                            int index = (al - 1) & b;
1758 >                            long offset = ((long)index << ASHIFT) + ABASE;
1759 >                            ForkJoinTask<?> t = (ForkJoinTask<?>)
1760 >                                U.getObjectVolatile(a, offset);
1761 >                            if (t != null && b++ == q.base && id == q.source &&
1762 >                                U.compareAndSwapObject(a, offset, t, null)) {
1763 >                                q.base = b;
1764 >                                w.source = qid;
1765 >                                t.doExec();
1766 >                                w.source = src;
1767 >                            }
1768 >                            nonempty = true;
1769                              break;
1770 <                        step = (r >>> 16) | 3;
2034 <                        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
2035 <                        k = origin = r & m;      // move and restart
2036 <                        oldSum = checkSum = 0;
1770 >                        }
1771                      }
1772 <                    else if ((k = (k + step) & m) == origin) {
1773 <                        if (oldSum == (oldSum = checkSum))
1774 <                            break;
1775 <                        checkSum = 0;
1772 >                }
1773 >                if ((s = task.status) < 0)
1774 >                    break;
1775 >                else if (!nonempty) {
1776 >                    long ms, ns; int block;
1777 >                    if (deadline == 0L)
1778 >                        ms = 0L;                       // untimed
1779 >                    else if ((ns = deadline - System.nanoTime()) <= 0L)
1780 >                        break;                         // timeout
1781 >                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1782 >                        ms = 1L;                       // avoid 0 for timed wait
1783 >                    if ((block = tryCompensate(w)) != 0) {
1784 >                        task.internalWait(ms);
1785 >                        U.getAndAddLong(this, CTL, (block > 0) ? RC_UNIT : 0L);
1786                      }
1787 +                    s = task.status;
1788                  }
1789              }
1790          }
# Line 2047 | Line 1792 | public class ForkJoinPool extends Abstra
1792      }
1793  
1794      /**
1795 <     * Tries to locate and execute tasks for a stealer of the given
1796 <     * task, or in turn one of its stealers. Traces currentSteal ->
1797 <     * currentJoin links looking for a thread working on a descendant
2053 <     * of the given task and with a non-empty queue to steal back and
2054 <     * execute tasks from. The first call to this method upon a
2055 <     * waiting join will often entail scanning/search, (which is OK
2056 <     * because the joiner has nothing better to do), but this method
2057 <     * leaves hints in workers to speed up subsequent calls.
2058 <     *
2059 <     * @param w caller
2060 <     * @param task the task to join
1795 >     * Runs tasks until {@code isQuiescent()}. Rather than blocking
1796 >     * when tasks cannot be found, rescans until all others cannot
1797 >     * find tasks either.
1798       */
1799 <    private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
1800 <        if (task != null && w != null) {
1801 <            ForkJoinTask<?> ps = w.currentSteal;
1802 <            WorkQueue[] ws; int wl, oldSum = 0;
1803 <            outer: while (w.tryRemoveAndExec(task) && task.status >= 0 &&
1804 <                          (ws = workQueues) != null && (wl = ws.length) > 0) {
1805 <                ForkJoinTask<?> subtask;
1806 <                int m = wl - 1, checkSum = 0;          // for stability check
1807 <                WorkQueue j = w, v;                    // v is subtask stealer
1808 <                descent: for (subtask = task; subtask.status >= 0; ) {
1809 <                    for (int h = j.hint | 1, k = 0, i;;) {
1810 <                        if ((v = ws[i = (h + (k << 1)) & m]) != null) {
1811 <                            if (v.currentSteal == subtask) {
1812 <                                j.hint = i;
1813 <                                break;
1799 >    final void helpQuiescePool(WorkQueue w) {
1800 >        int prevSrc = w.source, fifo = w.id & FIFO;
1801 >        for (int source = prevSrc, released = -1;;) { // -1 until known
1802 >            WorkQueue[] ws;
1803 >            if (fifo != 0)
1804 >                w.localPollAndExec(0);
1805 >            else
1806 >                w.localPopAndExec(0);
1807 >            if (released == -1 && w.phase >= 0)
1808 >                released = 1;
1809 >            boolean quiet = true, empty = true;
1810 >            int r = ThreadLocalRandom.nextSecondarySeed();
1811 >            if ((ws = workQueues) != null) {
1812 >                for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1813 >                    WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1814 >                    if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
1815 >                        if ((b = q.base) - q.top < 0 &&
1816 >                            (a = q.array) != null && (al = a.length) > 0) {
1817 >                            int qid = q.id;
1818 >                            if (released == 0) {    // increment
1819 >                                released = 1;
1820 >                                U.getAndAddLong(this, CTL, RC_UNIT);
1821                              }
2078                            checkSum += v.base;
2079                        }
2080                        if (++k > m)                   // can't find stealer
2081                            break outer;
2082                    }
2083
2084                    for (;;) {                         // help v or descend
2085                        ForkJoinTask<?>[] a; int b, al;
2086                        if (subtask.status < 0)        // too late to help
2087                            break descent;
2088                        checkSum += (b = v.base);
2089                        ForkJoinTask<?> next = v.currentJoin;
2090                        ForkJoinTask<?> t = null;
2091                        if ((a = v.array) != null && (al = a.length) > 0) {
1822                              int index = (al - 1) & b;
1823                              long offset = ((long)index << ASHIFT) + ABASE;
1824 <                            t = (ForkJoinTask<?>)
1824 >                            ForkJoinTask<?> t = (ForkJoinTask<?>)
1825                                  U.getObjectVolatile(a, offset);
1826 <                            if (t != null && b++ == v.base) {
1827 <                                if (j.currentJoin != subtask ||
1828 <                                    v.currentSteal != subtask ||
1829 <                                    subtask.status < 0)
1830 <                                    break descent;     // stale
1831 <                                if (U.compareAndSwapObject(a, offset, t, null)) {
2102 <                                    v.base = b;
2103 <                                    w.currentSteal = t;
2104 <                                    for (int top = w.top;;) {
2105 <                                        t.doExec();    // help
2106 <                                        w.currentSteal = ps;
2107 <                                        if (task.status < 0)
2108 <                                            break outer;
2109 <                                        if (w.top == top)
2110 <                                            break;     // run local tasks
2111 <                                        if ((t = w.pop()) == null)
2112 <                                            break descent;
2113 <                                        w.currentSteal = t;
2114 <                                    }
2115 <                                }
2116 <                            }
2117 <                        }
2118 <                        if (t == null && b == v.base && b - v.top >= 0) {
2119 <                            if ((subtask = next) == null) {  // try to descend
2120 <                                if (next == v.currentJoin &&
2121 <                                    oldSum == (oldSum = checkSum))
2122 <                                    break outer;
2123 <                                break descent;
1826 >                            if (t != null && b++ == q.base &&
1827 >                                U.compareAndSwapObject(a, offset, t, null)) {
1828 >                                q.base = b;
1829 >                                w.source = source = q.id;
1830 >                                t.doExec();
1831 >                                w.source = source = prevSrc;
1832                              }
1833 <                            j = v;
1833 >                            quiet = empty = false;
1834                              break;
1835                          }
1836 +                        else if ((q.source & QUIET) == 0)
1837 +                            quiet = false;
1838                      }
1839                  }
1840              }
1841 +            if (quiet) {
1842 +                if (released == 0)
1843 +                    U.getAndAddLong(this, CTL, RC_UNIT);
1844 +                w.source = prevSrc;
1845 +                break;
1846 +            }
1847 +            else if (empty) {
1848 +                if (source != QUIET)
1849 +                    w.source = source = QUIET;
1850 +                if (released == 1) {                 // decrement
1851 +                    released = 0;
1852 +                    U.getAndAddLong(this, CTL, RC_MASK & -RC_UNIT);
1853 +                }
1854 +            }
1855          }
1856      }
1857  
1858      /**
1859 <     * Tries to decrement active count (sometimes implicitly) and
1860 <     * possibly release or create a compensating worker in preparation
2137 <     * for blocking. Returns false (retryable by caller), on
2138 <     * contention, detected staleness, instability, or termination.
1859 >     * Scans for and returns a polled task, if available.
1860 >     * Used only for untracked polls.
1861       *
1862 <     * @param w caller
1862 >     * @param submissionsOnly if true, only scan submission queues
1863       */
1864 <    private boolean tryCompensate(WorkQueue w) {
1865 <        boolean canBlock; int wl;
1866 <        long c = ctl;
1867 <        WorkQueue[] ws = workQueues;
1868 <        int pc = config & SMASK;
1869 <        int ac = pc + (int)(c >> AC_SHIFT);
1870 <        int tc = pc + (short)(c >> TC_SHIFT);
1871 <        if (w == null || w.qlock < 0 || pc == 0 ||  // terminating or disabled
1872 <            ws == null || (wl = ws.length) <= 0)
1873 <            canBlock = false;
1874 <        else {
2153 <            int m = wl - 1, sp;
2154 <            boolean busy = true;                    // validate ac
2155 <            for (int i = 0; i <= m; ++i) {
2156 <                int k; WorkQueue v;
2157 <                if ((k = (i << 1) | 1) <= m && k >= 0 && (v = ws[k]) != null &&
2158 <                    v.scanState >= 0 && v.currentSteal == null) {
2159 <                    busy = false;
2160 <                    break;
2161 <                }
1864 >    private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1865 >        WorkQueue[] ws; int n;
1866 >        rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
1867 >                      (n = ws.length) > 0) {
1868 >            int m = n - 1;
1869 >            int r = ThreadLocalRandom.nextSecondarySeed();
1870 >            int h = r >>> 16;
1871 >            int origin, step;
1872 >            if (submissionsOnly) {
1873 >                origin = (r & ~1) & m;         // even indices and steps
1874 >                step = (h & ~1) | 2;
1875              }
1876 <            if (!busy || ctl != c)
1877 <                canBlock = false;                   // unstable or stale
1878 <            else if ((sp = (int)c) != 0)            // release idle worker
1879 <                canBlock = tryRelease(c, ws[m & sp], 0L);
1880 <            else if (tc >= pc && ac > 1 && w.isEmpty()) {
1881 <                long nc = ((AC_MASK & (c - AC_UNIT)) |
1882 <                           (~AC_MASK & c));         // uncompensated
1883 <                canBlock = U.compareAndSwapLong(this, CTL, c, nc);
1884 <            }
1885 <            else if (tc >= MAX_CAP ||
1886 <                     (this == common && tc >= pc + COMMON_MAX_SPARES))
1887 <                throw new RejectedExecutionException(
1888 <                    "Thread limit exceeded replacing blocked worker");
1889 <            else {                                  // similar to tryAddWorker
1890 <                boolean isSpare = (tc >= pc);
1891 <                long nc = (AC_MASK & c) | (TC_MASK & (c + TC_UNIT));
1892 <                canBlock = (U.compareAndSwapLong(this, CTL, c, nc) &&
1893 <                            createWorker(isSpare)); // throws on exception
1876 >            else {
1877 >                origin = r & m;
1878 >                step = h | 1;
1879 >            }
1880 >            for (int k = origin, oldSum = 0, checkSum = 0;;) {
1881 >                WorkQueue q; int b, al; ForkJoinTask<?>[] a;
1882 >                if ((q = ws[k]) != null) {
1883 >                    checkSum += b = q.base;
1884 >                    if (b - q.top < 0 &&
1885 >                        (a = q.array) != null && (al = a.length) > 0) {
1886 >                        int index = (al - 1) & b;
1887 >                        long offset = ((long)index << ASHIFT) + ABASE;
1888 >                        ForkJoinTask<?> t = (ForkJoinTask<?>)
1889 >                            U.getObjectVolatile(a, offset);
1890 >                        if (t != null && b++ == q.base &&
1891 >                            U.compareAndSwapObject(a, offset, t, null)) {
1892 >                            q.base = b;
1893 >                            return t;
1894 >                        }
1895 >                        else
1896 >                            break; // restart
1897 >                    }
1898 >                }
1899 >                if ((k = (k + step) & m) == origin) {
1900 >                    if (oldSum == (oldSum = checkSum))
1901 >                        break rescan;
1902 >                    checkSum = 0;
1903 >                }
1904              }
1905          }
1906 <        return canBlock;
1906 >        return null;
1907      }
1908  
1909      /**
1910 <     * Helps and/or blocks until the given task is done or timeout.
1910 >     * Gets and removes a local or stolen task for the given worker.
1911       *
1912 <     * @param w caller
2190 <     * @param task the task
2191 <     * @param deadline for timed waits, if nonzero
2192 <     * @return task status on exit
1912 >     * @return a task, if available
1913       */
1914 <    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1915 <        int s = 0;
1916 <        if (w != null) {
1917 <            ForkJoinTask<?> prevJoin = w.currentJoin;
1918 <            if (task != null && (s = task.status) >= 0) {
1919 <                w.currentJoin = task;
1920 <                CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
1921 <                    (CountedCompleter<?>)task : null;
1922 <                for (;;) {
1923 <                    if (cc != null)
1924 <                        helpComplete(w, cc, 0);
1914 >    final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1915 >        ForkJoinTask<?> t;
1916 >        if (w != null &&
1917 >            (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1918 >            return t;
1919 >        else
1920 >            return pollScan(false);
1921 >    }
1922 >
1923 >    // External operations
1924 >
1925 >    /**
1926 >     * Adds the given task to a submission queue at submitter's
1927 >     * current queue, creating one if null or contended.
1928 >     *
1929 >     * @param task the task. Caller must ensure non-null.
1930 >     */
1931 >    final void externalPush(ForkJoinTask<?> task) {
1932 >        int r;                                // initialize caller's probe
1933 >        if ((r = ThreadLocalRandom.getProbe()) == 0) {
1934 >            ThreadLocalRandom.localInit();
1935 >            r = ThreadLocalRandom.getProbe();
1936 >        }
1937 >        for (;;) {
1938 >            int md = mode, n;
1939 >            WorkQueue[] ws = workQueues;
1940 >            if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
1941 >                throw new RejectedExecutionException();
1942 >            else {
1943 >                WorkQueue q;
1944 >                boolean push = false, grow = false;
1945 >                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
1946 >                    Object lock = workerNamePrefix;
1947 >                    int qid = (r | QUIET) & ~(FIFO | OWNED);
1948 >                    q = new WorkQueue(this, null);
1949 >                    q.id = qid;
1950 >                    q.source = QUIET;
1951 >                    q.phase = QLOCK;          // lock queue
1952 >                    if (lock != null) {
1953 >                        synchronized(lock) {  // lock pool to install
1954 >                            int i;
1955 >                            if ((ws = workQueues) != null &&
1956 >                                (n = ws.length) > 0 &&
1957 >                                ws[i = qid & (n - 1) & SQMASK] == null) {
1958 >                                ws[i] = q;
1959 >                                push = grow = true;
1960 >                            }
1961 >                        }
1962 >                    }
1963 >                }
1964 >                else if (q.tryLockSharedQueue()) {
1965 >                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
1966 >                    if ((a = q.array) != null && (al = a.length) > 0 &&
1967 >                        al - 1 + (d = b - s) > 0) {
1968 >                        a[(al - 1) & s] = task;
1969 >                        q.top = s + 1;        // relaxed writes OK here
1970 >                        q.phase = 0;
1971 >                        if (d < 0 && q.base - s < 0)
1972 >                            break;            // no signal needed
1973 >                    }
1974                      else
1975 <                        helpStealer(w, task);
1976 <                    if ((s = task.status) < 0)
1977 <                        break;
1978 <                    long ms, ns;
1979 <                    if (deadline == 0L)
1980 <                        ms = 0L;
1981 <                    else if ((ns = deadline - System.nanoTime()) <= 0L)
1982 <                        break;
1983 <                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1984 <                        ms = 1L;
1985 <                    if (tryCompensate(w)) {
1986 <                        task.internalWait(ms);
1987 <                        U.getAndAddLong(this, CTL, AC_UNIT);
1975 >                        grow = true;
1976 >                    push = true;
1977 >                }
1978 >                if (push) {
1979 >                    if (grow) {
1980 >                        try {
1981 >                            q.growArray();
1982 >                            int s = q.top, al; ForkJoinTask<?>[] a;
1983 >                            if ((a = q.array) != null && (al = a.length) > 0) {
1984 >                                a[(al - 1) & s] = task;
1985 >                                q.top = s + 1;
1986 >                            }
1987 >                        } finally {
1988 >                            q.phase = 0;
1989 >                        }
1990                      }
1991 <                    if ((s = task.status) < 0)
1992 <                        break;
1991 >                    signalWork();
1992 >                    break;
1993                  }
1994 <                w.currentJoin = prevJoin;
1994 >                else                          // move if busy
1995 >                    r = ThreadLocalRandom.advanceProbe(r);
1996              }
1997          }
2226        return s;
1998      }
1999  
2000 <    // Specialized scanning
2000 >    /**
2001 >     * Pushes a possibly-external submission.
2002 >     */
2003 >    private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2004 >        Thread t; ForkJoinWorkerThread w; WorkQueue q;
2005 >        if (task == null)
2006 >            throw new NullPointerException();
2007 >        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2008 >            (w = (ForkJoinWorkerThread)t).pool == this &&
2009 >            (q = w.workQueue) != null)
2010 >            q.push(task);
2011 >        else
2012 >            externalPush(task);
2013 >        return task;
2014 >    }
2015  
2016      /**
2017 <     * Returns a (probably) non-empty steal queue, if one is found
2018 <     * during a scan, else null.  This method must be retried by
2019 <     * caller if, by the time it tries to use the queue, it is empty.
2020 <     */
2021 <    private WorkQueue findNonEmptyStealQueue() {
2022 <        WorkQueue[] ws; int wl;  // one-shot version of scan loop
2023 <        int r = ThreadLocalRandom.nextSecondarySeed();
2024 <        if ((ws = workQueues) != null && (wl = ws.length) > 0) {
2025 <            int m = wl - 1, origin = r & m;
2241 <            for (int k = origin, oldSum = 0, checkSum = 0;;) {
2242 <                WorkQueue q; int b;
2243 <                if ((q = ws[k]) != null) {
2244 <                    if ((b = q.base) - q.top < 0)
2245 <                        return q;
2246 <                    checkSum += b;
2247 <                }
2248 <                if ((k = (k + 1) & m) == origin) {
2249 <                    if (oldSum == (oldSum = checkSum))
2250 <                        break;
2251 <                    checkSum = 0;
2252 <                }
2253 <            }
2254 <        }
2255 <        return null;
2017 >     * Returns common pool queue for an external thread.
2018 >     */
2019 >    static WorkQueue commonSubmitterQueue() {
2020 >        ForkJoinPool p = common;
2021 >        int r = ThreadLocalRandom.getProbe();
2022 >        WorkQueue[] ws; int n;
2023 >        return (p != null && (ws = p.workQueues) != null &&
2024 >                (n = ws.length) > 0) ?
2025 >            ws[(n - 1) & r & SQMASK] : null;
2026      }
2027  
2028      /**
2029 <     * Runs tasks until {@code isQuiescent()}. We piggyback on
2260 <     * active count ctl maintenance, but rather than blocking
2261 <     * when tasks cannot be found, we rescan until all others cannot
2262 <     * find tasks either.
2029 >     * Performs tryUnpush for an external submitter.
2030       */
2031 <    final void helpQuiescePool(WorkQueue w) {
2032 <        ForkJoinTask<?> ps = w.currentSteal; // save context
2033 <        int wc = w.config;
2034 <        for (boolean active = true;;) {
2035 <            long c; WorkQueue q; ForkJoinTask<?> t;
2036 <            if (wc >= 0 && (t = w.pop()) != null) { // run locals if LIFO
2037 <                (w.currentSteal = t).doExec();
2271 <                w.currentSteal = ps;
2272 <            }
2273 <            else if ((q = findNonEmptyStealQueue()) != null) {
2274 <                if (!active) {      // re-establish active count
2275 <                    active = true;
2276 <                    U.getAndAddLong(this, CTL, AC_UNIT);
2277 <                }
2278 <                if ((t = q.pollAt(q.base)) != null) {
2279 <                    (w.currentSteal = t).doExec();
2280 <                    w.currentSteal = ps;
2281 <                    if (++w.nsteals < 0)
2282 <                        w.transferStealCount(this);
2283 <                }
2284 <            }
2285 <            else if (active) {      // decrement active count without queuing
2286 <                long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c);
2287 <                if (U.compareAndSwapLong(this, CTL, c, nc))
2288 <                    active = false;
2289 <            }
2290 <            else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 &&
2291 <                     U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
2292 <                break;
2293 <        }
2031 >    final boolean tryExternalUnpush(ForkJoinTask<?> task) {
2032 >        int r = ThreadLocalRandom.getProbe();
2033 >        WorkQueue[] ws; WorkQueue w; int n;
2034 >        return ((ws = workQueues) != null &&
2035 >                (n = ws.length) > 0 &&
2036 >                (w = ws[(n - 1) & r & SQMASK]) != null &&
2037 >                w.trySharedUnpush(task));
2038      }
2039  
2040      /**
2041 <     * Gets and removes a local or stolen task for the given worker.
2041 >     * Performs helpComplete for an external submitter.
2042 >     */
2043 >    final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
2044 >        int r = ThreadLocalRandom.getProbe();
2045 >        WorkQueue[] ws; WorkQueue w; int n;
2046 >        return ((ws = workQueues) != null && (n = ws.length) > 0 &&
2047 >                (w = ws[(n - 1) & r & SQMASK]) != null) ?
2048 >            w.sharedHelpCC(task, maxTasks) : 0;
2049 >    }
2050 >
2051 >    /**
2052 >     * Tries to steal and run tasks within the target's computation.
2053 >     * The maxTasks argument supports external usages; internal calls
2054 >     * use zero, allowing unbounded steps (external calls trap
2055 >     * non-positive values).
2056       *
2057 <     * @return a task, if available
2057 >     * @param w caller
2058 >     * @param maxTasks if non-zero, the maximum number of other tasks to run
2059 >     * @return task status on exit
2060       */
2061 <    final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2062 <        for (ForkJoinTask<?> t;;) {
2063 <            WorkQueue q;
2304 <            if ((t = w.nextLocalTask()) != null)
2305 <                return t;
2306 <            if ((q = findNonEmptyStealQueue()) == null)
2307 <                return null;
2308 <            if ((t = q.pollAt(q.base)) != null)
2309 <                return t;
2310 <        }
2061 >    final int helpComplete(WorkQueue w, CountedCompleter<?> task,
2062 >                           int maxTasks) {
2063 >        return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
2064      }
2065  
2066      /**
# Line 2354 | Line 2107 | public class ForkJoinPool extends Abstra
2107       */
2108      static int getSurplusQueuedTaskCount() {
2109          Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2110 <        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
2111 <            int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
2112 <            int n = (q = wt.workQueue).top - q.base;
2113 <            int a = (int)(pool.ctl >> AC_SHIFT) + p;
2110 >        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2111 >            (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2112 >            (q = wt.workQueue) != null) {
2113 >            int p = pool.mode & SMASK;
2114 >            int a = p + (int)(pool.ctl >> RC_SHIFT);
2115 >            int n = q.top - q.base;
2116              return n - (a > (p >>>= 1) ? 0 :
2117                          a > (p >>>= 1) ? 1 :
2118                          a > (p >>>= 1) ? 2 :
# Line 2367 | Line 2122 | public class ForkJoinPool extends Abstra
2122          return 0;
2123      }
2124  
2125 <    //  Termination
2125 >    // Termination
2126  
2127      /**
2128       * Possibly initiates and/or completes termination.
# Line 2375 | Line 2130 | public class ForkJoinPool extends Abstra
2130       * @param now if true, unconditionally terminate, else only
2131       * if no work and no active workers
2132       * @param enable if true, terminate when next possible
2133 <     * @return -1: terminating/terminated, 0: retry if internal caller, else 1
2133 >     * @return true if terminating or terminated
2134       */
2135 <    private int tryTerminate(boolean now, boolean enable) {
2136 <        int rs; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
2135 >    private boolean tryTerminate(boolean now, boolean enable) {
2136 >        int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
2137  
2138 <        while ((rs = runState) >= 0) {
2138 >        while (((md = mode) & SHUTDOWN) == 0) {
2139              if (!enable || this == common)        // cannot shutdown
2140 <                return 1;
2386 <            else if (rs == 0)
2387 <                tryInitialize(false);             // ensure initialized
2140 >                return false;
2141              else
2142 <                U.compareAndSwapInt(this, RUNSTATE, rs, rs | SHUTDOWN);
2142 >                U.compareAndSwapInt(this, MODE, md, md | SHUTDOWN);
2143          }
2144  
2145 <        if ((rs & STOP) == 0) {                   // try to initiate termination
2146 <            if (!now) {                           // check quiescence
2145 >        while (((md = mode) & STOP) == 0) {       // try to initiate termination
2146 >            if (!now) {                           // check if quiescent & empty
2147                  for (long oldSum = 0L;;) {        // repeat until stable
2148 <                    WorkQueue[] ws; WorkQueue w; int b;
2148 >                    boolean running = false;
2149                      long checkSum = ctl;
2150 <                    if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
2151 <                        return 0;                 // still active workers
2152 <                    if ((ws = workQueues) != null) {
2150 >                    WorkQueue[] ws = workQueues;
2151 >                    if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
2152 >                        running = true;
2153 >                    else if (ws != null) {
2154 >                        WorkQueue w; int b;
2155                          for (int i = 0; i < ws.length; ++i) {
2156                              if ((w = ws[i]) != null) {
2157 <                                checkSum += (b = w.base);
2158 <                                if (w.currentSteal != null || b != w.top)
2159 <                                    return 0;     // retry if internal caller
2157 >                                checkSum += (b = w.base) + w.id;
2158 >                                if (b != w.top ||
2159 >                                    ((i & 1) == 1 && w.source >= 0)) {
2160 >                                    running = true;
2161 >                                    break;
2162 >                                }
2163                              }
2164                          }
2165                      }
2166 <                    if (oldSum == (oldSum = checkSum))
2166 >                    if (((md = mode) & STOP) != 0)
2167 >                        break;                 // already triggered
2168 >                    else if (running)
2169 >                        return false;
2170 >                    else if (workQueues == ws && oldSum == (oldSum = checkSum))
2171                          break;
2172                  }
2173              }
2174 <            do {} while (!U.compareAndSwapInt(this, RUNSTATE,
2175 <                                              rs = runState, rs | STOP));
2174 >            if ((md & STOP) == 0)
2175 >                U.compareAndSwapInt(this, MODE, md, md | STOP);
2176          }
2177  
2178 <        for (long oldSum = 0L;;) {                // repeat until stable
2179 <            WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt;
2180 <            long checkSum = ctl;
2181 <            if ((ws = workQueues) != null) {      // help terminate others
2182 <                for (int i = 0; i < ws.length; ++i) {
2183 <                    if ((w = ws[i]) != null) {
2184 <                        w.cancelAll();            // clear queues
2185 <                        checkSum += w.base;
2186 <                        if (w.qlock >= 0) {
2187 <                            w.qlock = -1;         // racy set OK
2426 <                            if ((wt = w.owner) != null) {
2178 >        while (((md = mode) & TERMINATED) == 0) { // help terminate others
2179 >            for (long oldSum = 0L;;) {            // repeat until stable
2180 >                WorkQueue[] ws; WorkQueue w;
2181 >                long checkSum = ctl;
2182 >                if ((ws = workQueues) != null) {
2183 >                    for (int i = 0; i < ws.length; ++i) {
2184 >                        if ((w = ws[i]) != null) {
2185 >                            ForkJoinWorkerThread wt = w.owner;
2186 >                            w.cancelAll();        // clear queues
2187 >                            if (wt != null) {
2188                                  try {             // unblock join or park
2189                                      wt.interrupt();
2190                                  } catch (Throwable ignore) {
2191                                  }
2192                              }
2193 +                            checkSum += w.base + w.id;
2194                          }
2195                      }
2196                  }
2197 +                if (((md = mode) & TERMINATED) != 0 ||
2198 +                    (workQueues == ws && oldSum == (oldSum = checkSum)))
2199 +                    break;
2200              }
2201 <            if (oldSum == (oldSum = checkSum))
2201 >            if ((md & TERMINATED) != 0)
2202                  break;
2203 <        }
2439 <
2440 <        if ((short)(ctl >>> TC_SHIFT) + (config & SMASK) <= 0) {
2441 <            runState = (STARTED | SHUTDOWN | STOP | TERMINATED); // final write
2442 <            synchronized (this) {
2443 <                notifyAll();                      // for awaitTermination
2444 <            }
2445 <        }
2446 <
2447 <        return -1;
2448 <    }
2449 <
2450 <    // External operations
2451 <
2452 <    /**
2453 <     * Constructs and tries to install a new external queue,
2454 <     * failing if the workQueues array already has a queue at
2455 <     * the given index.
2456 <     *
2457 <     * @param index the index of the new queue
2458 <     */
2459 <    private void tryCreateExternalQueue(int index) {
2460 <        AuxState aux;
2461 <        if ((aux = auxState) != null && index >= 0) {
2462 <            WorkQueue q = new WorkQueue(this, null);
2463 <            q.config = index;
2464 <            q.scanState = ~UNSIGNALLED;
2465 <            q.qlock = 1;                   // lock queue
2466 <            boolean installed = false;
2467 <            aux.lock();
2468 <            try {                          // lock pool to install
2469 <                WorkQueue[] ws;
2470 <                if ((ws = workQueues) != null && index < ws.length &&
2471 <                    ws[index] == null) {
2472 <                    ws[index] = q;         // else throw away
2473 <                    installed = true;
2474 <                }
2475 <            } finally {
2476 <                aux.unlock();
2477 <            }
2478 <            if (installed) {
2479 <                try {
2480 <                    q.growArray();
2481 <                } finally {
2482 <                    q.qlock = 0;
2483 <                }
2484 <            }
2485 <        }
2486 <    }
2487 <
2488 <    /**
2489 <     * Adds the given task to a submission queue at submitter's
2490 <     * current queue. Also performs secondary initialization upon the
2491 <     * first submission of the first task to the pool, and detects
2492 <     * first submission by an external thread and creates a new shared
2493 <     * queue if the one at index if empty or contended.
2494 <     *
2495 <     * @param task the task. Caller must ensure non-null.
2496 <     */
2497 <    final void externalPush(ForkJoinTask<?> task) {
2498 <        int r;                            // initialize caller's probe
2499 <        if ((r = ThreadLocalRandom.getProbe()) == 0) {
2500 <            ThreadLocalRandom.localInit();
2501 <            r = ThreadLocalRandom.getProbe();
2502 <        }
2503 <        for (;;) {
2504 <            WorkQueue q; int wl, k, stat;
2505 <            int rs = runState;
2506 <            WorkQueue[] ws = workQueues;
2507 <            if (rs <= 0 || ws == null || (wl = ws.length) <= 0)
2508 <                tryInitialize(true);
2509 <            else if ((q = ws[k = (wl - 1) & r & SQMASK]) == null)
2510 <                tryCreateExternalQueue(k);
2511 <            else if ((stat = q.sharedPush(task)) < 0)
2203 >            else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
2204                  break;
2205 <            else if (stat == 0) {
2206 <                signalWork();
2205 >            else if (U.compareAndSwapInt(this, MODE, md, md | TERMINATED)) {
2206 >                synchronized (this) {
2207 >                    notifyAll();                  // for awaitTermination
2208 >                }
2209                  break;
2210              }
2517            else                          // move if busy
2518                r = ThreadLocalRandom.advanceProbe(r);
2211          }
2212 <    }
2521 <
2522 <    /**
2523 <     * Pushes a possibly-external submission.
2524 <     */
2525 <    private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2526 <        Thread t; ForkJoinWorkerThread w; WorkQueue q;
2527 <        if (task == null)
2528 <            throw new NullPointerException();
2529 <        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2530 <            (w = (ForkJoinWorkerThread)t).pool == this &&
2531 <            (q = w.workQueue) != null)
2532 <            q.push(task);
2533 <        else
2534 <            externalPush(task);
2535 <        return task;
2536 <    }
2537 <
2538 <    /**
2539 <     * Returns common pool queue for an external thread.
2540 <     */
2541 <    static WorkQueue commonSubmitterQueue() {
2542 <        ForkJoinPool p = common;
2543 <        int r = ThreadLocalRandom.getProbe();
2544 <        WorkQueue[] ws; int wl;
2545 <        return (p != null && (ws = p.workQueues) != null &&
2546 <                (wl = ws.length) > 0) ?
2547 <            ws[(wl - 1) & r & SQMASK] : null;
2548 <    }
2549 <
2550 <    /**
2551 <     * Performs tryUnpush for an external submitter.
2552 <     */
2553 <    final boolean tryExternalUnpush(ForkJoinTask<?> task) {
2554 <        int r = ThreadLocalRandom.getProbe();
2555 <        WorkQueue[] ws; WorkQueue w; int wl;
2556 <        return ((ws = workQueues) != null &&
2557 <                (wl = ws.length) > 0 &&
2558 <                (w = ws[(wl - 1) & r & SQMASK]) != null &&
2559 <                w.trySharedUnpush(task));
2560 <    }
2561 <
2562 <    /**
2563 <     * Performs helpComplete for an external submitter.
2564 <     */
2565 <    final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
2566 <        WorkQueue[] ws; int wl;
2567 <        int r = ThreadLocalRandom.getProbe();
2568 <        return ((ws = workQueues) != null && (wl = ws.length) > 0) ?
2569 <            helpComplete(ws[(wl - 1) & r & SQMASK], task, maxTasks) : 0;
2212 >        return true;
2213      }
2214  
2215      // Exported methods
# Line 2575 | Line 2218 | public class ForkJoinPool extends Abstra
2218  
2219      /**
2220       * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2221 <     * java.lang.Runtime#availableProcessors}, using the {@linkplain
2222 <     * #defaultForkJoinWorkerThreadFactory default thread factory},
2580 <     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2221 >     * java.lang.Runtime#availableProcessors}, using defaults for all
2222 >     * other parameters.
2223       *
2224       * @throws SecurityException if a security manager exists and
2225       *         the caller is not permitted to modify threads
# Line 2586 | Line 2228 | public class ForkJoinPool extends Abstra
2228       */
2229      public ForkJoinPool() {
2230          this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2231 <             defaultForkJoinWorkerThreadFactory, null, false);
2231 >             defaultForkJoinWorkerThreadFactory, null, false,
2232 >             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2233      }
2234  
2235      /**
2236       * Creates a {@code ForkJoinPool} with the indicated parallelism
2237 <     * level, the {@linkplain
2595 <     * #defaultForkJoinWorkerThreadFactory default thread factory},
2596 <     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2237 >     * level, using defaults for all other parameters.
2238       *
2239       * @param parallelism the parallelism level
2240       * @throws IllegalArgumentException if parallelism less than or
# Line 2604 | Line 2245 | public class ForkJoinPool extends Abstra
2245       *         java.lang.RuntimePermission}{@code ("modifyThread")}
2246       */
2247      public ForkJoinPool(int parallelism) {
2248 <        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
2248 >        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
2249 >             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2250      }
2251  
2252      /**
2253 <     * Creates a {@code ForkJoinPool} with the given parameters.
2253 >     * Creates a {@code ForkJoinPool} with the given parameters (using
2254 >     * defaults for others).
2255       *
2256       * @param parallelism the parallelism level. For default value,
2257       * use {@link java.lang.Runtime#availableProcessors}.
# Line 2635 | Line 2278 | public class ForkJoinPool extends Abstra
2278                          ForkJoinWorkerThreadFactory factory,
2279                          UncaughtExceptionHandler handler,
2280                          boolean asyncMode) {
2281 <        this(checkParallelism(parallelism),
2282 <             checkFactory(factory),
2640 <             handler,
2641 <             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
2642 <             "ForkJoinPool-" + nextPoolId() + "-worker-");
2643 <        checkPermission();
2281 >        this(parallelism, factory, handler, asyncMode,
2282 >             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2283      }
2284  
2285 <    private static int checkParallelism(int parallelism) {
2286 <        if (parallelism <= 0 || parallelism > MAX_CAP)
2285 >    /**
2286 >     * Creates a {@code ForkJoinPool} with the given parameters.
2287 >     *
2288 >     * @param parallelism the parallelism level. For default value,
2289 >     * use {@link java.lang.Runtime#availableProcessors}.
2290 >     *
2291 >     * @param factory the factory for creating new threads. For
2292 >     * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2293 >     *
2294 >     * @param handler the handler for internal worker threads that
2295 >     * terminate due to unrecoverable errors encountered while
2296 >     * executing tasks. For default value, use {@code null}.
2297 >     *
2298 >     * @param asyncMode if true, establishes local first-in-first-out
2299 >     * scheduling mode for forked tasks that are never joined. This
2300 >     * mode may be more appropriate than default locally stack-based
2301 >     * mode in applications in which worker threads only process
2302 >     * event-style asynchronous tasks.  For default value, use {@code
2303 >     * false}.
2304 >     *
2305 >     * @param corePoolSize the number of threads to keep in the pool
2306 >     * (unless timed out after an elapsed keep-alive). Normally (and
2307 >     * by default) this is the same value as the parallelism level,
2308 >     * but may be set to a larger value to reduce dynamic overhead if
2309 >     * tasks regularly block. Using a smaller value (for example
2310 >     * {@code 0}) has the same effect as the default.
2311 >     *
2312 >     * @param maximumPoolSize the maximum number of threads allowed.
2313 >     * When the maximum is reached, attempts to replace blocked
2314 >     * threads fail.  (However, because creation and termination of
2315 >     * different threads may overlap, and may be managed by the given
2316 >     * thread factory, this value may be transiently exceeded.)  The
2317 >     * default for the common pool is {@code 256} plus the parallelism
2318 >     * level. Using a value (for example {@code Integer.MAX_VALUE})
2319 >     * larger than the implementation's total thread limit has the
2320 >     * same effect as using this limit.
2321 >     *
2322 >     * @param minimumRunnable the minimum allowed number of core
2323 >     * threads not blocked by a join or {@link ManagedBlocker}.  To
2324 >     * ensure progress, when too few unblocked threads exist and
2325 >     * unexecuted tasks may exist, new threads are constructed, up to
2326 >     * the given maximumPoolSize.  For the default value, use {@code
2327 >     * 1}, that ensures liveness.  A larger value might improve
2328 >     * throughput in the presence of blocked activities, but might
2329 >     * not, due to increased overhead.  A value of zero may be
2330 >     * acceptable when submitted tasks cannot have dependencies
2331 >     * requiring additional threads.
2332 >     *
2333 >     * @param rejectOnSaturation if true, attempts to create more than
2334 >     * the maximum total allowed threads throw {@link
2335 >     * RejectedExecutionException}. Otherwise, the pool continues to
2336 >     * operate, but with fewer than the target number of runnable
2337 >     * threads, so might not ensure progress.  For default value, use
2338 >     * {@code true}.
2339 >     *
2340 >     * @param keepAliveTime the elapsed time since last use before
2341 >     * a thread is terminated (and then later replaced if needed).
2342 >     * For the default value, use {@code 60, TimeUnit.SECONDS}.
2343 >     *
2344 >     * @param unit the time unit for the {@code keepAliveTime} argument
2345 >     *
2346 >     * @throws IllegalArgumentException if parallelism is less than or
2347 >     *         equal to zero, or is greater than implementation limit,
2348 >     *         or if maximumPoolSize is less than parallelism,
2349 >     *         of if the keepAliveTime is less than or equal to zero.
2350 >     * @throws NullPointerException if the factory is null
2351 >     * @throws SecurityException if a security manager exists and
2352 >     *         the caller is not permitted to modify threads
2353 >     *         because it does not hold {@link
2354 >     *         java.lang.RuntimePermission}{@code ("modifyThread")}
2355 >     */
2356 >    public ForkJoinPool(int parallelism,
2357 >                        ForkJoinWorkerThreadFactory factory,
2358 >                        UncaughtExceptionHandler handler,
2359 >                        boolean asyncMode,
2360 >                        int corePoolSize,
2361 >                        int maximumPoolSize,
2362 >                        int minimumRunnable,
2363 >                        boolean rejectOnSaturation,
2364 >                        long keepAliveTime,
2365 >                        TimeUnit unit) {
2366 >        // check, encode, pack parameters
2367 >        if (parallelism <= 0 || parallelism > MAX_CAP ||
2368 >            maximumPoolSize < parallelism || keepAliveTime <= 0L)
2369              throw new IllegalArgumentException();
2649        return parallelism;
2650    }
2651
2652    private static ForkJoinWorkerThreadFactory checkFactory
2653        (ForkJoinWorkerThreadFactory factory) {
2370          if (factory == null)
2371              throw new NullPointerException();
2372 <        return factory;
2372 >        long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2373 >
2374 >        String prefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
2375 >        int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
2376 >        long c = ((((long)(-corep)       << TC_SHIFT) & TC_MASK) |
2377 >                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2378 >        int m = (parallelism |
2379 >                 (asyncMode ? FIFO : 0) |
2380 >                 (rejectOnSaturation ? 0 : SATURATE));
2381 >        int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
2382 >        int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
2383 >        int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
2384 >        int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
2385 >        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2386 >        n = (n + 1) << 1; // power of two, including space for submission queues
2387 >
2388 >        this.workQueues = new WorkQueue[n];
2389 >        this.workerNamePrefix = prefix;
2390 >        this.factory = factory;
2391 >        this.ueh = handler;
2392 >        this.keepAlive = ms;
2393 >        this.bounds = b;
2394 >        this.mode = m;
2395 >        this.ctl = c;
2396 >        checkPermission();
2397      }
2398  
2399      /**
2400 <     * Creates a {@code ForkJoinPool} with the given parameters, without
2401 <     * any security checks or parameter validation.  Invoked directly by
2402 <     * makeCommonPool.
2403 <     */
2404 <    private ForkJoinPool(int parallelism,
2405 <                         ForkJoinWorkerThreadFactory factory,
2406 <                         UncaughtExceptionHandler handler,
2407 <                         int mode,
2408 <                         String workerNamePrefix) {
2409 <        this.workerNamePrefix = workerNamePrefix;
2410 <        this.factory = factory;
2400 >     * Constructor for common pool using parameters possibly
2401 >     * overridden by system properties
2402 >     */
2403 >    private ForkJoinPool(byte forCommonPoolOnly) {
2404 >        int parallelism = -1;
2405 >        ForkJoinWorkerThreadFactory fac = null;
2406 >        UncaughtExceptionHandler handler = null;
2407 >        try {  // ignore exceptions in accessing/parsing properties
2408 >            String pp = System.getProperty
2409 >                ("java.util.concurrent.ForkJoinPool.common.parallelism");
2410 >            String fp = System.getProperty
2411 >                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
2412 >            String hp = System.getProperty
2413 >                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2414 >            if (pp != null)
2415 >                parallelism = Integer.parseInt(pp);
2416 >            if (fp != null)
2417 >                fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
2418 >                           getSystemClassLoader().loadClass(fp).newInstance());
2419 >            if (hp != null)
2420 >                handler = ((UncaughtExceptionHandler)ClassLoader.
2421 >                           getSystemClassLoader().loadClass(hp).newInstance());
2422 >        } catch (Exception ignore) {
2423 >        }
2424 >
2425 >        if (fac == null) {
2426 >            if (System.getSecurityManager() == null)
2427 >                fac = defaultForkJoinWorkerThreadFactory;
2428 >            else // use security-managed default
2429 >                fac = new InnocuousForkJoinWorkerThreadFactory();
2430 >        }
2431 >        if (parallelism < 0 && // default 1 less than #cores
2432 >            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
2433 >            parallelism = 1;
2434 >        if (parallelism > MAX_CAP)
2435 >            parallelism = MAX_CAP;
2436 >
2437 >        long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
2438 >                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2439 >        int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2440 >        int m = (parallelism < 1) ? 1 : parallelism;
2441 >        int n = (parallelism > 1) ? parallelism - 1 : 1;
2442 >        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2443 >        n = (n + 1) << 1;
2444 >
2445 >        this.workQueues = new WorkQueue[n];
2446 >        this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2447 >        this.factory = fac;
2448          this.ueh = handler;
2449 <        this.config = (parallelism & SMASK) | mode;
2450 <        long np = (long)(-parallelism); // offset ctl counts
2451 <        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2449 >        this.keepAlive = DEFAULT_KEEPALIVE;
2450 >        this.bounds = b;
2451 >        this.mode = m;
2452 >        this.ctl = c;
2453      }
2454  
2455      /**
# Line 2847 | Line 2625 | public class ForkJoinPool extends Abstra
2625       * @return the targeted parallelism level of this pool
2626       */
2627      public int getParallelism() {
2628 <        int par;
2851 <        return ((par = config & SMASK) > 0) ? par : 1;
2628 >        return mode & SMASK;
2629      }
2630  
2631      /**
# Line 2870 | Line 2647 | public class ForkJoinPool extends Abstra
2647       * @return the number of worker threads
2648       */
2649      public int getPoolSize() {
2650 <        return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2650 >        return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
2651      }
2652  
2653      /**
# Line 2880 | Line 2657 | public class ForkJoinPool extends Abstra
2657       * @return {@code true} if this pool uses async mode
2658       */
2659      public boolean getAsyncMode() {
2660 <        return (config & FIFO_QUEUE) != 0;
2660 >        return (mode & FIFO) != 0;
2661      }
2662  
2663      /**
# Line 2911 | Line 2688 | public class ForkJoinPool extends Abstra
2688       * @return the number of active threads
2689       */
2690      public int getActiveThreadCount() {
2691 <        int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
2691 >        int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2692          return (r <= 0) ? 0 : r; // suppress momentarily negative values
2693      }
2694  
# Line 2927 | Line 2704 | public class ForkJoinPool extends Abstra
2704       * @return {@code true} if all threads are currently idle
2705       */
2706      public boolean isQuiescent() {
2707 <        return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
2707 >        for (;;) {
2708 >            long c = ctl;
2709 >            int md = mode, pc = md & SMASK;
2710 >            int tc = pc + (short)(c >> TC_SHIFT);
2711 >            int rc = pc + (int)(c >> RC_SHIFT);
2712 >            if ((md & (STOP | TERMINATED)) != 0)
2713 >                return true;
2714 >            else if (rc > 0)
2715 >                return false;
2716 >            else {
2717 >                WorkQueue[] ws; WorkQueue v;
2718 >                if ((ws = workQueues) != null) {
2719 >                    for (int i = 1; i < ws.length; i += 2) {
2720 >                        if ((v = ws[i]) != null) {
2721 >                            if ((v.source & QUIET) == 0)
2722 >                                return false;
2723 >                            --tc;
2724 >                        }
2725 >                    }
2726 >                }
2727 >                if (tc == 0 && ctl == c)
2728 >                    return true;
2729 >            }
2730 >        }
2731      }
2732  
2733      /**
# Line 2942 | Line 2742 | public class ForkJoinPool extends Abstra
2742       * @return the number of steals
2743       */
2744      public long getStealCount() {
2745 <        AuxState sc = auxState;
2946 <        long count = (sc == null) ? 0L : sc.stealCount;
2745 >        long count = stealCount;
2746          WorkQueue[] ws; WorkQueue w;
2747          if ((ws = workQueues) != null) {
2748              for (int i = 1; i < ws.length; i += 2) {
2749                  if ((w = ws[i]) != null)
2750 <                    count += w.nsteals;
2750 >                    count += (long)w.nsteals & 0xffffffffL;
2751              }
2752          }
2753          return count;
# Line 3020 | Line 2819 | public class ForkJoinPool extends Abstra
2819       * @return the next submission, or {@code null} if none
2820       */
2821      protected ForkJoinTask<?> pollSubmission() {
2822 <        WorkQueue[] ws; int wl; WorkQueue w; ForkJoinTask<?> t;
3024 <        int r = ThreadLocalRandom.nextSecondarySeed();
3025 <        if ((ws = workQueues) != null && (wl = ws.length) > 0) {
3026 <            for (int m = wl - 1, i = 0; i < wl; ++i) {
3027 <                if ((w = ws[(i << 1) & m]) != null && (t = w.poll()) != null)
3028 <                    return t;
3029 <            }
3030 <        }
3031 <        return null;
2822 >        return pollScan(true);
2823      }
2824  
2825      /**
# Line 3074 | Line 2865 | public class ForkJoinPool extends Abstra
2865      public String toString() {
2866          // Use a single pass through workQueues to collect counts
2867          long qt = 0L, qs = 0L; int rc = 0;
2868 <        AuxState sc = auxState;
3078 <        long st = (sc == null) ? 0L : sc.stealCount;
3079 <        long c = ctl;
2868 >        long st = stealCount;
2869          WorkQueue[] ws; WorkQueue w;
2870          if ((ws = workQueues) != null) {
2871              for (int i = 0; i < ws.length; ++i) {
# Line 3086 | Line 2875 | public class ForkJoinPool extends Abstra
2875                          qs += size;
2876                      else {
2877                          qt += size;
2878 <                        st += w.nsteals;
2878 >                        st += (long)w.nsteals & 0xffffffffL;
2879                          if (w.isApparentlyUnblocked())
2880                              ++rc;
2881                      }
2882                  }
2883              }
2884          }
2885 <        int pc = (config & SMASK);
2885 >
2886 >        int md = mode;
2887 >        int pc = (md & SMASK);
2888 >        long c = ctl;
2889          int tc = pc + (short)(c >>> TC_SHIFT);
2890 <        int ac = pc + (int)(c >> AC_SHIFT);
2890 >        int ac = pc + (int)(c >> RC_SHIFT);
2891          if (ac < 0) // ignore transient negative
2892              ac = 0;
2893 <        int rs = runState;
2894 <        String level = ((rs & TERMINATED) != 0 ? "Terminated" :
2895 <                        (rs & STOP)       != 0 ? "Terminating" :
3104 <                        (rs & SHUTDOWN)   != 0 ? "Shutting down" :
2893 >        String level = ((md & TERMINATED) != 0 ? "Terminated" :
2894 >                        (md & STOP)       != 0 ? "Terminating" :
2895 >                        (md & SHUTDOWN)   != 0 ? "Shutting down" :
2896                          "Running");
2897          return super.toString() +
2898              "[" + level +
# Line 3164 | Line 2955 | public class ForkJoinPool extends Abstra
2955       * @return {@code true} if all tasks have completed following shut down
2956       */
2957      public boolean isTerminated() {
2958 <        return (runState & TERMINATED) != 0;
2958 >        return (mode & TERMINATED) != 0;
2959      }
2960  
2961      /**
# Line 3181 | Line 2972 | public class ForkJoinPool extends Abstra
2972       * @return {@code true} if terminating but not yet terminated
2973       */
2974      public boolean isTerminating() {
2975 <        int rs = runState;
2976 <        return (rs & STOP) != 0 && (rs & TERMINATED) == 0;
2975 >        int md = mode;
2976 >        return (md & STOP) != 0 && (md & TERMINATED) == 0;
2977      }
2978  
2979      /**
# Line 3191 | Line 2982 | public class ForkJoinPool extends Abstra
2982       * @return {@code true} if this pool has been shut down
2983       */
2984      public boolean isShutdown() {
2985 <        return (runState & SHUTDOWN) != 0;
2985 >        return (mode & SHUTDOWN) != 0;
2986      }
2987  
2988      /**
# Line 3255 | Line 3046 | public class ForkJoinPool extends Abstra
3046              helpQuiescePool(wt.workQueue);
3047              return true;
3048          }
3049 <        long startTime = System.nanoTime();
3050 <        WorkQueue[] ws;
3051 <        int r = 0, wl;
3052 <        boolean found = true;
3053 <        while (!isQuiescent() && (ws = workQueues) != null &&
3054 <               (wl = ws.length) > 0) {
3055 <            if (!found) {
3056 <                if ((System.nanoTime() - startTime) > nanos)
3049 >        else {
3050 >            for (long startTime = System.nanoTime();;) {
3051 >                ForkJoinTask<?> t;
3052 >                if ((t = pollScan(false)) != null)
3053 >                    t.doExec();
3054 >                else if (isQuiescent())
3055 >                    return true;
3056 >                else if ((System.nanoTime() - startTime) > nanos)
3057                      return false;
3058 <                Thread.yield(); // cannot block
3059 <            }
3269 <            found = false;
3270 <            for (int m = wl - 1, j = (m + 1) << 2; j >= 0; --j) {
3271 <                ForkJoinTask<?> t; WorkQueue q; int b, k;
3272 <                if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
3273 <                    (b = q.base) - q.top < 0) {
3274 <                    found = true;
3275 <                    if ((t = q.pollAt(b)) != null)
3276 <                        t.doExec();
3277 <                    break;
3278 <                }
3058 >                else
3059 >                    Thread.yield(); // cannot block
3060              }
3061          }
3281        return true;
3062      }
3063  
3064      /**
# Line 3393 | Line 3173 | public class ForkJoinPool extends Abstra
3173          throws InterruptedException {
3174          ForkJoinPool p;
3175          ForkJoinWorkerThread wt;
3176 +        WorkQueue w;
3177          Thread t = Thread.currentThread();
3178          if ((t instanceof ForkJoinWorkerThread) &&
3179 <            (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
3180 <            WorkQueue w = wt.workQueue;
3179 >            (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
3180 >            (w = wt.workQueue) != null) {
3181 >            int block;
3182              while (!blocker.isReleasable()) {
3183 <                if (p.tryCompensate(w)) {
3183 >                if ((block = p.tryCompensate(w)) != 0) {
3184                      try {
3185                          do {} while (!blocker.isReleasable() &&
3186                                       !blocker.block());
3187                      } finally {
3188 <                        U.getAndAddLong(p, CTL, AC_UNIT);
3188 >                        U.getAndAddLong(p, CTL, (block > 0) ? RC_UNIT : 0L);
3189                      }
3190                      break;
3191                  }
# Line 3430 | Line 3212 | public class ForkJoinPool extends Abstra
3212      // Unsafe mechanics
3213      private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
3214      private static final long CTL;
3215 <    private static final long RUNSTATE;
3215 >    private static final long MODE;
3216      private static final int ABASE;
3217      private static final int ASHIFT;
3218  
# Line 3438 | Line 3220 | public class ForkJoinPool extends Abstra
3220          try {
3221              CTL = U.objectFieldOffset
3222                  (ForkJoinPool.class.getDeclaredField("ctl"));
3223 <            RUNSTATE = U.objectFieldOffset
3224 <                (ForkJoinPool.class.getDeclaredField("runState"));
3223 >            MODE = U.objectFieldOffset
3224 >                (ForkJoinPool.class.getDeclaredField("mode"));
3225              ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
3226              int scale = U.arrayIndexScale(ForkJoinTask[].class);
3227              if ((scale & (scale - 1)) != 0)
# Line 3468 | Line 3250 | public class ForkJoinPool extends Abstra
3250  
3251          common = java.security.AccessController.doPrivileged
3252              (new java.security.PrivilegedAction<ForkJoinPool>() {
3253 <                public ForkJoinPool run() { return makeCommonPool(); }});
3253 >                    public ForkJoinPool run() {
3254 >                        return new ForkJoinPool((byte)0); }});
3255  
3256 <        // report 1 even if threads disabled
3474 <        COMMON_PARALLELISM = Math.max(common.config & SMASK, 1);
3475 <    }
3476 <
3477 <    /**
3478 <     * Creates and returns the common pool, respecting user settings
3479 <     * specified via system properties.
3480 <     */
3481 <    static ForkJoinPool makeCommonPool() {
3482 <        int parallelism = -1;
3483 <        ForkJoinWorkerThreadFactory factory = null;
3484 <        UncaughtExceptionHandler handler = null;
3485 <        try {  // ignore exceptions in accessing/parsing properties
3486 <            String pp = System.getProperty
3487 <                ("java.util.concurrent.ForkJoinPool.common.parallelism");
3488 <            String fp = System.getProperty
3489 <                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3490 <            String hp = System.getProperty
3491 <                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3492 <            if (pp != null)
3493 <                parallelism = Integer.parseInt(pp);
3494 <            if (fp != null)
3495 <                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
3496 <                           getSystemClassLoader().loadClass(fp).newInstance());
3497 <            if (hp != null)
3498 <                handler = ((UncaughtExceptionHandler)ClassLoader.
3499 <                           getSystemClassLoader().loadClass(hp).newInstance());
3500 <        } catch (Exception ignore) {
3501 <        }
3502 <        if (factory == null) {
3503 <            if (System.getSecurityManager() == null)
3504 <                factory = defaultForkJoinWorkerThreadFactory;
3505 <            else // use security-managed default
3506 <                factory = new InnocuousForkJoinWorkerThreadFactory();
3507 <        }
3508 <        if (parallelism < 0 && // default 1 less than #cores
3509 <            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
3510 <            parallelism = 1;
3511 <        if (parallelism > MAX_CAP)
3512 <            parallelism = MAX_CAP;
3513 <        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
3514 <                                "ForkJoinPool.commonPool-worker-");
3256 >        COMMON_PARALLELISM = common.mode & SMASK;
3257      }
3258  
3259      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines