ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.368
Committed: Mon Feb 3 12:47:52 2020 UTC (4 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.367: +17 -16 lines
Log Message:
fix invokeAny for parallelism 0

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.Thread.UncaughtExceptionHandler;
10 import java.lang.invoke.MethodHandles;
11 import java.lang.invoke.VarHandle;
12 import java.security.AccessController;
13 import java.security.AccessControlContext;
14 import java.security.Permission;
15 import java.security.Permissions;
16 import java.security.PrivilegedAction;
17 import java.security.ProtectionDomain;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Iterator;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.function.Predicate;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.concurrent.locks.LockSupport;
27 import java.util.concurrent.locks.ReentrantLock;
28 import java.util.concurrent.locks.Condition;
29
30 /**
31 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
32 * A {@code ForkJoinPool} provides the entry point for submissions
33 * from non-{@code ForkJoinTask} clients, as well as management and
34 * monitoring operations.
35 *
36 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
37 * ExecutorService} mainly by virtue of employing
38 * <em>work-stealing</em>: all threads in the pool attempt to find and
39 * execute tasks submitted to the pool and/or created by other active
40 * tasks (eventually blocking waiting for work if none exist). This
41 * enables efficient processing when most tasks spawn other subtasks
42 * (as do most {@code ForkJoinTask}s), as well as when many small
43 * tasks are submitted to the pool from external clients. Especially
44 * when setting <em>asyncMode</em> to true in constructors, {@code
45 * ForkJoinPool}s may also be appropriate for use with event-style
46 * tasks that are never joined. All worker threads are initialized
47 * with {@link Thread#isDaemon} set {@code true}.
48 *
49 * <p>A static {@link #commonPool()} is available and appropriate for
50 * most applications. The common pool is used by any ForkJoinTask that
51 * is not explicitly submitted to a specified pool. Using the common
52 * pool normally reduces resource usage (its threads are slowly
53 * reclaimed during periods of non-use, and reinstated upon subsequent
54 * use).
55 *
56 * <p>For applications that require separate or custom pools, a {@code
57 * ForkJoinPool} may be constructed with a given target parallelism
58 * level; by default, equal to the number of available processors.
59 * The pool attempts to maintain enough active (or available) threads
60 * by dynamically adding, suspending, or resuming internal worker
61 * threads, even if some tasks are stalled waiting to join others.
62 * However, no such adjustments are guaranteed in the face of blocked
63 * I/O or other unmanaged synchronization. The nested {@link
64 * ManagedBlocker} interface enables extension of the kinds of
65 * synchronization accommodated. The default policies may be
66 * overridden using a constructor with parameters corresponding to
67 * those documented in class {@link ThreadPoolExecutor}.
68 *
69 * <p>In addition to execution and lifecycle control methods, this
70 * class provides status check methods (for example
71 * {@link #getStealCount}) that are intended to aid in developing,
72 * tuning, and monitoring fork/join applications. Also, method
73 * {@link #toString} returns indications of pool state in a
74 * convenient form for informal monitoring.
75 *
76 * <p>As is the case with other ExecutorServices, there are three
77 * main task execution methods summarized in the following table.
78 * These are designed to be used primarily by clients not already
79 * engaged in fork/join computations in the current pool. The main
80 * forms of these methods accept instances of {@code ForkJoinTask},
81 * but overloaded forms also allow mixed execution of plain {@code
82 * Runnable}- or {@code Callable}- based activities as well. However,
83 * tasks that are already executing in a pool should normally instead
84 * use the within-computation forms listed in the table unless using
85 * async event-style tasks that are not usually joined, in which case
86 * there is little difference among choice of methods.
87 *
88 * <table class="plain">
89 * <caption>Summary of task execution methods</caption>
90 * <tr>
91 * <td></td>
92 * <th scope="col"> Call from non-fork/join clients</th>
93 * <th scope="col"> Call from within fork/join computations</th>
94 * </tr>
95 * <tr>
96 * <th scope="row" style="text-align:left"> Arrange async execution</th>
97 * <td> {@link #execute(ForkJoinTask)}</td>
98 * <td> {@link ForkJoinTask#fork}</td>
99 * </tr>
100 * <tr>
101 * <th scope="row" style="text-align:left"> Await and obtain result</th>
102 * <td> {@link #invoke(ForkJoinTask)}</td>
103 * <td> {@link ForkJoinTask#invoke}</td>
104 * </tr>
105 * <tr>
106 * <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
107 * <td> {@link #submit(ForkJoinTask)}</td>
108 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
109 * </tr>
110 * </table>
111 *
112 * <p>The parameters used to construct the common pool may be controlled by
113 * setting the following {@linkplain System#getProperty system properties}:
114 * <ul>
115 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}
116 * - the parallelism level, a non-negative integer
117 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory}
118 * - the class name of a {@link ForkJoinWorkerThreadFactory}.
119 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
120 * is used to load this class.
121 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.exceptionHandler}
122 * - the class name of a {@link UncaughtExceptionHandler}.
123 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
124 * is used to load this class.
125 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.maximumSpares}
126 * - the maximum number of allowed extra threads to maintain target
127 * parallelism (default 256).
128 * </ul>
129 * If no thread factory is supplied via a system property, then the
130 * common pool uses a factory that uses the system class loader as the
131 * {@linkplain Thread#getContextClassLoader() thread context class loader}.
132 * In addition, if a {@link SecurityManager} is present, then
133 * the common pool uses a factory supplying threads that have no
134 * {@link Permissions} enabled.
135 *
136 * Upon any error in establishing these settings, default parameters
137 * are used. It is possible to disable or limit the use of threads in
138 * the common pool by setting the parallelism property to zero, and/or
139 * using a factory that may return {@code null}. However doing so may
140 * cause unjoined tasks to never be executed.
141 *
142 * <p><b>Implementation notes</b>: This implementation restricts the
143 * maximum number of running threads to 32767. Attempts to create
144 * pools with greater than the maximum number result in
145 * {@code IllegalArgumentException}.
146 *
147 * <p>This implementation rejects submitted tasks (that is, by throwing
148 * {@link RejectedExecutionException}) only when the pool is shut down
149 * or internal resources have been exhausted.
150 *
151 * @since 1.7
152 * @author Doug Lea
153 */
154 public class ForkJoinPool extends AbstractExecutorService {
155
156 /*
157 * Implementation Overview
158 *
159 * This class and its nested classes provide the main
160 * functionality and control for a set of worker threads:
161 * Submissions from non-FJ threads enter into submission queues.
162 * Workers take these tasks and typically split them into subtasks
163 * that may be stolen by other workers. Work-stealing based on
164 * randomized scans generally leads to better throughput than
165 * "work dealing" in which producers assign tasks to idle threads,
166 * in part because threads that have finished other tasks before
167 * the signalled thread wakes up (which can be a long time) can
168 * take the task instead. Preference rules give first priority to
169 * processing tasks from their own queues (LIFO or FIFO, depending
170 * on mode), then to randomized FIFO steals of tasks in other
171 * queues. This framework began as a vehicle for supporting
172 * tree-structured parallelism using work-stealing. Over time,
173 * its scalability advantages led to extensions and changes to
174 * better support more diverse usage contexts. Because most
175 * internal methods and nested classes are interrelated, their
176 * main rationale and descriptions are presented here; individual
177 * methods and nested classes contain only brief comments about
178 * details.
179 *
180 * WorkQueues
181 * ==========
182 *
183 * Most operations occur within work-stealing queues (in nested
184 * class WorkQueue). These are special forms of Deques that
185 * support only three of the four possible end-operations -- push,
186 * pop, and poll (aka steal), under the further constraints that
187 * push and pop are called only from the owning thread (or, as
188 * extended here, under a lock), while poll may be called from
189 * other threads. (If you are unfamiliar with them, you probably
190 * want to read Herlihy and Shavit's book "The Art of
191 * Multiprocessor Programming", chapter 16 describing these in
192 * more detail before proceeding.) The main work-stealing queue
193 * design is roughly similar to those in the papers "Dynamic
194 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
195 * (http://research.sun.com/scalable/pubs/index.html) and
196 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
197 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
198 * The main differences ultimately stem from GC requirements that
199 * we null out taken slots as soon as we can, to maintain as small
200 * a footprint as possible even in programs generating huge
201 * numbers of tasks. To accomplish this, we shift the CAS
202 * arbitrating pop vs poll (steal) from being on the indices
203 * ("base" and "top") to the slots themselves.
204 *
205 * Adding tasks then takes the form of a classic array push(task)
206 * in a circular buffer:
207 * q.array[q.top++ % length] = task;
208 *
209 * The actual code needs to null-check and size-check the array,
210 * uses masking, not mod, for indexing a power-of-two-sized array,
211 * enforces memory ordering, supports resizing, and possibly
212 * signals waiting workers to start scanning -- see below.
213 *
214 * The pop operation (always performed by owner) is of the form:
215 * if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
216 * decrement top and return task;
217 * If this fails, the queue is empty.
218 *
219 * The poll operation by another stealer thread is, basically:
220 * if (CAS nonnull task at q.array[q.base % length] to null)
221 * increment base and return task;
222 *
223 * This may fail due to contention, and may be retried.
224 * Implementations must ensure a consistent snapshot of the base
225 * index and the task (by looping or trying elsewhere) before
226 * trying CAS. There isn't actually a method of this form,
227 * because failure due to inconsistency or contention is handled
228 * in different ways in different contexts, normally by first
229 * trying other queues. (For the most straightforward example, see
230 * method pollScan.) There are further variants for cases
231 * requiring inspection of elements before extracting them, so
232 * must interleave these with variants of this code. Also, a more
233 * efficient version (nextLocalTask) is used for polls by owners.
234 * It avoids some overhead because the queue cannot be growing
235 * during call.
236 *
237 * Memory ordering. See "Correct and Efficient Work-Stealing for
238 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
239 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
240 * analysis of memory ordering requirements in work-stealing
241 * algorithms similar to the one used here. Inserting and
242 * extracting tasks in array slots via volatile or atomic accesses
243 * or explicit fences provides primary synchronization.
244 *
245 * Operations on deque elements require reads and writes of both
246 * indices and slots. When possible, we allow these to occur in
247 * any order. Because the base and top indices (along with other
248 * pool or array fields accessed in many methods) only imprecisely
249 * guide where to extract from, we let accesses other than the
250 * element getAndSet/CAS/setVolatile appear in any order, using
251 * plain mode. But we must still preface some methods (mainly
252 * those that may be accessed externally) with an acquireFence to
253 * avoid unbounded staleness. This is equivalent to acting as if
254 * callers use an acquiring read of the reference to the pool or
255 * queue when invoking the method, even when they do not. We use
256 * explicit acquiring reads (getSlot) rather than plain array
257 * access when acquire mode is required but not otherwise ensured
258 * by context. To reduce stalls by other stealers, we encourage
259 * timely writes to the base index by immediately following
260 * updates with a write of a volatile field that must be updated
261 * anyway, or an Opaque-mode write if there is no such
262 * opportunity.
263 *
264 * Because indices and slot contents cannot always be consistent,
265 * the emptiness check base == top is only quiescently accurate
266 * (and so used where this suffices). Otherwise, it may err on the
267 * side of possibly making the queue appear nonempty when a push,
268 * pop, or poll have not fully committed, or making it appear
269 * empty when an update of top or base has not yet been seen.
270 * Method isEmpty() provides a more accurate test by checking both
271 * indices and slots. Similarly, the check in push for the queue
272 * array being full may trigger when not completely full, causing
273 * a resize earlier than required.
274 *
275 * Mainly because of these potential inconsistencies among slots
276 * vs indices, the poll operation, considered individually, is not
277 * wait-free. One thief cannot successfully continue until another
278 * in-progress one (or, if previously empty, a push) visibly
279 * completes. This can stall threads when required to consume
280 * from a given queue (which may spin). However, in the
281 * aggregate, we ensure probabilistic non-blockingness at least
282 * until checking quiescence (which is intrinsically blocking):
283 * If an attempted steal fails, a scanning thief chooses a
284 * different victim target to try next. So, in order for one thief
285 * to progress, it suffices for any in-progress poll or new push
286 * on any empty queue to complete. The worst cases occur when many
287 * threads are looking for tasks being produced by a stalled
288 * producer.
289 *
290 * This approach also enables support of a user mode in which
291 * local task processing is in FIFO, not LIFO order, simply by
292 * using poll rather than pop. This can be useful in
293 * message-passing frameworks in which tasks are never joined,
294 * although with increased contention among task producers and
295 * consumers.
296 *
297 * WorkQueues are also used in a similar way for tasks submitted
298 * to the pool. We cannot mix these tasks in the same queues used
299 * by workers. Instead, we randomly associate submission queues
300 * with submitting threads, using a form of hashing. The
301 * ThreadLocalRandom probe value serves as a hash code for
302 * choosing existing queues, and may be randomly repositioned upon
303 * contention with other submitters. In essence, submitters act
304 * like workers except that they are restricted to executing local
305 * tasks that they submitted (or when known, subtasks thereof).
306 * Insertion of tasks in shared mode requires a lock. We use only
307 * a simple spinlock (using field "source"), because submitters
308 * encountering a busy queue move to a different position to use
309 * or create other queues. They block only when registering new
310 * queues.
311 *
312 * Management
313 * ==========
314 *
315 * The main throughput advantages of work-stealing stem from
316 * decentralized control -- workers mostly take tasks from
317 * themselves or each other, at rates that can exceed a billion
318 * per second. Most non-atomic control is performed by some form
319 * of scanning across or within queues. The pool itself creates,
320 * activates (enables scanning for and running tasks),
321 * deactivates, blocks, and terminates threads, all with minimal
322 * central information. There are only a few properties that we
323 * can globally track or maintain, so we pack them into a small
324 * number of variables, often maintaining atomicity without
325 * blocking or locking. Nearly all essentially atomic control
326 * state is held in a few volatile variables that are by far most
327 * often read (not written) as status and consistency checks. We
328 * pack as much information into them as we can.
329 *
330 * Field "ctl" contains 64 bits holding information needed to
331 * atomically decide to add, enqueue (on an event queue), and
332 * dequeue and release workers. To enable this packing, we
333 * restrict maximum parallelism to (1<<15)-1 (which is far in
334 * excess of normal operating range) to allow ids, counts, and
335 * their negations (used for thresholding) to fit into 16bit
336 * subfields.
337 *
338 * Field "mode" holds configuration parameters as well as lifetime
339 * status, atomically and monotonically setting SHUTDOWN, STOP,
340 * and finally TERMINATED bits. It is updated only via bitwise
341 * atomics (getAndBitwiseOr).
342 *
343 * Array "queues" holds references to WorkQueues. It is updated
344 * (only during worker creation and termination) under the
345 * registrationLock, but is otherwise concurrently readable, and
346 * accessed directly (although always prefaced by acquireFences or
347 * other acquiring reads). To simplify index-based operations, the
348 * array size is always a power of two, and all readers must
349 * tolerate null slots. Worker queues are at odd indices. Worker
350 * ids masked with SMASK match their index. Shared (submission)
351 * queues are at even indices. Grouping them together in this way
352 * simplifies and speeds up task scanning.
353 *
354 * All worker thread creation is on-demand, triggered by task
355 * submissions, replacement of terminated workers, and/or
356 * compensation for blocked workers. However, all other support
357 * code is set up to work with other policies. To ensure that we
358 * do not hold on to worker or task references that would prevent
359 * GC, all accesses to workQueues are via indices into the
360 * queues array (which is one source of some of the messy code
361 * constructions here). In essence, the queues array serves as
362 * a weak reference mechanism. Thus for example the stack top
363 * subfield of ctl stores indices, not references.
364 *
365 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
366 * cannot let workers spin indefinitely scanning for tasks when
367 * none can be found immediately, and we cannot start/resume
368 * workers unless there appear to be tasks available. On the
369 * other hand, we must quickly prod them into action when new
370 * tasks are submitted or generated. These latencies are mainly a
371 * function of JVM park/unpark (and underlying OS) performance,
372 * which can be slow and variable. In many usages, ramp-up time
373 * is the main limiting factor in overall performance, which is
374 * compounded at program start-up by JIT compilation and
375 * allocation. On the other hand, throughput degrades when too
376 * many threads poll for too few tasks.
377 *
378 * The "ctl" field atomically maintains total and "released"
379 * worker counts, plus the head of the available worker queue
380 * (actually stack, represented by the lower 32bit subfield of
381 * ctl). Released workers are those known to be scanning for
382 * and/or running tasks. Unreleased ("available") workers are
383 * recorded in the ctl stack. These workers are made available for
384 * signalling by enqueuing in ctl (see method awaitWork). The
385 * "queue" is a form of Treiber stack. This is ideal for
386 * activating threads in most-recently used order, and improves
387 * performance and locality, outweighing the disadvantages of
388 * being prone to contention and inability to release a worker
389 * unless it is topmost on stack. The top stack state holds the
390 * value of the "phase" field of the worker: its index and status,
391 * plus a version counter that, in addition to the count subfields
392 * (also serving as version stamps) provide protection against
393 * Treiber stack ABA effects.
394 *
395 * Creating workers. To create a worker, we pre-increment counts
396 * (serving as a reservation), and attempt to construct a
397 * ForkJoinWorkerThread via its factory. On starting, the new
398 * thread first invokes registerWorker, where it constructs a
399 * WorkQueue and is assigned an index in the queues array
400 * (expanding the array if necessary). Upon any exception across
401 * these steps, or null return from factory, deregisterWorker
402 * adjusts counts and records accordingly. If a null return, the
403 * pool continues running with fewer than the target number
404 * workers. If exceptional, the exception is propagated, generally
405 * to some external caller.
406 *
407 * WorkQueue field "phase" is used by both workers and the pool to
408 * manage and track whether a worker is UNSIGNALLED (possibly
409 * blocked waiting for a signal). When a worker is enqueued its
410 * phase field is set negative. Note that phase field updates lag
411 * queue CAS releases; seeing a negative phase does not guarantee
412 * that the worker is available. When queued, the lower 16 bits of
413 * its phase must hold its pool index. So we place the index there
414 * upon initialization and never modify these bits.
415 *
416 * The ctl field also serves as the basis for memory
417 * synchronization surrounding activation. This uses a more
418 * efficient version of a Dekker-like rule that task producers and
419 * consumers sync with each other by both writing/CASing ctl (even
420 * if to its current value). However, rather than CASing ctl to
421 * its current value in the common case where no action is
422 * required, we reduce write contention by ensuring that
423 * signalWork invocations are prefaced with a full-volatile memory
424 * access (which is usually needed anyway).
425 *
426 * Signalling. Signals (in signalWork) cause new or reactivated
427 * workers to scan for tasks. Method signalWork and its callers
428 * try to approximate the unattainable goal of having the right
429 * number of workers activated for the tasks at hand, but must err
430 * on the side of too many workers vs too few to avoid stalls. If
431 * computations are purely tree structured, it suffices for every
432 * worker to activate another when it pushes a task into an empty
433 * queue, resulting in O(log(#threads)) steps to full activation.
434 * If instead, tasks come in serially from only a single producer,
435 * each worker taking its first (since the last quiescence) task
436 * from a queue should signal another if there are more tasks in
437 * that queue. This is equivalent to, but generally faster than,
438 * arranging that the stealer take two tasks, re-pushing one on its own
439 * queue, and signalling (because its queue is empty), also
440 * resulting in logarithmic full activation time. Because we don't
441 * know about usage patterns (or most commonly, mixtures), we use
442 * both approaches. We approximate the second rule by arranging
443 * that workers in scan() do not repeat signals when repeatedly
444 * taking tasks from any given queue, by remembering the previous
445 * one. There are narrow windows in which both rules may apply,
446 * leading to duplicate or unnecessary signals. Despite such
447 * limitations, these rules usually avoid slowdowns that otherwise
448 * occur when too many workers contend to take too few tasks, or
449 * when producers waste most of their time resignalling. However,
450 * contention and overhead effects may still occur during ramp-up,
451 * ramp-down, and small computations involving only a few workers.
452 *
453 * Scanning. Method scan performs top-level scanning for (and
454 * execution of) tasks. Scans by different workers and/or at
455 * different times are unlikely to poll queues in the same
456 * order. Each scan traverses and tries to poll from each queue in
457 * a pseudorandom permutation order by starting at a random index,
458 * and using a constant cyclically exhaustive stride; restarting
459 * upon contention. (Non-top-level scans; for example in
460 * helpJoin, use simpler linear probes because they do not
461 * systematically contend with top-level scans.) The pseudorandom
462 * generator need not have high-quality statistical properties in
463 * the long term. We use Marsaglia XorShifts, seeded with the Weyl
464 * sequence from ThreadLocalRandom probes, which are cheap and
465 * suffice. Scans do not otherwise explicitly take into account
466 * core affinities, loads, cache localities, etc. However, they do
467 * exploit temporal locality (which usually approximates these) by
468 * preferring to re-poll from the same queue after a successful
469 * poll before trying others (see method topLevelExec). This
470 * reduces fairness, which is partially counteracted by using a
471 * one-shot form of poll (tryPoll) that may lose to other workers.
472 *
473 * Deactivation. Method scan returns a sentinel when no tasks are
474 * found, leading to deactivation (see awaitWork). The count
475 * fields in ctl allow accurate discovery of quiescent states
476 * (i.e., when all workers are idle) after deactivation. However,
477 * this may also race with new (external) submissions, so a
478 * recheck is also needed to determine quiescence. Upon apparently
479 * triggering quiescence, awaitWork re-scans and self-signals if
480 * it may have missed a signal. In other cases, a missed signal
481 * may transiently lower parallelism because deactivation does not
482 * necessarily mean that there is no more work, only that
483 * there were no tasks not taken by other workers. But more
484 * signals are generated (see above) to eventually reactivate if
485 * needed.
486 *
487 * Trimming workers. To release resources after periods of lack of
488 * use, a worker starting to wait when the pool is quiescent will
489 * time out and terminate if the pool has remained quiescent for
490 * period given by field keepAlive.
491 *
492 * Shutdown and Termination. A call to shutdownNow invokes
493 * tryTerminate to atomically set a mode bit. The calling thread,
494 * as well as every other worker thereafter terminating, helps
495 * terminate others by cancelling their unprocessed tasks, and
496 * waking them up. Calls to non-abrupt shutdown() preface this by
497 * checking isQuiescent before triggering the "STOP" phase of
498 * termination.
499 *
500 * Joining Tasks
501 * =============
502 *
503 * Normally, the first option when joining a task that is not done
504 * is to try to unfork it from local queue and run it. Otherwise,
505 * any of several actions may be taken when one worker is waiting
506 * to join a task stolen (or always held) by another. Because we
507 * are multiplexing many tasks on to a pool of workers, we can't
508 * always just let them block (as in Thread.join). We also cannot
509 * just reassign the joiner's run-time stack with another and
510 * replace it later, which would be a form of "continuation", that
511 * even if possible is not necessarily a good idea since we may
512 * need both an unblocked task and its continuation to progress.
513 * Instead we combine two tactics:
514 *
515 * Helping: Arranging for the joiner to execute some task that it
516 * could be running if the steal had not occurred.
517 *
518 * Compensating: Unless there are already enough live threads,
519 * method tryCompensate() may create or re-activate a spare
520 * thread to compensate for blocked joiners until they unblock.
521 *
522 * A third form (implemented via tryRemove) amounts to helping a
523 * hypothetical compensator: If we can readily tell that a
524 * possible action of a compensator is to steal and execute the
525 * task being joined, the joining thread can do so directly,
526 * without the need for a compensation thread; although with a
527 * (rare) possibility of reduced parallelism because of a
528 * transient gap in the queue array.
529 *
530 * Other intermediate forms available for specific task types (for
531 * example helpAsyncBlocker) often avoid or postpone the need for
532 * blocking or compensation.
533 *
534 * The ManagedBlocker extension API can't use helping so relies
535 * only on compensation in method awaitBlocker.
536 *
537 * The algorithm in helpJoin entails a form of "linear helping".
538 * Each worker records (in field "source") the id of the queue
539 * from which it last stole a task. The scan in method helpJoin
540 * uses these markers to try to find a worker to help (i.e., steal
541 * back a task from and execute it) that could hasten completion
542 * of the actively joined task. Thus, the joiner executes a task
543 * that would be on its own local deque if the to-be-joined task
544 * had not been stolen. This is a conservative variant of the
545 * approach described in Wagner & Calder "Leapfrogging: a portable
546 * technique for implementing efficient futures" SIGPLAN Notices,
547 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
548 * mainly in that we only record queue ids, not full dependency
549 * links. This requires a linear scan of the queues array to
550 * locate stealers, but isolates cost to when it is needed, rather
551 * than adding to per-task overhead. Also, searches are limited to
552 * direct and at most two levels of indirect stealers, after which
553 * there are rapidly diminishing returns on increased overhead.
554 * Searches can fail to locate stealers when stalls delay
555 * recording sources. Further, even when accurately identified,
556 * stealers might not ever produce a task that the joiner can in
557 * turn help with. So, compensation is tried upon failure to find
558 * tasks to run.
559 *
560 * Joining CountedCompleters (see helpComplete) differs from (and
561 * is generally more efficient than) other cases because task
562 * eligibility is determined by checking completion chains rather
563 * than tracking stealers.
564 *
565 * Joining under timeouts (ForkJoinTask timed get) uses a
566 * constrained mixture of helping and compensating in part because
567 * pools (actually, only the common pool) may not have any
568 * available threads: If the pool is saturated (all available
569 * workers are busy), the caller tries to remove and otherwise
570 * help; else it blocks under compensation so that it may time out
571 * independently of any tasks.
572 *
573 * Compensation does not by default aim to keep exactly the target
574 * parallelism number of unblocked threads running at any given
575 * time. Some previous versions of this class employed immediate
576 * compensations for any blocked join. However, in practice, the
577 * vast majority of blockages are transient byproducts of GC and
578 * other JVM or OS activities that are made worse by replacement
579 * when they cause longer-term oversubscription. Rather than
580 * impose arbitrary policies, we allow users to override the
581 * default of only adding threads upon apparent starvation. The
582 * compensation mechanism may also be bounded. Bounds for the
583 * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
584 * with programming errors and abuse before running out of
585 * resources to do so.
586 *
587 * Common Pool
588 * ===========
589 *
590 * The static common pool always exists after static
591 * initialization. Since it (or any other created pool) need
592 * never be used, we minimize initial construction overhead and
593 * footprint to the setup of about a dozen fields.
594 *
595 * When external threads submit to the common pool, they can
596 * perform subtask processing (see helpComplete and related
597 * methods) upon joins. This caller-helps policy makes it
598 * sensible to set common pool parallelism level to one (or more)
599 * less than the total number of available cores, or even zero for
600 * pure caller-runs. We do not need to record whether external
601 * submissions are to the common pool -- if not, external help
602 * methods return quickly. These submitters would otherwise be
603 * blocked waiting for completion, so the extra effort (with
604 * liberally sprinkled task status checks) in inapplicable cases
605 * amounts to an odd form of limited spin-wait before blocking in
606 * ForkJoinTask.join.
607 *
608 * As a more appropriate default in managed environments, unless
609 * overridden by system properties, we use workers of subclass
610 * InnocuousForkJoinWorkerThread when there is a SecurityManager
611 * present. These workers have no permissions set, do not belong
612 * to any user-defined ThreadGroup, and erase all ThreadLocals
613 * after executing any top-level task. The associated mechanics
614 * may be JVM-dependent and must access particular Thread class
615 * fields to achieve this effect.
616 *
617 * Memory placement
618 * ================
619 *
620 * Performance can be very sensitive to placement of instances of
621 * ForkJoinPool and WorkQueues and their queue arrays. To reduce
622 * false-sharing impact, the @Contended annotation isolates the
623 * ForkJoinPool.ctl field as well as the most heavily written
624 * WorkQueue fields. These mainly reduce cache traffic by scanners.
625 * WorkQueue arrays are presized large enough to avoid resizing
626 * (which transiently reduces throughput) in most tree-like
627 * computations, although not in some streaming usages. Initial
628 * sizes are not large enough to avoid secondary contention
629 * effects (especially for GC cardmarks) when queues are placed
630 * near each other in memory. This is common, but has different
631 * impact in different collectors and remains incompletely
632 * addressed.
633 *
634 * Style notes
635 * ===========
636 *
637 * Memory ordering relies mainly on atomic operations (CAS,
638 * getAndSet, getAndAdd) along with explicit fences. This can be
639 * awkward and ugly, but also reflects the need to control
640 * outcomes across the unusual cases that arise in very racy code
641 * with very few invariants. All fields are read into locals
642 * before use, and null-checked if they are references, even if
643 * they can never be null under current usages. Array accesses
644 * using masked indices include checks (that are always true) that
645 * the array length is non-zero to avoid compilers inserting more
646 * expensive traps. This is usually done in a "C"-like style of
647 * listing declarations at the heads of methods or blocks, and
648 * using inline assignments on first encounter. Nearly all
649 * explicit checks lead to bypass/return, not exception throws,
650 * because they may legitimately arise during shutdown.
651 *
652 * There is a lot of representation-level coupling among classes
653 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
654 * fields of WorkQueue maintain data structures managed by
655 * ForkJoinPool, so are directly accessed. There is little point
656 * trying to reduce this, since any associated future changes in
657 * representations will need to be accompanied by algorithmic
658 * changes anyway. Several methods intrinsically sprawl because
659 * they must accumulate sets of consistent reads of fields held in
660 * local variables. Some others are artificially broken up to
661 * reduce producer/consumer imbalances due to dynamic compilation.
662 * There are also other coding oddities (including several
663 * unnecessary-looking hoisted null checks) that help some methods
664 * perform reasonably even when interpreted (not compiled).
665 *
666 * The order of declarations in this file is (with a few exceptions):
667 * (1) Static utility functions
668 * (2) Nested (static) classes
669 * (3) Static fields
670 * (4) Fields, along with constants used when unpacking some of them
671 * (5) Internal control methods
672 * (6) Callbacks and other support for ForkJoinTask methods
673 * (7) Exported methods
674 * (8) Static block initializing statics in minimally dependent order
675 *
676 * Revision notes
677 * ==============
678 *
 * The main sources of differences of the January 2020 ForkJoin
 * classes from previous versions are:
681 *
682 * * ForkJoinTask now uses field "aux" to support blocking joins
683 * and/or record exceptions, replacing reliance on builtin
684 * monitors and side tables.
685 * * Scans probe slots (vs compare indices), along with related
686 * changes that reduce performance differences across most
687 * garbage collectors, and reduce contention.
688 * * Refactoring for better integration of special task types and
689 * other capabilities that had been incrementally tacked on. Plus
690 * many minor reworkings to improve consistency.
691 */
692
693 // Static utilities
694
695 /**
696 * If there is a security manager, makes sure caller has
697 * permission to modify threads.
698 */
699 private static void checkPermission() {
700 SecurityManager security = System.getSecurityManager();
701 if (security != null)
702 security.checkPermission(modifyThreadPermission);
703 }
704
705 static AccessControlContext contextWithPermissions(Permission ... perms) {
706 Permissions permissions = new Permissions();
707 for (Permission perm : perms)
708 permissions.add(perm);
709 return new AccessControlContext(
710 new ProtectionDomain[] { new ProtectionDomain(null, permissions) });
711 }
712
713 // Nested classes
714
    /**
     * Factory for creating new {@link ForkJoinWorkerThread}s.
     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
     * for {@code ForkJoinWorkerThread} subclasses that extend base
     * functionality or initialize threads with different contexts.
     */
    public static interface ForkJoinWorkerThreadFactory {
        /**
         * Returns a new worker thread operating in the given pool.
         * Returning null or throwing an exception may result in tasks
         * never being executed. If this method throws an exception,
         * it is relayed to the caller of the method (for example
         * {@code execute}) causing the attempted thread creation. If this
         * method returns null or throws an exception, it is not
         * retried until the next attempted creation (for example
         * another call to {@code execute}).
         *
         * @param pool the pool this thread works in
         * @return the new worker thread, or {@code null} if the request
         *         to create a thread is rejected
         * @throws NullPointerException if the pool is null
         */
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }
739
    /**
     * Default ForkJoinWorkerThreadFactory implementation; creates a
     * new ForkJoinWorkerThread using the system class loader as the
     * thread context class loader.
     */
    static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        // ACC for access to the factory: grants only the two class-loader
        // permissions needed by the ForkJoinWorkerThread constructor.
        private static final AccessControlContext ACC = contextWithPermissions(
            new RuntimePermission("getClassLoader"),
            new RuntimePermission("setContextClassLoader"));

        // NOTE(review): uses an anonymous PrivilegedAction rather than a
        // lambda, presumably to avoid lambda-metafactory use during early
        // VM startup -- confirm before restyling.
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return AccessController.doPrivileged(
                new PrivilegedAction<>() {
                    public ForkJoinWorkerThread run() {
                        return new ForkJoinWorkerThread(null, pool, true, false);
                    }},
                ACC);
        }
    }
761
    /**
     * Factory for InnocuousForkJoinWorkerThread. Support requires
     * that we break quite a lot of encapsulation (some via helper
     * methods in ThreadLocalRandom) to access and set Thread fields.
     */
    static final class InnocuousForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        // ACC for access to the factory: the permissions needed to build
        // and configure an InnocuousForkJoinWorkerThread (thread-group and
        // context-class-loader manipulation).
        private static final AccessControlContext ACC = contextWithPermissions(
            modifyThreadPermission,
            new RuntimePermission("enableContextClassLoaderOverride"),
            new RuntimePermission("modifyThreadGroup"),
            new RuntimePermission("getClassLoader"),
            new RuntimePermission("setContextClassLoader"));

        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return AccessController.doPrivileged(
                new PrivilegedAction<>() {
                    public ForkJoinWorkerThread run() {
                        return new ForkJoinWorkerThread.
                            InnocuousForkJoinWorkerThread(pool); }},
                ACC);
        }
    }
786
    // Constants shared across ForkJoinPool and WorkQueue

    // Bounds
    static final int SWIDTH = 16;            // width of short
    static final int SMASK = 0xffff;         // short bits == max index
    static final int MAX_CAP = 0x7fff;       // max #workers - 1

    // Masks and units for WorkQueue.phase and ctl sp subfield
    static final int UNSIGNALLED = 1 << 31;  // must be negative
    static final int SS_SEQ = 1 << 16;       // version count

    // Mode bits and sentinels, some also used in WorkQueue fields
    static final int FIFO = 1 << 16;         // fifo queue or access mode
    static final int SRC = 1 << 17;          // set for valid queue ids
    static final int INNOCUOUS = 1 << 18;    // set for Innocuous workers
    static final int QUIET = 1 << 19;        // quiescing phase or source
    static final int SHUTDOWN = 1 << 24;     // runstate bit (field mode)
    static final int TERMINATED = 1 << 25;   // runstate bit (field mode)
    static final int STOP = 1 << 31;         // must be negative
    static final int ADJUST = 1 << 16;       // tryCompensate return

    /**
     * Initial capacity of work-stealing queue array. Must be a power
     * of two, at least 2. See above.
     */
    static final int INITIAL_QUEUE_CAPACITY = 1 << 8;
813
    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
     * Performance-critical fields updated by the owner are segregated
     * (via @Contended group "w") from those read by scanners/stealers.
     */
    static final class WorkQueue {
        volatile int phase;        // versioned, negative if inactive
        int stackPred;             // pool stack (ctl) predecessor link
        int config;                // index, mode, ORed with SRC after init
        int base;                  // index of next slot for poll
        ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
        final ForkJoinWorkerThread owner; // owning thread or null if shared

        // segregate fields frequently updated but not read by scans or steals
        @jdk.internal.vm.annotation.Contended("w")
        int top;                   // index of next slot for push
        @jdk.internal.vm.annotation.Contended("w")
        volatile int source;       // source queue id, lock, or sentinel
        @jdk.internal.vm.annotation.Contended("w")
        int nsteals;               // number of steals from other queues

        // Support for atomic operations
        private static final VarHandle QA;     // for array slots
        private static final VarHandle SOURCE; // for field source
        private static final VarHandle BASE;   // for field base

        // Acquire-mode read of slot i.
        static final ForkJoinTask<?> getSlot(ForkJoinTask<?>[] a, int i) {
            return (ForkJoinTask<?>)QA.getAcquire(a, i);
        }
        // Atomically takes and clears slot i; null if already taken.
        static final ForkJoinTask<?> getAndClearSlot(ForkJoinTask<?>[] a,
                                                     int i) {
            return (ForkJoinTask<?>)QA.getAndSet(a, i, null);
        }
        // Volatile-mode write of slot i (publishes task to stealers).
        static final void setSlotVolatile(ForkJoinTask<?>[] a, int i,
                                          ForkJoinTask<?> v) {
            QA.setVolatile(a, i, v);
        }
        // Weak CAS of slot i from c to null; may fail spuriously.
        static final boolean casSlotToNull(ForkJoinTask<?>[] a, int i,
                                           ForkJoinTask<?> c) {
            return QA.weakCompareAndSet(a, i, c, null);
        }
        // Acquires the shared-queue lock by CASing source 0 -> 1;
        // released elsewhere by writing source = 0.
        final boolean tryLock() {
            return SOURCE.compareAndSet(this, 0, 1);
        }
        // Opaque-mode write of base (coherence only, no ordering).
        final void setBaseOpaque(int b) {
            BASE.setOpaque(this, b);
        }

        /**
         * Constructor used by ForkJoinWorkerThreads. Most fields
         * are initialized upon thread start, in pool.registerWorker.
         */
        WorkQueue(ForkJoinWorkerThread owner, boolean isInnocuous) {
            this.config = (isInnocuous) ? INNOCUOUS : 0;
            this.owner = owner;
        }

        /**
         * Constructor used for external queues.
         */
        WorkQueue(int config) {
            array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
            this.config = config;
            owner = null;
            phase = -1;     // external queues start inactive
        }

        /**
         * Returns an exportable index (used by ForkJoinWorkerThread).
         */
        final int getPoolIndex() {
            return (config & 0xffff) >>> 1; // ignore odd/even tag bit
        }

        /**
         * Returns the approximate number of tasks in the queue.
         */
        final int queueSize() {
            VarHandle.acquireFence(); // ensure fresh reads by external callers
            int n = top - base;
            return (n < 0) ? 0 : n;   // ignore transient negative
        }

        /**
         * Provides a more conservative estimate of whether this queue
         * has any tasks than does queueSize.
         */
        final boolean isEmpty() {
            // a locked shared queue (source != 0, no owner) counts as nonempty
            return !((source != 0 && owner == null) || top - base > 0);
        }

        /**
         * Pushes a task. Call only by owner in unshared queues.
         *
         * @param task the task. Caller must ensure non-null.
         * @param pool (no-op if null)
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task, ForkJoinPool pool) {
            ForkJoinTask<?>[] a = array;
            int s = top++, d = s - base, cap, m; // skip insert if disabled
            if (a != null && pool != null && (cap = a.length) > 0) {
                setSlotVolatile(a, (m = cap - 1) & s, task);
                if (d == m)          // array now full; double capacity
                    growArray();
                if (d == m || a[m & (s - 1)] == null)
                    pool.signalWork(); // signal if was empty or resized
            }
        }

        /**
         * Pushes task to a shared queue with lock already held, and unlocks.
         *
         * @return true if caller should signal work
         */
        final boolean lockedPush(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a = array;
            int s = top++, d = s - base, cap, m;
            if (a != null && (cap = a.length) > 0) {
                a[(m = cap - 1) & s] = task; // plain write OK under lock
                if (d == m)
                    growArray();
                source = 0; // unlock
                if (d == m || a[m & (s - 1)] == null)
                    return true;
            }
            return false;
        }

        /**
         * Doubles the capacity of array. Called by owner or with lock
         * held after pre-incrementing top, which is reverted on
         * allocation failure.
         */
        final void growArray() {
            ForkJoinTask<?>[] oldArray = array, newArray;
            int s = top - 1, oldCap, newCap;
            if (oldArray != null && (oldCap = oldArray.length) > 0 &&
                (newCap = oldCap << 1) > 0) { // skip if disabled
                try {
                    newArray = new ForkJoinTask<?>[newCap];
                } catch (Throwable ex) {
                    top = s;            // revert pre-incremented top
                    if (owner == null)
                        source = 0;     // unlock
                    throw new RejectedExecutionException(
                        "Queue capacity exceeded");
                }
                int newMask = newCap - 1, oldMask = oldCap - 1;
                for (int k = oldCap; k > 0; --k, --s) {
                    ForkJoinTask<?> x;  // poll old, push to new
                    if ((x = getAndClearSlot(oldArray, s & oldMask)) == null)
                        break;          // others already taken
                    newArray[s & newMask] = x;
                }
                VarHandle.releaseFence(); // fill before publish
                array = newArray;
            }
        }

        // Variants of pop

        /**
         * Pops and returns task, or null if empty. Called only by owner.
         */
        private ForkJoinTask<?> pop() {
            ForkJoinTask<?> t = null;
            int s = top, cap; ForkJoinTask<?>[] a;
            if ((a = array) != null && (cap = a.length) > 0 && base != s-- &&
                (t = getAndClearSlot(a, (cap - 1) & s)) != null)
                top = s;
            return t;
        }

        /**
         * Pops the given task for owner only if it is at the current top.
         */
        final boolean tryUnpush(ForkJoinTask<?> task, boolean owned) {
            boolean taken = false;
            int s = top, cap, k; ForkJoinTask<?>[] a;
            if ((a = array) != null && (cap = a.length) > 0 &&
                a[k = (cap - 1) & (s - 1)] == task) {
                if (owned || tryLock()) {
                    // recheck consistency if we had to acquire the lock
                    if ((owned || (top == s && array == a)) &&
                        (taken = casSlotToNull(a, k, task)))
                        top = s - 1;
                    if (!owned)
                        source = 0; // release lock
                }
            }
            return taken;
        }

        /**
         * Deep form of tryUnpush: Traverses from top and removes task if
         * present, shifting others to fill gap.
         */
        final boolean tryRemove(ForkJoinTask<?> task, boolean owned) {
            boolean taken = false;
            int p = top, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            if ((a = array) != null && task != null && (cap = a.length) > 0) {
                int m = cap - 1, s = p - 1, d = p - base;
                for (int i = s, k; d > 0; --i, --d) { // scan top-down
                    if ((t = a[k = i & m]) == task) {
                        if (owned || tryLock()) {
                            if ((owned || (array == a && top == p)) &&
                                (taken = casSlotToNull(a, k, t))) {
                                for (int j = i; j != s; ) // shift down
                                    a[j & m] = getAndClearSlot(a, ++j & m);
                                top = s;
                            }
                            if (!owned)
                                source = 0; // release lock
                        }
                        break;
                    }
                }
            }
            return taken;
        }

        // variants of poll

        /**
         * Tries once to poll next task in FIFO order, failing on
         * inconsistency or contention.
         */
        final ForkJoinTask<?> tryPoll() {
            int cap, b, k; ForkJoinTask<?>[] a;
            if ((a = array) != null && (cap = a.length) > 0) {
                ForkJoinTask<?> t = getSlot(a, k = (cap - 1) & (b = base));
                if (base == b++ && t != null && casSlotToNull(a, k, t)) {
                    setBaseOpaque(b);
                    return t;
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> nextLocalTask(int cfg) {
            ForkJoinTask<?> t = null;
            int s = top, cap; ForkJoinTask<?>[] a;
            if ((a = array) != null && (cap = a.length) > 0) {
                for (int b, d;;) {
                    if ((d = s - (b = base)) <= 0)
                        break;              // empty
                    if (d == 1 || (cfg & FIFO) == 0) { // LIFO pop from top
                        if ((t = getAndClearSlot(a, --s & (cap - 1))) != null)
                            top = s;
                        break;
                    }
                    if ((t = getAndClearSlot(a, b++ & (cap - 1))) != null) {
                        setBaseOpaque(b);   // FIFO take from base
                        break;
                    }
                }
            }
            return t;
        }

        /**
         * Takes next task, if one exists, using configured mode.
         */
        final ForkJoinTask<?> nextLocalTask() {
            return nextLocalTask(config);
        }

        /**
         * Returns next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> peek() {
            VarHandle.acquireFence();
            int cap; ForkJoinTask<?>[] a;
            return ((a = array) != null && (cap = a.length) > 0) ?
                a[(cap - 1) & ((config & FIFO) != 0 ? base : top - 1)] : null;
        }

        // specialized execution methods

        /**
         * Runs the given (stolen) task if nonnull, as well as
         * remaining local tasks and/or others available from the
         * given queue.
         */
        final void topLevelExec(ForkJoinTask<?> task, WorkQueue q) {
            int cfg = config, nstolen = 1;
            while (task != null) {
                task.doExec();
                // prefer local tasks; else re-poll the victim queue
                if ((task = nextLocalTask(cfg)) == null &&
                    q != null && (task = q.tryPoll()) != null)
                    ++nstolen;
            }
            nsteals += nstolen;
            source = 0;
            if ((cfg & INNOCUOUS) != 0)
                ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
        }

        /**
         * Tries to pop and run tasks within the target's computation
         * until done, not found, or limit exceeded.
         *
         * @param task root of CountedCompleter computation
         * @param owned true if owned by a ForkJoinWorkerThread
         * @param limit max runs, or zero for no limit
         * @return task status on exit
         */
        final int helpComplete(ForkJoinTask<?> task, boolean owned, int limit) {
            int status = 0, cap, k, p, s; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            while (task != null && (status = task.status) >= 0 &&
                   (a = array) != null && (cap = a.length) > 0 &&
                   (t = a[k = (cap - 1) & (s = (p = top) - 1)])
                   instanceof CountedCompleter) {
                CountedCompleter<?> f = (CountedCompleter<?>)t;
                boolean taken = false;
                for (;;) {     // exec if root task is a completer of t
                    if (f == task) {
                        if (owned) {
                            if ((taken = casSlotToNull(a, k, t)))
                                top = s;
                        }
                        else if (tryLock()) {
                            if (top == p && array == a &&
                                (taken = casSlotToNull(a, k, t)))
                                top = s;
                            source = 0; // release lock
                        }
                        break;
                    }
                    else if ((f = f.completer) == null) // walk completer chain
                        break;
                }
                if (!taken)
                    break;
                t.doExec();
                if (limit != 0 && --limit == 0)
                    break;
            }
            return status;
        }

        /**
         * Tries to poll and run AsynchronousCompletionTasks until
         * none found or blocker is released.
         *
         * @param blocker the blocker
         */
        final void helpAsyncBlocker(ManagedBlocker blocker) {
            int cap, b, d, k; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            while (blocker != null && (d = top - (b = base)) > 0 &&
                   (a = array) != null && (cap = a.length) > 0 &&
                   (((t = getSlot(a, k = (cap - 1) & b)) == null && d > 1) ||
                    t instanceof
                    CompletableFuture.AsynchronousCompletionTask) &&
                   !blocker.isReleasable()) {
                if (t != null && base == b++ && casSlotToNull(a, k, t)) {
                    setBaseOpaque(b);
                    t.doExec();
                }
            }
        }

        // misc

        /** AccessControlContext for innocuous workers, created on 1st use. */
        private static AccessControlContext INNOCUOUS_ACC;

        /**
         * Initializes (upon registration) InnocuousForkJoinWorkerThreads.
         */
        final void initializeInnocuousWorker() {
            AccessControlContext acc; // racy construction OK
            if ((acc = INNOCUOUS_ACC) == null)
                INNOCUOUS_ACC = acc = new AccessControlContext(
                    new ProtectionDomain[] { new ProtectionDomain(null, null) });
            Thread t = Thread.currentThread();
            ThreadLocalRandom.setInheritedAccessControlContext(t, acc);
            ThreadLocalRandom.eraseThreadLocals(t);
        }

        /**
         * Returns true if owned and not known to be blocked.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            return ((wt = owner) != null &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // VarHandle mechanics
        static {
            try {
                QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
                MethodHandles.Lookup l = MethodHandles.lookup();
                SOURCE = l.findVarHandle(WorkQueue.class, "source", int.class);
                BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }
1217
    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     */
    private static final int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating worker names.
     * Updated atomically via the POOLIDS VarHandle.
     */
    private static volatile int poolIds;

    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work
     */
    private static final long DEFAULT_KEEPALIVE = 60_000L;

    /**
     * Undershoot tolerance for idle timeouts
     */
    private static final long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES. Overridable using the
     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
     * property. The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private static final int DEFAULT_COMMON_MAX_SPARES = 256;
1281
    /*
     * Bits and masks for field ctl, packed with 4 16 bit subfields:
     * RC: Number of released (unqueued) workers minus target parallelism
     * TC: Number of total workers minus target parallelism
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl. The offsets of counts
     * by the target parallelism and the positionings of fields make
     * it possible to perform the most common checks via sign tests of
     * fields: When rc is negative, there are not enough unqueued
     * workers, when tc is negative, there are not enough total
     * workers. When sp is non-zero, there are waiting workers. To
     * deal with possibly negative fields, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     *
     * Because it occupies uppermost bits, we can add one release
     * count using getAndAdd of RC_UNIT, rather than CAS, when
     * returning from a blocked join. Other updates entail multiple
     * subfields and masking, requiring CAS.
     *
     * The limits packed in field "bounds" are also offset by the
     * parallelism level to make them comparable to the ctl rc and tc
     * fields.
     */

    // Lower and upper word masks
    private static final long SP_MASK = 0xffffffffL;
    private static final long UC_MASK = ~SP_MASK;

    // Release counts
    private static final int RC_SHIFT = 48;
    private static final long RC_UNIT = 0x0001L << RC_SHIFT;
    private static final long RC_MASK = 0xffffL << RC_SHIFT;

    // Total counts
    private static final int TC_SHIFT = 32;
    private static final long TC_UNIT = 0x0001L << TC_SHIFT;
    private static final long TC_MASK = 0xffffL << TC_SHIFT;
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1323
    // Instance fields

    final long keepAlive;          // milliseconds before dropping if idle
    volatile long stealCount;      // collects worker nsteals
    int scanRover;                 // advances across pollScan calls
    volatile int threadIds;        // for worker thread names
    final int bounds;              // min, max threads packed as shorts
    volatile int mode;             // parallelism, runstate, queue mode
    WorkQueue[] queues;            // main registry
    final ReentrantLock registrationLock; // guards queues array updates
    Condition termination;         // lazily constructed
    final String workerNamePrefix; // null for common pool
    final ForkJoinWorkerThreadFactory factory; // creates worker threads
    final UncaughtExceptionHandler ueh;  // per-worker UEH
    final Predicate<? super ForkJoinPool> saturate; // user saturation override

    @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
    volatile long ctl;             // main pool control (see ctl bit comments)
1342
    // Support for atomic operations.
    // VarHandles are initialized in the static block (outside this excerpt).
    private static final VarHandle CTL;       // for field ctl
    private static final VarHandle MODE;      // for field mode
    private static final VarHandle THREADIDS; // for field threadIds
    private static final VarHandle POOLIDS;   // for static field poolIds
    private boolean compareAndSetCtl(long c, long v) {
        return CTL.compareAndSet(this, c, v);
    }
    // Returns the witness value (== c on success).
    private long compareAndExchangeCtl(long c, long v) {
        return (long)CTL.compareAndExchange(this, c, v);
    }
    private long getAndAddCtl(long v) {
        return (long)CTL.getAndAdd(this, v);
    }
    // Returns previous mode value.
    private int getAndBitwiseOrMode(int v) {
        return (int)MODE.getAndBitwiseOr(this, v);
    }
    private int getAndAddThreadIds(int x) {
        return (int)THREADIDS.getAndAdd(this, x);
    }
    private static int getAndAddPoolIds(int x) {
        return (int)POOLIDS.getAndAdd(x);
    }
1366
1367 // Creating, registering and deregistering workers
1368
1369 /**
1370 * Tries to construct and start one worker. Assumes that total
1371 * count has already been incremented as a reservation. Invokes
1372 * deregisterWorker on any failure.
1373 *
1374 * @return true if successful
1375 */
1376 private boolean createWorker() {
1377 ForkJoinWorkerThreadFactory fac = factory;
1378 Throwable ex = null;
1379 ForkJoinWorkerThread wt = null;
1380 try {
1381 if (fac != null && (wt = fac.newThread(this)) != null) {
1382 wt.start();
1383 return true;
1384 }
1385 } catch (Throwable rex) {
1386 ex = rex;
1387 }
1388 deregisterWorker(wt, ex);
1389 return false;
1390 }
1391
1392 /**
1393 * Provides a name for ForkJoinWorkerThread constructor.
1394 */
1395 final String nextWorkerThreadName() {
1396 String prefix = workerNamePrefix;
1397 int tid = getAndAddThreadIds(1) + 1;
1398 if (prefix == null) // commonPool has no prefix
1399 prefix = "ForkJoinPool.commonPool-worker-";
1400 return prefix.concat(Integer.toString(tid));
1401 }
1402
    /**
     * Finishes initializing and records owned queue: allocates the
     * task array, picks an odd slot in the queues array (resizing it
     * if full), and publishes the queue under registrationLock.
     *
     * @param w caller's WorkQueue
     */
    final void registerWorker(WorkQueue w) {
        ReentrantLock lock = registrationLock;
        ThreadLocalRandom.localInit();
        int seed = ThreadLocalRandom.getProbe();
        if (w != null && lock != null) {
            int modebits = (mode & FIFO) | w.config;
            w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
            w.stackPred = seed;                 // stash for runWorker
            if ((modebits & INNOCUOUS) != 0)
                w.initializeInnocuousWorker();
            int id = (seed << 1) | 1;           // initial index guess (odd)
            lock.lock();
            try {
                WorkQueue[] qs; int n;          // find queue index
                if ((qs = queues) != null && (n = qs.length) > 0) {
                    int k = n, m = n - 1;
                    // probe odd slots (step -2) for an empty one
                    for (; qs[id &= m] != null && k > 0; id -= 2, k -= 2);
                    if (k == 0)
                        id = n | 1;             // all taken: resize below
                    w.phase = w.config = id | modebits; // now publishable

                    if (id < n)
                        qs[id] = w;
                    else {                      // expand array
                        int an = n << 1, am = an - 1;
                        WorkQueue[] as = new WorkQueue[an];
                        as[id & am] = w;
                        for (int j = 1; j < n; j += 2) // worker (odd) slots
                            as[j] = qs[j];
                        for (int j = 0; j < n; j += 2) {
                            WorkQueue q;
                            if ((q = qs[j]) != null) // shared queues may move
                                as[q.config & am] = q;
                        }
                        VarHandle.releaseFence(); // fill before publish
                        queues = as;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    }
1451
    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker. Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
     *
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        ReentrantLock lock = registrationLock;
        WorkQueue w = null;
        int cfg = 0;
        if (wt != null && (w = wt.workQueue) != null && lock != null) {
            WorkQueue[] qs; int n, i;
            cfg = w.config;
            long ns = w.nsteals & 0xffffffffL; // treat nsteals as unsigned
            lock.lock();                       // remove index from array
            if ((qs = queues) != null && (n = qs.length) > 0 &&
                qs[i = cfg & (n - 1)] == w)
                qs[i] = null;
            stealCount += ns;                  // accumulate steals
            lock.unlock();
            long c = ctl;
            if (w.phase != QUIET)              // decrement counts
                // retry CAS until ctl reflects one fewer released+total worker
                do {} while (c != (c = compareAndExchangeCtl(
                                       c, ((RC_MASK & (c - RC_UNIT)) |
                                           (TC_MASK & (c - TC_UNIT)) |
                                           (SP_MASK & c)))));
            else if ((int)c == 0)              // was dropped on timeout
                cfg = 0;                       // suppress signal if last
            for (ForkJoinTask<?> t; (t = w.pop()) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t); // cancel tasks
        }

        if (!tryTerminate(false, false) && w != null && (cfg & SRC) != 0)
            signalWork();                      // possibly replace worker
        if (ex != null)
            ForkJoinTask.rethrow(ex);
    }
1492
    /**
     * Tries to create or release a worker if too few are running:
     * either unparks an idle worker recorded on the ctl stack, or, if
     * none exist and the total-count bound allows, constructs a new
     * one. Loops until ctl indicates enough workers, or a CAS/state
     * check shows the pool is unstarted or terminating.
     */
    final void signalWork() {
        for (long c = ctl; c < 0L;) {                   // negative: too few active
            int sp, i; WorkQueue[] qs; WorkQueue v;
            if ((sp = (int)c & ~UNSIGNALLED) == 0) {    // no idle workers
                if ((c & ADD_WORKER) == 0L)             // enough total workers
                    break;
                if (c == (c = compareAndExchangeCtl(
                              c, ((RC_MASK & (c + RC_UNIT)) |
                                  (TC_MASK & (c + TC_UNIT)))))) {
                    createWorker();
                    break;
                }
            }
            else if ((qs = queues) == null)
                break;                                  // unstarted/terminated
            else if (qs.length <= (i = sp & SMASK))
                break;                                  // terminated
            else if ((v = qs[i]) == null)
                break;                                  // terminating
            else {
                // pop idle worker off ctl stack and activate it
                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
                Thread vt = v.owner;
                if (c == (c = compareAndExchangeCtl(c, nc))) {
                    v.phase = sp;
                    LockSupport.unpark(vt);             // release idle worker
                    break;
                }
            }
        }
    }
1526
    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     * See above for explanation. Alternates between scanning for work
     * (with a fresh xorshift-randomized start index each round) and
     * blocking in awaitWork; exits when awaitWork reports termination.
     *
     * @param w caller's WorkQueue (may be null on failed initialization)
     */
    final void runWorker(WorkQueue w) {
        if (w != null) {                        // skip on failed init
            w.config |= SRC;                    // mark as valid source
            int r = w.stackPred, src = 0;       // use seed from registerWorker
            do {
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
            } while ((src = scan(w, src, r)) >= 0 ||
                     (src = awaitWork(w)) == 0);
        }
    }
1543
    /**
     * Scans for and if found executes top-level tasks: Tries to poll
     * each queue starting at a random index with random stride,
     * returning source id or retry indicator if contended or
     * inconsistent.
     *
     * @param w caller's WorkQueue
     * @param prevSrc the previous queue stolen from in current phase, or 0
     * @param r random seed
     * @return id of queue if taken, negative if none found, prevSrc for retry
     */
    private int scan(WorkQueue w, int prevSrc, int r) {
        WorkQueue[] qs = queues;
        int n = (w == null || qs == null) ? 0 : qs.length;
        // odd step ensures every index is eventually visited (n is a power of 2)
        for (int step = (r >>> 16) | 1, i = n; i > 0; --i, r += step) {
            int j, cap, b; WorkQueue q; ForkJoinTask<?>[] a;
            if ((q = qs[j = r & (n - 1)]) != null && // poll at qs[j].array[k]
                (a = q.array) != null && (cap = a.length) > 0) {
                int k = (cap - 1) & (b = q.base), nextBase = b + 1;
                int nextIndex = (cap - 1) & nextBase, src = j | SRC;
                ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
                if (q.base != b)                // inconsistent
                    return prevSrc;
                else if (t != null && WorkQueue.casSlotToNull(a, k, t)) {
                    q.base = nextBase;
                    ForkJoinTask<?> next = a[nextIndex];
                    // if more work remains in a newly-tapped queue, wake others
                    if ((w.source = src) != prevSrc && next != null)
                        signalWork();           // propagate
                    w.topLevelExec(t, q);
                    return src;
                }
                else if (a[nextIndex] != null)  // revisit
                    return prevSrc;
            }
        }
        return (queues != qs) ? prevSrc: -1;    // possibly resized
    }
1581
    /**
     * Advances worker phase, pushes onto ctl stack, and awaits signal
     * or reports termination. If this worker would drop the active
     * count to (or below) zero, it first rechecks submission queues
     * for a racing submission, and uses a timed wait so the thread
     * can be dropped after keepAlive expiry.
     *
     * @param w caller's WorkQueue (non-null when called from runWorker)
     * @return negative if terminated, else 0
     */
    private int awaitWork(WorkQueue w) {
        if (w == null)
            return -1;                       // already terminated
        int phase, ac, md, rc;               // advance phase
        w.phase = (phase = w.phase + SS_SEQ) | UNSIGNALLED;
        long prevCtl = ctl, c;               // enqueue
        do {
            w.stackPred = (int)prevCtl;      // link to previous stack top
            c = ((prevCtl - RC_UNIT) & UC_MASK) | (phase & SP_MASK);
        } while (prevCtl != (prevCtl = compareAndExchangeCtl(prevCtl, c)));

        LockSupport.setCurrentBlocker(this); // prepare to block (exit also OK)
        long deadline = 0L;                  // use timed wait if nonzero
        if ((rc = (ac = (int)(c >> RC_SHIFT)) + ((md = mode) & SMASK)) <= 0) {
            // rc <= 0: no other active workers remain
            if ((deadline = System.currentTimeMillis() + keepAlive) == 0L)
                deadline = 1L;               // avoid zero
            WorkQueue[] qs = queues;         // check for racing submission
            int n = (qs == null || ctl != c) ? 0 : qs.length;
            for (int i = 0; i < n; i += 2) { // even indices: submission queues
                WorkQueue q; ForkJoinTask<?>[] a; int cap;
                if ((q = qs[i]) != null && (a = q.array) != null &&
                    (cap = a.length) > 0 && a[(cap - 1) & q.base] != null) {
                    if (ctl == c && compareAndSetCtl(c, prevCtl))
                        w.phase = phase;     // self-signal
                    break;
                }
            }
        }
        for (;;) {                           // await activation or termination
            if ((md = mode) < 0)
                return -1;
            else if (w.phase >= 0)
                break;
            else if ((int)(ctl >> RC_SHIFT) > ac)
                Thread.onSpinWait();         // signal in progress
            else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
                     tryTerminate(false, false))
                return -1;                   // trigger quiescent termination
            else {
                if (rc <= 0)
                    LockSupport.parkUntil(deadline);
                else
                    LockSupport.park();
                // on timed wakeup near deadline, try to deregister self
                if ((int)(ctl >> RC_SHIFT) <= ac &&
                    !Thread.interrupted() && rc <= 0 &&
                    deadline - System.currentTimeMillis() <= TIMEOUT_SLOP &&
                    compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) |
                                         (w.stackPred & SP_MASK)))) {
                    w.phase = QUIET;
                    return -1;               // drop on timeout
                }
            }
        }
        LockSupport.setCurrentBlocker(null);
        return 0;
    }
1644
1645 // Utilities used by ForkJoinTask
1646
    /**
     * Returns true if all workers are busy: the active-worker count in
     * ctl is at or above the parallelism target and no idle workers
     * are recorded on the ctl stack.
     *
     * @return true if all workers are busy, possibly transiently
     */
    final boolean isSaturated() {
        long c;
        return (int)((c = ctl) >> RC_SHIFT) >= 0 && ((int)c & ~UNSIGNALLED) == 0;
    }
1654
    /**
     * Returns true if terminated or terminating (mode sign bit set).
     *
     * @return true if terminated or terminating
     */
    final boolean isStopping() {
        return mode < 0;
    }
1661
    /**
     * Returns true if can start terminating if enabled, or already
     * terminated. Repeatedly computes a checksum over ctl and all
     * submission-queue states until two consecutive passes agree,
     * establishing that no worker is active and no submissions exist.
     *
     * @return true if quiescent enough to stop, or already stopping
     */
    final boolean canStop() {
        outer: for (long oldSum = 0L;;) {              // repeat until stable
            int md; WorkQueue q;
            WorkQueue[] qs = queues;
            long c = ctl, checkSum = c;
            if (((md = mode) & STOP) != 0 || qs == null)
                return true;
            if ((md & SMASK) + (int)(c >> RC_SHIFT) > 0)
                break;                                 // active workers exist
            for (int i = 1, s; i < qs.length; i += 2) { // scan submitters
                long u = ((long)i) << 32;
                if ((q = qs[i]) == null)
                    checkSum += u;
                else if (q.source == 0 && (s = q.top) == q.base)
                    checkSum += u + s;
                else
                    break outer;                       // nonempty or busy
            }
            if (oldSum == (oldSum = checkSum) && queues == qs)
                return true;
        }
        return (mode & STOP) != 0;                     // recheck mode on false return
    }
1688
    /**
     * Tries to decrement counts (sometimes implicitly) and possibly
     * arrange for a compensating worker in preparation for
     * blocking. May fail due to interference, in which case -1 is
     * returned so caller may retry. A zero return value indicates
     * that the caller doesn't need to re-adjust counts when later
     * unblocked.
     *
     * @param c incoming ctl value
     * @return ADJUST: block then adjust, 0: block without adjust, -1 : retry
     */
    private int tryCompensate(long c) {
        Predicate<? super ForkJoinPool> sat;
        int b = bounds; // counts are signed; centered at parallelism level == 0
        int minActive = (short)(b & SMASK),     // saturate/shrink threshold
            maxTotal  = b >>> SWIDTH,           // thread-construction limit
            active    = (int)(c >> RC_SHIFT),
            total     = (short)(c >>> TC_SHIFT),
            sp        = (int)c & ~UNSIGNALLED;  // top of idle-worker stack
        if (total >= 0) {
            if (sp != 0) {                      // activate idle worker
                WorkQueue[] qs; int n; WorkQueue v;
                if ((qs = queues) != null && (n = qs.length) > 0 &&
                    (v = qs[sp & (n - 1)]) != null) {
                    Thread vt = v.owner;
                    long nc = ((long)v.stackPred & SP_MASK) | (UC_MASK & c);
                    if (compareAndSetCtl(c, nc)) {
                        v.phase = sp;
                        LockSupport.unpark(vt);
                        return ADJUST;
                    }
                }
                return -1;                      // retry
            }
            else if (active > minActive) {      // reduce parallelism
                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
                return compareAndSetCtl(c, nc) ? ADJUST : -1;
            }
        }
        if (total < maxTotal) {                 // expand pool
            long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
            return !compareAndSetCtl(c, nc) ? -1 : !createWorker() ? 0 : ADJUST;
        }
        else if (!compareAndSetCtl(c, c))       // validate
            return -1;
        else if ((sat = saturate) != null && sat.test(this))
            return 0;
        else
            throw new RejectedExecutionException(
                "Thread limit exceeded replacing blocked worker");
    }
1740
    /**
     * Readjusts RC count; called from ForkJoinTask after blocking,
     * undoing the release-count decrement made by tryCompensate.
     */
    final void uncompensate() {
        getAndAddCtl(RC_UNIT);
    }
1747
    /**
     * Helps if possible until the given task is done. Scans other
     * queues for a task produced by one of w's stealers (tracing
     * stealer chains up to two levels of indirection via queue
     * source fields); returning compensated blocking sentinel if
     * none are found.
     *
     * @param task the task
     * @param w caller's WorkQueue
     * @return task status on exit, or ADJUST for compensated blocking
     */
    final int helpJoin(ForkJoinTask<?> task, WorkQueue w) {
        int s = 0;
        if (task != null && w != null) {
            int wsrc = w.source, wid = w.config & SMASK, r = wid + 2;
            boolean scan = true;
            long c = 0L;                          // track ctl stability
            outer: for (;;) {
                if ((s = task.status) < 0)
                    break;
                else if (mode < 0)
                    ForkJoinTask.cancelIgnoringExceptions(task);
                else if (!scan && c == (c = ctl)) {
                    // two scans found nothing and ctl unchanged: give up helping
                    if ((s = tryCompensate(c)) >= 0)
                        break;                    // block
                }
                else {                            // scan for subtasks
                    scan = false;
                    WorkQueue[] qs = queues;
                    int n = (qs == null) ? 0 : qs.length, m = n - 1;
                    for (int i = n; i > 0; i -= 2, r += 2) {
                        int j; WorkQueue q, x, y; ForkJoinTask<?>[] a;
                        if ((q = qs[j = r & m]) != null) {
                            int sq = q.source & SMASK, cap, b;
                            if ((a = q.array) != null && (cap = a.length) > 0) {
                                int k = (cap - 1) & (b = q.base);
                                int nextBase = b + 1, src = j | SRC, sx;
                                ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
                                // eligible if q (or a queue q stole from,
                                // up to 2 links) last stole from w
                                boolean eligible = sq == wid ||
                                    ((x = qs[sq & m]) != null &&   // indirect
                                     ((sx = (x.source & SMASK)) == wid ||
                                      ((y = qs[sx & m]) != null && // 2-indirect
                                       (y.source & SMASK) == wid)));
                                if ((s = task.status) < 0)
                                    break outer;
                                else if ((q.source & SMASK) != sq ||
                                         q.base != b)
                                    scan = true;          // inconsistent
                                else if (t == null)
                                    scan |= (a[nextBase & (cap - 1)] != null ||
                                             q.top != b); // lagging
                                else if (eligible) {
                                    if (WorkQueue.casSlotToNull(a, k, t)) {
                                        q.base = nextBase;
                                        w.source = src;
                                        t.doExec();
                                        w.source = wsrc;
                                    }
                                    scan = true;
                                    break;
                                }
                            }
                        }
                    }
                }
            }
        }
        return s;
    }
1815
    /**
     * Extra helpJoin steps for CountedCompleters.  Scans for and runs
     * subtasks of the given root task (identified by walking completer
     * chains), returning if none are found.
     *
     * @param task root of CountedCompleter computation
     * @param w caller's WorkQueue
     * @param owned true if owned by a ForkJoinWorkerThread
     * @return task status on exit
     */
    final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean owned) {
        int s = 0;
        if (task != null && w != null) {
            int r = w.config;
            boolean scan = true, locals = true;
            long c = 0L;
            outer: for (;;) {
                if (locals) {                     // try locals before scanning
                    if ((s = w.helpComplete(task, owned, 0)) < 0)
                        break;
                    locals = false;
                }
                else if ((s = task.status) < 0)
                    break;
                else if (!scan && c == (c = ctl))
                    break;                        // stable and nothing found
                else if (mode < 0)
                    ForkJoinTask.cancelIgnoringExceptions(task);
                else {                            // scan for subtasks
                    scan = false;
                    WorkQueue[] qs = queues;
                    int n = (qs == null) ? 0 : qs.length;
                    for (int i = n; i > 0; --i, ++r) {
                        int j, cap, b; WorkQueue q; ForkJoinTask<?>[] a;
                        boolean eligible = false;
                        if ((q = qs[j = r & (n - 1)]) != null &&
                            (a = q.array) != null && (cap = a.length) > 0) {
                            int k = (cap - 1) & (b = q.base), nextBase = b + 1;
                            ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
                            if (t instanceof CountedCompleter) {
                                // eligible iff task is on t's completer chain
                                CountedCompleter<?> f = (CountedCompleter<?>)t;
                                do {} while (!(eligible = (f == task)) &&
                                             (f = f.completer) != null);
                            }
                            if ((s = task.status) < 0)
                                break outer;
                            else if (q.base != b)
                                scan = true;      // inconsistent
                            else if (t == null)
                                scan |= (a[nextBase & (cap - 1)] != null ||
                                         q.top != b);
                            else if (eligible) {
                                if (WorkQueue.casSlotToNull(a, k, t)) {
                                    q.setBaseOpaque(nextBase);
                                    t.doExec();
                                    locals = true; // may have pushed subtasks
                                }
                                scan = true;
                                break;
                            }
                        }
                    }
                }
            }
        }
        return s;
    }
1882
    /**
     * Scans for and returns a polled task, if available.  Used only
     * for untracked polls.  Begins scan at an index (scanRover)
     * advanced on each call, to avoid systematic unfairness.
     *
     * @param submissionsOnly if true, only scan submission queues
     *        (even indices)
     * @return a task, or null if none found on a stable pass
     */
    private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
        VarHandle.acquireFence();
        int r = scanRover += 0x61c88647; // Weyl increment; raciness OK
        if (submissionsOnly)             // even indices only
            r &= ~1;
        int step = (submissionsOnly) ? 2 : 1;
        WorkQueue[] qs; int n;
        while ((qs = queues) != null && (n = qs.length) > 0) {
            boolean scan = false;
            for (int i = 0; i < n; i += step) {
                int j, cap, b; WorkQueue q; ForkJoinTask<?>[] a;
                if ((q = qs[j = (n - 1) & (r + i)]) != null &&
                    (a = q.array) != null && (cap = a.length) > 0) {
                    int k = (cap - 1) & (b = q.base), nextBase = b + 1;
                    ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
                    if (q.base != b)
                        scan = true;             // inconsistent; rescan
                    else if (t == null)
                        scan |= (q.top != b || a[nextBase & (cap - 1)] != null);
                    else if (!WorkQueue.casSlotToNull(a, k, t))
                        scan = true;             // lost race; rescan
                    else {
                        q.setBaseOpaque(nextBase);
                        return t;
                    }
                }
            }
            if (!scan && queues == qs)
                break;                           // stable empty pass
        }
        return null;
    }
1922
    /**
     * Runs tasks until {@code isQuiescent()}. Rather than blocking
     * when tasks cannot be found, rescans until all others cannot
     * find tasks either. Uses exponentially growing park times
     * (capped near 1 second or 1% of nanos) between idle rescans.
     *
     * @param w caller's WorkQueue
     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
     * @param interruptible true if return on interrupt
     * @return positive if quiescent, negative if interrupted, else 0
     */
    final int helpQuiescePool(WorkQueue w, long nanos, boolean interruptible) {
        if (w == null)
            return 0;
        long startTime = System.nanoTime(), parkTime = 0L;
        int prevSrc = w.source, wsrc = prevSrc, cfg = w.config, r = cfg + 1;
        for (boolean active = true, locals = true;;) {
            boolean busy = false, scan = false;
            if (locals) {                     // run local tasks before (re)polling
                locals = false;
                for (ForkJoinTask<?> u; (u = w.nextLocalTask(cfg)) != null;)
                    u.doExec();
            }
            WorkQueue[] qs = queues;
            int n = (qs == null) ? 0 : qs.length;
            for (int i = n; i > 0; --i, ++r) {
                int j, b, cap; WorkQueue q; ForkJoinTask<?>[] a;
                if ((q = qs[j = (n - 1) & r]) != null && q != w &&
                    (a = q.array) != null && (cap = a.length) > 0) {
                    int k = (cap - 1) & (b = q.base);
                    int nextBase = b + 1, src = j | SRC;
                    ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
                    if (q.base != b)
                        busy = scan = true;   // inconsistent
                    else if (t != null) {
                        busy = scan = true;
                        if (!active) {        // increment before taking
                            active = true;
                            getAndAddCtl(RC_UNIT);
                        }
                        if (WorkQueue.casSlotToNull(a, k, t)) {
                            q.base = nextBase;
                            w.source = src;
                            t.doExec();
                            w.source = wsrc = prevSrc;
                            locals = true;
                        }
                        break;
                    }
                    else if (!busy) {
                        if (q.top != b || a[nextBase & (cap - 1)] != null)
                            busy = scan = true;
                        else if (q.source != QUIET && q.phase >= 0)
                            busy = true;      // other worker still running
                    }
                }
            }
            VarHandle.acquireFence();
            if (!scan && queues == qs) {
                boolean interrupted;
                if (!busy) {                  // all queues empty and quiet
                    w.source = prevSrc;
                    if (!active)
                        getAndAddCtl(RC_UNIT);
                    return 1;
                }
                if (wsrc != QUIET)
                    w.source = wsrc = QUIET;  // advertise own quiescence
                if (active) {                 // decrement
                    active = false;
                    parkTime = 0L;
                    getAndAddCtl(RC_MASK & -RC_UNIT);
                }
                else if (parkTime == 0L) {
                    parkTime = 1L << 10;      // initially about 1 usec
                    Thread.yield();
                }
                else if ((interrupted = interruptible && Thread.interrupted()) ||
                         System.nanoTime() - startTime > nanos) {
                    getAndAddCtl(RC_UNIT);    // restore count before return
                    return interrupted ? -1 : 0;
                }
                else {
                    LockSupport.parkNanos(this, parkTime);
                    if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
                        parkTime <<= 1;       // max sleep approx 1 sec or 1% nanos
                }
            }
        }
    }
2011
    /**
     * Helps quiesce from external caller until done, interrupted, or
     * timeout. Alternates between running polled tasks and backing
     * off with exponentially growing parks while work may still be
     * in flight.
     *
     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
     * @param interruptible true if return on interrupt
     * @return positive if quiescent, negative if interrupted, else 0
     */
    final int externalHelpQuiescePool(long nanos, boolean interruptible) {
        for (long startTime = System.nanoTime(), parkTime = 0L;;) {
            ForkJoinTask<?> t;
            if ((t = pollScan(false)) != null) {
                t.doExec();
                parkTime = 0L;              // reset backoff after progress
            }
            else if (canStop())
                return 1;
            else if (parkTime == 0L) {
                parkTime = 1L << 10;        // initially about 1 usec
                Thread.yield();
            }
            else if ((System.nanoTime() - startTime) > nanos)
                return 0;
            else if (interruptible && Thread.interrupted())
                return -1;
            else {
                LockSupport.parkNanos(this, parkTime);
                if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
                    parkTime <<= 1;         // cap near 1 sec or 1% nanos
            }
        }
    }
2043
2044 /**
2045 * Gets and removes a local or stolen task for the given worker.
2046 *
2047 * @return a task, if available
2048 */
2049 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2050 ForkJoinTask<?> t;
2051 if (w == null || (t = w.nextLocalTask(w.config)) == null)
2052 t = pollScan(false);
2053 return t;
2054 }
2055
2056 // External operations
2057
    /**
     * Finds and locks a WorkQueue for an external submitter, or
     * returns null if shutdown or terminating. External submitters
     * hash to even indices of the queues array; a missing queue is
     * installed under registrationLock, and lock contention causes
     * the caller to rehash to a different slot and retry.
     *
     * @return a locked submission queue, or null if shut down
     */
    final WorkQueue submissionQueue() {
        int r;
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // initialize caller's probe
            r = ThreadLocalRandom.getProbe();
        }
        for (int id = r << 1;;) {               // even indices only
            int md = mode, n, i; WorkQueue q; ReentrantLock lock;
            WorkQueue[] qs = queues;
            if ((md & SHUTDOWN) != 0 || qs == null || (n = qs.length) <= 0)
                return null;
            else if ((q = qs[i = (n - 1) & id]) == null) {
                if ((lock = registrationLock) != null) {
                    WorkQueue w = new WorkQueue(id | SRC);
                    lock.lock();                // install under lock
                    if (qs[i] == null)
                        qs[i] = w;              // else lost race; discard
                    lock.unlock();
                }
            }
            else if (!q.tryLock())              // move and restart
                id = (r = ThreadLocalRandom.advanceProbe(r)) << 1;
            else
                return q;
        }
    }
2088
2089 /**
2090 * Adds the given task to an external submission queue, or throws
2091 * exception if shutdown or terminating.
2092 *
2093 * @param task the task. Caller must ensure non-null.
2094 */
2095 final void externalPush(ForkJoinTask<?> task) {
2096 WorkQueue q;
2097 if ((q = submissionQueue()) == null)
2098 throw new RejectedExecutionException(); // shutdown or disabled
2099 else if (q.lockedPush(task))
2100 signalWork();
2101 }
2102
2103 /**
2104 * Pushes a possibly-external submission.
2105 */
2106 private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2107 Thread t; ForkJoinWorkerThread wt; WorkQueue q;
2108 if (task == null)
2109 throw new NullPointerException();
2110 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2111 (q = (wt = (ForkJoinWorkerThread)t).workQueue) != null &&
2112 wt.pool == this)
2113 q.push(task, this);
2114 else
2115 externalPush(task);
2116 return task;
2117 }
2118
    /**
     * Returns common pool queue for an external thread that has
     * possibly ever submitted a common pool task (nonzero probe), or
     * null if none. Uses the same even-index probe hashing as
     * submissionQueue.
     */
    static WorkQueue commonQueue() {
        ForkJoinPool p; WorkQueue[] qs;
        int r = ThreadLocalRandom.getProbe(), n;
        return ((p = common) != null && (qs = p.queues) != null &&
                (n = qs.length) > 0 && r != 0) ?
            qs[(n - 1) & (r << 1)] : null;
    }
2131
2132 /**
2133 * If the given executor is a ForkJoinPool, poll and execute
2134 * AsynchronousCompletionTasks from worker's queue until none are
2135 * available or blocker is released.
2136 */
2137 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
2138 WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
2139 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
2140 if ((wt = (ForkJoinWorkerThread)t).pool == e)
2141 w = wt.workQueue;
2142 }
2143 else if (e == common)
2144 w = commonQueue();
2145 if (w != null)
2146 w.helpAsyncBlocker(blocker);
2147 }
2148
    /**
     * Returns a cheap heuristic guide for task partitioning when
     * programmers, frameworks, tools, or languages have little or no
     * idea about task granularity.  In essence, by offering this
     * method, we ask users only about tradeoffs in overhead vs
     * expected throughput and its variance, rather than how finely to
     * partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible.  Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of computation tree should generate more than those nearer the
     * bottom. In perfect steady state, each thread is at
     * approximately the same level of computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger (but not much larger)
     * than 1 to both smooth over transient shortages and hedge
     * against uneven progress; as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length. However, this strategy alone
     * leads to serious mis-estimates in some non-steady-state
     * conditions (ramp-up, ramp-down, other stalls). We can detect
     * many of these by further considering the number of "idle"
     * threads, that are known to have zero queued tasks, so
     * compensate by a factor of (#idle/#active) threads.
     *
     * @return estimated queue length minus an allowance that shrinks
     *         as fewer threads are active; 0 if caller is not a
     *         ForkJoinWorkerThread
     */
    static int getSurplusQueuedTaskCount() {
        Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
            (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (q = wt.workQueue) != null) {
            int p = pool.mode & SMASK;          // parallelism level
            int a = p + (int)(pool.ctl >> RC_SHIFT); // active count
            int n = q.top - q.base;             // local queue length
            // allowance halves p at each step: more idle threads => keep more
            return n - (a > (p >>>= 1) ? 0 :
                        a > (p >>>= 1) ? 1 :
                        a > (p >>>= 1) ? 2 :
                        a > (p >>>= 1) ? 4 :
                        8);
        }
        return 0;
    }
2207
2208 // Termination
2209
    /**
     * Possibly initiates and/or completes termination. Advances mode
     * through SHUTDOWN, STOP, then TERMINATED; in the STOP phase,
     * cancels remaining tasks and interrupts workers, and signals
     * the termination condition once no workers remain.
     *
     * @param now if true, unconditionally terminate, else only
     * if no work and no active workers
     * @param enable if true, terminate when next possible
     * @return true if terminating or terminated
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int md; // try to set SHUTDOWN, then STOP, then help terminate
        if (((md = mode) & SHUTDOWN) == 0) {
            if (!enable)
                return false;
            md = getAndBitwiseOrMode(SHUTDOWN);
        }
        if ((md & STOP) == 0) {
            if (!now && !canStop())
                return false;
            md = getAndBitwiseOrMode(STOP);
        }
        if ((md & TERMINATED) == 0) {
            for (ForkJoinTask<?> t; (t = pollScan(false)) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t); // help cancel tasks

            WorkQueue[] qs; int n; WorkQueue q; Thread thread;
            if ((qs = queues) != null && (n = qs.length) > 0) {
                for (int j = 1; j < n; j += 2) { // unblock other workers
                    if ((q = qs[j]) != null && (thread = q.owner) != null &&
                        !thread.isInterrupted()) {
                        try {
                            thread.interrupt();
                        } catch (Throwable ignore) {
                            // best-effort; worker may be exiting
                        }
                    }
                }
            }

            ReentrantLock lock; Condition cond; // signal when no workers
            if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 &&
                (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 &&
                (lock = registrationLock) != null) {
                lock.lock();
                if ((cond = termination) != null)
                    cond.signalAll();
                lock.unlock();
            }
        }
        return true;
    }
2259
2260 // Exported methods
2261
2262 // Constructors
2263
    /**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using defaults for all
     * other parameters (see {@link #ForkJoinPool(int,
     * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
     * int, int, int, Predicate, long, TimeUnit)}).
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2281
    /**
     * Creates a {@code ForkJoinPool} with the indicated parallelism
     * level, using defaults for all other parameters (see {@link
     * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
     * UncaughtExceptionHandler, boolean, int, int, int, Predicate,
     * long, TimeUnit)}).
     *
     * @param parallelism the parallelism level
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2301
    /**
     * Creates a {@code ForkJoinPool} with the given parameters (using
     * defaults for others -- see {@link #ForkJoinPool(int,
     * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
     * int, int, int, Predicate, long, TimeUnit)}).
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     * @param factory the factory for creating new threads. For default value,
     * use {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while executing
     * tasks. For default value, use {@code null}.
     * @param asyncMode if true,
     * establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined. This mode may be more appropriate
     * than default locally stack-based mode in applications in which
     * worker threads only process event-style asynchronous tasks.
     * For default value, use {@code false}.
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(parallelism, factory, handler, asyncMode,
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2336
2337 /**
2338 * Creates a {@code ForkJoinPool} with the given parameters.
2339 *
2340 * @param parallelism the parallelism level. For default value,
2341 * use {@link java.lang.Runtime#availableProcessors}.
2342 *
2343 * @param factory the factory for creating new threads. For
2344 * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2345 *
2346 * @param handler the handler for internal worker threads that
2347 * terminate due to unrecoverable errors encountered while
2348 * executing tasks. For default value, use {@code null}.
2349 *
2350 * @param asyncMode if true, establishes local first-in-first-out
2351 * scheduling mode for forked tasks that are never joined. This
2352 * mode may be more appropriate than default locally stack-based
2353 * mode in applications in which worker threads only process
2354 * event-style asynchronous tasks. For default value, use {@code
2355 * false}.
2356 *
2357 * @param corePoolSize the number of threads to keep in the pool
2358 * (unless timed out after an elapsed keep-alive). Normally (and
2359 * by default) this is the same value as the parallelism level,
2360 * but may be set to a larger value to reduce dynamic overhead if
2361 * tasks regularly block. Using a smaller value (for example
2362 * {@code 0}) has the same effect as the default.
2363 *
2364 * @param maximumPoolSize the maximum number of threads allowed.
2365 * When the maximum is reached, attempts to replace blocked
2366 * threads fail. (However, because creation and termination of
2367 * different threads may overlap, and may be managed by the given
2368 * thread factory, this value may be transiently exceeded.) To
2369 * arrange the same value as is used by default for the common
2370 * pool, use {@code 256} plus the {@code parallelism} level. (By
2371 * default, the common pool allows a maximum of 256 spare
2372 * threads.) Using a value (for example {@code
2373 * Integer.MAX_VALUE}) larger than the implementation's total
2374 * thread limit has the same effect as using this limit (which is
2375 * the default).
2376 *
2377 * @param minimumRunnable the minimum allowed number of core
2378 * threads not blocked by a join or {@link ManagedBlocker}. To
2379 * ensure progress, when too few unblocked threads exist and
2380 * unexecuted tasks may exist, new threads are constructed, up to
2381 * the given maximumPoolSize. For the default value, use {@code
2382 * 1}, that ensures liveness. A larger value might improve
2383 * throughput in the presence of blocked activities, but might
2384 * not, due to increased overhead. A value of zero may be
2385 * acceptable when submitted tasks cannot have dependencies
2386 * requiring additional threads.
2387 *
2388 * @param saturate if non-null, a predicate invoked upon attempts
2389 * to create more than the maximum total allowed threads. By
2390 * default, when a thread is about to block on a join or {@link
2391 * ManagedBlocker}, but cannot be replaced because the
2392 * maximumPoolSize would be exceeded, a {@link
2393 * RejectedExecutionException} is thrown. But if this predicate
2394 * returns {@code true}, then no exception is thrown, so the pool
2395 * continues to operate with fewer than the target number of
2396 * runnable threads, which might not ensure progress.
2397 *
2398 * @param keepAliveTime the elapsed time since last use before
2399 * a thread is terminated (and then later replaced if needed).
2400 * For the default value, use {@code 60, TimeUnit.SECONDS}.
2401 *
2402 * @param unit the time unit for the {@code keepAliveTime} argument
2403 *
2404 * @throws IllegalArgumentException if parallelism is less than or
2405 * equal to zero, or is greater than implementation limit,
2406 * or if maximumPoolSize is less than parallelism,
     * or if the keepAliveTime is less than or equal to zero.
2408 * @throws NullPointerException if the factory is null
2409 * @throws SecurityException if a security manager exists and
2410 * the caller is not permitted to modify threads
2411 * because it does not hold {@link
2412 * java.lang.RuntimePermission}{@code ("modifyThread")}
2413 * @since 9
2414 */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode,
                        int corePoolSize,
                        int maximumPoolSize,
                        int minimumRunnable,
                        Predicate<? super ForkJoinPool> saturate,
                        long keepAliveTime,
                        TimeUnit unit) {
        checkPermission();
        int p = parallelism;
        if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
            throw new IllegalArgumentException();
        if (factory == null || unit == null)
            throw new NullPointerException();
        this.factory = factory;
        this.ueh = handler;
        this.saturate = saturate;
        // Clamp keep-alive to at least TIMEOUT_SLOP ms so timer checks
        // never fire with a zero/near-zero budget.
        this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
        // Queue array length: twice the next power of two >= p
        // (e.g. p=3 -> 8); indexing elsewhere relies on a power of two.
        int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
        // Core count is clamped to [p, MAX_CAP]; smaller requests act as default.
        int corep = Math.min(Math.max(corePoolSize, p), MAX_CAP);
        int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - p;
        int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
        // Pack min-runnable delta (low SMASK bits) and spare limit (above SWIDTH).
        this.bounds = ((minAvail - p) & SMASK) | (maxSpares << SWIDTH);
        this.mode = p | (asyncMode ? FIFO : 0);
        // Thread counts in ctl start at their negated targets and count up
        // toward zero as workers are created/activated.
        this.ctl = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
                    (((long)(-p) << RC_SHIFT) & RC_MASK));
        this.registrationLock = new ReentrantLock();
        this.queues = new WorkQueue[size];
        String pid = Integer.toString(getAndAddPoolIds(1) + 1);
        this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-";
    }
2448
2449 // helper method for commonPool constructor
2450 private static Object newInstanceFromSystemProperty(String property)
2451 throws ReflectiveOperationException {
2452 String className = System.getProperty(property);
2453 return (className == null)
2454 ? null
2455 : ClassLoader.getSystemClassLoader().loadClass(className)
2456 .getConstructor().newInstance();
2457 }
2458
    /**
     * Constructor for common pool using parameters possibly
     * overridden by system properties
     */
    private ForkJoinPool(byte forCommonPoolOnly) {
        // Default parallelism leaves one processor for non-pool threads.
        int parallelism = Runtime.getRuntime().availableProcessors() - 1;
        ForkJoinWorkerThreadFactory fac = null;
        UncaughtExceptionHandler handler = null;
        try { // ignore exceptions in accessing/parsing properties
            fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
                "java.util.concurrent.ForkJoinPool.common.threadFactory");
            handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
                "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
        } catch (Exception ignore) {
        }
        // Unlike the public constructor, parallelism 0 is permitted here.
        int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP);
        // Queue array sized as for the public constructor, with a floor for p == 0.
        int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1));
        // Under a SecurityManager, use the restricted "innocuous" factory.
        this.factory = (fac != null) ? fac :
            (System.getSecurityManager() == null ?
             defaultForkJoinWorkerThreadFactory :
             new InnocuousForkJoinWorkerThreadFactory());
        this.ueh = handler;
        this.keepAlive = DEFAULT_KEEPALIVE;
        this.saturate = null;
        this.workerNamePrefix = null; // common pool uses no per-pool prefix
        this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
        this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
                    (((long)(-p) << RC_SHIFT) & RC_MASK));
        this.queues = new WorkQueue[size];
        this.registrationLock = new ReentrantLock();
    }
2494
    /**
     * Returns the common pool instance. This pool is statically
     * constructed; its run state is unaffected by attempts to {@link
     * #shutdown} or {@link #shutdownNow}. However this pool and any
     * ongoing processing are automatically terminated upon program
     * {@link System#exit}. Any program that relies on asynchronous
     * task processing to complete before program termination should
     * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
     * before exit.
     *
     * @return the common pool instance
     * @since 1.8
     */
    public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        return common; // created during class static initialization
    }
2512
2513 // Execution methods
2514
2515 /**
2516 * Performs the given task, returning its result upon completion.
2517 * If the computation encounters an unchecked Exception or Error,
2518 * it is rethrown as the outcome of this invocation. Rethrown
2519 * exceptions behave in the same way as regular exceptions, but,
2520 * when possible, contain stack traces (as displayed for example
2521 * using {@code ex.printStackTrace()}) of both the current thread
2522 * as well as the thread actually encountering the exception;
2523 * minimally only the latter.
2524 *
2525 * @param task the task
2526 * @param <T> the type of the task's result
2527 * @return the task's result
2528 * @throws NullPointerException if the task is null
2529 * @throws RejectedExecutionException if the task cannot be
2530 * scheduled for execution
2531 */
2532 public <T> T invoke(ForkJoinTask<T> task) {
2533 externalSubmit(task);
2534 return task.join();
2535 }
2536
    /**
     * Arranges for (asynchronous) execution of the given task.
     *
     * @param task the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public void execute(ForkJoinTask<?> task) {
        externalSubmit(task); // returned task deliberately ignored
    }
2548
2549 // AbstractExecutorService methods
2550
2551 /**
2552 * @throws NullPointerException if the task is null
2553 * @throws RejectedExecutionException if the task cannot be
2554 * scheduled for execution
2555 */
2556 @Override
2557 @SuppressWarnings("unchecked")
2558 public void execute(Runnable task) {
2559 externalSubmit((task instanceof ForkJoinTask<?>)
2560 ? (ForkJoinTask<Void>) task // avoid re-wrap
2561 : new ForkJoinTask.RunnableExecuteAction(task));
2562 }
2563
    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        return externalSubmit(task); // returns the submitted task itself
    }
2577
2578 /**
2579 * @throws NullPointerException if the task is null
2580 * @throws RejectedExecutionException if the task cannot be
2581 * scheduled for execution
2582 */
2583 @Override
2584 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2585 return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
2586 }
2587
2588 /**
2589 * @throws NullPointerException if the task is null
2590 * @throws RejectedExecutionException if the task cannot be
2591 * scheduled for execution
2592 */
2593 @Override
2594 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2595 return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
2596 }
2597
2598 /**
2599 * @throws NullPointerException if the task is null
2600 * @throws RejectedExecutionException if the task cannot be
2601 * scheduled for execution
2602 */
2603 @Override
2604 @SuppressWarnings("unchecked")
2605 public ForkJoinTask<?> submit(Runnable task) {
2606 return externalSubmit((task instanceof ForkJoinTask<?>)
2607 ? (ForkJoinTask<Void>) task // avoid re-wrap
2608 : new ForkJoinTask.AdaptedRunnableAction(task));
2609 }
2610
2611 /**
2612 * @throws NullPointerException {@inheritDoc}
2613 * @throws RejectedExecutionException {@inheritDoc}
2614 */
2615 @Override
2616 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2617 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2618 try {
2619 for (Callable<T> t : tasks) {
2620 ForkJoinTask<T> f =
2621 new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2622 futures.add(f);
2623 externalSubmit(f);
2624 }
2625 for (int i = futures.size() - 1; i >= 0; --i)
2626 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2627 return futures;
2628 } catch (Throwable t) {
2629 for (Future<T> e : futures)
2630 ForkJoinTask.cancelIgnoringExceptions(e);
2631 throw t;
2632 }
2633 }
2634
    /**
     * Executes the given tasks, returning when all complete or the
     * timeout elapses, whichever happens first; tasks still running
     * at timeout are cancelled.
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
        try {
            // submit all tasks before waiting on any
            for (Callable<T> t : tasks) {
                ForkJoinTask<T> f =
                    new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
                futures.add(f);
                externalSubmit(f);
            }
            long startTime = System.nanoTime(), ns = nanos;
            boolean timedOut = (ns < 0L);
            // wait in reverse order, charging the remaining time budget
            // to each incomplete task; once exhausted, cancel the rest
            for (int i = futures.size() - 1; i >= 0; --i) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (timedOut)
                        ForkJoinTask.cancelIgnoringExceptions(f);
                    else {
                        try {
                            f.get(ns, TimeUnit.NANOSECONDS);
                        } catch (CancellationException | TimeoutException |
                                 ExecutionException ok) {
                            // per-task failures are reported via the Future
                        }
                        if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
                            timedOut = true;
                    }
                }
            }
            return futures;
        } catch (Throwable t) {
            // interruption or rejection: cancel all and propagate
            for (Future<T> e : futures)
                ForkJoinTask.cancelIgnoringExceptions(e);
            throw t;
        }
    }
2673
    // Task to hold results from InvokeAnyTasks
    static final class InvokeAnyRoot<E> extends ForkJoinTask<E> {
        private static final long serialVersionUID = 2838392045355241008L;
        @SuppressWarnings("serial") // Conditionally serializable
        volatile E result;
        final AtomicInteger count; // in case all throw
        InvokeAnyRoot(int n) { count = new AtomicInteger(n); }
        final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
            // First successful call() wins; an exception is recorded only
            // by the last failing task, so a failure is reported to the
            // caller only when no task can succeed.
            if (c != null && !isDone()) { // raciness OK
                try {
                    complete(c.call());
                } catch (Throwable ex) {
                    if (count.getAndDecrement() <= 1)
                        trySetThrown(ex);
                }
            }
        }
        public final boolean exec() { return false; } // never forked
        public final E getRawResult() { return result; }
        public final void setRawResult(E v) { result = v; }
    }
2695
    // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot
    static final class InvokeAnyTask<E> extends ForkJoinTask<E> {
        private static final long serialVersionUID = 2838392045355241008L;
        final InvokeAnyRoot<E> root;     // shared holder of first result
        @SuppressWarnings("serial") // Conditionally serializable
        final Callable<E> callable;
        transient volatile Thread runner; // executing thread, for cancel(true)
        InvokeAnyTask(InvokeAnyRoot<E> root, Callable<E> callable) {
            this.root = root;
            this.callable = callable;
        }
        public final boolean exec() {
            // Clear interrupt status before and after running so a
            // cancellation interrupt cannot leak into other pool work.
            Thread.interrupted();
            runner = Thread.currentThread();
            root.tryComplete(callable);
            runner = null;
            Thread.interrupted();
            return true;
        }
        public final boolean cancel(boolean mayInterruptIfRunning) {
            Thread t;
            // cancel task status without interrupting, then interrupt
            // the runner explicitly if requested and still running
            boolean stat = super.cancel(false);
            if (mayInterruptIfRunning && (t = runner) != null) {
                try {
                    t.interrupt();
                } catch (Throwable ignore) {
                }
            }
            return stat;
        }
        public final void setRawResult(E v) {} // unused
        public final E getRawResult() { return null; }
    }
2729
    /**
     * Returns the result of the first of the given tasks to complete
     * normally, cancelling the others on return.
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        int n = tasks.size();
        if (n <= 0)
            throw new IllegalArgumentException();
        InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n);
        ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
        for (Callable<T> c : tasks) {
            if (c == null)
                throw new NullPointerException();
            InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
            fs.add(f);
            // Run directly when the pool cannot accept more work
            // (e.g. saturated, or zero parallelism) to ensure progress.
            if (isSaturated())
                f.doExec();
            else
                externalSubmit(f);
            if (root.isDone()) // some task already completed the root
                break;
        }
        try {
            return root.get();
        } finally {
            // cancel (and interrupt) any stragglers
            for (InvokeAnyTask<T> f : fs)
                f.cancel(true);
        }
    }
2757
    /**
     * Timed variant of {@code invokeAny}: returns the result of the
     * first task to complete normally within the given timeout.
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        long nanos = unit.toNanos(timeout);
        int n = tasks.size();
        if (n <= 0)
            throw new IllegalArgumentException();
        InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n);
        ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
        for (Callable<T> c : tasks) {
            if (c == null)
                throw new NullPointerException();
            InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
            fs.add(f);
            // Run directly when the pool cannot accept more work
            // (e.g. saturated, or zero parallelism) to ensure progress.
            if (isSaturated())
                f.doExec();
            else
                externalSubmit(f);
            if (root.isDone()) // some task already completed the root
                break;
        }
        try {
            // timed wait; throws TimeoutException if nothing completes in time
            return root.get(nanos, TimeUnit.NANOSECONDS);
        } finally {
            // cancel (and interrupt) any stragglers
            for (InvokeAnyTask<T> f : fs)
                f.cancel(true);
        }
    }
2787
    /**
     * Returns the factory used for constructing new workers.
     *
     * @return the factory used for constructing new workers
     */
    public ForkJoinWorkerThreadFactory getFactory() {
        return factory; // value assigned in the constructor
    }
2796
    /**
     * Returns the handler for internal worker threads that terminate
     * due to unrecoverable errors encountered while executing tasks.
     *
     * @return the handler, or {@code null} if none
     */
    public UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return ueh; // value assigned in the constructor; may be null
    }
2806
2807 /**
2808 * Returns the targeted parallelism level of this pool.
2809 *
2810 * @return the targeted parallelism level of this pool
2811 */
2812 public int getParallelism() {
2813 int par = mode & SMASK;
2814 return (par > 0) ? par : 1;
2815 }
2816
    /**
     * Returns the targeted parallelism level of the common pool.
     *
     * @return the targeted parallelism level of the common pool
     * @since 1.8
     */
    public static int getCommonPoolParallelism() {
        return COMMON_PARALLELISM; // cached at class initialization
    }
2826
    /**
     * Returns the number of worker threads that have started but not
     * yet terminated. The result returned by this method may differ
     * from {@link #getParallelism} when threads are created to
     * maintain parallelism when others are cooperatively blocked.
     *
     * @return the number of worker threads
     */
    public int getPoolSize() {
        // total count is stored in ctl's TC field as a delta from parallelism
        return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
    }
2838
    /**
     * Returns {@code true} if this pool uses local first-in-first-out
     * scheduling mode for forked tasks that are never joined.
     *
     * @return {@code true} if this pool uses async mode
     */
    public boolean getAsyncMode() {
        return (mode & FIFO) != 0; // FIFO bit set by constructor's asyncMode
    }
2848
2849 /**
2850 * Returns an estimate of the number of worker threads that are
2851 * not blocked waiting to join tasks or for other managed
2852 * synchronization. This method may overestimate the
2853 * number of running threads.
2854 *
2855 * @return the number of worker threads
2856 */
2857 public int getRunningThreadCount() {
2858 VarHandle.acquireFence();
2859 WorkQueue[] qs; WorkQueue q;
2860 int rc = 0;
2861 if ((qs = queues) != null) {
2862 for (int i = 1; i < qs.length; i += 2) {
2863 if ((q = qs[i]) != null && q.isApparentlyUnblocked())
2864 ++rc;
2865 }
2866 }
2867 return rc;
2868 }
2869
2870 /**
2871 * Returns an estimate of the number of threads that are currently
2872 * stealing or executing tasks. This method may overestimate the
2873 * number of active threads.
2874 *
2875 * @return the number of active threads
2876 */
2877 public int getActiveThreadCount() {
2878 int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2879 return (r <= 0) ? 0 : r; // suppress momentarily negative values
2880 }
2881
    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        return canStop(); // same check used to decide pool termination
    }
2896
2897 /**
2898 * Returns an estimate of the total number of completed tasks that
2899 * were executed by a thread other than their submitter. The
2900 * reported value underestimates the actual total number of steals
2901 * when the pool is not quiescent. This value may be useful for
2902 * monitoring and tuning fork/join programs: in general, steal
2903 * counts should be high enough to keep threads busy, but low
2904 * enough to avoid overhead and contention across threads.
2905 *
2906 * @return the number of steals
2907 */
2908 public long getStealCount() {
2909 long count = stealCount;
2910 WorkQueue[] qs; WorkQueue q;
2911 if ((qs = queues) != null) {
2912 for (int i = 1; i < qs.length; i += 2) {
2913 if ((q = qs[i]) != null)
2914 count += (long)q.nsteals & 0xffffffffL;
2915 }
2916 }
2917 return count;
2918 }
2919
2920 /**
2921 * Returns an estimate of the total number of tasks currently held
2922 * in queues by worker threads (but not including tasks submitted
2923 * to the pool that have not begun executing). This value is only
2924 * an approximation, obtained by iterating across all threads in
2925 * the pool. This method may be useful for tuning task
2926 * granularities.
2927 *
2928 * @return the number of queued tasks
2929 */
2930 public long getQueuedTaskCount() {
2931 VarHandle.acquireFence();
2932 WorkQueue[] qs; WorkQueue q;
2933 int count = 0;
2934 if ((qs = queues) != null) {
2935 for (int i = 1; i < qs.length; i += 2) {
2936 if ((q = qs[i]) != null)
2937 count += q.queueSize();
2938 }
2939 }
2940 return count;
2941 }
2942
2943 /**
2944 * Returns an estimate of the number of tasks submitted to this
2945 * pool that have not yet begun executing. This method may take
2946 * time proportional to the number of submissions.
2947 *
2948 * @return the number of queued submissions
2949 */
2950 public int getQueuedSubmissionCount() {
2951 VarHandle.acquireFence();
2952 WorkQueue[] qs; WorkQueue q;
2953 int count = 0;
2954 if ((qs = queues) != null) {
2955 for (int i = 0; i < qs.length; i += 2) {
2956 if ((q = qs[i]) != null)
2957 count += q.queueSize();
2958 }
2959 }
2960 return count;
2961 }
2962
2963 /**
2964 * Returns {@code true} if there are any tasks submitted to this
2965 * pool that have not yet begun executing.
2966 *
2967 * @return {@code true} if there are any queued submissions
2968 */
2969 public boolean hasQueuedSubmissions() {
2970 VarHandle.acquireFence();
2971 WorkQueue[] qs; WorkQueue q;
2972 if ((qs = queues) != null) {
2973 for (int i = 0; i < qs.length; i += 2) {
2974 if ((q = qs[i]) != null && !q.isEmpty())
2975 return true;
2976 }
2977 }
2978 return false;
2979 }
2980
    /**
     * Removes and returns the next unexecuted submission if one is
     * available. This method may be useful in extensions to this
     * class that re-assign work in systems with multiple pools.
     *
     * @return the next submission, or {@code null} if none
     */
    protected ForkJoinTask<?> pollSubmission() {
        return pollScan(true); // true: restrict scan to submission queues
    }
2991
2992 /**
2993 * Removes all available unexecuted submitted and forked tasks
2994 * from scheduling queues and adds them to the given collection,
2995 * without altering their execution status. These may include
2996 * artificially generated or wrapped tasks. This method is
2997 * designed to be invoked only when the pool is known to be
2998 * quiescent. Invocations at other times may not remove all
2999 * tasks. A failure encountered while attempting to add elements
3000 * to collection {@code c} may result in elements being in
3001 * neither, either or both collections when the associated
3002 * exception is thrown. The behavior of this operation is
3003 * undefined if the specified collection is modified while the
3004 * operation is in progress.
3005 *
3006 * @param c the collection to transfer elements into
3007 * @return the number of elements transferred
3008 */
3009 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
3010 int count = 0;
3011 for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
3012 c.add(t);
3013 ++count;
3014 }
3015 return count;
3016 }
3017
    /**
     * Returns a string identifying this pool, as well as its state,
     * including indications of run state, parallelism level, and
     * worker and task counts.
     *
     * @return a string identifying this pool, as well as its state
     */
    public String toString() {
        // Use a single pass through queues to collect counts
        int md = mode; // read volatile fields first
        long c = ctl;
        long st = stealCount;
        long qt = 0L, ss = 0L; int rc = 0;
        WorkQueue[] qs; WorkQueue q;
        if ((qs = queues) != null) {
            for (int i = 0; i < qs.length; ++i) {
                if ((q = qs[i]) != null) {
                    int size = q.queueSize();
                    if ((i & 1) == 0)
                        ss += size;   // even index: external submission queue
                    else {
                        qt += size;   // odd index: worker queue
                        st += (long)q.nsteals & 0xffffffffL; // unsigned add
                        if (q.isApparentlyUnblocked())
                            ++rc;
                    }
                }
            }
        }

        // derive counts from ctl fields (stored as deltas from parallelism)
        int pc = (md & SMASK);
        int tc = pc + (short)(c >>> TC_SHIFT);
        int ac = pc + (int)(c >> RC_SHIFT);
        if (ac < 0) // ignore transient negative
            ac = 0;
        String level = ((md & TERMINATED) != 0 ? "Terminated" :
                        (md & STOP)       != 0 ? "Terminating" :
                        (md & SHUTDOWN)   != 0 ? "Shutting down" :
                        "Running");
        return super.toString() +
            "[" + level +
            ", parallelism = " + pc +
            ", size = " + tc +
            ", active = " + ac +
            ", running = " + rc +
            ", steals = " + st +
            ", tasks = " + qt +
            ", submissions = " + ss +
            "]";
    }
3068
    /**
     * Possibly initiates an orderly shutdown in which previously
     * submitted tasks are executed, but no new tasks will be
     * accepted. Invocation has no effect on execution state if this
     * is the {@link #commonPool()}, and no additional effect if
     * already shut down. Tasks that are in the process of being
     * submitted concurrently during the course of this method may or
     * may not be rejected.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public void shutdown() {
        checkPermission();
        if (this != common) // shutdown of the common pool is a no-op
            tryTerminate(false, true); // now=false: let queued tasks finish
    }
3088
    /**
     * Possibly attempts to cancel and/or stop all tasks, and reject
     * all subsequently submitted tasks. Invocation has no effect on
     * execution state if this is the {@link #commonPool()}, and no
     * additional effect if already shut down. Otherwise, tasks that
     * are in the process of being submitted or executed concurrently
     * during the course of this method may or may not be
     * rejected. This method cancels both existing and unexecuted
     * tasks, in order to permit termination in the presence of task
     * dependencies. So the method always returns an empty list
     * (unlike the case for some other Executors).
     *
     * @return an empty list
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public List<Runnable> shutdownNow() {
        checkPermission();
        if (this != common) // shutdown of the common pool is a no-op
            tryTerminate(true, true); // now=true: cancel queued tasks too
        return Collections.emptyList();
    }
3113
    /**
     * Returns {@code true} if all tasks have completed following shut down.
     *
     * @return {@code true} if all tasks have completed following shut down
     */
    public boolean isTerminated() {
        return (mode & TERMINATED) != 0;
    }
3122
    /**
     * Returns {@code true} if the process of termination has
     * commenced but not yet completed. This method may be useful for
     * debugging. A return of {@code true} reported a sufficient
     * period after shutdown may indicate that submitted tasks have
     * ignored or suppressed interruption, or are waiting for I/O,
     * causing this executor not to properly terminate. (See the
     * advisory notes for class {@link ForkJoinTask} stating that
     * tasks should not normally entail blocking operations. But if
     * they do, they must abort them on interrupt.)
     *
     * @return {@code true} if terminating but not yet terminated
     */
    public boolean isTerminating() {
        // STOP set but TERMINATED not yet set
        return (mode & (STOP | TERMINATED)) == STOP;
    }
3139
    /**
     * Returns {@code true} if this pool has been shut down.
     *
     * @return {@code true} if this pool has been shut down
     */
    public boolean isShutdown() {
        return (mode & SHUTDOWN) != 0;
    }
3148
    /**
     * Blocks until all tasks have completed execution after a
     * shutdown request, or the timeout occurs, or the current thread
     * is interrupted, whichever happens first. Because the {@link
     * #commonPool()} never terminates until program shutdown, when
     * applied to the common pool, this method is equivalent to {@link
     * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        ReentrantLock lock; Condition cond;
        long nanos = unit.toNanos(timeout);
        boolean terminated = false;
        if (this == common) {
            // common pool never terminates: help quiesce instead, from
            // inside the pool if the caller is one of its workers
            Thread t; ForkJoinWorkerThread wt; int q;
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
                (wt = (ForkJoinWorkerThread)t).pool == this)
                q = helpQuiescePool(wt.workQueue, nanos, true);
            else
                q = externalHelpQuiescePool(nanos, true);
            if (q < 0) // negative result signals interruption
                throw new InterruptedException();
        }
        else if (!(terminated = isTerminated()) &&
                 (lock = registrationLock) != null) {
            lock.lock();
            try {
                // lazily create the termination condition, then wait on it
                if ((cond = termination) == null)
                    termination = cond = lock.newCondition();
                while (!(terminated = isTerminated()) && nanos > 0L)
                    nanos = cond.awaitNanos(nanos);
            } finally {
                lock.unlock();
            }
        }
        return terminated;
    }
3192
    /**
     * If called by a ForkJoinTask operating in this pool, equivalent
     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
     * waits and/or attempts to assist performing tasks until this
     * pool {@link #isQuiescent} or the indicated timeout elapses.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if quiescent; {@code false} if the
     *         timeout elapsed.
     */
    public boolean awaitQuiescence(long timeout, TimeUnit unit) {
        Thread t; ForkJoinWorkerThread wt; int q;
        long nanos = unit.toNanos(timeout);
        // help from inside the pool if the caller is one of its workers
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
            (wt = (ForkJoinWorkerThread)t).pool == this)
            q = helpQuiescePool(wt.workQueue, nanos, false);
        else
            q = externalHelpQuiescePool(nanos, false);
        return (q > 0); // positive result means quiescence was reached
    }
3214
    /**
     * Interface for extending managed parallelism for tasks running
     * in {@link ForkJoinPool}s.
     *
     * <p>A {@code ManagedBlocker} provides two methods.  Method
     * {@link #isReleasable} must return {@code true} if blocking is
     * not necessary. Method {@link #block} blocks the current thread
     * if necessary (perhaps internally invoking {@code isReleasable}
     * before actually blocking). These actions are performed by any
     * thread invoking {@link
     * ForkJoinPool#managedBlock(ManagedBlocker)}.  The unusual
     * methods in this API accommodate synchronizers that may, but
     * don't usually, block for long periods. Similarly, they allow
     * more efficient internal handling of cases in which additional
     * workers may be, but usually are not, needed to ensure
     * sufficient parallelism.  Toward this end, implementations of
     * method {@code isReleasable} must be amenable to repeated
     * invocation. Neither method is invoked after a prior invocation
     * of {@code isReleasable} or {@code block} returns {@code true}.
     *
     * <p>For example, here is a ManagedBlocker based on a
     * ReentrantLock:
     * <pre> {@code
     * class ManagedLocker implements ManagedBlocker {
     *   final ReentrantLock lock;
     *   boolean hasLock = false;
     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
     *   public boolean block() {
     *     if (!hasLock)
     *       lock.lock();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return hasLock || (hasLock = lock.tryLock());
     *   }
     * }}</pre>
     *
     * <p>Here is a class that possibly blocks waiting for an
     * item on a given queue:
     * <pre> {@code
     * class QueueTaker<E> implements ManagedBlocker {
     *   final BlockingQueue<E> queue;
     *   volatile E item = null;
     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *   public boolean block() throws InterruptedException {
     *     if (item == null)
     *       item = queue.take();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return item != null || (item = queue.poll()) != null;
     *   }
     *   public E getItem() { // call after pool.managedBlock completes
     *     return item;
     *   }
     * }}</pre>
     */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         * (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         * @return {@code true} if blocking is unnecessary
         */
        boolean isReleasable();
    }
3290
3291 /**
3292 * Runs the given possibly blocking task. When {@linkplain
3293 * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
3294 * method possibly arranges for a spare thread to be activated if
3295 * necessary to ensure sufficient parallelism while the current
3296 * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
3297 *
3298 * <p>This method repeatedly calls {@code blocker.isReleasable()} and
3299 * {@code blocker.block()} until either method returns {@code true}.
3300 * Every call to {@code blocker.block()} is preceded by a call to
3301 * {@code blocker.isReleasable()} that returned {@code false}.
3302 *
3303 * <p>If not running in a ForkJoinPool, this method is
3304 * behaviorally equivalent to
3305 * <pre> {@code
3306 * while (!blocker.isReleasable())
3307 * if (blocker.block())
3308 * break;}</pre>
3309 *
3310 * If running in a ForkJoinPool, the pool may first be expanded to
3311 * ensure sufficient parallelism available during the call to
3312 * {@code blocker.block()}.
3313 *
3314 * @param blocker the blocker task
3315 * @throws InterruptedException if {@code blocker.block()} did so
3316 */
3317 public static void managedBlock(ManagedBlocker blocker)
3318 throws InterruptedException {
3319 Thread t; ForkJoinPool p;
3320 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
3321 (p = ((ForkJoinWorkerThread)t).pool) != null)
3322 p.compensatedBlock(blocker);
3323 else
3324 unmanagedBlock(blocker);
3325 }
3326
    /**
     * ManagedBlock for ForkJoinWorkerThreads: loops checking
     * releasability and, when blocking appears necessary, tries to
     * arrange compensation (via tryCompensate) before invoking
     * blocker.block(), restoring ctl state afterwards.
     */
    private void compensatedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        if (blocker == null) throw new NullPointerException();
        for (;;) {
            int comp; boolean done;
            // Read ctl before the releasability check, so tryCompensate
            // operates on a value no newer than that check.
            long c = ctl;
            if (blocker.isReleasable())
                break;              // no blocking needed; done
            if ((comp = tryCompensate(c)) >= 0) {
                // comp == 0: nothing to restore afterwards; otherwise add
                // back one release count (RC_UNIT) when block() returns.
                // NOTE(review): exact tryCompensate return-code semantics
                // are defined elsewhere in this file -- confirm there.
                long post = (comp == 0) ? 0L : RC_UNIT;
                try {
                    done = blocker.block();
                } finally {
                    // always undo compensation, even if block() threw
                    getAndAddCtl(post);
                }
                if (done)
                    break;
                // block() returned false: retry from the top
            }
            // tryCompensate failed (< 0): retry with a fresh ctl read
        }
    }
3348
3349 /** ManagedBlock for external threads */
3350 private static void unmanagedBlock(ManagedBlocker blocker)
3351 throws InterruptedException {
3352 if (blocker == null) throw new NullPointerException();
3353 do {} while (!blocker.isReleasable() && !blocker.block());
3354 }
3355
3356 // AbstractExecutorService.newTaskFor overrides rely on
3357 // undocumented fact that ForkJoinTask.adapt returns ForkJoinTasks
3358 // that also implement RunnableFuture.
3359
3360 @Override
3361 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3362 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3363 }
3364
3365 @Override
3366 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3367 return new ForkJoinTask.AdaptedCallable<T>(callable);
3368 }
3369
    // Class bootstrap: VarHandles first (used by all atomic field
    // updates), then static configuration and the common pool.
    static {
        try {
            // Obtain VarHandles for the fields updated via atomics.
            MethodHandles.Lookup l = MethodHandles.lookup();
            CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
            MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
            THREADIDS = l.findVarHandle(ForkJoinPool.class, "threadIds", int.class);
            POOLIDS = l.findStaticVarHandle(ForkJoinPool.class, "poolIds", int.class);
        } catch (ReflectiveOperationException e) {
            // Lookup failure is unrecoverable; fail class initialization.
            throw new ExceptionInInitializerError(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;

        // Allow the max-spares limit to be overridden by a system
        // property; any parse/access failure keeps the default.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        try {
            String p = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (p != null)
                commonMaxSpares = Integer.parseInt(p);
        } catch (Exception ignore) {}
        COMMON_MAX_SPARES = commonMaxSpares;

        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");
        // Construct the common pool inside doPrivileged so creation does
        // not depend on callers' access-control context.
        common = AccessController.doPrivileged(new PrivilegedAction<>() {
            public ForkJoinPool run() {
                return new ForkJoinPool((byte)0); }});

        // Reported parallelism is at least 1 even for a parallelism-0
        // common pool (mode's low bits hold the parallelism level).
        COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
    }
3403 }