root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.407
Committed: Mon Apr 4 12:02:41 2022 UTC by dl
Branch: MAIN
Changes since 1.406: +91 -79 lines
Log Message:
Cancellation compatibility with previous versions; other misc improvements

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 jsr166 1.1 */
6 jsr166 1.301
7 jsr166 1.1 package java.util.concurrent;
8    
9 jsr166 1.156 import java.lang.Thread.UncaughtExceptionHandler;
10 jsr166 1.329 import java.security.AccessController;
11 jsr166 1.228 import java.security.AccessControlContext;
12 jsr166 1.331 import java.security.Permission;
13 jsr166 1.228 import java.security.Permissions;
14 jsr166 1.329 import java.security.PrivilegedAction;
15 jsr166 1.228 import java.security.ProtectionDomain;
16 jsr166 1.1 import java.util.ArrayList;
17     import java.util.Collection;
18     import java.util.Collections;
19     import java.util.List;
20 dl 1.307 import java.util.function.Predicate;
21 dl 1.367 import java.util.concurrent.atomic.AtomicInteger;
22 dl 1.243 import java.util.concurrent.locks.LockSupport;
23 dl 1.355 import java.util.concurrent.locks.ReentrantLock;
24     import java.util.concurrent.locks.Condition;
25 dl 1.404 import jdk.internal.misc.Unsafe;
26 dl 1.405 //import jdk.internal.vm.SharedThreadContainer; // for loom
27 jsr166 1.1
28     /**
29 jsr166 1.4 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
30 jsr166 1.8 * A {@code ForkJoinPool} provides the entry point for submissions
31 dl 1.18 * from non-{@code ForkJoinTask} clients, as well as management and
32 jsr166 1.11 * monitoring operations.
33 jsr166 1.1 *
34 jsr166 1.9 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
35     * ExecutorService} mainly by virtue of employing
36     * <em>work-stealing</em>: all threads in the pool attempt to find and
37 dl 1.78 * execute tasks submitted to the pool and/or created by other active
38     * tasks (eventually blocking waiting for work if none exist). This
39     * enables efficient processing when most tasks spawn other subtasks
40     * (as do most {@code ForkJoinTask}s), as well as when many small
41     * tasks are submitted to the pool from external clients. Especially
42     * when setting <em>asyncMode</em> to true in constructors, {@code
43     * ForkJoinPool}s may also be appropriate for use with event-style
44 dl 1.330 * tasks that are never joined. All worker threads are initialized
45     * with {@link Thread#isDaemon} set {@code true}.
46 jsr166 1.1 *
47 dl 1.112 * <p>A static {@link #commonPool()} is available and appropriate for
48 dl 1.101 * most applications. The common pool is used by any ForkJoinTask that
49     * is not explicitly submitted to a specified pool. Using the common
50     * pool normally reduces resource usage (its threads are slowly
51     * reclaimed during periods of non-use, and reinstated upon subsequent
52 dl 1.105 * use).
53 dl 1.100 *
54     * <p>For applications that require separate or custom pools, a {@code
55     * ForkJoinPool} may be constructed with a given target parallelism
56 jsr166 1.214 * level; by default, equal to the number of available processors.
57     * The pool attempts to maintain enough active (or available) threads
58     * by dynamically adding, suspending, or resuming internal worker
59 jsr166 1.187 * threads, even if some tasks are stalled waiting to join others.
60     * However, no such adjustments are guaranteed in the face of blocked
61     * I/O or other unmanaged synchronization. The nested {@link
62 dl 1.100 * ManagedBlocker} interface enables extension of the kinds of
63 dl 1.300 * synchronization accommodated. The default policies may be
64     * overridden using a constructor with parameters corresponding to
65     * those documented in class {@link ThreadPoolExecutor}.
66 jsr166 1.1 *
67     * <p>In addition to execution and lifecycle control methods, this
68     * class provides status check methods (for example
69 jsr166 1.4 * {@link #getStealCount}) that are intended to aid in developing,
70 jsr166 1.1 * tuning, and monitoring fork/join applications. Also, method
71 jsr166 1.4 * {@link #toString} returns indications of pool state in a
72 jsr166 1.1 * convenient form for informal monitoring.
73     *
74 jsr166 1.109 * <p>As is the case with other ExecutorServices, there are three
75 jsr166 1.84 * main task execution methods summarized in the following table.
76     * These are designed to be used primarily by clients not already
77     * engaged in fork/join computations in the current pool. The main
78     * forms of these methods accept instances of {@code ForkJoinTask},
79     * but overloaded forms also allow mixed execution of plain {@code
80     * Runnable}- or {@code Callable}- based activities as well. However,
81     * tasks that are already executing in a pool should normally instead
82     * use the within-computation forms listed in the table unless using
83     * async event-style tasks that are not usually joined, in which case
84     * there is little difference among choice of methods.
85 dl 1.18 *
86 jsr166 1.337 * <table class="plain">
87 jsr166 1.159 * <caption>Summary of task execution methods</caption>
88 dl 1.18 * <tr>
89     * <td></td>
90 jsr166 1.338 * <th scope="col"> Call from non-fork/join clients</th>
91     * <th scope="col"> Call from within fork/join computations</th>
92 dl 1.18 * </tr>
93     * <tr>
94 jsr166 1.338 * <th scope="row" style="text-align:left"> Arrange async execution</th>
95 dl 1.18 * <td> {@link #execute(ForkJoinTask)}</td>
96     * <td> {@link ForkJoinTask#fork}</td>
97     * </tr>
98     * <tr>
99 jsr166 1.338 * <th scope="row" style="text-align:left"> Await and obtain result</th>
100 dl 1.18 * <td> {@link #invoke(ForkJoinTask)}</td>
101     * <td> {@link ForkJoinTask#invoke}</td>
102     * </tr>
103     * <tr>
104 jsr166 1.338 * <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
105 dl 1.18 * <td> {@link #submit(ForkJoinTask)}</td>
106     * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
107     * </tr>
108     * </table>
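     *
     * <p>For example, a client thread not already running fork/join
     * computations might submit a task and obtain its result as
     * follows (a minimal illustration):
     * <pre> {@code
     * ForkJoinPool pool = new ForkJoinPool();
     * ForkJoinTask<Integer> task =
     *   pool.submit(new RecursiveTask<Integer>() {
     *     protected Integer compute() { return 1 + 1; }
     *   });
     * int result = task.join();  // ForkJoinTasks are Futures
     * }</pre>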
109 dl 1.19 *
110 jsr166 1.333 * <p>The parameters used to construct the common pool may be controlled by
111     * setting the following {@linkplain System#getProperty system properties}:
112 jsr166 1.162 * <ul>
113 jsr166 1.350 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}
114 jsr166 1.162 * - the parallelism level, a non-negative integer
115 jsr166 1.350 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory}
116 jsr166 1.331 * - the class name of a {@link ForkJoinWorkerThreadFactory}.
117     * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
118     * is used to load this class.
119 jsr166 1.350 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.exceptionHandler}
120 jsr166 1.331 * - the class name of a {@link UncaughtExceptionHandler}.
121     * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
122     * is used to load this class.
123 jsr166 1.350 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.maximumSpares}
124 dl 1.223 * - the maximum number of allowed extra threads to maintain target
125 dl 1.208 * parallelism (default 256).
126 jsr166 1.162 * </ul>
127 jsr166 1.333 * If no thread factory is supplied via a system property, then the
128     * common pool uses a factory that uses the system class loader as the
129 jsr166 1.331 * {@linkplain Thread#getContextClassLoader() thread context class loader}.
130 jsr166 1.333 * In addition, if a {@link SecurityManager} is present, then
131     * the common pool uses a factory supplying threads that have no
132     * {@link Permissions} enabled.
133 jsr166 1.331 *
134 jsr166 1.156 * Upon any error in establishing these settings, default parameters
135 dl 1.160 * are used. It is possible to disable or limit the use of threads in
136     * the common pool by setting the parallelism property to zero, and/or
137 dl 1.193 * using a factory that may return {@code null}. However doing so may
138     * cause unjoined tasks to never be executed.
139 dl 1.105 *
140 dl 1.387 * <p><b>Implementation notes:</b> This implementation restricts the
141 jsr166 1.1 * maximum number of running threads to 32767. Attempts to create
142 jsr166 1.11 * pools with greater than the maximum number result in
143 jsr166 1.8 * {@code IllegalArgumentException}.
144 jsr166 1.1 *
145 jsr166 1.11 * <p>This implementation rejects submitted tasks (that is, by throwing
146 dl 1.19 * {@link RejectedExecutionException}) only when the pool is shut down
147 dl 1.20 * or internal resources have been exhausted.
148 jsr166 1.11 *
149 jsr166 1.1 * @since 1.7
150     * @author Doug Lea
151     */
152     public class ForkJoinPool extends AbstractExecutorService {
153    
154     /*
155 dl 1.14 * Implementation Overview
156     *
157 dl 1.78 * This class and its nested classes provide the main
158     * functionality and control for a set of worker threads:
159 jsr166 1.84 * Submissions from non-FJ threads enter into submission queues.
160     * Workers take these tasks and typically split them into subtasks
161 dl 1.345 * that may be stolen by other workers. Work-stealing based on
162     * randomized scans generally leads to better throughput than
163     * "work dealing" in which producers assign tasks to idle threads,
164     * in part because threads that have finished other tasks before
165     * the signalled thread wakes up (which can be a long time) can
166     * take the task instead. Preference rules give first priority to
167     * processing tasks from their own queues (LIFO or FIFO, depending
168     * on mode), then to randomized FIFO steals of tasks in other
169     * queues. This framework began as vehicle for supporting
170     * tree-structured parallelism using work-stealing. Over time,
171     * its scalability advantages led to extensions and changes to
172     * better support more diverse usage contexts. Because most
173     * internal methods and nested classes are interrelated, their
174     * main rationale and descriptions are presented here; individual
175     * methods and nested classes contain only brief comments about
176 dl 1.405 * details. There are a fair number of odd code constructions and
177     * design decisions for components that reside at the edge of Java
178     * vs JVM functionality.
179 dl 1.78 *
180 jsr166 1.84 * WorkQueues
181 dl 1.78 * ==========
182     *
183     * Most operations occur within work-stealing queues (in nested
184     * class WorkQueue). These are special forms of Deques that
185     * support only three of the four possible end-operations -- push,
186     * pop, and poll (aka steal), under the further constraints that
187     * push and pop are called only from the owning thread (or, as
188     * extended here, under a lock), while poll may be called from
189     * other threads. (If you are unfamiliar with them, you probably
190     * want to read Herlihy and Shavit's book "The Art of
191     * Multiprocessor programming", chapter 16 describing these in
192     * more detail before proceeding.) The main work-stealing queue
193     * design is roughly similar to those in the papers "Dynamic
194     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
195     * (http://research.sun.com/scalable/pubs/index.html) and
196     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
197     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
198 dl 1.200 * The main differences ultimately stem from GC requirements that
199     * we null out taken slots as soon as we can, to maintain as small
200     * a footprint as possible even in programs generating huge
201     * numbers of tasks. To accomplish this, we shift the CAS
202     * arbitrating pop vs poll (steal) from being on the indices
203 dl 1.405 * ("base" and "top") to the slots themselves. These provide the
204     * primary required memory ordering -- see "Correct and Efficient
205     * Work-Stealing for Weak Memory Models" by Le, Pop, Cohen, and
206     * Nardelli, PPoPP 2013
207     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
208     * analysis of memory ordering requirements in work-stealing
209     * algorithms similar to the one used here. We also use ordered,
210     * moded accesses and/or fences for other control, with modes
211     * reflecting the presence or absence of other contextual sync
212     * provided by atomic and/or volatile accesses. Some methods (or
213 dl 1.407 * their primary loops) begin with an acquire fence or
214     * otherwise-unnecessary volatile read that amounts to an
215     * acquiring read of "this" to cover all fields (which is
216 dl 1.405 * sometimes stronger than necessary, but less brittle). Some
217     * constructions are intentionally racy because they use read
218     * values as hints, not for correctness.
219     *
220     * We also support a user mode in which local task processing is
221     * in FIFO, not LIFO order, simply by using a local version of
222     * poll rather than pop. This can be useful in message-passing
223     * frameworks in which tasks are never joined, although with
224     * increased contention among task producers and consumers. Also,
225     * the same data structure (and class) is used for "submission
226     * queues" (described below) holding externally submitted tasks,
227     * that differ only in that a lock (field "access"; see below) is
228     * required by external callers to push and pop tasks.
229 dl 1.200 *
230 dl 1.243 * Adding tasks then takes the form of a classic array push(task)
231     * in a circular buffer:
232     * q.array[q.top++ % length] = task;
233 dl 1.200 *
234 dl 1.355 * The actual code needs to null-check and size-check the array,
235 jsr166 1.247 * uses masking, not mod, for indexing a power-of-two-sized array,
236 dl 1.355 * enforces memory ordering, supports resizing, and possibly
237 dl 1.405 * signals waiting workers to start scanning (described below),
238     * which requires that even internal usages strictly order
239     * accesses (using a form of lock release).
240 dl 1.355 *
241     * The pop operation (always performed by owner) is of the form:
242     * if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
243     * decrement top and return task;
244 dl 1.405 * If this fails, the queue is empty. This operation is one part
245     * of the nextLocalTask method, which instead does a local poll
246     * in FIFO mode.
247 dl 1.355 *
248 dl 1.405 * The poll operation is, basically:
249     * if (CAS nonnull task t = q.array[k = q.base % length] to null)
250 dl 1.355 * increment base and return task;
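     *
     * As a rough standalone model of these three operations (a sketch
     * only; the actual code below works on a plain array with
     * Unsafe-based fenced accesses, and adds size checks, resizing,
     * and signalling):
     *
     *   import java.util.concurrent.atomic.AtomicReferenceArray;
     *   class SimpleDeque<T> {          // fixed power-of-two capacity
     *     final AtomicReferenceArray<T> a = new AtomicReferenceArray<>(64);
     *     volatile int base;            // next slot to poll (steal)
     *     int top;                      // next slot to push (owner only)
     *     void push(T task) {           // owner only; no size check
     *       a.set(top++ & (a.length() - 1), task);
     *     }
     *     T pop() {                     // owner only
     *       int s = top - 1;
     *       T t = a.getAndSet(s & (a.length() - 1), null);
     *       if (t != null) top = s;     // taken; else queue was empty
     *       return t;
     *     }
     *     T poll() {                    // any thread
     *       int b = base, k = b & (a.length() - 1);
     *       T t = a.get(k);
     *       if (t != null && a.compareAndSet(k, t, null)) {
     *         base = b + 1;             // only the CAS winner advances base
     *         return t;
     *       }
     *       return null;                // empty, contended, or stalled
     *     }
     *   }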
251     *
252 dl 1.405 * However, there are several more cases that must be dealt with.
253     * Some of them are just due to asynchrony; others reflect
254     * contention and stealing policies. Stepping through them
255     * illustrates some of the implementation decisions in this class.
256     *
257     * * Slot k must be read with an acquiring read, which it must
258     * anyway to dereference and run the task if the (acquiring)
259     * CAS succeeds, but uses an explicit acquire fence to support
260     * the following rechecks even if the CAS is not attempted.
261     *
262     * * q.base may change between reading and using its value to
263     * index the slot. To avoid trying to use the wrong t, the
264     * index and slot must be reread (not necessarily immediately)
265     * until consistent, unless this is a local poll by owner, in
266     * which case this form of inconsistency can only appear as t
267     * being null, below.
268     *
269     * * Similarly, q.array may change (due to a resize), unless this
270     * is a local poll by owner. Otherwise, when t is present, this
271     * only needs consideration on CAS failure (since a CAS
272     * confirms the non-resized case.)
273     *
274     * * t may appear null because a previous poll operation has not
275     * yet incremented q.base, so the read is from an already-taken
276     * index. This form of stall reflects the non-lock-freedom of
277     * the poll operation. Stalls can be detected by observing that
278     * q.base doesn't change on repeated reads of null t and when
279     * no other alternatives apply, spin-wait for it to settle. To
280     * reduce producing these kinds of stalls by other stealers, we
281     * encourage timely writes to indices using store fences when
282     * memory ordering is not already constrained by context.
283     *
284     * * The CAS may fail, in which case we may want to retry unless
285     * there is too much contention. One goal is to balance and
286     * spread out the many forms of contention that may be
287     * encountered across polling and other operations to avoid
288     * sustained performance degradations. Across all cases where
289     * alternatives exist, a bounded number of CAS misses or stalls
290     * are tolerated (for slots, ctl, and elsewhere described
291     * below) before taking alternative action. These may move
292     * contention or retries elsewhere, which is still preferable
293     * to single-point bottlenecks.
294     *
295     * * Even though the check "top == base" is quiescently accurate
296     * to determine whether a queue is empty, it is not of much use
297     * when deciding whether to try to poll or repoll after a
298     * failure. Both top and base may move independently, and both
299     * lag updates to the underlying array. To reduce memory
300     * contention, when possible, non-owners avoid reading the
301     * "top" index at all, and instead use array reads, including
302     * one-ahead reads to check whether to repoll, relying on the
303     * fact that a non-empty queue does not have two null slots in
304     * a row, except in cases (resizes and shifts) that can be
305     * detected with a secondary recheck.
306     *
307     * The poll operations in q.poll(), scan(), helpJoin(), and
308     * elsewhere differ with respect to whether other queues are
309     * available to try, and the presence or nature of screening steps
310     * when only some kinds of tasks can be taken. When alternatives
311     * (or failing) is an option, they uniformly give up after
312     * bounded numbers of stalls and/or CAS failures, which reduces
313     * contention when too many workers are polling too few tasks.
314     * Overall, in the aggregate, we ensure probabilistic
315     * non-blockingness of work-stealing at least until checking
316     * quiescence (which is intrinsically blocking): If an attempted
317     * steal fails in these ways, a scanning thief chooses a different
318     * target to try next. In contexts where alternatives aren't
319     * available, and when progress conditions can be isolated to
320     * values of a single variable, simple spinloops (using
321     * Thread.onSpinWait) are used to reduce memory traffic.
322 dl 1.78 *
323     * WorkQueues are also used in a similar way for tasks submitted
324     * to the pool. We cannot mix these tasks in the same queues used
325 dl 1.200 * by workers. Instead, we randomly associate submission queues
326 dl 1.83 * with submitting threads, using a form of hashing. The
327 dl 1.139 * ThreadLocalRandom probe value serves as a hash code for
328     * choosing existing queues, and may be randomly repositioned upon
329     * contention with other submitters. In essence, submitters act
330     * like workers except that they are restricted to executing local
331 dl 1.355 * tasks that they submitted (or when known, subtasks thereof).
332     * Insertion of tasks in shared mode requires a lock. We use only
333 dl 1.405 * a simple spinlock because submitters encountering a busy queue
334     * move to a different position to use or create other queues.
335     * They (spin) block only when registering new queues, and less
336     * often in tryRemove and helpComplete. The lock needed for
337     * external queues is generalized (as field "access") for
338     * operations on owned queues that require a fully-fenced write
339     * (including push, parking status, and termination) in order to
340     * deal with Dekker-like signalling constructs described below.
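     *
     * Informally, selecting a submission queue amounts to (hypothetical
     * lock/push helper names; the actual submission path below also
     * handles queue registration, array growth, and signalling):
     *
     *   int r = ThreadLocalRandom.getProbe();     // per-thread hash
     *   WorkQueue q = queues[r & (n - 1) & ~1];   // even index, n = length
     *   if (q == null || !q.tryLockAccess())      // missing or busy:
     *     ThreadLocalRandom.advanceProbe(r);      //   move and retry elsewhere
     *   else {
     *     q.push(task);
     *     q.unlockAccess();
     *   }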
341 dl 1.78 *
342 jsr166 1.84 * Management
343 dl 1.78 * ==========
344 dl 1.52 *
345     * The main throughput advantages of work-stealing stem from
346     * decentralized control -- workers mostly take tasks from
347 dl 1.200 * themselves or each other, at rates that can exceed a billion
348 dl 1.355 * per second. Most non-atomic control is performed by some form
349     * of scanning across or within queues. The pool itself creates,
350     * activates (enables scanning for and running tasks),
351     * deactivates, blocks, and terminates threads, all with minimal
352     * central information. There are only a few properties that we
353     * can globally track or maintain, so we pack them into a small
354     * number of variables, often maintaining atomicity without
355     * blocking or locking. Nearly all essentially atomic control
356 dl 1.405 * state is held in a few variables that are by far most often
357     * read (not written) as status and consistency checks. We pack as
358     * much information into them as we can.
359 dl 1.78 *
360 dl 1.200 * Field "ctl" contains 64 bits holding information needed to
361 dl 1.300 * atomically decide to add, enqueue (on an event queue), and
362 dl 1.345 * dequeue and release workers. To enable this packing, we
363     * restrict maximum parallelism to (1<<15)-1 (which is far in
364     * excess of normal operating range) to allow ids, counts, and
365     * their negations (used for thresholding) to fit into 16bit
366 dl 1.405 * subfields. Field "parallelism" holds the target parallelism
367     * (normally corresponding to pool size). It is needed (nearly)
368     * only in methods updating ctl, so is packed nearby. As of the
369     * current release, users can dynamically reset target
370     * parallelism, which is read once per update, so changes take
371     * effect only gradually, as threads are created or allowed to
372     * time out and terminate when idle.
373     *
374 dl 1.407 * Field "runState" holds lifetime status, atomically and
375 dl 1.405 * monotonically setting SHUTDOWN, STOP, and finally TERMINATED
376     * bits. It is updated only via bitwise atomics (getAndBitwiseOr).
377 dl 1.258 *
378 dl 1.355 * Array "queues" holds references to WorkQueues. It is updated
379     * (only during worker creation and termination) under the
380 dl 1.405 * registrationLock, but is otherwise concurrently readable (often
381     * prefaced by a volatile read of mode to check termination, which
382     * is required anyway, and serves as an acquire fence). To
383     * simplify index-based operations, the array size is always a
384     * power of two, and all readers must tolerate null slots. Worker
385     * queues are at odd indices. Worker ids masked with SMASK match
386     * their index. Shared (submission) queues are at even
387     * indices. Grouping them together in this way simplifies and
388     * speeds up task scanning.
389 dl 1.86 *
390     * All worker thread creation is on-demand, triggered by task
391     * submissions, replacement of terminated workers, and/or
392 dl 1.78 * compensation for blocked workers. However, all other support
393     * code is set up to work with other policies. To ensure that we
394 dl 1.355 * do not hold on to worker or task references that would prevent
395 dl 1.405 * GC, all accesses to workQueues in waiting, signalling, and
396     * control methods are via indices into the queues array (which is
397     * one source of some of the messy code constructions here). In
398     * essence, the queues array serves as a weak reference
399     * mechanism. In particular, the stack top subfield of ctl stores
400     * indices, not references.
401 dl 1.200 *
402     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
403     * cannot let workers spin indefinitely scanning for tasks when
404     * none can be found immediately, and we cannot start/resume
405     * workers unless there appear to be tasks available. On the
406     * other hand, we must quickly prod them into action when new
407 dl 1.355 * tasks are submitted or generated. These latencies are mainly a
408     * function of JVM park/unpark (and underlying OS) performance,
409     * which can be slow and variable. In many usages, ramp-up time
410 dl 1.300 * is the main limiting factor in overall performance, which is
411     * compounded at program start-up by JIT compilation and
412 dl 1.355 * allocation. On the other hand, throughput degrades when too
413     * many threads poll for too few tasks.
414 dl 1.300 *
415 dl 1.355 * The "ctl" field atomically maintains total and "released"
416     * worker counts, plus the head of the available worker queue
417     * (actually stack, represented by the lower 32bit subfield of
418     * ctl). Released workers are those known to be scanning for
419 dl 1.300 * and/or running tasks. Unreleased ("available") workers are
420     * recorded in the ctl stack. These workers are made available for
421 dl 1.355 * signalling by enqueuing in ctl (see method awaitWork). The
422 dl 1.300 * "queue" is a form of Treiber stack. This is ideal for
423     * activating threads in most-recently used order, and improves
424 dl 1.200 * performance and locality, outweighing the disadvantages of
425     * being prone to contention and inability to release a worker
426 dl 1.355 * unless it is topmost on stack. The top stack state holds the
427 dl 1.300 * value of the "phase" field of the worker: its index and status,
428     * plus a version counter that, in addition to the count subfields
429     * (also serving as version stamps) provide protection against
430     * Treiber stack ABA effects.
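     *
     * Pushing an idle worker w onto this stack is, roughly (a sketch;
     * "casCtl" stands in for the ctl CAS used below, and the actual
     * enqueue in awaitWork also manages version/status bits and
     * rechecks for work):
     *
     *   long c, nc;
     *   do {
     *     w.stackPred = (int)(c = ctl);            // link to previous top
     *     nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & w.phase);
     *   } while (!casCtl(c, nc));                  // new top, one fewer released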
431 dl 1.200 *
432 dl 1.300 * Creating workers. To create a worker, we pre-increment counts
433     * (serving as a reservation), and attempt to construct a
434 dl 1.355 * ForkJoinWorkerThread via its factory. On starting, the new
435     * thread first invokes registerWorker, where it constructs a
436     * WorkQueue and is assigned an index in the queues array
437     * (expanding the array if necessary). Upon any exception across
438     * these steps, or null return from factory, deregisterWorker
439     * adjusts counts and records accordingly. On a null return, the
440     * pool continues running with fewer than the target number of
441     * workers. If exceptional, the exception is propagated, generally
442     * to some external caller.
443 dl 1.243 *
444 dl 1.300 * WorkQueue field "phase" is used by both workers and the pool to
445 dl 1.405 * manage and track whether a worker is unsignalled (possibly
446     * blocked waiting for a signal), conveniently using the sign bit
447     * to check. When a worker is enqueued its phase field is set
448     * negative. Note that phase field updates lag queue CAS releases;
449     * seeing a negative phase does not guarantee that the worker is
450     * available (and so is never checked in this way). When queued,
451     * the lower 16 bits of its phase must hold its pool index. So we
452     * place the index there upon initialization and never modify
453     * these bits.
454 dl 1.243 *
455     * The ctl field also serves as the basis for memory
456     * synchronization surrounding activation. This uses a more
457     * efficient version of a Dekker-like rule that task producers and
458     * consumers sync with each other by both writing/CASing ctl (even
459 dl 1.355 * if to its current value). However, rather than CASing ctl to
460     * its current value in the common case where no action is
461     * required, we reduce write contention by ensuring that
462 dl 1.405 * signalWork invocations are prefaced with a fully fenced memory
463 dl 1.355 * access (which is usually needed anyway).
464     *
465     * Signalling. Signals (in signalWork) cause new or reactivated
466     * workers to scan for tasks. Method signalWork and its callers
467     * try to approximate the unattainable goal of having the right
468     * number of workers activated for the tasks at hand, but must err
469     * on the side of too many workers vs too few to avoid stalls. If
470     * computations are purely tree structured, it suffices for every
471     * worker to activate another when it pushes a task into an empty
472     * queue, resulting in O(log(#threads)) steps to full activation.
473 dl 1.405 * (To reduce resource usages in some cases, at the expense of
474     * slower startup in others, activation of an idle thread is
475     * preferred over creating a new one, here and elsewhere.) If
476     * instead, tasks come in serially from only a single producer,
477     * each worker taking its first (since the last activation) task
478 dl 1.355 * from a queue should signal another if there are more tasks in
479     * that queue. This is equivalent to, but generally faster than,
480     * arranging that the stealer take two tasks, re-pushing one on its own
481     * queue, and signalling (because its queue is empty), also
482     * resulting in logarithmic full activation time. Because we don't
483     * know about usage patterns (or most commonly, mixtures), we use
484 dl 1.405 * both approaches. Together these are minimally necessary for
485     * maintaining liveness. However, they do not account for the fact
486     * that when tasks are short-lived, signals are unnecessary
487     * because workers will already be scanning for new tasks without
488     * the need for new signals. We track these cases (variable
489     * "prevSrc" in scan() and related methods) to avoid some
490     * unnecessary signals and scans. However, signal contention and
491     * overhead effects may still occur during ramp-up, ramp-down, and
492     * small computations involving only a few workers.
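     *
     * Informally, these two rules amount to:
     *
     *   after pushing a task onto queue q:
     *     if (q was empty before the push) signalWork();
     *   after taking a first task (since last activation) from queue q:
     *     if (q appears to have more tasks) signalWork();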
493 dl 1.243 *
494 dl 1.355 * Scanning. Method scan performs top-level scanning for (and
495 dl 1.405 * execution of) tasks by polling a pseudo-random permutation of
496     * the array (by starting at a random index, and using a constant
497     * cyclically exhaustive stride.) It uses the same basic polling
498     * method as WorkQueue.poll(), but restarts with a different
499     * permutation on each invocation. (Non-top-level scans, for
500     * example in helpJoin, use simpler and faster linear probes
501     * because they do not systematically contend with top-level
502     * scans.) The pseudorandom generator need not have high-quality
503     * statistical properties in the long term. We use Marsaglia
504     * XorShifts, seeded with the Weyl sequence from ThreadLocalRandom
505     * probes, which are cheap and suffice. Scans do not otherwise
506     * explicitly take into account core affinities, loads, cache
507     * localities, etc. However, they do exploit temporal locality
508     * (which usually approximates these) by preferring to re-poll
509     * from the same queue (using method tryPoll()) after a successful
510     * poll before trying others (see method topLevelExec), which also
511     * reduces bookkeeping and scanning overhead. This also reduces
512     * fairness, which is partially counteracted by giving up on
513     * contention.
514     *
515     * Deactivation. When method scan indicates that no tasks are
516     * found by a worker, it deactivates (see awaitWork). Note that
517     * not finding tasks doesn't mean that there won't soon be
518     * some. Further, a scan may give up under contention, returning
519     * even without knowing whether any tasks are still present, which
520     * is OK, given the above signalling rules that will eventually
521     * maintain progress. Blocking and unblocking via park/unpark can
522     * cause serious slowdowns when tasks are rapidly but irregularly
523     * generated (which is often due to garbage collectors and other
524     * activities). One way to ameliorate is for workers to rescan
525     * multiple times, even when there are unlikely to be tasks. But
526     * this causes enough memory and CAS contention to prefer using
527     * quieter spinwaits in awaitWork; currently set to small values
528     * that only cover near-miss scenarios for deactivate vs activate
529     * races. Because idle workers are often not yet blocked (via
530     * LockSupport.park), we use the WorkQueue access field to
531     * advertise that a waiter actually needs unparking upon signal.
532     *
533     * When idle workers are not continually woken up, the count
534     * fields in ctl allow efficient and accurate discovery of
535     * quiescent states (i.e., when all workers are idle) after
536     * deactivation. However, this voting mechanism alone does not
537     * guarantee that a pool can become dormant (quiesced or
538     * terminated), because external racing producers do not vote, and
539     * can asynchronously submit new tasks. To deal with this, the
540     * final unparked thread (in awaitWork) scans external queues to
541     * check for tasks that could have been added during a race window
542     * that would not be accompanied by a signal, in which case
543     * re-activating itself (or any other worker) to recheck. The same
544     * sets of checks are used in tryTerminate, to correctly trigger
545     * delayed termination (shutDown, followed by quiescence) in the
546     * presence of racing submissions. In all cases, the notion of the
547     * "final" unparked thread is an approximation, because new
548     * workers could be in the process of being constructed, which
549     * occasionally adds some extra unnecessary processing.
550     *
551     * Shutdown and Termination. A call to shutdownNow invokes
552     * tryTerminate to atomically set a mode bit. The calling thread,
553     * as well as every other worker thereafter terminating, helps
554     * terminate others by cancelling their unprocessed tasks, and
555     * interrupting other workers. Calls to non-abrupt shutdown()
556     * preface this by checking isQuiescent before triggering the
557     * "STOP" phase of termination. During termination, workers are
558     * stopped using all three of (often in parallel): releasing via
559     * ctl (method reactivate), interrupts, and cancelling tasks that
560     * will cause workers to not find work and exit. To support this,
561     * worker references are not removed from the queues array during
562     * termination. It is possible for late thread creations to still
563     * be in progress after a quiescent termination reports terminated
564     * status, but they will also immediately terminate. To conform to
565     * ExecutorService invoke, invokeAll, and invokeAny specs, we must
566     * track pool status while waiting in ForkJoinTask.awaitDone, and
567 dl 1.407 * interrupt interruptible callers on termination, while also
568     * avoiding cancelling other tasks that are normally completing
569     * during quiescent termination. This is tracked by recording
570     * ForkJoinTask.POOLSUBMIT in task status and/or as a bit flag
571     * argument to joining methods.
572 dl 1.52 *
573     * Trimming workers. To release resources after periods of lack of
574     * use, a worker starting to wait when the pool is quiescent will
575 dl 1.355 * time out and terminate if the pool has remained quiescent for
576     * the period given by field keepAlive.
577 dl 1.52 *
578 jsr166 1.84 * Joining Tasks
579     * =============
580 dl 1.78 *
581 dl 1.355 * Normally, the first option when joining a task that is not done
582 dl 1.405 * is to try to take it from the local queue and run it. Otherwise,
583 dl 1.355 * any of several actions may be taken when one worker is waiting
584 jsr166 1.84 * to join a task stolen (or always held) by another. Because we
585 dl 1.78 * are multiplexing many tasks on to a pool of workers, we can't
586 dl 1.300 * always just let them block (as in Thread.join). We also cannot
587     * just reassign the joiner's run-time stack with another and
588     * replace it later, which would be a form of "continuation", that
589     * even if possible is not necessarily a good idea since we may
590     * need both an unblocked task and its continuation to progress.
591     * Instead we combine two tactics:
592 dl 1.19 *
593     * Helping: Arranging for the joiner to execute some task that it
594 dl 1.355 * could be running if the steal had not occurred.
595 dl 1.19 *
596     * Compensating: Unless there are already enough live threads,
597 dl 1.78 * method tryCompensate() may create or re-activate a spare
598     * thread to compensate for blocked joiners until they unblock.
599     *
600 dl 1.355 * A third form (implemented via tryRemove) amounts to helping a
601     * hypothetical compensator: If we can readily tell that a
602     * possible action of a compensator is to steal and execute the
603 dl 1.105 * task being joined, the joining thread can do so directly,
604 dl 1.355 * without the need for a compensation thread; although with a
605 dl 1.405 * possibility of reduced parallelism because of a transient gap
606     * in the queue array that stalls stealers.
607 dl 1.355 *
608     * Other intermediate forms available for specific task types (for
609     * example helpAsyncBlocker) often avoid or postpone the need for
610     * blocking or compensation.
611 dl 1.52 *
612     * The ManagedBlocker extension API can't use helping so relies
613     * only on compensation in method awaitBlocker.
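     *
     * (For reference, client-side use of that API follows the pattern
     * in the ManagedBlocker javadoc; a condensed sketch:)
     *
     *   class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
     *     final BlockingQueue<E> queue;
     *     volatile E item = null;
     *     QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *     public boolean block() throws InterruptedException {
     *       if (item == null) item = queue.take();
     *       return true;
     *     }
     *     public boolean isReleasable() {
     *       return item != null || (item = queue.poll()) != null;
     *     }
     *   }
     *   // used as: ForkJoinPool.managedBlock(new QueueTaker<E>(q));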
614 dl 1.19 *
615 dl 1.355 * The algorithm in helpJoin entails a form of "linear helping".
616 dl 1.405 * Each worker records (in field "source") a reference to the
617     * queue from which it last stole a task. The scan in method
618     * helpJoin uses these markers to try to find a worker to help
619     * (i.e., steal back a task from and execute it) that could hasten
620     * completion of the actively joined task. Thus, the joiner
621     * executes a task that would be on its own local deque if the
622     * to-be-joined task had not been stolen. This is a conservative
623     * variant of the approach described in Wagner & Calder
624     * "Leapfrogging: a portable technique for implementing efficient
625     * futures" SIGPLAN Notices, 1993
626     * (http://portal.acm.org/citation.cfm?id=155354). It differs
627     * mainly in that we only record queues, not full dependency
628 dl 1.355 * links. This requires a linear scan of the queues array to
629 dl 1.300 * locate stealers, but isolates cost to when it is needed, rather
630 dl 1.405 * than adding to per-task overhead. For CountedCompleters, the
631     * analogous method helpComplete doesn't need stealer-tracking,
632     * but requires a similar check of completion chains.
633     *
634     * In either case, searches can fail to locate stealers when
635     * stalls delay recording sources. We avoid some of these cases by
636     * using snapshotted values of ctl as a check that the numbers of
637     * workers are not changing. But even when accurately identified,
638 dl 1.355 * stealers might not ever produce a task that the joiner can in
639     * turn help with. So, compensation is tried upon failure to find
640     * tasks to run.
641     *
642 dl 1.300 * Compensation does not by default aim to keep exactly the target
643 dl 1.200 * parallelism number of unblocked threads running at any given
644     * time. Some previous versions of this class employed immediate
645     * compensations for any blocked join. However, in practice, the
646     * vast majority of blockages are transient byproducts of GC and
647 dl 1.345 * other JVM or OS activities that are made worse by replacement
648     * when they cause longer-term oversubscription. Rather than
649     * impose arbitrary policies, we allow users to override the
650     * default of only adding threads upon apparent starvation. The
651     * compensation mechanism may also be bounded. Bounds for the
652 dl 1.404 * commonPool better enable JVMs to cope with programming errors
653     * and abuse before running out of resources to do so.
654 jsr166 1.301 *
655 dl 1.105 * Common Pool
656     * ===========
657     *
658 jsr166 1.175 * The static common pool always exists after static
659 dl 1.105 * initialization. Since it (or any other created pool) need
660     * never be used, we minimize initial construction overhead and
661 dl 1.405 * footprint to the setup of about a dozen fields, although with
662     * some System property parsing and with security processing that
663     * takes far longer than the actual construction when
664     * SecurityManagers are used or properties are set. The common
665     * pool is distinguished internally by having both a null
666     * workerNamePrefix and ISCOMMON config bit set, along with
667     * PRESET_SIZE set if parallelism was configured by system
668     * property.
669     *
670     * When external threads use ForkJoinTask.fork for the common
671     * pool, they can perform subtask processing (see helpComplete and
672     * related methods) upon joins. This caller-helps policy makes it
673 dl 1.200 * sensible to set common pool parallelism level to one (or more)
674     * less than the total number of available cores, or even zero for
675 dl 1.405 * pure caller-runs. For the sake of ExecutorService specs, we can
676     * only do this for tasks entered via fork, not submit. We track
677     * this using a task status bit (markPoolSubmission). In all
678     * other cases, external threads waiting for joins first check the
679     * common pool for their task, which fails quickly if the caller
680     * did not fork to common pool.
681 dl 1.105 *
682 dl 1.399 * Guarantees for common pool parallelism zero are limited to
683     * tasks that are joined by their callers in a tree-structured
684     * fashion or use CountedCompleters (as is true for jdk
685     * parallelStreams). Support infiltrates several methods,
686 dl 1.405 * including those that retry helping steps or spin until we are
687     * sure that none apply if there are no workers.
688 dl 1.399 *
689 dl 1.197 * As a more appropriate default in managed environments, unless
690     * overridden by system properties, we use workers of subclass
691     * InnocuousForkJoinWorkerThread when there is a SecurityManager
692     * present. These workers have no permissions set, do not belong
693     * to any user-defined ThreadGroup, and erase all ThreadLocals
694 dl 1.355 * after executing any top-level task. The associated mechanics
695 dl 1.364 * may be JVM-dependent and must access particular Thread class
696     * fields to achieve this effect.
697 jsr166 1.198 *
698 dl 1.372 * Interrupt handling
699     * ==================
700     *
701     * The framework is designed to manage task cancellation
702     * (ForkJoinTask.cancel) independently from the interrupt status
703     * of threads running tasks. (See the public ForkJoinTask
704     * documentation for rationale.) Interrupts are issued only in
705     * tryTerminate, when workers should be terminating and tasks
706     * should be cancelled anyway. Interrupts are cleared only when
707     * necessary to ensure that calls to LockSupport.park do not loop
708     * indefinitely (park returns immediately if the current thread is
709 dl 1.405 * interrupted). For cases in which task bodies are specified or
710     * expected to be interrupted upon cancellation, ForkJoinTask.cancel
711     * can be overridden to do so (as is done for invoke{Any,All}).
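     *
     * A sketch of such an override (a hypothetical task class, not the
     * adaptors actually used by invoke{Any,All}):
     *
     *   abstract class InterruptibleTask<T> extends ForkJoinTask<T> {
     *     transient volatile Thread runner;
     *     abstract void compute() throws Exception;
     *     protected boolean exec() {
     *       runner = Thread.currentThread();
     *       try { compute(); }
     *       catch (Exception ex) { completeExceptionally(ex); }
     *       finally { runner = null; }
     *       return true;
     *     }
     *     public boolean cancel(boolean mayInterruptIfRunning) {
     *       boolean stat = super.cancel(mayInterruptIfRunning);
     *       Thread t;
     *       if (mayInterruptIfRunning && (t = runner) != null)
     *         t.interrupt();                  // interrupt the running thread
     *       return stat;
     *     }
     *   }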
712 dl 1.372 *
713 dl 1.345 * Memory placement
714     * ================
715     *
716 dl 1.405 * Performance is very sensitive to placement of instances of
717     * ForkJoinPool and WorkQueues and their queue arrays, as well as
718     * the placement of their fields. Cache misses and contention due to
719     * false-sharing have been observed to slow down some programs by
720     * more than a factor of four. There is no perfect solution, in
721     * part because isolating more fields also generates more cache
722     * misses in more common cases (because some fields and slots are
723     * usually read at the same time), and the main means of placing
724     * memory, the @Contended annotation, provides only rough control
725     * (for good reason). We isolate the ForkJoinPool.ctl field as
726     * well as the set of WorkQueue fields that otherwise cause the most
727     * false-sharing misses with respect to other fields. Also,
728     * ForkJoinPool fields are ordered such that fields less prone to
729     * contention effects are first, offsetting those that otherwise
730     * would be, while also reducing total footprint vs using
731     * multiple @Contended regions, which tends to slow down
732     * less-contended applications. These arrangements mainly reduce
733     * cache traffic by scanners, which speeds up finding tasks to
734     * run. Initial sizing and resizing of WorkQueue arrays is an
735     * even more delicate tradeoff because the best strategy may vary
736     * across garbage collectors. Small arrays are better for locality
737     * and reduce GC scan time, but large arrays reduce both direct
738     * false-sharing and indirect cases due to GC bookkeeping
739     * (cardmarks etc), and reduce the number of resizes, which are
740     * not especially fast because they require atomic transfers, and
741     * may cause other scanning workers to stall or give up.
742     * Currently, arrays are initialized to be fairly small but early
743     * resizes rapidly increase size by more than a factor of two
744     * until very large. (Maintenance note: any changes in fields,
745     * queues, or their uses must be accompanied by re-evaluation of
746     * these placement and sizing decisions.)
747 dl 1.345 *
748 dl 1.105 * Style notes
749     * ===========
750     *
751 dl 1.355 * Memory ordering relies mainly on atomic operations (CAS,
752 dl 1.405 * getAndSet, getAndAdd) along with moded accesses. These use
753 dl 1.404 * jdk-internal Unsafe for atomics and special memory modes,
754     * rather than VarHandles, to avoid initialization dependencies in
755     * other jdk components that require early parallelism. This can
756     * be awkward and ugly, but also reflects the need to control
757 jsr166 1.315 * outcomes across the unusual cases that arise in very racy code
758 dl 1.319 * with very few invariants. All fields are read into locals
759 dl 1.355 * before use, and null-checked if they are references, even if
760 dl 1.405 * they can never be null under current usages. Usually,
761     * computations (held in local variables) are defined as soon as
762     * logically enabled, sometimes to convince compilers that they
763     * may be performed despite memory ordering constraints. Array
764     * accesses using masked indices include checks (that are always
765     * true) that the array length is non-zero to avoid compilers
766     * inserting more expensive traps. This is usually done in a
767     * "C"-like style of listing declarations at the heads of methods
768     * or blocks, and using inline assignments on first encounter.
769     * Nearly all explicit checks lead to bypass/return, not exception
770     * throws, because they may legitimately arise during shutdown. A
771     * few unusual loop constructions give hints (with varying
772     * effectiveness) to JVMs about where (not) to place safepoints.
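     *
     * A typical fragment in this style ("process" stands in for
     * whatever a given method does with the queue):
     *
     *   WorkQueue[] qs; WorkQueue q; int n;   // locals read and checked first
     *   if ((qs = queues) != null && (n = qs.length) > 0 &&
     *       (q = qs[i & (n - 1)]) != null)    // masked index; n known nonzero
     *     process(q);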
773 dl 1.200 *
774 dl 1.105 * There is a lot of representation-level coupling among classes
775     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
776     * fields of WorkQueue maintain data structures managed by
777     * ForkJoinPool, so are directly accessed. There is little point
778     * trying to reduce this, since any associated future changes in
779     * representations will need to be accompanied by algorithmic
780     * changes anyway. Several methods intrinsically sprawl because
781 dl 1.200 * they must accumulate sets of consistent reads of fields held in
782 dl 1.345 * local variables. Some others are artificially broken up to
783     * reduce producer/consumer imbalances due to dynamic compilation.
784     * There are also other coding oddities (including several
785     * unnecessary-looking hoisted null checks) that help some methods
786     * perform reasonably even when interpreted (not compiled).
787 dl 1.52 *
788 dl 1.208 * The order of declarations in this file is (with a few exceptions):
789 dl 1.405 * (1) Static constants
790     * (2) Static utility functions
791     * (3) Nested (static) classes
792 dl 1.86 * (4) Fields, along with constants used when unpacking some of them
793     * (5) Internal control methods
794     * (6) Callbacks and other support for ForkJoinTask methods
795     * (7) Exported methods
796     * (8) Static block initializing statics in minimally dependent order
797 dl 1.355 *
798     * Revision notes
799     * ==============
800     *
801 dl 1.405 * The main sources of differences from the previous version are:
802     *
803     * * Use of Unsafe vs VarHandle, including reinstatement of some
804     * constructions from pre-VarHandle versions.
805     * * Reduced memory and signal contention, mainly by distinguishing
806     * failure cases.
807     * * Improved initialization, in part by preparing for possible
808     * removal of SecurityManager
809     * * Enable resizing (includes refactoring quiescence/termination)
810     * * Unification of most internal vs external operations; some made
811     * possible via use of WorkQueue.access, and POOLSUBMIT status in tasks.
812     */
813    
814     // static configuration constants
815    
816     /**
817     * Default idle timeout value (in milliseconds) for idle threads
818     * to park waiting for new work before terminating.
819     */
820     static final long DEFAULT_KEEPALIVE = 60_000L;
821    
822     /**
823     * Undershoot tolerance for idle timeouts
824     */
825     static final long TIMEOUT_SLOP = 20L;
826    
827     /**
828     * The default value for common pool maxSpares. Overridable using
829     * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
830     * system property. The default value is far in excess of normal
831     * requirements, but also far short of MAX_CAP and typical OS
832     * thread limits, so allows JVMs to catch misuse/abuse before
833     * running out of resources needed to do so.
834     */
835     static final int DEFAULT_COMMON_MAX_SPARES = 256;
836    
837     /**
838     * Initial capacity of work-stealing queue array. Must be a power
839     * of two, at least 2. See above.
840     */
841     static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
842    
843     // Bounds
844     static final int SWIDTH = 16; // width of short
845     static final int SMASK = 0xffff; // short bits == max index
846     static final int MAX_CAP = 0x7fff; // max #workers - 1
847    
848     // pool.runState and workQueue.access bits and sentinels
849     static final int STOP = 1 << 31; // must be negative
850     static final int SHUTDOWN = 1;
851     static final int TERMINATED = 2;
852     static final int PARKED = -1; // access value when parked
853    
854     // {pool, workQueue}.config bits
855     static final int FIFO = 1 << 16; // fifo queue or access mode
856     static final int SRC = 1 << 17; // set when stealable
857     static final int INNOCUOUS = 1 << 18; // set for Innocuous workers
858     static final int TRIMMED = 1 << 19; // timed out while idle
859     static final int ISCOMMON = 1 << 20; // set for common pool
860     static final int PRESET_SIZE = 1 << 21; // size was set by property
861    
862     static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
863    
864     /*
865     * Bits and masks for ctl and bounds are packed with 4 16 bit subfields:
866     * RC: Number of released (unqueued) workers
867     * TC: Number of total workers
868     * SS: version count and status of top waiting thread
869     * ID: poolIndex of top of Treiber stack of waiters
870 dl 1.355 *
871 dl 1.405 * When convenient, we can extract the lower 32 stack top bits
872     * (including version bits) as sp=(int)ctl. When sp is non-zero,
873     * there are waiting workers. Count fields may be transiently
874     * negative during termination because of out-of-order updates.
875     * To deal with this, we use casts in and out of "short" and/or
876     * signed shifts to maintain signedness. Because it occupies
877     * uppermost bits, we can add one release count using getAndAdd of
878     * RC_UNIT, rather than CAS, when returning from a blocked join.
879     * Other updates of multiple subfields require CAS.
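     *
     * For example, given a ctl value c (a sketch of the decodings used
     * below, not a method appearing in this file):
     *
     *   int sp = (int)c;                   // stack top: version + index
     *   int rc = (short)(c >>> RC_SHIFT);  // signed release count
     *   int tc = (short)(c >>> TC_SHIFT);  // signed total count
     *   // one release count can be added with getAndAdd(RC_UNIT)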
880 dl 1.86 */
881    
882 dl 1.405 // Lower and upper word masks
883     static final long SP_MASK = 0xffffffffL;
884     static final long UC_MASK = ~SP_MASK;
885     // Release counts
886     static final int RC_SHIFT = 48;
887     static final long RC_UNIT = 0x0001L << RC_SHIFT;
888     static final long RC_MASK = 0xffffL << RC_SHIFT;
889     // Total counts
890     static final int TC_SHIFT = 32;
891     static final long TC_UNIT = 0x0001L << TC_SHIFT;
892     static final long TC_MASK = 0xffffL << TC_SHIFT;
893     // sp bits
894     static final int SS_SEQ = 1 << 16; // version count
895     static final int INACTIVE = 1 << 31; // phase bit when idle
896    
897 dl 1.86 // Static utilities
898    
899     /**
900     * If there is a security manager, makes sure caller has
901     * permission to modify threads.
902 jsr166 1.1 */
903 dl 1.405 @SuppressWarnings("removal")
904 dl 1.86 private static void checkPermission() {
905 dl 1.405 SecurityManager security; RuntimePermission perm;
906     if ((security = System.getSecurityManager()) != null) {
907     if ((perm = modifyThreadPermission) == null)
908     modifyThreadPermission = perm = // races OK
909     new RuntimePermission("modifyThread");
910     security.checkPermission(perm);
911     }
912 dl 1.355 }
913    
914 dl 1.86 // Nested classes
915 jsr166 1.1
916     /**
917 jsr166 1.8 * Factory for creating new {@link ForkJoinWorkerThread}s.
918     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
919     * for {@code ForkJoinWorkerThread} subclasses that extend base
920     * functionality or initialize threads with different contexts.
921 jsr166 1.1 */
922     public static interface ForkJoinWorkerThreadFactory {
923     /**
924     * Returns a new worker thread operating in the given pool.
925 dl 1.300 * Returning null or throwing an exception may result in tasks
926     * never being executed. If this method throws an exception,
927     * it is relayed to the caller of the method (for example
928     * {@code execute}) causing attempted thread creation. If this
929     * method returns null or throws an exception, it is not
930     * retried until the next attempted creation (for example
931     * another call to {@code execute}).
932 jsr166 1.1 *
933     * @param pool the pool this thread works in
934 jsr166 1.296 * @return the new worker thread, or {@code null} if the request
935 jsr166 1.331 * to create a thread is rejected
936 jsr166 1.11 * @throws NullPointerException if the pool is null
937 jsr166 1.1 */
938     public ForkJoinWorkerThread newThread(ForkJoinPool pool);
939     }
940    
941     /**
942     * Default ForkJoinWorkerThreadFactory implementation; creates a
943 jsr166 1.331 * new ForkJoinWorkerThread using the system class loader as the
944     * thread context class loader.
945 jsr166 1.1 */
946 dl 1.355 static final class DefaultForkJoinWorkerThreadFactory
947     implements ForkJoinWorkerThreadFactory {
948     public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
949 dl 1.405 boolean isCommon = (pool.workerNamePrefix == null);
950     @SuppressWarnings("removal")
951     SecurityManager sm = System.getSecurityManager();
952     if (sm == null)
953     return new ForkJoinWorkerThread(null, pool, true);
954     else if (isCommon)
955     return newCommonWithACC(pool);
956     else
957     return newRegularWithACC(pool);
958     }
959    
960     /*
961     * Create and use static AccessControlContexts only if there
962     * is a SecurityManager. (These can be removed if/when
963     * SecurityManagers are removed from platform.) The ACCs are
964     * immutable and equivalent even when racily initialized, so
965     * they don't require locking, although with the chance of
966     * needless duplicate construction.
967     */
968     @SuppressWarnings("removal")
969     static volatile AccessControlContext regularACC, commonACC;
970    
971     @SuppressWarnings("removal")
972     static ForkJoinWorkerThread newRegularWithACC(ForkJoinPool pool) {
973     AccessControlContext acc = regularACC;
974     if (acc == null) {
975     Permissions ps = new Permissions();
976     ps.add(new RuntimePermission("getClassLoader"));
977     ps.add(new RuntimePermission("setContextClassLoader"));
978     regularACC = acc =
979     new AccessControlContext(new ProtectionDomain[] {
980     new ProtectionDomain(null, ps) });
981     }
982 dl 1.355 return AccessController.doPrivileged(
983     new PrivilegedAction<>() {
984     public ForkJoinWorkerThread run() {
985 dl 1.405 return new ForkJoinWorkerThread(null, pool, true);
986     }}, acc);
987 dl 1.355 }
988    
989 dl 1.404 @SuppressWarnings("removal")
990 dl 1.405 static ForkJoinWorkerThread newCommonWithACC(ForkJoinPool pool) {
991     AccessControlContext acc = commonACC;
992     if (acc == null) {
993     Permissions ps = new Permissions();
994     ps.add(new RuntimePermission("getClassLoader"));
995     ps.add(new RuntimePermission("setContextClassLoader"));
996     ps.add(new RuntimePermission("modifyThread"));
997     ps.add(new RuntimePermission("enableContextClassLoaderOverride"));
998     ps.add(new RuntimePermission("modifyThreadGroup"));
999     commonACC = acc =
1000     new AccessControlContext(new ProtectionDomain[] {
1001     new ProtectionDomain(null, ps) });
1002     }
1003     return AccessController.doPrivileged(
1004     new PrivilegedAction<>() {
1005     public ForkJoinWorkerThread run() {
1006     return new ForkJoinWorkerThread.
1007     InnocuousForkJoinWorkerThread(pool);
1008     }}, acc);
1009 jsr166 1.1 }
1010     }
1011    
1012 dl 1.253 /**
1013 dl 1.78 * Queues supporting work-stealing as well as external task
1014 jsr166 1.202 * submission. See above for descriptions and algorithms.
1015 dl 1.78 */
1016     static final class WorkQueue {
1017 dl 1.355 int stackPred; // pool stack (ctl) predecessor link
1018     int config; // index, mode, ORed with SRC after init
1019 dl 1.345 int base; // index of next slot for poll
1020     ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
1021 dl 1.78 final ForkJoinWorkerThread owner; // owning thread or null if shared
1022 dl 1.112
1023 dl 1.405 // fields that would otherwise cause unnecessary false-sharing cache misses
1024 dl 1.355 @jdk.internal.vm.annotation.Contended("w")
1025     int top; // index of next slot for push
1026     @jdk.internal.vm.annotation.Contended("w")
1027 dl 1.405 volatile int access; // values 0, 1 (locked), PARKED, STOP
1028     @jdk.internal.vm.annotation.Contended("w")
1029     volatile int phase; // versioned, negative if inactive
1030     @jdk.internal.vm.annotation.Contended("w")
1031     volatile int source; // source queue id in topLevelExec
1032 dl 1.355 @jdk.internal.vm.annotation.Contended("w")
1033     int nsteals; // number of steals from other queues
1034    
1035     // Support for atomic operations
1036 dl 1.404 private static final Unsafe U;
1037 dl 1.405 private static final long ACCESS;
1038     private static final long PHASE;
1039 dl 1.404 private static final long ABASE;
1040 dl 1.405 private static final int ASHIFT;
1041    
1042     static ForkJoinTask<?> getAndClearSlot(ForkJoinTask<?>[] a, int i) {
1043 dl 1.404 return (ForkJoinTask<?>)
1044 dl 1.405 U.getAndSetReference(a, ((long)i << ASHIFT) + ABASE, null);
1045 dl 1.355 }
1046 dl 1.405 static boolean casSlotToNull(ForkJoinTask<?>[] a, int i,
1047     ForkJoinTask<?> c) {
1048     return U.compareAndSetReference(a, ((long)i << ASHIFT) + ABASE,
1049     c, null);
1050 dl 1.355 }
1051 dl 1.405 final void forcePhaseActive() { // clear sign bit
1052     U.getAndBitwiseAndInt(this, PHASE, 0x7fffffff);
1053 dl 1.355 }
1054 dl 1.405 final int getAndSetAccess(int v) {
1055     return U.getAndSetInt(this, ACCESS, v);
1056 dl 1.78 }
1057 dl 1.407 final void releaseAccess() {
1058     U.putIntRelease(this, ACCESS, 0);
1059     }
1060 dl 1.78
1061     /**
1062 dl 1.405 * Constructor. For owned queues, most fields are initialized
1063     * upon thread start in pool.registerWorker.
1064 dl 1.345 */
1065 dl 1.405 WorkQueue(ForkJoinWorkerThread owner, int config) {
1066 dl 1.355 this.owner = owner;
1067     this.config = config;
1068 dl 1.407 base = top = 1;
1069 dl 1.345 }
1070    
1071     /**
1072 jsr166 1.220 * Returns an exportable index (used by ForkJoinWorkerThread).
1073 dl 1.200 */
1074     final int getPoolIndex() {
1075 dl 1.355 return (config & 0xffff) >>> 1; // ignore odd/even tag bit
1076 dl 1.200 }
1077    
1078     /**
1079 dl 1.115 * Returns the approximate number of tasks in the queue.
1080     */
1081     final int queueSize() {
1082 dl 1.407 int unused = access; // for ordering effect
1083     return Math.max(top - base, 0); // ignore transient negative
1084 dl 1.115 }
1085    
1086 jsr166 1.180 /**
1087 dl 1.405 * Pushes a task. Called only by owner or if already locked
1088 dl 1.78 *
1089     * @param task the task. Caller must ensure non-null.
1090 dl 1.405 * @param pool the pool. Must be non-null unless terminating.
1091     * @param signalIfEmpty if true, signal when pushing to an empty queue
1092 jsr166 1.146 * @throws RejectedExecutionException if array cannot be resized
1093 dl 1.78 */
1094 dl 1.405 final void push(ForkJoinTask<?> task, ForkJoinPool pool,
1095     boolean signalIfEmpty) {
1096     boolean resize = false;
1097     int s = top++, b = base, cap, m; ForkJoinTask<?>[] a;
1098     if ((a = array) != null && (cap = a.length) > 0) {
1099     if ((m = (cap - 1)) == s - b) {
1100     resize = true; // rapidly grow until large
1101     int newCap = (cap < 1 << 24) ? cap << 2 : cap << 1;
1102     ForkJoinTask<?>[] newArray;
1103     try {
1104     newArray = new ForkJoinTask<?>[newCap];
1105     } catch (Throwable ex) {
1106     top = s;
1107     access = 0;
1108     throw new RejectedExecutionException(
1109     "Queue capacity exceeded");
1110     }
1111     if (newCap > 0) { // always true
1112     int newMask = newCap - 1, k = s;
1113     do { // poll old, push to new
1114     newArray[k-- & newMask] = task;
1115     } while ((task = getAndClearSlot(a, k & m)) != null);
1116     }
1117     array = newArray;
1118     }
1119     else
1120     a[m & s] = task;
1121     getAndSetAccess(0); // for memory effects if owned
1122     if ((resize || (a[m & (s - 1)] == null && signalIfEmpty)) &&
1123     pool != null)
1124     pool.signalWork();
1125 dl 1.78 }
1126     }
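
        /*
         * Indexing sketch for the push above (values illustrative):
         * with a power-of-two cap, slot (cap - 1) & s wraps modulo cap,
         * so cap = 8, s = 9 maps to slot 1. On overflow the array grows
         * (quadrupling until 1 << 24, then doubling) and each task is
         * re-placed at its same logical index masked by the new,
         * larger mask.
         */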
1127    
1128 dl 1.178 /**
1129 dl 1.405 * Takes next task, if one exists, in order specified by mode,
1130     * so acts as either local-pop or local-poll. Called only by owner.
1131     * @param fifo nonzero if FIFO mode
1132 dl 1.112 */
1133 dl 1.405 final ForkJoinTask<?> nextLocalTask(int fifo) {
1134     ForkJoinTask<?> t = null;
1135 dl 1.355 ForkJoinTask<?>[] a = array;
1136 dl 1.405 int p = top, s = p - 1, b = base, nb, cap;
1137     if (p - b > 0 && a != null && (cap = a.length) > 0) {
1138     do {
1139     if (fifo == 0 || (nb = b + 1) == p) {
1140 dl 1.407 if ((t = getAndClearSlot(a, (cap - 1) & s)) != null)
1141 dl 1.405 top = s;
1142     break; // lost race for only task
1143     }
1144     else if ((t = getAndClearSlot(a, (cap - 1) & b)) != null) {
1145     base = nb;
1146     break;
1147     }
1148     else {
1149     while (b == (b = base)) {
1150     U.loadFence();
1151     Thread.onSpinWait(); // spin to reduce memory traffic
1152     }
1153     }
1154     } while (p - b > 0);
1155 dl 1.407 U.storeStoreFence(); // for timely index updates
1156 dl 1.345 }
1157     return t;
1158 dl 1.78 }
1159    
1160     /**
1161 dl 1.405 * Takes next task, if one exists, using configured mode.
1162     * (Always owned, never called for Common pool.)
1163 dl 1.78 */
1164 dl 1.405 final ForkJoinTask<?> nextLocalTask() {
1165     return nextLocalTask(config & FIFO);
1166 dl 1.373 }
1167    
1168     /**
1169 dl 1.405 * Pops the given task only if it is at the current top.
1170 dl 1.373 */
1171 dl 1.405 final boolean tryUnpush(ForkJoinTask<?> task, boolean owned) {
1172     ForkJoinTask<?>[] a = array;
1173     int p = top, s, cap, k;
1174     if (task != null && base != p && a != null && (cap = a.length) > 0 &&
1175     a[k = (cap - 1) & (s = p - 1)] == task) {
1176     if (owned || getAndSetAccess(1) == 0) {
1177 dl 1.407 if (top != p || a[k] != task ||
1178     getAndClearSlot(a, k) == null)
1179     access = 0;
1180     else {
1181 dl 1.405 top = s;
1182 dl 1.407 releaseAccess();
1183     return true;
1184 dl 1.394 }
1185 dl 1.392 }
1186 dl 1.355 }
1187 dl 1.407 return false;
1188 dl 1.345 }
1189    
1190     /**
1191 dl 1.405 * Returns next task, if one exists, in order specified by mode.
1192 dl 1.355 */
1193 dl 1.405 final ForkJoinTask<?> peek() {
1194     ForkJoinTask<?>[] a = array;
1195     int cfg = config, p = top, b = base, cap;
1196     if (p != b && a != null && (cap = a.length) > 0) {
1197     if ((cfg & FIFO) == 0)
1198     return a[(cap - 1) & (p - 1)];
1199     else { // skip over in-progress removals
1200     ForkJoinTask<?> t;
1201     for ( ; p - b > 0; ++b) {
1202     if ((t = a[(cap - 1) & b]) != null)
1203     return t;
1204 dl 1.355 }
1205     }
1206 dl 1.78 }
1207 dl 1.405 return null;
1208 dl 1.78 }
1209    
1210     /**
1211 dl 1.405 * Polls for a task. Used only by non-owners in usually
1212     * uncontended contexts.
1213     *
1214     * @param pool if nonnull, pool to signal if more tasks exist
1215 dl 1.78 */
1216 dl 1.405 final ForkJoinTask<?> poll(ForkJoinPool pool) {
1217     for (int b = base;;) {
1218     int cap; ForkJoinTask<?>[] a;
1219     if ((a = array) == null || (cap = a.length) <= 0)
1220     break; // currently impossible
1221     int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
1222     ForkJoinTask<?> t = a[k];
1223     U.loadFence(); // for re-reads
1224     if (b != (b = base)) // inconsistent
1225     ;
1226     else if (t != null && casSlotToNull(a, k, t)) {
1227     base = nb;
1228     U.storeFence();
1229     if (pool != null && a[nk] != null)
1230     pool.signalWork(); // propagate
1231 dl 1.355 return t;
1232     }
1233 dl 1.405 else if (array != a || a[k] != null)
1234     ; // stale
1235     else if (a[nk] == null && top - b <= 0)
1236     break; // empty
1237 dl 1.355 }
1238     return null;
1239 dl 1.78 }
1240    
1241     /**
1242 dl 1.405 * Tries to poll next task in FIFO order, failing on
1243     * contention or stalls. Used only by topLevelExec to repoll
1244     * from the queue obtained from pool.scan.
1245 dl 1.345 */
1246 dl 1.405 final ForkJoinTask<?> tryPoll() {
1247     int b = base, cap; ForkJoinTask<?>[] a;
1248 dl 1.355 if ((a = array) != null && (cap = a.length) > 0) {
1249 dl 1.405 for (;;) {
1250     int k = (cap - 1) & b, nb = b + 1;
1251     ForkJoinTask<?> t = a[k];
1252     U.loadFence(); // for re-reads
1253     if (b != (b = base))
1254     ; // inconsistent
1255     else if (t != null) {
1256     if (casSlotToNull(a, k, t)) {
1257     base = nb;
1258 dl 1.407 U.storeStoreFence();
1259 dl 1.405 return t;
1260     }
1261     break; // contended
1262 dl 1.355 }
1263 dl 1.405 else if (a[k] == null)
1264     break; // empty or stalled
1265 jsr166 1.344 }
1266 dl 1.253 }
1267 dl 1.405 return null;
1268 dl 1.253 }
1269    
1270 dl 1.355 // specialized execution methods
1271    
1272 dl 1.253 /**
1273 dl 1.355 * Runs the given (stolen) task if nonnull, as well as
1274 dl 1.405 * remaining local tasks and/or others available from its
1275     * source queue, if any.
1276 dl 1.94 */
1277 dl 1.405 final void topLevelExec(ForkJoinTask<?> task, WorkQueue src) {
1278     int cfg = config, fifo = cfg & FIFO, nstolen = 1;
1279 dl 1.355 while (task != null) {
1280     task.doExec();
1281 dl 1.405 if ((task = nextLocalTask(fifo)) == null &&
1282     src != null && (task = src.tryPoll()) != null)
1283 dl 1.355 ++nstolen;
1284 dl 1.215 }
1285 dl 1.355 nsteals += nstolen;
1286     source = 0;
1287     if ((cfg & INNOCUOUS) != 0)
1288     ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
1289 dl 1.215 }
1290    
1291     /**
1292 dl 1.405 * Deep form of tryUnpush: Traverses from top and removes and
1293     * runs the task if present, shifting others to fill the gap.
1294     * @return task status if removed, else 0
1295     */
1296     final int tryRemoveAndExec(ForkJoinTask<?> task, boolean owned) {
1297     ForkJoinTask<?>[] a = array;
1298     int p = top, s = p - 1, d = p - base, cap;
1299     if (task != null && d > 0 && a != null && (cap = a.length) > 0) {
1300     for (int m = cap - 1, i = s; ; --i) {
1301     ForkJoinTask<?> t; int k;
1302     if ((t = a[k = i & m]) == task) {
1303     if (!owned && getAndSetAccess(1) != 0)
1304     break; // fail if locked
1305 dl 1.407 else if (top != p || a[k] != task ||
1306     getAndClearSlot(a, k) == null) {
1307     access = 0;
1308     break; // missed
1309     }
1310     else {
1311 dl 1.405 if (i != s && i == base)
1312     base = i + 1; // avoid shift
1313     else {
1314     for (int j = i; j != s;) // shift down
1315     a[j & m] = getAndClearSlot(a, ++j & m);
1316     top = s;
1317     }
1318 dl 1.407 releaseAccess();
1319     return task.doExec();
1320 dl 1.405 }
1321     }
1322     else if (t == null || --d == 0)
1323     break;
1324     }
1325     }
1326 dl 1.407 return 0;
1327 dl 1.405 }
1328    
1329     /**
1330 dl 1.345 * Tries to pop and run tasks within the target's computation
1331     * until done, not found, or limit exceeded.
1332 dl 1.94 *
1333 dl 1.405 * @param task root of computation
1334 dl 1.300 * @param limit max runs, or zero for no limit
1335 jsr166 1.363 * @return task status on exit
1336 dl 1.300 */
1337 dl 1.365 final int helpComplete(ForkJoinTask<?> task, boolean owned, int limit) {
1338 dl 1.405 int status = 0;
1339     if (task != null) {
1340     outer: for (;;) {
1341     ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1342     int p, s, cap, k;
1343     if ((status = task.status) < 0)
1344     return status;
1345     if ((a = array) == null || (cap = a.length) <= 0 ||
1346     (t = a[k = (cap - 1) & (s = (p = top) - 1)]) == null ||
1347     !(t instanceof CountedCompleter))
1348 dl 1.355 break;
1349 dl 1.405 for (CountedCompleter<?> f = (CountedCompleter<?>)t;;) {
1350     if (f == task)
1351     break;
1352     else if ((f = f.completer) == null)
1353     break outer; // ineligible
1354     }
1355     if (!owned && getAndSetAccess(1) != 0)
1356     break; // fail if locked
1357 dl 1.407 if (top != p || a[k] != t || getAndClearSlot(a, k) == null) {
1358 dl 1.405 access = 0;
1359 dl 1.407 break; // missed
1360 dl 1.104 }
1361 dl 1.407 top = s;
1362     releaseAccess();
1363     t.doExec();
1364     if (limit != 0 && --limit == 0)
1365     break;
1366 dl 1.104 }
1367 dl 1.405 status = task.status;
1368 dl 1.104 }
1369 dl 1.300 return status;
1370     }
1371    
1372 jsr166 1.344 /**
1373 dl 1.345 * Tries to poll and run AsynchronousCompletionTasks until
1374 dl 1.405 * none are found or the blocker is released.
1375 dl 1.345 *
1376     * @param blocker the blocker
1377 jsr166 1.344 */
1378 dl 1.345 final void helpAsyncBlocker(ManagedBlocker blocker) {
1379 dl 1.405 if (blocker != null) {
1380     for (;;) {
1381     int b = base, cap; ForkJoinTask<?>[] a;
1382     if ((a = array) == null || (cap = a.length) <= 0 || b == top)
1383     break;
1384     int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
1385     ForkJoinTask<?> t = a[k];
1386     U.loadFence(); // for re-reads
1387     if (base != b)
1388     ;
1389     else if (blocker.isReleasable())
1390     break;
1391     else if (a[k] != t)
1392     ;
1393     else if (t != null) {
1394     if (!(t instanceof CompletableFuture
1395     .AsynchronousCompletionTask))
1396     break;
1397     else if (casSlotToNull(a, k, t)) {
1398     base = nb;
1399 dl 1.407 U.storeStoreFence();
1400 dl 1.405 t.doExec();
1401     }
1402     }
1403     else if (a[nk] == null)
1404     break;
1405 dl 1.178 }
1406 dl 1.78 }
1407     }
1408    
1409 dl 1.355 // misc
1410    
1411 dl 1.78 /**
1412 dl 1.373 * Returns true if owned by a worker thread and not known to be blocked.
1413 dl 1.86 */
1414     final boolean isApparentlyUnblocked() {
1415     Thread wt; Thread.State s;
1416 dl 1.405 return (access != STOP && (wt = owner) != null &&
1417 dl 1.86 (s = wt.getState()) != Thread.State.BLOCKED &&
1418     s != Thread.State.WAITING &&
1419     s != Thread.State.TIMED_WAITING);
1420     }
1421    
1422 dl 1.405 /**
1423     * Callback from InnocuousForkJoinWorkerThread.onStart
1424     */
1425     final void setInnocuous() {
1426     config |= INNOCUOUS;
1427     }
1428    
1429 dl 1.78 static {
1430 dl 1.404 U = Unsafe.getUnsafe();
1431 dl 1.405 Class<WorkQueue> klass = WorkQueue.class;
1432     ACCESS = U.objectFieldOffset(klass, "access");
1433     PHASE = U.objectFieldOffset(klass, "phase");
1434     Class<ForkJoinTask[]> aklass = ForkJoinTask[].class;
1435     ABASE = U.arrayBaseOffset(aklass);
1436     int scale = U.arrayIndexScale(aklass);
1437     ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1438 dl 1.404 if ((scale & (scale - 1)) != 0)
1439     throw new Error("array index scale not a power of two");
1440 dl 1.78 }
1441     }
1442 dl 1.14
1443 dl 1.112 // static fields (initialized in static initializer below)
1444    
1445     /**
1446     * Creates a new ForkJoinWorkerThread. This factory is used unless
1447     * overridden in ForkJoinPool constructors.
1448     */
1449     public static final ForkJoinWorkerThreadFactory
1450     defaultForkJoinWorkerThreadFactory;
1451    
1452 jsr166 1.1 /**
1453 dl 1.405 * Common (static) pool. Non-null for public use unless a static
1454     * construction exception occurs, but internal usages null-check on use
1455     * to paranoically avoid potential initialization circularities
1456     * as well as to simplify generated code.
1457 dl 1.115 */
1458 dl 1.405 static final ForkJoinPool common;
1459 dl 1.115
1460     /**
1461 dl 1.355 * Sequence number for creating worker names
1462 dl 1.83 */
1463 dl 1.355 private static volatile int poolIds;
1464 dl 1.86
1465     /**
1466 dl 1.405 * Permission required for callers of methods that may start or
1467     * kill threads. Lazily constructed.
1468 dl 1.86 */
1469 dl 1.405 static volatile RuntimePermission modifyThreadPermission;
1470 dl 1.86
1471 dl 1.200
1472 dl 1.300 // Instance fields
1473 dl 1.405 volatile long stealCount; // collects worker nsteals
1474     volatile long threadIds; // for worker thread names
1475 dl 1.355 final long keepAlive; // milliseconds before dropping if idle
1476 dl 1.405 final long bounds; // min, max threads packed as shorts
1477     final int config; // static configuration bits
1478     volatile int runState; // SHUTDOWN, STOP, TERMINATED bits
1479 dl 1.355 WorkQueue[] queues; // main registry
1480     final ReentrantLock registrationLock;
1481     Condition termination; // lazily constructed
1482     final String workerNamePrefix; // null for common pool
1483 dl 1.112 final ForkJoinWorkerThreadFactory factory;
1484 dl 1.200 final UncaughtExceptionHandler ueh; // per-worker UEH
1485 dl 1.307 final Predicate<? super ForkJoinPool> saturate;
1486 dl 1.405 // final SharedThreadContainer container; // for loom
1487 dl 1.101
1488 dl 1.308 @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
1489     volatile long ctl; // main pool control
1490 dl 1.405 @jdk.internal.vm.annotation.Contended("fjpctl") // colocate
1491     int parallelism; // target number of workers
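
    /*
     * A sketch of how the bounds field is unpacked (see tryCompensate);
     * "maxSpares" is an illustrative name, with SMASK and SWIDTH as
     * defined earlier in this file:
     *
     *   int minActive = (short)(bounds & SMASK);    // low short
     *   int maxSpares = (short)(bounds >>> SWIDTH); // next short
     *   int maxTotal  = maxSpares + parallelism;
     */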
1492 jsr166 1.309
1493 dl 1.355 // Support for atomic operations
1494 dl 1.404 private static final Unsafe U;
1495     private static final long CTL;
1496 dl 1.405 private static final long RUNSTATE;
1497     private static final long PARALLELISM;
1498 dl 1.404 private static final long THREADIDS;
1499     private static final long POOLIDS;
1500 dl 1.405
1501 dl 1.355 private boolean compareAndSetCtl(long c, long v) {
1502 dl 1.404 return U.compareAndSetLong(this, CTL, c, v);
1503 dl 1.355 }
1504     private long compareAndExchangeCtl(long c, long v) {
1505 dl 1.404 return U.compareAndExchangeLong(this, CTL, c, v);
1506 dl 1.355 }
1507     private long getAndAddCtl(long v) {
1508 dl 1.404 return U.getAndAddLong(this, CTL, v);
1509 dl 1.355 }
1510 dl 1.405 private int getAndBitwiseOrRunState(int v) {
1511     return U.getAndBitwiseOrInt(this, RUNSTATE, v);
1512 dl 1.355 }
1513 dl 1.405 private long incrementThreadIds() {
1514     return U.getAndAddLong(this, THREADIDS, 1L);
1515 dl 1.355 }
1516     private static int getAndAddPoolIds(int x) {
1517 dl 1.404 return U.getAndAddInt(ForkJoinPool.class, POOLIDS, x);
1518 dl 1.355 }
1519 dl 1.405 private int getAndSetParallelism(int v) {
1520     return U.getAndSetInt(this, PARALLELISM, v);
1521     }
1522     private int getParallelismOpaque() {
1523     return U.getIntOpaque(this, PARALLELISM);
1524     }
1525 dl 1.355
1526 dl 1.405 // Creating, registering, and deregistering workers
1527 dl 1.200
1528 dl 1.112 /**
1529 dl 1.200 * Tries to construct and start one worker. Assumes that total
1530     * count has already been incremented as a reservation. Invokes
1531     * deregisterWorker on any failure.
1532     *
1533     * @return true if successful
1534 dl 1.115 */
1535 dl 1.300 private boolean createWorker() {
1536 dl 1.200 ForkJoinWorkerThreadFactory fac = factory;
1537     Throwable ex = null;
1538     ForkJoinWorkerThread wt = null;
1539     try {
1540 dl 1.405 if (runState >= 0 && // avoid construction if terminating
1541     fac != null && (wt = fac.newThread(this)) != null) {
1542 dl 1.200 wt.start();
1543 dl 1.405 // container.start(wt); // for loom
1544 dl 1.200 return true;
1545 dl 1.115 }
1546 dl 1.200 } catch (Throwable rex) {
1547     ex = rex;
1548 dl 1.112 }
1549 dl 1.200 deregisterWorker(wt, ex);
1550     return false;
1551 dl 1.112 }
1552    
1553 dl 1.200 /**
1554 jsr166 1.360 * Provides a name for ForkJoinWorkerThread constructor.
1555 dl 1.200 */
1556 dl 1.355 final String nextWorkerThreadName() {
1557     String prefix = workerNamePrefix;
1558 dl 1.405 long tid = incrementThreadIds() + 1L;
1559 dl 1.355 if (prefix == null) // commonPool has no prefix
1560     prefix = "ForkJoinPool.commonPool-worker-";
1561 dl 1.405 return prefix.concat(Long.toString(tid));
1562 dl 1.200 }
1563 dl 1.112
1564     /**
1565 dl 1.355 * Finishes initializing and records owned queue.
1566     *
1567     * @param w caller's WorkQueue
1568     */
1569     final void registerWorker(WorkQueue w) {
1570     ThreadLocalRandom.localInit();
1571     int seed = ThreadLocalRandom.getProbe();
1572 dl 1.405 ReentrantLock lock = registrationLock;
1573     int cfg = config & FIFO;
1574 dl 1.355 if (w != null && lock != null) {
1575     w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1576 dl 1.405 cfg |= w.config | SRC;
1577     w.stackPred = seed;
1578 dl 1.355 int id = (seed << 1) | 1; // initial index guess
1579     lock.lock();
1580     try {
1581     WorkQueue[] qs; int n; // find queue index
1582     if ((qs = queues) != null && (n = qs.length) > 0) {
1583     int k = n, m = n - 1;
1584     for (; qs[id &= m] != null && k > 0; id -= 2, k -= 2);
1585     if (k == 0)
1586     id = n | 1; // resize below
1587 dl 1.405 w.phase = w.config = id | cfg; // now publishable
1588 dl 1.300
1589 dl 1.355 if (id < n)
1590     qs[id] = w;
1591 dl 1.300 else { // expand array
1592 dl 1.355 int an = n << 1, am = an - 1;
1593 dl 1.300 WorkQueue[] as = new WorkQueue[an];
1594 dl 1.355 as[id & am] = w;
1595     for (int j = 1; j < n; j += 2)
1596     as[j] = qs[j];
1597     for (int j = 0; j < n; j += 2) {
1598     WorkQueue q;
1599     if ((q = qs[j]) != null) // shared queues may move
1600     as[q.config & am] = q;
1601 dl 1.94 }
1602 dl 1.405 U.storeFence(); // fill before publish
1603 dl 1.355 queues = as;
1604 dl 1.94 }
1605     }
1606 dl 1.355 } finally {
1607     lock.unlock();
1608 dl 1.78 }
1609     }
1610     }
1611 dl 1.19
1612 jsr166 1.1 /**
1613 dl 1.86 * Final callback from terminating worker, as well as upon failure
1614 dl 1.105 * to construct or start a worker. Removes record of worker from
1615     * array, and adjusts counts. If pool is shutting down, tries to
1616     * complete termination.
1617 dl 1.78 *
1618 jsr166 1.151 * @param wt the worker thread, or null if construction failed
1619 dl 1.78 * @param ex the exception causing failure, or null if none
1620 dl 1.45 */
1621 dl 1.78 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1622 dl 1.405 WorkQueue w = (wt == null) ? null : wt.workQueue;
1623     int cfg = (w == null) ? 0 : w.config;
1624     long c = ctl;
1625     if ((cfg & TRIMMED) == 0) // decrement counts
1626     do {} while (c != (c = compareAndExchangeCtl(
1627     c, ((RC_MASK & (c - RC_UNIT)) |
1628     (TC_MASK & (c - TC_UNIT)) |
1629     (SP_MASK & c)))));
1630     else if ((int)c == 0) // was dropped on timeout
1631     cfg &= ~SRC; // suppress signal if last
1632     if (!tryTerminate(false, false) && w != null) {
1633     ReentrantLock lock; WorkQueue[] qs; int n, i;
1634 dl 1.355 long ns = w.nsteals & 0xffffffffL;
1635 dl 1.405 if ((lock = registrationLock) != null) {
1636     lock.lock(); // remove index unless terminating
1637     if ((qs = queues) != null && (n = qs.length) > 0 &&
1638     qs[i = cfg & (n - 1)] == w)
1639     qs[i] = null;
1640     stealCount += ns; // accumulate steals
1641     lock.unlock();
1642     }
1643     if ((cfg & SRC) != 0)
1644     signalWork(); // possibly replace worker
1645 dl 1.243 }
1646 dl 1.405 if (ex != null) {
1647     if (w != null) {
1648     w.access = STOP; // cancel tasks
1649     for (ForkJoinTask<?> t; (t = w.nextLocalTask(0)) != null; )
1650     ForkJoinTask.cancelIgnoringExceptions(t);
1651     }
1652 dl 1.104 ForkJoinTask.rethrow(ex);
1653 dl 1.405 }
1654 dl 1.78 }
1655 dl 1.52
1656 dl 1.355 /**
1657 dl 1.405 * Releases an idle worker, or creates one if not enough exist.
1658 dl 1.105 */
1659 dl 1.355 final void signalWork() {
1660 dl 1.405 int pc = parallelism, n;
1661     long c = ctl;
1662     WorkQueue[] qs = queues;
1663     if ((short)(c >>> RC_SHIFT) < pc && qs != null && (n = qs.length) > 0) {
1664     for (;;) {
1665     boolean create = false;
1666     int sp = (int)c & ~INACTIVE;
1667     WorkQueue v = qs[sp & (n - 1)];
1668     int deficit = pc - (short)(c >>> TC_SHIFT);
1669     long ac = (c + RC_UNIT) & RC_MASK, nc;
1670     if (sp != 0 && v != null)
1671     nc = (v.stackPred & SP_MASK) | (c & TC_MASK);
1672     else if (deficit <= 0)
1673     break;
1674     else {
1675     create = true;
1676     nc = ((c + TC_UNIT) & TC_MASK);
1677     }
1678     if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
1679     if (create)
1680     createWorker();
1681     else {
1682     Thread owner = v.owner;
1683     v.phase = sp;
1684     if (v.access == PARKED)
1685     LockSupport.unpark(owner);
1686     }
1687     break;
1688     }
1689     }
1690     }
1691     }
1692    
1693     /**
1694     * Reactivates any idle worker, if one exists.
1695     *
1696     * @return the signalled worker, or null if none
1697     */
1698     private WorkQueue reactivate() {
1699     WorkQueue[] qs; int n;
1700     long c = ctl;
1701     if ((qs = queues) != null && (n = qs.length) > 0) {
1702     for (;;) {
1703     int sp = (int)c & ~INACTIVE;
1704     WorkQueue v = qs[sp & (n - 1)];
1705     long ac = UC_MASK & (c + RC_UNIT);
1706     if (sp == 0 || v == null)
1707 dl 1.355 break;
1708     if (c == (c = compareAndExchangeCtl(
1709 dl 1.405 c, (v.stackPred & SP_MASK) | ac))) {
1710     Thread owner = v.owner;
1711     v.phase = sp;
1712     if (v.access == PARKED)
1713     LockSupport.unpark(owner);
1714     return v;
1715 dl 1.355 }
1716 dl 1.200 }
1717 dl 1.405 }
1718     return null;
1719     }
1720    
1721     /**
1722     * Tries to deactivate worker w; called only on idle timeout.
1723     */
1724     private boolean tryTrim(WorkQueue w) {
1725     if (w != null) {
1726     int pred = w.stackPred, cfg = w.config | TRIMMED;
1727     long c = ctl;
1728     int sp = (int)c & ~INACTIVE;
1729     if ((sp & SMASK) == (cfg & SMASK) &&
1730     compareAndSetCtl(c, ((pred & SP_MASK) |
1731     (UC_MASK & (c - TC_UNIT))))) {
1732     w.config = cfg; // add sentinel for deregisterWorker
1733     w.phase = sp;
1734     return true;
1735 dl 1.174 }
1736 dl 1.52 }
1737 dl 1.405 return false;
1738     }
1739    
1740     /**
1741 dl 1.407 * Returns true if any queue is detectably nonempty. Accurate
1742     * only when workers are quiescent; else conservatively
1743 dl 1.405 * approximate.
1744 dl 1.407 * @param submissionsOnly if true, only check submission queues
1745 dl 1.405 */
1746 dl 1.407 private boolean hasTasks(boolean submissionsOnly) {
1747     int step = submissionsOnly ? 2 : 1;
1748     for (int checkSum = 0;;) { // repeat until stable (normally twice)
1749     U.loadFence();
1750     WorkQueue[] qs = queues;
1751     int n = (qs == null) ? 0 : qs.length, sum = 0;
1752     for (int i = 0; i < n; i += step) {
1753     WorkQueue q; int s;
1754     if ((q = qs[i]) != null) {
1755     if (q.access > 0 || (s = q.top) != q.base)
1756     return true;
1757     sum += (s << 16) + i + 1;
1758     }
1759     }
1760     if (checkSum == (checkSum = sum))
1761     return false;
1762 dl 1.405 }
1763 dl 1.14 }
1764    
1765 dl 1.200 /**
1766 dl 1.355 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1767     * See above for explanation.
1768 dl 1.243 *
1769 dl 1.355 * @param w caller's WorkQueue (may be null on failed initialization)
1770 dl 1.243 */
1771 dl 1.355 final void runWorker(WorkQueue w) {
1772 dl 1.405 if (w != null) { // skip on failed init
1773 dl 1.355 int r = w.stackPred, src = 0; // use seed from registerWorker
1774     do {
1775     r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1776     } while ((src = scan(w, src, r)) >= 0 ||
1777     (src = awaitWork(w)) == 0);
1778 dl 1.405 w.access = STOP; // record normal termination
1779 dl 1.355 }
1780     }
1781    
1782     /**
1783     * Scans for and, if found, executes top-level tasks: tries to poll
1784     * each queue starting at a random index with random stride,
1785 dl 1.405 * returning source id or retry indicator.
1786 dl 1.355 *
1787     * @param w caller's WorkQueue
1788     * @param prevSrc the previous queue stolen from in current phase, or 0
1789     * @param r random seed
1790     * @return id of queue if taken, negative if none found, prevSrc for retry
1791     */
1792     private int scan(WorkQueue w, int prevSrc, int r) {
1793     WorkQueue[] qs = queues;
1794     int n = (w == null || qs == null) ? 0 : qs.length;
1795     for (int step = (r >>> 16) | 1, i = n; i > 0; --i, r += step) {
1796 dl 1.405 int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
1797     if ((q = qs[j = r & (n - 1)]) != null &&
1798 dl 1.355 (a = q.array) != null && (cap = a.length) > 0) {
1799 dl 1.405 int src = j | SRC, b = q.base;
1800     int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
1801     ForkJoinTask<?> t = a[k];
1802     U.loadFence(); // for re-reads
1803 dl 1.355 if (q.base != b) // inconsistent
1804     return prevSrc;
1805     else if (t != null && WorkQueue.casSlotToNull(a, k, t)) {
1806 dl 1.405 q.base = nb;
1807     w.source = src;
1808     if (prevSrc == 0 && q.base == nb && a[nk] != null)
1809 dl 1.355 signalWork(); // propagate
1810     w.topLevelExec(t, q);
1811     return src;
1812     }
1813 dl 1.405 else if (q.array != a || a[k] != null || a[nk] != null)
1814     return prevSrc; // revisit
1815 dl 1.355 }
1816     }
1817 dl 1.405 return -1;
1818 dl 1.355 }
1819    
1820     /**
1821 dl 1.405 * Advances phase, enqueues, and awaits signal or termination.
1822 dl 1.355 *
1823     * @return negative if terminated, else 0
1824     */
1825     private int awaitWork(WorkQueue w) {
1826 dl 1.405 if (w == null)
1827     return -1; // currently impossible
1828     int p = (w.phase + SS_SEQ) & ~INACTIVE; // advance phase
1829     boolean idle = false; // true if possibly quiescent
1830     if (runState < 0)
1831     return -1; // terminating
1832     long sp = p & SP_MASK, pc = ctl, qc;
1833     w.phase = p | INACTIVE;
1834     do { // enqueue
1835     w.stackPred = (int)pc; // set ctl stack link
1836     } while (pc != (pc = compareAndExchangeCtl(
1837     pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp)));
1838 dl 1.407 if ((qc & RC_MASK) <= 0L) {
1839     if (hasTasks(true) && (w.phase >= 0 || reactivate() == w))
1840     return 0; // check for stragglers
1841     if (runState != 0 && tryTerminate(false, false))
1842     return -1; // quiescent termination
1843 dl 1.405 idle = true;
1844 dl 1.407 }
1845     WorkQueue[] qs = queues; // spin for expected #accesses in scan+signal
1846     int spins = ((qs == null) ? 0 : ((qs.length & SMASK) << 1)) | 0xf;
1847 dl 1.405 while ((p = w.phase) < 0 && --spins > 0)
1848 dl 1.407 Thread.onSpinWait();
1849     if (p < 0) {
1850     long deadline = idle ? keepAlive + System.currentTimeMillis() : 0L;
1851 dl 1.405 LockSupport.setCurrentBlocker(this);
1852 dl 1.407 for (;;) { // await signal or termination
1853     if (runState < 0)
1854     return -1;
1855 dl 1.405 w.access = PARKED; // enable unpark
1856     if (w.phase < 0) {
1857     if (idle)
1858     LockSupport.parkUntil(deadline);
1859     else
1860     LockSupport.park();
1861     }
1862     w.access = 0; // disable unpark
1863     if (w.phase >= 0) {
1864     LockSupport.setCurrentBlocker(null);
1865 dl 1.404 break;
1866     }
1867 dl 1.405 Thread.interrupted(); // clear status for next park
1868     if (idle) { // check for idle timeout
1869     if (deadline - System.currentTimeMillis() < TIMEOUT_SLOP) {
1870     if (tryTrim(w))
1871     return -1;
1872     else // not at head; restart timer
1873     deadline += keepAlive;
1874     }
1875     }
1876 dl 1.386 }
1877 dl 1.243 }
1878 dl 1.407 return 0;
1879 dl 1.355 }
1880 dl 1.300
1881 dl 1.366 /**
1882 dl 1.405 * Non-overridable version of isQuiescent. Returns true if
1883     * quiescent or already terminating.
1884 dl 1.366 */
1885 dl 1.404 private boolean canStop() {
1886 dl 1.405 long c = ctl;
1887     do {
1888     if (runState < 0)
1889     break;
1890 dl 1.407 if ((c & RC_MASK) > 0L || hasTasks(false))
1891 dl 1.405 return false;
1892     } while (c != (c = ctl)); // validate
1893     return true;
1894     }
1895    
1896     /**
1897     * Scans for and returns a polled task, if available. Used only
1898     * for untracked polls. Begins scan at a random index to avoid
1899     * systematic unfairness.
1900     *
1901     * @param submissionsOnly if true, only scan submission queues
1902     */
1903     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1904     int r = ThreadLocalRandom.nextSecondarySeed();
1905     if (submissionsOnly) // even indices only
1906     r &= ~1;
1907     int step = (submissionsOnly) ? 2 : 1;
1908     WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
1909     if (runState >= 0 && (qs = queues) != null && (n = qs.length) > 0) {
1910     for (int i = n; i > 0; i -= step, r += step) {
1911     if ((q = qs[r & (n - 1)]) != null &&
1912     (t = q.poll(this)) != null)
1913     return t;
1914 dl 1.366 }
1915     }
1916 dl 1.405 return null;
1917 dl 1.366 }
1918    
1919 dl 1.355 /**
1920     * Tries to decrement counts (sometimes implicitly) and possibly
1921     * arrange for a compensating worker in preparation for
1922     * blocking. May fail due to interference, in which case -1 is
1923     * returned so caller may retry. A zero return value indicates
1924     * that the caller doesn't need to re-adjust counts when later
1925     * unblocked.
1926     *
1927     * @param c incoming ctl value
1928 dl 1.405 * @param canSaturate if true, override the saturate predicate
1929 dl 1.373 * @return UNCOMPENSATE: block then adjust, 0: block, -1: retry
1930 dl 1.355 */
1931 dl 1.405 private int tryCompensate(long c, boolean canSaturate) {
1932 dl 1.355 Predicate<? super ForkJoinPool> sat;
1933 dl 1.405 long b = bounds; // unpack fields
1934     int pc = parallelism;
1935 dl 1.355 int minActive = (short)(b & SMASK),
1936 dl 1.405 maxTotal = (short)(b >>> SWIDTH) + pc,
1937     active = (short)(c >>> RC_SHIFT),
1938 dl 1.366 total = (short)(c >>> TC_SHIFT),
1939 dl 1.405 sp = (int)c & ~INACTIVE;
1940     if (runState < 0) // terminating
1941     return -1;
1942     else if (sp != 0 && active <= pc) { // activate idle worker
1943     WorkQueue[] qs; WorkQueue v; int i;
1944     if (ctl == c && (qs = queues) != null &&
1945     qs.length > (i = sp & SMASK) && (v = qs[i]) != null) {
1946     long nc = (v.stackPred & SP_MASK) | (UC_MASK & c);
1947     if (compareAndSetCtl(c, nc)) {
1948     v.phase = sp;
1949     LockSupport.unpark(v.owner);
1950     return UNCOMPENSATE;
1951 dl 1.355 }
1952     }
1953 dl 1.405 return -1; // retry
1954     }
1955     else if (active > minActive && total >= pc) { // reduce active workers
1956     long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1957     return compareAndSetCtl(c, nc) ? UNCOMPENSATE : -1;
1958 dl 1.355 }
1959 dl 1.405 else if (total < maxTotal && total < MAX_CAP) { // expand pool
1960 dl 1.355 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1961 dl 1.373 return (!compareAndSetCtl(c, nc) ? -1 :
1962     !createWorker() ? 0 : UNCOMPENSATE);
1963 dl 1.355 }
1964 dl 1.405 else if (!compareAndSetCtl(c, c)) // validate
1965 dl 1.355 return -1;
1966 dl 1.405 else if (canSaturate || ((sat = saturate) != null && sat.test(this)))
1967 dl 1.355 return 0;
1968     else
1969     throw new RejectedExecutionException(
1970     "Thread limit exceeded replacing blocked worker");
1971     }
1972    
1973     /**
1974     * Readjusts RC count; called from ForkJoinTask after blocking.
1975     */
1976     final void uncompensate() {
1977     getAndAddCtl(RC_UNIT);
1978 dl 1.243 }
1979    
1980     /**
1981 dl 1.405 * Helps if possible until the given task is done. Processes
1982     * compatible local tasks and scans other queues for tasks produced
1983     * by w's stealers, returning a compensated blocking sentinel if
1984     * none are found.
1985 dl 1.345 *
1986 dl 1.355 * @param task the task
1987     * @param w caller's WorkQueue
1988 dl 1.405 * @param timed true if this is a timed join
1989 dl 1.373 * @return task status on exit, or UNCOMPENSATE for compensated blocking
1990 dl 1.355 */
1991 dl 1.405 final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean timed) {
1992     if (w == null || task == null)
1993     return 0;
1994     int wsrc = w.source, wid = (w.config & SMASK) | SRC, r = wid + 2;
1995     long sctl = 0L; // track stability
1996     for (boolean rescan = true;;) {
1997     int s; WorkQueue[] qs;
1998     if ((s = task.status) < 0)
1999     return s;
2000 dl 1.407 if (!rescan && sctl == (sctl = ctl)) {
2001     if ((s = tryCompensate(sctl, timed)) >= 0)
2002     return s; // block
2003     if (runState < 0)
2004     return 0;
2005     }
2006 dl 1.405 rescan = false;
2007     int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
2008     scan: for (int i = n >>> 1; i > 0; --i, r += 2) {
2009     int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
2010     if ((q = qs[j = r & m]) != null && (a = q.array) != null &&
2011     (cap = a.length) > 0) {
2012     for (int src = j | SRC;;) {
2013     int sq = q.source, b = q.base;
2014     int k = (cap - 1) & b, nb = b + 1;
2015     ForkJoinTask<?> t = a[k];
2016     U.loadFence(); // for re-reads
2017     boolean eligible = true; // check steal chain
2018     for (int d = n, v = sq;;) { // may be cyclic; bound
2019     WorkQueue p;
2020     if (v == wid)
2021     break;
2022     if (v == 0 || --d == 0 || (p = qs[v & m]) == null) {
2023     eligible = false;
2024     break;
2025     }
2026     v = p.source;
2027     }
2028     if (q.source != sq || q.base != b)
2029     ; // stale
2030     else if ((s = task.status) < 0)
2031     return s; // recheck before taking
2032     else if (t == null) {
2033     if (a[k] == null) {
2034     if (!rescan && eligible &&
2035     (q.array != a || q.top != b))
2036     rescan = true; // resized or stalled
2037     break;
2038 dl 1.355 }
2039 dl 1.300 }
2040 dl 1.405 else if (t != task && !eligible)
2041     break;
2042     else if (WorkQueue.casSlotToNull(a, k, t)) {
2043     q.base = nb;
2044     w.source = src;
2045     t.doExec();
2046     w.source = wsrc;
2047     rescan = true;
2048     break scan;
2049     }
2050 dl 1.300 }
2051     }
2052     }
2053     }
2054 dl 1.407 }
2055 dl 1.200
2056 dl 1.305 /**
2057 dl 1.405 * Version of helpJoin for CountedCompleters.
2058 jsr166 1.356 *
2059 dl 1.405 * @param task the task
2060 dl 1.355 * @param w caller's WorkQueue
2061 dl 1.405 * @param owned true if w is owned by a ForkJoinWorkerThread
2062     * @param timed true if this is a timed join
2063     * @return task status on exit, or UNCOMPENSATE for compensated blocking
2064 dl 1.305 */
2065 dl 1.405 final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean owned,
2066     boolean timed) {
2067     if (w == null || task == null)
2068     return 0;
2069     int wsrc = w.source, r = w.config;
2070     long sctl = 0L; // track stability
2071     for (boolean rescan = true;;) {
2072     int s; WorkQueue[] qs;
2073     if ((s = w.helpComplete(task, owned, 0)) < 0)
2074     return s;
2075     if (!rescan && sctl == (sctl = ctl)) {
2076     if (!owned)
2077     return 0;
2078     if ((s = tryCompensate(sctl, timed)) >= 0)
2079     return s;
2080 dl 1.407 if (runState < 0)
2081     return 0;
2082 dl 1.405 }
2083     rescan = false;
2084     int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
2085     scan: for (int i = n; i > 0; --i, ++r) {
2086     int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
2087     if ((q = qs[j = r & m]) != null && (a = q.array) != null &&
2088     (cap = a.length) > 0) {
2089     poll: for (int src = j | SRC, b = q.base;;) {
2090     int k = (cap - 1) & b, nb = b + 1;
2091     ForkJoinTask<?> t = a[k];
2092     U.loadFence(); // for re-reads
2093     if (b != (b = q.base))
2094     ; // stale
2095     else if ((s = task.status) < 0)
2096     return s; // recheck before taking
2097     else if (t == null) {
2098     if (a[k] == null) {
2099     if (!rescan && // resized or stalled
2100     (q.array != a || q.top != b))
2101     rescan = true;
2102     break;
2103     }
2104     }
2105     else if (t instanceof CountedCompleter) {
2106     CountedCompleter<?> f;
2107     for (f = (CountedCompleter<?>)t;;) {
2108     if (f == task)
2109     break;
2110     else if ((f = f.completer) == null)
2111     break poll; // ineligible
2112 dl 1.355 }
2113 dl 1.405 if (WorkQueue.casSlotToNull(a, k, t)) {
2114     q.base = nb;
2115     w.source = src;
2116     t.doExec();
2117     w.source = wsrc;
2118     rescan = true;
2119     break scan;
2120 dl 1.355 }
2121     }
2122 dl 1.405 else
2123     break;
2124 dl 1.200 }
2125     }
2126 dl 1.178 }
2127     }
2128 dl 1.405 }
2129 dl 1.300
2130     /**
2131 dl 1.366 * Runs tasks until {@code isQuiescent()}. Rather than blocking
2132     * when tasks cannot be found, rescans until all others cannot
2133     * find tasks either.
2134     *
2135     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2136     * @param interruptible true if return on interrupt
2137     * @return positive if quiescent, negative if interrupted, else 0
2138     */
2139 dl 1.405 private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
2140     long startTime = System.nanoTime(), parkTime = 0L;
2141     int phase; // w.phase set negative when temporarily quiescent
2142     if (w == null || (phase = w.phase) < 0)
2143 dl 1.366 return 0;
2144 dl 1.405 int activePhase = phase, inactivePhase = phase | INACTIVE;
2145     int wsrc = w.source, r = 0;
2146     for (boolean locals = true;;) {
2147     WorkQueue[] qs; WorkQueue q;
2148     if (runState < 0) { // terminating
2149     w.phase = activePhase;
2150     return 1;
2151     }
2152     if (locals) { // run local tasks before (re)polling
2153     for (ForkJoinTask<?> u; (u = w.nextLocalTask()) != null;)
2154 dl 1.366 u.doExec();
2155     }
2156 dl 1.405 boolean rescan = false, busy = locals = false, interrupted;
2157     int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
2158     scan: for (int i = n, j; i > 0; --i, ++r) {
2159     if ((q = qs[j = m & r]) != null && q != w) {
2160     for (int src = j | SRC;;) {
2161     ForkJoinTask<?>[] a = q.array;
2162     int b = q.base, cap;
2163     if (a == null || (cap = a.length) <= 0)
2164     break;
2165     int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
2166     ForkJoinTask<?> t = a[k];
2167     U.loadFence(); // for re-reads
2168     if (q.base != b || q.array != a || a[k] != t)
2169     ;
2170     else if (t == null) {
2171     if (!rescan) {
2172     if (a[nk] != null || q.top - b > 0)
2173     rescan = true;
2174     else if (!busy &&
2175     q.owner != null && q.phase >= 0)
2176     busy = true;
2177     }
2178     break;
2179 dl 1.366 }
2180 dl 1.405 else if (phase < 0) // reactivate before taking
2181     w.phase = phase = activePhase;
2182     else if (WorkQueue.casSlotToNull(a, k, t)) {
2183     q.base = nb;
2184 dl 1.366 w.source = src;
2185     t.doExec();
2186 dl 1.405 w.source = wsrc;
2187     rescan = locals = true;
2188     break scan;
2189 dl 1.366 }
2190     }
2191     }
2192     }
2193 dl 1.405 if (rescan)
2194     ; // retry
2195     else if (phase >= 0) {
2196     parkTime = 0L;
2197     w.phase = phase = inactivePhase;
2198     }
2199     else if (!busy) {
2200     w.phase = activePhase;
2201     return 1;
2202     }
2203     else if (parkTime == 0L) {
2204     parkTime = 1L << 10; // initially about 1 usec
2205     Thread.yield();
2206     }
2207     else if ((interrupted = interruptible && Thread.interrupted()) ||
2208     System.nanoTime() - startTime > nanos) {
2209     w.phase = activePhase;
2210     return interrupted ? -1 : 0;
2211     }
2212     else {
2213     LockSupport.parkNanos(this, parkTime);
2214     if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
2215     parkTime <<= 1; // max sleep approx 2 msec or ~1% of nanos
2216 dl 1.366 }
2217     }
2218     }
2219    
2220     /**
2221     * Helps quiesce from an external caller until done, interrupted, or timed out.
2222     *
2223     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2224     * @param interruptible true if return on interrupt
2225     * @return positive if quiescent, negative if interrupted, else 0
2226     */
2227 dl 1.405 private int externalHelpQuiesce(long nanos, boolean interruptible) {
2228 dl 1.366 for (long startTime = System.nanoTime(), parkTime = 0L;;) {
2229     ForkJoinTask<?> t;
2230     if ((t = pollScan(false)) != null) {
2231     t.doExec();
2232     parkTime = 0L;
2233     }
2234     else if (canStop())
2235     return 1;
2236     else if (parkTime == 0L) {
2237     parkTime = 1L << 10;
2238     Thread.yield();
2239     }
2240     else if ((System.nanoTime() - startTime) > nanos)
2241     return 0;
2242     else if (interruptible && Thread.interrupted())
2243     return -1;
2244     else {
2245     LockSupport.parkNanos(this, parkTime);
2246     if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
2247     parkTime <<= 1;
2248     }
2249     }
2250     }
2251    
2252     /**
2253 dl 1.405 * Helps quiesce from either an internal or an external caller.
2254     *
2255     * @param pool the pool to use, or null if any
2256     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2257     * @param interruptible true if return on interrupt
2258     * @return positive if quiescent, negative if interrupted, else 0
2259     */
2260     final static int helpQuiescePool(ForkJoinPool pool, long nanos,
2261     boolean interruptible) {
2262     Thread t; ForkJoinPool p; ForkJoinWorkerThread wt;
2263     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
2264     (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2265     (p == pool || pool == null))
2266     return p.helpQuiesce(wt.workQueue, nanos, interruptible);
2267     else if ((p = pool) != null || (p = common) != null)
2268     return p.externalHelpQuiesce(nanos, interruptible);
2269     else
2270     return 0;
2271     }
2272    
2273     /**
2274 dl 1.300 * Gets and removes a local or stolen task for the given worker.
2275     *
2276     * @return a task, if available
2277     */
2278     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2279     ForkJoinTask<?> t;
2280 dl 1.405 if (w == null || (t = w.nextLocalTask()) == null)
2281 dl 1.345 t = pollScan(false);
2282     return t;
2283 dl 1.90 }
2284    
2285 dl 1.300 // External operations
2286    
2287 dl 1.90 /**
2288 dl 1.355 * Finds and locks a WorkQueue for an external submitter, or
2289 dl 1.405 * throws RejectedExecutionException if shutdown or terminating.
2290     * @param isSubmit false if this is for a common pool fork
2291 dl 1.90 */
2292 dl 1.405 final WorkQueue submissionQueue(boolean isSubmit) {
2293 dl 1.355 int r;
2294 dl 1.405 ReentrantLock lock = registrationLock;
2295 dl 1.300 if ((r = ThreadLocalRandom.getProbe()) == 0) {
2296 dl 1.355 ThreadLocalRandom.localInit(); // initialize caller's probe
2297 dl 1.300 r = ThreadLocalRandom.getProbe();
2298     }
2299 dl 1.405 if (lock != null) { // else init error
2300     for (int id = r << 1;;) { // even indices only
2301     int n, i; WorkQueue[] qs; WorkQueue q;
2302     if ((qs = queues) == null || (n = qs.length) <= 0)
2303     break;
2304     else if ((q = qs[i = (n - 1) & id]) == null) {
2305     WorkQueue w = new WorkQueue(null, id | SRC);
2306     w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
2307     lock.lock(); // install under lock
2308     if (queues == qs && qs[i] == null)
2309     qs[i] = w; // else lost race; discard
2310 dl 1.355 lock.unlock();
2311 dl 1.300 }
2312 dl 1.405 else if (q.getAndSetAccess(1) != 0) // move and restart
2313     id = (r = ThreadLocalRandom.advanceProbe(r)) << 1;
2314     else if (isSubmit && runState != 0) {
2315     q.access = 0; // check while lock held
2316     break;
2317     }
2318     else
2319     return q;
2320 dl 1.345 }
2321 dl 1.90 }
2322 dl 1.405 throw new RejectedExecutionException();
2323 dl 1.90 }
2324    
2325 dl 1.300 /**
2326 dl 1.405 * Pushes a submission to the pool, using internal queue if called
2327     * from ForkJoinWorkerThread, else external queue.
2328 dl 1.300 */
2329 dl 1.405 private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty,
2330     ForkJoinTask<T> task) {
2331     WorkQueue q; Thread t; ForkJoinWorkerThread wt;
2332 dl 1.407 U.storeStoreFence(); // ensure safely publishable
2333 dl 1.405 if (task == null) throw new NullPointerException();
2334 dl 1.300 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2335 dl 1.405 (wt = (ForkJoinWorkerThread)t).pool == this)
2336     q = wt.workQueue;
2337     else {
2338     task.markPoolSubmission();
2339     q = submissionQueue(true);
2340     }
2341     q.push(task, this, signalIfEmpty);
2342 dl 1.404 return task;
2343     }
2344    
2345     /**
2346 dl 1.405 * Returns queue for an external thread, if one exists that has
2347     * possibly ever submitted to the given pool (nonzero probe), or
2348     * null if none.
2349 dl 1.404 */
2350 dl 1.405 private static WorkQueue externalQueue(ForkJoinPool p) {
2351     WorkQueue[] qs;
2352     int r = ThreadLocalRandom.getProbe(), n;
2353     return (p != null && (qs = p.queues) != null &&
2354     (n = qs.length) > 0 && r != 0) ?
2355     qs[(n - 1) & (r << 1)] : null;
2356 dl 1.300 }
2357    
2358     /**
2359 dl 1.405 * Returns external queue for common pool.
2360 dl 1.355 */
2361     static WorkQueue commonQueue() {
2362 dl 1.405 return externalQueue(common);
2363 dl 1.300 }
2364 dl 1.90
2365     /**
2366 dl 1.396 * Returns queue for an external thread, if one exists
2367     */
2368     final WorkQueue externalQueue() {
2369 dl 1.405 return externalQueue(this);
2370 dl 1.396 }
2371    
2372     /**
2373 dl 1.355 * If the given executor is a ForkJoinPool, poll and execute
2374     * AsynchronousCompletionTasks from worker's queue until none are
2375     * available or blocker is released.
2376 dl 1.300 */
2377 dl 1.355 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
2378     WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
2379     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
2380     if ((wt = (ForkJoinWorkerThread)t).pool == e)
2381     w = wt.workQueue;
2382     }
2383 dl 1.396 else if (e instanceof ForkJoinPool)
2384     w = ((ForkJoinPool)e).externalQueue();
2385 dl 1.355 if (w != null)
2386     w.helpAsyncBlocker(blocker);
2387 dl 1.14 }
2388    
2389     /**
2390 dl 1.105 * Returns a cheap heuristic guide for task partitioning when
2391     * programmers, frameworks, tools, or languages have little or no
2392 jsr166 1.222 * idea about task granularity. In essence, by offering this
2393 dl 1.105 * method, we ask users only about tradeoffs in overhead vs
2394     * expected throughput and its variance, rather than how finely to
2395     * partition tasks.
2396     *
2397     * In a steady state strict (tree-structured) computation, each
2398     * thread makes available for stealing enough tasks for other
2399     * threads to remain active. Inductively, if all threads play by
2400     * the same rules, each thread should make available only a
2401     * constant number of tasks.
2402     *
2403     * The minimum useful constant is just 1. But using a value of 1
2404     * would require immediate replenishment upon each steal to
2405     * maintain enough tasks, which is infeasible. Further,
2406     * partitionings/granularities of offered tasks should minimize
2407     * steal rates, which in general means that threads nearer the top
2408     * of computation tree should generate more than those nearer the
2409     * bottom. In perfect steady state, each thread is at
2410     * approximately the same level of computation tree. However,
2411     * producing extra tasks amortizes the uncertainty of progress and
2412     * diffusion assumptions.
2413     *
2414 jsr166 1.161 * So, users will want to use values larger (but not much larger)
2415 dl 1.105 * than 1 to both smooth over transient shortages and hedge
2416     * against uneven progress; as traded off against the cost of
2417     * extra task overhead. We leave the user to pick a threshold
2418     * value to compare with the results of this call to guide
2419     * decisions, but recommend values such as 3.
2420     *
2421     * When all threads are active, it is on average OK to estimate
2422     * surplus strictly locally. In steady-state, if one thread is
2423     * maintaining say 2 surplus tasks, then so are others. So we can
2424     * just use estimated queue length. However, this strategy alone
2425     * leads to serious mis-estimates in some non-steady-state
2426     * conditions (ramp-up, ramp-down, other stalls). We can detect
2427     * many of these by further considering the number of "idle"
2428     * threads, which are known to have zero queued tasks, and
2429     * compensate by a factor of (#idle/#active) threads.
2430     */
2431     static int getSurplusQueuedTaskCount() {
2432     Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2433 dl 1.300 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2434     (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2435     (q = wt.workQueue) != null) {
2436     int n = q.top - q.base;
2437 dl 1.405 int p = pool.parallelism;
2438     int a = (short)(pool.ctl >>> RC_SHIFT);
2439 dl 1.112 return n - (a > (p >>>= 1) ? 0 :
2440     a > (p >>>= 1) ? 1 :
2441     a > (p >>>= 1) ? 2 :
2442     a > (p >>>= 1) ? 4 :
2443     8);
2444 dl 1.105 }
2445     return 0;
2446 dl 1.100 }
2447    
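    /*
     * Usage sketch following the guidance above (names such as SubTask,
     * lo, mid, hi, and THRESHOLD are illustrative): within a recursive
     * computation, fork while the local surplus is small, otherwise run
     * subtasks directly.
     *
     *   // inside a hypothetical RecursiveTask<Long>.compute()
     *   if (hi - lo <= THRESHOLD)
     *       return computeDirectly(lo, hi);
     *   SubTask left = new SubTask(lo, mid), right = new SubTask(mid, hi);
     *   if (getSurplusQueuedTaskCount() <= 3) {
     *       left.fork();                       // leave work for stealers
     *       return right.compute() + left.join();
     *   }
     *   return left.compute() + right.compute(); // enough surplus; stay local
     */
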
2448 dl 1.300 // Termination
2449 dl 1.14
2450     /**
2451 dl 1.405 * Possibly initiates and/or completes pool termination.
2452 dl 1.14 *
2453     * @param now if true, unconditionally terminate, else only
2454 dl 1.78 * if no work and no active workers
2455 dl 1.243 * @param enable if true, terminate when next possible
2456 dl 1.300 * @return true if terminating or terminated
2457 jsr166 1.1 */
2458 dl 1.300 private boolean tryTerminate(boolean now, boolean enable) {
2459 dl 1.405 int rs; ReentrantLock lock; Condition cond;
2460     if ((rs = runState) >= 0) { // set SHUTDOWN and/or STOP
2461     if ((config & ISCOMMON) != 0)
2462     return false; // cannot shutdown
2463     if (!now) {
2464     if ((rs & SHUTDOWN) == 0) {
2465     if (!enable)
2466     return false;
2467     getAndBitwiseOrRunState(SHUTDOWN);
2468     }
2469     if (!canStop())
2470     return false;
2471     }
2472     getAndBitwiseOrRunState(SHUTDOWN | STOP);
2473     }
2474     WorkQueue released = reactivate(); // try signalling waiter
2475     int tc = (short)(ctl >>> TC_SHIFT);
2476     if (released == null && tc > 0) { // help unblock and cancel
2477     Thread current = Thread.currentThread();
2478     WorkQueue w = ((current instanceof ForkJoinWorkerThread) ?
2479     ((ForkJoinWorkerThread)current).workQueue : null);
2480     int r = (w == null) ? 0 : w.config + 1; // stagger traversals
2481     WorkQueue[] qs = queues;
2482     int n = (qs == null) ? 0 : qs.length;
2483     for (int i = 0; i < n; ++i) {
2484     WorkQueue q; Thread thread;
2485     if ((q = qs[(r + i) & (n - 1)]) != null &&
2486     (thread = q.owner) != current && q.access != STOP) {
2487 dl 1.407 for (ForkJoinTask<?> t; (t = q.poll(null)) != null; )
2488     ForkJoinTask.cancelIgnoringExceptions(t);
2489 dl 1.405 if (thread != null && !thread.isInterrupted()) {
2490     q.forcePhaseActive(); // for awaitWork
2491 dl 1.366 try {
2492     thread.interrupt();
2493     } catch (Throwable ignore) {
2494     }
2495 dl 1.203 }
2496     }
2497     }
2498 dl 1.405 }
2499     if ((tc <= 0 || (short)(ctl >>> TC_SHIFT) <= 0) &&
2500     (getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
2501     (lock = registrationLock) != null) {
2502     lock.lock(); // signal when no workers
2503     if ((cond = termination) != null)
2504     cond.signalAll();
2505     lock.unlock();
2506     // container.close(); // for loom
2507 dl 1.52 }
2508 dl 1.300 return true;
2509 dl 1.105 }
2510    
2511 dl 1.52 // Exported methods
2512 jsr166 1.1
2513     // Constructors
2514    
2515     /**
2516 jsr166 1.9 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2517 dl 1.300 * java.lang.Runtime#availableProcessors}, using defaults for all
2518 dl 1.319 * other parameters (see {@link #ForkJoinPool(int,
2519     * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2520     * int, int, int, Predicate, long, TimeUnit)}).
2521 jsr166 1.1 *
2522     * @throws SecurityException if a security manager exists and
2523     * the caller is not permitted to modify threads
2524     * because it does not hold {@link
2525     * java.lang.RuntimePermission}{@code ("modifyThread")}
2526     */
2527     public ForkJoinPool() {
2528 jsr166 1.148 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2529 dl 1.300 defaultForkJoinWorkerThreadFactory, null, false,
2530 dl 1.307 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2531 jsr166 1.1 }
2532    
2533     /**
2534 jsr166 1.9 * Creates a {@code ForkJoinPool} with the indicated parallelism
2535 dl 1.319 * level, using defaults for all other parameters (see {@link
2536     * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
2537     * UncaughtExceptionHandler, boolean, int, int, int, Predicate,
2538     * long, TimeUnit)}).
2539 jsr166 1.1 *
2540 jsr166 1.9 * @param parallelism the parallelism level
2541 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
2542 jsr166 1.11 * equal to zero, or greater than implementation limit
2543 jsr166 1.1 * @throws SecurityException if a security manager exists and
2544     * the caller is not permitted to modify threads
2545     * because it does not hold {@link
2546     * java.lang.RuntimePermission}{@code ("modifyThread")}
2547     */
2548     public ForkJoinPool(int parallelism) {
2549 dl 1.300 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
2550 dl 1.307 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2551 jsr166 1.1 }
2552    
2553     /**
2554 dl 1.300 * Creates a {@code ForkJoinPool} with the given parameters (using
2555 dl 1.319 * defaults for others -- see {@link #ForkJoinPool(int,
2556     * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2557     * int, int, int, Predicate, long, TimeUnit)}).
2558 jsr166 1.1 *
2559 dl 1.18 * @param parallelism the parallelism level. For default value,
2560     * use {@link java.lang.Runtime#availableProcessors}.
2561     * @param factory the factory for creating new threads. For default value,
2562     * use {@link #defaultForkJoinWorkerThreadFactory}.
2563 dl 1.19 * @param handler the handler for internal worker threads that
2564     * terminate due to unrecoverable errors encountered while executing
2565 jsr166 1.31 * tasks. For default value, use {@code null}.
2566 dl 1.19 * @param asyncMode if true,
2567 dl 1.18 * establishes local first-in-first-out scheduling mode for forked
2568     * tasks that are never joined. This mode may be more appropriate
2569     * than default locally stack-based mode in applications in which
2570     * worker threads only process event-style asynchronous tasks.
2571 jsr166 1.31 * For default value, use {@code false}.
2572 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
2573 jsr166 1.11 * equal to zero, or greater than implementation limit
2574     * @throws NullPointerException if the factory is null
2575 jsr166 1.1 * @throws SecurityException if a security manager exists and
2576     * the caller is not permitted to modify threads
2577     * because it does not hold {@link
2578     * java.lang.RuntimePermission}{@code ("modifyThread")}
2579     */
2580 dl 1.19 public ForkJoinPool(int parallelism,
2581 dl 1.18 ForkJoinWorkerThreadFactory factory,
2582 jsr166 1.156 UncaughtExceptionHandler handler,
2583 dl 1.18 boolean asyncMode) {
2584 dl 1.300 this(parallelism, factory, handler, asyncMode,
2585 dl 1.307 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2586 dl 1.152 }
2587    
2588 dl 1.300 /**
2589     * Creates a {@code ForkJoinPool} with the given parameters.
2590     *
2591     * @param parallelism the parallelism level. For default value,
2592     * use {@link java.lang.Runtime#availableProcessors}.
2593     *
2594     * @param factory the factory for creating new threads. For
2595     * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2596     *
2597     * @param handler the handler for internal worker threads that
2598     * terminate due to unrecoverable errors encountered while
2599     * executing tasks. For default value, use {@code null}.
2600     *
2601     * @param asyncMode if true, establishes local first-in-first-out
2602     * scheduling mode for forked tasks that are never joined. This
2603     * mode may be more appropriate than default locally stack-based
2604     * mode in applications in which worker threads only process
2605     * event-style asynchronous tasks. For default value, use {@code
2606     * false}.
2607     *
2608     * @param corePoolSize the number of threads to keep in the pool
2609     * (unless timed out after an elapsed keep-alive). Normally (and
2610     * by default) this is the same value as the parallelism level,
2611     * but may be set to a larger value to reduce dynamic overhead if
2612     * tasks regularly block. Using a smaller value (for example
2613     * {@code 0}) has the same effect as the default.
2614     *
2615     * @param maximumPoolSize the maximum number of threads allowed.
2616     * When the maximum is reached, attempts to replace blocked
2617     * threads fail. (However, because creation and termination of
2618     * different threads may overlap, and may be managed by the given
2619 dl 1.307 * thread factory, this value may be transiently exceeded.) To
2620     * arrange the same value as is used by default for the common
2621 dl 1.319 * pool, use {@code 256} plus the {@code parallelism} level. (By
2622     * default, the common pool allows a maximum of 256 spare
2623     * threads.) Using a value (for example {@code
2624     * Integer.MAX_VALUE}) larger than the implementation's total
2625     * thread limit has the same effect as using this limit (which is
2626     * the default).
2627 dl 1.300 *
2628     * @param minimumRunnable the minimum allowed number of core
2629     * threads not blocked by a join or {@link ManagedBlocker}. To
2630     * ensure progress, when too few unblocked threads exist and
2631     * unexecuted tasks may exist, new threads are constructed, up to
2632     * the given maximumPoolSize. For the default value, use {@code
2633     * 1}, which ensures liveness. A larger value might improve
2634     * throughput in the presence of blocked activities, but might
2635     * not, due to increased overhead. A value of zero may be
2636     * acceptable when submitted tasks cannot have dependencies
2637     * requiring additional threads.
2638     *
2639 jsr166 1.318 * @param saturate if non-null, a predicate invoked upon attempts
2640 dl 1.307 * to create more than the maximum total allowed threads. By
2641     * default, when a thread is about to block on a join or {@link
2642     * ManagedBlocker}, but cannot be replaced because the
2643     * maximumPoolSize would be exceeded, a {@link
2644     * RejectedExecutionException} is thrown. But if this predicate
2645     * returns {@code true}, then no exception is thrown, so the pool
2646     * continues to operate with fewer than the target number of
2647     * runnable threads, which might not ensure progress.
2648 dl 1.300 *
2649     * @param keepAliveTime the elapsed time since last use before
2650     * a thread is terminated (and then later replaced if needed).
2651     * For the default value, use {@code 60, TimeUnit.SECONDS}.
2652     *
2653     * @param unit the time unit for the {@code keepAliveTime} argument
2654     *
2655     * @throws IllegalArgumentException if parallelism is less than or
2656     * equal to zero, or is greater than implementation limit,
2657     * or if maximumPoolSize is less than parallelism,
2658     * or if the keepAliveTime is less than or equal to zero.
2659     * @throws NullPointerException if the factory is null
2660     * @throws SecurityException if a security manager exists and
2661     * the caller is not permitted to modify threads
2662     * because it does not hold {@link
2663     * java.lang.RuntimePermission}{@code ("modifyThread")}
2664 jsr166 1.306 * @since 9
2665 dl 1.300 */
2666     public ForkJoinPool(int parallelism,
2667     ForkJoinWorkerThreadFactory factory,
2668     UncaughtExceptionHandler handler,
2669     boolean asyncMode,
2670     int corePoolSize,
2671     int maximumPoolSize,
2672     int minimumRunnable,
2673 dl 1.307 Predicate<? super ForkJoinPool> saturate,
2674 dl 1.300 long keepAliveTime,
2675     TimeUnit unit) {
2676 dl 1.355 checkPermission();
2677     int p = parallelism;
2678     if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
2679 dl 1.152 throw new IllegalArgumentException();
2680 dl 1.355 if (factory == null || unit == null)
2681 dl 1.14 throw new NullPointerException();
2682 dl 1.405 this.parallelism = p;
2683 dl 1.300 this.factory = factory;
2684     this.ueh = handler;
2685 dl 1.307 this.saturate = saturate;
2686 dl 1.405 this.config = asyncMode ? FIFO : 0;
2687 dl 1.355 this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2688 dl 1.405 int corep = Math.min(Math.max(corePoolSize, p), MAX_CAP);
2689     int maxSpares = Math.max(0, Math.min(maximumPoolSize - p, MAX_CAP));
2690     int minAvail = Math.max(0, Math.min(minimumRunnable, MAX_CAP));
2691     this.bounds = (long)(minAvail & SMASK) | (long)(maxSpares << SWIDTH) |
2692     ((long)corep << 32);
2693 dl 1.355 int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
2694     this.registrationLock = new ReentrantLock();
2695     this.queues = new WorkQueue[size];
2696     String pid = Integer.toString(getAndAddPoolIds(1) + 1);
2697 dl 1.405 String name = "ForkJoinPool-" + pid;
2698     this.workerNamePrefix = name + "-worker-";
2699     // this.container = SharedThreadContainer.create(name); // for loom
2700 jsr166 1.327 }
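    /*
     * Non-normative construction sketch; the numeric values below are
     * arbitrary assumptions chosen only to show how the parameters
     * relate (for instance, maximumPoolSize = parallelism + 256
     * mirrors the common pool's default spare-thread allowance):
     *
     *   ForkJoinPool pool = new ForkJoinPool(
     *       8,                                        // parallelism
     *       ForkJoinPool.defaultForkJoinWorkerThreadFactory,
     *       null,                                     // handler
     *       false,                                    // asyncMode
     *       8,                                        // corePoolSize
     *       8 + 256,                                  // maximumPoolSize
     *       1,                                        // minimumRunnable
     *       p -> true,                                // saturate instead of rejecting
     *       60, TimeUnit.SECONDS);                    // keepAliveTime, unit
     */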
2701    
2702 dl 1.152 /**
2703 dl 1.300 * Constructor for common pool using parameters possibly
2704     * overridden by system properties
2705     */
2706     private ForkJoinPool(byte forCommonPoolOnly) {
2707 dl 1.405 ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory;
2708     UncaughtExceptionHandler handler = null;
2709 dl 1.404 int maxSpares = DEFAULT_COMMON_MAX_SPARES;
2710 dl 1.405 int pc = 0, preset = 0; // nonzero if size set as property
2711 dl 1.300 try { // ignore exceptions in accessing/parsing properties
2712     String pp = System.getProperty
2713     ("java.util.concurrent.ForkJoinPool.common.parallelism");
2714 dl 1.405 if (pp != null) {
2715     pc = Math.max(0, Integer.parseInt(pp));
2716     preset = PRESET_SIZE;
2717     }
2718     String ms = System.getProperty
2719 dl 1.404 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
2720 dl 1.405 if (ms != null)
2721     maxSpares = Math.max(0, Math.min(MAX_CAP, Integer.parseInt(ms)));
2722     String sf = System.getProperty
2723     ("java.util.concurrent.ForkJoinPool.common.threadFactory");
2724     String sh = System.getProperty
2725     ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2726     if (sf != null || sh != null) {
2727     ClassLoader ldr = ClassLoader.getSystemClassLoader();
2728     if (sf != null)
2729     fac = (ForkJoinWorkerThreadFactory)
2730     ldr.loadClass(sf).getConstructor().newInstance();
2731     if (sh != null)
2732     handler = (UncaughtExceptionHandler)
2733     ldr.loadClass(sh).getConstructor().newInstance();
2734     }
2735 dl 1.300 } catch (Exception ignore) {
2736     }
2737 dl 1.405 if (preset == 0)
2738     pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
2739     int p = Math.min(pc, MAX_CAP);
2740     int size = (p == 0) ? 1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1));
2741     this.parallelism = p;
2742     this.config = ISCOMMON | preset;
2743     this.bounds = (long)(1 | (maxSpares << SWIDTH));
2744     this.factory = fac;
2745 dl 1.18 this.ueh = handler;
2746 dl 1.355 this.keepAlive = DEFAULT_KEEPALIVE;
2747 dl 1.307 this.saturate = null;
2748 dl 1.355 this.workerNamePrefix = null;
2749 dl 1.405 this.registrationLock = new ReentrantLock();
2750 dl 1.355 this.queues = new WorkQueue[size];
2751 dl 1.405 // this.container = SharedThreadContainer.create("ForkJoinPool.commonPool"); // for loom
2752 dl 1.404 }
2753    
2754     /**
2755 dl 1.128 * Returns the common pool instance. This pool is statically
2756 dl 1.134 * constructed; its run state is unaffected by attempts to {@link
2757     * #shutdown} or {@link #shutdownNow}. However this pool and any
2758     * ongoing processing are automatically terminated upon program
2759     * {@link System#exit}. Any program that relies on asynchronous
2760     * task processing to complete before program termination should
2761 jsr166 1.158 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2762     * before exit.
2763 dl 1.100 *
2764     * @return the common pool instance
2765 jsr166 1.138 * @since 1.8
2766 dl 1.100 */
2767     public static ForkJoinPool commonPool() {
2768 dl 1.405 // assert common != null : "static init error";
2769     return common;
2770 dl 1.100 }
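    /*
     * Sketch of the pre-exit pattern recommended above for programs
     * relying on asynchronous common-pool processing; doWork() is a
     * hypothetical method and the one-minute timeout is an arbitrary
     * assumption:
     *
     *   CompletableFuture.runAsync(() -> doWork());   // runs in commonPool()
     *   // ... before program exit:
     *   ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.MINUTES);
     */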
2771    
2772 jsr166 1.1 // Execution methods
2773    
2774     /**
2775     * Performs the given task, returning its result upon completion.
2776 dl 1.52 * If the computation encounters an unchecked Exception or Error,
2777     * it is rethrown as the outcome of this invocation. Rethrown
2778     * exceptions behave in the same way as regular exceptions, but,
2779     * when possible, contain stack traces (as displayed for example
2780     * using {@code ex.printStackTrace()}) of both the current thread
2781     * as well as the thread actually encountering the exception;
2782     * minimally only the latter.
2783 jsr166 1.1 *
2784     * @param task the task
2785 jsr166 1.191 * @param <T> the type of the task's result
2786 jsr166 1.1 * @return the task's result
2787 jsr166 1.11 * @throws NullPointerException if the task is null
2788     * @throws RejectedExecutionException if the task cannot be
2789     * scheduled for execution
2790 jsr166 1.1 */
2791     public <T> T invoke(ForkJoinTask<T> task) {
2792 dl 1.405 poolSubmit(true, task);
2793     return task.join();
2794 jsr166 1.1 }
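    /*
     * Minimal invoke() sketch; Fib is a hypothetical RecursiveTask,
     * not part of this API:
     *
     *   class Fib extends RecursiveTask<Integer> {
     *     final int n;
     *     Fib(int n) { this.n = n; }
     *     protected Integer compute() {
     *       if (n <= 1) return n;
     *       Fib f1 = new Fib(n - 1); f1.fork();
     *       return new Fib(n - 2).compute() + f1.join();
     *     }
     *   }
     *   int fib10 = new ForkJoinPool().invoke(new Fib(10));
     */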
2795    
2796     /**
2797     * Arranges for (asynchronous) execution of the given task.
2798     *
2799     * @param task the task
2800 jsr166 1.11 * @throws NullPointerException if the task is null
2801     * @throws RejectedExecutionException if the task cannot be
2802     * scheduled for execution
2803 jsr166 1.1 */
2804 jsr166 1.8 public void execute(ForkJoinTask<?> task) {
2805 dl 1.405 poolSubmit(true, task);
2806 jsr166 1.1 }
2807    
2808     // AbstractExecutorService methods
2809    
2810 jsr166 1.11 /**
2811     * @throws NullPointerException if the task is null
2812     * @throws RejectedExecutionException if the task cannot be
2813     * scheduled for execution
2814     */
2815 dl 1.355 @Override
2816     @SuppressWarnings("unchecked")
2817 jsr166 1.1 public void execute(Runnable task) {
2818 dl 1.405 poolSubmit(true, (task instanceof ForkJoinTask<?>)
2819     ? (ForkJoinTask<Void>) task // avoid re-wrap
2820     : new ForkJoinTask.RunnableExecuteAction(task));
2821 jsr166 1.1 }
2822    
2823 jsr166 1.11 /**
2824 dl 1.18 * Submits a ForkJoinTask for execution.
2825     *
2826     * @param task the task to submit
2827 jsr166 1.191 * @param <T> the type of the task's result
2828 dl 1.18 * @return the task
2829     * @throws NullPointerException if the task is null
2830     * @throws RejectedExecutionException if the task cannot be
2831     * scheduled for execution
2832     */
2833     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2834 dl 1.405 return poolSubmit(true, task);
2835 dl 1.18 }
2836    
2837     /**
2838 jsr166 1.11 * @throws NullPointerException if the task is null
2839     * @throws RejectedExecutionException if the task cannot be
2840     * scheduled for execution
2841     */
2842 dl 1.355 @Override
2843 jsr166 1.1 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2844 dl 1.405 return poolSubmit(true, new ForkJoinTask.AdaptedCallable<T>(task));
2845 jsr166 1.1 }
2846    
2847 jsr166 1.11 /**
2848     * @throws NullPointerException if the task is null
2849     * @throws RejectedExecutionException if the task cannot be
2850     * scheduled for execution
2851     */
2852 dl 1.355 @Override
2853 jsr166 1.1 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2854 dl 1.405 return poolSubmit(true, new ForkJoinTask.AdaptedRunnable<T>(task, result));
2855 jsr166 1.1 }
2856    
2857 jsr166 1.11 /**
2858     * @throws NullPointerException if the task is null
2859     * @throws RejectedExecutionException if the task cannot be
2860     * scheduled for execution
2861     */
2862 dl 1.355 @Override
2863 jsr166 1.335 @SuppressWarnings("unchecked")
2864 jsr166 1.1 public ForkJoinTask<?> submit(Runnable task) {
2865 dl 1.405 return poolSubmit(true, (task instanceof ForkJoinTask<?>)
2866     ? (ForkJoinTask<Void>) task // avoid re-wrap
2867     : new ForkJoinTask.AdaptedRunnableAction(task));
2868 jsr166 1.1 }
2869    
2870 dl 1.405 // Added mainly for possible use in Loom
2871    
2872 dl 1.404 /**
2873     * Submits the given task without guaranteeing that it will
2874 dl 1.405 * eventually execute in the absence of available active threads.
2875     * In some contexts, this method may reduce contention and
2876     * overhead by relying on context-specific knowledge that existing
2877     * threads (possibly including the calling thread if operating in
2878     * this pool) will eventually be available to execute the task.
2879 dl 1.404 *
2880     * @param task the task
2881     * @param <T> the type of the task's result
2882     * @return the task
2883 dl 1.406 * @since 19
2884 dl 1.404 */
2885 dl 1.405 public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) {
2886     return poolSubmit(false, task);
2887 dl 1.404 }
2888 dl 1.405
2889 dl 1.404 /**
2890 dl 1.405 * Changes the target parallelism of this pool, controlling the
2891     * future creation, use, and termination of worker threads.
2892     * Applications include contexts in which the number of available
2893     * processors changes over time.
2894     *
2895     * @param size the target parallelism level
2896     * @return the previous parallelism level.
2897     * @throws IllegalArgumentException if size is less than 1 or
2898     * greater than the maximum supported by this
2899     * pool (currently 32767).
2900     * @throws IllegalStateException if this is the {@link #commonPool()} and
2901     * parallelism level was set by System property
2902     * {@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}.
2903     * @throws SecurityException if a security manager exists and
2904     * the caller is not permitted to modify threads
2905     * because it does not hold {@link
2906     * java.lang.RuntimePermission}{@code ("modifyThread")}
2907 dl 1.406 * @since 19
2908 dl 1.404 */
2909 dl 1.405 public int setParallelism(int size) {
2910     if (size < 1 || size > MAX_CAP)
2911     throw new IllegalArgumentException();
2912     if ((config & PRESET_SIZE) != 0)
2913     throw new IllegalStateException("Cannot override System property");
2914     checkPermission();
2915     return getAndSetParallelism(size);
2916 dl 1.404 }
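    /*
     * Usage sketch (hypothetical scenario): resize a dedicated pool
     * when the number of available processors changes; not applicable
     * to a common pool whose size was fixed by system property:
     *
     *   int previous = pool.setParallelism(
     *       Runtime.getRuntime().availableProcessors());
     */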
2917    
2918 jsr166 1.1 /**
2919 jsr166 1.11 * @throws NullPointerException {@inheritDoc}
2920     * @throws RejectedExecutionException {@inheritDoc}
2921     */
2922 dl 1.355 @Override
2923 jsr166 1.1 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2924 dl 1.366 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2925     try {
2926     for (Callable<T> t : tasks) {
2927 dl 1.367 ForkJoinTask<T> f =
2928     new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2929 dl 1.366 futures.add(f);
2930 dl 1.405 poolSubmit(true, f);
2931 dl 1.366 }
2932     for (int i = futures.size() - 1; i >= 0; --i)
2933 dl 1.405 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2934 dl 1.366 return futures;
2935     } catch (Throwable t) {
2936     for (Future<T> e : futures)
2937     ForkJoinTask.cancelIgnoringExceptions(e);
2938     throw t;
2939     }
2940 dl 1.355 }
2941    
2942     @Override
2943     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
2944     long timeout, TimeUnit unit)
2945     throws InterruptedException {
2946 dl 1.366 long nanos = unit.toNanos(timeout);
2947     ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2948     try {
2949     for (Callable<T> t : tasks) {
2950 dl 1.367 ForkJoinTask<T> f =
2951     new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2952 dl 1.366 futures.add(f);
2953 dl 1.405 poolSubmit(true, f);
2954 dl 1.366 }
2955     long startTime = System.nanoTime(), ns = nanos;
2956     boolean timedOut = (ns < 0L);
2957     for (int i = futures.size() - 1; i >= 0; --i) {
2958 dl 1.405 ForkJoinTask<T> f = (ForkJoinTask<T>)futures.get(i);
2959 dl 1.366 if (!f.isDone()) {
2960 dl 1.405 if (!timedOut)
2961     timedOut = !f.quietlyJoin(ns, TimeUnit.NANOSECONDS);
2962 dl 1.366 if (timedOut)
2963     ForkJoinTask.cancelIgnoringExceptions(f);
2964 dl 1.405 else
2965     ns = nanos - (System.nanoTime() - startTime);
2966 dl 1.366 }
2967 dl 1.355 }
2968 dl 1.366 return futures;
2969     } catch (Throwable t) {
2970     for (Future<T> e : futures)
2971     ForkJoinTask.cancelIgnoringExceptions(e);
2972     throw t;
2973 dl 1.355 }
2974 jsr166 1.1 }
2975    
2976 dl 1.367 // Task to hold results from InvokeAnyTasks
2977     static final class InvokeAnyRoot<E> extends ForkJoinTask<E> {
2978     private static final long serialVersionUID = 2838392045355241008L;
2979     @SuppressWarnings("serial") // Conditionally serializable
2980     volatile E result;
2981 dl 1.391 final AtomicInteger count; // in case all throw
2982 jsr166 1.402 @SuppressWarnings("serial")
2983 dl 1.391 final ForkJoinPool pool; // to check shutdown while collecting
2984     InvokeAnyRoot(int n, ForkJoinPool p) {
2985     pool = p;
2986     count = new AtomicInteger(n);
2987     }
2988 dl 1.367 final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
2989 jsr166 1.384 Throwable ex = null;
2990 dl 1.394 boolean failed;
2991     if (c == null || Thread.interrupted() ||
2992 dl 1.405 (pool != null && pool.runState < 0))
2993 dl 1.394 failed = true;
2994     else if (isDone())
2995     failed = false;
2996     else {
2997 dl 1.390 try {
2998     complete(c.call());
2999 dl 1.394 failed = false;
3000 dl 1.390 } catch (Throwable tx) {
3001     ex = tx;
3002 jsr166 1.384 failed = true;
3003     }
3004     }
3005 dl 1.405 if ((pool != null && pool.runState < 0) ||
3006 dl 1.391 (failed && count.getAndDecrement() <= 1))
3007 jsr166 1.384 trySetThrown(ex != null ? ex : new CancellationException());
3008 dl 1.367 }
3009     public final boolean exec() { return false; } // never forked
3010     public final E getRawResult() { return result; }
3011     public final void setRawResult(E v) { result = v; }
3012     }
3013    
3014     // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot
3015     static final class InvokeAnyTask<E> extends ForkJoinTask<E> {
3016     private static final long serialVersionUID = 2838392045355241008L;
3017     final InvokeAnyRoot<E> root;
3018     @SuppressWarnings("serial") // Conditionally serializable
3019     final Callable<E> callable;
3020     transient volatile Thread runner;
3021     InvokeAnyTask(InvokeAnyRoot<E> root, Callable<E> callable) {
3022     this.root = root;
3023     this.callable = callable;
3024     }
3025     public final boolean exec() {
3026     Thread.interrupted();
3027     runner = Thread.currentThread();
3028     root.tryComplete(callable);
3029     runner = null;
3030     Thread.interrupted();
3031     return true;
3032     }
3033     public final boolean cancel(boolean mayInterruptIfRunning) {
3034     Thread t;
3035     boolean stat = super.cancel(false);
3036     if (mayInterruptIfRunning && (t = runner) != null) {
3037     try {
3038     t.interrupt();
3039     } catch (Throwable ignore) {
3040     }
3041     }
3042     return stat;
3043     }
3044     public final void setRawResult(E v) {} // unused
3045     public final E getRawResult() { return null; }
3046     }
3047    
3048     @Override
3049     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
3050     throws InterruptedException, ExecutionException {
3051     int n = tasks.size();
3052     if (n <= 0)
3053     throw new IllegalArgumentException();
3054 dl 1.390 InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
3055 dl 1.367 ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
3056     try {
3057 dl 1.390 for (Callable<T> c : tasks) {
3058     if (c == null)
3059     throw new NullPointerException();
3060     InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
3061     fs.add(f);
3062 dl 1.405 poolSubmit(true, f);
3063 dl 1.390 if (root.isDone())
3064     break;
3065     }
3066 dl 1.405 return root.get();
3067 dl 1.367 } finally {
3068     for (InvokeAnyTask<T> f : fs)
3069 dl 1.369 ForkJoinTask.cancelIgnoringExceptions(f);
3070 dl 1.367 }
3071     }
3072    
3073     @Override
3074     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
3075     long timeout, TimeUnit unit)
3076     throws InterruptedException, ExecutionException, TimeoutException {
3077     long nanos = unit.toNanos(timeout);
3078     int n = tasks.size();
3079     if (n <= 0)
3080     throw new IllegalArgumentException();
3081 dl 1.390 InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
3082 dl 1.367 ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
3083     try {
3084 dl 1.390 for (Callable<T> c : tasks) {
3085     if (c == null)
3086     throw new NullPointerException();
3087     InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
3088     fs.add(f);
3089 dl 1.405 poolSubmit(true, f);
3090 dl 1.390 if (root.isDone())
3091     break;
3092     }
3093 dl 1.405 return root.get(nanos, TimeUnit.NANOSECONDS);
3094 dl 1.367 } finally {
3095     for (InvokeAnyTask<T> f : fs)
3096 dl 1.369 ForkJoinTask.cancelIgnoringExceptions(f);
3097 dl 1.367 }
3098     }
3099    
3100 jsr166 1.1 /**
3101     * Returns the factory used for constructing new workers.
3102     *
3103     * @return the factory used for constructing new workers
3104     */
3105     public ForkJoinWorkerThreadFactory getFactory() {
3106     return factory;
3107     }
3108    
3109     /**
3110     * Returns the handler for internal worker threads that terminate
3111     * due to unrecoverable errors encountered while executing tasks.
3112     *
3113 jsr166 1.4 * @return the handler, or {@code null} if none
3114 jsr166 1.1 */
3115 jsr166 1.156 public UncaughtExceptionHandler getUncaughtExceptionHandler() {
3116 dl 1.14 return ueh;
3117 jsr166 1.1 }
3118    
3119     /**
3120 jsr166 1.9 * Returns the targeted parallelism level of this pool.
3121 jsr166 1.1 *
3122 jsr166 1.9 * @return the targeted parallelism level of this pool
3123 jsr166 1.1 */
3124     public int getParallelism() {
3125 dl 1.405 return Math.max(getParallelismOpaque(), 1);
3126 jsr166 1.1 }
3127    
3128     /**
3129 dl 1.100 * Returns the targeted parallelism level of the common pool.
3130     *
3131     * @return the targeted parallelism level of the common pool
3132 jsr166 1.138 * @since 1.8
3133 dl 1.100 */
3134     public static int getCommonPoolParallelism() {
3135 dl 1.405 return common.getParallelism();
3136 dl 1.100 }
3137    
3138     /**
3139 jsr166 1.1 * Returns the number of worker threads that have started but not
3140 jsr166 1.34 * yet terminated. The result returned by this method may differ
3141 jsr166 1.4 * from {@link #getParallelism} when threads are created to
3142 jsr166 1.1 * maintain parallelism when others are cooperatively blocked.
3143     *
3144     * @return the number of worker threads
3145     */
3146     public int getPoolSize() {
3147 dl 1.405 return (short)(ctl >>> TC_SHIFT);
3148 jsr166 1.1 }
3149    
3150     /**
3151 jsr166 1.4 * Returns {@code true} if this pool uses local first-in-first-out
3152 jsr166 1.1 * scheduling mode for forked tasks that are never joined.
3153     *
3154 jsr166 1.4 * @return {@code true} if this pool uses async mode
3155 jsr166 1.1 */
3156     public boolean getAsyncMode() {
3157 dl 1.405 return (config & FIFO) != 0;
3158 jsr166 1.1 }
3159    
3160     /**
3161     * Returns an estimate of the number of worker threads that are
3162     * not blocked waiting to join tasks or for other managed
3163 dl 1.14 * synchronization. This method may overestimate the
3164     * number of running threads.
3165 jsr166 1.1 *
3166     * @return the number of worker threads
3167     */
3168     public int getRunningThreadCount() {
3169 dl 1.355 WorkQueue[] qs; WorkQueue q;
3170 jsr166 1.344 int rc = 0;
3171 dl 1.405 if ((runState & TERMINATED) == 0 && (qs = queues) != null) {
3172 dl 1.355 for (int i = 1; i < qs.length; i += 2) {
3173     if ((q = qs[i]) != null && q.isApparentlyUnblocked())
3174 dl 1.78 ++rc;
3175     }
3176     }
3177     return rc;
3178 jsr166 1.1 }
3179    
3180     /**
3181     * Returns an estimate of the number of threads that are currently
3182     * stealing or executing tasks. This method may overestimate the
3183     * number of active threads.
3184     *
3185     * @return the number of active threads
3186     */
3187     public int getActiveThreadCount() {
3188 dl 1.405 return Math.max((short)(ctl >>> RC_SHIFT), 0);
3189 jsr166 1.1 }
3190    
3191     /**
3192 jsr166 1.4 * Returns {@code true} if all worker threads are currently idle.
3193     * An idle worker is one that cannot obtain a task to execute
3194     * because none are available to steal from other threads, and
3195     * there are no pending submissions to the pool. This method is
3196     * conservative; it might not return {@code true} immediately upon
3197     * idleness of all threads, but will eventually become true if
3198     * threads remain inactive.
3199 jsr166 1.1 *
3200 jsr166 1.4 * @return {@code true} if all threads are currently idle
3201 jsr166 1.1 */
3202     public boolean isQuiescent() {
3203 dl 1.366 return canStop();
3204 jsr166 1.1 }
3205    
3206     /**
3207 dl 1.354 * Returns an estimate of the total number of completed tasks that
3208     * were executed by a thread other than their submitter. The
3209     * reported value underestimates the actual total number of steals
3210     * when the pool is not quiescent. This value may be useful for
3211     * monitoring and tuning fork/join programs: in general, steal
3212     * counts should be high enough to keep threads busy, but low
3213     * enough to avoid overhead and contention across threads.
3214 jsr166 1.1 *
3215     * @return the number of steals
3216     */
3217     public long getStealCount() {
3218 dl 1.300 long count = stealCount;
3219 dl 1.355 WorkQueue[] qs; WorkQueue q;
3220     if ((qs = queues) != null) {
3221     for (int i = 1; i < qs.length; i += 2) {
3222     if ((q = qs[i]) != null)
3223 dl 1.405 count += (long)q.nsteals & 0xffffffffL;
3224 dl 1.78 }
3225     }
3226     return count;
3227 jsr166 1.1 }
3228    
3229     /**
3230     * Returns an estimate of the total number of tasks currently held
3231     * in queues by worker threads (but not including tasks submitted
3232     * to the pool that have not begun executing). This value is only
3233     * an approximation, obtained by iterating across all threads in
3234     * the pool. This method may be useful for tuning task
3235     * granularities.
3236     *
3237     * @return the number of queued tasks
3238     */
3239     public long getQueuedTaskCount() {
3240 dl 1.355 WorkQueue[] qs; WorkQueue q;
3241 dl 1.345 int count = 0;
3242 dl 1.405 if ((runState & TERMINATED) == 0 && (qs = queues) != null) {
3243 dl 1.355 for (int i = 1; i < qs.length; i += 2) {
3244     if ((q = qs[i]) != null)
3245     count += q.queueSize();
3246 dl 1.78 }
3247 dl 1.52 }
3248 jsr166 1.1 return count;
3249     }
3250    
3251     /**
3252 jsr166 1.8 * Returns an estimate of the number of tasks submitted to this
3253 dl 1.55 * pool that have not yet begun executing. This method may take
3254 dl 1.52 * time proportional to the number of submissions.
3255 jsr166 1.1 *
3256     * @return the number of queued submissions
3257     */
3258     public int getQueuedSubmissionCount() {
3259 dl 1.355 WorkQueue[] qs; WorkQueue q;
3260 jsr166 1.344 int count = 0;
3261 dl 1.405 if ((runState & TERMINATED) == 0 && (qs = queues) != null) {
3262 dl 1.355 for (int i = 0; i < qs.length; i += 2) {
3263     if ((q = qs[i]) != null)
3264     count += q.queueSize();
3265 dl 1.78 }
3266     }
3267     return count;
3268 jsr166 1.1 }
3269    
3270     /**
3271 jsr166 1.4 * Returns {@code true} if there are any tasks submitted to this
3272     * pool that have not yet begun executing.
3273 jsr166 1.1 *
3274     * @return {@code true} if there are any queued submissions
3275     */
3276     public boolean hasQueuedSubmissions() {
3277 dl 1.407 return hasTasks(true);
3278 jsr166 1.1 }
3279    
3280     /**
3281     * Removes and returns the next unexecuted submission if one is
3282     * available. This method may be useful in extensions to this
3283     * class that re-assign work in systems with multiple pools.
3284     *
3285 jsr166 1.4 * @return the next submission, or {@code null} if none
3286 jsr166 1.1 */
3287     protected ForkJoinTask<?> pollSubmission() {
3288 dl 1.300 return pollScan(true);
3289 jsr166 1.1 }
3290    
3291     /**
3292     * Removes all available unexecuted submitted and forked tasks
3293     * from scheduling queues and adds them to the given collection,
3294     * without altering their execution status. These may include
3295 jsr166 1.8 * artificially generated or wrapped tasks. This method is
3296     * designed to be invoked only when the pool is known to be
3297 jsr166 1.1 * quiescent. Invocations at other times may not remove all
3298     * tasks. A failure encountered while attempting to add elements
3299     * to collection {@code c} may result in elements being in
3300     * neither, either or both collections when the associated
3301     * exception is thrown. The behavior of this operation is
3302     * undefined if the specified collection is modified while the
3303     * operation is in progress.
3304     *
3305     * @param c the collection to transfer elements into
3306     * @return the number of elements transferred
3307     */
3308 jsr166 1.5 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
3309 jsr166 1.344 int count = 0;
3310 dl 1.355 for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
3311     c.add(t);
3312     ++count;
3313 dl 1.52 }
3314 dl 1.18 return count;
3315     }
3316    
3317     /**
3318 jsr166 1.1 * Returns a string identifying this pool, as well as its state,
3319     * including indications of run state, parallelism level, and
3320     * worker and task counts.
3321     *
3322     * @return a string identifying this pool, as well as its state
3323     */
3324     public String toString() {
3325 dl 1.355 // Use a single pass through queues to collect counts
3326 dl 1.345 long st = stealCount;
3327 dl 1.355 long qt = 0L, ss = 0L; int rc = 0;
3328     WorkQueue[] qs; WorkQueue q;
3329     if ((qs = queues) != null) {
3330     for (int i = 0; i < qs.length; ++i) {
3331     if ((q = qs[i]) != null) {
3332     int size = q.queueSize();
3333 dl 1.86 if ((i & 1) == 0)
3334 dl 1.355 ss += size;
3335 dl 1.86 else {
3336     qt += size;
3337 dl 1.355 st += (long)q.nsteals & 0xffffffffL;
3338     if (q.isApparentlyUnblocked())
3339 dl 1.86 ++rc;
3340     }
3341     }
3342     }
3343     }
3344 dl 1.300
3345 dl 1.405 int pc = parallelism;
3346     long c = ctl;
3347     int tc = (short)(c >>> TC_SHIFT);
3348     int ac = (short)(c >>> RC_SHIFT);
3349 dl 1.78 if (ac < 0) // ignore transient negative
3350     ac = 0;
3351 dl 1.405 int rs = runState;
3352     String level = ((rs & TERMINATED) != 0 ? "Terminated" :
3353     (rs & STOP) != 0 ? "Terminating" :
3354     (rs & SHUTDOWN) != 0 ? "Shutting down" :
3355 dl 1.200 "Running");
3356 jsr166 1.1 return super.toString() +
3357 dl 1.52 "[" + level +
3358 dl 1.14 ", parallelism = " + pc +
3359     ", size = " + tc +
3360     ", active = " + ac +
3361     ", running = " + rc +
3362 jsr166 1.1 ", steals = " + st +
3363     ", tasks = " + qt +
3364 dl 1.355 ", submissions = " + ss +
3365 jsr166 1.1 "]";
3366     }
3367    
3368     /**
3369 dl 1.100 * Possibly initiates an orderly shutdown in which previously
3370     * submitted tasks are executed, but no new tasks will be
3371     * accepted. Invocation has no effect on execution state if this
3372 jsr166 1.137 * is the {@link #commonPool()}, and no additional effect if
3373 dl 1.100 * already shut down. Tasks that are in the process of being
3374     * submitted concurrently during the course of this method may or
3375     * may not be rejected.
3376 jsr166 1.1 *
3377     * @throws SecurityException if a security manager exists and
3378     * the caller is not permitted to modify threads
3379     * because it does not hold {@link
3380     * java.lang.RuntimePermission}{@code ("modifyThread")}
3381     */
3382     public void shutdown() {
3383     checkPermission();
3384 dl 1.404 tryTerminate(false, true);
3385 jsr166 1.1 }
3386    
3387     /**
3388 dl 1.100 * Possibly attempts to cancel and/or stop all tasks, and reject
3389     * all subsequently submitted tasks. Invocation has no effect on
3390 jsr166 1.137 * execution state if this is the {@link #commonPool()}, and no
3391 dl 1.100 * additional effect if already shut down. Otherwise, tasks that
3392     * are in the process of being submitted or executed concurrently
3393     * during the course of this method may or may not be
3394     * rejected. This method cancels both existing and unexecuted
3395     * tasks, in order to permit termination in the presence of task
3396     * dependencies. So the method always returns an empty list
3397     * (unlike the case for some other Executors).
3398 jsr166 1.1 *
3399     * @return an empty list
3400     * @throws SecurityException if a security manager exists and
3401     * the caller is not permitted to modify threads
3402     * because it does not hold {@link
3403     * java.lang.RuntimePermission}{@code ("modifyThread")}
3404     */
3405     public List<Runnable> shutdownNow() {
3406     checkPermission();
3407 dl 1.404 tryTerminate(true, true);
3408 jsr166 1.1 return Collections.emptyList();
3409     }
3410    
3411     /**
3412     * Returns {@code true} if all tasks have completed following shut down.
3413     *
3414     * @return {@code true} if all tasks have completed following shut down
3415     */
3416     public boolean isTerminated() {
3417 dl 1.405 return (runState & TERMINATED) != 0;
3418 jsr166 1.1 }
3419    
3420     /**
3421     * Returns {@code true} if the process of termination has
3422 jsr166 1.9 * commenced but not yet completed. This method may be useful for
3423     * debugging. A return of {@code true} reported a sufficient
3424     * period after shutdown may indicate that submitted tasks have
3425 jsr166 1.119 * ignored or suppressed interruption, or are waiting for I/O,
3426 dl 1.49 * causing this executor not to properly terminate. (See the
3427     * advisory notes for class {@link ForkJoinTask} stating that
3428     * tasks should not normally entail blocking operations. But if
3429     * they do, they must abort them on interrupt.)
3430 jsr166 1.1 *
3431 jsr166 1.9 * @return {@code true} if terminating but not yet terminated
3432 jsr166 1.1 */
3433     public boolean isTerminating() {
3434 dl 1.405 return (runState & (STOP | TERMINATED)) == STOP;
3435 jsr166 1.1 }
3436    
3437     /**
3438     * Returns {@code true} if this pool has been shut down.
3439     *
3440     * @return {@code true} if this pool has been shut down
3441     */
3442     public boolean isShutdown() {
3443 dl 1.405 return runState != 0;
3444 jsr166 1.9 }
3445    
3446     /**
3447 dl 1.105 * Blocks until all tasks have completed execution after a
3448     * shutdown request, or the timeout occurs, or the current thread
3449 dl 1.134 * is interrupted, whichever happens first. Because the {@link
3450     * #commonPool()} never terminates until program shutdown, when
3451     * applied to the common pool, this method is equivalent to {@link
3452 jsr166 1.158 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
3453 jsr166 1.1 *
3454     * @param timeout the maximum time to wait
3455     * @param unit the time unit of the timeout argument
3456     * @return {@code true} if this executor terminated and
3457     * {@code false} if the timeout elapsed before termination
3458     * @throws InterruptedException if interrupted while waiting
3459     */
3460     public boolean awaitTermination(long timeout, TimeUnit unit)
3461     throws InterruptedException {
3462 dl 1.405 ReentrantLock lock; Condition cond; boolean terminated;
3463 dl 1.355 long nanos = unit.toNanos(timeout);
3464 dl 1.405 if ((config & ISCOMMON) != 0) {
3465     if (helpQuiescePool(this, nanos, true) < 0)
3466 dl 1.366 throw new InterruptedException();
3467 dl 1.405 terminated = false;
3468 dl 1.134 }
3469 dl 1.405 else if (!(terminated = ((runState & TERMINATED) != 0))) {
3470 dl 1.407 tryTerminate(false, false); // reduce transient blocking
3471 dl 1.405 if ((lock = registrationLock) != null &&
3472     !(terminated = (((runState & TERMINATED) != 0)))) {
3473     lock.lock();
3474     try {
3475     if ((cond = termination) == null)
3476     termination = cond = lock.newCondition();
3477     while (!(terminated = ((runState & TERMINATED) != 0)) &&
3478     nanos > 0L)
3479     nanos = cond.awaitNanos(nanos);
3480     } finally {
3481     lock.unlock();
3482     }
3483 dl 1.366 }
3484 dl 1.18 }
3485 dl 1.366 return terminated;
3486 jsr166 1.1 }
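    /*
     * Typical shutdown-and-await pattern for a dedicated (non-common)
     * pool; the one-minute timeout is an arbitrary assumption and
     * InterruptedException handling is elided:
     *
     *   pool.shutdown();                        // stop accepting new tasks
     *   if (!pool.awaitTermination(1, TimeUnit.MINUTES))
     *       pool.shutdownNow();                 // cancel remaining tasks
     */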
3487    
3488     /**
3489 dl 1.134 * If called by a ForkJoinTask operating in this pool, equivalent
3490     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3491     * waits and/or attempts to assist performing tasks until this
3492     * pool {@link #isQuiescent} or the indicated timeout elapses.
3493     *
3494     * @param timeout the maximum time to wait
3495     * @param unit the time unit of the timeout argument
3496     * @return {@code true} if quiescent; {@code false} if the
3497     * timeout elapsed.
3498     */
3499     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3500 dl 1.405 return (helpQuiescePool(this, unit.toNanos(timeout), false) > 0);
3501 dl 1.134 }
3502    
3503     /**
3504 jsr166 1.1 * Interface for extending managed parallelism for tasks running
3505 jsr166 1.8 * in {@link ForkJoinPool}s.
3506     *
3507 dl 1.19 * <p>A {@code ManagedBlocker} provides two methods. Method
3508 jsr166 1.218 * {@link #isReleasable} must return {@code true} if blocking is
3509     * not necessary. Method {@link #block} blocks the current thread
3510 dl 1.19 * if necessary (perhaps internally invoking {@code isReleasable}
3511 dl 1.54 * before actually blocking). These actions are performed by any
3512 dl 1.355 * thread invoking {@link
3513     * ForkJoinPool#managedBlock(ManagedBlocker)}. The unusual
3514     * methods in this API accommodate synchronizers that may, but
3515     * don't usually, block for long periods. Similarly, they allow
3516     * more efficient internal handling of cases in which additional
3517     * workers may be, but usually are not, needed to ensure
3518     * sufficient parallelism. Toward this end, implementations of
3519     * method {@code isReleasable} must be amenable to repeated
3520     * invocation. Neither method is invoked after a prior invocation
3521     * of {@code isReleasable} or {@code block} returns {@code true}.
3522 jsr166 1.1 *
3523     * <p>For example, here is a ManagedBlocker based on a
3524     * ReentrantLock:
3525 jsr166 1.239 * <pre> {@code
3526 jsr166 1.1 * class ManagedLocker implements ManagedBlocker {
3527     * final ReentrantLock lock;
3528     * boolean hasLock = false;
3529     * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3530     * public boolean block() {
3531     * if (!hasLock)
3532     * lock.lock();
3533     * return true;
3534     * }
3535     * public boolean isReleasable() {
3536     * return hasLock || (hasLock = lock.tryLock());
3537     * }
3538     * }}</pre>
3539 dl 1.19 *
3540     * <p>Here is a class that possibly blocks waiting for an
3541     * item on a given queue:
3542 jsr166 1.239 * <pre> {@code
3543 dl 1.19 * class QueueTaker<E> implements ManagedBlocker {
3544     * final BlockingQueue<E> queue;
3545     * volatile E item = null;
3546     * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3547     * public boolean block() throws InterruptedException {
3548     * if (item == null)
3549 dl 1.23 * item = queue.take();
3550 dl 1.19 * return true;
3551     * }
3552     * public boolean isReleasable() {
3553 dl 1.23 * return item != null || (item = queue.poll()) != null;
3554 dl 1.19 * }
3555     * public E getItem() { // call after pool.managedBlock completes
3556     * return item;
3557     * }
3558     * }}</pre>
3559 jsr166 1.1 */
3560     public static interface ManagedBlocker {
3561     /**
3562     * Possibly blocks the current thread, for example waiting for
3563     * a lock or condition.
3564     *
3565 jsr166 1.4 * @return {@code true} if no additional blocking is necessary
3566     * (i.e., if isReleasable would return true)
3567 jsr166 1.1 * @throws InterruptedException if interrupted while waiting
3568     * (the method is not required to do so, but is allowed to)
3569     */
3570     boolean block() throws InterruptedException;
3571    
3572     /**
3573 jsr166 1.4 * Returns {@code true} if blocking is unnecessary.
3574 jsr166 1.154 * @return {@code true} if blocking is unnecessary
3575 jsr166 1.1 */
3576     boolean isReleasable();
3577     }
3578    
3579     /**
3580 jsr166 1.217 * Runs the given possibly blocking task. When {@linkplain
3581     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
3582     * method possibly arranges for a spare thread to be activated if
3583     * necessary to ensure sufficient parallelism while the current
3584     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
3585 jsr166 1.1 *
3586 jsr166 1.217 * <p>This method repeatedly calls {@code blocker.isReleasable()} and
3587     * {@code blocker.block()} until either method returns {@code true}.
3588     * Every call to {@code blocker.block()} is preceded by a call to
3589     * {@code blocker.isReleasable()} that returned {@code false}.
3590     *
3591     * <p>If not running in a ForkJoinPool, this method is
3592 jsr166 1.8 * behaviorally equivalent to
3593 jsr166 1.239 * <pre> {@code
3594 jsr166 1.1 * while (!blocker.isReleasable())
3595     * if (blocker.block())
3596 jsr166 1.217 * break;}</pre>
3597 jsr166 1.8 *
3598 jsr166 1.217 * If running in a ForkJoinPool, the pool may first be expanded to
3599     * ensure sufficient parallelism available during the call to
3600     * {@code blocker.block()}.
3601 jsr166 1.1 *
3602 jsr166 1.217 * @param blocker the blocker task
3603     * @throws InterruptedException if {@code blocker.block()} did so
3604 jsr166 1.1 */
3605 dl 1.18 public static void managedBlock(ManagedBlocker blocker)
3606 jsr166 1.1 throws InterruptedException {
3607 dl 1.355 Thread t; ForkJoinPool p;
3608     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
3609     (p = ((ForkJoinWorkerThread)t).pool) != null)
3610     p.compensatedBlock(blocker);
3611     else
3612     unmanagedBlock(blocker);
3613     }
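    /*
     * Sketch of driving the QueueTaker example from the ManagedBlocker
     * documentation above; "queue" is assumed to be an existing
     * BlockingQueue<String>, and InterruptedException handling is
     * elided:
     *
     *   QueueTaker<String> taker = new QueueTaker<String>(queue);
     *   ForkJoinPool.managedBlock(taker);       // may activate a spare worker
     *   String item = taker.getItem();
     */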
3614    
3615     /** ManagedBlock for ForkJoinWorkerThreads */
3616     private void compensatedBlock(ManagedBlocker blocker)
3617     throws InterruptedException {
3618 dl 1.345 if (blocker == null) throw new NullPointerException();
3619 dl 1.355 for (;;) {
3620     int comp; boolean done;
3621     long c = ctl;
3622     if (blocker.isReleasable())
3623     break;
3624 dl 1.405 if ((comp = tryCompensate(c, false)) >= 0) {
3625 dl 1.355 long post = (comp == 0) ? 0L : RC_UNIT;
3626     try {
3627     done = blocker.block();
3628     } finally {
3629     getAndAddCtl(post);
3630     }
3631     if (done)
3632 dl 1.105 break;
3633 dl 1.78 }
3634 dl 1.18 }
3635 jsr166 1.1 }
3636    
3637 dl 1.355 /** ManagedBlock for external threads */
3638     private static void unmanagedBlock(ManagedBlocker blocker)
3639     throws InterruptedException {
3640     if (blocker == null) throw new NullPointerException();
3641     do {} while (!blocker.isReleasable() && !blocker.block());
3642 dl 1.310 }
3643    
3644 dl 1.355 // AbstractExecutorService.newTaskFor overrides rely on
3645     // undocumented fact that ForkJoinTask.adapt returns ForkJoinTasks
3646     // that also implement RunnableFuture.
3647 jsr166 1.1
3648 dl 1.355 @Override
3649 jsr166 1.1 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3650 dl 1.90 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3651 jsr166 1.1 }
3652    
3653 dl 1.355 @Override
3654 jsr166 1.1 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3655 dl 1.90 return new ForkJoinTask.AdaptedCallable<T>(callable);
3656 jsr166 1.1 }
3657    
3658 dl 1.52 static {
3659 dl 1.405 U = Unsafe.getUnsafe();
3660     Class<ForkJoinPool> klass = ForkJoinPool.class;
3661 jsr166 1.3 try {
3662 dl 1.405 POOLIDS = U.staticFieldOffset(klass.getDeclaredField("poolIds"));
3663 dl 1.404 } catch (NoSuchFieldException e) {
3664 jsr166 1.347 throw new ExceptionInInitializerError(e);
3665 dl 1.52 }
3666 dl 1.405 CTL = U.objectFieldOffset(klass, "ctl");
3667     RUNSTATE = U.objectFieldOffset(klass, "runState");
3668     PARALLELISM = U.objectFieldOffset(klass, "parallelism");
3669     THREADIDS = U.objectFieldOffset(klass, "threadIds");
3670 dl 1.105
3671 dl 1.404 defaultForkJoinWorkerThreadFactory =
3672     new DefaultForkJoinWorkerThreadFactory();
3673 dl 1.405 @SuppressWarnings("removal")
3674     ForkJoinPool p = common = (System.getSecurityManager() == null) ?
3675     new ForkJoinPool((byte)0) :
3676     AccessController.doPrivileged(new PrivilegedAction<>() {
3677     public ForkJoinPool run() {
3678     return new ForkJoinPool((byte)0); }});
3679     Class<?> dep = LockSupport.class; // ensure loaded
3680 jsr166 1.3 }
3681 jsr166 1.1 }