ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.406
Committed: Fri Mar 25 12:29:55 2022 UTC (2 years, 2 months ago) by dl
Branch: MAIN
Changes since 1.405: +3 -0 lines
Log Message:
Compatibility; @since tags

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 jsr166 1.1 */
6 jsr166 1.301
7 jsr166 1.1 package java.util.concurrent;
8    
9 jsr166 1.156 import java.lang.Thread.UncaughtExceptionHandler;
10 jsr166 1.329 import java.security.AccessController;
11 jsr166 1.228 import java.security.AccessControlContext;
12 jsr166 1.331 import java.security.Permission;
13 jsr166 1.228 import java.security.Permissions;
14 jsr166 1.329 import java.security.PrivilegedAction;
15 jsr166 1.228 import java.security.ProtectionDomain;
16 jsr166 1.1 import java.util.ArrayList;
17     import java.util.Collection;
18     import java.util.Collections;
19     import java.util.List;
20 dl 1.307 import java.util.function.Predicate;
21 dl 1.367 import java.util.concurrent.atomic.AtomicInteger;
22 dl 1.243 import java.util.concurrent.locks.LockSupport;
23 dl 1.355 import java.util.concurrent.locks.ReentrantLock;
24     import java.util.concurrent.locks.Condition;
25 dl 1.404 import jdk.internal.misc.Unsafe;
26 dl 1.405 //import jdk.internal.vm.SharedThreadContainer; // for loom
27 jsr166 1.1
28     /**
29 jsr166 1.4 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
30 jsr166 1.8 * A {@code ForkJoinPool} provides the entry point for submissions
31 dl 1.18 * from non-{@code ForkJoinTask} clients, as well as management and
32 jsr166 1.11 * monitoring operations.
33 jsr166 1.1 *
34 jsr166 1.9 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
35     * ExecutorService} mainly by virtue of employing
36     * <em>work-stealing</em>: all threads in the pool attempt to find and
37 dl 1.78 * execute tasks submitted to the pool and/or created by other active
38     * tasks (eventually blocking waiting for work if none exist). This
39     * enables efficient processing when most tasks spawn other subtasks
40     * (as do most {@code ForkJoinTask}s), as well as when many small
41     * tasks are submitted to the pool from external clients. Especially
42     * when setting <em>asyncMode</em> to true in constructors, {@code
43     * ForkJoinPool}s may also be appropriate for use with event-style
44 dl 1.330 * tasks that are never joined. All worker threads are initialized
45     * with {@link Thread#isDaemon} set {@code true}.
46 jsr166 1.1 *
47 dl 1.112 * <p>A static {@link #commonPool()} is available and appropriate for
48 dl 1.101 * most applications. The common pool is used by any ForkJoinTask that
49     * is not explicitly submitted to a specified pool. Using the common
50     * pool normally reduces resource usage (its threads are slowly
51     * reclaimed during periods of non-use, and reinstated upon subsequent
52 dl 1.105 * use).
53 dl 1.100 *
54     * <p>For applications that require separate or custom pools, a {@code
55     * ForkJoinPool} may be constructed with a given target parallelism
56 jsr166 1.214 * level; by default, equal to the number of available processors.
57     * The pool attempts to maintain enough active (or available) threads
58     * by dynamically adding, suspending, or resuming internal worker
59 jsr166 1.187 * threads, even if some tasks are stalled waiting to join others.
60     * However, no such adjustments are guaranteed in the face of blocked
61     * I/O or other unmanaged synchronization. The nested {@link
62 dl 1.100 * ManagedBlocker} interface enables extension of the kinds of
63 dl 1.300 * synchronization accommodated. The default policies may be
64     * overridden using a constructor with parameters corresponding to
65     * those documented in class {@link ThreadPoolExecutor}.
66 jsr166 1.1 *
67     * <p>In addition to execution and lifecycle control methods, this
68     * class provides status check methods (for example
69 jsr166 1.4 * {@link #getStealCount}) that are intended to aid in developing,
70 jsr166 1.1 * tuning, and monitoring fork/join applications. Also, method
71 jsr166 1.4 * {@link #toString} returns indications of pool state in a
72 jsr166 1.1 * convenient form for informal monitoring.
73     *
74 jsr166 1.109 * <p>As is the case with other ExecutorServices, there are three
75 jsr166 1.84 * main task execution methods summarized in the following table.
76     * These are designed to be used primarily by clients not already
77     * engaged in fork/join computations in the current pool. The main
78     * forms of these methods accept instances of {@code ForkJoinTask},
79     * but overloaded forms also allow mixed execution of plain {@code
80     * Runnable}- or {@code Callable}- based activities as well. However,
81     * tasks that are already executing in a pool should normally instead
82     * use the within-computation forms listed in the table unless using
83     * async event-style tasks that are not usually joined, in which case
84     * there is little difference among choice of methods.
85 dl 1.18 *
86 jsr166 1.337 * <table class="plain">
87 jsr166 1.159 * <caption>Summary of task execution methods</caption>
88 dl 1.18 * <tr>
89     * <td></td>
90 jsr166 1.338 * <th scope="col"> Call from non-fork/join clients</th>
91     * <th scope="col"> Call from within fork/join computations</th>
92 dl 1.18 * </tr>
93     * <tr>
94 jsr166 1.338 * <th scope="row" style="text-align:left"> Arrange async execution</th>
95 dl 1.18 * <td> {@link #execute(ForkJoinTask)}</td>
96     * <td> {@link ForkJoinTask#fork}</td>
97     * </tr>
98     * <tr>
99 jsr166 1.338 * <th scope="row" style="text-align:left"> Await and obtain result</th>
100 dl 1.18 * <td> {@link #invoke(ForkJoinTask)}</td>
101     * <td> {@link ForkJoinTask#invoke}</td>
102     * </tr>
103     * <tr>
104 jsr166 1.338 * <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
105 dl 1.18 * <td> {@link #submit(ForkJoinTask)}</td>
106     * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
107     * </tr>
108     * </table>
109 dl 1.19 *
110 jsr166 1.333 * <p>The parameters used to construct the common pool may be controlled by
111     * setting the following {@linkplain System#getProperty system properties}:
112 jsr166 1.162 * <ul>
113 jsr166 1.350 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}
114 jsr166 1.162 * - the parallelism level, a non-negative integer
115 jsr166 1.350 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory}
116 jsr166 1.331 * - the class name of a {@link ForkJoinWorkerThreadFactory}.
117     * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
118     * is used to load this class.
119 jsr166 1.350 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.exceptionHandler}
120 jsr166 1.331 * - the class name of a {@link UncaughtExceptionHandler}.
121     * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
122     * is used to load this class.
123 jsr166 1.350 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.maximumSpares}
124 dl 1.223 * - the maximum number of allowed extra threads to maintain target
125 dl 1.208 * parallelism (default 256).
126 jsr166 1.162 * </ul>
127 jsr166 1.333 * If no thread factory is supplied via a system property, then the
128     * common pool uses a factory that uses the system class loader as the
129 jsr166 1.331 * {@linkplain Thread#getContextClassLoader() thread context class loader}.
130 jsr166 1.333 * In addition, if a {@link SecurityManager} is present, then
131     * the common pool uses a factory supplying threads that have no
132     * {@link Permissions} enabled.
133 jsr166 1.331 *
134 jsr166 1.156 * Upon any error in establishing these settings, default parameters
135 dl 1.160 * are used. It is possible to disable or limit the use of threads in
136     * the common pool by setting the parallelism property to zero, and/or
137 dl 1.193 * using a factory that may return {@code null}. However doing so may
138     * cause unjoined tasks to never be executed.
139 dl 1.105 *
140 dl 1.387 * <p><b>Implementation notes:</b> This implementation restricts the
141 jsr166 1.1 * maximum number of running threads to 32767. Attempts to create
142 jsr166 1.11 * pools with greater than the maximum number result in
143 jsr166 1.8 * {@code IllegalArgumentException}.
144 jsr166 1.1 *
145 jsr166 1.11 * <p>This implementation rejects submitted tasks (that is, by throwing
146 dl 1.19 * {@link RejectedExecutionException}) only when the pool is shut down
147 dl 1.20 * or internal resources have been exhausted.
148 jsr166 1.11 *
149 jsr166 1.1 * @since 1.7
150     * @author Doug Lea
151     */
152     public class ForkJoinPool extends AbstractExecutorService {
153    
154     /*
155 dl 1.14 * Implementation Overview
156     *
157 dl 1.78 * This class and its nested classes provide the main
158     * functionality and control for a set of worker threads:
159 jsr166 1.84 * Submissions from non-FJ threads enter into submission queues.
160     * Workers take these tasks and typically split them into subtasks
161 dl 1.345 * that may be stolen by other workers. Work-stealing based on
162     * randomized scans generally leads to better throughput than
163     * "work dealing" in which producers assign tasks to idle threads,
164     * in part because threads that have finished other tasks before
165     * the signalled thread wakes up (which can be a long time) can
166     * take the task instead. Preference rules give first priority to
167     * processing tasks from their own queues (LIFO or FIFO, depending
168     * on mode), then to randomized FIFO steals of tasks in other
169     * queues. This framework began as vehicle for supporting
170     * tree-structured parallelism using work-stealing. Over time,
171     * its scalability advantages led to extensions and changes to
172     * better support more diverse usage contexts. Because most
173     * internal methods and nested classes are interrelated, their
174     * main rationale and descriptions are presented here; individual
175     * methods and nested classes contain only brief comments about
176 dl 1.405 * details. There are a fair number of odd code constructions and
177     * design decisions for components that reside at the edge of Java
178     * vs JVM functionality.
179 dl 1.78 *
180 jsr166 1.84 * WorkQueues
181 dl 1.78 * ==========
182     *
183     * Most operations occur within work-stealing queues (in nested
184     * class WorkQueue). These are special forms of Deques that
185     * support only three of the four possible end-operations -- push,
186     * pop, and poll (aka steal), under the further constraints that
187     * push and pop are called only from the owning thread (or, as
188     * extended here, under a lock), while poll may be called from
189     * other threads. (If you are unfamiliar with them, you probably
190     * want to read Herlihy and Shavit's book "The Art of
191     * Multiprocessor programming", chapter 16 describing these in
192     * more detail before proceeding.) The main work-stealing queue
193     * design is roughly similar to those in the papers "Dynamic
194     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
195     * (http://research.sun.com/scalable/pubs/index.html) and
196     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
197     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
198 dl 1.200 * The main differences ultimately stem from GC requirements that
199     * we null out taken slots as soon as we can, to maintain as small
200     * a footprint as possible even in programs generating huge
201     * numbers of tasks. To accomplish this, we shift the CAS
202     * arbitrating pop vs poll (steal) from being on the indices
203 dl 1.405 * ("base" and "top") to the slots themselves. These provide the
204     * primary required memory ordering -- see "Correct and Efficient
205     * Work-Stealing for Weak Memory Models" by Le, Pop, Cohen, and
206     * Nardelli, PPoPP 2013
207     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
208     * analysis of memory ordering requirements in work-stealing
209     * algorithms similar to the one used here. We also use ordered,
210     * moded accesses and/or fences for other control, with modes
211     * reflecting the presence or absence of other contextual sync
212     * provided by atomic and/or volatile accesses. Some methods (or
213     * their primary loops) begin with an acquire fence that amounts
214     * to an acquiring read of "this" to cover all fields (which is
215     * sometimes stronger than necessary, but less brittle). Some
216     * constructions are intentionally racy because they use read
217     * values as hints, not for correctness.
218     *
219     * We also support a user mode in which local task processing is
220     * in FIFO, not LIFO order, simply by using a local version of
221     * poll rather than pop. This can be useful in message-passing
222     * frameworks in which tasks are never joined, although with
223     * increased contention among task producers and consumers. Also,
224     * the same data structure (and class) is used for "submission
225     * queues" (described below) holding externally submitted tasks,
226     * that differ only in that a lock (field "access"; see below) is
227     * required by external callers to push and pop tasks.
228 dl 1.200 *
229 dl 1.243 * Adding tasks then takes the form of a classic array push(task)
230     * in a circular buffer:
231     * q.array[q.top++ % length] = task;
232 dl 1.200 *
233 dl 1.355 * The actual code needs to null-check and size-check the array,
234 jsr166 1.247 * uses masking, not mod, for indexing a power-of-two-sized array,
235 dl 1.355 * enforces memory ordering, supports resizing, and possibly
236 dl 1.405 * signals waiting workers to start scanning (described below),
237     * which requires that even internal usages to strictly order
238     * accesses (using a form of lock release).
239 dl 1.355 *
240     * The pop operation (always performed by owner) is of the form:
241     * if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
242     * decrement top and return task;
243 dl 1.405 * If this fails, the queue is empty. This operation is one part
244     * of the nextLocalTask method, that instead does a local-poll
245     * in FIFO mode.
246 dl 1.355 *
247 dl 1.405 * The poll operation is, basically:
248     * if (CAS nonnull task t = q.array[k = q.base % length] to null)
249 dl 1.355 * increment base and return task;
250     *
251 dl 1.405 * However, there are several more cases that must be dealt with.
252     * Some of them are just due to asynchrony; others reflect
253     * contention and stealing policies. Stepping through them
254     * illustrates some of the implementation decisions in this class.
255     *
256     * * Slot k must be read with an acquiring read, which it must
257     * anyway to dereference and run the task if the (acquiring)
258     * CAS succeeds, but uses an explicit acquire fence to support
259     * the following rechecks even if the CAS is not attempted.
260     *
261     * * q.base may change between reading and using its value to
262     * index the slot. To avoid trying to use the wrong t, the
263     * index and slot must be reread (not necessarily immediately)
264     * until consistent, unless this is a local poll by owner, in
265     * which case this form of inconsistency can only appear as t
266     * being null, below.
267     *
268     * * Similarly, q.array may change (due to a resize), unless this
269     * is a local poll by owner. Otherwise, when t is present, this
270     * only needs consideration on CAS failure (since a CAS
271     * confirms the non-resized case.)
272     *
273     * * t may appear null because a previous poll operation has not
274     * yet incremented q.base, so the read is from an already-taken
275     * index. This form of stall reflects the non-lock-freedom of
276     * the poll operation. Stalls can be detected by observing that
277     * q.base doesn't change on repeated reads of null t and when
278     * no other alternatives apply, spin-wait for it to settle. To
279     * reduce producing these kinds of stalls by other stealers, we
280     * encourage timely writes to indices using store fences when
281     * memory ordering is not already constrained by context.
282     *
283     * * The CAS may fail, in which case we may want to retry unless
284     * there is too much contention. One goal is to balance and
285     * spread out the many forms of contention that may be
286     * encountered across polling and other operations to avoid
287     * sustained performance degradations. Across all cases where
288     * alternatives exist, a bounded number of CAS misses or stalls
289     * are tolerated (for slots, ctl, and elsewhere described
290     * below) before taking alternative action. These may move
291     * contention or retries elsewhere, which is still preferable
292     * to single-point bottlenecks.
293     *
294     * * Even though the check "top == base" is quiescently accurate
295     * to determine whether a queue is empty, it is not of much use
296     * when deciding whether to try to poll or repoll after a
297     * failure. Both top and base may move independently, and both
298     * lag updates to the underlying array. To reduce memory
299     * contention, when possible, non-owners avoid reading the
300     * "top" index at all, and instead use array reads, including
301     * one-ahead reads to check whether to repoll, relying on the
302     * fact that an non-empty queue does not have two null slots in
303     * a row, except in cases (resizes and shifts) that can be
304     * detected with a secondary recheck.
305     *
306     * The poll operations in q.poll(), scan(), helpJoin(), and
307     * elsewhere differ with respect to whether other queues are
308     * available to try, and the presence or nature of screening steps
309     * when only some kinds of tasks can be taken. When alternatives
310     * (or failing) is an option, they uniformly give up after
311     * boundeed numbers of stalls and/or CAS failures, which reduces
312     * contention when too many workers are polling too few tasks.
313     * Overall, in the aggregate, we ensure probabilistic
314     * non-blockingness of work-stealing at least until checking
315     * quiescence (which is intrinsically blocking): If an attempted
316     * steal fails in these ways, a scanning thief chooses a different
317     * target to try next. In contexts where alternatives aren't
318     * available, and when progress conditions can be isolated to
319     * values of a single variable, simple spinloops (using
320     * Thread.onSpinWait) are used to reduce memory traffic.
321 dl 1.78 *
322     * WorkQueues are also used in a similar way for tasks submitted
323     * to the pool. We cannot mix these tasks in the same queues used
324 dl 1.200 * by workers. Instead, we randomly associate submission queues
325 dl 1.83 * with submitting threads, using a form of hashing. The
326 dl 1.139 * ThreadLocalRandom probe value serves as a hash code for
327     * choosing existing queues, and may be randomly repositioned upon
328     * contention with other submitters. In essence, submitters act
329     * like workers except that they are restricted to executing local
330 dl 1.355 * tasks that they submitted (or when known, subtasks thereof).
331     * Insertion of tasks in shared mode requires a lock. We use only
332 dl 1.405 * a simple spinlock because submitters encountering a busy queue
333     * move to a different position to use or create other queues.
334     * They (spin) block only when registering new queues, and less
335     * often in tryRemove and helpComplete. The lock needed for
336     * external queues is generalized (as field "access") for
337     * operations on owned queues that require a fully-fenced write
338     * (including push, parking status, and termination) in order to
339     * deal with Dekker-like signalling constructs described below.
340 dl 1.78 *
341 jsr166 1.84 * Management
342 dl 1.78 * ==========
343 dl 1.52 *
344     * The main throughput advantages of work-stealing stem from
345     * decentralized control -- workers mostly take tasks from
346 dl 1.200 * themselves or each other, at rates that can exceed a billion
347 dl 1.355 * per second. Most non-atomic control is performed by some form
348     * of scanning across or within queues. The pool itself creates,
349     * activates (enables scanning for and running tasks),
350     * deactivates, blocks, and terminates threads, all with minimal
351     * central information. There are only a few properties that we
352     * can globally track or maintain, so we pack them into a small
353     * number of variables, often maintaining atomicity without
354     * blocking or locking. Nearly all essentially atomic control
355 dl 1.405 * state is held in a few variables that are by far most often
356     * read (not written) as status and consistency checks. We pack as
357     * much information into them as we can.
358 dl 1.78 *
359 dl 1.200 * Field "ctl" contains 64 bits holding information needed to
360 dl 1.300 * atomically decide to add, enqueue (on an event queue), and
361 dl 1.345 * dequeue and release workers. To enable this packing, we
362     * restrict maximum parallelism to (1<<15)-1 (which is far in
363     * excess of normal operating range) to allow ids, counts, and
364     * their negations (used for thresholding) to fit into 16bit
365 dl 1.405 * subfields. Field "parallelism" holds the target parallelism
366     * (normally corresponding to pool size). It is needed (nearly)
367     * only in methods updating ctl, so is packed nearby. As of the
368     * current release, users can dynamically reset target
369     * parallelism, which is read once per update, so only slowly has
370     * an effect in creating threads or letting them time out and
371     * terminate when idle.
372     *
373     * Field "mode" mainly holds lifetime status, atomically and
374     * monotonically setting SHUTDOWN, STOP, and finally TERMINATED
375     * bits. It is updated only via bitwise atomics (getAndBitwiseOr).
376 dl 1.258 *
377 dl 1.355 * Array "queues" holds references to WorkQueues. It is updated
378     * (only during worker creation and termination) under the
379 dl 1.405 * registrationLock, but is otherwise concurrently readable (often
380     * prefaced by a volatile read of mode to check termination, that
381     * is required anyway, and serves as an acquire fence). To
382     * simplify index-based operations, the array size is always a
383     * power of two, and all readers must tolerate null slots. Worker
384     * queues are at odd indices. Worker ids masked with SMASK match
385     * their index. Shared (submission) queues are at even
386     * indices. Grouping them together in this way simplifies and
387     * speeds up task scanning.
388 dl 1.86 *
389     * All worker thread creation is on-demand, triggered by task
390     * submissions, replacement of terminated workers, and/or
391 dl 1.78 * compensation for blocked workers. However, all other support
392     * code is set up to work with other policies. To ensure that we
393 dl 1.355 * do not hold on to worker or task references that would prevent
394 dl 1.405 * GC, all accesses to workQueues in waiting, signalling, and
395     * control methods are via indices into the queues array (which is
396     * one source of some of the messy code constructions here). In
397     * essence, the queues array serves as a weak reference
398     * mechanism. In particular, the stack top subfield of ctl stores
399     * indices, not references.
400 dl 1.200 *
401     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
402     * cannot let workers spin indefinitely scanning for tasks when
403     * none can be found immediately, and we cannot start/resume
404     * workers unless there appear to be tasks available. On the
405     * other hand, we must quickly prod them into action when new
406 dl 1.355 * tasks are submitted or generated. These latencies are mainly a
407     * function of JVM park/unpark (and underlying OS) performance,
408     * which can be slow and variable. In many usages, ramp-up time
409 dl 1.300 * is the main limiting factor in overall performance, which is
410     * compounded at program start-up by JIT compilation and
411 dl 1.355 * allocation. On the other hand, throughput degrades when too
412     * many threads poll for too few tasks.
413 dl 1.300 *
414 dl 1.355 * The "ctl" field atomically maintains total and "released"
415     * worker counts, plus the head of the available worker queue
416     * (actually stack, represented by the lower 32bit subfield of
417     * ctl). Released workers are those known to be scanning for
418 dl 1.300 * and/or running tasks. Unreleased ("available") workers are
419     * recorded in the ctl stack. These workers are made available for
420 dl 1.355 * signalling by enqueuing in ctl (see method awaitWork). The
421 dl 1.300 * "queue" is a form of Treiber stack. This is ideal for
422     * activating threads in most-recently used order, and improves
423 dl 1.200 * performance and locality, outweighing the disadvantages of
424     * being prone to contention and inability to release a worker
425 dl 1.355 * unless it is topmost on stack. The top stack state holds the
426 dl 1.300 * value of the "phase" field of the worker: its index and status,
427     * plus a version counter that, in addition to the count subfields
428     * (also serving as version stamps) provide protection against
429     * Treiber stack ABA effects.
430 dl 1.200 *
431 dl 1.300 * Creating workers. To create a worker, we pre-increment counts
432     * (serving as a reservation), and attempt to construct a
433 dl 1.355 * ForkJoinWorkerThread via its factory. On starting, the new
434     * thread first invokes registerWorker, where it constructs a
435     * WorkQueue and is assigned an index in the queues array
436     * (expanding the array if necessary). Upon any exception across
437     * these steps, or null return from factory, deregisterWorker
438     * adjusts counts and records accordingly. If a null return, the
439     * pool continues running with fewer than the target number
440     * workers. If exceptional, the exception is propagated, generally
441     * to some external caller.
442 dl 1.243 *
443 dl 1.300 * WorkQueue field "phase" is used by both workers and the pool to
444 dl 1.405 * manage and track whether a worker is unsignalled (possibly
445     * blocked waiting for a signal), convienently using the sign bit
446     * to check. When a worker is enqueued its phase field is set
447     * negative. Note that phase field updates lag queue CAS releases;
448     * seeing a negative phase does not guarantee that the worker is
449     * available (and so is never checked in this way). When queued,
450     * the lower 16 bits of its phase must hold its pool index. So we
451     * place the index there upon initialization and never modify
452     * these bits.
453 dl 1.243 *
454     * The ctl field also serves as the basis for memory
455     * synchronization surrounding activation. This uses a more
456     * efficient version of a Dekker-like rule that task producers and
457     * consumers sync with each other by both writing/CASing ctl (even
458 dl 1.355 * if to its current value). However, rather than CASing ctl to
459     * its current value in the common case where no action is
460     * required, we reduce write contention by ensuring that
461 dl 1.405 * signalWork invocations are prefaced with a fully fenced memory
462 dl 1.355 * access (which is usually needed anyway).
463     *
464     * Signalling. Signals (in signalWork) cause new or reactivated
465     * workers to scan for tasks. Method signalWork and its callers
466     * try to approximate the unattainable goal of having the right
467     * number of workers activated for the tasks at hand, but must err
468     * on the side of too many workers vs too few to avoid stalls. If
469     * computations are purely tree structured, it suffices for every
470     * worker to activate another when it pushes a task into an empty
471     * queue, resulting in O(log(#threads)) steps to full activation.
472 dl 1.405 * (To reduce resource usages in some cases, at the expense of
473     * slower startup in others, activation of an idle thread is
474     * preferred over creating a new one, here and elsewhere.) If
475     * instead, tasks come in serially from only a single producer,
476     * each worker taking its first (since the last activation) task
477 dl 1.355 * from a queue should signal another if there are more tasks in
478     * that queue. This is equivalent to, but generally faster than,
479     * arranging the stealer take two tasks, re-pushing one on its own
480     * queue, and signalling (because its queue is empty), also
481     * resulting in logarithmic full activation time. Because we don't
482     * know about usage patterns (or most commonly, mixtures), we use
483 dl 1.405 * both approaches. Together these are minimally necessary for
484     * maintaining liveness. However, they do not account for the fact
485     * that when tasks are short-lived, signals are unnecessary
486     * because workers will already be scanning for new tasks without
487     * the need of new signals. We track these cases (variable
488     * "prevSrc" in scan() and related methods) to avoid some
489     * unnecessary signals and scans. However, signal contention and
490     * overhead effects may still occur during ramp-up, ramp-down, and
491     * small computations involving only a few workers.
492 dl 1.243 *
493 dl 1.355 * Scanning. Method scan performs top-level scanning for (and
494 dl 1.405 * execution of) tasks by polling a pseodo-random permutation of
495     * the array (by starting at a random index, and using a constant
496     * cyclically exhaustive stride.) It uses the same basic polling
497     * method as WorkQueue.poll(), but restarts with a different
498     * permutation on each invocation. (Non-top-level scans; for
499     * example in helpJoin, use simpler and faster linear probes
500     * because they do not systematically contend with top-level
501     * scans.) The pseudorandom generator need not have high-quality
502     * statistical properties in the long term. We use Marsaglia
503     * XorShifts, seeded with the Weyl sequence from ThreadLocalRandom
504     * probes, which are cheap and suffice. Scans do not otherwise
505     * explicitly take into account core affinities, loads, cache
506     * localities, etc, However, they do exploit temporal locality
507     * (which usually approximates these) by preferring to re-poll
508     * from the same queue (using method tryPoll()) after a successful
509     * poll before trying others (see method topLevelExec), which also
510     * reduces bookkeeping and scanning overhead. This also reduces
511     * fairness, which is partially counteracted by giving up on
512     * contention.
513     *
514     * Deactivation. When method scan indicates that no tasks are
515     * found by a worker, it deactivates (see awaitWork). Note that
516     * not finding tasks doesn't mean that there won't soon be
517     * some. Further, a scan may give up under contention, returning
518     * even without knowing whether any tasks are still present, which
519     * is OK, given the above signalling rules that will eventually
520     * maintain progress. Blocking and unblocking via park/unpark can
521     * cause serious slowdowns when tasks are rapidly but irregularly
522     * generated (which is often due to garbage collectors and other
523     * activities). One way to ameliorate is for workers to rescan
524     * multiple times, even when there are unlikely to be tasks. But
525     * this causes enough memory and CAS contention to prefer using
526     * quieter spinwaits in awaitWork; currently set to small values
527     * that only cover near-miss scenarios for deactivate vs activate
528     * races. Because idle workers are often not yet blocked (via
529     * LockSupport.park), we use the WorkQueue access field to
530     * advertise that a waiter actually needs unparking upon signal.
531     *
532     * When idle workers are not continually woken up, the count
533     * fields in ctl allow efficient and accurate discovery of
534     * quiescent states (i.e., when all workers are idle) after
535     * deactivation. However, this voting mechanism alone does not
536     * guarantee that a pool can become dormant (quiesced or
537     * terminated), because external racing producers do not vote, and
538     * can asynchronously submit new tasks. To deal with this, the
539     * final unparked thread (in awaitWork) scans external queues to
540     * check for tasks that could have been added during a race window
541     * that would not be accompanied by a signal, in which case
542     * re-activating itself (or any other worker) to recheck. The same
543     * sets of checks are used in tryTerminate, to correctly trigger
544     * delayed termination (shutDown, followed by quiescence) in the
545     * presence of racing submissions. In all cases, the notion of the
546     * "final" unparked thread is an approximation, because new
547     * workers could be in the process of being constructed, which
548     * occasionally adds some extra unnecessary processing.
549     *
550     * Shutdown and Termination. A call to shutdownNow invokes
551     * tryTerminate to atomically set a mode bit. The calling thread,
552     * as well as every other worker thereafter terminating, helps
553     * terminate others by cancelling their unprocessed tasks, and
554     * interrupting other workers. Calls to non-abrupt shutdown()
555     * preface this by checking isQuiescent before triggering the
556     * "STOP" phase of termination. During termination, workers are
557     * stopped using all three of (often in parallel): releasing via
558     * ctl (method reactivate), interrupts, and cancelling tasks that
559     * will cause workers to not find work and exit. To support this,
560     * worker references not removed from the queues array during
561     * termination. It is possible for late thread creations to still
562     * be in progress after a quiescent termination reports terminated
563     * status, but they will also immediately terminate. To conform to
564     * ExecutorService invoke, invokeAll, and invokeAny specs, we must
565     * track pool status while waiting in ForkJoinTask.awaitDone, and
566     * interrupt interruptible callers on termination.
567 dl 1.52 *
568     * Trimming workers. To release resources after periods of lack of
569     * use, a worker starting to wait when the pool is quiescent will
570 dl 1.355 * time out and terminate if the pool has remained quiescent for
571     * period given by field keepAlive.
572 dl 1.52 *
573 jsr166 1.84 * Joining Tasks
574     * =============
575 dl 1.78 *
576 dl 1.355 * Normally, the first option when joining a task that is not done
577 dl 1.405 * is to try to take it from local queue and run it. Otherwise,
578 dl 1.355 * any of several actions may be taken when one worker is waiting
579 jsr166 1.84 * to join a task stolen (or always held) by another. Because we
580 dl 1.78 * are multiplexing many tasks on to a pool of workers, we can't
581 dl 1.300 * always just let them block (as in Thread.join). We also cannot
582     * just reassign the joiner's run-time stack with another and
583     * replace it later, which would be a form of "continuation", that
584     * even if possible is not necessarily a good idea since we may
585     * need both an unblocked task and its continuation to progress.
586     * Instead we combine two tactics:
587 dl 1.19 *
588     * Helping: Arranging for the joiner to execute some task that it
589 dl 1.355 * could be running if the steal had not occurred.
590 dl 1.19 *
591     * Compensating: Unless there are already enough live threads,
592 dl 1.78 * method tryCompensate() may create or re-activate a spare
593     * thread to compensate for blocked joiners until they unblock.
594     *
595 dl 1.355 * A third form (implemented via tryRemove) amounts to helping a
596     * hypothetical compensator: If we can readily tell that a
597     * possible action of a compensator is to steal and execute the
598 dl 1.105 * task being joined, the joining thread can do so directly,
599 dl 1.355 * without the need for a compensation thread; although with a
600 dl 1.405 * possibility of reduced parallelism because of a transient gap
601     * in the queue array that stalls stealers.
602 dl 1.355 *
603     * Other intermediate forms available for specific task types (for
604     * example helpAsyncBlocker) often avoid or postpone the need for
605     * blocking or compensation.
606 dl 1.52 *
607     * The ManagedBlocker extension API can't use helping so relies
608     * only on compensation in method awaitBlocker.
609 dl 1.19 *
610 dl 1.355 * The algorithm in helpJoin entails a form of "linear helping".
611 dl 1.405 * Each worker records (in field "source") a reference to the
612     * queue from which it last stole a task. The scan in method
613     * helpJoin uses these markers to try to find a worker to help
614     * (i.e., steal back a task from and execute it) that could hasten
615     * completion of the actively joined task. Thus, the joiner
616     * executes a task that would be on its own local deque if the
617     * to-be-joined task had not been stolen. This is a conservative
618     * variant of the approach described in Wagner & Calder
619     * "Leapfrogging: a portable technique for implementing efficient
620     * futures" SIGPLAN Notices, 1993
621     * (http://portal.acm.org/citation.cfm?id=155354). It differs
622     * mainly in that we only record queues, not full dependency
623 dl 1.355 * links. This requires a linear scan of the queues array to
624 dl 1.300 * locate stealers, but isolates cost to when it is needed, rather
625 dl 1.405 * than adding to per-task overhead. For CountedCompleters, the
626     * analogous method helpComplete doesn't need stealer-tracking,
627     * but requires a similar check of completion chains.
628     *
629     * In either case, searches can fail to locate stealers when
630     * stalls delay recording sources. We avoid some of these cases by
631     * using snapshotted values of ctl as a check that the numbers of
632     * workers are not changing. But even when accurately identified,
633 dl 1.355 * stealers might not ever produce a task that the joiner can in
634     * turn help with. So, compensation is tried upon failure to find
635     * tasks to run.
636     *
637 dl 1.300 * Compensation does not by default aim to keep exactly the target
638 dl 1.200 * parallelism number of unblocked threads running at any given
639     * time. Some previous versions of this class employed immediate
640     * compensations for any blocked join. However, in practice, the
641     * vast majority of blockages are transient byproducts of GC and
642 dl 1.345 * other JVM or OS activities that are made worse by replacement
643     * when they cause longer-term oversubscription. Rather than
644     * impose arbitrary policies, we allow users to override the
645     * default of only adding threads upon apparent starvation. The
646     * compensation mechanism may also be bounded. Bounds for the
647 dl 1.404 * commonPool better enable JVMs to cope with programming errors
648     * and abuse before running out of resources to do so.
649 jsr166 1.301 *
650 dl 1.105 * Common Pool
651     * ===========
652     *
653 jsr166 1.175 * The static common pool always exists after static
654 dl 1.105 * initialization. Since it (or any other created pool) need
655     * never be used, we minimize initial construction overhead and
656 dl 1.405 * footprint to the setup of about a dozen fields, although with
657     * some System property parsing and with security processing that
658     * takes far longer than the actual construction when
659     * SecurityManagers are used or properties are set. The common
660     * pool is distinguished internally by having both a null
661     * workerNamePrefix and ISCOMMON config bit set, along with
662     * PRESET_SIZE set if parallelism was configured by system
663     * property.
664     *
665     * When external threads use ForkJoinTask.fork for the common
666     * pool, they can perform subtask processing (see helpComplete and
667     * related methods) upon joins. This caller-helps policy makes it
668 dl 1.200 * sensible to set common pool parallelism level to one (or more)
669     * less than the total number of available cores, or even zero for
670 dl 1.405 * pure caller-runs. For the sake of ExecutorService specs, we can
671     * only do this for tasks entered via fork, not submit. We track
672     * this using a task status bit (markPoolSubmission). In all
673     * other cases, external threads waiting for joins first check the
674     * common pool for their task, which fails quickly if the caller
675     * did not fork to common pool.
676 dl 1.105 *
677 dl 1.399 * Guarantees for common pool parallelism zero are limited to
678     * tasks that are joined by their callers in a tree-structured
679     * fashion or use CountedCompleters (as is true for jdk
680     * parallelStreams). Support infiltrates several methods,
681 dl 1.405 * including those that retry helping steps or spin until we are
682     * sure that none apply if there are no workers.
683 dl 1.399 *
684 dl 1.197 * As a more appropriate default in managed environments, unless
685     * overridden by system properties, we use workers of subclass
686     * InnocuousForkJoinWorkerThread when there is a SecurityManager
687     * present. These workers have no permissions set, do not belong
688     * to any user-defined ThreadGroup, and erase all ThreadLocals
689 dl 1.355 * after executing any top-level task. The associated mechanics
690 dl 1.364 * may be JVM-dependent and must access particular Thread class
691     * fields to achieve this effect.
692 jsr166 1.198 *
693 dl 1.372 * Interrupt handling
694     * ==================
695     *
696     * The framework is designed to manage task cancellation
697     * (ForkJoinTask.cancel) independently from the interrupt status
698     * of threads running tasks. (See the public ForkJoinTask
699     * documentation for rationale.) Interrupts are issued only in
700     * tryTerminate, when workers should be terminating and tasks
701     * should be cancelled anyway. Interrupts are cleared only when
702     * necessary to ensure that calls to LockSupport.park do not loop
703     * indefinitely (park returns immediately if the current thread is
704 dl 1.405 * interrupted). For cases in which task bodies are specified or
705     * desired to interrupt upon cancellation, ForkJoinTask.cancel can
706     * be overridden to do so (as is done for invoke{Any,All}).
707 dl 1.372 *
708 dl 1.345 * Memory placement
709     * ================
710     *
711 dl 1.405 * Performance is very sensitive to placement of instances of
712     * ForkJoinPool and WorkQueues and their queue arrays, as well the
713     * placement of their fields. Caches misses and contention due to
714     * false-sharing have been observed to slow down some programs by
715     * more than a factor of four. There is no perfect solution, in
716     * part because isolating more fields also generates more cache
717     * misses in more common cases (because some fields snd slots are
718     * usually read at the same time), and the main means of placing
719     * memory, the @Contended annotation provides only rough control
720     * (for good reason). We isolate the ForkJoinPool.ctl field as
721     * well the set of WorkQueue fields that otherwise cause the most
722     * false-sharing misses with respect to other fields. Also,
723     * ForkJoinPool fields are ordered such that fields less prone to
724     * contention effects are first, offsetting those that otherwise
725     * would be, while also reducing total footprint vs using
726     * multiple @Contended regions, which tends to slow down
727     * less-contended applications. These arrangements mainly reduce
728     * cache traffic by scanners, which speeds up finding tasks to
729     * run. Initial sizing and resizing of WorkQueue arrays is an
730     * even more delicate tradeoff because the best strategy may vary
731     * across garbage collectors. Small arrays are better for locality
732     * and reduce GC scan time, but large arrays reduce both direct
733     * false-sharing and indirect cases due to GC bookkeeping
734     * (cardmarks etc), and reduce the number of resizes, which are
735     * not especially fast because they require atomic transfers, and
736     * may cause other scanning workers to stall or give up.
737     * Currently, arrays are initialized to be fairly small but early
738     * resizes rapidly increase size by more than a factor of two
739     * until very large. (Maintenance note: any changes in fields,
740     * queues, or their uses must be accompanied by re-evaluation of
741     * these placement and sizing decisions.)
742 dl 1.345 *
743 dl 1.105 * Style notes
744     * ===========
745     *
746 dl 1.355 * Memory ordering relies mainly on atomic operations (CAS,
747 dl 1.405 * getAndSet, getAndAdd) along with moded accesses. These use
748 dl 1.404 * jdk-internal Unsafe for atomics and special memory modes,
749     * rather than VarHandles, to avoid initialization dependencies in
750     * other jdk components that require early parallelism. This can
751     * be awkward and ugly, but also reflects the need to control
752 jsr166 1.315 * outcomes across the unusual cases that arise in very racy code
753 dl 1.319 * with very few invariants. All fields are read into locals
754 dl 1.355 * before use, and null-checked if they are references, even if
755 dl 1.405 * they can never be null under current usages. Usually,
756     * computations (held in local variables) are defined as soon as
757     * logically enabled, sometimes to convince compilers that they
758     * may be performed despite memory ordering constraints. Array
759     * accesses using masked indices include checks (that are always
760     * true) that the array length is non-zero to avoid compilers
761     * inserting more expensive traps. This is usually done in a
762     * "C"-like style of listing declarations at the heads of methods
763     * or blocks, and using inline assignments on first encounter.
764     * Nearly all explicit checks lead to bypass/return, not exception
765     * throws, because they may legitimately arise during shutdown. A
766     * few unusual loop constructions encourage (with varying
767     * effectiveness) JVMs about where (not) to place safepoints.
768 dl 1.200 *
769 dl 1.105 * There is a lot of representation-level coupling among classes
770     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
771     * fields of WorkQueue maintain data structures managed by
772     * ForkJoinPool, so are directly accessed. There is little point
773     * trying to reduce this, since any associated future changes in
774     * representations will need to be accompanied by algorithmic
775     * changes anyway. Several methods intrinsically sprawl because
776 dl 1.200 * they must accumulate sets of consistent reads of fields held in
777 dl 1.345 * local variables. Some others are artificially broken up to
778     * reduce producer/consumer imbalances due to dynamic compilation.
779     * There are also other coding oddities (including several
780     * unnecessary-looking hoisted null checks) that help some methods
781     * perform reasonably even when interpreted (not compiled).
782 dl 1.52 *
783 dl 1.208 * The order of declarations in this file is (with a few exceptions):
784 dl 1.405 * (1) Static constants
785     * (2) Static utility functions
786     * (3) Nested (static) classes
787 dl 1.86 * (4) Fields, along with constants used when unpacking some of them
788     * (5) Internal control methods
789     * (6) Callbacks and other support for ForkJoinTask methods
790     * (7) Exported methods
791     * (8) Static block initializing statics in minimally dependent order
792 dl 1.355 *
793     * Revision notes
794     * ==============
795     *
796 dl 1.405 * The main sources of differences from previous version are:
797     *
798     * * Use of Unsafe vs VarHandle, including re-instatement of some
799     * constructions from pre-VarHandle versions.
800     * * Reduced memory and signal contention, mainly by distinguishing
801     * failure cases.
802     * * Improved initialization, in part by preparing for possible
803     * removal of SecurityManager
804     * * Enable resizing (includes refactoring quiescence/termination)
805     * * Unification of most internal vs external operations; some made
806     * possible via use of WorkQueue.access, and POOLSUBMIT status in tasks.
807     */
808    
809     // static configuration constants
810    
811     /**
812     * Default idle timeout value (in milliseconds) for idle threads
813     * to park waiting for new work before terminating.
814     */
815     static final long DEFAULT_KEEPALIVE = 60_000L;
816    
817     /**
818     * Undershoot tolerance for idle timeouts
819     */
820     static final long TIMEOUT_SLOP = 20L;
821    
822     /**
823     * The default value for common pool maxSpares. Overridable using
824     * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
825     * system property. The default value is far in excess of normal
826     * requirements, but also far short of MAX_CAP and typical OS
827     * thread limits, so allows JVMs to catch misuse/abuse before
828     * running out of resources needed to do so.
829     */
830     static final int DEFAULT_COMMON_MAX_SPARES = 256;
831    
832     /**
833     * Initial capacity of work-stealing queue array. Must be a power
834     * of two, at least 2. See above.
835     */
836     static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
837    
838     // Bounds
839     static final int SWIDTH = 16; // width of short
840     static final int SMASK = 0xffff; // short bits == max index
841     static final int MAX_CAP = 0x7fff; // max #workers - 1
842    
843     // pool.runState and workQueue.access bits and sentinels
844     static final int STOP = 1 << 31; // must be negative
845     static final int SHUTDOWN = 1;
846     static final int TERMINATED = 2;
847     static final int PARKED = -1; // access value when parked
848    
849     // {pool, workQueue}.config bits
850     static final int FIFO = 1 << 16; // fifo queue or access mode
851     static final int SRC = 1 << 17; // set when stealable
852     static final int INNOCUOUS = 1 << 18; // set for Innocuous workers
853     static final int TRIMMED = 1 << 19; // timed out while idle
854     static final int ISCOMMON = 1 << 20; // set for common pool
855     static final int PRESET_SIZE = 1 << 21; // size was set by property
856    
857     static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
858    
859     /*
860     * Bits and masks for ctl and bounds are packed with 4 16 bit subfields:
861     * RC: Number of released (unqueued) workers
862     * TC: Number of total workers
863     * SS: version count and status of top waiting thread
864     * ID: poolIndex of top of Treiber stack of waiters
865 dl 1.355 *
866 dl 1.405 * When convenient, we can extract the lower 32 stack top bits
867     * (including version bits) as sp=(int)ctl. When sp is non-zero,
868     * there are waiting workers. Count fields may be transiently
869     * negative during termination because of out-of-order updates.
870     * To deal with this, we use casts in and out of "short" and/or
871     * signed shifts to maintain signedness. Because it occupies
872     * uppermost bits, we can add one release count using getAndAdd of
873     * RC_UNIT, rather than CAS, when returning from a blocked join.
874     * Other updates of multiple subfields require CAS.
875 dl 1.86 */
876    
877 dl 1.405 // Lower and upper word masks
878     static final long SP_MASK = 0xffffffffL;
879     static final long UC_MASK = ~SP_MASK;
880     // Release counts
881     static final int RC_SHIFT = 48;
882     static final long RC_UNIT = 0x0001L << RC_SHIFT;
883     static final long RC_MASK = 0xffffL << RC_SHIFT;
884     // Total counts
885     static final int TC_SHIFT = 32;
886     static final long TC_UNIT = 0x0001L << TC_SHIFT;
887     static final long TC_MASK = 0xffffL << TC_SHIFT;
888     // sp bits
889     static final int SS_SEQ = 1 << 16; // version count
890     static final int INACTIVE = 1 << 31; // phase bit when idle
891    
892 dl 1.86 // Static utilities
893    
894     /**
895     * If there is a security manager, makes sure caller has
896     * permission to modify threads.
897 jsr166 1.1 */
898 dl 1.405 @SuppressWarnings("removal")
899 dl 1.86 private static void checkPermission() {
900 dl 1.405 SecurityManager security; RuntimePermission perm;
901     if ((security = System.getSecurityManager()) != null) {
902     if ((perm = modifyThreadPermission) == null)
903     modifyThreadPermission = perm = // races OK
904     new RuntimePermission("modifyThread");
905     security.checkPermission(perm);
906     }
907 dl 1.355 }
908    
909 dl 1.86 // Nested classes
910 jsr166 1.1
911     /**
912 jsr166 1.8 * Factory for creating new {@link ForkJoinWorkerThread}s.
913     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
914     * for {@code ForkJoinWorkerThread} subclasses that extend base
915     * functionality or initialize threads with different contexts.
916 jsr166 1.1 */
917     public static interface ForkJoinWorkerThreadFactory {
918     /**
919     * Returns a new worker thread operating in the given pool.
920 dl 1.300 * Returning null or throwing an exception may result in tasks
921     * never being executed. If this method throws an exception,
922     * it is relayed to the caller of the method (for example
923     * {@code execute}) causing attempted thread creation. If this
924     * method returns null or throws an exception, it is not
925     * retried until the next attempted creation (for example
926     * another call to {@code execute}).
927 jsr166 1.1 *
928     * @param pool the pool this thread works in
929 jsr166 1.296 * @return the new worker thread, or {@code null} if the request
930 jsr166 1.331 * to create a thread is rejected
931 jsr166 1.11 * @throws NullPointerException if the pool is null
932 jsr166 1.1 */
933     public ForkJoinWorkerThread newThread(ForkJoinPool pool);
934     }
935    
936     /**
937     * Default ForkJoinWorkerThreadFactory implementation; creates a
938 jsr166 1.331 * new ForkJoinWorkerThread using the system class loader as the
939     * thread context class loader.
940 jsr166 1.1 */
941 dl 1.355 static final class DefaultForkJoinWorkerThreadFactory
942     implements ForkJoinWorkerThreadFactory {
943     public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
944 dl 1.405 boolean isCommon = (pool.workerNamePrefix == null);
945     @SuppressWarnings("removal")
946     SecurityManager sm = System.getSecurityManager();
947     if (sm == null)
948     return new ForkJoinWorkerThread(null, pool, true);
949     else if (isCommon)
950     return newCommonWithACC(pool);
951     else
952     return newRegularWithACC(pool);
953     }
954    
955     /*
956     * Create and use static AccessControlContexts only if there
957     * is a SecurityManager. (These can be removed if/when
958     * SecurityManagers are removed from platform.) The ACCs are
959     * immutable and equivalent even when racily initialized, so
960     * they don't require locking, although with the chance of
961     * needlessly duplicate construction.
962     */
963     @SuppressWarnings("removal")
964     static volatile AccessControlContext regularACC, commonACC;
965    
966     @SuppressWarnings("removal")
967     static ForkJoinWorkerThread newRegularWithACC(ForkJoinPool pool) {
968     AccessControlContext acc = regularACC;
969     if (acc == null) {
970     Permissions ps = new Permissions();
971     ps.add(new RuntimePermission("getClassLoader"));
972     ps.add(new RuntimePermission("setContextClassLoader"));
973     regularACC = acc =
974     new AccessControlContext(new ProtectionDomain[] {
975     new ProtectionDomain(null, ps) });
976     }
977 dl 1.355 return AccessController.doPrivileged(
978     new PrivilegedAction<>() {
979     public ForkJoinWorkerThread run() {
980 dl 1.405 return new ForkJoinWorkerThread(null, pool, true);
981     }}, acc);
982 dl 1.355 }
983    
984 dl 1.404 @SuppressWarnings("removal")
985 dl 1.405 static ForkJoinWorkerThread newCommonWithACC(ForkJoinPool pool) {
986     AccessControlContext acc = commonACC;
987     if (acc == null) {
988     Permissions ps = new Permissions();
989     ps.add(new RuntimePermission("getClassLoader"));
990     ps.add(new RuntimePermission("setContextClassLoader"));
991     ps.add(new RuntimePermission("modifyThread"));
992     ps.add(new RuntimePermission("enableContextClassLoaderOverride"));
993     ps.add(new RuntimePermission("modifyThreadGroup"));
994     commonACC = acc =
995     new AccessControlContext(new ProtectionDomain[] {
996     new ProtectionDomain(null, ps) });
997     }
998     return AccessController.doPrivileged(
999     new PrivilegedAction<>() {
1000     public ForkJoinWorkerThread run() {
1001     return new ForkJoinWorkerThread.
1002     InnocuousForkJoinWorkerThread(pool);
1003     }}, acc);
1004 jsr166 1.1 }
1005     }
1006    
1007 dl 1.253 /**
1008 dl 1.78 * Queues supporting work-stealing as well as external task
1009 jsr166 1.202 * submission. See above for descriptions and algorithms.
1010 dl 1.78 */
1011     static final class WorkQueue {
1012 dl 1.355 int stackPred; // pool stack (ctl) predecessor link
1013     int config; // index, mode, ORed with SRC after init
1014 dl 1.345 int base; // index of next slot for poll
1015     ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
1016 dl 1.78 final ForkJoinWorkerThread owner; // owning thread or null if shared
1017 dl 1.112
1018 dl 1.405 // fields otherwise causing more unnecessary false-sharing cache misses
1019 dl 1.355 @jdk.internal.vm.annotation.Contended("w")
1020     int top; // index of next slot for push
1021     @jdk.internal.vm.annotation.Contended("w")
1022 dl 1.405 volatile int access; // values 0, 1 (locked), PARKED, STOP
1023     @jdk.internal.vm.annotation.Contended("w")
1024     volatile int phase; // versioned, negative if inactive
1025     @jdk.internal.vm.annotation.Contended("w")
1026     volatile int source; // source queue id in topLevelExec
1027 dl 1.355 @jdk.internal.vm.annotation.Contended("w")
1028     int nsteals; // number of steals from other queues
1029    
1030     // Support for atomic operations
1031 dl 1.404 private static final Unsafe U;
1032 dl 1.405 private static final long ACCESS;
1033     private static final long PHASE;
1034 dl 1.404 private static final long ABASE;
1035 dl 1.405 private static final int ASHIFT;
1036    
1037     static ForkJoinTask<?> getAndClearSlot(ForkJoinTask<?>[] a, int i) {
1038 dl 1.404 return (ForkJoinTask<?>)
1039 dl 1.405 U.getAndSetReference(a, ((long)i << ASHIFT) + ABASE, null);
1040 dl 1.355 }
1041 dl 1.405 static boolean casSlotToNull(ForkJoinTask<?>[] a, int i,
1042     ForkJoinTask<?> c) {
1043     return U.compareAndSetReference(a, ((long)i << ASHIFT) + ABASE,
1044     c, null);
1045 dl 1.355 }
1046 dl 1.405 final void forcePhaseActive() { // clear sign bit
1047     U.getAndBitwiseAndInt(this, PHASE, 0x7fffffff);
1048 dl 1.355 }
1049 dl 1.405 final int getAndSetAccess(int v) {
1050     return U.getAndSetInt(this, ACCESS, v);
1051 dl 1.78 }
1052    
1053     /**
1054 dl 1.405 * Constructor. For owned queues, most fields are initialized
1055     * upon thread start in pool.registerWorker.
1056 dl 1.345 */
1057 dl 1.405 WorkQueue(ForkJoinWorkerThread owner, int config) {
1058 dl 1.355 this.owner = owner;
1059     this.config = config;
1060 dl 1.345 }
1061    
1062     /**
1063 jsr166 1.220 * Returns an exportable index (used by ForkJoinWorkerThread).
1064 dl 1.200 */
1065     final int getPoolIndex() {
1066 dl 1.355 return (config & 0xffff) >>> 1; // ignore odd/even tag bit
1067 dl 1.200 }
1068    
1069     /**
1070 dl 1.115 * Returns the approximate number of tasks in the queue.
1071     */
1072     final int queueSize() {
1073 dl 1.405 int unused = access, n = top - base; // for ordering effect
1074 dl 1.355 return (n < 0) ? 0 : n; // ignore transient negative
1075 dl 1.115 }
1076    
1077 jsr166 1.180 /**
1078 dl 1.405 * Pushes a task. Called only by owner or if already locked
1079 dl 1.78 *
1080     * @param task the task. Caller must ensure non-null.
1081 dl 1.405 * @param pool the pool. Must be non-null unless terminating.
1082     * @param signalIfEmpty true if signal when pushing to empty queue
1083 jsr166 1.146 * @throws RejectedExecutionException if array cannot be resized
1084 dl 1.78 */
1085 dl 1.405 final void push(ForkJoinTask<?> task, ForkJoinPool pool,
1086     boolean signalIfEmpty) {
1087     boolean resize = false;
1088     int s = top++, b = base, cap, m; ForkJoinTask<?>[] a;
1089     if ((a = array) != null && (cap = a.length) > 0) {
1090     if ((m = (cap - 1)) == s - b) {
1091     resize = true; // rapidly grow until large
1092     int newCap = (cap < 1 << 24) ? cap << 2 : cap << 1;
1093     ForkJoinTask<?>[] newArray;
1094     try {
1095     newArray = new ForkJoinTask<?>[newCap];
1096     } catch (Throwable ex) {
1097     top = s;
1098     access = 0;
1099     throw new RejectedExecutionException(
1100     "Queue capacity exceeded");
1101     }
1102     if (newCap > 0) { // always true
1103     int newMask = newCap - 1, k = s;
1104     do { // poll old, push to new
1105     newArray[k-- & newMask] = task;
1106     } while ((task = getAndClearSlot(a, k & m)) != null);
1107     }
1108     array = newArray;
1109     }
1110     else
1111     a[m & s] = task;
1112     getAndSetAccess(0); // for memory effects if owned
1113     if ((resize || (a[m & (s - 1)] == null && signalIfEmpty)) &&
1114     pool != null)
1115     pool.signalWork();
1116 dl 1.78 }
1117     }
1118    
1119 dl 1.178 /**
1120 dl 1.405 * Takes next task, if one exists, in order specified by mode,
1121     * so acts as either local-pop or local-poll. Called only by owner.
1122     * @param fifo nonzero if FIFO mode
1123 dl 1.112 */
1124 dl 1.405 final ForkJoinTask<?> nextLocalTask(int fifo) {
1125     ForkJoinTask<?> t = null;
1126 dl 1.355 ForkJoinTask<?>[] a = array;
1127 dl 1.405 int p = top, s = p - 1, b = base, nb, cap;
1128     if (p - b > 0 && a != null && (cap = a.length) > 0) {
1129     do {
1130     if (fifo == 0 || (nb = b + 1) == p) {
1131     if ((t = getAndClearSlot(a, (cap - 1) & s)) != null) {
1132     top = s;
1133     U.storeFence();
1134     }
1135     break; // lost race for only task
1136     }
1137     else if ((t = getAndClearSlot(a, (cap - 1) & b)) != null) {
1138     base = nb;
1139     U.storeFence();
1140     break;
1141     }
1142     else {
1143     while (b == (b = base)) {
1144     U.loadFence();
1145     Thread.onSpinWait(); // spin to reduce memory traffic
1146     }
1147     }
1148     } while (p - b > 0);
1149 dl 1.345 }
1150     return t;
1151 dl 1.78 }
1152    
1153     /**
1154 dl 1.405 * Takes next task, if one exists, using configured mode.
1155     * (Always owned, never called for Common pool.)
1156 dl 1.78 */
1157 dl 1.405 final ForkJoinTask<?> nextLocalTask() {
1158     return nextLocalTask(config & FIFO);
1159 dl 1.373 }
1160    
1161     /**
1162 dl 1.405 * Pops the given task only if it is at the current top.
1163 dl 1.373 */
1164 dl 1.405 final boolean tryUnpush(ForkJoinTask<?> task, boolean owned) {
1165 dl 1.355 boolean taken = false;
1166 dl 1.405 ForkJoinTask<?>[] a = array;
1167     int p = top, s, cap, k;
1168     if (task != null && base != p && a != null && (cap = a.length) > 0 &&
1169     a[k = (cap - 1) & (s = p - 1)] == task) {
1170     if (owned || getAndSetAccess(1) == 0) {
1171     if ((owned || (top == p && a[k] == task)) &&
1172     getAndClearSlot(a, k) != null) {
1173     taken = true;
1174     top = s;
1175     U.storeFence();
1176 dl 1.394 }
1177 dl 1.405 if (!owned)
1178     access = 0;
1179 dl 1.392 }
1180 dl 1.355 }
1181     return taken;
1182 dl 1.345 }
1183    
1184     /**
1185 dl 1.405 * Returns next task, if one exists, in order specified by mode.
1186 dl 1.355 */
1187 dl 1.405 final ForkJoinTask<?> peek() {
1188     ForkJoinTask<?>[] a = array;
1189     int cfg = config, p = top, b = base, cap;
1190     if (p != b && a != null && (cap = a.length) > 0) {
1191     if ((cfg & FIFO) == 0)
1192     return a[(cap - 1) & (p - 1)];
1193     else { // skip over in-progress removals
1194     ForkJoinTask<?> t;
1195     for ( ; p - b > 0; ++b) {
1196     if ((t = a[(cap - 1) & b]) != null)
1197     return t;
1198 dl 1.355 }
1199     }
1200 dl 1.78 }
1201 dl 1.405 return null;
1202 dl 1.78 }
1203    
1204     /**
1205 dl 1.405 * Polls for a task. Used only by non-owners in usually
1206     * uncontended contexts.
1207     *
1208     * @param pool if nonnull, pool to signal if more tasks exist
1209 dl 1.78 */
1210 dl 1.405 final ForkJoinTask<?> poll(ForkJoinPool pool) {
1211     for (int b = base;;) {
1212     int cap; ForkJoinTask<?>[] a;
1213     if ((a = array) == null || (cap = a.length) <= 0)
1214     break; // currently impossible
1215     int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
1216     ForkJoinTask<?> t = a[k];
1217     U.loadFence(); // for re-reads
1218     if (b != (b = base)) // inconsistent
1219     ;
1220     else if (t != null && casSlotToNull(a, k, t)) {
1221     base = nb;
1222     U.storeFence();
1223     if (pool != null && a[nk] != null)
1224     pool.signalWork(); // propagate
1225 dl 1.355 return t;
1226     }
1227 dl 1.405 else if (array != a || a[k] != null)
1228     ; // stale
1229     else if (a[nk] == null && top - b <= 0)
1230     break; // empty
1231 dl 1.355 }
1232     return null;
1233 dl 1.78 }
1234    
1235     /**
1236 dl 1.405 * Tries to poll next task in FIFO order, failing on
1237     * contention or stalls. Used only by topLevelExec to repoll
1238     * from the queue obtained from pool.scan.
1239 dl 1.345 */
1240 dl 1.405 final ForkJoinTask<?> tryPoll() {
1241     int b = base, cap; ForkJoinTask<?>[] a;
1242 dl 1.355 if ((a = array) != null && (cap = a.length) > 0) {
1243 dl 1.405 for (;;) {
1244     int k = (cap - 1) & b, nb = b + 1;
1245     ForkJoinTask<?> t = a[k];
1246     U.loadFence(); // for re-reads
1247     if (b != (b = base))
1248     ; // inconsistent
1249     else if (t != null) {
1250     if (casSlotToNull(a, k, t)) {
1251     base = nb;
1252     U.storeFence();
1253     return t;
1254     }
1255     break; // contended
1256 dl 1.355 }
1257 dl 1.405 else if (a[k] == null)
1258     break; // empty or stalled
1259 jsr166 1.344 }
1260 dl 1.253 }
1261 dl 1.405 return null;
1262 dl 1.253 }
1263    
1264 dl 1.355 // specialized execution methods
1265    
1266 dl 1.253 /**
1267 dl 1.355 * Runs the given (stolen) task if nonnull, as well as
1268 dl 1.405 * remaining local tasks and/or others available from its
1269     * source queue, if any.
1270 dl 1.94 */
1271 dl 1.405 final void topLevelExec(ForkJoinTask<?> task, WorkQueue src) {
1272     int cfg = config, fifo = cfg & FIFO, nstolen = 1;
1273 dl 1.355 while (task != null) {
1274     task.doExec();
1275 dl 1.405 if ((task = nextLocalTask(fifo)) == null &&
1276     src != null && (task = src.tryPoll()) != null)
1277 dl 1.355 ++nstolen;
1278 dl 1.215 }
1279 dl 1.355 nsteals += nstolen;
1280     source = 0;
1281     if ((cfg & INNOCUOUS) != 0)
1282     ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
1283 dl 1.215 }
1284    
1285     /**
1286 dl 1.405 * Deep form of tryUnpush: Traverses from top and removes and
1287     * runs task if present, shifting others to fill gap.
1288     * @return task status if removed, else 0
1289     */
1290     final int tryRemoveAndExec(ForkJoinTask<?> task, boolean owned) {
1291     boolean taken = false;
1292     ForkJoinTask<?>[] a = array;
1293     int p = top, s = p - 1, d = p - base, cap;
1294     if (task != null && d > 0 && a != null && (cap = a.length) > 0) {
1295     for (int m = cap - 1, i = s; ; --i) {
1296     ForkJoinTask<?> t; int k;
1297     if ((t = a[k = i & m]) == task) {
1298     if (!owned && getAndSetAccess(1) != 0)
1299     break; // fail if locked
1300     if ((owned || (top == p && a[k] == task)) &&
1301     getAndClearSlot(a, k) != null) {
1302     taken = true;
1303     if (i != s && i == base)
1304     base = i + 1; // avoid shift
1305     else {
1306     for (int j = i; j != s;) // shift down
1307     a[j & m] = getAndClearSlot(a, ++j & m);
1308     top = s;
1309     }
1310     U.storeFence();
1311     }
1312     if (!owned)
1313     access = 0;
1314     break;
1315     }
1316     else if (t == null || --d == 0)
1317     break;
1318     }
1319     }
1320     if (!taken)
1321     return 0;
1322     return task.doExec();
1323     }
1324    
1325     /**
1326 dl 1.345 * Tries to pop and run tasks within the target's computation
1327     * until done, not found, or limit exceeded.
1328 dl 1.94 *
1329 dl 1.405 * @param task root of computation
1330 dl 1.300 * @param limit max runs, or zero for no limit
1331 jsr166 1.363 * @return task status on exit
1332 dl 1.300 */
1333 dl 1.365 final int helpComplete(ForkJoinTask<?> task, boolean owned, int limit) {
1334 dl 1.405 int status = 0;
1335     if (task != null) {
1336     outer: for (;;) {
1337     boolean taken = false;
1338     ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1339     int p, s, cap, k;
1340     if ((status = task.status) < 0)
1341     return status;
1342     if ((a = array) == null || (cap = a.length) <= 0 ||
1343     (t = a[k = (cap - 1) & (s = (p = top) - 1)]) == null ||
1344     !(t instanceof CountedCompleter))
1345 dl 1.355 break;
1346 dl 1.405 for (CountedCompleter<?> f = (CountedCompleter<?>)t;;) {
1347     if (f == task)
1348     break;
1349     else if ((f = f.completer) == null)
1350     break outer; // ineligible
1351     }
1352     if (!owned && getAndSetAccess(1) != 0)
1353     break; // fail if locked
1354     if ((owned || (top == p && a[k] == t)) &&
1355     getAndClearSlot(a, k) != null) {
1356     taken = true;
1357     top = s;
1358     U.storeFence();
1359     }
1360     if (!owned)
1361     access = 0;
1362     if (taken) {
1363     t.doExec();
1364     if (limit != 0 && --limit == 0)
1365     break;
1366 dl 1.104 }
1367     }
1368 dl 1.405 status = task.status;
1369 dl 1.104 }
1370 dl 1.300 return status;
1371     }
1372    
1373 jsr166 1.344 /**
1374 dl 1.345 * Tries to poll and run AsynchronousCompletionTasks until
1375 dl 1.405 * none found or blocker is released
1376 dl 1.345 *
1377     * @param blocker the blocker
1378 jsr166 1.344 */
1379 dl 1.345 final void helpAsyncBlocker(ManagedBlocker blocker) {
1380 dl 1.405 if (blocker != null) {
1381     for (;;) {
1382     int b = base, cap; ForkJoinTask<?>[] a;
1383     if ((a = array) == null || (cap = a.length) <= 0 || b == top)
1384     break;
1385     int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
1386     ForkJoinTask<?> t = a[k];
1387     U.loadFence(); // for re-reads
1388     if (base != b)
1389     ;
1390     else if (blocker.isReleasable())
1391     break;
1392     else if (a[k] != t)
1393     ;
1394     else if (t != null) {
1395     if (!(t instanceof CompletableFuture
1396     .AsynchronousCompletionTask))
1397     break;
1398     else if (casSlotToNull(a, k, t)) {
1399     base = nb;
1400     U.storeFence();
1401     t.doExec();
1402     }
1403     }
1404     else if (a[nk] == null)
1405     break;
1406 dl 1.178 }
1407 dl 1.78 }
1408     }
1409    
1410 dl 1.355 // misc
1411    
1412 dl 1.78 /**
1413 dl 1.373 * Returns true if owned by a worker thread and not known to be blocked.
1414 dl 1.86 */
1415     final boolean isApparentlyUnblocked() {
1416     Thread wt; Thread.State s;
1417 dl 1.405 return (access != STOP && (wt = owner) != null &&
1418 dl 1.86 (s = wt.getState()) != Thread.State.BLOCKED &&
1419     s != Thread.State.WAITING &&
1420     s != Thread.State.TIMED_WAITING);
1421     }
1422    
1423 dl 1.405 /**
1424     * Callback from InnocuousForkJoinWorkerThread.onStart
1425     */
1426     final void setInnocuous() {
1427     config |= INNOCUOUS;
1428     }
1429    
1430 dl 1.78 static {
1431 dl 1.404 U = Unsafe.getUnsafe();
1432 dl 1.405 Class<WorkQueue> klass = WorkQueue.class;
1433     ACCESS = U.objectFieldOffset(klass, "access");
1434     PHASE = U.objectFieldOffset(klass, "phase");
1435     Class<ForkJoinTask[]> aklass = ForkJoinTask[].class;
1436     ABASE = U.arrayBaseOffset(aklass);
1437     int scale = U.arrayIndexScale(aklass);
1438     ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1439 dl 1.404 if ((scale & (scale - 1)) != 0)
1440     throw new Error("array index scale not a power of two");
1441 dl 1.78 }
1442     }
1443 dl 1.14
1444 dl 1.112 // static fields (initialized in static initializer below)
1445    
1446     /**
1447     * Creates a new ForkJoinWorkerThread. This factory is used unless
1448     * overridden in ForkJoinPool constructors.
1449     */
1450     public static final ForkJoinWorkerThreadFactory
1451     defaultForkJoinWorkerThreadFactory;
1452    
1453 jsr166 1.1 /**
1454 dl 1.405 * Common (static) pool. Non-null for public use unless a static
1455     * construction exception, but internal usages null-check on use
1456     * to paranoically avoid potential initialization circularities
1457     * as well as to simplify generated code.
1458 dl 1.115 */
1459 dl 1.405 static final ForkJoinPool common;
1460 dl 1.115
1461     /**
1462 dl 1.355 * Sequence number for creating worker names
1463 dl 1.83 */
1464 dl 1.355 private static volatile int poolIds;
1465 dl 1.86
1466     /**
1467 dl 1.405 * Permission required for callers of methods that may start or
1468     * kill threads. Lazily constructed.
1469 dl 1.86 */
1470 dl 1.405 static volatile RuntimePermission modifyThreadPermission;
1471 dl 1.86
1472 dl 1.200
1473 dl 1.300 // Instance fields
1474 dl 1.405 volatile long stealCount; // collects worker nsteals
1475     volatile long threadIds; // for worker thread names
1476 dl 1.355 final long keepAlive; // milliseconds before dropping if idle
1477 dl 1.405 final long bounds; // min, max threads packed as shorts
1478     final int config; // static configuration bits
1479     volatile int runState; // SHUTDOWN, STOP, TERMINATED bits
1480 dl 1.355 WorkQueue[] queues; // main registry
1481     final ReentrantLock registrationLock;
1482     Condition termination; // lazily constructed
1483     final String workerNamePrefix; // null for common pool
1484 dl 1.112 final ForkJoinWorkerThreadFactory factory;
1485 dl 1.200 final UncaughtExceptionHandler ueh; // per-worker UEH
1486 dl 1.307 final Predicate<? super ForkJoinPool> saturate;
1487 dl 1.405 // final SharedThreadContainer container; // for loom
1488 dl 1.101
1489 dl 1.308 @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
1490     volatile long ctl; // main pool control
1491 dl 1.405 @jdk.internal.vm.annotation.Contended("fjpctl") // colocate
1492     int parallelism; // target number of workers
1493 jsr166 1.309
1494 dl 1.355 // Support for atomic operations
1495 dl 1.404 private static final Unsafe U;
1496     private static final long CTL;
1497 dl 1.405 private static final long RUNSTATE;
1498     private static final long PARALLELISM;
1499 dl 1.404 private static final long THREADIDS;
1500     private static final long POOLIDS;
1501 dl 1.405
1502 dl 1.355 private boolean compareAndSetCtl(long c, long v) {
1503 dl 1.404 return U.compareAndSetLong(this, CTL, c, v);
1504 dl 1.355 }
1505     private long compareAndExchangeCtl(long c, long v) {
1506 dl 1.404 return U.compareAndExchangeLong(this, CTL, c, v);
1507 dl 1.355 }
1508     private long getAndAddCtl(long v) {
1509 dl 1.404 return U.getAndAddLong(this, CTL, v);
1510 dl 1.355 }
1511 dl 1.405 private int getAndBitwiseOrRunState(int v) {
1512     return U.getAndBitwiseOrInt(this, RUNSTATE, v);
1513 dl 1.355 }
1514 dl 1.405 private long incrementThreadIds() {
1515     return U.getAndAddLong(this, THREADIDS, 1L);
1516 dl 1.355 }
1517     private static int getAndAddPoolIds(int x) {
1518 dl 1.404 return U.getAndAddInt(ForkJoinPool.class, POOLIDS, x);
1519 dl 1.355 }
1520 dl 1.405 private int getAndSetParallelism(int v) {
1521     return U.getAndSetInt(this, PARALLELISM, v);
1522     }
1523     private int getParallelismOpaque() {
1524     return U.getIntOpaque(this, PARALLELISM);
1525     }
1526 dl 1.355
1527 dl 1.405 // Creating, registering, and deregistering workers
1528 dl 1.200
1529 dl 1.112 /**
1530 dl 1.200 * Tries to construct and start one worker. Assumes that total
1531     * count has already been incremented as a reservation. Invokes
1532     * deregisterWorker on any failure.
1533     *
1534     * @return true if successful
1535 dl 1.115 */
1536 dl 1.300 private boolean createWorker() {
1537 dl 1.200 ForkJoinWorkerThreadFactory fac = factory;
1538     Throwable ex = null;
1539     ForkJoinWorkerThread wt = null;
1540     try {
1541 dl 1.405 if (runState >= 0 && // avoid construction if terminating
1542     fac != null && (wt = fac.newThread(this)) != null) {
1543 dl 1.200 wt.start();
1544 dl 1.405 // container.start(wt); // for loom
1545 dl 1.200 return true;
1546 dl 1.115 }
1547 dl 1.200 } catch (Throwable rex) {
1548     ex = rex;
1549 dl 1.112 }
1550 dl 1.200 deregisterWorker(wt, ex);
1551     return false;
1552 dl 1.112 }
1553    
1554 dl 1.200 /**
1555 jsr166 1.360 * Provides a name for ForkJoinWorkerThread constructor.
1556 dl 1.200 */
1557 dl 1.355 final String nextWorkerThreadName() {
1558     String prefix = workerNamePrefix;
1559 dl 1.405 long tid = incrementThreadIds() + 1L;
1560 dl 1.355 if (prefix == null) // commonPool has no prefix
1561     prefix = "ForkJoinPool.commonPool-worker-";
1562 dl 1.405 return prefix.concat(Long.toString(tid));
1563 dl 1.200 }
1564 dl 1.112
1565     /**
1566 dl 1.355 * Finishes initializing and records owned queue.
1567     *
1568     * @param w caller's WorkQueue
1569     */
1570     final void registerWorker(WorkQueue w) {
1571     ThreadLocalRandom.localInit();
1572     int seed = ThreadLocalRandom.getProbe();
1573 dl 1.405 ReentrantLock lock = registrationLock;
1574     int cfg = config & FIFO;
1575 dl 1.355 if (w != null && lock != null) {
1576     w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1577 dl 1.405 cfg |= w.config | SRC;
1578     w.stackPred = seed;
1579 dl 1.355 int id = (seed << 1) | 1; // initial index guess
1580     lock.lock();
1581     try {
1582     WorkQueue[] qs; int n; // find queue index
1583     if ((qs = queues) != null && (n = qs.length) > 0) {
1584     int k = n, m = n - 1;
1585     for (; qs[id &= m] != null && k > 0; id -= 2, k -= 2);
1586     if (k == 0)
1587     id = n | 1; // resize below
1588 dl 1.405 w.phase = w.config = id | cfg; // now publishable
1589 dl 1.300
1590 dl 1.355 if (id < n)
1591     qs[id] = w;
1592 dl 1.300 else { // expand array
1593 dl 1.355 int an = n << 1, am = an - 1;
1594 dl 1.300 WorkQueue[] as = new WorkQueue[an];
1595 dl 1.355 as[id & am] = w;
1596     for (int j = 1; j < n; j += 2)
1597     as[j] = qs[j];
1598     for (int j = 0; j < n; j += 2) {
1599     WorkQueue q;
1600     if ((q = qs[j]) != null) // shared queues may move
1601     as[q.config & am] = q;
1602 dl 1.94 }
1603 dl 1.405 U.storeFence(); // fill before publish
1604 dl 1.355 queues = as;
1605 dl 1.94 }
1606     }
1607 dl 1.355 } finally {
1608     lock.unlock();
1609 dl 1.78 }
1610     }
1611     }
1612 dl 1.19
1613 jsr166 1.1 /**
1614 dl 1.86 * Final callback from terminating worker, as well as upon failure
1615 dl 1.105 * to construct or start a worker. Removes record of worker from
1616     * array, and adjusts counts. If pool is shutting down, tries to
1617     * complete termination.
1618 dl 1.78 *
1619 jsr166 1.151 * @param wt the worker thread, or null if construction failed
1620 dl 1.78 * @param ex the exception causing failure, or null if none
1621 dl 1.45 */
1622 dl 1.78 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1623 dl 1.405 WorkQueue w = (wt == null) ? null : wt.workQueue;
1624     int cfg = (w == null) ? 0 : w.config;
1625     long c = ctl;
1626     if ((cfg & TRIMMED) == 0) // decrement counts
1627     do {} while (c != (c = compareAndExchangeCtl(
1628     c, ((RC_MASK & (c - RC_UNIT)) |
1629     (TC_MASK & (c - TC_UNIT)) |
1630     (SP_MASK & c)))));
1631     else if ((int)c == 0) // was dropped on timeout
1632     cfg &= ~SRC; // suppress signal if last
1633     if (!tryTerminate(false, false) && w != null) {
1634     ReentrantLock lock; WorkQueue[] qs; int n, i;
1635 dl 1.355 long ns = w.nsteals & 0xffffffffL;
1636 dl 1.405 if ((lock = registrationLock) != null) {
1637     lock.lock(); // remove index unless terminating
1638     if ((qs = queues) != null && (n = qs.length) > 0 &&
1639     qs[i = cfg & (n - 1)] == w)
1640     qs[i] = null;
1641     stealCount += ns; // accumulate steals
1642     lock.unlock();
1643     }
1644     if ((cfg & SRC) != 0)
1645     signalWork(); // possibly replace worker
1646 dl 1.243 }
1647 dl 1.405 if (ex != null) {
1648     if (w != null) {
1649     w.access = STOP; // cancel tasks
1650     for (ForkJoinTask<?> t; (t = w.nextLocalTask(0)) != null; )
1651     ForkJoinTask.cancelIgnoringExceptions(t);
1652     }
1653 dl 1.104 ForkJoinTask.rethrow(ex);
1654 dl 1.405 }
1655 dl 1.78 }
1656 dl 1.52
1657 dl 1.355 /*
1658 dl 1.405 * Releases an idle worker, or creates one if not enough exist.
1659 dl 1.105 */
1660 dl 1.355 final void signalWork() {
1661 dl 1.405 int pc = parallelism, n;
1662     long c = ctl;
1663     WorkQueue[] qs = queues;
1664     if ((short)(c >>> RC_SHIFT) < pc && qs != null && (n = qs.length) > 0) {
1665     for (;;) {
1666     boolean create = false;
1667     int sp = (int)c & ~INACTIVE;
1668     WorkQueue v = qs[sp & (n - 1)];
1669     int deficit = pc - (short)(c >>> TC_SHIFT);
1670     long ac = (c + RC_UNIT) & RC_MASK, nc;
1671     if (sp != 0 && v != null)
1672     nc = (v.stackPred & SP_MASK) | (c & TC_MASK);
1673     else if (deficit <= 0)
1674     break;
1675     else {
1676     create = true;
1677     nc = ((c + TC_UNIT) & TC_MASK);
1678     }
1679     if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
1680     if (create)
1681     createWorker();
1682     else {
1683     Thread owner = v.owner;
1684     v.phase = sp;
1685     if (v.access == PARKED)
1686     LockSupport.unpark(owner);
1687     }
1688     break;
1689     }
1690     }
1691     }
1692     }
1693    
1694     /**
1695     * Reactivates any idle worker, if one exists.
1696     *
1697     * @return the signalled worker, or null if none
1698     */
1699     private WorkQueue reactivate() {
1700     WorkQueue[] qs; int n;
1701     long c = ctl;
1702     if ((qs = queues) != null && (n = qs.length) > 0) {
1703     for (;;) {
1704     int sp = (int)c & ~INACTIVE;
1705     WorkQueue v = qs[sp & (n - 1)];
1706     long ac = UC_MASK & (c + RC_UNIT);
1707     if (sp == 0 || v == null)
1708 dl 1.355 break;
1709     if (c == (c = compareAndExchangeCtl(
1710 dl 1.405 c, (v.stackPred & SP_MASK) | ac))) {
1711     Thread owner = v.owner;
1712     v.phase = sp;
1713     if (v.access == PARKED)
1714     LockSupport.unpark(owner);
1715     return v;
1716 dl 1.355 }
1717 dl 1.200 }
1718 dl 1.405 }
1719     return null;
1720     }
1721    
1722     /**
1723     * Tries to deactivate worker w; called only on idle timeout.
1724     */
1725     private boolean tryTrim(WorkQueue w) {
1726     if (w != null) {
1727     int pred = w.stackPred, cfg = w.config | TRIMMED;
1728     long c = ctl;
1729     int sp = (int)c & ~INACTIVE;
1730     if ((sp & SMASK) == (cfg & SMASK) &&
1731     compareAndSetCtl(c, ((pred & SP_MASK) |
1732     (UC_MASK & (c - TC_UNIT))))) {
1733     w.config = cfg; // add sentinel for deregisterWorker
1734     w.phase = sp;
1735     return true;
1736 dl 1.174 }
1737 dl 1.52 }
1738 dl 1.405 return false;
1739     }
1740    
1741     /**
1742     * Returns true if any submission queue is detectably nonempty.
1743     * Accurate only when workers are quiescent; else conservatively
1744     * approximate.
1745     */
1746     private boolean hasSubmissions() {
1747     WorkQueue[] qs; WorkQueue q;
1748     int n = ((qs = queues) == null) ? 0 : qs.length;
1749     for (int i = 0; i < n; i += 2) {
1750     if ((q = qs[i]) != null && (q.access > 0 || q.top - q.base > 0))
1751     return true;
1752     }
1753     return false;
1754 dl 1.14 }
1755    
1756 dl 1.200 /**
1757 dl 1.355 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1758     * See above for explanation.
1759 dl 1.243 *
1760 dl 1.355 * @param w caller's WorkQueue (may be null on failed initialization)
1761 dl 1.243 */
1762 dl 1.355 final void runWorker(WorkQueue w) {
1763 dl 1.405 if (w != null) { // skip on failed init
1764 dl 1.355 int r = w.stackPred, src = 0; // use seed from registerWorker
1765     do {
1766     r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1767     } while ((src = scan(w, src, r)) >= 0 ||
1768     (src = awaitWork(w)) == 0);
1769 dl 1.405 w.access = STOP; // record normal termination
1770 dl 1.355 }
1771     }
1772    
1773     /**
1774     * Scans for and if found executes top-level tasks: Tries to poll
1775     * each queue starting at a random index with random stride,
1776 dl 1.405 * returning source id or retry indicator.
1777 dl 1.355 *
1778     * @param w caller's WorkQueue
1779     * @param prevSrc the previous queue stolen from in current phase, or 0
1780     * @param r random seed
1781     * @return id of queue if taken, negative if none found, prevSrc for retry
1782     */
1783     private int scan(WorkQueue w, int prevSrc, int r) {
1784     WorkQueue[] qs = queues;
1785     int n = (w == null || qs == null) ? 0 : qs.length;
1786     for (int step = (r >>> 16) | 1, i = n; i > 0; --i, r += step) {
1787 dl 1.405 int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
1788     if ((q = qs[j = r & (n - 1)]) != null &&
1789 dl 1.355 (a = q.array) != null && (cap = a.length) > 0) {
1790 dl 1.405 int src = j | SRC, b = q.base;
1791     int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
1792     ForkJoinTask<?> t = a[k];
1793     U.loadFence(); // for re-reads
1794 dl 1.355 if (q.base != b) // inconsistent
1795     return prevSrc;
1796     else if (t != null && WorkQueue.casSlotToNull(a, k, t)) {
1797 dl 1.405 q.base = nb;
1798     w.source = src;
1799     if (prevSrc == 0 && q.base == nb && a[nk] != null)
1800 dl 1.355 signalWork(); // propagate
1801     w.topLevelExec(t, q);
1802     return src;
1803     }
1804 dl 1.405 else if (q.array != a || a[k] != null || a[nk] != null)
1805     return prevSrc; // revisit
1806 dl 1.355 }
1807     }
1808 dl 1.405 return -1;
1809 dl 1.355 }
1810    
1811     /**
1812 dl 1.405 * Advances phase, enqueues, and awaits signal or termination.
1813 dl 1.355 *
1814     * @return negative if terminated, else 0
1815     */
1816     private int awaitWork(WorkQueue w) {
1817 dl 1.405 if (w == null)
1818     return -1; // currently impossible
1819     int p = (w.phase + SS_SEQ) & ~INACTIVE; // advance phase
1820     boolean idle = false; // true if possibly quiescent
1821     if (runState < 0)
1822     return -1; // terminating
1823     long sp = p & SP_MASK, pc = ctl, qc;
1824     w.phase = p | INACTIVE;
1825     do { // enqueue
1826     w.stackPred = (int)pc; // set ctl stack link
1827     } while (pc != (pc = compareAndExchangeCtl(
1828     pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp)));
1829     if ((qc & RC_MASK) <= 0L)
1830     idle = true;
1831     WorkQueue[] qs = queues; // to spin for expected #accesses in scan+signal
1832     int spins = ((qs == null) ? 0 : ((qs.length & SMASK) << 1)) + 4, rs;
1833     if (idle && hasSubmissions() && w.phase < 0)
1834     reactivate(); // check for stragglers
1835     if ((rs = runState) < 0 ||
1836     (rs != 0 && idle && tryTerminate(false, false)))
1837     return -1; // quiescent termination
1838     while ((p = w.phase) < 0 && --spins > 0)
1839     Thread.onSpinWait(); // spin before block
1840     if (p < 0) { // await signal
1841     long deadline = (idle) ? keepAlive + System.currentTimeMillis() : 0L;
1842     LockSupport.setCurrentBlocker(this);
1843     for (;;) {
1844     w.access = PARKED; // enable unpark
1845     if (w.phase < 0) {
1846     if (idle)
1847     LockSupport.parkUntil(deadline);
1848     else
1849     LockSupport.park();
1850     }
1851     w.access = 0; // disable unpark
1852     if (w.phase >= 0) {
1853     LockSupport.setCurrentBlocker(null);
1854 dl 1.404 break;
1855     }
1856 dl 1.405 Thread.interrupted(); // clear status for next park
1857     if (idle) { // check for idle timeout
1858     if (deadline - System.currentTimeMillis() < TIMEOUT_SLOP) {
1859     if (tryTrim(w))
1860     return -1;
1861     else // not at head; restart timer
1862     deadline += keepAlive;
1863     }
1864     }
1865 dl 1.386 }
1866 dl 1.243 }
1867 dl 1.405 return (runState < 0) ? -1 : 0;
1868 dl 1.355 }
1869 dl 1.300
1870 dl 1.366 /**
1871 dl 1.405 * Non-overridable version of isQuiescent. Returns true if
1872     * quiescent or already terminating.
1873 dl 1.366 */
1874 dl 1.404 private boolean canStop() {
1875 dl 1.405 long c = ctl;
1876     do {
1877     if (runState < 0)
1878     break;
1879     if ((c & RC_MASK) > 0L || hasSubmissions())
1880     return false;
1881     } while (c != (c = ctl)); // validate
1882     return true;
1883     }
1884    
1885     /**
1886     * Scans for and returns a polled task, if available. Used only
1887     * for untracked polls. Begins scan at a random index to avoid
1888     * systematic unfairness.
1889     *
1890     * @param submissionsOnly if true, only scan submission queues
1891     */
1892     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1893     int r = ThreadLocalRandom.nextSecondarySeed();
1894     if (submissionsOnly) // even indices only
1895     r &= ~1;
1896     int step = (submissionsOnly) ? 2 : 1;
1897     WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
1898     if (runState >= 0 && (qs = queues) != null && (n = qs.length) > 0) {
1899     for (int i = n; i > 0; i -= step, r += step) {
1900     if ((q = qs[r & (n - 1)]) != null &&
1901     (t = q.poll(this)) != null)
1902     return t;
1903 dl 1.366 }
1904     }
1905 dl 1.405 return null;
1906 dl 1.366 }
1907    
1908 dl 1.355 /**
1909     * Tries to decrement counts (sometimes implicitly) and possibly
1910     * arrange for a compensating worker in preparation for
1911     * blocking. May fail due to interference, in which case -1 is
1912     * returned so caller may retry. A zero return value indicates
1913     * that the caller doesn't need to re-adjust counts when later
1914     * unblocked.
1915     *
1916     * @param c incoming ctl value
1917 dl 1.405 * @param canSaturate to override saturate predicate
1918 dl 1.373 * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
1919 dl 1.355 */
1920 dl 1.405 private int tryCompensate(long c, boolean canSaturate) {
1921 dl 1.355 Predicate<? super ForkJoinPool> sat;
1922 dl 1.405 long b = bounds; // unpack fields
1923     int pc = parallelism;
1924 dl 1.355 int minActive = (short)(b & SMASK),
1925 dl 1.405 maxTotal = (short)(b >>> SWIDTH) + pc,
1926     active = (short)(c >>> RC_SHIFT),
1927 dl 1.366 total = (short)(c >>> TC_SHIFT),
1928 dl 1.405 sp = (int)c & ~INACTIVE;
1929     if (runState < 0) // terminating
1930     return -1;
1931     else if (sp != 0 && active <= pc) { // activate idle worker
1932     WorkQueue[] qs; WorkQueue v; int i;
1933     if (ctl == c && (qs = queues) != null &&
1934     qs.length > (i = sp & SMASK) && (v = qs[i]) != null) {
1935     long nc = (v.stackPred & SP_MASK) | (UC_MASK & c);
1936     if (compareAndSetCtl(c, nc)) {
1937     v.phase = sp;
1938     LockSupport.unpark(v.owner);
1939     return UNCOMPENSATE;
1940 dl 1.355 }
1941     }
1942 dl 1.405 return -1; // retry
1943     }
1944     else if (active > minActive && total >= pc) { // reduce active workers
1945     long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1946     return compareAndSetCtl(c, nc) ? UNCOMPENSATE : -1;
1947 dl 1.355 }
1948 dl 1.405 else if (total < maxTotal && total < MAX_CAP) { // expand pool
1949 dl 1.355 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1950 dl 1.373 return (!compareAndSetCtl(c, nc) ? -1 :
1951     !createWorker() ? 0 : UNCOMPENSATE);
1952 dl 1.355 }
1953 dl 1.405 else if (!compareAndSetCtl(c, c)) // validate
1954 dl 1.355 return -1;
1955 dl 1.405 else if (canSaturate || ((sat = saturate) != null && sat.test(this)))
1956 dl 1.355 return 0;
1957     else
1958     throw new RejectedExecutionException(
1959     "Thread limit exceeded replacing blocked worker");
1960     }
1961    
1962     /**
1963     * Readjusts RC count; called from ForkJoinTask after blocking.
1964     */
1965     final void uncompensate() {
1966     getAndAddCtl(RC_UNIT);
1967 dl 1.243 }
1968    
1969     /**
1970 dl 1.405 * Helps if possible until the given task is done. Processes
1971     * compatible local tasks and scans other queues for task produced
1972     * by w's stealers; returning compensated blocking sentinel if
1973     * none are found.
1974 dl 1.345 *
1975 dl 1.355 * @param task the task
1976     * @param w caller's WorkQueue
1977 dl 1.405 * @param timed true if this is a timed join
1978 dl 1.373 * @return task status on exit, or UNCOMPENSATE for compensated blocking
1979 dl 1.355 */
1980 dl 1.405 final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean timed) {
1981     if (w == null || task == null)
1982     return 0;
1983     int wsrc = w.source, wid = (w.config & SMASK) | SRC, r = wid + 2;
1984     long sctl = 0L; // track stability
1985     for (boolean rescan = true;;) {
1986     int s; WorkQueue[] qs;
1987     if ((s = task.status) < 0)
1988     return s;
1989     if (!rescan && sctl == (sctl = ctl) &&
1990     (s = tryCompensate(sctl, timed)) >= 0)
1991     return s; // block
1992     rescan = false;
1993     if (runState < 0)
1994     return 0;
1995     int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
1996     scan: for (int i = n >>> 1; i > 0; --i, r += 2) {
1997     int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
1998     if ((q = qs[j = r & m]) != null && (a = q.array) != null &&
1999     (cap = a.length) > 0) {
2000     for (int src = j | SRC;;) {
2001     int sq = q.source, b = q.base;
2002     int k = (cap - 1) & b, nb = b + 1;
2003     ForkJoinTask<?> t = a[k];
2004     U.loadFence(); // for re-reads
2005     boolean eligible = true; // check steal chain
2006     for (int d = n, v = sq;;) { // may be cyclic; bound
2007     WorkQueue p;
2008     if (v == wid)
2009     break;
2010     if (v == 0 || --d == 0 || (p = qs[v & m]) == null) {
2011     eligible = false;
2012     break;
2013     }
2014     v = p.source;
2015     }
2016     if (q.source != sq || q.base != b)
2017     ; // stale
2018     else if ((s = task.status) < 0)
2019     return s; // recheck before taking
2020     else if (t == null) {
2021     if (a[k] == null) {
2022     if (!rescan && eligible &&
2023     (q.array != a || q.top != b))
2024     rescan = true; // resized or stalled
2025     break;
2026 dl 1.355 }
2027 dl 1.300 }
2028 dl 1.405 else if (t != task && !eligible)
2029     break;
2030     else if (WorkQueue.casSlotToNull(a, k, t)) {
2031     q.base = nb;
2032     w.source = src;
2033     t.doExec();
2034     w.source = wsrc;
2035     rescan = true;
2036     break scan;
2037     }
2038 dl 1.300 }
2039     }
2040     }
2041     }
2042 dl 1.405 }
2043 dl 1.200
2044 dl 1.305 /**
2045 dl 1.405 * Version of helpJoin for CountedCompleters.
2046 jsr166 1.356 *
2047 dl 1.405 * @param task the task
2048 dl 1.355 * @param w caller's WorkQueue
2049 dl 1.405 * @param owned true if w is owned by a ForkJoinWorkerThread
2050     * @param timed true if this is a timed join
2051     * @return task status on exit, or UNCOMPENSATE for compensated blocking
2052 dl 1.305 */
2053 dl 1.405 final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean owned,
2054     boolean timed) {
2055     if (w == null || task == null)
2056     return 0;
2057     int wsrc = w.source, r = w.config;
2058     long sctl = 0L; // track stability
2059     for (boolean rescan = true;;) {
2060     int s; WorkQueue[] qs;
2061     if ((s = w.helpComplete(task, owned, 0)) < 0)
2062     return s;
2063     if (!rescan && sctl == (sctl = ctl)) {
2064     if (!owned)
2065     return 0;
2066     if ((s = tryCompensate(sctl, timed)) >= 0)
2067     return s;
2068     }
2069     rescan = false;
2070     if (runState < 0)
2071     return 0;
2072     int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
2073     scan: for (int i = n; i > 0; --i, ++r) {
2074     int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
2075     if ((q = qs[j = r & m]) != null && (a = q.array) != null &&
2076     (cap = a.length) > 0) {
2077     poll: for (int src = j | SRC, b = q.base;;) {
2078     int k = (cap - 1) & b, nb = b + 1;
2079     ForkJoinTask<?> t = a[k];
2080     U.loadFence(); // for re-reads
2081     if (b != (b = q.base))
2082     ; // stale
2083     else if ((s = task.status) < 0)
2084     return s; // recheck before taking
2085     else if (t == null) {
2086     if (a[k] == null) {
2087     if (!rescan && // resized or stalled
2088     (q.array != a || q.top != b))
2089     rescan = true;
2090     break;
2091     }
2092     }
2093     else if (t instanceof CountedCompleter) {
2094     CountedCompleter<?> f;
2095     for (f = (CountedCompleter<?>)t;;) {
2096     if (f == task)
2097     break;
2098     else if ((f = f.completer) == null)
2099     break poll; // ineligible
2100 dl 1.355 }
2101 dl 1.405 if (WorkQueue.casSlotToNull(a, k, t)) {
2102     q.base = nb;
2103     w.source = src;
2104     t.doExec();
2105     w.source = wsrc;
2106     rescan = true;
2107     break scan;
2108 dl 1.355 }
2109     }
2110 dl 1.405 else
2111     break;
2112 dl 1.200 }
2113     }
2114 dl 1.178 }
2115     }
2116 dl 1.405 }
2117 dl 1.300
2118     /**
2119 dl 1.366 * Runs tasks until {@code isQuiescent()}. Rather than blocking
2120     * when tasks cannot be found, rescans until all others cannot
2121     * find tasks either.
2122     *
2123     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2124     * @param interruptible true if return on interrupt
2125     * @return positive if quiescent, negative if interrupted, else 0
2126     */
2127 dl 1.405 private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
2128     long startTime = System.nanoTime(), parkTime = 0L;
2129     int phase; // w.phase set negative when temporarily quiescent
2130     if (w == null || (phase = w.phase) < 0)
2131 dl 1.366 return 0;
2132 dl 1.405 int activePhase = phase, inactivePhase = phase | INACTIVE;
2133     int wsrc = w.source, r = 0;
2134     for (boolean locals = true;;) {
2135     WorkQueue[] qs; WorkQueue q;
2136     if (runState < 0) { // terminating
2137     w.phase = activePhase;
2138     return 1;
2139     }
2140     if (locals) { // run local tasks before (re)polling
2141     for (ForkJoinTask<?> u; (u = w.nextLocalTask()) != null;)
2142 dl 1.366 u.doExec();
2143     }
2144 dl 1.405 boolean rescan = false, busy = locals = false, interrupted;
2145     int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
2146     scan: for (int i = n, j; i > 0; --i, ++r) {
2147     if ((q = qs[j = m & r]) != null && q != w) {
2148     for (int src = j | SRC;;) {
2149     ForkJoinTask<?>[] a = q.array;
2150     int b = q.base, cap;
2151     if (a == null || (cap = a.length) <= 0)
2152     break;
2153     int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
2154     ForkJoinTask<?> t = a[k];
2155     U.loadFence(); // for re-reads
2156     if (q.base != b || q.array != a || a[k] != t)
2157     ;
2158     else if (t == null) {
2159     if (!rescan) {
2160     if (a[nk] != null || q.top - b > 0)
2161     rescan = true;
2162     else if (!busy &&
2163     q.owner != null && q.phase >= 0)
2164     busy = true;
2165     }
2166     break;
2167 dl 1.366 }
2168 dl 1.405 else if (phase < 0) // reactivate before taking
2169     w.phase = phase = activePhase;
2170     else if (WorkQueue.casSlotToNull(a, k, t)) {
2171     q.base = nb;
2172 dl 1.366 w.source = src;
2173     t.doExec();
2174 dl 1.405 w.source = wsrc;
2175     rescan = locals = true;
2176     break scan;
2177 dl 1.366 }
2178     }
2179     }
2180     }
2181 dl 1.405 if (rescan)
2182     ; // retry
2183     else if (phase >= 0) {
2184     parkTime = 0L;
2185     w.phase = phase = inactivePhase;
2186     }
2187     else if (!busy) {
2188     w.phase = activePhase;
2189     return 1;
2190     }
2191     else if (parkTime == 0L) {
2192     parkTime = 1L << 10; // initially about 1 usec
2193     Thread.yield();
2194     }
2195     else if ((interrupted = interruptible && Thread.interrupted()) ||
2196     System.nanoTime() - startTime > nanos) {
2197     w.phase = activePhase;
2198     return interrupted ? -1 : 0;
2199     }
2200     else {
2201     LockSupport.parkNanos(this, parkTime);
2202     if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
2203     parkTime <<= 1; // max sleep approx 1 sec or 1% nanos
2204 dl 1.366 }
2205     }
2206     }
2207    
2208     /**
2209     * Helps quiesce from external caller until done, interrupted, or timeout
2210     *
2211     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2212     * @param interruptible true if return on interrupt
2213     * @return positive if quiescent, negative if interrupted, else 0
2214     */
2215 dl 1.405 private int externalHelpQuiesce(long nanos, boolean interruptible) {
2216 dl 1.366 for (long startTime = System.nanoTime(), parkTime = 0L;;) {
2217     ForkJoinTask<?> t;
2218     if ((t = pollScan(false)) != null) {
2219     t.doExec();
2220     parkTime = 0L;
2221     }
2222     else if (canStop())
2223     return 1;
2224     else if (parkTime == 0L) {
2225     parkTime = 1L << 10;
2226     Thread.yield();
2227     }
2228     else if ((System.nanoTime() - startTime) > nanos)
2229     return 0;
2230     else if (interruptible && Thread.interrupted())
2231     return -1;
2232     else {
2233     LockSupport.parkNanos(this, parkTime);
2234     if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
2235     parkTime <<= 1;
2236     }
2237     }
2238     }
2239    
2240     /**
2241 dl 1.405 * Helps quiesce from either internal or external caller
2242     *
2243     * @param pool the pool to use, or null if any
2244     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2245     * @param interruptible true if return on interrupt
2246     * @return positive if quiescent, negative if interrupted, else 0
2247     */
2248     final static int helpQuiescePool(ForkJoinPool pool, long nanos,
2249     boolean interruptible) {
2250     Thread t; ForkJoinPool p; ForkJoinWorkerThread wt;
2251     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
2252     (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2253     (p == pool || pool == null))
2254     return p.helpQuiesce(wt.workQueue, nanos, interruptible);
2255     else if ((p = pool) != null || (p = common) != null)
2256     return p.externalHelpQuiesce(nanos, interruptible);
2257     else
2258     return 0;
2259     }
2260    
2261     /**
2262 dl 1.300 * Gets and removes a local or stolen task for the given worker.
2263     *
2264     * @return a task, if available
2265     */
2266     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2267     ForkJoinTask<?> t;
2268 dl 1.405 if (w == null || (t = w.nextLocalTask()) == null)
2269 dl 1.345 t = pollScan(false);
2270     return t;
2271 dl 1.90 }
2272    
2273 dl 1.300 // External operations
2274    
2275 dl 1.90 /**
2276 dl 1.355 * Finds and locks a WorkQueue for an external submitter, or
2277 dl 1.405 * throws RejectedExecutionException if shutdown or terminating.
2278     * @param isSubmit false if this is for a common pool fork
2279 dl 1.90 */
2280 dl 1.405 final WorkQueue submissionQueue(boolean isSubmit) {
2281 dl 1.355 int r;
2282 dl 1.405 ReentrantLock lock = registrationLock;
2283 dl 1.300 if ((r = ThreadLocalRandom.getProbe()) == 0) {
2284 dl 1.355 ThreadLocalRandom.localInit(); // initialize caller's probe
2285 dl 1.300 r = ThreadLocalRandom.getProbe();
2286     }
2287 dl 1.405 if (lock != null) { // else init error
2288     for (int id = r << 1;;) { // even indices only
2289     int n, i; WorkQueue[] qs; WorkQueue q;
2290     if ((qs = queues) == null || (n = qs.length) <= 0)
2291     break;
2292     else if ((q = qs[i = (n - 1) & id]) == null) {
2293     WorkQueue w = new WorkQueue(null, id | SRC);
2294     w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
2295     lock.lock(); // install under lock
2296     if (queues == qs && qs[i] == null)
2297     qs[i] = w; // else lost race; discard
2298 dl 1.355 lock.unlock();
2299 dl 1.300 }
2300 dl 1.405 else if (q.getAndSetAccess(1) != 0) // move and restart
2301     id = (r = ThreadLocalRandom.advanceProbe(r)) << 1;
2302     else if (isSubmit && runState != 0) {
2303     q.access = 0; // check while lock held
2304     break;
2305     }
2306     else
2307     return q;
2308 dl 1.345 }
2309 dl 1.90 }
2310 dl 1.405 throw new RejectedExecutionException();
2311 dl 1.90 }
2312    
2313 dl 1.300 /**
2314 dl 1.405 * Pushes a submission to the pool, using internal queue if called
2315     * from ForkJoinWorkerThread, else external queue.
2316 dl 1.300 */
2317 dl 1.405 private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty,
2318     ForkJoinTask<T> task) {
2319     WorkQueue q; Thread t; ForkJoinWorkerThread wt;
2320 dl 1.406 U.storeFence(); // ensure safely publishable
2321 dl 1.405 if (task == null) throw new NullPointerException();
2322 dl 1.300 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2323 dl 1.405 (wt = (ForkJoinWorkerThread)t).pool == this)
2324     q = wt.workQueue;
2325     else {
2326     task.markPoolSubmission();
2327     q = submissionQueue(true);
2328     }
2329     q.push(task, this, signalIfEmpty);
2330 dl 1.404 return task;
2331     }
2332    
2333     /**
2334 dl 1.405 * Returns queue for an external thread, if one exists that has
2335     * possibly ever submitted to the given pool (nonzero probe), or
2336     * null if none.
2337 dl 1.404 */
2338 dl 1.405 private static WorkQueue externalQueue(ForkJoinPool p) {
2339     WorkQueue[] qs;
2340     int r = ThreadLocalRandom.getProbe(), n;
2341     return (p != null && (qs = p.queues) != null &&
2342     (n = qs.length) > 0 && r != 0) ?
2343     qs[(n - 1) & (r << 1)] : null;
2344 dl 1.300 }
2345    
2346     /**
2347 dl 1.405 * Returns external queue for common pool.
2348 dl 1.355 */
2349     static WorkQueue commonQueue() {
2350 dl 1.405 return externalQueue(common);
2351 dl 1.300 }
2352 dl 1.90
2353     /**
2354 dl 1.396 * Returns queue for an external thread, if one exists
2355     */
2356     final WorkQueue externalQueue() {
2357 dl 1.405 return externalQueue(this);
2358 dl 1.396 }
2359    
2360     /**
2361 dl 1.355 * If the given executor is a ForkJoinPool, poll and execute
2362     * AsynchronousCompletionTasks from worker's queue until none are
2363     * available or blocker is released.
2364 dl 1.300 */
2365 dl 1.355 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
2366     WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
2367     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
2368     if ((wt = (ForkJoinWorkerThread)t).pool == e)
2369     w = wt.workQueue;
2370     }
2371 dl 1.396 else if (e instanceof ForkJoinPool)
2372     w = ((ForkJoinPool)e).externalQueue();
2373 dl 1.355 if (w != null)
2374     w.helpAsyncBlocker(blocker);
2375 dl 1.14 }
2376    
2377     /**
2378 dl 1.105 * Returns a cheap heuristic guide for task partitioning when
2379     * programmers, frameworks, tools, or languages have little or no
2380 jsr166 1.222 * idea about task granularity. In essence, by offering this
2381 dl 1.105 * method, we ask users only about tradeoffs in overhead vs
2382     * expected throughput and its variance, rather than how finely to
2383     * partition tasks.
2384     *
2385     * In a steady state strict (tree-structured) computation, each
2386     * thread makes available for stealing enough tasks for other
2387     * threads to remain active. Inductively, if all threads play by
2388     * the same rules, each thread should make available only a
2389     * constant number of tasks.
2390     *
2391     * The minimum useful constant is just 1. But using a value of 1
2392     * would require immediate replenishment upon each steal to
2393     * maintain enough tasks, which is infeasible. Further,
2394     * partitionings/granularities of offered tasks should minimize
2395     * steal rates, which in general means that threads nearer the top
2396     * of computation tree should generate more than those nearer the
2397     * bottom. In perfect steady state, each thread is at
2398     * approximately the same level of computation tree. However,
2399     * producing extra tasks amortizes the uncertainty of progress and
2400     * diffusion assumptions.
2401     *
2402 jsr166 1.161 * So, users will want to use values larger (but not much larger)
2403 dl 1.105 * than 1 to both smooth over transient shortages and hedge
2404     * against uneven progress; as traded off against the cost of
2405     * extra task overhead. We leave the user to pick a threshold
2406     * value to compare with the results of this call to guide
2407     * decisions, but recommend values such as 3.
2408     *
2409     * When all threads are active, it is on average OK to estimate
2410     * surplus strictly locally. In steady-state, if one thread is
2411     * maintaining say 2 surplus tasks, then so are others. So we can
2412     * just use estimated queue length. However, this strategy alone
2413     * leads to serious mis-estimates in some non-steady-state
2414     * conditions (ramp-up, ramp-down, other stalls). We can detect
2415     * many of these by further considering the number of "idle"
2416     * threads, that are known to have zero queued tasks, so
2417     * compensate by a factor of (#idle/#active) threads.
2418     */
2419     static int getSurplusQueuedTaskCount() {
2420     Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2421 dl 1.300 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2422     (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2423     (q = wt.workQueue) != null) {
2424     int n = q.top - q.base;
2425 dl 1.405 int p = pool.parallelism;
2426     int a = (short)(pool.ctl >>> RC_SHIFT);
2427 dl 1.112 return n - (a > (p >>>= 1) ? 0 :
2428     a > (p >>>= 1) ? 1 :
2429     a > (p >>>= 1) ? 2 :
2430     a > (p >>>= 1) ? 4 :
2431     8);
2432 dl 1.105 }
2433     return 0;
2434 dl 1.100 }
2435    
2436 dl 1.300 // Termination
2437 dl 1.14
2438     /**
2439 dl 1.405 * Possibly initiates and/or completes pool termination.
2440 dl 1.14 *
2441     * @param now if true, unconditionally terminate, else only
2442 dl 1.78 * if no work and no active workers
2443 dl 1.243 * @param enable if true, terminate when next possible
2444 dl 1.300 * @return true if terminating or terminated
2445 jsr166 1.1 */
2446 dl 1.300 private boolean tryTerminate(boolean now, boolean enable) {
2447 dl 1.405 int rs; ReentrantLock lock; Condition cond;
2448     if ((rs = runState) >= 0) { // set SHUTDOWN and/or STOP
2449     if ((config & ISCOMMON) != 0)
2450     return false; // cannot shutdown
2451     if (!now) {
2452     if ((rs & SHUTDOWN) == 0) {
2453     if (!enable)
2454     return false;
2455     getAndBitwiseOrRunState(SHUTDOWN);
2456     }
2457     if (!canStop())
2458     return false;
2459     }
2460     getAndBitwiseOrRunState(SHUTDOWN | STOP);
2461     }
2462     WorkQueue released = reactivate(); // try signalling waiter
2463     int tc = (short)(ctl >>> TC_SHIFT);
2464     if (released == null && tc > 0) { // help unblock and cancel
2465     Thread current = Thread.currentThread();
2466     WorkQueue w = ((current instanceof ForkJoinWorkerThread) ?
2467     ((ForkJoinWorkerThread)current).workQueue : null);
2468     int r = (w == null) ? 0 : w.config + 1; // stagger traversals
2469     WorkQueue[] qs = queues;
2470     int n = (qs == null) ? 0 : qs.length;
2471     for (int i = 0; i < n; ++i) {
2472     WorkQueue q; Thread thread;
2473     if ((q = qs[(r + i) & (n - 1)]) != null &&
2474     (thread = q.owner) != current && q.access != STOP) {
2475     if (thread != null && !thread.isInterrupted()) {
2476     q.forcePhaseActive(); // for awaitWork
2477 dl 1.366 try {
2478     thread.interrupt();
2479     } catch (Throwable ignore) {
2480     }
2481 dl 1.203 }
2482 dl 1.405 for (ForkJoinTask<?> t; (t = q.poll(null)) != null; )
2483     ForkJoinTask.cancelIgnoringExceptions(t);
2484 dl 1.203 }
2485     }
2486 dl 1.405 }
2487     if ((tc <= 0 || (short)(ctl >>> TC_SHIFT) <= 0) &&
2488     (getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
2489     (lock = registrationLock) != null) {
2490     lock.lock(); // signal when no workers
2491     if ((cond = termination) != null)
2492     cond.signalAll();
2493     lock.unlock();
2494     // container.close(); // for loom
2495 dl 1.52 }
2496 dl 1.300 return true;
2497 dl 1.105 }
2498    
2499 dl 1.52 // Exported methods
2500 jsr166 1.1
2501     // Constructors
2502    
2503     /**
2504 jsr166 1.9 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2505 dl 1.300 * java.lang.Runtime#availableProcessors}, using defaults for all
2506 dl 1.319 * other parameters (see {@link #ForkJoinPool(int,
2507     * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2508     * int, int, int, Predicate, long, TimeUnit)}).
2509 jsr166 1.1 *
2510     * @throws SecurityException if a security manager exists and
2511     * the caller is not permitted to modify threads
2512     * because it does not hold {@link
2513     * java.lang.RuntimePermission}{@code ("modifyThread")}
2514     */
2515     public ForkJoinPool() {
2516 jsr166 1.148 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2517 dl 1.300 defaultForkJoinWorkerThreadFactory, null, false,
2518 dl 1.307 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2519 jsr166 1.1 }
2520    
2521     /**
2522 jsr166 1.9 * Creates a {@code ForkJoinPool} with the indicated parallelism
2523 dl 1.319 * level, using defaults for all other parameters (see {@link
2524     * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
2525     * UncaughtExceptionHandler, boolean, int, int, int, Predicate,
2526     * long, TimeUnit)}).
2527 jsr166 1.1 *
2528 jsr166 1.9 * @param parallelism the parallelism level
2529 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
2530 jsr166 1.11 * equal to zero, or greater than implementation limit
2531 jsr166 1.1 * @throws SecurityException if a security manager exists and
2532     * the caller is not permitted to modify threads
2533     * because it does not hold {@link
2534     * java.lang.RuntimePermission}{@code ("modifyThread")}
2535     */
2536     public ForkJoinPool(int parallelism) {
2537 dl 1.300 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
2538 dl 1.307 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2539 jsr166 1.1 }
2540    
2541     /**
2542 dl 1.300 * Creates a {@code ForkJoinPool} with the given parameters (using
2543 dl 1.319 * defaults for others -- see {@link #ForkJoinPool(int,
2544     * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2545     * int, int, int, Predicate, long, TimeUnit)}).
2546 jsr166 1.1 *
2547 dl 1.18 * @param parallelism the parallelism level. For default value,
2548     * use {@link java.lang.Runtime#availableProcessors}.
2549     * @param factory the factory for creating new threads. For default value,
2550     * use {@link #defaultForkJoinWorkerThreadFactory}.
2551 dl 1.19 * @param handler the handler for internal worker threads that
2552     * terminate due to unrecoverable errors encountered while executing
2553 jsr166 1.31 * tasks. For default value, use {@code null}.
2554 dl 1.19 * @param asyncMode if true,
2555 dl 1.18 * establishes local first-in-first-out scheduling mode for forked
2556     * tasks that are never joined. This mode may be more appropriate
2557     * than default locally stack-based mode in applications in which
2558     * worker threads only process event-style asynchronous tasks.
2559 jsr166 1.31 * For default value, use {@code false}.
2560 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
2561 jsr166 1.11 * equal to zero, or greater than implementation limit
2562     * @throws NullPointerException if the factory is null
2563 jsr166 1.1 * @throws SecurityException if a security manager exists and
2564     * the caller is not permitted to modify threads
2565     * because it does not hold {@link
2566     * java.lang.RuntimePermission}{@code ("modifyThread")}
2567     */
2568 dl 1.19 public ForkJoinPool(int parallelism,
2569 dl 1.18 ForkJoinWorkerThreadFactory factory,
2570 jsr166 1.156 UncaughtExceptionHandler handler,
2571 dl 1.18 boolean asyncMode) {
2572 dl 1.300 this(parallelism, factory, handler, asyncMode,
2573 dl 1.307 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2574 dl 1.152 }
2575    
2576 dl 1.300 /**
2577     * Creates a {@code ForkJoinPool} with the given parameters.
2578     *
2579     * @param parallelism the parallelism level. For default value,
2580     * use {@link java.lang.Runtime#availableProcessors}.
2581     *
2582     * @param factory the factory for creating new threads. For
2583     * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2584     *
2585     * @param handler the handler for internal worker threads that
2586     * terminate due to unrecoverable errors encountered while
2587     * executing tasks. For default value, use {@code null}.
2588     *
2589     * @param asyncMode if true, establishes local first-in-first-out
2590     * scheduling mode for forked tasks that are never joined. This
2591     * mode may be more appropriate than default locally stack-based
2592     * mode in applications in which worker threads only process
2593     * event-style asynchronous tasks. For default value, use {@code
2594     * false}.
2595     *
2596     * @param corePoolSize the number of threads to keep in the pool
2597     * (unless timed out after an elapsed keep-alive). Normally (and
2598     * by default) this is the same value as the parallelism level,
2599     * but may be set to a larger value to reduce dynamic overhead if
2600     * tasks regularly block. Using a smaller value (for example
2601     * {@code 0}) has the same effect as the default.
2602     *
2603     * @param maximumPoolSize the maximum number of threads allowed.
2604     * When the maximum is reached, attempts to replace blocked
2605     * threads fail. (However, because creation and termination of
2606     * different threads may overlap, and may be managed by the given
2607 dl 1.307 * thread factory, this value may be transiently exceeded.) To
2608     * arrange the same value as is used by default for the common
2609 dl 1.319 * pool, use {@code 256} plus the {@code parallelism} level. (By
2610     * default, the common pool allows a maximum of 256 spare
2611     * threads.) Using a value (for example {@code
2612     * Integer.MAX_VALUE}) larger than the implementation's total
2613     * thread limit has the same effect as using this limit (which is
2614     * the default).
2615 dl 1.300 *
2616     * @param minimumRunnable the minimum allowed number of core
2617     * threads not blocked by a join or {@link ManagedBlocker}. To
2618     * ensure progress, when too few unblocked threads exist and
2619     * unexecuted tasks may exist, new threads are constructed, up to
2620     * the given maximumPoolSize. For the default value, use {@code
2621     * 1}, that ensures liveness. A larger value might improve
2622     * throughput in the presence of blocked activities, but might
2623     * not, due to increased overhead. A value of zero may be
2624     * acceptable when submitted tasks cannot have dependencies
2625     * requiring additional threads.
2626     *
2627 jsr166 1.318 * @param saturate if non-null, a predicate invoked upon attempts
2628 dl 1.307 * to create more than the maximum total allowed threads. By
2629     * default, when a thread is about to block on a join or {@link
2630     * ManagedBlocker}, but cannot be replaced because the
2631     * maximumPoolSize would be exceeded, a {@link
2632     * RejectedExecutionException} is thrown. But if this predicate
2633     * returns {@code true}, then no exception is thrown, so the pool
2634     * continues to operate with fewer than the target number of
2635     * runnable threads, which might not ensure progress.
2636 dl 1.300 *
2637     * @param keepAliveTime the elapsed time since last use before
2638     * a thread is terminated (and then later replaced if needed).
2639     * For the default value, use {@code 60, TimeUnit.SECONDS}.
2640     *
2641     * @param unit the time unit for the {@code keepAliveTime} argument
2642     *
2643     * @throws IllegalArgumentException if parallelism is less than or
2644     * equal to zero, or is greater than implementation limit,
2645     * or if maximumPoolSize is less than parallelism,
2646     * of if the keepAliveTime is less than or equal to zero.
2647     * @throws NullPointerException if the factory is null
2648     * @throws SecurityException if a security manager exists and
2649     * the caller is not permitted to modify threads
2650     * because it does not hold {@link
2651     * java.lang.RuntimePermission}{@code ("modifyThread")}
2652 jsr166 1.306 * @since 9
2653 dl 1.300 */
2654     public ForkJoinPool(int parallelism,
2655     ForkJoinWorkerThreadFactory factory,
2656     UncaughtExceptionHandler handler,
2657     boolean asyncMode,
2658     int corePoolSize,
2659     int maximumPoolSize,
2660     int minimumRunnable,
2661 dl 1.307 Predicate<? super ForkJoinPool> saturate,
2662 dl 1.300 long keepAliveTime,
2663     TimeUnit unit) {
2664 dl 1.355 checkPermission();
2665     int p = parallelism;
2666     if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
2667 dl 1.152 throw new IllegalArgumentException();
2668 dl 1.355 if (factory == null || unit == null)
2669 dl 1.14 throw new NullPointerException();
2670 dl 1.405 this.parallelism = p;
2671 dl 1.300 this.factory = factory;
2672     this.ueh = handler;
2673 dl 1.307 this.saturate = saturate;
2674 dl 1.405 this.config = asyncMode ? FIFO : 0;
2675 dl 1.355 this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2676 dl 1.405 int corep = Math.min(Math.max(corePoolSize, p), MAX_CAP);
2677     int maxSpares = Math.max(0, Math.min(maximumPoolSize - p, MAX_CAP));
2678     int minAvail = Math.max(0, Math.min(minimumRunnable, MAX_CAP));
2679     this.bounds = (long)(minAvail & SMASK) | (long)(maxSpares << SWIDTH) |
2680     ((long)corep << 32);
2681 dl 1.355 int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
2682     this.registrationLock = new ReentrantLock();
2683     this.queues = new WorkQueue[size];
2684     String pid = Integer.toString(getAndAddPoolIds(1) + 1);
2685 dl 1.405 String name = "ForkJoinPool-" + pid;
2686     this.workerNamePrefix = name + "-worker-";
2687     // this.container = SharedThreadContainer.create(name); // for loom
2688 jsr166 1.327 }
2689    
2690 dl 1.152 /**
2691 dl 1.300 * Constructor for common pool using parameters possibly
2692     * overridden by system properties
2693     */
2694     private ForkJoinPool(byte forCommonPoolOnly) {
2695 dl 1.405 ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory;
2696     UncaughtExceptionHandler handler = null;
2697 dl 1.404 int maxSpares = DEFAULT_COMMON_MAX_SPARES;
2698 dl 1.405 int pc = 0, preset = 0; // nonzero if size set as property
2699 dl 1.300 try { // ignore exceptions in accessing/parsing properties
2700     String pp = System.getProperty
2701     ("java.util.concurrent.ForkJoinPool.common.parallelism");
2702 dl 1.405 if (pp != null) {
2703     pc = Math.max(0, Integer.parseInt(pp));
2704     preset = PRESET_SIZE;
2705     }
2706     String ms = System.getProperty
2707 dl 1.404 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
2708 dl 1.405 if (ms != null)
2709     maxSpares = Math.max(0, Math.min(MAX_CAP, Integer.parseInt(ms)));
2710     String sf = System.getProperty
2711     ("java.util.concurrent.ForkJoinPool.common.threadFactory");
2712     String sh = System.getProperty
2713     ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2714     if (sf != null || sh != null) {
2715     ClassLoader ldr = ClassLoader.getSystemClassLoader();
2716     if (sf != null)
2717     fac = (ForkJoinWorkerThreadFactory)
2718     ldr.loadClass(sf).getConstructor().newInstance();
2719     if (sh != null)
2720     handler = (UncaughtExceptionHandler)
2721     ldr.loadClass(sh).getConstructor().newInstance();
2722     }
2723 dl 1.300 } catch (Exception ignore) {
2724     }
2725 dl 1.405 if (preset == 0)
2726     pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
2727     int p = Math.min(pc, MAX_CAP);
2728     int size = (p == 0) ? 1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1));
2729     this.parallelism = p;
2730     this.config = ISCOMMON | preset;
2731     this.bounds = (long)(1 | (maxSpares << SWIDTH));
2732     this.factory = fac;
2733 dl 1.18 this.ueh = handler;
2734 dl 1.355 this.keepAlive = DEFAULT_KEEPALIVE;
2735 dl 1.307 this.saturate = null;
2736 dl 1.355 this.workerNamePrefix = null;
2737 dl 1.405 this.registrationLock = new ReentrantLock();
2738 dl 1.355 this.queues = new WorkQueue[size];
2739 dl 1.405 // this.container = SharedThreadContainer.create("ForkJoinPool.commonPool"); // for loom
2740 dl 1.404 }
2741    
2742     /**
2743 dl 1.128 * Returns the common pool instance. This pool is statically
2744 dl 1.134 * constructed; its run state is unaffected by attempts to {@link
2745     * #shutdown} or {@link #shutdownNow}. However this pool and any
2746     * ongoing processing are automatically terminated upon program
2747     * {@link System#exit}. Any program that relies on asynchronous
2748     * task processing to complete before program termination should
2749 jsr166 1.158 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2750     * before exit.
2751 dl 1.100 *
2752     * @return the common pool instance
2753 jsr166 1.138 * @since 1.8
2754 dl 1.100 */
2755     public static ForkJoinPool commonPool() {
2756 dl 1.405 // assert common != null : "static init error";
2757     return common;
2758 dl 1.100 }
2759    
2760 jsr166 1.1 // Execution methods
2761    
2762     /**
2763     * Performs the given task, returning its result upon completion.
2764 dl 1.52 * If the computation encounters an unchecked Exception or Error,
2765     * it is rethrown as the outcome of this invocation. Rethrown
2766     * exceptions behave in the same way as regular exceptions, but,
2767     * when possible, contain stack traces (as displayed for example
2768     * using {@code ex.printStackTrace()}) of both the current thread
2769     * as well as the thread actually encountering the exception;
2770     * minimally only the latter.
2771 jsr166 1.1 *
2772     * @param task the task
2773 jsr166 1.191 * @param <T> the type of the task's result
2774 jsr166 1.1 * @return the task's result
2775 jsr166 1.11 * @throws NullPointerException if the task is null
2776     * @throws RejectedExecutionException if the task cannot be
2777     * scheduled for execution
2778 jsr166 1.1 */
2779     public <T> T invoke(ForkJoinTask<T> task) {
2780 dl 1.405 poolSubmit(true, task);
2781     return task.join();
2782 jsr166 1.1 }
2783    
2784     /**
2785     * Arranges for (asynchronous) execution of the given task.
2786     *
2787     * @param task the task
2788 jsr166 1.11 * @throws NullPointerException if the task is null
2789     * @throws RejectedExecutionException if the task cannot be
2790     * scheduled for execution
2791 jsr166 1.1 */
2792 jsr166 1.8 public void execute(ForkJoinTask<?> task) {
2793 dl 1.405 poolSubmit(true, task);
2794 jsr166 1.1 }
2795    
2796     // AbstractExecutorService methods
2797    
2798 jsr166 1.11 /**
2799     * @throws NullPointerException if the task is null
2800     * @throws RejectedExecutionException if the task cannot be
2801     * scheduled for execution
2802     */
2803 dl 1.355 @Override
2804     @SuppressWarnings("unchecked")
2805 jsr166 1.1 public void execute(Runnable task) {
2806 dl 1.405 poolSubmit(true, (task instanceof ForkJoinTask<?>)
2807     ? (ForkJoinTask<Void>) task // avoid re-wrap
2808     : new ForkJoinTask.RunnableExecuteAction(task));
2809 jsr166 1.1 }
2810    
2811 jsr166 1.11 /**
2812 dl 1.18 * Submits a ForkJoinTask for execution.
2813     *
2814     * @param task the task to submit
2815 jsr166 1.191 * @param <T> the type of the task's result
2816 dl 1.18 * @return the task
2817     * @throws NullPointerException if the task is null
2818     * @throws RejectedExecutionException if the task cannot be
2819     * scheduled for execution
2820     */
2821     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2822 dl 1.405 return poolSubmit(true, task);
2823 dl 1.18 }
2824    
2825     /**
2826 jsr166 1.11 * @throws NullPointerException if the task is null
2827     * @throws RejectedExecutionException if the task cannot be
2828     * scheduled for execution
2829     */
2830 dl 1.355 @Override
2831 jsr166 1.1 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2832 dl 1.405 return poolSubmit(true, new ForkJoinTask.AdaptedCallable<T>(task));
2833 jsr166 1.1 }
2834    
2835 jsr166 1.11 /**
2836     * @throws NullPointerException if the task is null
2837     * @throws RejectedExecutionException if the task cannot be
2838     * scheduled for execution
2839     */
2840 dl 1.355 @Override
2841 jsr166 1.1 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2842 dl 1.405 return poolSubmit(true, new ForkJoinTask.AdaptedRunnable<T>(task, result));
2843 jsr166 1.1 }
2844    
2845 jsr166 1.11 /**
2846     * @throws NullPointerException if the task is null
2847     * @throws RejectedExecutionException if the task cannot be
2848     * scheduled for execution
2849     */
2850 dl 1.355 @Override
2851 jsr166 1.335 @SuppressWarnings("unchecked")
2852 jsr166 1.1 public ForkJoinTask<?> submit(Runnable task) {
2853 dl 1.405 return poolSubmit(true, (task instanceof ForkJoinTask<?>)
2854     ? (ForkJoinTask<Void>) task // avoid re-wrap
2855     : new ForkJoinTask.AdaptedRunnableAction(task));
2856 jsr166 1.1 }
2857    
2858 dl 1.405 // Added mainly for possible use in Loom
2859    
2860 dl 1.404 /**
2861     * Submits the given task without guaranteeing that it will
2862 dl 1.405 * eventually execute in the absence of available active threads.
2863     * In some contexts, this method may reduce contention and
2864     * overhead by relying on context-specific knowledge that existing
2865     * threads (possibly including the calling thread if operating in
2866     * this pool) will eventually be available to execute the task.
2867 dl 1.404 *
2868     * @param task the task
2869     * @param <T> the type of the task's result
2870     * @return the task
2871 dl 1.406 * @since 19
2872 dl 1.404 */
2873 dl 1.405 public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) {
2874     return poolSubmit(false, task);
2875 dl 1.404 }
2876 dl 1.405
2877 dl 1.404 /**
2878 dl 1.405 * Changes the target parallelism of this pool, controlling the
2879     * future creation, use, and termination of worker threads.
2880     * Applications include contexts in which the number of available
2881     * processors changes over time.
2882     *
2883     * @param size the target parallelism level
2884     * @return the previous parallelism level.
2885     * @throws IllegalArgumentException if size is less than 1 or
2886     * greater than the maximum supported by this
2887     * pool (currently 32767).
2888     * @throws IllegalStateException if this is the{@link #commonPool()} and
2889     * parallelism level was set by System property
2890     * {@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}.
2891     * @throws SecurityException if a security manager exists and
2892     * the caller is not permitted to modify threads
2893     * because it does not hold {@link
2894     * java.lang.RuntimePermission}{@code ("modifyThread")}
2895 dl 1.406 * @since 19
2896 dl 1.404 */
2897 dl 1.405 public int setParallelism(int size) {
2898     if (size < 1 || size > MAX_CAP)
2899     throw new IllegalArgumentException();
2900     if ((config & PRESET_SIZE) != 0)
2901     throw new IllegalStateException("Cannot override System property");
2902     checkPermission();
2903     return getAndSetParallelism(size);
2904 dl 1.404 }
2905    
2906 jsr166 1.1 /**
2907 jsr166 1.11 * @throws NullPointerException {@inheritDoc}
2908     * @throws RejectedExecutionException {@inheritDoc}
2909     */
2910 dl 1.355 @Override
2911 jsr166 1.1 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2912 dl 1.366 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2913     try {
2914     for (Callable<T> t : tasks) {
2915 dl 1.367 ForkJoinTask<T> f =
2916     new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2917 dl 1.366 futures.add(f);
2918 dl 1.405 poolSubmit(true, f);
2919 dl 1.366 }
2920     for (int i = futures.size() - 1; i >= 0; --i)
2921 dl 1.405 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2922 dl 1.366 return futures;
2923     } catch (Throwable t) {
2924     for (Future<T> e : futures)
2925     ForkJoinTask.cancelIgnoringExceptions(e);
2926     throw t;
2927     }
2928 dl 1.355 }
2929    
2930     @Override
2931     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
2932     long timeout, TimeUnit unit)
2933     throws InterruptedException {
2934 dl 1.366 long nanos = unit.toNanos(timeout);
2935     ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2936     try {
2937     for (Callable<T> t : tasks) {
2938 dl 1.367 ForkJoinTask<T> f =
2939     new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2940 dl 1.366 futures.add(f);
2941 dl 1.405 poolSubmit(true, f);
2942 dl 1.366 }
2943     long startTime = System.nanoTime(), ns = nanos;
2944     boolean timedOut = (ns < 0L);
2945     for (int i = futures.size() - 1; i >= 0; --i) {
2946 dl 1.405 ForkJoinTask<T> f = (ForkJoinTask<T>)futures.get(i);
2947 dl 1.366 if (!f.isDone()) {
2948 dl 1.405 if (!timedOut)
2949     timedOut = !f.quietlyJoin(ns, TimeUnit.NANOSECONDS);
2950 dl 1.366 if (timedOut)
2951     ForkJoinTask.cancelIgnoringExceptions(f);
2952 dl 1.405 else
2953     ns = nanos - (System.nanoTime() - startTime);
2954 dl 1.366 }
2955 dl 1.355 }
2956 dl 1.366 return futures;
2957     } catch (Throwable t) {
2958     for (Future<T> e : futures)
2959     ForkJoinTask.cancelIgnoringExceptions(e);
2960     throw t;
2961 dl 1.355 }
2962 jsr166 1.1 }
2963    
2964 dl 1.367 // Task to hold results from InvokeAnyTasks
2965     static final class InvokeAnyRoot<E> extends ForkJoinTask<E> {
2966     private static final long serialVersionUID = 2838392045355241008L;
2967     @SuppressWarnings("serial") // Conditionally serializable
2968     volatile E result;
2969 dl 1.391 final AtomicInteger count; // in case all throw
2970 jsr166 1.402 @SuppressWarnings("serial")
2971 dl 1.391 final ForkJoinPool pool; // to check shutdown while collecting
2972     InvokeAnyRoot(int n, ForkJoinPool p) {
2973     pool = p;
2974     count = new AtomicInteger(n);
2975     }
2976 dl 1.367 final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
2977 jsr166 1.384 Throwable ex = null;
2978 dl 1.394 boolean failed;
2979     if (c == null || Thread.interrupted() ||
2980 dl 1.405 (pool != null && pool.runState < 0))
2981 dl 1.394 failed = true;
2982     else if (isDone())
2983     failed = false;
2984     else {
2985 dl 1.390 try {
2986     complete(c.call());
2987 dl 1.394 failed = false;
2988 dl 1.390 } catch (Throwable tx) {
2989     ex = tx;
2990 jsr166 1.384 failed = true;
2991     }
2992     }
2993 dl 1.405 if ((pool != null && pool.runState < 0) ||
2994 dl 1.391 (failed && count.getAndDecrement() <= 1))
2995 jsr166 1.384 trySetThrown(ex != null ? ex : new CancellationException());
2996 dl 1.367 }
2997     public final boolean exec() { return false; } // never forked
2998     public final E getRawResult() { return result; }
2999     public final void setRawResult(E v) { result = v; }
3000     }
3001    
3002     // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot
3003     static final class InvokeAnyTask<E> extends ForkJoinTask<E> {
3004     private static final long serialVersionUID = 2838392045355241008L;
3005     final InvokeAnyRoot<E> root;
3006     @SuppressWarnings("serial") // Conditionally serializable
3007     final Callable<E> callable;
3008     transient volatile Thread runner;
3009     InvokeAnyTask(InvokeAnyRoot<E> root, Callable<E> callable) {
3010     this.root = root;
3011     this.callable = callable;
3012     }
3013     public final boolean exec() {
3014     Thread.interrupted();
3015     runner = Thread.currentThread();
3016     root.tryComplete(callable);
3017     runner = null;
3018     Thread.interrupted();
3019     return true;
3020     }
3021     public final boolean cancel(boolean mayInterruptIfRunning) {
3022     Thread t;
3023     boolean stat = super.cancel(false);
3024     if (mayInterruptIfRunning && (t = runner) != null) {
3025     try {
3026     t.interrupt();
3027     } catch (Throwable ignore) {
3028     }
3029     }
3030     return stat;
3031     }
3032     public final void setRawResult(E v) {} // unused
3033     public final E getRawResult() { return null; }
3034     }
3035    
3036     @Override
3037     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
3038     throws InterruptedException, ExecutionException {
3039     int n = tasks.size();
3040     if (n <= 0)
3041     throw new IllegalArgumentException();
3042 dl 1.390 InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
3043 dl 1.367 ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
3044     try {
3045 dl 1.390 for (Callable<T> c : tasks) {
3046     if (c == null)
3047     throw new NullPointerException();
3048     InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
3049     fs.add(f);
3050 dl 1.405 poolSubmit(true, f);
3051 dl 1.390 if (root.isDone())
3052     break;
3053     }
3054 dl 1.405 return root.get();
3055 dl 1.367 } finally {
3056     for (InvokeAnyTask<T> f : fs)
3057 dl 1.369 ForkJoinTask.cancelIgnoringExceptions(f);
3058 dl 1.367 }
3059     }
3060    
3061     @Override
3062     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
3063     long timeout, TimeUnit unit)
3064     throws InterruptedException, ExecutionException, TimeoutException {
3065     long nanos = unit.toNanos(timeout);
3066     int n = tasks.size();
3067     if (n <= 0)
3068     throw new IllegalArgumentException();
3069 dl 1.390 InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
3070 dl 1.367 ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
3071     try {
3072 dl 1.390 for (Callable<T> c : tasks) {
3073     if (c == null)
3074     throw new NullPointerException();
3075     InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
3076     fs.add(f);
3077 dl 1.405 poolSubmit(true, f);
3078 dl 1.390 if (root.isDone())
3079     break;
3080     }
3081 dl 1.405 return root.get(nanos, TimeUnit.NANOSECONDS);
3082 dl 1.367 } finally {
3083     for (InvokeAnyTask<T> f : fs)
3084 dl 1.369 ForkJoinTask.cancelIgnoringExceptions(f);
3085 dl 1.367 }
3086     }
3087    
3088 jsr166 1.1 /**
3089     * Returns the factory used for constructing new workers.
3090     *
3091     * @return the factory used for constructing new workers
3092     */
3093     public ForkJoinWorkerThreadFactory getFactory() {
3094     return factory;
3095     }
3096    
3097     /**
3098     * Returns the handler for internal worker threads that terminate
3099     * due to unrecoverable errors encountered while executing tasks.
3100     *
3101 jsr166 1.4 * @return the handler, or {@code null} if none
3102 jsr166 1.1 */
3103 jsr166 1.156 public UncaughtExceptionHandler getUncaughtExceptionHandler() {
3104 dl 1.14 return ueh;
3105 jsr166 1.1 }
3106    
3107     /**
3108 jsr166 1.9 * Returns the targeted parallelism level of this pool.
3109 jsr166 1.1 *
3110 jsr166 1.9 * @return the targeted parallelism level of this pool
3111 jsr166 1.1 */
3112     public int getParallelism() {
3113 dl 1.405 return Math.max(getParallelismOpaque(), 1);
3114 jsr166 1.1 }
3115    
3116     /**
3117 dl 1.100 * Returns the targeted parallelism level of the common pool.
3118     *
3119     * @return the targeted parallelism level of the common pool
3120 jsr166 1.138 * @since 1.8
3121 dl 1.100 */
3122     public static int getCommonPoolParallelism() {
3123 dl 1.405 return common.getParallelism();
3124 dl 1.100 }
3125    
3126     /**
3127 jsr166 1.1 * Returns the number of worker threads that have started but not
3128 jsr166 1.34 * yet terminated. The result returned by this method may differ
3129 jsr166 1.4 * from {@link #getParallelism} when threads are created to
3130 jsr166 1.1 * maintain parallelism when others are cooperatively blocked.
3131     *
3132     * @return the number of worker threads
3133     */
3134     public int getPoolSize() {
3135 dl 1.405 return (short)(ctl >>> TC_SHIFT);
3136 jsr166 1.1 }
3137    
3138     /**
3139 jsr166 1.4 * Returns {@code true} if this pool uses local first-in-first-out
3140 jsr166 1.1 * scheduling mode for forked tasks that are never joined.
3141     *
3142 jsr166 1.4 * @return {@code true} if this pool uses async mode
3143 jsr166 1.1 */
3144     public boolean getAsyncMode() {
3145 dl 1.405 return (config & FIFO) != 0;
3146 jsr166 1.1 }
3147    
3148     /**
3149     * Returns an estimate of the number of worker threads that are
3150     * not blocked waiting to join tasks or for other managed
3151 dl 1.14 * synchronization. This method may overestimate the
3152     * number of running threads.
3153 jsr166 1.1 *
3154     * @return the number of worker threads
3155     */
3156     public int getRunningThreadCount() {
3157 dl 1.355 WorkQueue[] qs; WorkQueue q;
3158 jsr166 1.344 int rc = 0;
3159 dl 1.405 if ((runState & TERMINATED) == 0 && (qs = queues) != null) {
3160 dl 1.355 for (int i = 1; i < qs.length; i += 2) {
3161     if ((q = qs[i]) != null && q.isApparentlyUnblocked())
3162 dl 1.78 ++rc;
3163     }
3164     }
3165     return rc;
3166 jsr166 1.1 }
3167    
3168     /**
3169     * Returns an estimate of the number of threads that are currently
3170     * stealing or executing tasks. This method may overestimate the
3171     * number of active threads.
3172     *
3173     * @return the number of active threads
3174     */
3175     public int getActiveThreadCount() {
3176 dl 1.405 return Math.max((short)(ctl >>> RC_SHIFT), 0);
3177 jsr166 1.1 }
3178    
3179     /**
3180 jsr166 1.4 * Returns {@code true} if all worker threads are currently idle.
3181     * An idle worker is one that cannot obtain a task to execute
3182     * because none are available to steal from other threads, and
3183     * there are no pending submissions to the pool. This method is
3184     * conservative; it might not return {@code true} immediately upon
3185     * idleness of all threads, but will eventually become true if
3186     * threads remain inactive.
3187 jsr166 1.1 *
3188 jsr166 1.4 * @return {@code true} if all threads are currently idle
3189 jsr166 1.1 */
3190     public boolean isQuiescent() {
3191 dl 1.366 return canStop();
3192 jsr166 1.1 }
3193    
3194     /**
3195 dl 1.354 * Returns an estimate of the total number of completed tasks that
3196     * were executed by a thread other than their submitter. The
3197     * reported value underestimates the actual total number of steals
3198     * when the pool is not quiescent. This value may be useful for
3199     * monitoring and tuning fork/join programs: in general, steal
3200     * counts should be high enough to keep threads busy, but low
3201     * enough to avoid overhead and contention across threads.
3202 jsr166 1.1 *
3203     * @return the number of steals
3204     */
3205     public long getStealCount() {
3206 dl 1.300 long count = stealCount;
3207 dl 1.355 WorkQueue[] qs; WorkQueue q;
3208     if ((qs = queues) != null) {
3209     for (int i = 1; i < qs.length; i += 2) {
3210     if ((q = qs[i]) != null)
3211 dl 1.405 count += (long)q.nsteals & 0xffffffffL;
3212 dl 1.78 }
3213     }
3214     return count;
3215 jsr166 1.1 }
3216    
3217     /**
3218     * Returns an estimate of the total number of tasks currently held
3219     * in queues by worker threads (but not including tasks submitted
3220     * to the pool that have not begun executing). This value is only
3221     * an approximation, obtained by iterating across all threads in
3222     * the pool. This method may be useful for tuning task
3223     * granularities.
3224     *
3225     * @return the number of queued tasks
3226     */
3227     public long getQueuedTaskCount() {
3228 dl 1.355 WorkQueue[] qs; WorkQueue q;
3229 dl 1.345 int count = 0;
3230 dl 1.405 if ((runState & TERMINATED) == 0 && (qs = queues) != null) {
3231 dl 1.355 for (int i = 1; i < qs.length; i += 2) {
3232     if ((q = qs[i]) != null)
3233     count += q.queueSize();
3234 dl 1.78 }
3235 dl 1.52 }
3236 jsr166 1.1 return count;
3237     }
3238    
3239     /**
3240 jsr166 1.8 * Returns an estimate of the number of tasks submitted to this
3241 dl 1.55 * pool that have not yet begun executing. This method may take
3242 dl 1.52 * time proportional to the number of submissions.
3243 jsr166 1.1 *
3244     * @return the number of queued submissions
3245     */
3246     public int getQueuedSubmissionCount() {
3247 dl 1.355 WorkQueue[] qs; WorkQueue q;
3248 jsr166 1.344 int count = 0;
3249 dl 1.405 if ((runState & TERMINATED) == 0 && (qs = queues) != null) {
3250 dl 1.355 for (int i = 0; i < qs.length; i += 2) {
3251     if ((q = qs[i]) != null)
3252     count += q.queueSize();
3253 dl 1.78 }
3254     }
3255     return count;
3256 jsr166 1.1 }
3257    
3258     /**
3259 jsr166 1.4 * Returns {@code true} if there are any tasks submitted to this
3260     * pool that have not yet begun executing.
3261 jsr166 1.1 *
3262     * @return {@code true} if there are any queued submissions
3263     */
3264     public boolean hasQueuedSubmissions() {
3265 dl 1.405 return (runState & TERMINATED) == 0 && hasSubmissions();
3266 jsr166 1.1 }
3267    
3268     /**
3269     * Removes and returns the next unexecuted submission if one is
3270     * available. This method may be useful in extensions to this
3271     * class that re-assign work in systems with multiple pools.
3272     *
3273 jsr166 1.4 * @return the next submission, or {@code null} if none
3274 jsr166 1.1 */
3275     protected ForkJoinTask<?> pollSubmission() {
3276 dl 1.300 return pollScan(true);
3277 jsr166 1.1 }
3278    
3279     /**
3280     * Removes all available unexecuted submitted and forked tasks
3281     * from scheduling queues and adds them to the given collection,
3282     * without altering their execution status. These may include
3283 jsr166 1.8 * artificially generated or wrapped tasks. This method is
3284     * designed to be invoked only when the pool is known to be
3285 jsr166 1.1 * quiescent. Invocations at other times may not remove all
3286     * tasks. A failure encountered while attempting to add elements
3287     * to collection {@code c} may result in elements being in
3288     * neither, either or both collections when the associated
3289     * exception is thrown. The behavior of this operation is
3290     * undefined if the specified collection is modified while the
3291     * operation is in progress.
3292     *
3293     * @param c the collection to transfer elements into
3294     * @return the number of elements transferred
3295     */
3296 jsr166 1.5 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
3297 jsr166 1.344 int count = 0;
3298 dl 1.355 for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
3299     c.add(t);
3300     ++count;
3301 dl 1.52 }
3302 dl 1.18 return count;
3303     }
3304    
3305     /**
3306 jsr166 1.1 * Returns a string identifying this pool, as well as its state,
3307     * including indications of run state, parallelism level, and
3308     * worker and task counts.
3309     *
3310     * @return a string identifying this pool, as well as its state
3311     */
3312     public String toString() {
3313 dl 1.355 // Use a single pass through queues to collect counts
3314 dl 1.345 long st = stealCount;
3315 dl 1.355 long qt = 0L, ss = 0L; int rc = 0;
3316     WorkQueue[] qs; WorkQueue q;
3317     if ((qs = queues) != null) {
3318     for (int i = 0; i < qs.length; ++i) {
3319     if ((q = qs[i]) != null) {
3320     int size = q.queueSize();
3321 dl 1.86 if ((i & 1) == 0)
3322 dl 1.355 ss += size;
3323 dl 1.86 else {
3324     qt += size;
3325 dl 1.355 st += (long)q.nsteals & 0xffffffffL;
3326     if (q.isApparentlyUnblocked())
3327 dl 1.86 ++rc;
3328     }
3329     }
3330     }
3331     }
3332 dl 1.300
3333 dl 1.405 int pc = parallelism;
3334     long c = ctl;
3335     int tc = (short)(c >>> TC_SHIFT);
3336     int ac = (short)(c >>> RC_SHIFT);
3337 dl 1.78 if (ac < 0) // ignore transient negative
3338     ac = 0;
3339 dl 1.405 int rs = runState;
3340     String level = ((rs & TERMINATED) != 0 ? "Terminated" :
3341     (rs & STOP) != 0 ? "Terminating" :
3342     (rs & SHUTDOWN) != 0 ? "Shutting down" :
3343 dl 1.200 "Running");
3344 jsr166 1.1 return super.toString() +
3345 dl 1.52 "[" + level +
3346 dl 1.14 ", parallelism = " + pc +
3347     ", size = " + tc +
3348     ", active = " + ac +
3349     ", running = " + rc +
3350 jsr166 1.1 ", steals = " + st +
3351     ", tasks = " + qt +
3352 dl 1.355 ", submissions = " + ss +
3353 jsr166 1.1 "]";
3354     }
3355    
3356     /**
3357 dl 1.100 * Possibly initiates an orderly shutdown in which previously
3358     * submitted tasks are executed, but no new tasks will be
3359     * accepted. Invocation has no effect on execution state if this
3360 jsr166 1.137 * is the {@link #commonPool()}, and no additional effect if
3361 dl 1.100 * already shut down. Tasks that are in the process of being
3362     * submitted concurrently during the course of this method may or
3363     * may not be rejected.
3364 jsr166 1.1 *
3365     * @throws SecurityException if a security manager exists and
3366     * the caller is not permitted to modify threads
3367     * because it does not hold {@link
3368     * java.lang.RuntimePermission}{@code ("modifyThread")}
3369     */
3370     public void shutdown() {
3371     checkPermission();
3372 dl 1.404 tryTerminate(false, true);
3373 jsr166 1.1 }
3374    
3375     /**
3376 dl 1.100 * Possibly attempts to cancel and/or stop all tasks, and reject
3377     * all subsequently submitted tasks. Invocation has no effect on
3378 jsr166 1.137 * execution state if this is the {@link #commonPool()}, and no
3379 dl 1.100 * additional effect if already shut down. Otherwise, tasks that
3380     * are in the process of being submitted or executed concurrently
3381     * during the course of this method may or may not be
3382     * rejected. This method cancels both existing and unexecuted
3383     * tasks, in order to permit termination in the presence of task
3384     * dependencies. So the method always returns an empty list
3385     * (unlike the case for some other Executors).
3386 jsr166 1.1 *
3387     * @return an empty list
3388     * @throws SecurityException if a security manager exists and
3389     * the caller is not permitted to modify threads
3390     * because it does not hold {@link
3391     * java.lang.RuntimePermission}{@code ("modifyThread")}
3392     */
3393     public List<Runnable> shutdownNow() {
3394     checkPermission();
3395 dl 1.404 tryTerminate(true, true);
3396 jsr166 1.1 return Collections.emptyList();
3397     }
3398    
3399     /**
3400     * Returns {@code true} if all tasks have completed following shut down.
3401     *
3402     * @return {@code true} if all tasks have completed following shut down
3403     */
3404     public boolean isTerminated() {
3405 dl 1.405 return (runState & TERMINATED) != 0;
3406 jsr166 1.1 }
3407    
3408     /**
3409     * Returns {@code true} if the process of termination has
3410 jsr166 1.9 * commenced but not yet completed. This method may be useful for
3411     * debugging. A return of {@code true} reported a sufficient
3412     * period after shutdown may indicate that submitted tasks have
3413 jsr166 1.119 * ignored or suppressed interruption, or are waiting for I/O,
3414 dl 1.49 * causing this executor not to properly terminate. (See the
3415     * advisory notes for class {@link ForkJoinTask} stating that
3416     * tasks should not normally entail blocking operations. But if
3417     * they do, they must abort them on interrupt.)
3418 jsr166 1.1 *
3419 jsr166 1.9 * @return {@code true} if terminating but not yet terminated
3420 jsr166 1.1 */
3421     public boolean isTerminating() {
3422 dl 1.405 return (runState & (STOP | TERMINATED)) == STOP;
3423 jsr166 1.1 }
3424    
3425     /**
3426     * Returns {@code true} if this pool has been shut down.
3427     *
3428     * @return {@code true} if this pool has been shut down
3429     */
3430     public boolean isShutdown() {
3431 dl 1.405 return runState != 0;
3432 jsr166 1.9 }
3433    
3434     /**
3435 dl 1.105 * Blocks until all tasks have completed execution after a
3436     * shutdown request, or the timeout occurs, or the current thread
3437 dl 1.134 * is interrupted, whichever happens first. Because the {@link
3438     * #commonPool()} never terminates until program shutdown, when
3439     * applied to the common pool, this method is equivalent to {@link
3440 jsr166 1.158 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
3441 jsr166 1.1 *
3442     * @param timeout the maximum time to wait
3443     * @param unit the time unit of the timeout argument
3444     * @return {@code true} if this executor terminated and
3445     * {@code false} if the timeout elapsed before termination
3446     * @throws InterruptedException if interrupted while waiting
3447     */
3448     public boolean awaitTermination(long timeout, TimeUnit unit)
3449     throws InterruptedException {
3450 dl 1.405 ReentrantLock lock; Condition cond; boolean terminated;
3451 dl 1.355 long nanos = unit.toNanos(timeout);
3452 dl 1.405 if ((config & ISCOMMON) != 0) {
3453     if (helpQuiescePool(this, nanos, true) < 0)
3454 dl 1.366 throw new InterruptedException();
3455 dl 1.405 terminated = false;
3456 dl 1.134 }
3457 dl 1.405 else if (!(terminated = ((runState & TERMINATED) != 0))) {
3458     tryTerminate(false, false); // reduce transient blocking
3459     if ((lock = registrationLock) != null &&
3460     !(terminated = (((runState & TERMINATED) != 0)))) {
3461     lock.lock();
3462     try {
3463     if ((cond = termination) == null)
3464     termination = cond = lock.newCondition();
3465     while (!(terminated = ((runState & TERMINATED) != 0)) &&
3466     nanos > 0L)
3467     nanos = cond.awaitNanos(nanos);
3468     } finally {
3469     lock.unlock();
3470     }
3471 dl 1.366 }
3472 dl 1.18 }
3473 dl 1.366 return terminated;
3474 jsr166 1.1 }
3475    
3476     /**
3477 dl 1.134 * If called by a ForkJoinTask operating in this pool, equivalent
3478     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3479     * waits and/or attempts to assist performing tasks until this
3480     * pool {@link #isQuiescent} or the indicated timeout elapses.
3481     *
3482     * @param timeout the maximum time to wait
3483     * @param unit the time unit of the timeout argument
3484     * @return {@code true} if quiescent; {@code false} if the
3485     * timeout elapsed.
3486     */
3487     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3488 dl 1.405 return (helpQuiescePool(this, unit.toNanos(timeout), false) > 0);
3489 dl 1.134 }
3490    
3491     /**
3492 jsr166 1.1 * Interface for extending managed parallelism for tasks running
3493 jsr166 1.8 * in {@link ForkJoinPool}s.
3494     *
3495 dl 1.19 * <p>A {@code ManagedBlocker} provides two methods. Method
3496 jsr166 1.218 * {@link #isReleasable} must return {@code true} if blocking is
3497     * not necessary. Method {@link #block} blocks the current thread
3498 dl 1.19 * if necessary (perhaps internally invoking {@code isReleasable}
3499 dl 1.54 * before actually blocking). These actions are performed by any
3500 dl 1.355 * thread invoking {@link
3501     * ForkJoinPool#managedBlock(ManagedBlocker)}. The unusual
3502     * methods in this API accommodate synchronizers that may, but
3503     * don't usually, block for long periods. Similarly, they allow
3504     * more efficient internal handling of cases in which additional
3505     * workers may be, but usually are not, needed to ensure
3506     * sufficient parallelism. Toward this end, implementations of
3507     * method {@code isReleasable} must be amenable to repeated
3508     * invocation. Neither method is invoked after a prior invocation
3509     * of {@code isReleasable} or {@code block} returns {@code true}.
3510 jsr166 1.1 *
3511     * <p>For example, here is a ManagedBlocker based on a
3512     * ReentrantLock:
3513 jsr166 1.239 * <pre> {@code
3514 jsr166 1.1 * class ManagedLocker implements ManagedBlocker {
3515     * final ReentrantLock lock;
3516     * boolean hasLock = false;
3517     * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3518     * public boolean block() {
3519     * if (!hasLock)
3520     * lock.lock();
3521     * return true;
3522     * }
3523     * public boolean isReleasable() {
3524     * return hasLock || (hasLock = lock.tryLock());
3525     * }
3526     * }}</pre>
3527 dl 1.19 *
3528     * <p>Here is a class that possibly blocks waiting for an
3529     * item on a given queue:
3530 jsr166 1.239 * <pre> {@code
3531 dl 1.19 * class QueueTaker<E> implements ManagedBlocker {
3532     * final BlockingQueue<E> queue;
3533     * volatile E item = null;
3534     * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3535     * public boolean block() throws InterruptedException {
3536     * if (item == null)
3537 dl 1.23 * item = queue.take();
3538 dl 1.19 * return true;
3539     * }
3540     * public boolean isReleasable() {
3541 dl 1.23 * return item != null || (item = queue.poll()) != null;
3542 dl 1.19 * }
3543     * public E getItem() { // call after pool.managedBlock completes
3544     * return item;
3545     * }
3546     * }}</pre>
3547 jsr166 1.1 */
3548     public static interface ManagedBlocker {
3549     /**
3550     * Possibly blocks the current thread, for example waiting for
3551     * a lock or condition.
3552     *
3553 jsr166 1.4 * @return {@code true} if no additional blocking is necessary
3554     * (i.e., if isReleasable would return true)
3555 jsr166 1.1 * @throws InterruptedException if interrupted while waiting
3556     * (the method is not required to do so, but is allowed to)
3557     */
3558     boolean block() throws InterruptedException;
3559    
3560     /**
3561 jsr166 1.4 * Returns {@code true} if blocking is unnecessary.
3562 jsr166 1.154 * @return {@code true} if blocking is unnecessary
3563 jsr166 1.1 */
3564     boolean isReleasable();
3565     }
3566    
3567     /**
3568 jsr166 1.217 * Runs the given possibly blocking task. When {@linkplain
3569     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
3570     * method possibly arranges for a spare thread to be activated if
3571     * necessary to ensure sufficient parallelism while the current
3572     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
3573 jsr166 1.1 *
3574 jsr166 1.217 * <p>This method repeatedly calls {@code blocker.isReleasable()} and
3575     * {@code blocker.block()} until either method returns {@code true}.
3576     * Every call to {@code blocker.block()} is preceded by a call to
3577     * {@code blocker.isReleasable()} that returned {@code false}.
3578     *
3579     * <p>If not running in a ForkJoinPool, this method is
3580 jsr166 1.8 * behaviorally equivalent to
3581 jsr166 1.239 * <pre> {@code
3582 jsr166 1.1 * while (!blocker.isReleasable())
3583     * if (blocker.block())
3584 jsr166 1.217 * break;}</pre>
3585 jsr166 1.8 *
3586 jsr166 1.217 * If running in a ForkJoinPool, the pool may first be expanded to
3587     * ensure sufficient parallelism available during the call to
3588     * {@code blocker.block()}.
3589 jsr166 1.1 *
3590 jsr166 1.217 * @param blocker the blocker task
3591     * @throws InterruptedException if {@code blocker.block()} did so
3592 jsr166 1.1 */
3593 dl 1.18 public static void managedBlock(ManagedBlocker blocker)
3594 jsr166 1.1 throws InterruptedException {
3595 dl 1.355 Thread t; ForkJoinPool p;
3596     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
3597     (p = ((ForkJoinWorkerThread)t).pool) != null)
3598     p.compensatedBlock(blocker);
3599     else
3600     unmanagedBlock(blocker);
3601     }
3602    
3603     /** ManagedBlock for ForkJoinWorkerThreads */
3604     private void compensatedBlock(ManagedBlocker blocker)
3605     throws InterruptedException {
3606 dl 1.345 if (blocker == null) throw new NullPointerException();
3607 dl 1.355 for (;;) {
3608     int comp; boolean done;
3609     long c = ctl;
3610     if (blocker.isReleasable())
3611     break;
3612 dl 1.405 if ((comp = tryCompensate(c, false)) >= 0) {
3613 dl 1.355 long post = (comp == 0) ? 0L : RC_UNIT;
3614     try {
3615     done = blocker.block();
3616     } finally {
3617     getAndAddCtl(post);
3618     }
3619     if (done)
3620 dl 1.105 break;
3621 dl 1.78 }
3622 dl 1.18 }
3623 jsr166 1.1 }
3624    
3625 dl 1.355 /** ManagedBlock for external threads */
3626     private static void unmanagedBlock(ManagedBlocker blocker)
3627     throws InterruptedException {
3628     if (blocker == null) throw new NullPointerException();
3629     do {} while (!blocker.isReleasable() && !blocker.block());
3630 dl 1.310 }
3631    
3632 dl 1.355 // AbstractExecutorService.newTaskFor overrides rely on
3633     // undocumented fact that ForkJoinTask.adapt returns ForkJoinTasks
3634     // that also implement RunnableFuture.
3635 jsr166 1.1
3636 dl 1.355 @Override
3637 jsr166 1.1 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3638 dl 1.90 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3639 jsr166 1.1 }
3640    
3641 dl 1.355 @Override
3642 jsr166 1.1 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3643 dl 1.90 return new ForkJoinTask.AdaptedCallable<T>(callable);
3644 jsr166 1.1 }
3645    
3646 dl 1.52 static {
3647 dl 1.405 U = Unsafe.getUnsafe();
3648     Class<ForkJoinPool> klass = ForkJoinPool.class;
3649 jsr166 1.3 try {
3650 dl 1.405 POOLIDS = U.staticFieldOffset(klass.getDeclaredField("poolIds"));
3651 dl 1.404 } catch (NoSuchFieldException e) {
3652 jsr166 1.347 throw new ExceptionInInitializerError(e);
3653 dl 1.52 }
3654 dl 1.405 CTL = U.objectFieldOffset(klass, "ctl");
3655     RUNSTATE = U.objectFieldOffset(klass, "runState");
3656     PARALLELISM = U.objectFieldOffset(klass, "parallelism");
3657     THREADIDS = U.objectFieldOffset(klass, "threadIds");
3658 dl 1.105
3659 dl 1.404 defaultForkJoinWorkerThreadFactory =
3660     new DefaultForkJoinWorkerThreadFactory();
3661 dl 1.405 @SuppressWarnings("removal")
3662     ForkJoinPool p = common = (System.getSecurityManager() == null) ?
3663     new ForkJoinPool((byte)0) :
3664     AccessController.doPrivileged(new PrivilegedAction<>() {
3665     public ForkJoinPool run() {
3666     return new ForkJoinPool((byte)0); }});
3667     Class<?> dep = LockSupport.class; // ensure loaded
3668 jsr166 1.3 }
3669 jsr166 1.1 }