ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.304
Committed: Mon Mar 14 18:11:23 2016 UTC (8 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.303: +5 -5 lines
Log Message:
clarify implementation comment

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.Thread.UncaughtExceptionHandler;
10 import java.security.AccessControlContext;
11 import java.security.Permissions;
12 import java.security.ProtectionDomain;
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.List;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.CountedCompleter;
20 import java.util.concurrent.ForkJoinTask;
21 import java.util.concurrent.ForkJoinWorkerThread;
22 import java.util.concurrent.locks.LockSupport;
23
24 /**
25 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
26 * A {@code ForkJoinPool} provides the entry point for submissions
27 * from non-{@code ForkJoinTask} clients, as well as management and
28 * monitoring operations.
29 *
30 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
31 * ExecutorService} mainly by virtue of employing
32 * <em>work-stealing</em>: all threads in the pool attempt to find and
33 * execute tasks submitted to the pool and/or created by other active
34 * tasks (eventually blocking waiting for work if none exist). This
35 * enables efficient processing when most tasks spawn other subtasks
36 * (as do most {@code ForkJoinTask}s), as well as when many small
37 * tasks are submitted to the pool from external clients. Especially
38 * when setting <em>asyncMode</em> to true in constructors, {@code
39 * ForkJoinPool}s may also be appropriate for use with event-style
40 * tasks that are never joined.
41 *
42 * <p>A static {@link #commonPool()} is available and appropriate for
43 * most applications. The common pool is used by any ForkJoinTask that
44 * is not explicitly submitted to a specified pool. Using the common
45 * pool normally reduces resource usage (its threads are slowly
46 * reclaimed during periods of non-use, and reinstated upon subsequent
47 * use).
48 *
49 * <p>For applications that require separate or custom pools, a {@code
50 * ForkJoinPool} may be constructed with a given target parallelism
51 * level; by default, equal to the number of available processors.
52 * The pool attempts to maintain enough active (or available) threads
53 * by dynamically adding, suspending, or resuming internal worker
54 * threads, even if some tasks are stalled waiting to join others.
55 * However, no such adjustments are guaranteed in the face of blocked
56 * I/O or other unmanaged synchronization. The nested {@link
57 * ManagedBlocker} interface enables extension of the kinds of
58 * synchronization accommodated. The default policies may be
59 * overridden using a constructor with parameters corresponding to
60 * those documented in class {@link ThreadPoolExecutor}.
61 *
62 * <p>In addition to execution and lifecycle control methods, this
63 * class provides status check methods (for example
64 * {@link #getStealCount}) that are intended to aid in developing,
65 * tuning, and monitoring fork/join applications. Also, method
66 * {@link #toString} returns indications of pool state in a
67 * convenient form for informal monitoring.
68 *
69 * <p>As is the case with other ExecutorServices, there are three
70 * main task execution methods summarized in the following table.
71 * These are designed to be used primarily by clients not already
72 * engaged in fork/join computations in the current pool. The main
73 * forms of these methods accept instances of {@code ForkJoinTask},
74 * but overloaded forms also allow mixed execution of plain {@code
75 * Runnable}- or {@code Callable}- based activities as well. However,
76 * tasks that are already executing in a pool should normally instead
77 * use the within-computation forms listed in the table unless using
78 * async event-style tasks that are not usually joined, in which case
79 * there is little difference among choice of methods.
80 *
81 * <table BORDER CELLPADDING=3 CELLSPACING=1>
82 * <caption>Summary of task execution methods</caption>
83 * <tr>
84 * <td></td>
85 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
86 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
87 * </tr>
88 * <tr>
89 * <td> <b>Arrange async execution</b></td>
90 * <td> {@link #execute(ForkJoinTask)}</td>
91 * <td> {@link ForkJoinTask#fork}</td>
92 * </tr>
93 * <tr>
94 * <td> <b>Await and obtain result</b></td>
95 * <td> {@link #invoke(ForkJoinTask)}</td>
96 * <td> {@link ForkJoinTask#invoke}</td>
97 * </tr>
98 * <tr>
99 * <td> <b>Arrange exec and obtain Future</b></td>
100 * <td> {@link #submit(ForkJoinTask)}</td>
101 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
102 * </tr>
103 * </table>
104 *
105 * <p>The common pool is by default constructed with default
106 * parameters, but these may be controlled by setting three
107 * {@linkplain System#getProperty system properties}:
108 * <ul>
109 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
110 * - the parallelism level, a non-negative integer
111 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
112 * - the class name of a {@link ForkJoinWorkerThreadFactory}
113 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
114 * - the class name of a {@link UncaughtExceptionHandler}
115 * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
116 * - the maximum number of allowed extra threads to maintain target
117 * parallelism (default 256).
118 * </ul>
119 * If a {@link SecurityManager} is present and no factory is
120 * specified, then the default pool uses a factory supplying
121 * threads that have no {@link Permissions} enabled.
122 * The system class loader is used to load these classes.
123 * Upon any error in establishing these settings, default parameters
124 * are used. It is possible to disable or limit the use of threads in
125 * the common pool by setting the parallelism property to zero, and/or
126 * using a factory that may return {@code null}. However doing so may
127 * cause unjoined tasks to never be executed.
128 *
129 * <p><b>Implementation notes</b>: This implementation restricts the
130 * maximum number of running threads to 32767. Attempts to create
131 * pools with greater than the maximum number result in
132 * {@code IllegalArgumentException}.
133 *
134 * <p>This implementation rejects submitted tasks (that is, by throwing
135 * {@link RejectedExecutionException}) only when the pool is shut down
136 * or internal resources have been exhausted.
137 *
138 * @since 1.7
139 * @author Doug Lea
140 */
141 public class ForkJoinPool extends AbstractExecutorService {
142
143 /*
144 * Implementation Overview
145 *
146 * This class and its nested classes provide the main
147 * functionality and control for a set of worker threads:
148 * Submissions from non-FJ threads enter into submission queues.
149 * Workers take these tasks and typically split them into subtasks
150 * that may be stolen by other workers. Preference rules give
151 * first priority to processing tasks from their own queues (LIFO
152 * or FIFO, depending on mode), then to randomized FIFO steals of
153 * tasks in other queues. This framework began as a vehicle for
154 * supporting tree-structured parallelism using work-stealing.
155 * Over time, its scalability advantages led to extensions and
156 * changes to better support more diverse usage contexts. Because
157 * most internal methods and nested classes are interrelated,
158 * their main rationale and descriptions are presented here;
159 * individual methods and nested classes contain only brief
160 * comments about details.
161 *
162 * WorkQueues
163 * ==========
164 *
165 * Most operations occur within work-stealing queues (in nested
166 * class WorkQueue). These are special forms of Deques that
167 * support only three of the four possible end-operations -- push,
168 * pop, and poll (aka steal), under the further constraints that
169 * push and pop are called only from the owning thread (or, as
170 * extended here, under a lock), while poll may be called from
171 * other threads. (If you are unfamiliar with them, you probably
172 * want to read Herlihy and Shavit's book "The Art of
173 * Multiprocessor programming", chapter 16 describing these in
174 * more detail before proceeding.) The main work-stealing queue
175 * design is roughly similar to those in the papers "Dynamic
176 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
177 * (http://research.sun.com/scalable/pubs/index.html) and
178 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
179 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
180 * The main differences ultimately stem from GC requirements that
181 * we null out taken slots as soon as we can, to maintain as small
182 * a footprint as possible even in programs generating huge
183 * numbers of tasks. To accomplish this, we shift the CAS
184 * arbitrating pop vs poll (steal) from being on the indices
185 * ("base" and "top") to the slots themselves.
186 *
187 * Adding tasks then takes the form of a classic array push(task)
188 * in a circular buffer:
189 * q.array[q.top++ % length] = task;
190 *
191 * (The actual code needs to null-check and size-check the array,
192 * uses masking, not mod, for indexing a power-of-two-sized array,
193 * properly fences accesses, and possibly signals waiting workers
194 * to start scanning -- see below.) Both a successful pop and
195 * poll mainly entail a CAS of a slot from non-null to null.
196 *
197 * The pop operation (always performed by owner) is:
198 * if ((the task at top slot is not null) and
199 * (CAS slot to null))
200 * decrement top and return task;
201 *
202 * And the poll operation (usually by a stealer) is
203 * if ((the task at base slot is not null) and
204 * (CAS slot to null))
205 * increment base and return task;
206 *
207 * There are several variants of each of these. In particular,
208 * almost all uses of poll occur within scan operations that also
209 * interleave contention tracking (with associated code sprawl.)
210 *
211 * Memory ordering. See "Correct and Efficient Work-Stealing for
212 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
213 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
214 * analysis of memory ordering requirements in work-stealing
215 * algorithms similar to (but different than) the one used here.
216 * Extracting tasks in array slots via (fully fenced) CAS provides
217 * primary synchronization. The base and top indices imprecisely
218 * guide where to extract from. We do not always require strict
219 * orderings of array and index updates, so sometimes let them be
220 * subject to compiler and processor reorderings. However, the
221 * volatile "base" index also serves as a basis for memory
222 * ordering: Slot accesses are preceded by a read of base,
223 * ensuring happens-before ordering with respect to stealers (so
224 * the slots themselves can be read via plain array reads.) The
225 * only other memory orderings relied on are maintained in the
226 * course of signalling and activation (see below). A check that
227 * base == top indicates (momentary) emptiness, but otherwise may
228 * err on the side of possibly making the queue appear nonempty
229 * when a push, pop, or poll have not fully committed, or making
230 * it appear empty when an update of top has not yet been visibly
231 * written. (Method isEmpty() checks the case of a partially
232 * completed removal of the last element.) Because of this, the
233 * poll operation, considered individually, is not wait-free. One
234 * thief cannot successfully continue until another in-progress
235 * one (or, if previously empty, a push) visibly completes.
236 * However, in the aggregate, we ensure at least probabilistic
237 * non-blockingness. If an attempted steal fails, a scanning
238 * thief chooses a different random victim target to try next. So,
239 * in order for one thief to progress, it suffices for any
240 * in-progress poll or new push on any empty queue to
241 * complete.
242 *
243 * This approach also enables support of a user mode in which
244 * local task processing is in FIFO, not LIFO order, simply by
245 * using poll rather than pop. This can be useful in
246 * message-passing frameworks in which tasks are never joined.
247 *
248 * WorkQueues are also used in a similar way for tasks submitted
249 * to the pool. We cannot mix these tasks in the same queues used
250 * by workers. Instead, we randomly associate submission queues
251 * with submitting threads, using a form of hashing. The
252 * ThreadLocalRandom probe value serves as a hash code for
253 * choosing existing queues, and may be randomly repositioned upon
254 * contention with other submitters. In essence, submitters act
255 * like workers except that they are restricted to executing local
256 * tasks that they submitted. Insertion of tasks in shared mode
257 * requires a lock but we use only a simple spinlock (using field
258 * phase), because submitters encountering a busy queue move to a
259 * different position to use or create other queues -- they block
260 * only when creating and registering new queues. Because it is
261 * used only as a spinlock, unlocking requires only a "releasing"
262 * store (using putOrderedInt).
263 *
264 * Management
265 * ==========
266 *
267 * The main throughput advantages of work-stealing stem from
268 * decentralized control -- workers mostly take tasks from
269 * themselves or each other, at rates that can exceed a billion
270 * per second. The pool itself creates, activates (enables
271 * scanning for and running tasks), deactivates, blocks, and
272 * terminates threads, all with minimal central information.
273 * There are only a few properties that we can globally track or
274 * maintain, so we pack them into a small number of variables,
275 * often maintaining atomicity without blocking or locking.
276 * Nearly all essentially atomic control state is held in a few
277 * volatile variables that are by far most often read (not
278 * written) as status and consistency checks. We pack as much
279 * information into them as we can.
280 *
281 * Field "ctl" contains 64 bits holding information needed to
282 * atomically decide to add, enqueue (on an event queue), and
283 * dequeue (and release)-activate workers. To enable this
284 * packing, we restrict maximum parallelism to (1<<15)-1 (which is
285 * far in excess of normal operating range) to allow ids, counts,
286 * and their negations (used for thresholding) to fit into 16bit
287 * subfields.
288 *
289 * Field "mode" holds configuration parameters as well as lifetime
290 * status, atomically and monotonically setting SHUTDOWN, STOP,
291 * and finally TERMINATED bits.
292 *
293 * Field "workQueues" holds references to WorkQueues. It is
294 * updated (only during worker creation and termination) under
295 * lock (using field workerNamePrefix as lock), but is otherwise
296 * concurrently readable, and accessed directly. We also ensure
297 * that uses of the array reference itself never become too stale
298 * in case of resizing. To simplify index-based operations, the
299 * array size is always a power of two, and all readers must
300 * tolerate null slots. Worker queues are at odd indices. Shared
301 * (submission) queues are at even indices, up to a maximum of 64
302 * slots, to limit growth even if array needs to expand to add
303 * more workers. Grouping them together in this way simplifies and
304 * speeds up task scanning.
305 *
306 * All worker thread creation is on-demand, triggered by task
307 * submissions, replacement of terminated workers, and/or
308 * compensation for blocked workers. However, all other support
309 * code is set up to work with other policies. To ensure that we
310 * do not hold on to worker references that would prevent GC, all
311 * accesses to workQueues are via indices into the workQueues
312 * array (which is one source of some of the messy code
313 * constructions here). In essence, the workQueues array serves as
314 * a weak reference mechanism. Thus for example the stack top
315 * subfield of ctl stores indices, not references.
316 *
317 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
318 * cannot let workers spin indefinitely scanning for tasks when
319 * none can be found immediately, and we cannot start/resume
320 * workers unless there appear to be tasks available. On the
321 * other hand, we must quickly prod them into action when new
322 * tasks are submitted or generated. In many usages, ramp-up time
323 * is the main limiting factor in overall performance, which is
324 * compounded at program start-up by JIT compilation and
325 * allocation. So we streamline this as much as possible.
326 *
327 * The "ctl" field atomically maintains total worker and
328 * "released" worker counts, plus the head of the available worker
329 * queue (actually stack, represented by the lower 32bit subfield
330 * of ctl). Released workers are those known to be scanning for
331 * and/or running tasks. Unreleased ("available") workers are
332 * recorded in the ctl stack. These workers are made available for
333 * signalling by enqueuing in ctl (see method runWorker). The
334 * "queue" is a form of Treiber stack. This is ideal for
335 * activating threads in most-recently used order, and improves
336 * performance and locality, outweighing the disadvantages of
337 * being prone to contention and inability to release a worker
338 * unless it is topmost on stack. To avoid missed signal problems
339 * inherent in any wait/signal design, available workers rescan
340 * for (and if found run) tasks after enqueuing. Normally their
341 * release status will be updated while doing so, but the released
342 * worker ctl count may underestimate the number of active
343 * threads. (However, it is still possible to determine quiescence
344 * via a validation traversal -- see isQuiescent). After an
345 * unsuccessful rescan, available workers are blocked until
346 * signalled (see signalWork). The top stack state holds the
347 * value of the "phase" field of the worker: its index and status,
348 * plus a version counter that, in addition to the count subfields
349 * (also serving as version stamps) provide protection against
350 * Treiber stack ABA effects.
351 *
352 * Creating workers. To create a worker, we pre-increment counts
353 * (serving as a reservation), and attempt to construct a
354 * ForkJoinWorkerThread via its factory. Upon construction, the
355 * new thread invokes registerWorker, where it constructs a
356 * WorkQueue and is assigned an index in the workQueues array
357 * (expanding the array if necessary). The thread is then started.
358 * Upon any exception across these steps, or null return from
359 * factory, deregisterWorker adjusts counts and records
360 * accordingly. If a null return, the pool continues running with
361 * fewer than the target number of workers. If exceptional, the
362 * exception is propagated, generally to some external caller.
363 * Worker index assignment avoids the bias in scanning that would
364 * occur if entries were sequentially packed starting at the front
365 * of the workQueues array. We treat the array as a simple
366 * power-of-two hash table, expanding as needed. The seedIndex
367 * increment ensures no collisions until a resize is needed or a
368 * worker is deregistered and replaced, and thereafter keeps
369 * probability of collision low. We cannot use
370 * ThreadLocalRandom.getProbe() for similar purposes here because
371 * the thread has not started yet, but do so for creating
372 * submission queues for existing external threads (see
373 * externalPush).
374 *
375 * WorkQueue field "phase" is used by both workers and the pool to
376 * manage and track whether a worker is UNSIGNALLED (possibly
377 * blocked waiting for a signal). When a worker is enqueued its
378 * phase field is set. Note that phase field updates lag queue CAS
379 * releases so usage requires care -- seeing a negative phase does
380 * not guarantee that the worker is available. When queued, the
381 * lower 16 bits of scanState must hold its pool index. So we
382 * place the index there upon initialization (see registerWorker)
383 * and otherwise keep it there or restore it when necessary.
384 *
385 * The ctl field also serves as the basis for memory
386 * synchronization surrounding activation. This uses a more
387 * efficient version of a Dekker-like rule that task producers and
388 * consumers sync with each other by both writing/CASing ctl (even
389 * if to its current value). This would be extremely costly. So
390 * we relax it in several ways: (1) Producers only signal when
391 * their queue is empty. Other workers propagate this signal (in
392 * method scan) when they find tasks; to further reduce flailing,
393 * each worker signals only one other per activation. (2) Workers
394 * only enqueue after scanning (see below) and not finding any
395 * tasks. (3) Rather than CASing ctl to its current value in the
396 * common case where no action is required, we reduce write
397 * contention by equivalently prefacing signalWork when called by
398 * an external task producer using a memory access with
399 * full-volatile semantics or a "fullFence".
400 *
401 * Almost always, too many signals are issued. A task producer
402 * cannot in general tell if some existing worker is in the midst
403 * of finishing one task (or already scanning) and ready to take
404 * another without being signalled. So the producer might instead
405 * activate a different worker that does not find any work, and
406 * then inactivates. This scarcely matters in steady-state
407 * computations involving all workers, but can create contention
408 * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
409 * computations involving only a few workers.
410 *
411 * Scanning. Method runWorker performs top-level scanning for
412 * tasks. Each scan traverses and tries to poll from each queue
413 * starting at a random index and circularly stepping. Scans are
414 * not performed in ideal random permutation order, to reduce
415 * cacheline contention. The pseudorandom generator need not have
416 * high-quality statistical properties in the long term, but just
417 * within computations; We use Marsaglia XorShifts (often via
418 * ThreadLocalRandom.nextSecondarySeed), which are cheap and
419 * suffice. Scanning also employs contention reduction: When
420 * scanning workers fail to extract an apparently existing task,
421 * they soon restart at a different pseudorandom index. This
422 * improves throughput when many threads are trying to take tasks
423 * from few queues, which can be common in some usages. Scans do
424 * not otherwise explicitly take into account core affinities,
425 * loads, cache localities, etc. However, they do exploit temporal
426 * locality (which usually approximates these) by preferring to
427 * re-poll (at most #workers times) from the same queue after a
428 * successful poll before trying others.
429 *
430 * Trimming workers. To release resources after periods of lack of
431 * use, a worker starting to wait when the pool is quiescent will
432 * time out and terminate (see method scan) if the pool has
433 * remained quiescent for period given by field keepAlive.
434 *
435 * Shutdown and Termination. A call to shutdownNow invokes
436 * tryTerminate to atomically set a runState bit. The calling
437 * thread, as well as every other worker thereafter terminating,
438 * helps terminate others by cancelling their unprocessed tasks,
439 * and waking them up, doing so repeatedly until stable. Calls to
440 * non-abrupt shutdown() preface this by checking whether
441 * termination should commence by sweeping through queues (until
442 * stable) to ensure lack of in-flight submissions and workers
443 * about to process them before triggering the "STOP" phase of
444 * termination.
445 *
446 * Joining Tasks
447 * =============
448 *
449 * Any of several actions may be taken when one worker is waiting
450 * to join a task stolen (or always held) by another. Because we
451 * are multiplexing many tasks on to a pool of workers, we can't
452 * always just let them block (as in Thread.join). We also cannot
453 * just reassign the joiner's run-time stack with another and
454 * replace it later, which would be a form of "continuation", that
455 * even if possible is not necessarily a good idea since we may
456 * need both an unblocked task and its continuation to progress.
457 * Instead we combine two tactics:
458 *
459 * Helping: Arranging for the joiner to execute some task that it
460 * would be running if the steal had not occurred.
461 *
462 * Compensating: Unless there are already enough live threads,
463 * method tryCompensate() may create or re-activate a spare
464 * thread to compensate for blocked joiners until they unblock.
465 *
466 * A third form (implemented in tryRemoveAndExec) amounts to
467 * helping a hypothetical compensator: If we can readily tell that
468 * a possible action of a compensator is to steal and execute the
469 * task being joined, the joining thread can do so directly,
470 * without the need for a compensation thread.
471 *
472 * The ManagedBlocker extension API can't use helping so relies
473 * only on compensation in method awaitBlocker.
474 *
475 * The algorithm in awaitJoin entails a form of "linear helping".
476 * Each worker records (in field source) the id of the queue from
477 * which it last stole a task. The scan in method awaitJoin uses
478 * these markers to try to find a worker to help (i.e., steal back
479 * a task from and execute it) that could hasten completion of the
480 * actively joined task. Thus, the joiner executes a task that
481 * would be on its own local deque if the to-be-joined task had
482 * not been stolen. This is a conservative variant of the approach
483 * described in Wagner & Calder "Leapfrogging: a portable
484 * technique for implementing efficient futures" SIGPLAN Notices,
485 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
486 * mainly in that we only record queue ids, not full dependency
487 * links. This requires a linear scan of the workQueues array to
488 * locate stealers, but isolates cost to when it is needed, rather
489 * than adding to per-task overhead. Searches can fail to locate
490 * stealers when GC stalls and the like delay recording sources.
491 * Further, even when accurately identified, stealers might not
492 * ever produce a task that the joiner can in turn help with. So,
493 * compensation is tried upon failure to find tasks to run.
494 *
495 * Compensation does not by default aim to keep exactly the target
496 * parallelism number of unblocked threads running at any given
497 * time. Some previous versions of this class employed immediate
498 * compensations for any blocked join. However, in practice, the
499 * vast majority of blockages are transient byproducts of GC and
500 * other JVM or OS activities that are made worse by replacement.
501 * Rather than impose arbitrary policies, we allow users to
502 * override the default of only adding threads upon apparent
503 * starvation. The compensation mechanism may also be bounded.
504 * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
505 * JVMs to cope with programming errors and abuse before running
506 * out of resources to do so.
507 *
508 * Common Pool
509 * ===========
510 *
511 * The static common pool always exists after static
512 * initialization. Since it (or any other created pool) need
513 * never be used, we minimize initial construction overhead and
514 * footprint to the setup of about a dozen fields.
515 *
516 * When external threads submit to the common pool, they can
517 * perform subtask processing (see externalHelpComplete and
518 * related methods) upon joins. This caller-helps policy makes it
519 * sensible to set common pool parallelism level to one (or more)
520 * less than the total number of available cores, or even zero for
521 * pure caller-runs. We do not need to record whether external
522 * submissions are to the common pool -- if not, external help
523 * methods return quickly. These submitters would otherwise be
524 * blocked waiting for completion, so the extra effort (with
525 * liberally sprinkled task status checks) in inapplicable cases
526 * amounts to an odd form of limited spin-wait before blocking in
527 * ForkJoinTask.join.
528 *
529 * As a more appropriate default in managed environments, unless
530 * overridden by system properties, we use workers of subclass
531 * InnocuousForkJoinWorkerThread when there is a SecurityManager
532 * present. These workers have no permissions set, do not belong
533 * to any user-defined ThreadGroup, and erase all ThreadLocals
534 * after executing any top-level task (see
535 * WorkQueue.afterTopLevelExec). The associated mechanics (mainly
536 * in ForkJoinWorkerThread) may be JVM-dependent and must access
537 * particular Thread class fields to achieve this effect.
538 *
539 * Style notes
540 * ===========
541 *
542 * Memory ordering relies mainly on Unsafe intrinsics that carry
543 * the further responsibility of explicitly performing null- and
544 * bounds- checks otherwise carried out implicitly by JVMs. This
545 * can be awkward and ugly, but also reflects the need to control
546 * outcomes across the unusual cases that arise in very racy code
547 * with very few invariants. So these explicit checks would exist
548 * in some form anyway. All fields are read into locals before
549 * use, and null-checked if they are references. This is usually
550 * done in a "C"-like style of listing declarations at the heads
551 * of methods or blocks, and using inline assignments on first
552 * encounter. Array bounds-checks are usually performed by
553 * masking with array.length-1, which relies on the invariant that
554 * these arrays are created with positive lengths, which is itself
555 * paranoically checked. Nearly all explicit checks lead to
556 * bypass/return, not exception throws, because they may
557 * legitimately arise due to cancellation/revocation during
558 * shutdown.
559 *
560 * There is a lot of representation-level coupling among classes
561 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
562 * fields of WorkQueue maintain data structures managed by
563 * ForkJoinPool, so are directly accessed. There is little point
564 * trying to reduce this, since any associated future changes in
565 * representations will need to be accompanied by algorithmic
566 * changes anyway. Several methods intrinsically sprawl because
567 * they must accumulate sets of consistent reads of fields held in
568 * local variables. There are also other coding oddities
569 * (including several unnecessary-looking hoisted null checks)
570 * that help some methods perform reasonably even when interpreted
571 * (not compiled).
572 *
573 * The order of declarations in this file is (with a few exceptions):
574 * (1) Static utility functions
575 * (2) Nested (static) classes
576 * (3) Static fields
577 * (4) Fields, along with constants used when unpacking some of them
578 * (5) Internal control methods
579 * (6) Callbacks and other support for ForkJoinTask methods
580 * (7) Exported methods
581 * (8) Static block initializing statics in minimally dependent order
582 */
583
584 // Static utilities
585
586 /**
587 * If there is a security manager, makes sure caller has
588 * permission to modify threads.
589 */
590 private static void checkPermission() {
591 SecurityManager security = System.getSecurityManager();
592 if (security != null)
593 security.checkPermission(modifyThreadPermission);
594 }
595
596 // Nested classes
597
598 /**
599 * Factory for creating new {@link ForkJoinWorkerThread}s.
600 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
601 * for {@code ForkJoinWorkerThread} subclasses that extend base
602 * functionality or initialize threads with different contexts.
603 */
604 public static interface ForkJoinWorkerThreadFactory {
605 /**
606 * Returns a new worker thread operating in the given pool.
607 * Returning null or throwing an exception may result in tasks
608 * never being executed. If this method throws an exception,
609 * it is relayed to the caller of the method (for example
610 * {@code execute}) causing attempted thread creation. If this
611 * method returns null or throws an exception, it is not
612 * retried until the next attempted creation (for example
613 * another call to {@code execute}).
614 *
615 * @param pool the pool this thread works in
616 * @return the new worker thread, or {@code null} if the request
617 * to create a thread is rejected.
618 * @throws NullPointerException if the pool is null
619 */
620 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
621 }
622
623 /**
624 * Default ForkJoinWorkerThreadFactory implementation; creates a
625 * new ForkJoinWorkerThread.
626 */
627 private static final class DefaultForkJoinWorkerThreadFactory
628 implements ForkJoinWorkerThreadFactory {
629 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
630 return new ForkJoinWorkerThread(pool);
631 }
632 }
633
    // Constants shared across ForkJoinPool and WorkQueue

    // Bounds
    static final int SWIDTH = 16; // width of short
    static final int SMASK = 0xffff; // short bits == max index
    static final int MAX_CAP = 0x7fff; // max #workers - 1
    static final int SQMASK = 0x007e; // max 64 (even) slots

    // Masks and units for WorkQueue.phase and ctl sp subfield
    static final int UNSIGNALLED = 1 << 31; // must be negative
    static final int SS_SEQ = 1 << 16; // version count
    static final int QLOCK = 1; // must be 1

    // Mode bits and sentinels, some also used in WorkQueue id and source fields
    static final int OWNED = 1; // queue has owner thread
    static final int FIFO = 1 << 16; // fifo queue or access mode
    static final int SATURATE = 1 << 17; // for tryCompensate
    static final int SHUTDOWN = 1 << 18;
    static final int TERMINATED = 1 << 19;
    static final int STOP = 1 << 31; // must be negative
    static final int QUIET = 1 << 30; // not scanning or working
    static final int DORMANT = QUIET | UNSIGNALLED;

    /**
     * The maximum number of local polls from the same queue before
     * checking others. This is a safeguard against infinitely unfair
     * looping under unbounded user task recursion, and must be larger
     * than plausible cases of intentional bounded task recursion.
     */
    static final int POLL_LIMIT = 1 << 10;
664
665 /**
666 * Queues supporting work-stealing as well as external task
667 * submission. See above for descriptions and algorithms.
668 * Performance on most platforms is very sensitive to placement of
669 * instances of both WorkQueues and their arrays -- we absolutely
670 * do not want multiple WorkQueue instances or multiple queue
671 * arrays sharing cache lines. The @Contended annotation alerts
672 * JVMs to try to keep instances apart.
673 */
674 // For now, using manual padding.
675 // @jdk.internal.vm.annotation.Contended
676 // @sun.misc.Contended
677 static final class WorkQueue {
678
679 /**
680 * Capacity of work-stealing queue array upon initialization.
681 * Must be a power of two; at least 4, but should be larger to
682 * reduce or eliminate cacheline sharing among queues.
683 * Currently, it is much larger, as a partial workaround for
684 * the fact that JVMs often place arrays in locations that
685 * share GC bookkeeping (especially cardmarks) such that
686 * per-write accesses encounter serious memory contention.
687 */
688 static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
689
690 /**
691 * Maximum size for queue arrays. Must be a power of two less
692 * than or equal to 1 << (31 - width of array entry) to ensure
693 * lack of wraparound of index calculations, but defined to a
694 * value a bit less than this to help users trap runaway
695 * programs before saturating systems.
696 */
697 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
698
699 // Instance fields
700 volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
701 volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
702 volatile int phase; // versioned, negative: queued, 1: locked
703 int stackPred; // pool stack (ctl) predecessor link
704 int nsteals; // number of steals
705 int id; // index, mode, tag
706 volatile int source; // source queue id, or sentinel
707 volatile int base; // index of next slot for poll
708 int top; // index of next slot for push
709 ForkJoinTask<?>[] array; // the elements (initially unallocated)
710 final ForkJoinPool pool; // the containing pool (may be null)
711 final ForkJoinWorkerThread owner; // owning thread or null if shared
712 volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
713 volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e, pad1f;
714
715 WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
716 this.pool = pool;
717 this.owner = owner;
718 // Place indices in the center of array (that is not yet allocated)
719 base = top = INITIAL_QUEUE_CAPACITY >>> 1;
720 }
721
722 /**
723 * Returns an exportable index (used by ForkJoinWorkerThread).
724 */
725 final int getPoolIndex() {
726 return (id & 0xffff) >>> 1; // ignore odd/even tag bit
727 }
728
729 /**
730 * Returns the approximate number of tasks in the queue.
731 */
732 final int queueSize() {
733 int n = base - top; // read base first
734 return (n >= 0) ? 0 : -n; // ignore transient negative
735 }
736
737 /**
738 * Provides a more accurate estimate of whether this queue has
739 * any tasks than does queueSize, by checking whether a
740 * near-empty queue has at least one unclaimed task.
741 */
742 final boolean isEmpty() {
743 ForkJoinTask<?>[] a; int n, al, b;
744 return ((n = (b = base) - top) >= 0 || // possibly one task
745 (n == -1 && ((a = array) == null ||
746 (al = a.length) == 0 ||
747 a[(al - 1) & b] == null)));
748 }
749
750
751 /**
752 * Pushes a task. Call only by owner in unshared queues.
753 *
754 * @param task the task. Caller must ensure non-null.
755 * @throws RejectedExecutionException if array cannot be resized
756 */
757 final void push(ForkJoinTask<?> task) {
758 int s = top; ForkJoinTask<?>[] a; int al, d;
759 if ((a = array) != null && (al = a.length) > 0) {
760 int index = (al - 1) & s;
761 long offset = ((long)index << ASHIFT) + ABASE;
762 ForkJoinPool p = pool;
763 top = s + 1;
764 U.putOrderedObject(a, offset, task);
765 if ((d = base - s) == 0 && p != null) {
766 U.fullFence();
767 p.signalWork();
768 }
769 else if (d + al == 1)
770 growArray();
771 }
772 }
773
774 /**
775 * Initializes or doubles the capacity of array. Call either
776 * by owner or with lock held -- it is OK for base, but not
777 * top, to move while resizings are in progress.
778 */
779 final ForkJoinTask<?>[] growArray() {
780 ForkJoinTask<?>[] oldA = array;
781 int oldSize = oldA != null ? oldA.length : 0;
782 int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
783 if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
784 throw new RejectedExecutionException("Queue capacity exceeded");
785 int oldMask, t, b;
786 ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
787 if (oldA != null && (oldMask = oldSize - 1) > 0 &&
788 (t = top) - (b = base) > 0) {
789 int mask = size - 1;
790 do { // emulate poll from old array, push to new array
791 int index = b & oldMask;
792 long offset = ((long)index << ASHIFT) + ABASE;
793 ForkJoinTask<?> x = (ForkJoinTask<?>)
794 U.getObjectVolatile(oldA, offset);
795 if (x != null &&
796 U.compareAndSwapObject(oldA, offset, x, null))
797 a[b & mask] = x;
798 } while (++b != t);
799 U.storeFence();
800 }
801 return a;
802 }
803
804 /**
805 * Takes next task, if one exists, in LIFO order. Call only
806 * by owner in unshared queues.
807 */
808 final ForkJoinTask<?> pop() {
809 int b = base, s = top, al, i; ForkJoinTask<?>[] a;
810 if ((a = array) != null && b != s && (al = a.length) > 0) {
811 int index = (al - 1) & --s;
812 long offset = ((long)index << ASHIFT) + ABASE;
813 ForkJoinTask<?> t = (ForkJoinTask<?>)
814 U.getObject(a, offset);
815 if (t != null &&
816 U.compareAndSwapObject(a, offset, t, null)) {
817 top = s;
818 U.storeFence();
819 return t;
820 }
821 }
822 return null;
823 }
824
825 /**
826 * Takes next task, if one exists, in FIFO order.
827 */
828 final ForkJoinTask<?> poll() {
829 for (;;) {
830 int b = base, s = top, d, al; ForkJoinTask<?>[] a;
831 if ((a = array) != null && (d = b - s) < 0 &&
832 (al = a.length) > 0) {
833 int index = (al - 1) & b;
834 long offset = ((long)index << ASHIFT) + ABASE;
835 ForkJoinTask<?> t = (ForkJoinTask<?>)
836 U.getObjectVolatile(a, offset);
837 if (b++ == base) {
838 if (t != null) {
839 if (U.compareAndSwapObject(a, offset, t, null)) {
840 base = b;
841 return t;
842 }
843 }
844 else if (d == -1)
845 break; // now empty
846 }
847 }
848 else
849 break;
850 }
851 return null;
852 }
853
854 /**
855 * Takes next task, if one exists, in order specified by mode.
856 */
857 final ForkJoinTask<?> nextLocalTask() {
858 return ((id & FIFO) != 0) ? poll() : pop();
859 }
860
861 /**
862 * Returns next task, if one exists, in order specified by mode.
863 */
864 final ForkJoinTask<?> peek() {
865 int al; ForkJoinTask<?>[] a;
866 return ((a = array) != null && (al = a.length) > 0) ?
867 a[(al - 1) &
868 ((id & FIFO) != 0 ? base : top - 1)] : null;
869 }
870
871 /**
872 * Pops the given task only if it is at the current top.
873 */
874 final boolean tryUnpush(ForkJoinTask<?> task) {
875 int b = base, s = top, al; ForkJoinTask<?>[] a;
876 if ((a = array) != null && b != s && (al = a.length) > 0) {
877 int index = (al - 1) & --s;
878 long offset = ((long)index << ASHIFT) + ABASE;
879 if (U.compareAndSwapObject(a, offset, task, null)) {
880 top = s;
881 U.storeFence();
882 return true;
883 }
884 }
885 return false;
886 }
887
888 /**
889 * Removes and cancels all known tasks, ignoring any exceptions.
890 */
891 final void cancelAll() {
892 for (ForkJoinTask<?> t; (t = poll()) != null; )
893 ForkJoinTask.cancelIgnoringExceptions(t);
894 }
895
896 // Specialized execution methods
897
898 /**
899 * Pops and executes up to limit consecutive tasks or until empty.
900 *
901 * @param limit max runs, or zero for no limit
902 */
903 final void localPopAndExec(int limit) {
904 for (;;) {
905 int b = base, s = top, al; ForkJoinTask<?>[] a;
906 if ((a = array) != null && b != s && (al = a.length) > 0) {
907 int index = (al - 1) & --s;
908 long offset = ((long)index << ASHIFT) + ABASE;
909 ForkJoinTask<?> t = (ForkJoinTask<?>)
910 U.getAndSetObject(a, offset, null);
911 if (t != null) {
912 top = s;
913 U.storeFence();
914 t.doExec();
915 if (limit != 0 && --limit == 0)
916 break;
917 }
918 else
919 break;
920 }
921 else
922 break;
923 }
924 }
925
926 /**
927 * Polls and executes up to limit consecutive tasks or until empty.
928 *
929 * @param limit, or zero for no limit
930 */
931 final void localPollAndExec(int limit) {
932 for (int polls = 0;;) {
933 int b = base, s = top, d, al; ForkJoinTask<?>[] a;
934 if ((a = array) != null && (d = b - s) < 0 &&
935 (al = a.length) > 0) {
936 int index = (al - 1) & b++;
937 long offset = ((long)index << ASHIFT) + ABASE;
938 ForkJoinTask<?> t = (ForkJoinTask<?>)
939 U.getAndSetObject(a, offset, null);
940 if (t != null) {
941 base = b;
942 t.doExec();
943 if (limit != 0 && ++polls == limit)
944 break;
945 }
946 else if (d == -1)
947 break; // now empty
948 else
949 polls = 0; // stolen; reset
950 }
951 else
952 break;
953 }
954 }
955
956 /**
957 * If present, removes task from queue and executes it.
958 */
959 final void tryRemoveAndExec(ForkJoinTask<?> task) {
960 ForkJoinTask<?>[] wa; int s, wal;
961 if (base - (s = top) < 0 && // traverse from top
962 (wa = array) != null && (wal = wa.length) > 0) {
963 for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
964 int index = i & m;
965 long offset = (index << ASHIFT) + ABASE;
966 ForkJoinTask<?> t = (ForkJoinTask<?>)
967 U.getObject(wa, offset);
968 if (t == null)
969 break;
970 else if (t == task) {
971 if (U.compareAndSwapObject(wa, offset, t, null)) {
972 top = ns; // safely shift down
973 for (int j = i; j != ns; ++j) {
974 ForkJoinTask<?> f;
975 int pindex = (j + 1) & m;
976 long pOffset = (pindex << ASHIFT) + ABASE;
977 f = (ForkJoinTask<?>)U.getObject(wa, pOffset);
978 U.putObjectVolatile(wa, pOffset, null);
979
980 int jindex = j & m;
981 long jOffset = (jindex << ASHIFT) + ABASE;
982 U.putOrderedObject(wa, jOffset, f);
983 }
984 U.storeFence();
985 t.doExec();
986 }
987 break;
988 }
989 }
990 }
991 }
992
993 /**
994 * Tries to steal and run tasks within the target's
995 * computation until done, not found, or limit exceeded.
996 *
997 * @param task root of CountedCompleter computation
998 * @param limit max runs, or zero for no limit
999 * @return task status on exit
1000 */
1001 final int localHelpCC(CountedCompleter<?> task, int limit) {
1002 int status = 0;
1003 if (task != null && (status = task.status) >= 0) {
1004 for (;;) {
1005 boolean help = false;
1006 int b = base, s = top, al; ForkJoinTask<?>[] a;
1007 if ((a = array) != null && b != s && (al = a.length) > 0) {
1008 int index = (al - 1) & (s - 1);
1009 long offset = ((long)index << ASHIFT) + ABASE;
1010 ForkJoinTask<?> o = (ForkJoinTask<?>)
1011 U.getObject(a, offset);
1012 if (o instanceof CountedCompleter) {
1013 CountedCompleter<?> t = (CountedCompleter<?>)o;
1014 for (CountedCompleter<?> f = t;;) {
1015 if (f != task) {
1016 if ((f = f.completer) == null) // try parent
1017 break;
1018 }
1019 else {
1020 if (U.compareAndSwapObject(a, offset,
1021 t, null)) {
1022 top = s - 1;
1023 U.storeFence();
1024 t.doExec();
1025 help = true;
1026 }
1027 break;
1028 }
1029 }
1030 }
1031 }
1032 if ((status = task.status) < 0 || !help ||
1033 (limit != 0 && --limit == 0))
1034 break;
1035 }
1036 }
1037 return status;
1038 }
1039
1040 // Operations on shared queues
1041
1042 /**
1043 * Tries to lock shared queue by CASing phase field.
1044 */
1045 final boolean tryLockSharedQueue() {
1046 return U.compareAndSwapInt(this, PHASE, 0, QLOCK);
1047 }
1048
1049 /**
1050 * Shared version of tryUnpush.
1051 */
1052 final boolean trySharedUnpush(ForkJoinTask<?> task) {
1053 boolean popped = false;
1054 int s = top - 1, al; ForkJoinTask<?>[] a;
1055 if ((a = array) != null && (al = a.length) > 0) {
1056 int index = (al - 1) & s;
1057 long offset = ((long)index << ASHIFT) + ABASE;
1058 ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
1059 if (t == task &&
1060 U.compareAndSwapInt(this, PHASE, 0, QLOCK)) {
1061 if (top == s + 1 && array == a &&
1062 U.compareAndSwapObject(a, offset, task, null)) {
1063 popped = true;
1064 top = s;
1065 }
1066 U.putOrderedInt(this, PHASE, 0);
1067 }
1068 }
1069 return popped;
1070 }
1071
1072 /**
1073 * Shared version of localHelpCC.
1074 */
1075 final int sharedHelpCC(CountedCompleter<?> task, int limit) {
1076 int status = 0;
1077 if (task != null && (status = task.status) >= 0) {
1078 for (;;) {
1079 boolean help = false;
1080 int b = base, s = top, al; ForkJoinTask<?>[] a;
1081 if ((a = array) != null && b != s && (al = a.length) > 0) {
1082 int index = (al - 1) & (s - 1);
1083 long offset = ((long)index << ASHIFT) + ABASE;
1084 ForkJoinTask<?> o = (ForkJoinTask<?>)
1085 U.getObject(a, offset);
1086 if (o instanceof CountedCompleter) {
1087 CountedCompleter<?> t = (CountedCompleter<?>)o;
1088 for (CountedCompleter<?> f = t;;) {
1089 if (f != task) {
1090 if ((f = f.completer) == null)
1091 break;
1092 }
1093 else {
1094 if (U.compareAndSwapInt(this, PHASE,
1095 0, QLOCK)) {
1096 if (top == s && array == a &&
1097 U.compareAndSwapObject(a, offset,
1098 t, null)) {
1099 help = true;
1100 top = s - 1;
1101 }
1102 U.putOrderedInt(this, PHASE, 0);
1103 if (help)
1104 t.doExec();
1105 }
1106 break;
1107 }
1108 }
1109 }
1110 }
1111 if ((status = task.status) < 0 || !help ||
1112 (limit != 0 && --limit == 0))
1113 break;
1114 }
1115 }
1116 return status;
1117 }
1118
1119 /**
1120 * Returns true if owned and not known to be blocked.
1121 */
1122 final boolean isApparentlyUnblocked() {
1123 Thread wt; Thread.State s;
1124 return ((wt = owner) != null &&
1125 (s = wt.getState()) != Thread.State.BLOCKED &&
1126 s != Thread.State.WAITING &&
1127 s != Thread.State.TIMED_WAITING);
1128 }
1129
1130 // Unsafe mechanics. Note that some are (and must be) the same as in FJP
1131 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1132 private static final long PHASE;
1133 private static final int ABASE;
1134 private static final int ASHIFT;
1135 static {
1136 try {
1137 PHASE = U.objectFieldOffset
1138 (WorkQueue.class.getDeclaredField("phase"));
1139 ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
1140 int scale = U.arrayIndexScale(ForkJoinTask[].class);
1141 if ((scale & (scale - 1)) != 0)
1142 throw new Error("array index scale not a power of two");
1143 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1144 } catch (ReflectiveOperationException e) {
1145 throw new Error(e);
1146 }
1147 }
1148 }
1149
    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     */
    private static final int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.
     */
    private static int poolNumberSequence; // accessed only under class lock in nextPoolId

    /**
     * Returns the next sequence number. We don't expect this to
     * ever contend, so use simple builtin sync.
     */
    private static final synchronized int nextPoolId() {
        return ++poolNumberSequence;
    }
1198
    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work
     */
    private static final long DEFAULT_KEEPALIVE = 60000L;

    /**
     * Undershoot tolerance for idle timeouts
     */
    private static final long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES. Overridable using the
     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
     * property. The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private static final int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private static final int SEED_INCREMENT = 0x9e3779b9;

    /*
     * Bits and masks for field ctl, packed with 4 16 bit subfields:
     * RC: Number of released (unqueued) workers minus target parallelism
     * TC: Number of total workers minus target parallelism
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl. The offsets of counts
     * by the target parallelism and the positionings of fields makes
     * it possible to perform the most common checks via sign tests of
     * fields: When rc is negative, there are not enough unqueued
     * workers, when tc is negative, there are not enough total
     * workers. When sp is non-zero, there are waiting workers. To
     * deal with possibly negative fields, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     *
     * Because it occupies uppermost bits, we can add one release count
     * using getAndAddLong of RC_UNIT, rather than CAS, when returning
     * from a blocked join. Other updates entail multiple subfields
     * and masking, requiring CAS.
     *
     * The limits packed in field "bounds" are also offset by the
     * parallelism level to make them comparable to the ctl rc and tc
     * fields.
     */

    // Lower and upper word masks
    private static final long SP_MASK = 0xffffffffL;
    private static final long UC_MASK = ~SP_MASK;

    // Release counts
    private static final int RC_SHIFT = 48;
    private static final long RC_UNIT = 0x0001L << RC_SHIFT;
    private static final long RC_MASK = 0xffffL << RC_SHIFT;

    // Total counts
    private static final int TC_SHIFT = 32;
    private static final long TC_UNIT = 0x0001L << TC_SHIFT;
    private static final long TC_MASK = 0xffffL << TC_SHIFT;
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1269
    // Instance fields

    // Segregate ctl field. For now using padding vs @Contended
    // @jdk.internal.vm.annotation.Contended("fjpctl")
    // @sun.misc.Contended("fjpctl")
    volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
    volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
    volatile long ctl; // main pool control
    volatile long pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
    // NOTE(review): only 15 trailing pads here (no pad1f) -- presumably
    // intentional so ctl plus pads fill whole 16-long regions; confirm.
    volatile long pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e;

    volatile long stealCount; // collects worker nsteals
    final long keepAlive; // milliseconds before dropping if idle
    int indexSeed; // next worker index
    final int bounds; // min, max threads packed as shorts
    volatile int mode; // parallelism, runstate, queue mode
    WorkQueue[] workQueues; // main registry
    final String workerNamePrefix; // for worker thread string; sync lock
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh; // per-worker UEH
1290
1291 // Creating, registering and deregistering workers
1292
1293 /**
1294 * Tries to construct and start one worker. Assumes that total
1295 * count has already been incremented as a reservation. Invokes
1296 * deregisterWorker on any failure.
1297 *
1298 * @return true if successful
1299 */
1300 private boolean createWorker() {
1301 ForkJoinWorkerThreadFactory fac = factory;
1302 Throwable ex = null;
1303 ForkJoinWorkerThread wt = null;
1304 try {
1305 if (fac != null && (wt = fac.newThread(this)) != null) {
1306 wt.start();
1307 return true;
1308 }
1309 } catch (Throwable rex) {
1310 ex = rex;
1311 }
1312 deregisterWorker(wt, ex);
1313 return false;
1314 }
1315
1316 /**
1317 * Tries to add one worker, incrementing ctl counts before doing
1318 * so, relying on createWorker to back out on failure.
1319 *
1320 * @param c incoming ctl value, with total count negative and no
1321 * idle workers. On CAS failure, c is refreshed and retried if
1322 * this holds (otherwise, a new worker is not needed).
1323 */
1324 private void tryAddWorker(long c) {
1325 do {
1326 long nc = ((RC_MASK & (c + RC_UNIT)) |
1327 (TC_MASK & (c + TC_UNIT)));
1328 if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
1329 createWorker();
1330 break;
1331 }
1332 } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
1333 }
1334
1335 /**
1336 * Callback from ForkJoinWorkerThread constructor to establish and
1337 * record its WorkQueue.
1338 *
1339 * @param wt the worker thread
1340 * @return the worker's queue
1341 */
1342 final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1343 UncaughtExceptionHandler handler;
1344 wt.setDaemon(true); // configure thread
1345 if ((handler = ueh) != null)
1346 wt.setUncaughtExceptionHandler(handler);
1347 WorkQueue w = new WorkQueue(this, wt);
1348 int tid = 0; // for thread name
1349 int fifo = mode & FIFO;
1350 String prefix = workerNamePrefix;
1351 if (prefix != null) {
1352 synchronized (prefix) {
1353 WorkQueue[] ws = workQueues; int n;
1354 int s = indexSeed += SEED_INCREMENT;
1355 if (ws != null && (n = ws.length) > 1) {
1356 int m = n - 1;
1357 tid = s & m;
1358 int i = m & ((s << 1) | 1); // odd-numbered indices
1359 for (int probes = n >>> 1;;) { // find empty slot
1360 WorkQueue q;
1361 if ((q = ws[i]) == null || q.phase == QUIET)
1362 break;
1363 else if (--probes == 0) {
1364 i = n | 1; // resize below
1365 break;
1366 }
1367 else
1368 i = (i + 2) & m;
1369 }
1370
1371 int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
1372 w.phase = w.id = id; // now publishable
1373
1374 if (i < n)
1375 ws[i] = w;
1376 else { // expand array
1377 int an = n << 1;
1378 WorkQueue[] as = new WorkQueue[an];
1379 as[i] = w;
1380 int am = an - 1;
1381 for (int j = 0; j < n; ++j) {
1382 WorkQueue v; // copy external queue
1383 if ((v = ws[j]) != null) // position may change
1384 as[v.id & am & SQMASK] = v;
1385 if (++j >= n)
1386 break;
1387 as[j] = ws[j]; // copy worker
1388 }
1389 workQueues = as;
1390 }
1391 }
1392 }
1393 wt.setName(prefix.concat(Integer.toString(tid)));
1394 }
1395 return w;
1396 }
1397
1398 /**
1399 * Final callback from terminating worker, as well as upon failure
1400 * to construct or start a worker. Removes record of worker from
1401 * array, and adjusts counts. If pool is shutting down, tries to
1402 * complete termination.
1403 *
1404 * @param wt the worker thread, or null if construction failed
1405 * @param ex the exception causing failure, or null if none
1406 */
1407 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1408 WorkQueue w = null;
1409 int phase = 0;
1410 if (wt != null && (w = wt.workQueue) != null) {
1411 Object lock = workerNamePrefix;
1412 long ns = (long)w.nsteals & 0xffffffffL;
1413 int idx = w.id & SMASK;
1414 if (lock != null) {
1415 WorkQueue[] ws; // remove index from array
1416 synchronized (lock) {
1417 if ((ws = workQueues) != null && ws.length > idx &&
1418 ws[idx] == w)
1419 ws[idx] = null;
1420 stealCount += ns;
1421 }
1422 }
1423 phase = w.phase;
1424 }
1425 if (phase != QUIET) { // else pre-adjusted
1426 long c; // decrement counts
1427 do {} while (!U.compareAndSwapLong
1428 (this, CTL, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
1429 (TC_MASK & (c - TC_UNIT)) |
1430 (SP_MASK & c))));
1431 }
1432 if (w != null)
1433 w.cancelAll(); // cancel remaining tasks
1434
1435 if (!tryTerminate(false, false) && // possibly replace worker
1436 w != null && w.array != null) // avoid repeated failures
1437 signalWork();
1438
1439 if (ex == null) // help clean on way out
1440 ForkJoinTask.helpExpungeStaleExceptions();
1441 else // rethrow
1442 ForkJoinTask.rethrow(ex);
1443 }
1444
1445 /**
1446 * Tries to create or release a worker if too few are running.
1447 */
1448 final void signalWork() {
1449 for (;;) {
1450 long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1451 if ((c = ctl) >= 0L) // enough workers
1452 break;
1453 else if ((sp = (int)c) == 0) { // no idle workers
1454 if ((c & ADD_WORKER) != 0L) // too few workers
1455 tryAddWorker(c);
1456 break;
1457 }
1458 else if ((ws = workQueues) == null)
1459 break; // unstarted/terminated
1460 else if (ws.length <= (i = sp & SMASK))
1461 break; // terminated
1462 else if ((v = ws[i]) == null)
1463 break; // terminating
1464 else {
1465 int np = sp & ~UNSIGNALLED;
1466 int vp = v.phase;
1467 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1468 Thread vt = v.owner;
1469 if (sp == vp && U.compareAndSwapLong(this, CTL, c, nc)) {
1470 v.phase = np;
1471 if (v.source < 0)
1472 LockSupport.unpark(vt);
1473 break;
1474 }
1475 }
1476 }
1477 }
1478
1479 /**
1480 * Tries to decrement counts (sometimes implicitly) and possibly
1481 * arrange for a compensating worker in preparation for blocking:
1482 * If not all core workers yet exist, creates one, else if any are
1483 * unreleased (possibly including caller) releases one, else if
1484 * fewer than the minimum allowed number of workers running,
1485 * checks to see that they are all active, and if so creates an
1486 * extra worker unless over maximum limit and policy is to
1487 * saturate. Most of these steps can fail due to interference, in
1488 * which case 0 is returned so caller will retry. A negative
1489 * return value indicates that the caller doesn't need to
1490 * re-adjust counts when later unblocked.
1491 *
1492 * @return 1: block then adjust, -1: block without adjust, 0 : retry
1493 */
1494 private int tryCompensate(WorkQueue w) {
1495 int t, n, sp;
1496 long c = ctl;
1497 WorkQueue[] ws = workQueues;
1498 if ((t = (short)(c >> TC_SHIFT)) >= 0) {
1499 if (ws == null || (n = ws.length) <= 0 || w == null)
1500 return 0; // disabled
1501 else if ((sp = (int)c) != 0) { // replace or release
1502 WorkQueue v = ws[sp & (n - 1)];
1503 int wp = w.phase;
1504 long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
1505 int np = sp & ~UNSIGNALLED;
1506 if (v != null) {
1507 int vp = v.phase;
1508 Thread vt = v.owner;
1509 long nc = ((long)v.stackPred & SP_MASK) | uc;
1510 if (vp == sp && U.compareAndSwapLong(this, CTL, c, nc)) {
1511 v.phase = np;
1512 if (v.source < 0)
1513 LockSupport.unpark(vt);
1514 return (wp < 0) ? -1 : 1;
1515 }
1516 }
1517 return 0;
1518 }
1519 else if ((int)(c >> RC_SHIFT) - // reduce parallelism
1520 (short)(bounds & SMASK) > 0) {
1521 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1522 return U.compareAndSwapLong(this, CTL, c, nc) ? 1 : 0;
1523 }
1524 else { // validate
1525 int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
1526 boolean unstable = false;
1527 for (int i = 1; i < n; i += 2) {
1528 WorkQueue q; Thread wt; Thread.State ts;
1529 if ((q = ws[i]) != null) {
1530 if (q.source == 0) {
1531 unstable = true;
1532 break;
1533 }
1534 else {
1535 --tc;
1536 if ((wt = q.owner) != null &&
1537 ((ts = wt.getState()) == Thread.State.BLOCKED ||
1538 ts == Thread.State.WAITING))
1539 ++bc; // worker is blocking
1540 }
1541 }
1542 }
1543 if (unstable || tc != 0 || ctl != c)
1544 return 0; // inconsistent
1545 else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
1546 if ((md & SATURATE) != 0)
1547 return -1;
1548 else if (bc < pc) { // lagging
1549 Thread.yield(); // for retry spins
1550 return 0;
1551 }
1552 else
1553 throw new RejectedExecutionException(
1554 "Thread limit exceeded replacing blocked worker");
1555 }
1556 }
1557 }
1558
1559 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1560 return U.compareAndSwapLong(this, CTL, c, nc) && createWorker() ? 1 : 0;
1561 }
1562
    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     * See above for explanation.
     *
     * Repeatedly scans the queue array from a random origin,
     * executing any task found (and afterwards draining the worker's
     * local queue up to POLL_LIMIT).  When a full scan finds nothing,
     * the worker pushes itself onto the ctl wait stack and parks;
     * while parked it may help terminate the pool, or time out and
     * deregister itself (drop on timeout) when it appears idle past
     * keepAlive.
     *
     * @param w this worker's queue (owned by the calling thread)
     */
    final void runWorker(WorkQueue w) {
        WorkQueue[] ws;
        w.growArray();                                  // allocate queue
        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
        if (r == 0)                                     // initial nonzero seed
            r = 1;
        int lastSignalId = 0;                           // avoid unneeded signals
        while ((ws = workQueues) != null) {
            boolean nonempty = false;                   // scan
            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                if ((i = r & m) >= 0 && i < n &&        // always true
                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
                    (a = q.array) != null && (al = a.length) > 0) {
                    int qid = q.id;                     // (never zero)
                    int index = (al - 1) & b;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getObjectVolatile(a, offset);
                    // re-check base after the volatile read to avoid
                    // racing with another poller of the same slot
                    if (t != null && b++ == q.base &&
                        U.compareAndSwapObject(a, offset, t, null)) {
                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
                            signalWork();               // propagate signal
                        w.source = lastSignalId = qid;
                        t.doExec();
                        if ((w.id & FIFO) != 0)         // run remaining locals
                            w.localPollAndExec(POLL_LIMIT);
                        else
                            w.localPopAndExec(POLL_LIMIT);
                        ForkJoinWorkerThread thread = w.owner;
                        ++w.nsteals;
                        w.source = 0;                   // now idle
                        if (thread != null)
                            thread.afterTopLevelExec();
                    }
                    nonempty = true;
                }
                else if (nonempty)
                    break;
                else
                    ++r;
            }

            if (nonempty) {                             // move (xorshift)
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
            }
            else {
                int phase;
                lastSignalId = 0;                       // clear for next scan
                if ((phase = w.phase) >= 0) {           // enqueue on ctl stack
                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
                    long c, nc;
                    do {
                        w.stackPred = (int)(c = ctl);
                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
                    } while (!U.compareAndSwapLong(this, CTL, c, nc));
                }
                else {                                  // already queued
                    int pred = w.stackPred;
                    w.source = DORMANT;                 // enable signal
                    for (int steps = 0;;) {
                        int md, rc; long c;
                        if (w.phase >= 0) {             // signalled; resume scanning
                            w.source = 0;
                            break;
                        }
                        else if ((md = mode) < 0)       // shutting down
                            return;
                        else if ((rc = ((md & SMASK) +  // possibly quiescent
                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
                                 (md & SHUTDOWN) != 0 &&
                                 tryTerminate(false, false))
                            return;                     // help terminate
                        else if ((++steps & 1) == 0)
                            Thread.interrupted();       // clear between parks
                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
                            // timed park only when this worker is at the
                            // top of the ctl stack and pool looks quiescent
                            long d = keepAlive + System.currentTimeMillis();
                            LockSupport.parkUntil(this, d);
                            if (ctl == c &&
                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
                                long nc = ((UC_MASK & (c - TC_UNIT)) |
                                           (SP_MASK & pred));
                                if (U.compareAndSwapLong(this, CTL, c, nc)) {
                                    w.phase = QUIET;
                                    return;             // drop on timeout
                                }
                            }
                        }
                        else
                            LockSupport.park(this);
                    }
                }
            }
        }
    }
1662
    /**
     * Helps and/or blocks until the given task is done or a timeout
     * elapses.  First runs matching local CountedCompleter subtasks
     * (if applicable) and tries to remove and directly execute the
     * task if it is still queued locally; then repeatedly scans other
     * queues for tasks apparently taken from this worker (their
     * {@code source} equals this worker's id), executing them to help
     * the stealer make progress.  When no such task is found, tries
     * to compensate (via tryCompensate) and blocks on the task.
     *
     * @param w caller's queue; nothing is done if null
     * @param task the task to join; nothing is done if null
     * @param deadline a System.nanoTime-based deadline, or 0L if untimed
     * @return task status on exit
     */
    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (w != null && task != null &&
            (!(task instanceof CountedCompleter) ||
             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
            w.tryRemoveAndExec(task);
            int src = w.source, id = w.id;
            s = task.status;
            while (s >= 0) {
                WorkQueue[] ws;
                boolean nonempty = false;
                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
                if ((ws = workQueues) != null) { // scan for matching id
                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                        if ((i = (r + j) & m) >= 0 && i < n &&
                            (q = ws[i]) != null && q.source == id &&
                            (b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            // recheck base and source before claiming slot
                            if (t != null && b++ == q.base && id == q.source &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = qid;
                                t.doExec();
                                w.source = src;         // restore on completion
                            }
                            nonempty = true;
                            break;
                        }
                    }
                }
                if ((s = task.status) < 0)
                    break;
                else if (!nonempty) {
                    long ms, ns; int block;
                    if (deadline == 0L)
                        ms = 0L;                        // untimed
                    else if ((ns = deadline - System.nanoTime()) <= 0L)
                        break;                          // timeout
                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                        ms = 1L;                        // avoid 0 for timed wait
                    if ((block = tryCompensate(w)) != 0) {
                        task.internalWait(ms);
                        // undo RC decrement if compensation released a worker
                        U.getAndAddLong(this, CTL, (block > 0) ? RC_UNIT : 0L);
                    }
                    s = task.status;
                }
            }
        }
        return s;
    }
1719
    /**
     * Runs tasks until {@code isQuiescent()}. Rather than blocking
     * when tasks cannot be found, rescans until all others cannot
     * find tasks either.
     *
     * The {@code released} variable tracks whether this worker's
     * active (RC) count is currently registered in ctl: -1 until
     * known, 1 when counted as active, 0 when decremented while
     * apparently inactive; increments/decrements keep the global
     * count consistent as the worker alternates between finding and
     * not finding work.
     *
     * @param w the caller's queue (non-null)
     */
    final void helpQuiescePool(WorkQueue w) {
        int prevSrc = w.source, fifo = w.id & FIFO;
        for (int source = prevSrc, released = -1;;) { // -1 until known
            WorkQueue[] ws;
            if (fifo != 0)
                w.localPollAndExec(0);
            else
                w.localPopAndExec(0);
            if (released == -1 && w.phase >= 0)
                released = 1;
            boolean quiet = true, empty = true;
            int r = ThreadLocalRandom.nextSecondarySeed();
            if ((ws = workQueues) != null) {
                for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                    WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                    if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
                        if ((b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            if (released == 0) {        // increment
                                released = 1;
                                U.getAndAddLong(this, CTL, RC_UNIT);
                            }
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            if (t != null && b++ == q.base &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = source = q.id;
                                t.doExec();
                                w.source = source = prevSrc;
                            }
                            quiet = empty = false;
                            break;
                        }
                        else if ((q.source & QUIET) == 0)
                            quiet = false;              // someone still active
                    }
                }
            }
            if (quiet) {
                if (released == 0)                      // rebalance before exit
                    U.getAndAddLong(this, CTL, RC_UNIT);
                w.source = prevSrc;
                break;
            }
            else if (empty) {
                if (source != QUIET)
                    w.source = source = QUIET;
                if (released == 1) {                    // decrement
                    released = 0;
                    U.getAndAddLong(this, CTL, RC_MASK & -RC_UNIT);
                }
            }
        }
    }
1783
    /**
     * Scans for and returns a polled task, if available.
     * Used only for untracked polls.
     *
     * Traverses the queue array from a random origin with a random
     * (odd, hence full-cycle) step, summing observed base indices;
     * the scan terminates only after a full pass in which the
     * checksum did not change, indicating no queue was concurrently
     * modified.
     *
     * @param submissionsOnly if true, only scan submission queues
     *        (the even-indexed slots)
     * @return a polled task, or null if none found
     */
    private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
        WorkQueue[] ws; int n;
        rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
                      (n = ws.length) > 0) {
            int m = n - 1;
            int r = ThreadLocalRandom.nextSecondarySeed();
            int h = r >>> 16;
            int origin, step;
            if (submissionsOnly) {
                origin = (r & ~1) & m;          // even indices and steps
                step = (h & ~1) | 2;
            }
            else {
                origin = r & m;
                step = h | 1;                   // odd step cycles all slots
            }
            for (int k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; int b, al; ForkJoinTask<?>[] a;
                if ((q = ws[k]) != null) {
                    checkSum += b = q.base;
                    if (b - q.top < 0 &&
                        (a = q.array) != null && (al = a.length) > 0) {
                        int index = (al - 1) & b;
                        long offset = ((long)index << ASHIFT) + ABASE;
                        ForkJoinTask<?> t = (ForkJoinTask<?>)
                            U.getObjectVolatile(a, offset);
                        if (t != null && b++ == q.base &&
                            U.compareAndSwapObject(a, offset, t, null)) {
                            q.base = b;
                            return t;
                        }
                        else
                            break;              // restart
                    }
                }
                if ((k = (k + step) & m) == origin) {
                    if (oldSum == (oldSum = checkSum))
                        break rescan;           // stable: nothing to poll
                    checkSum = 0;
                }
            }
        }
        return null;
    }
1834
1835 /**
1836 * Gets and removes a local or stolen task for the given worker.
1837 *
1838 * @return a task, if available
1839 */
1840 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1841 ForkJoinTask<?> t;
1842 if (w != null &&
1843 (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1844 return t;
1845 else
1846 return pollScan(false);
1847 }
1848
1849 // External operations
1850
    /**
     * Adds the given task to a submission queue at submitter's
     * current queue, creating one if null or contended.
     *
     * A new shared queue is installed only while synchronized on the
     * pool lock object (workerNamePrefix); pushes into an existing
     * queue use its phase-based lock (tryLockSharedQueue / QLOCK),
     * with the queue left locked across growArray and unlocked in the
     * finally block.  On contention, the caller's probe is advanced
     * and the operation retried against a different slot.
     *
     * @param task the task. Caller must ensure non-null.
     * @throws RejectedExecutionException if the pool is shut down
     */
    final void externalPush(ForkJoinTask<?> task) {
        int r;                                  // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            int md = mode, n;
            WorkQueue[] ws = workQueues;
            if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
                throw new RejectedExecutionException();
            else {
                WorkQueue q;
                boolean push = false, grow = false;
                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
                    Object lock = workerNamePrefix;
                    int qid = (r | QUIET) & ~(FIFO | OWNED);
                    q = new WorkQueue(this, null);
                    q.id = qid;
                    q.source = QUIET;
                    q.phase = QLOCK;            // lock queue
                    if (lock != null) {
                        synchronized (lock) {   // lock pool to install
                            int i;
                            if ((ws = workQueues) != null &&
                                (n = ws.length) > 0 &&
                                ws[i = qid & (n - 1) & SQMASK] == null) {
                                ws[i] = q;
                                push = grow = true;
                            }
                        }
                    }
                }
                else if (q.tryLockSharedQueue()) {
                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
                    if ((a = q.array) != null && (al = a.length) > 0 &&
                        al - 1 + (d = b - s) > 0) { // room to push
                        a[(al - 1) & s] = task;
                        q.top = s + 1;          // relaxed writes OK here
                        q.phase = 0;            // unlock
                        if (d < 0 && q.base - s < 0)
                            break;              // no signal needed
                    }
                    else
                        grow = true;            // full: resize while locked
                    push = true;
                }
                if (push) {
                    if (grow) {
                        try {
                            q.growArray();
                            int s = q.top, al; ForkJoinTask<?>[] a;
                            if ((a = q.array) != null && (al = a.length) > 0) {
                                a[(al - 1) & s] = task;
                                q.top = s + 1;
                            }
                        } finally {
                            q.phase = 0;        // unlock even on failure
                        }
                    }
                    signalWork();
                    break;
                }
                else                            // move if busy
                    r = ThreadLocalRandom.advanceProbe(r);
            }
        }
    }
1925
1926 /**
1927 * Pushes a possibly-external submission.
1928 */
1929 private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1930 Thread t; ForkJoinWorkerThread w; WorkQueue q;
1931 if (task == null)
1932 throw new NullPointerException();
1933 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1934 (w = (ForkJoinWorkerThread)t).pool == this &&
1935 (q = w.workQueue) != null)
1936 q.push(task);
1937 else
1938 externalPush(task);
1939 return task;
1940 }
1941
1942 /**
1943 * Returns common pool queue for an external thread.
1944 */
1945 static WorkQueue commonSubmitterQueue() {
1946 ForkJoinPool p = common;
1947 int r = ThreadLocalRandom.getProbe();
1948 WorkQueue[] ws; int n;
1949 return (p != null && (ws = p.workQueues) != null &&
1950 (n = ws.length) > 0) ?
1951 ws[(n - 1) & r & SQMASK] : null;
1952 }
1953
1954 /**
1955 * Performs tryUnpush for an external submitter.
1956 */
1957 final boolean tryExternalUnpush(ForkJoinTask<?> task) {
1958 int r = ThreadLocalRandom.getProbe();
1959 WorkQueue[] ws; WorkQueue w; int n;
1960 return ((ws = workQueues) != null &&
1961 (n = ws.length) > 0 &&
1962 (w = ws[(n - 1) & r & SQMASK]) != null &&
1963 w.trySharedUnpush(task));
1964 }
1965
1966 /**
1967 * Performs helpComplete for an external submitter.
1968 */
1969 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
1970 int r = ThreadLocalRandom.getProbe();
1971 WorkQueue[] ws; WorkQueue w; int n;
1972 return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1973 (w = ws[(n - 1) & r & SQMASK]) != null) ?
1974 w.sharedHelpCC(task, maxTasks) : 0;
1975 }
1976
1977 /**
1978 * Tries to steal and run tasks within the target's computation.
1979 * The maxTasks argument supports external usages; internal calls
1980 * use zero, allowing unbounded steps (external calls trap
1981 * non-positive values).
1982 *
1983 * @param w caller
1984 * @param maxTasks if non-zero, the maximum number of other tasks to run
1985 * @return task status on exit
1986 */
1987 final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1988 int maxTasks) {
1989 return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
1990 }
1991
    /**
     * Returns a cheap heuristic guide for task partitioning when
     * programmers, frameworks, tools, or languages have little or no
     * idea about task granularity. In essence, by offering this
     * method, we ask users only about tradeoffs in overhead vs
     * expected throughput and its variance, rather than how finely to
     * partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible. Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of computation tree should generate more than those nearer the
     * bottom. In perfect steady state, each thread is at
     * approximately the same level of computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger (but not much larger)
     * than 1 to both smooth over transient shortages and hedge
     * against uneven progress; as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length. However, this strategy alone
     * leads to serious mis-estimates in some non-steady-state
     * conditions (ramp-up, ramp-down, other stalls). We can detect
     * many of these by further considering the number of "idle"
     * threads, that are known to have zero queued tasks, so
     * compensate by a factor of (#idle/#active) threads.
     */
    static int getSurplusQueuedTaskCount() {
        Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
            (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (q = wt.workQueue) != null) {
            int p = pool.mode & SMASK;          // parallelism
            int a = p + (int)(pool.ctl >> RC_SHIFT); // active count
            int n = q.top - q.base;             // local queue length
            // note: p >>>= 1 halves p at each comparison, so the
            // thresholds compare a against p/2, p/4, p/8, p/16
            return n - (a > (p >>>= 1) ? 0 :
                        a > (p >>>= 1) ? 1 :
                        a > (p >>>= 1) ? 2 :
                        a > (p >>>= 1) ? 4 :
                        8);
        }
        return 0;
    }
2050
2051 // Termination
2052
    /**
     * Possibly initiates and/or completes termination.
     *
     * @param now if true, unconditionally terminate, else only
     * if no work and no active workers
     * @param enable if true, terminate when next possible
     * @return true if terminating or terminated
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED

        // Phase 1: set SHUTDOWN (never for the common pool)
        while (((md = mode) & SHUTDOWN) == 0) {
            if (!enable || this == common)      // cannot shutdown
                return false;
            else
                U.compareAndSwapInt(this, MODE, md, md | SHUTDOWN);
        }

        // Phase 2: set STOP, unless still running and !now
        while (((md = mode) & STOP) == 0) {     // try to initiate termination
            if (!now) {                         // check if quiescent & empty
                for (long oldSum = 0L;;) {      // repeat until stable
                    boolean running = false;
                    long checkSum = ctl;
                    WorkQueue[] ws = workQueues;
                    if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
                        running = true;         // active workers exist
                    else if (ws != null) {
                        WorkQueue w; int b;
                        for (int i = 0; i < ws.length; ++i) {
                            if ((w = ws[i]) != null) {
                                checkSum += (b = w.base) + w.id;
                                if (b != w.top ||
                                    ((i & 1) == 1 && w.source >= 0)) {
                                    running = true;
                                    break;
                                }
                            }
                        }
                    }
                    if (((md = mode) & STOP) != 0)
                        break;                  // already triggered
                    else if (running)
                        return false;
                    else if (workQueues == ws && oldSum == (oldSum = checkSum))
                        break;                  // two stable identical passes
                }
            }
            if ((md & STOP) == 0)
                U.compareAndSwapInt(this, MODE, md, md | STOP);
        }

        // Phase 3: cancel queues, interrupt workers, set TERMINATED
        while (((md = mode) & TERMINATED) == 0) { // help terminate others
            for (long oldSum = 0L;;) {          // repeat until stable
                WorkQueue[] ws; WorkQueue w;
                long checkSum = ctl;
                if ((ws = workQueues) != null) {
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) != null) {
                            ForkJoinWorkerThread wt = w.owner;
                            w.cancelAll();      // clear queues
                            if (wt != null) {
                                try {           // unblock join or park
                                    wt.interrupt();
                                } catch (Throwable ignore) {
                                }
                            }
                            checkSum += w.base + w.id;
                        }
                    }
                }
                if (((md = mode) & TERMINATED) != 0 ||
                    (workQueues == ws && oldSum == (oldSum = checkSum)))
                    break;
            }
            if ((md & TERMINATED) != 0)
                break;
            else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
                break;                          // workers still deregistering
            else if (U.compareAndSwapInt(this, MODE, md, md | TERMINATED)) {
                synchronized (this) {
                    notifyAll();                // for awaitTermination
                }
                break;
            }
        }
        return true;
    }
2140
2141 // Exported methods
2142
2143 // Constructors
2144
    /**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using defaults for all
     * other parameters.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool() {
        // defaults: LIFO mode, corePoolSize 0 (== parallelism),
        // max MAX_CAP threads, minimumRunnable 1, reject on saturation
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2160
    /**
     * Creates a {@code ForkJoinPool} with the indicated parallelism
     * level, using defaults for all other parameters.
     *
     * @param parallelism the parallelism level
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism) {
        // defaults as in the no-arg constructor
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2177
    /**
     * Creates a {@code ForkJoinPool} with the given parameters (using
     * defaults for others).
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     * @param factory the factory for creating new threads. For default value,
     * use {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while executing
     * tasks. For default value, use {@code null}.
     * @param asyncMode if true,
     * establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined. This mode may be more appropriate
     * than default locally stack-based mode in applications in which
     * worker threads only process event-style asynchronous tasks.
     * For default value, use {@code false}.
     * @throws IllegalArgumentException if parallelism less than or
     * equal to zero, or greater than implementation limit
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(parallelism, factory, handler, asyncMode,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2210
    /**
     * Creates a {@code ForkJoinPool} with the given parameters.
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     *
     * @param factory the factory for creating new threads. For
     * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
     *
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while
     * executing tasks. For default value, use {@code null}.
     *
     * @param asyncMode if true, establishes local first-in-first-out
     * scheduling mode for forked tasks that are never joined. This
     * mode may be more appropriate than default locally stack-based
     * mode in applications in which worker threads only process
     * event-style asynchronous tasks. For default value, use {@code
     * false}.
     *
     * @param corePoolSize the number of threads to keep in the pool
     * (unless timed out after an elapsed keep-alive). Normally (and
     * by default) this is the same value as the parallelism level,
     * but may be set to a larger value to reduce dynamic overhead if
     * tasks regularly block. Using a smaller value (for example
     * {@code 0}) has the same effect as the default.
     *
     * @param maximumPoolSize the maximum number of threads allowed.
     * When the maximum is reached, attempts to replace blocked
     * threads fail. (However, because creation and termination of
     * different threads may overlap, and may be managed by the given
     * thread factory, this value may be transiently exceeded.) The
     * default for the common pool is {@code 256} plus the parallelism
     * level. Using a value (for example {@code Integer.MAX_VALUE})
     * larger than the implementation's total thread limit has the
     * same effect as using this limit.
     *
     * @param minimumRunnable the minimum allowed number of core
     * threads not blocked by a join or {@link ManagedBlocker}. To
     * ensure progress, when too few unblocked threads exist and
     * unexecuted tasks may exist, new threads are constructed, up to
     * the given maximumPoolSize. For the default value, use {@code
     * 1}, that ensures liveness. A larger value might improve
     * throughput in the presence of blocked activities, but might
     * not, due to increased overhead. A value of zero may be
     * acceptable when submitted tasks cannot have dependencies
     * requiring additional threads.
     *
     * @param rejectOnSaturation if true, attempts to create more than
     * the maximum total allowed threads throw {@link
     * RejectedExecutionException}. Otherwise, the pool continues to
     * operate, but with fewer than the target number of runnable
     * threads, so might not ensure progress. For default value, use
     * {@code true}.
     *
     * @param keepAliveTime the elapsed time since last use before
     * a thread is terminated (and then later replaced if needed).
     * For the default value, use {@code 60, TimeUnit.SECONDS}.
     *
     * @param unit the time unit for the {@code keepAliveTime} argument
     *
     * @throws IllegalArgumentException if parallelism is less than or
     * equal to zero, or is greater than implementation limit,
     * or if maximumPoolSize is less than parallelism,
     * or if the keepAliveTime is less than or equal to zero.
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode,
                        int corePoolSize,
                        int maximumPoolSize,
                        int minimumRunnable,
                        boolean rejectOnSaturation,
                        long keepAliveTime,
                        TimeUnit unit) {
        // check, encode, pack parameters
        if (parallelism <= 0 || parallelism > MAX_CAP ||
            maximumPoolSize < parallelism || keepAliveTime <= 0L)
            throw new IllegalArgumentException();
        if (factory == null)
            throw new NullPointerException();
        long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);

        String prefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
        int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
        // ctl starts with negated total and release counts in its
        // upper halves; they count up toward zero as workers start
        long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
        int m = (parallelism |
                 (asyncMode ? FIFO : 0) |
                 (rejectOnSaturation ? 0 : SATURATE));
        // bounds packs max spare threads (high half) and the signed
        // minimumRunnable - parallelism difference (low half)
        int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
        int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
        int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
        int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1; // power of two, including space for submission queues

        this.workQueues = new WorkQueue[n];
        this.workerNamePrefix = prefix;
        this.factory = factory;
        this.ueh = handler;
        this.keepAlive = ms;
        this.bounds = b;
        this.mode = m;
        this.ctl = c;
        checkPermission();
    }
2324
2325 /**
2326 * Constructor for common pool using parameters possibly
2327 * overridden by system properties
2328 */
2329 private ForkJoinPool(byte forCommonPoolOnly) {
2330 int parallelism = -1;
2331 ForkJoinWorkerThreadFactory fac = null;
2332 UncaughtExceptionHandler handler = null;
2333 try { // ignore exceptions in accessing/parsing properties
2334 String pp = System.getProperty
2335 ("java.util.concurrent.ForkJoinPool.common.parallelism");
2336 String fp = System.getProperty
2337 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
2338 String hp = System.getProperty
2339 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2340 if (pp != null)
2341 parallelism = Integer.parseInt(pp);
2342 if (fp != null)
2343 fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
2344 getSystemClassLoader().loadClass(fp).newInstance());
2345 if (hp != null)
2346 handler = ((UncaughtExceptionHandler)ClassLoader.
2347 getSystemClassLoader().loadClass(hp).newInstance());
2348 } catch (Exception ignore) {
2349 }
2350
2351 if (fac == null) {
2352 if (System.getSecurityManager() == null)
2353 fac = defaultForkJoinWorkerThreadFactory;
2354 else // use security-managed default
2355 fac = new InnocuousForkJoinWorkerThreadFactory();
2356 }
2357 if (parallelism < 0 && // default 1 less than #cores
2358 (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
2359 parallelism = 1;
2360 if (parallelism > MAX_CAP)
2361 parallelism = MAX_CAP;
2362
2363 long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
2364 (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2365 int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2366 int m = (parallelism < 1) ? 1 : parallelism;
2367 int n = (parallelism > 1) ? parallelism - 1 : 1;
2368 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2369 n = (n + 1) << 1;
2370
2371 this.workQueues = new WorkQueue[n];
2372 this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2373 this.factory = fac;
2374 this.ueh = handler;
2375 this.keepAlive = DEFAULT_KEEPALIVE;
2376 this.bounds = b;
2377 this.mode = m;
2378 this.ctl = c;
2379 }
2380
    /**
     * Returns the common pool instance. This pool is statically
     * constructed; its run state is unaffected by attempts to {@link
     * #shutdown} or {@link #shutdownNow}. However this pool and any
     * ongoing processing are automatically terminated upon program
     * {@link System#exit}. Any program that relies on asynchronous
     * task processing to complete before program termination should
     * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
     * before exit.
     *
     * @return the common pool instance
     * @since 1.8
     */
    public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        return common;
    }
2398
2399 // Execution methods
2400
    /**
     * Performs the given task, returning its result upon completion.
     * If the computation encounters an unchecked Exception or Error,
     * it is rethrown as the outcome of this invocation. Rethrown
     * exceptions behave in the same way as regular exceptions, but,
     * when possible, contain stack traces (as displayed for example
     * using {@code ex.printStackTrace()}) of both the current thread
     * as well as the thread actually encountering the exception;
     * minimally only the latter.
     *
     * @param task the task
     * @param <T> the type of the task's result
     * @return the task's result
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalSubmit(task);
        return task.join(); // join reports the result or rethrows failure
    }
2424
    /**
     * Arranges for (asynchronous) execution of the given task.
     *
     * @param task the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public void execute(ForkJoinTask<?> task) {
        externalSubmit(task); // externalSubmit performs the null check
    }
2436
2437 // AbstractExecutorService methods
2438
2439 /**
2440 * @throws NullPointerException if the task is null
2441 * @throws RejectedExecutionException if the task cannot be
2442 * scheduled for execution
2443 */
2444 public void execute(Runnable task) {
2445 if (task == null)
2446 throw new NullPointerException();
2447 ForkJoinTask<?> job;
2448 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2449 job = (ForkJoinTask<?>) task;
2450 else
2451 job = new ForkJoinTask.RunnableExecuteAction(task);
2452 externalSubmit(job);
2453 }
2454
    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        return externalSubmit(task); // externalSubmit performs the null check
    }
2468
2469 /**
2470 * @throws NullPointerException if the task is null
2471 * @throws RejectedExecutionException if the task cannot be
2472 * scheduled for execution
2473 */
2474 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2475 return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
2476 }
2477
2478 /**
2479 * @throws NullPointerException if the task is null
2480 * @throws RejectedExecutionException if the task cannot be
2481 * scheduled for execution
2482 */
2483 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2484 return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
2485 }
2486
2487 /**
2488 * @throws NullPointerException if the task is null
2489 * @throws RejectedExecutionException if the task cannot be
2490 * scheduled for execution
2491 */
2492 public ForkJoinTask<?> submit(Runnable task) {
2493 if (task == null)
2494 throw new NullPointerException();
2495 ForkJoinTask<?> job;
2496 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2497 job = (ForkJoinTask<?>) task;
2498 else
2499 job = new ForkJoinTask.AdaptedRunnableAction(task);
2500 return externalSubmit(job);
2501 }
2502
2503 /**
2504 * @throws NullPointerException {@inheritDoc}
2505 * @throws RejectedExecutionException {@inheritDoc}
2506 */
2507 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2508 // In previous versions of this class, this method constructed
2509 // a task to run ForkJoinTask.invokeAll, but now external
2510 // invocation of multiple tasks is at least as efficient.
2511 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2512
2513 try {
2514 for (Callable<T> t : tasks) {
2515 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2516 futures.add(f);
2517 externalSubmit(f);
2518 }
2519 for (int i = 0, size = futures.size(); i < size; i++)
2520 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2521 return futures;
2522 } catch (Throwable t) {
2523 for (int i = 0, size = futures.size(); i < size; i++)
2524 futures.get(i).cancel(false);
2525 throw t;
2526 }
2527 }
2528
2529 /**
2530 * Returns the factory used for constructing new workers.
2531 *
2532 * @return the factory used for constructing new workers
2533 */
2534 public ForkJoinWorkerThreadFactory getFactory() {
2535 return factory;
2536 }
2537
2538 /**
2539 * Returns the handler for internal worker threads that terminate
2540 * due to unrecoverable errors encountered while executing tasks.
2541 *
2542 * @return the handler, or {@code null} if none
2543 */
2544 public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2545 return ueh;
2546 }
2547
2548 /**
2549 * Returns the targeted parallelism level of this pool.
2550 *
2551 * @return the targeted parallelism level of this pool
2552 */
2553 public int getParallelism() {
2554 return mode & SMASK;
2555 }
2556
2557 /**
2558 * Returns the targeted parallelism level of the common pool.
2559 *
2560 * @return the targeted parallelism level of the common pool
2561 * @since 1.8
2562 */
2563 public static int getCommonPoolParallelism() {
2564 return COMMON_PARALLELISM;
2565 }
2566
2567 /**
2568 * Returns the number of worker threads that have started but not
2569 * yet terminated. The result returned by this method may differ
2570 * from {@link #getParallelism} when threads are created to
2571 * maintain parallelism when others are cooperatively blocked.
2572 *
2573 * @return the number of worker threads
2574 */
2575 public int getPoolSize() {
2576 return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
2577 }
2578
2579 /**
2580 * Returns {@code true} if this pool uses local first-in-first-out
2581 * scheduling mode for forked tasks that are never joined.
2582 *
2583 * @return {@code true} if this pool uses async mode
2584 */
2585 public boolean getAsyncMode() {
2586 return (mode & FIFO) != 0;
2587 }
2588
2589 /**
2590 * Returns an estimate of the number of worker threads that are
2591 * not blocked waiting to join tasks or for other managed
2592 * synchronization. This method may overestimate the
2593 * number of running threads.
2594 *
2595 * @return the number of worker threads
2596 */
2597 public int getRunningThreadCount() {
2598 int rc = 0;
2599 WorkQueue[] ws; WorkQueue w;
2600 if ((ws = workQueues) != null) {
2601 for (int i = 1; i < ws.length; i += 2) {
2602 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2603 ++rc;
2604 }
2605 }
2606 return rc;
2607 }
2608
2609 /**
2610 * Returns an estimate of the number of threads that are currently
2611 * stealing or executing tasks. This method may overestimate the
2612 * number of active threads.
2613 *
2614 * @return the number of active threads
2615 */
2616 public int getActiveThreadCount() {
2617 int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2618 return (r <= 0) ? 0 : r; // suppress momentarily negative values
2619 }
2620
    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        for (;;) {
            long c = ctl;                         // snapshot; revalidated below
            int md = mode, pc = md & SMASK;
            int tc = pc + (short)(c >> TC_SHIFT); // total worker count
            int rc = pc + (int)(c >> RC_SHIFT);   // released (unqueued) count
            if ((md & (STOP | TERMINATED)) != 0)
                return true;                      // terminating pools count as quiescent
            else if (rc > 0)
                return false;                     // at least one released worker
            else {
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) != null) {
                    // Odd slots hold worker queues; each must be QUIET.
                    for (int i = 1; i < ws.length; i += 2) {
                        if ((v = ws[i]) != null) {
                            if ((v.source & QUIET) == 0)
                                return false;
                            --tc;                 // account for this worker
                        }
                    }
                }
                // Quiescent only if every worker was seen quiet and ctl
                // did not change during the scan; otherwise retry.
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }
2658
2659 /**
2660 * Returns an estimate of the total number of tasks stolen from
2661 * one thread's work queue by another. The reported value
2662 * underestimates the actual total number of steals when the pool
2663 * is not quiescent. This value may be useful for monitoring and
2664 * tuning fork/join programs: in general, steal counts should be
2665 * high enough to keep threads busy, but low enough to avoid
2666 * overhead and contention across threads.
2667 *
2668 * @return the number of steals
2669 */
2670 public long getStealCount() {
2671 long count = stealCount;
2672 WorkQueue[] ws; WorkQueue w;
2673 if ((ws = workQueues) != null) {
2674 for (int i = 1; i < ws.length; i += 2) {
2675 if ((w = ws[i]) != null)
2676 count += (long)w.nsteals & 0xffffffffL;
2677 }
2678 }
2679 return count;
2680 }
2681
2682 /**
2683 * Returns an estimate of the total number of tasks currently held
2684 * in queues by worker threads (but not including tasks submitted
2685 * to the pool that have not begun executing). This value is only
2686 * an approximation, obtained by iterating across all threads in
2687 * the pool. This method may be useful for tuning task
2688 * granularities.
2689 *
2690 * @return the number of queued tasks
2691 */
2692 public long getQueuedTaskCount() {
2693 long count = 0;
2694 WorkQueue[] ws; WorkQueue w;
2695 if ((ws = workQueues) != null) {
2696 for (int i = 1; i < ws.length; i += 2) {
2697 if ((w = ws[i]) != null)
2698 count += w.queueSize();
2699 }
2700 }
2701 return count;
2702 }
2703
2704 /**
2705 * Returns an estimate of the number of tasks submitted to this
2706 * pool that have not yet begun executing. This method may take
2707 * time proportional to the number of submissions.
2708 *
2709 * @return the number of queued submissions
2710 */
2711 public int getQueuedSubmissionCount() {
2712 int count = 0;
2713 WorkQueue[] ws; WorkQueue w;
2714 if ((ws = workQueues) != null) {
2715 for (int i = 0; i < ws.length; i += 2) {
2716 if ((w = ws[i]) != null)
2717 count += w.queueSize();
2718 }
2719 }
2720 return count;
2721 }
2722
2723 /**
2724 * Returns {@code true} if there are any tasks submitted to this
2725 * pool that have not yet begun executing.
2726 *
2727 * @return {@code true} if there are any queued submissions
2728 */
2729 public boolean hasQueuedSubmissions() {
2730 WorkQueue[] ws; WorkQueue w;
2731 if ((ws = workQueues) != null) {
2732 for (int i = 0; i < ws.length; i += 2) {
2733 if ((w = ws[i]) != null && !w.isEmpty())
2734 return true;
2735 }
2736 }
2737 return false;
2738 }
2739
2740 /**
2741 * Removes and returns the next unexecuted submission if one is
2742 * available. This method may be useful in extensions to this
2743 * class that re-assign work in systems with multiple pools.
2744 *
2745 * @return the next submission, or {@code null} if none
2746 */
2747 protected ForkJoinTask<?> pollSubmission() {
2748 return pollScan(true);
2749 }
2750
2751 /**
2752 * Removes all available unexecuted submitted and forked tasks
2753 * from scheduling queues and adds them to the given collection,
2754 * without altering their execution status. These may include
2755 * artificially generated or wrapped tasks. This method is
2756 * designed to be invoked only when the pool is known to be
2757 * quiescent. Invocations at other times may not remove all
2758 * tasks. A failure encountered while attempting to add elements
2759 * to collection {@code c} may result in elements being in
2760 * neither, either or both collections when the associated
2761 * exception is thrown. The behavior of this operation is
2762 * undefined if the specified collection is modified while the
2763 * operation is in progress.
2764 *
2765 * @param c the collection to transfer elements into
2766 * @return the number of elements transferred
2767 */
2768 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2769 int count = 0;
2770 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2771 if ((ws = workQueues) != null) {
2772 for (int i = 0; i < ws.length; ++i) {
2773 if ((w = ws[i]) != null) {
2774 while ((t = w.poll()) != null) {
2775 c.add(t);
2776 ++count;
2777 }
2778 }
2779 }
2780 }
2781 return count;
2782 }
2783
    /**
     * Returns a string identifying this pool, as well as its state,
     * including indications of run state, parallelism level, and
     * worker and task counts.
     *
     * @return a string identifying this pool, as well as its state
     */
    public String toString() {
        // Use a single pass through workQueues to collect counts
        long qt = 0L, qs = 0L; int rc = 0;   // tasks, submissions, running
        long st = stealCount;                 // steals from terminated workers
        WorkQueue[] ws; WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 0; i < ws.length; ++i) {
                if ((w = ws[i]) != null) {
                    int size = w.queueSize();
                    if ((i & 1) == 0)
                        qs += size;           // even slot: submission queue
                    else {
                        qt += size;           // odd slot: worker queue
                        st += (long)w.nsteals & 0xffffffffL; // unsigned tally
                        if (w.isApparentlyUnblocked())
                            ++rc;
                    }
                }
            }
        }

        // Derive thread counts from mode/ctl as in getPoolSize and
        // getActiveThreadCount.
        int md = mode;
        int pc = (md & SMASK);
        long c = ctl;
        int tc = pc + (short)(c >>> TC_SHIFT);
        int ac = pc + (int)(c >> RC_SHIFT);
        if (ac < 0) // ignore transient negative
            ac = 0;
        String level = ((md & TERMINATED) != 0 ? "Terminated" :
                        (md & STOP)       != 0 ? "Terminating" :
                        (md & SHUTDOWN)   != 0 ? "Shutting down" :
                        "Running");
        return super.toString() +
            "[" + level +
            ", parallelism = " + pc +
            ", size = " + tc +
            ", active = " + ac +
            ", running = " + rc +
            ", steals = " + st +
            ", tasks = " + qt +
            ", submissions = " + qs +
            "]";
    }
2834
2835 /**
2836 * Possibly initiates an orderly shutdown in which previously
2837 * submitted tasks are executed, but no new tasks will be
2838 * accepted. Invocation has no effect on execution state if this
2839 * is the {@link #commonPool()}, and no additional effect if
2840 * already shut down. Tasks that are in the process of being
2841 * submitted concurrently during the course of this method may or
2842 * may not be rejected.
2843 *
2844 * @throws SecurityException if a security manager exists and
2845 * the caller is not permitted to modify threads
2846 * because it does not hold {@link
2847 * java.lang.RuntimePermission}{@code ("modifyThread")}
2848 */
2849 public void shutdown() {
2850 checkPermission();
2851 tryTerminate(false, true);
2852 }
2853
2854 /**
2855 * Possibly attempts to cancel and/or stop all tasks, and reject
2856 * all subsequently submitted tasks. Invocation has no effect on
2857 * execution state if this is the {@link #commonPool()}, and no
2858 * additional effect if already shut down. Otherwise, tasks that
2859 * are in the process of being submitted or executed concurrently
2860 * during the course of this method may or may not be
2861 * rejected. This method cancels both existing and unexecuted
2862 * tasks, in order to permit termination in the presence of task
2863 * dependencies. So the method always returns an empty list
2864 * (unlike the case for some other Executors).
2865 *
2866 * @return an empty list
2867 * @throws SecurityException if a security manager exists and
2868 * the caller is not permitted to modify threads
2869 * because it does not hold {@link
2870 * java.lang.RuntimePermission}{@code ("modifyThread")}
2871 */
2872 public List<Runnable> shutdownNow() {
2873 checkPermission();
2874 tryTerminate(true, true);
2875 return Collections.emptyList();
2876 }
2877
2878 /**
2879 * Returns {@code true} if all tasks have completed following shut down.
2880 *
2881 * @return {@code true} if all tasks have completed following shut down
2882 */
2883 public boolean isTerminated() {
2884 return (mode & TERMINATED) != 0;
2885 }
2886
2887 /**
2888 * Returns {@code true} if the process of termination has
2889 * commenced but not yet completed. This method may be useful for
2890 * debugging. A return of {@code true} reported a sufficient
2891 * period after shutdown may indicate that submitted tasks have
2892 * ignored or suppressed interruption, or are waiting for I/O,
2893 * causing this executor not to properly terminate. (See the
2894 * advisory notes for class {@link ForkJoinTask} stating that
2895 * tasks should not normally entail blocking operations. But if
2896 * they do, they must abort them on interrupt.)
2897 *
2898 * @return {@code true} if terminating but not yet terminated
2899 */
2900 public boolean isTerminating() {
2901 int md = mode;
2902 return (md & STOP) != 0 && (md & TERMINATED) == 0;
2903 }
2904
2905 /**
2906 * Returns {@code true} if this pool has been shut down.
2907 *
2908 * @return {@code true} if this pool has been shut down
2909 */
2910 public boolean isShutdown() {
2911 return (mode & SHUTDOWN) != 0;
2912 }
2913
    /**
     * Blocks until all tasks have completed execution after a
     * shutdown request, or the timeout occurs, or the current thread
     * is interrupted, whichever happens first. Because the {@link
     * #commonPool()} never terminates until program shutdown, when
     * applied to the common pool, this method is equivalent to {@link
     * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // fail fast on pending interrupt
        if (this == common) {
            // The common pool never terminates; wait for quiescence
            // instead, then report non-termination per the contract.
            awaitQuiescence(timeout, unit);
            return false;
        }
        long nanos = unit.toNanos(timeout);
        if (isTerminated())
            return true;
        if (nanos <= 0L)
            return false;
        long deadline = System.nanoTime() + nanos;
        synchronized (this) {
            for (;;) {
                if (isTerminated())
                    return true;
                if (nanos <= 0L)
                    return false;             // deadline passed
                long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
                // Never call wait(0), which would block indefinitely;
                // wait at least 1ms and re-check on wakeup.
                wait(millis > 0L ? millis : 1L);
                nanos = deadline - System.nanoTime();
            }
        }
    }
2954
    /**
     * If called by a ForkJoinTask operating in this pool, equivalent
     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
     * waits and/or attempts to assist performing tasks until this
     * pool {@link #isQuiescent} or the indicated timeout elapses.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if quiescent; {@code false} if the
     * timeout elapsed.
     */
    public boolean awaitQuiescence(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        ForkJoinWorkerThread wt;
        Thread thread = Thread.currentThread();
        if ((thread instanceof ForkJoinWorkerThread) &&
            (wt = (ForkJoinWorkerThread)thread).pool == this) {
            // A worker of this pool helps run tasks to quiescence;
            // note that this path ignores the timeout.
            helpQuiescePool(wt.workQueue);
            return true;
        }
        else {
            // External caller: repeatedly run any pollable task,
            // yielding (never blocking) between attempts.
            for (long startTime = System.nanoTime();;) {
                ForkJoinTask<?> t;
                if ((t = pollScan(false)) != null)
                    t.doExec();
                else if (isQuiescent())
                    return true;
                else if ((System.nanoTime() - startTime) > nanos)
                    return false;             // timed out
                else
                    Thread.yield(); // cannot block
            }
        }
    }
2989
2990 /**
2991 * Waits and/or attempts to assist performing tasks indefinitely
2992 * until the {@link #commonPool()} {@link #isQuiescent}.
2993 */
2994 static void quiesceCommonPool() {
2995 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
2996 }
2997
    /**
     * Interface for extending managed parallelism for tasks running
     * in {@link ForkJoinPool}s.
     *
     * <p>A {@code ManagedBlocker} provides two methods. Method
     * {@link #isReleasable} must return {@code true} if blocking is
     * not necessary. Method {@link #block} blocks the current thread
     * if necessary (perhaps internally invoking {@code isReleasable}
     * before actually blocking). These actions are performed by any
     * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
     * The unusual methods in this API accommodate synchronizers that
     * may, but don't usually, block for long periods. Similarly, they
     * allow more efficient internal handling of cases in which
     * additional workers may be, but usually are not, needed to
     * ensure sufficient parallelism. Toward this end,
     * implementations of method {@code isReleasable} must be amenable
     * to repeated invocation.
     *
     * <p>For example, here is a ManagedBlocker based on a
     * ReentrantLock:
     * <pre> {@code
     * class ManagedLocker implements ManagedBlocker {
     *   final ReentrantLock lock;
     *   boolean hasLock = false;
     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
     *   public boolean block() {
     *     if (!hasLock)
     *       lock.lock();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return hasLock || (hasLock = lock.tryLock());
     *   }
     * }}</pre>
     *
     * <p>Here is a class that possibly blocks waiting for an
     * item on a given queue:
     * <pre> {@code
     * class QueueTaker<E> implements ManagedBlocker {
     *   final BlockingQueue<E> queue;
     *   volatile E item = null;
     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *   public boolean block() throws InterruptedException {
     *     if (item == null)
     *       item = queue.take();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return item != null || (item = queue.poll()) != null;
     *   }
     *   public E getItem() { // call after pool.managedBlock completes
     *     return item;
     *   }
     * }}</pre>
     */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         * (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         * May be invoked repeatedly by
         * {@link ForkJoinPool#managedBlock(ManagedBlocker)}; see the
         * interface documentation above.
         *
         * @return {@code true} if blocking is unnecessary
         */
        boolean isReleasable();
    }
3071
    /**
     * Runs the given possibly blocking task. When {@linkplain
     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
     * method possibly arranges for a spare thread to be activated if
     * necessary to ensure sufficient parallelism while the current
     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
     *
     * <p>This method repeatedly calls {@code blocker.isReleasable()} and
     * {@code blocker.block()} until either method returns {@code true}.
     * Every call to {@code blocker.block()} is preceded by a call to
     * {@code blocker.isReleasable()} that returned {@code false}.
     *
     * <p>If not running in a ForkJoinPool, this method is
     * behaviorally equivalent to
     * <pre> {@code
     * while (!blocker.isReleasable())
     *   if (blocker.block())
     *     break;}</pre>
     *
     * If running in a ForkJoinPool, the pool may first be expanded to
     * ensure sufficient parallelism available during the call to
     * {@code blocker.block()}.
     *
     * @param blocker the blocker task
     * @throws InterruptedException if {@code blocker.block()} did so
     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        WorkQueue w;
        Thread t = Thread.currentThread();
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (w = wt.workQueue) != null) {
            int block;
            // Worker path: before blocking, try to compensate for the
            // lost parallelism (tryCompensate's nonzero return permits
            // blocking; its sign selects the ctl adjustment below).
            while (!blocker.isReleasable()) {
                if ((block = p.tryCompensate(w)) != 0) {
                    try {
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        // Restore the release count only when
                        // tryCompensate returned a positive code.
                        U.getAndAddLong(p, CTL, (block > 0) ? RC_UNIT : 0L);
                    }
                    break;
                }
            }
        }
        else {
            // Non-worker path: plain isReleasable/block loop, no
            // compensation needed.
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
3125
3126 // AbstractExecutorService overrides. These rely on undocumented
3127 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3128 // implement RunnableFuture.
3129
3130 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3131 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3132 }
3133
3134 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3135 return new ForkJoinTask.AdaptedCallable<T>(callable);
3136 }
3137
    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long CTL;    // field offset of "ctl", for atomic updates
    private static final long MODE;   // field offset of "mode"
    private static final int ABASE;   // base offset of ForkJoinTask[] arrays
    private static final int ASHIFT;  // log2 of ForkJoinTask[] element scale
3144
    static {
        try {
            // Resolve Unsafe offsets for the fields and arrays updated
            // atomically elsewhere in this class.
            CTL = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("ctl"));
            MODE = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("mode"));
            ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
            int scale = U.arrayIndexScale(ForkJoinTask[].class);
            if ((scale & (scale - 1)) != 0)
                throw new Error("array index scale not a power of two");
            // ASHIFT = log2(scale), so element i is at ABASE + (i << ASHIFT).
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;

        // Allow the max-spares bound to be overridden by a system
        // property; silently fall back to the default on any failure.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        try {
            String p = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (p != null)
                commonMaxSpares = Integer.parseInt(p);
        } catch (Exception ignore) {}
        COMMON_MAX_SPARES = commonMaxSpares;

        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // Construct the common pool with full permissions, regardless
        // of the caller's access control context.
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() {
                    return new ForkJoinPool((byte)0); }});

        // Cache its parallelism for getCommonPoolParallelism().
        COMMON_PARALLELISM = common.mode & SMASK;
    }
3184
    /**
     * Factory for innocuous worker threads.
     */
    private static final class InnocuousForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {

        /**
         * An ACC to restrict permissions for the factory itself.
         * The constructed workers have no permissions set.
         */
        private static final AccessControlContext innocuousAcc;
        static {
            // The factory needs just enough permission to create and
            // configure worker threads; nothing more.
            Permissions innocuousPerms = new Permissions();
            innocuousPerms.add(modifyThreadPermission);
            innocuousPerms.add(new RuntimePermission(
                                   "enableContextClassLoaderOverride"));
            innocuousPerms.add(new RuntimePermission(
                                   "modifyThreadGroup"));
            innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
                    new ProtectionDomain(null, innocuousPerms)
                });
        }

        // Creates each worker under the restricted ACC above, so the
        // thread construction runs with only the listed permissions.
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return java.security.AccessController.doPrivileged(
                new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
                    public ForkJoinWorkerThread run() {
                        return new ForkJoinWorkerThread.
                            InnocuousForkJoinWorkerThread(pool);
                    }}, innocuousAcc);
        }
    }
3217
3218 }