ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.306
Committed: Mon Mar 14 19:57:33 2016 UTC (8 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.305: +1 -0 lines
Log Message:
add missing @since 9

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.Thread.UncaughtExceptionHandler;
10 import java.security.AccessControlContext;
11 import java.security.Permissions;
12 import java.security.ProtectionDomain;
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.List;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.CountedCompleter;
20 import java.util.concurrent.ForkJoinTask;
21 import java.util.concurrent.ForkJoinWorkerThread;
22 import java.util.concurrent.locks.LockSupport;
23
24 /**
25 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
26 * A {@code ForkJoinPool} provides the entry point for submissions
27 * from non-{@code ForkJoinTask} clients, as well as management and
28 * monitoring operations.
29 *
30 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
31 * ExecutorService} mainly by virtue of employing
32 * <em>work-stealing</em>: all threads in the pool attempt to find and
33 * execute tasks submitted to the pool and/or created by other active
34 * tasks (eventually blocking waiting for work if none exist). This
35 * enables efficient processing when most tasks spawn other subtasks
36 * (as do most {@code ForkJoinTask}s), as well as when many small
37 * tasks are submitted to the pool from external clients. Especially
38 * when setting <em>asyncMode</em> to true in constructors, {@code
39 * ForkJoinPool}s may also be appropriate for use with event-style
40 * tasks that are never joined.
41 *
42 * <p>A static {@link #commonPool()} is available and appropriate for
43 * most applications. The common pool is used by any ForkJoinTask that
44 * is not explicitly submitted to a specified pool. Using the common
45 * pool normally reduces resource usage (its threads are slowly
46 * reclaimed during periods of non-use, and reinstated upon subsequent
47 * use).
48 *
49 * <p>For applications that require separate or custom pools, a {@code
50 * ForkJoinPool} may be constructed with a given target parallelism
51 * level; by default, equal to the number of available processors.
52 * The pool attempts to maintain enough active (or available) threads
53 * by dynamically adding, suspending, or resuming internal worker
54 * threads, even if some tasks are stalled waiting to join others.
55 * However, no such adjustments are guaranteed in the face of blocked
56 * I/O or other unmanaged synchronization. The nested {@link
57 * ManagedBlocker} interface enables extension of the kinds of
58 * synchronization accommodated. The default policies may be
59 * overridden using a constructor with parameters corresponding to
60 * those documented in class {@link ThreadPoolExecutor}.
61 *
62 * <p>In addition to execution and lifecycle control methods, this
63 * class provides status check methods (for example
64 * {@link #getStealCount}) that are intended to aid in developing,
65 * tuning, and monitoring fork/join applications. Also, method
66 * {@link #toString} returns indications of pool state in a
67 * convenient form for informal monitoring.
68 *
69 * <p>As is the case with other ExecutorServices, there are three
70 * main task execution methods summarized in the following table.
71 * These are designed to be used primarily by clients not already
72 * engaged in fork/join computations in the current pool. The main
73 * forms of these methods accept instances of {@code ForkJoinTask},
74 * but overloaded forms also allow mixed execution of plain {@code
75 * Runnable}- or {@code Callable}- based activities as well. However,
76 * tasks that are already executing in a pool should normally instead
77 * use the within-computation forms listed in the table unless using
78 * async event-style tasks that are not usually joined, in which case
79 * there is little difference among choice of methods.
80 *
81 * <table BORDER CELLPADDING=3 CELLSPACING=1>
82 * <caption>Summary of task execution methods</caption>
83 * <tr>
84 * <td></td>
85 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
86 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
87 * </tr>
88 * <tr>
89 * <td> <b>Arrange async execution</b></td>
90 * <td> {@link #execute(ForkJoinTask)}</td>
91 * <td> {@link ForkJoinTask#fork}</td>
92 * </tr>
93 * <tr>
94 * <td> <b>Await and obtain result</b></td>
95 * <td> {@link #invoke(ForkJoinTask)}</td>
96 * <td> {@link ForkJoinTask#invoke}</td>
97 * </tr>
98 * <tr>
99 * <td> <b>Arrange exec and obtain Future</b></td>
100 * <td> {@link #submit(ForkJoinTask)}</td>
101 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
102 * </tr>
103 * </table>
104 *
105 * <p>The common pool is by default constructed with default
106 * parameters, but these may be controlled by setting three
107 * {@linkplain System#getProperty system properties}:
108 * <ul>
109 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
110 * - the parallelism level, a non-negative integer
111 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
112 * - the class name of a {@link ForkJoinWorkerThreadFactory}
113 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
114 * - the class name of a {@link UncaughtExceptionHandler}
115 * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
116 * - the maximum number of allowed extra threads to maintain target
117 * parallelism (default 256).
118 * </ul>
119 * If a {@link SecurityManager} is present and no factory is
120 * specified, then the default pool uses a factory supplying
121 * threads that have no {@link Permissions} enabled.
122 * The system class loader is used to load these classes.
123 * Upon any error in establishing these settings, default parameters
124 * are used. It is possible to disable or limit the use of threads in
125 * the common pool by setting the parallelism property to zero, and/or
126 * using a factory that may return {@code null}. However doing so may
127 * cause unjoined tasks to never be executed.
128 *
129 * <p><b>Implementation notes</b>: This implementation restricts the
130 * maximum number of running threads to 32767. Attempts to create
131 * pools with greater than the maximum number result in
132 * {@code IllegalArgumentException}.
133 *
134 * <p>This implementation rejects submitted tasks (that is, by throwing
135 * {@link RejectedExecutionException}) only when the pool is shut down
136 * or internal resources have been exhausted.
137 *
138 * @since 1.7
139 * @author Doug Lea
140 */
141 public class ForkJoinPool extends AbstractExecutorService {
142
143 /*
144 * Implementation Overview
145 *
146 * This class and its nested classes provide the main
147 * functionality and control for a set of worker threads:
148 * Submissions from non-FJ threads enter into submission queues.
149 * Workers take these tasks and typically split them into subtasks
150 * that may be stolen by other workers. Preference rules give
151 * first priority to processing tasks from their own queues (LIFO
152 * or FIFO, depending on mode), then to randomized FIFO steals of
153 * tasks in other queues. This framework began as a vehicle for
154 * supporting tree-structured parallelism using work-stealing.
155 * Over time, its scalability advantages led to extensions and
156 * changes to better support more diverse usage contexts. Because
157 * most internal methods and nested classes are interrelated,
158 * their main rationale and descriptions are presented here;
159 * individual methods and nested classes contain only brief
160 * comments about details.
161 *
162 * WorkQueues
163 * ==========
164 *
165 * Most operations occur within work-stealing queues (in nested
166 * class WorkQueue). These are special forms of Deques that
167 * support only three of the four possible end-operations -- push,
168 * pop, and poll (aka steal), under the further constraints that
169 * push and pop are called only from the owning thread (or, as
170 * extended here, under a lock), while poll may be called from
171 * other threads. (If you are unfamiliar with them, you probably
172 * want to read Herlihy and Shavit's book "The Art of
173 * Multiprocessor Programming", chapter 16 describing these in
174 * more detail before proceeding.) The main work-stealing queue
175 * design is roughly similar to those in the papers "Dynamic
176 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
177 * (http://research.sun.com/scalable/pubs/index.html) and
178 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
179 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
180 * The main differences ultimately stem from GC requirements that
181 * we null out taken slots as soon as we can, to maintain as small
182 * a footprint as possible even in programs generating huge
183 * numbers of tasks. To accomplish this, we shift the CAS
184 * arbitrating pop vs poll (steal) from being on the indices
185 * ("base" and "top") to the slots themselves.
186 *
187 * Adding tasks then takes the form of a classic array push(task)
188 * in a circular buffer:
189 * q.array[q.top++ % length] = task;
190 *
191 * (The actual code needs to null-check and size-check the array,
192 * uses masking, not mod, for indexing a power-of-two-sized array,
193 * properly fences accesses, and possibly signals waiting workers
194 * to start scanning -- see below.) Both a successful pop and
195 * poll mainly entail a CAS of a slot from non-null to null.
196 *
197 * The pop operation (always performed by owner) is:
198 * if ((the task at top slot is not null) and
199 * (CAS slot to null))
200 * decrement top and return task;
201 *
202 * And the poll operation (usually by a stealer) is
203 * if ((the task at base slot is not null) and
204 * (CAS slot to null))
205 * increment base and return task;
206 *
207 * There are several variants of each of these. In particular,
208 * almost all uses of poll occur within scan operations that also
209 * interleave contention tracking (with associated code sprawl.)
210 *
211 * Memory ordering. See "Correct and Efficient Work-Stealing for
212 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
213 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
214 * analysis of memory ordering requirements in work-stealing
215 * algorithms similar to (but different than) the one used here.
216 * Extracting tasks in array slots via (fully fenced) CAS provides
217 * primary synchronization. The base and top indices imprecisely
218 * guide where to extract from. We do not always require strict
219 * orderings of array and index updates, so sometimes let them be
220 * subject to compiler and processor reorderings. However, the
221 * volatile "base" index also serves as a basis for memory
222 * ordering: Slot accesses are preceded by a read of base,
223 * ensuring happens-before ordering with respect to stealers (so
224 * the slots themselves can be read via plain array reads.) The
225 * only other memory orderings relied on are maintained in the
226 * course of signalling and activation (see below). A check that
227 * base == top indicates (momentary) emptiness, but otherwise may
228 * err on the side of possibly making the queue appear nonempty
229 * when a push, pop, or poll have not fully committed, or making
230 * it appear empty when an update of top has not yet been visibly
231 * written. (Method isEmpty() checks the case of a partially
232 * completed removal of the last element.) Because of this, the
233 * poll operation, considered individually, is not wait-free. One
234 * thief cannot successfully continue until another in-progress
235 * one (or, if previously empty, a push) visibly completes.
236 * However, in the aggregate, we ensure at least probabilistic
237 * non-blockingness. If an attempted steal fails, a scanning
238 * thief chooses a different random victim target to try next. So,
239 * in order for one thief to progress, it suffices for any
240 * in-progress poll or new push on any empty queue to
241 * complete.
242 *
243 * This approach also enables support of a user mode in which
244 * local task processing is in FIFO, not LIFO order, simply by
245 * using poll rather than pop. This can be useful in
246 * message-passing frameworks in which tasks are never joined.
247 *
248 * WorkQueues are also used in a similar way for tasks submitted
249 * to the pool. We cannot mix these tasks in the same queues used
250 * by workers. Instead, we randomly associate submission queues
251 * with submitting threads, using a form of hashing. The
252 * ThreadLocalRandom probe value serves as a hash code for
253 * choosing existing queues, and may be randomly repositioned upon
254 * contention with other submitters. In essence, submitters act
255 * like workers except that they are restricted to executing local
256 * tasks that they submitted. Insertion of tasks in shared mode
257 * requires a lock but we use only a simple spinlock (using field
258 * phase), because submitters encountering a busy queue move to a
259 * different position to use or create other queues -- they block
260 * only when creating and registering new queues. Because it is
261 * used only as a spinlock, unlocking requires only a "releasing"
262 * store (using putOrderedInt).
263 *
264 * Management
265 * ==========
266 *
267 * The main throughput advantages of work-stealing stem from
268 * decentralized control -- workers mostly take tasks from
269 * themselves or each other, at rates that can exceed a billion
270 * per second. The pool itself creates, activates (enables
271 * scanning for and running tasks), deactivates, blocks, and
272 * terminates threads, all with minimal central information.
273 * There are only a few properties that we can globally track or
274 * maintain, so we pack them into a small number of variables,
275 * often maintaining atomicity without blocking or locking.
276 * Nearly all essentially atomic control state is held in a few
277 * volatile variables that are by far most often read (not
278 * written) as status and consistency checks. We pack as much
279 * information into them as we can.
280 *
281 * Field "ctl" contains 64 bits holding information needed to
282 * atomically decide to add, enqueue (on an event queue), and
283 * dequeue (and release)-activate workers. To enable this
284 * packing, we restrict maximum parallelism to (1<<15)-1 (which is
285 * far in excess of normal operating range) to allow ids, counts,
286 * and their negations (used for thresholding) to fit into 16bit
287 * subfields.
288 *
289 * Field "mode" holds configuration parameters as well as lifetime
290 * status, atomically and monotonically setting SHUTDOWN, STOP,
291 * and finally TERMINATED bits.
292 *
293 * Field "workQueues" holds references to WorkQueues. It is
294 * updated (only during worker creation and termination) under
295 * lock (using field workerNamePrefix as lock), but is otherwise
296 * concurrently readable, and accessed directly. We also ensure
297 * that uses of the array reference itself never become too stale
298 * in case of resizing. To simplify index-based operations, the
299 * array size is always a power of two, and all readers must
300 * tolerate null slots. Worker queues are at odd indices. Shared
301 * (submission) queues are at even indices, up to a maximum of 64
302 * slots, to limit growth even if array needs to expand to add
303 * more workers. Grouping them together in this way simplifies and
304 * speeds up task scanning.
305 *
306 * All worker thread creation is on-demand, triggered by task
307 * submissions, replacement of terminated workers, and/or
308 * compensation for blocked workers. However, all other support
309 * code is set up to work with other policies. To ensure that we
310 * do not hold on to worker references that would prevent GC, all
311 * accesses to workQueues are via indices into the workQueues
312 * array (which is one source of some of the messy code
313 * constructions here). In essence, the workQueues array serves as
314 * a weak reference mechanism. Thus for example the stack top
315 * subfield of ctl stores indices, not references.
316 *
317 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
318 * cannot let workers spin indefinitely scanning for tasks when
319 * none can be found immediately, and we cannot start/resume
320 * workers unless there appear to be tasks available. On the
321 * other hand, we must quickly prod them into action when new
322 * tasks are submitted or generated. In many usages, ramp-up time
323 * is the main limiting factor in overall performance, which is
324 * compounded at program start-up by JIT compilation and
325 * allocation. So we streamline this as much as possible.
326 *
327 * The "ctl" field atomically maintains total worker and
328 * "released" worker counts, plus the head of the available worker
329 * queue (actually stack, represented by the lower 32bit subfield
330 * of ctl). Released workers are those known to be scanning for
331 * and/or running tasks. Unreleased ("available") workers are
332 * recorded in the ctl stack. These workers are made available for
333 * signalling by enqueuing in ctl (see method runWorker). The
334 * "queue" is a form of Treiber stack. This is ideal for
335 * activating threads in most-recently used order, and improves
336 * performance and locality, outweighing the disadvantages of
337 * being prone to contention and inability to release a worker
338 * unless it is topmost on stack. To avoid missed signal problems
339 * inherent in any wait/signal design, available workers rescan
340 * for (and if found run) tasks after enqueuing. Normally their
341 * release status will be updated while doing so, but the released
342 * worker ctl count may underestimate the number of active
343 * threads. (However, it is still possible to determine quiescence
344 * via a validation traversal -- see isQuiescent). After an
345 * unsuccessful rescan, available workers are blocked until
346 * signalled (see signalWork). The top stack state holds the
347 * value of the "phase" field of the worker: its index and status,
348 * plus a version counter that, in addition to the count subfields
349 * (also serving as version stamps) provide protection against
350 * Treiber stack ABA effects.
351 *
352 * Creating workers. To create a worker, we pre-increment counts
353 * (serving as a reservation), and attempt to construct a
354 * ForkJoinWorkerThread via its factory. Upon construction, the
355 * new thread invokes registerWorker, where it constructs a
356 * WorkQueue and is assigned an index in the workQueues array
357 * (expanding the array if necessary). The thread is then started.
358 * Upon any exception across these steps, or null return from
359 * factory, deregisterWorker adjusts counts and records
360 * accordingly. If a null return, the pool continues running with
361 * fewer than the target number of workers. If exceptional, the
362 * exception is propagated, generally to some external caller.
363 * Worker index assignment avoids the bias in scanning that would
364 * occur if entries were sequentially packed starting at the front
365 * of the workQueues array. We treat the array as a simple
366 * power-of-two hash table, expanding as needed. The seedIndex
367 * increment ensures no collisions until a resize is needed or a
368 * worker is deregistered and replaced, and thereafter keeps
369 * probability of collision low. We cannot use
370 * ThreadLocalRandom.getProbe() for similar purposes here because
371 * the thread has not started yet, but do so for creating
372 * submission queues for existing external threads (see
373 * externalPush).
374 *
375 * WorkQueue field "phase" is used by both workers and the pool to
376 * manage and track whether a worker is UNSIGNALLED (possibly
377 * blocked waiting for a signal). When a worker is enqueued its
378 * phase field is set. Note that phase field updates lag queue CAS
379 * releases so usage requires care -- seeing a negative phase does
380 * not guarantee that the worker is available. When queued, the
381 * lower 16 bits of scanState must hold its pool index. So we
382 * place the index there upon initialization (see registerWorker)
383 * and otherwise keep it there or restore it when necessary.
384 *
385 * The ctl field also serves as the basis for memory
386 * synchronization surrounding activation. This uses a more
387 * efficient version of a Dekker-like rule that task producers and
388 * consumers sync with each other by both writing/CASing ctl (even
389 * if to its current value). This would be extremely costly. So
390 * we relax it in several ways: (1) Producers only signal when
391 * their queue is empty. Other workers propagate this signal (in
392 * method scan) when they find tasks; to further reduce flailing,
393 * each worker signals only one other per activation. (2) Workers
394 * only enqueue after scanning (see below) and not finding any
395 * tasks. (3) Rather than CASing ctl to its current value in the
396 * common case where no action is required, we reduce write
397 * contention by equivalently prefacing signalWork when called by
398 * an external task producer using a memory access with
399 * full-volatile semantics or a "fullFence".
400 *
401 * Almost always, too many signals are issued. A task producer
402 * cannot in general tell if some existing worker is in the midst
403 * of finishing one task (or already scanning) and ready to take
404 * another without being signalled. So the producer might instead
405 * activate a different worker that does not find any work, and
406 * then inactivates. This scarcely matters in steady-state
407 * computations involving all workers, but can create contention
408 * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
409 * computations involving only a few workers.
410 *
411 * Scanning. Method runWorker performs top-level scanning for
412 * tasks. Each scan traverses and tries to poll from each queue
413 * starting at a random index and circularly stepping. Scans are
414 * not performed in ideal random permutation order, to reduce
415 * cacheline contention. The pseudorandom generator need not have
416 * high-quality statistical properties in the long term, but just
417 * within computations; We use Marsaglia XorShifts (often via
418 * ThreadLocalRandom.nextSecondarySeed), which are cheap and
419 * suffice. Scanning also employs contention reduction: When
420 * scanning workers fail to extract an apparently existing task,
421 * they soon restart at a different pseudorandom index. This
422 * improves throughput when many threads are trying to take tasks
423 * from few queues, which can be common in some usages. Scans do
424 * not otherwise explicitly take into account core affinities,
425 * loads, cache localities, etc. However, they do exploit temporal
426 * locality (which usually approximates these) by preferring to
427 * re-poll (at most #workers times) from the same queue after a
428 * successful poll before trying others.
429 *
430 * Trimming workers. To release resources after periods of lack of
431 * use, a worker starting to wait when the pool is quiescent will
432 * time out and terminate (see method scan) if the pool has
433 * remained quiescent for period given by field keepAlive.
434 *
435 * Shutdown and Termination. A call to shutdownNow invokes
436 * tryTerminate to atomically set a runState bit. The calling
437 * thread, as well as every other worker thereafter terminating,
438 * helps terminate others by cancelling their unprocessed tasks,
439 * and waking them up, doing so repeatedly until stable. Calls to
440 * non-abrupt shutdown() preface this by checking whether
441 * termination should commence by sweeping through queues (until
442 * stable) to ensure lack of in-flight submissions and workers
443 * about to process them before triggering the "STOP" phase of
444 * termination.
445 *
446 * Joining Tasks
447 * =============
448 *
449 * Any of several actions may be taken when one worker is waiting
450 * to join a task stolen (or always held) by another. Because we
451 * are multiplexing many tasks on to a pool of workers, we can't
452 * always just let them block (as in Thread.join). We also cannot
453 * just reassign the joiner's run-time stack with another and
454 * replace it later, which would be a form of "continuation", that
455 * even if possible is not necessarily a good idea since we may
456 * need both an unblocked task and its continuation to progress.
457 * Instead we combine two tactics:
458 *
459 * Helping: Arranging for the joiner to execute some task that it
460 * would be running if the steal had not occurred.
461 *
462 * Compensating: Unless there are already enough live threads,
463 * method tryCompensate() may create or re-activate a spare
464 * thread to compensate for blocked joiners until they unblock.
465 *
466 * A third form (implemented in tryRemoveAndExec) amounts to
467 * helping a hypothetical compensator: If we can readily tell that
468 * a possible action of a compensator is to steal and execute the
469 * task being joined, the joining thread can do so directly,
470 * without the need for a compensation thread.
471 *
472 * The ManagedBlocker extension API can't use helping so relies
473 * only on compensation in method awaitBlocker.
474 *
475 * The algorithm in awaitJoin entails a form of "linear helping".
476 * Each worker records (in field source) the id of the queue from
477 * which it last stole a task. The scan in method awaitJoin uses
478 * these markers to try to find a worker to help (i.e., steal back
479 * a task from and execute it) that could hasten completion of the
480 * actively joined task. Thus, the joiner executes a task that
481 * would be on its own local deque if the to-be-joined task had
482 * not been stolen. This is a conservative variant of the approach
483 * described in Wagner & Calder "Leapfrogging: a portable
484 * technique for implementing efficient futures" SIGPLAN Notices,
485 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
486 * mainly in that we only record queue ids, not full dependency
487 * links. This requires a linear scan of the workQueues array to
488 * locate stealers, but isolates cost to when it is needed, rather
489 * than adding to per-task overhead. Searches can fail to locate
490 * stealers when GC stalls and the like delay recording sources.
491 * Further, even when accurately identified, stealers might not
492 * ever produce a task that the joiner can in turn help with. So,
493 * compensation is tried upon failure to find tasks to run.
494 *
495 * Compensation does not by default aim to keep exactly the target
496 * parallelism number of unblocked threads running at any given
497 * time. Some previous versions of this class employed immediate
498 * compensations for any blocked join. However, in practice, the
499 * vast majority of blockages are transient byproducts of GC and
500 * other JVM or OS activities that are made worse by replacement.
501 * Rather than impose arbitrary policies, we allow users to
502 * override the default of only adding threads upon apparent
503 * starvation. The compensation mechanism may also be bounded.
504 * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
505 * JVMs to cope with programming errors and abuse before running
506 * out of resources to do so.
507 *
508 * Common Pool
509 * ===========
510 *
511 * The static common pool always exists after static
512 * initialization. Since it (or any other created pool) need
513 * never be used, we minimize initial construction overhead and
514 * footprint to the setup of about a dozen fields.
515 *
516 * When external threads submit to the common pool, they can
517 * perform subtask processing (see externalHelpComplete and
518 * related methods) upon joins. This caller-helps policy makes it
519 * sensible to set common pool parallelism level to one (or more)
520 * less than the total number of available cores, or even zero for
521 * pure caller-runs. We do not need to record whether external
522 * submissions are to the common pool -- if not, external help
523 * methods return quickly. These submitters would otherwise be
524 * blocked waiting for completion, so the extra effort (with
525 * liberally sprinkled task status checks) in inapplicable cases
526 * amounts to an odd form of limited spin-wait before blocking in
527 * ForkJoinTask.join.
528 *
529 * As a more appropriate default in managed environments, unless
530 * overridden by system properties, we use workers of subclass
531 * InnocuousForkJoinWorkerThread when there is a SecurityManager
532 * present. These workers have no permissions set, do not belong
533 * to any user-defined ThreadGroup, and erase all ThreadLocals
534 * after executing any top-level task (see
535 * WorkQueue.afterTopLevelExec). The associated mechanics (mainly
536 * in ForkJoinWorkerThread) may be JVM-dependent and must access
537 * particular Thread class fields to achieve this effect.
538 *
539 * Style notes
540 * ===========
541 *
542 * Memory ordering relies mainly on Unsafe intrinsics that carry
543 * the further responsibility of explicitly performing null- and
544 * bounds- checks otherwise carried out implicitly by JVMs. This
545 * can be awkward and ugly, but also reflects the need to control
546 * outcomes across the unusual cases that arise in very racy code
547 * with very few invariants. So these explicit checks would exist
548 * in some form anyway. All fields are read into locals before
549 * use, and null-checked if they are references. This is usually
550 * done in a "C"-like style of listing declarations at the heads
551 * of methods or blocks, and using inline assignments on first
552 * encounter. Array bounds-checks are usually performed by
553 * masking with array.length-1, which relies on the invariant that
554 * these arrays are created with positive lengths, which is itself
555 * paranoically checked. Nearly all explicit checks lead to
556 * bypass/return, not exception throws, because they may
557 * legitimately arise due to cancellation/revocation during
558 * shutdown.
559 *
560 * There is a lot of representation-level coupling among classes
561 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
562 * fields of WorkQueue maintain data structures managed by
563 * ForkJoinPool, so are directly accessed. There is little point
564 * trying to reduce this, since any associated future changes in
565 * representations will need to be accompanied by algorithmic
566 * changes anyway. Several methods intrinsically sprawl because
567 * they must accumulate sets of consistent reads of fields held in
568 * local variables. There are also other coding oddities
569 * (including several unnecessary-looking hoisted null checks)
570 * that help some methods perform reasonably even when interpreted
571 * (not compiled).
572 *
573 * The order of declarations in this file is (with a few exceptions):
574 * (1) Static utility functions
575 * (2) Nested (static) classes
576 * (3) Static fields
577 * (4) Fields, along with constants used when unpacking some of them
578 * (5) Internal control methods
579 * (6) Callbacks and other support for ForkJoinTask methods
580 * (7) Exported methods
581 * (8) Static block initializing statics in minimally dependent order
582 */
583
584 // Static utilities
585
586 /**
587 * If there is a security manager, makes sure caller has
588 * permission to modify threads.
589 */
590 private static void checkPermission() {
591 SecurityManager security = System.getSecurityManager();
592 if (security != null)
593 security.checkPermission(modifyThreadPermission);
594 }
595
596 // Nested classes
597
    /**
     * Factory for creating new {@link ForkJoinWorkerThread}s.
     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
     * for {@code ForkJoinWorkerThread} subclasses that extend base
     * functionality or initialize threads with different contexts.
     */
    public static interface ForkJoinWorkerThreadFactory {
        /**
         * Returns a new worker thread operating in the given pool.
         * Returning null or throwing an exception may result in tasks
         * never being executed. If this method throws an exception,
         * it is relayed to the caller of the method (for example
         * {@code execute}) causing the attempted thread creation. If
         * this method returns null or throws an exception, it is not
         * retried until the next attempted creation (for example
         * another call to {@code execute}).
         *
         * @param pool the pool this thread works in
         * @return the new worker thread, or {@code null} if the request
         * to create a thread is rejected.
         * @throws NullPointerException if the pool is null
         */
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }
622
623 /**
624 * Default ForkJoinWorkerThreadFactory implementation; creates a
625 * new ForkJoinWorkerThread.
626 */
627 private static final class DefaultForkJoinWorkerThreadFactory
628 implements ForkJoinWorkerThreadFactory {
629 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
630 return new ForkJoinWorkerThread(pool);
631 }
632 }
633
    // Constants shared across ForkJoinPool and WorkQueue

    // Bounds
    static final int SWIDTH       = 16;            // width of short
    static final int SMASK        = 0xffff;        // short bits == max index
    static final int MAX_CAP      = 0x7fff;        // max #workers - 1
    static final int SQMASK       = 0x007e;        // max 64 (even) slots

    // Masks and units for WorkQueue.phase and ctl sp subfield
    static final int UNSIGNALLED  = 1 << 31;       // must be negative
    static final int SS_SEQ       = 1 << 16;       // version count
    static final int QLOCK        = 1;             // must be 1

    // Mode bits and sentinels, some also used in WorkQueue id and source fields
    static final int OWNED        = 1;             // queue has owner thread
    static final int FIFO         = 1 << 16;       // fifo queue or access mode
    static final int SATURATE     = 1 << 17;       // for tryCompensate
    static final int SHUTDOWN     = 1 << 18;
    static final int TERMINATED   = 1 << 19;
    static final int STOP         = 1 << 31;       // must be negative
    static final int QUIET        = 1 << 30;       // not scanning or working
    static final int DORMANT      = QUIET | UNSIGNALLED;

    /**
     * The maximum number of local polls from the same queue before
     * checking others. This is a safeguard against infinitely unfair
     * looping under unbounded user task recursion, and must be larger
     * than plausible cases of intentional bounded task recursion.
     */
    static final int POLL_LIMIT = 1 << 10;
664
    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. The @Contended annotation alerts
     * JVMs to try to keep instances apart.
     */
    // For now, using manual padding.
    // @jdk.internal.vm.annotation.Contended
    // @sun.misc.Contended
    static final class WorkQueue {

        /**
         * Capacity of work-stealing queue array upon initialization.
         * Must be a power of two; at least 4, but should be larger to
         * reduce or eliminate cacheline sharing among queues.
         * Currently, it is much larger, as a partial workaround for
         * the fact that JVMs often place arrays in locations that
         * share GC bookkeeping (especially cardmarks) such that
         * per-write accesses encounter serious memory contention.
         */
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

        /**
         * Maximum size for queue arrays. Must be a power of two less
         * than or equal to 1 << (31 - width of array entry) to ensure
         * lack of wraparound of index calculations, but defined to a
         * value a bit less than this to help users trap runaway
         * programs before saturating systems.
         */
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

        // Instance fields
        // Manual cache-line padding surrounding the hot fields below;
        // see the class comment about avoiding false sharing.
        volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
        volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
        volatile int phase;        // versioned, negative: queued, 1: locked
        int stackPred;             // pool stack (ctl) predecessor link
        int nsteals;               // number of steals
        int id;                    // index, mode, tag
        volatile int source;       // source queue id, or sentinel
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
        volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e, pad1f;

        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

        /**
         * Returns an exportable index (used by ForkJoinWorkerThread).
         */
        final int getPoolIndex() {
            return (id & 0xffff) >>> 1; // ignore odd/even tag bit
        }

        /**
         * Returns the approximate number of tasks in the queue.
         */
        final int queueSize() {
            int n = base - top;       // read base first
            return (n >= 0) ? 0 : -n; // ignore transient negative
        }

        /**
         * Provides a more accurate estimate of whether this queue has
         * any tasks than does queueSize, by checking whether a
         * near-empty queue has at least one unclaimed task.
         */
        final boolean isEmpty() {
            ForkJoinTask<?>[] a; int n, al, b;
            return ((n = (b = base) - top) >= 0 || // possibly one task
                    (n == -1 && ((a = array) == null ||
                                 (al = a.length) == 0 ||
                                 a[(al - 1) & b] == null)));
        }


        /**
         * Pushes a task. Call only by owner in unshared queues.
         *
         * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task) {
            int s = top; ForkJoinTask<?>[] a; int al, d;
            if ((a = array) != null && (al = a.length) > 0) {
                int index = (al - 1) & s;
                long offset = ((long)index << ASHIFT) + ABASE;
                ForkJoinPool p = pool;
                top = s + 1;
                U.putOrderedObject(a, offset, task);
                // d == 0 means the queue was empty before this push:
                // fence to publish, then wake a worker to take it.
                if ((d = base - s) == 0 && p != null) {
                    U.fullFence();
                    p.signalWork();
                }
                else if (d + al == 1) // array now full
                    growArray();
            }
        }

        /**
         * Initializes or doubles the capacity of array. Call either
         * by owner or with lock held -- it is OK for base, but not
         * top, to move while resizings are in progress.
         */
        final ForkJoinTask<?>[] growArray() {
            ForkJoinTask<?>[] oldA = array;
            int oldSize = oldA != null ? oldA.length : 0;
            int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
            if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
                throw new RejectedExecutionException("Queue capacity exceeded");
            int oldMask, t, b;
            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
            if (oldA != null && (oldMask = oldSize - 1) > 0 &&
                (t = top) - (b = base) > 0) {
                int mask = size - 1;
                do { // emulate poll from old array, push to new array
                    int index = b & oldMask;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> x = (ForkJoinTask<?>)
                        U.getObjectVolatile(oldA, offset);
                    // CAS to null so a concurrent poller cannot also take it
                    if (x != null &&
                        U.compareAndSwapObject(oldA, offset, x, null))
                        a[b & mask] = x;
                } while (++b != t);
                U.storeFence();
            }
            return a;
        }

        /**
         * Takes next task, if one exists, in LIFO order. Call only
         * by owner in unshared queues.
         */
        final ForkJoinTask<?> pop() {
            int b = base, s = top, al, i; ForkJoinTask<?>[] a;
            if ((a = array) != null && b != s && (al = a.length) > 0) {
                int index = (al - 1) & --s;
                long offset = ((long)index << ASHIFT) + ABASE;
                ForkJoinTask<?> t = (ForkJoinTask<?>)
                    U.getObject(a, offset);
                if (t != null &&
                    U.compareAndSwapObject(a, offset, t, null)) {
                    top = s;
                    U.storeFence();
                    return t;
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in FIFO order.
         */
        final ForkJoinTask<?> poll() {
            for (;;) {
                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && (d = b - s) < 0 &&
                    (al = a.length) > 0) {
                    int index = (al - 1) & b;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getObjectVolatile(a, offset);
                    if (b++ == base) {        // base unchanged since read
                        if (t != null) {
                            if (U.compareAndSwapObject(a, offset, t, null)) {
                                base = b;
                                return t;
                            }
                        }
                        else if (d == -1)
                            break; // now empty
                    }
                }
                else
                    break;
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> nextLocalTask() {
            return ((id & FIFO) != 0) ? poll() : pop();
        }

        /**
         * Returns next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> peek() {
            int al; ForkJoinTask<?>[] a;
            return ((a = array) != null && (al = a.length) > 0) ?
                a[(al - 1) &
                  ((id & FIFO) != 0 ? base : top - 1)] : null;
        }

        /**
         * Pops the given task only if it is at the current top.
         */
        final boolean tryUnpush(ForkJoinTask<?> task) {
            int b = base, s = top, al; ForkJoinTask<?>[] a;
            if ((a = array) != null && b != s && (al = a.length) > 0) {
                int index = (al - 1) & --s;
                long offset = ((long)index << ASHIFT) + ABASE;
                // CAS expecting task itself doubles as the top check
                if (U.compareAndSwapObject(a, offset, task, null)) {
                    top = s;
                    U.storeFence();
                    return true;
                }
            }
            return false;
        }

        /**
         * Removes and cancels all known tasks, ignoring any exceptions.
         */
        final void cancelAll() {
            for (ForkJoinTask<?> t; (t = poll()) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t);
        }

        // Specialized execution methods

        /**
         * Pops and executes up to limit consecutive tasks or until empty.
         *
         * @param limit max runs, or zero for no limit
         */
        final void localPopAndExec(int limit) {
            for (;;) {
                int b = base, s = top, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && b != s && (al = a.length) > 0) {
                    int index = (al - 1) & --s;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getAndSetObject(a, offset, null);
                    if (t != null) {
                        top = s;
                        U.storeFence();
                        t.doExec();
                        if (limit != 0 && --limit == 0)
                            break;
                    }
                    else
                        break;
                }
                else
                    break;
            }
        }

        /**
         * Polls and executes up to limit consecutive tasks or until empty.
         *
         * @param limit max polls, or zero for no limit
         */
        final void localPollAndExec(int limit) {
            for (int polls = 0;;) {
                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && (d = b - s) < 0 &&
                    (al = a.length) > 0) {
                    int index = (al - 1) & b++;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getAndSetObject(a, offset, null);
                    if (t != null) {
                        base = b;
                        t.doExec();
                        if (limit != 0 && ++polls == limit)
                            break;
                    }
                    else if (d == -1)
                        break;     // now empty
                    else
                        polls = 0; // stolen; reset
                }
                else
                    break;
            }
        }

        /**
         * If present, removes task from queue and executes it.
         */
        final void tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] wa; int s, wal;
            if (base - (s = top) < 0 && // traverse from top
                (wa = array) != null && (wal = wa.length) > 0) {
                for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
                    // int (not long) shift is safe here: index is below
                    // MAXIMUM_QUEUE_CAPACITY (1 << 26), so no overflow
                    int index = i & m;
                    long offset = (index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getObject(wa, offset);
                    if (t == null)
                        break;
                    else if (t == task) {
                        if (U.compareAndSwapObject(wa, offset, t, null)) {
                            top = ns;   // safely shift down
                            for (int j = i; j != ns; ++j) {
                                ForkJoinTask<?> f;
                                int pindex = (j + 1) & m;
                                long pOffset = (pindex << ASHIFT) + ABASE;
                                f = (ForkJoinTask<?>)U.getObject(wa, pOffset);
                                U.putObjectVolatile(wa, pOffset, null);

                                int jindex = j & m;
                                long jOffset = (jindex << ASHIFT) + ABASE;
                                U.putOrderedObject(wa, jOffset, f);
                            }
                            U.storeFence();
                            t.doExec();
                        }
                        break;
                    }
                }
            }
        }

        /**
         * Tries to steal and run tasks within the target's
         * computation until done, not found, or limit exceeded.
         *
         * @param task root of CountedCompleter computation
         * @param limit max runs, or zero for no limit
         * @return task status on exit
         */
        final int localHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false;
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        long offset = ((long)index << ASHIFT) + ABASE;
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            U.getObject(a, offset);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            // only run tasks belonging to this computation
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null) // try parent
                                        break;
                                }
                                else {
                                    if (U.compareAndSwapObject(a, offset,
                                                               t, null)) {
                                        top = s - 1;
                                        U.storeFence();
                                        t.doExec();
                                        help = true;
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }

        // Operations on shared queues

        /**
         * Tries to lock shared queue by CASing phase field.
         */
        final boolean tryLockSharedQueue() {
            return U.compareAndSwapInt(this, PHASE, 0, QLOCK);
        }

        /**
         * Shared version of tryUnpush.
         */
        final boolean trySharedUnpush(ForkJoinTask<?> task) {
            boolean popped = false;
            int s = top - 1, al; ForkJoinTask<?>[] a;
            if ((a = array) != null && (al = a.length) > 0) {
                int index = (al - 1) & s;
                long offset = ((long)index << ASHIFT) + ABASE;
                ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
                if (t == task &&
                    U.compareAndSwapInt(this, PHASE, 0, QLOCK)) {
                    // recheck under lock before actually removing
                    if (top == s + 1 && array == a &&
                        U.compareAndSwapObject(a, offset, task, null)) {
                        popped = true;
                        top = s;
                    }
                    U.putOrderedInt(this, PHASE, 0); // release lock
                }
            }
            return popped;
        }

        /**
         * Shared version of localHelpCC.
         */
        final int sharedHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false;
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        long offset = ((long)index << ASHIFT) + ABASE;
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            U.getObject(a, offset);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null)
                                        break;
                                }
                                else {
                                    if (U.compareAndSwapInt(this, PHASE,
                                                            0, QLOCK)) {
                                        // recheck under lock, as in
                                        // trySharedUnpush
                                        if (top == s && array == a &&
                                            U.compareAndSwapObject(a, offset,
                                                                   t, null)) {
                                            help = true;
                                            top = s - 1;
                                        }
                                        U.putOrderedInt(this, PHASE, 0);
                                        if (help)
                                            t.doExec();
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }

        /**
         * Returns true if owned and not known to be blocked.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            return ((wt = owner) != null &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // Unsafe mechanics. Note that some are (and must be) the same as in FJP
        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        private static final long PHASE;
        private static final int ABASE;
        private static final int ASHIFT;
        static {
            try {
                PHASE = U.objectFieldOffset
                    (WorkQueue.class.getDeclaredField("phase"));
                ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
                int scale = U.arrayIndexScale(ForkJoinTask[].class);
                if ((scale & (scale - 1)) != 0)
                    throw new Error("array index scale not a power of two");
                ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }
1149
    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     */
    private static final int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.
     * Incremented only by the synchronized nextPoolId().
     */
    private static int poolNumberSequence;
1190
1191 /**
1192 * Returns the next sequence number. We don't expect this to
1193 * ever contend, so use simple builtin sync.
1194 */
1195 private static final synchronized int nextPoolId() {
1196 return ++poolNumberSequence;
1197 }
1198
    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work.
     */
    private static final long DEFAULT_KEEPALIVE = 60000L;

    /**
     * Undershoot tolerance for idle timeouts.
     */
    private static final long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES. Overridable using the
     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
     * property. The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private static final int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private static final int SEED_INCREMENT = 0x9e3779b9;
1227
    /*
     * Bits and masks for field ctl, packed with 4 16 bit subfields:
     * RC: Number of released (unqueued) workers minus target parallelism
     * TC: Number of total workers minus target parallelism
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl. The offsets of counts
     * by the target parallelism and the positionings of fields make
     * it possible to perform the most common checks via sign tests of
     * fields: When rc is negative, there are not enough unqueued
     * workers, when tc is negative, there are not enough total
     * workers. When sp is non-zero, there are waiting workers. To
     * deal with possibly negative fields, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     *
     * Because it occupies uppermost bits, we can add one release count
     * using getAndAddLong of RC_UNIT, rather than CAS, when returning
     * from a blocked join. Other updates entail multiple subfields
     * and masking, requiring CAS.
     *
     * The limits packed in field "bounds" are also offset by the
     * parallelism level to make them comparable to the ctl rc and tc
     * fields.
     */

    // Lower and upper word masks
    private static final long SP_MASK    = 0xffffffffL;
    private static final long UC_MASK    = ~SP_MASK;

    // Release counts
    private static final int  RC_SHIFT   = 48;
    private static final long RC_UNIT    = 0x0001L << RC_SHIFT;
    private static final long RC_MASK    = 0xffffL << RC_SHIFT;

    // Total counts
    private static final int  TC_SHIFT   = 32;
    private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
    private static final long TC_MASK    = 0xffffL << TC_SHIFT;
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

    // Instance fields

    // Segregate ctl field; for now using manual padding instead of @Contended
    // @jdk.internal.vm.annotation.Contended("fjpctl")
    // @sun.misc.Contended("fjpctl")
    volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
    volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
    volatile long ctl;                   // main pool control
    volatile long pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
    volatile long pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e;

    volatile long stealCount;            // collects worker nsteals
    final long keepAlive;                // milliseconds before dropping if idle
    int indexSeed;                       // next worker index
    final int bounds;                    // min, max threads packed as shorts
    volatile int mode;                   // parallelism, runstate, queue mode
    WorkQueue[] workQueues;              // main registry
    final String workerNamePrefix;       // for worker thread string; sync lock
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;  // per-worker UEH
1290
1291 // Creating, registering and deregistering workers
1292
1293 /**
1294 * Tries to construct and start one worker. Assumes that total
1295 * count has already been incremented as a reservation. Invokes
1296 * deregisterWorker on any failure.
1297 *
1298 * @return true if successful
1299 */
1300 private boolean createWorker() {
1301 ForkJoinWorkerThreadFactory fac = factory;
1302 Throwable ex = null;
1303 ForkJoinWorkerThread wt = null;
1304 try {
1305 if (fac != null && (wt = fac.newThread(this)) != null) {
1306 wt.start();
1307 return true;
1308 }
1309 } catch (Throwable rex) {
1310 ex = rex;
1311 }
1312 deregisterWorker(wt, ex);
1313 return false;
1314 }
1315
    /**
     * Tries to add one worker, incrementing ctl counts before doing
     * so, relying on createWorker to back out on failure.
     *
     * @param c incoming ctl value, with total count negative and no
     * idle workers. On CAS failure, c is refreshed and retried if
     * this holds (otherwise, a new worker is not needed).
     */
    private void tryAddWorker(long c) {
        do {
            // bump both release (RC) and total (TC) counts
            long nc = ((RC_MASK & (c + RC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
                createWorker();
                break;
            }
            // retry only while TC is still negative and no workers are queued
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }
1334
    /**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.setDaemon(true);                             // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int tid = 0;                                    // for thread name
        int fifo = mode & FIFO;
        String prefix = workerNamePrefix;
        if (prefix != null) {
            // workerNamePrefix doubles as the registration lock
            synchronized (prefix) {
                WorkQueue[] ws = workQueues; int n;
                int s = indexSeed += SEED_INCREMENT;
                if (ws != null && (n = ws.length) > 1) {
                    int m = n - 1;
                    tid = s & m;
                    int i = m & ((s << 1) | 1); // odd-numbered indices
                    for (int probes = n >>> 1;;) { // find empty slot
                        WorkQueue q;
                        if ((q = ws[i]) == null || q.phase == QUIET)
                            break;
                        else if (--probes == 0) {
                            i = n | 1;          // resize below
                            break;
                        }
                        else
                            i = (i + 2) & m;    // next odd slot
                    }

                    int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
                    w.phase = w.id = id;        // now publishable

                    if (i < n)
                        ws[i] = w;
                    else {                      // expand array
                        int an = n << 1;
                        WorkQueue[] as = new WorkQueue[an];
                        as[i] = w;
                        int am = an - 1;
                        // j advances by 2 per iteration (++j below plus the
                        // loop header's ++j): even slots hold external queues
                        // that are repositioned by id; odd slots hold workers
                        // copied in place.
                        for (int j = 0; j < n; ++j) {
                            WorkQueue v;        // copy external queue
                            if ((v = ws[j]) != null) // position may change
                                as[v.id & am & SQMASK] = v;
                            if (++j >= n)
                                break;
                            as[j] = ws[j];      // copy worker
                        }
                        workQueues = as;
                    }
                }
            }
            wt.setName(prefix.concat(Integer.toString(tid)));
        }
        return w;
    }
1397
    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker. Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
     *
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        int phase = 0;
        if (wt != null && (w = wt.workQueue) != null) {
            Object lock = workerNamePrefix;    // same lock as registerWorker
            long ns = (long)w.nsteals & 0xffffffffL;
            int idx = w.id & SMASK;
            if (lock != null) {
                WorkQueue[] ws;                // remove index from array
                synchronized (lock) {
                    if ((ws = workQueues) != null && ws.length > idx &&
                        ws[idx] == w)
                        ws[idx] = null;
                    stealCount += ns;          // fold worker's steals into pool total
                }
            }
            phase = w.phase;
        }
        if (phase != QUIET) {                  // else pre-adjusted
            long c;                            // decrement counts
            do {} while (!U.compareAndSwapLong
                         (this, CTL, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
                                               (TC_MASK & (c - TC_UNIT)) |
                                               (SP_MASK & c))));
        }
        if (w != null)
            w.cancelAll();                     // cancel remaining tasks

        if (!tryTerminate(false, false) &&     // possibly replace worker
            w != null && w.array != null)      // avoid repeated failures
            signalWork();

        if (ex == null)                        // help clean on way out
            ForkJoinTask.helpExpungeStaleExceptions();
        else                                   // rethrow
            ForkJoinTask.rethrow(ex);
    }
1444
    /**
     * Tries to create or release a worker if too few are running.
     * Loops until a worker is released/created, no more are needed,
     * or the pool state makes signalling pointless.
     */
    final void signalWork() {
        for (;;) {
            long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
            if ((c = ctl) >= 0L)               // enough workers
                break;
            else if ((sp = (int)c) == 0) {     // no idle workers
                if ((c & ADD_WORKER) != 0L)    // too few workers
                    tryAddWorker(c);
                break;
            }
            else if ((ws = workQueues) == null)
                break;                         // unstarted/terminated
            else if (ws.length <= (i = sp & SMASK))
                break;                         // terminated
            else if ((v = ws[i]) == null)
                break;                         // terminating
            else {
                // pop v off the Treiber stack of idle workers and release it
                int np = sp & ~UNSIGNALLED;
                int vp = v.phase;
                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
                Thread vt = v.owner;
                if (sp == vp && U.compareAndSwapLong(this, CTL, c, nc)) {
                    v.phase = np;
                    if (v.source < 0)          // parked or parking
                        LockSupport.unpark(vt);
                    break;
                }
            }
        }
    }
1478
    /**
     * Tries to decrement counts (sometimes implicitly) and possibly
     * arrange for a compensating worker in preparation for blocking:
     * If not all core workers yet exist, creates one, else if any are
     * unreleased (possibly including caller) releases one, else if
     * fewer than the minimum allowed number of workers running,
     * checks to see that they are all active, and if so creates an
     * extra worker unless over maximum limit and policy is to
     * saturate. Most of these steps can fail due to interference, in
     * which case 0 is returned so caller will retry. A negative
     * return value indicates that the caller doesn't need to
     * re-adjust counts when later unblocked.
     *
     * @return 1: block then adjust, -1: block without adjust, 0 : retry
     */
    private int tryCompensate(WorkQueue w) {
        int t, n, sp;
        long c = ctl;
        WorkQueue[] ws = workQueues;
        if ((t = (short)(c >> TC_SHIFT)) >= 0) { // at or above target parallelism
            if (ws == null || (n = ws.length) <= 0 || w == null)
                return 0;                        // disabled
            else if ((sp = (int)c) != 0) {       // replace or release
                WorkQueue v = ws[sp & (n - 1)];
                int wp = w.phase;
                long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
                int np = sp & ~UNSIGNALLED;
                if (v != null) {
                    int vp = v.phase;
                    Thread vt = v.owner;
                    long nc = ((long)v.stackPred & SP_MASK) | uc;
                    if (vp == sp && U.compareAndSwapLong(this, CTL, c, nc)) {
                        v.phase = np;
                        if (v.source < 0)
                            LockSupport.unpark(vt);
                        return (wp < 0) ? -1 : 1;
                    }
                }
                return 0;
            }
            else if ((int)(c >> RC_SHIFT) -      // reduce parallelism
                     (short)(bounds & SMASK) > 0) {
                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
                return U.compareAndSwapLong(this, CTL, c, nc) ? 1 : 0;
            }
            else {                               // validate
                int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
                boolean unstable = false;
                // scan odd (worker) slots to confirm all are accounted for
                for (int i = 1; i < n; i += 2) {
                    WorkQueue q; Thread wt; Thread.State ts;
                    if ((q = ws[i]) != null) {
                        if (q.source == 0) {     // idle: state in flux
                            unstable = true;
                            break;
                        }
                        else {
                            --tc;
                            if ((wt = q.owner) != null &&
                                ((ts = wt.getState()) == Thread.State.BLOCKED ||
                                 ts == Thread.State.WAITING))
                                ++bc;            // worker is blocking
                        }
                    }
                }
                if (unstable || tc != 0 || ctl != c)
                    return 0;                    // inconsistent
                else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
                    if ((md & SATURATE) != 0)
                        return -1;
                    else if (bc < pc) {          // lagging
                        Thread.yield();          // for retry spins
                        return 0;
                    }
                    else
                        throw new RejectedExecutionException(
                            "Thread limit exceeded replacing blocked worker");
                }
            }
        }

        long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
        return U.compareAndSwapLong(this, CTL, c, nc) && createWorker() ? 1 : 0;
    }
1562
    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     * See above for explanation.
     *
     * Repeatedly scans the workQueues array for stealable tasks,
     * executing any found (then draining the worker's own queue up to
     * POLL_LIMIT); when a full scan finds nothing, pushes the worker
     * onto the ctl wait stack and parks, possibly dropping the worker
     * on keep-alive timeout when the pool appears quiescent.
     *
     * @param w this worker's queue (never null when called by a worker)
     */
    final void runWorker(WorkQueue w) {
        WorkQueue[] ws;
        w.growArray();                            // allocate queue
        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
        if (r == 0)                               // initial nonzero seed
            r = 1;
        int lastSignalId = 0;                     // avoid unneeded signals
        while ((ws = workQueues) != null) {
            boolean nonempty = false;             // scan
            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                if ((i = r & m) >= 0 && i < n &&  // always true
                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
                    (a = q.array) != null && (al = a.length) > 0) {
                    int qid = q.id;               // (never zero)
                    // CAS the task slot to null to claim it; b++ == q.base
                    // rechecks that no other thief advanced base meanwhile
                    int index = (al - 1) & b;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getObjectVolatile(a, offset);
                    if (t != null && b++ == q.base &&
                        U.compareAndSwapObject(a, offset, t, null)) {
                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
                            signalWork();         // propagate signal
                        w.source = lastSignalId = qid;
                        t.doExec();
                        if ((w.id & FIFO) != 0)   // run remaining locals
                            w.localPollAndExec(POLL_LIMIT);
                        else
                            w.localPopAndExec(POLL_LIMIT);
                        ForkJoinWorkerThread thread = w.owner;
                        ++w.nsteals;
                        w.source = 0;             // now idle
                        if (thread != null)
                            thread.afterTopLevelExec();
                    }
                    nonempty = true;
                }
                else if (nonempty)
                    break;
                else
                    ++r;                          // try another index
            }

            if (nonempty) {                       // move (xorshift)
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
            }
            else {
                int phase;
                lastSignalId = 0;                 // clear for next scan
                if ((phase = w.phase) >= 0) {     // enqueue
                    // push self onto the ctl wait stack (stackPred links it)
                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
                    long c, nc;
                    do {
                        w.stackPred = (int)(c = ctl);
                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
                    } while (!U.compareAndSwapLong(this, CTL, c, nc));
                }
                else {                            // already queued
                    int pred = w.stackPred;
                    w.source = DORMANT;           // enable signal
                    for (int steps = 0;;) {
                        int md, rc; long c;
                        if (w.phase >= 0) {       // released by a signal
                            w.source = 0;
                            break;
                        }
                        else if ((md = mode) < 0) // shutting down
                            return;
                        else if ((rc = ((md & SMASK) +  // possibly quiescent
                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
                                 (md & SHUTDOWN) != 0 &&
                                 tryTerminate(false, false))
                            return;               // help terminate
                        else if ((++steps & 1) == 0)
                            Thread.interrupted(); // clear between parks
                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
                            // timed park; on timeout, CAS self off the wait
                            // stack and exit, shrinking the pool
                            long d = keepAlive + System.currentTimeMillis();
                            LockSupport.parkUntil(this, d);
                            if (ctl == c &&
                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
                                long nc = ((UC_MASK & (c - TC_UNIT)) |
                                           (SP_MASK & pred));
                                if (U.compareAndSwapLong(this, CTL, c, nc)) {
                                    w.phase = QUIET;
                                    return;       // drop on timeout
                                }
                            }
                        }
                        else
                            LockSupport.park(this);
                    }
                }
            }
        }
    }
1662
    /**
     * Helps and/or blocks until the given task is done or timeout.
     * First tries locally helping, then scans other queues for a task
     * produced by one of w's stealers; compensating and blocking if
     * none are found (rescanning if tryCompensate fails).
     *
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero (System.nanoTime-based)
     * @return task status on exit (negative means done)
     */
    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (w != null && task != null &&
            (!(task instanceof CountedCompleter) ||
             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
            w.tryRemoveAndExec(task);             // run task if queued locally
            int src = w.source, id = w.id;
            s = task.status;
            while (s >= 0) {
                WorkQueue[] ws;
                boolean nonempty = false;
                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
                if ((ws = workQueues) != null) {  // scan for matching id
                    // only steal back from queues whose source == id,
                    // i.e. queues currently processing one of w's tasks
                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                        if ((i = (r + j) & m) >= 0 && i < n &&
                            (q = ws[i]) != null && q.source == id &&
                            (b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            if (t != null && b++ == q.base && id == q.source &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = qid;   // record who we stole from
                                t.doExec();
                                w.source = src;
                            }
                            nonempty = true;
                            break;
                        }
                    }
                }
                if ((s = task.status) < 0)
                    break;
                else if (!nonempty) {
                    long ms, ns; int block;
                    if (deadline == 0L)
                        ms = 0L;                  // untimed
                    else if ((ns = deadline - System.nanoTime()) <= 0L)
                        break;                    // timeout
                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                        ms = 1L;                  // avoid 0 for timed wait
                    if ((block = tryCompensate(w)) != 0) {
                        task.internalWait(ms);
                        // undo RC decrement made by tryCompensate when it
                        // reported a positive (compensated) block
                        U.getAndAddLong(this, CTL, (block > 0) ? RC_UNIT : 0L);
                    }
                    s = task.status;
                }
            }
        }
        return s;
    }
1730
    /**
     * Runs tasks until {@code isQuiescent()}. Rather than blocking
     * when tasks cannot be found, rescans until all others cannot
     * find tasks either.
     *
     * The {@code released} variable tracks whether this worker is
     * currently counted in the released (RC) count of ctl: -1 until
     * known, 1 when counted, 0 when it has decremented itself while
     * idle-spinning.
     *
     * @param w caller's queue
     */
    final void helpQuiescePool(WorkQueue w) {
        int prevSrc = w.source, fifo = w.id & FIFO;
        for (int source = prevSrc, released = -1;;) { // -1 until known
            WorkQueue[] ws;
            if (fifo != 0)                        // first drain local tasks
                w.localPollAndExec(0);
            else
                w.localPopAndExec(0);
            if (released == -1 && w.phase >= 0)
                released = 1;
            boolean quiet = true, empty = true;
            int r = ThreadLocalRandom.nextSecondarySeed();
            if ((ws = workQueues) != null) {
                for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                    WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                    if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
                        if ((b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            if (released == 0) {    // increment
                                released = 1;
                                U.getAndAddLong(this, CTL, RC_UNIT);
                            }
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            if (t != null && b++ == q.base &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = source = q.id;
                                t.doExec();
                                w.source = source = prevSrc;
                            }
                            quiet = empty = false;
                            break;
                        }
                        else if ((q.source & QUIET) == 0)
                            quiet = false;        // someone is still active
                    }
                }
            }
            if (quiet) {                          // all queues quiet: done
                if (released == 0)
                    U.getAndAddLong(this, CTL, RC_UNIT);
                w.source = prevSrc;
                break;
            }
            else if (empty) {                     // found nothing this pass
                if (source != QUIET)
                    w.source = source = QUIET;    // advertise own quiescence
                if (released == 1) {              // decrement
                    released = 0;
                    U.getAndAddLong(this, CTL, RC_MASK & -RC_UNIT);
                }
            }
        }
    }
1794
    /**
     * Scans for and returns a polled task, if available.
     * Used only for untracked polls.
     *
     * Iterates the workQueues array from a random origin with a
     * random (odd, hence coprime to the power-of-two length) step,
     * restarting on contention, and terminating when two consecutive
     * full passes observe the same checksum of queue bases (i.e. no
     * apparent activity).
     *
     * @param submissionsOnly if true, only scan submission queues
     *        (the even-indexed slots)
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
        WorkQueue[] ws; int n;
        rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
                      (n = ws.length) > 0) {
            int m = n - 1;
            int r = ThreadLocalRandom.nextSecondarySeed();
            int h = r >>> 16;
            int origin, step;
            if (submissionsOnly) {
                origin = (r & ~1) & m;            // even indices and steps
                step = (h & ~1) | 2;
            }
            else {
                origin = r & m;
                step = h | 1;                     // odd step visits all slots
            }
            for (int k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; int b, al; ForkJoinTask<?>[] a;
                if ((q = ws[k]) != null) {
                    checkSum += b = q.base;
                    if (b - q.top < 0 &&
                        (a = q.array) != null && (al = a.length) > 0) {
                        int index = (al - 1) & b;
                        long offset = ((long)index << ASHIFT) + ABASE;
                        ForkJoinTask<?> t = (ForkJoinTask<?>)
                            U.getObjectVolatile(a, offset);
                        if (t != null && b++ == q.base &&
                            U.compareAndSwapObject(a, offset, t, null)) {
                            q.base = b;
                            return t;
                        }
                        else
                            break;                // restart
                    }
                }
                if ((k = (k + step) & m) == origin) {  // wrapped: full pass
                    if (oldSum == (oldSum = checkSum))
                        break rescan;             // stable: nothing to poll
                    checkSum = 0;
                }
            }
        }
        return null;
    }
1845
1846 /**
1847 * Gets and removes a local or stolen task for the given worker.
1848 *
1849 * @return a task, if available
1850 */
1851 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1852 ForkJoinTask<?> t;
1853 if (w != null &&
1854 (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1855 return t;
1856 else
1857 return pollScan(false);
1858 }
1859
1860 // External operations
1861
    /**
     * Adds the given task to a submission queue at submitter's
     * current queue, creating one if null or contended.
     *
     * Retries (advancing the submitter's probe to pick a different
     * slot) when the target queue is busy. Installation of a new
     * queue is serialized on workerNamePrefix, which doubles as the
     * pool lock here.
     *
     * @param task the task. Caller must ensure non-null.
     * @throws RejectedExecutionException if the pool is shut down
     */
    final void externalPush(ForkJoinTask<?> task) {
        int r;                                    // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            int md = mode, n;
            WorkQueue[] ws = workQueues;
            if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
                throw new RejectedExecutionException();
            else {
                WorkQueue q;
                boolean push = false, grow = false;
                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
                    // no queue at this slot: create and try to install one
                    Object lock = workerNamePrefix;
                    int qid = (r | QUIET) & ~(FIFO | OWNED);
                    q = new WorkQueue(this, null);
                    q.id = qid;
                    q.source = QUIET;
                    q.phase = QLOCK;              // lock queue
                    if (lock != null) {
                        synchronized (lock) {     // lock pool to install
                            int i;
                            if ((ws = workQueues) != null &&
                                (n = ws.length) > 0 &&
                                ws[i = qid & (n - 1) & SQMASK] == null) {
                                ws[i] = q;
                                push = grow = true;
                            }
                        }
                    }
                }
                else if (q.tryLockSharedQueue()) {
                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
                    if ((a = q.array) != null && (al = a.length) > 0 &&
                        al - 1 + (d = b - s) > 0) {  // room for the task
                        a[(al - 1) & s] = task;
                        q.top = s + 1;            // relaxed writes OK here
                        q.phase = 0;              // unlock
                        if (d < 0 && q.base - s < 0)
                            break;                // no signal needed
                    }
                    else
                        grow = true;              // array full: must resize
                    push = true;
                }
                if (push) {
                    if (grow) {
                        try {
                            q.growArray();
                            int s = q.top, al; ForkJoinTask<?>[] a;
                            if ((a = q.array) != null && (al = a.length) > 0) {
                                a[(al - 1) & s] = task;
                                q.top = s + 1;
                            }
                        } finally {
                            q.phase = 0;          // always unlock
                        }
                    }
                    signalWork();
                    break;
                }
                else                              // move if busy
                    r = ThreadLocalRandom.advanceProbe(r);
            }
        }
    }
1936
1937 /**
1938 * Pushes a possibly-external submission.
1939 */
1940 private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1941 Thread t; ForkJoinWorkerThread w; WorkQueue q;
1942 if (task == null)
1943 throw new NullPointerException();
1944 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1945 (w = (ForkJoinWorkerThread)t).pool == this &&
1946 (q = w.workQueue) != null)
1947 q.push(task);
1948 else
1949 externalPush(task);
1950 return task;
1951 }
1952
1953 /**
1954 * Returns common pool queue for an external thread.
1955 */
1956 static WorkQueue commonSubmitterQueue() {
1957 ForkJoinPool p = common;
1958 int r = ThreadLocalRandom.getProbe();
1959 WorkQueue[] ws; int n;
1960 return (p != null && (ws = p.workQueues) != null &&
1961 (n = ws.length) > 0) ?
1962 ws[(n - 1) & r & SQMASK] : null;
1963 }
1964
1965 /**
1966 * Performs tryUnpush for an external submitter.
1967 */
1968 final boolean tryExternalUnpush(ForkJoinTask<?> task) {
1969 int r = ThreadLocalRandom.getProbe();
1970 WorkQueue[] ws; WorkQueue w; int n;
1971 return ((ws = workQueues) != null &&
1972 (n = ws.length) > 0 &&
1973 (w = ws[(n - 1) & r & SQMASK]) != null &&
1974 w.trySharedUnpush(task));
1975 }
1976
1977 /**
1978 * Performs helpComplete for an external submitter.
1979 */
1980 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
1981 int r = ThreadLocalRandom.getProbe();
1982 WorkQueue[] ws; WorkQueue w; int n;
1983 return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1984 (w = ws[(n - 1) & r & SQMASK]) != null) ?
1985 w.sharedHelpCC(task, maxTasks) : 0;
1986 }
1987
1988 /**
1989 * Tries to steal and run tasks within the target's computation.
1990 * The maxTasks argument supports external usages; internal calls
1991 * use zero, allowing unbounded steps (external calls trap
1992 * non-positive values).
1993 *
1994 * @param w caller
1995 * @param maxTasks if non-zero, the maximum number of other tasks to run
1996 * @return task status on exit
1997 */
1998 final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1999 int maxTasks) {
2000 return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
2001 }
2002
2003 /**
2004 * Returns a cheap heuristic guide for task partitioning when
2005 * programmers, frameworks, tools, or languages have little or no
2006 * idea about task granularity. In essence, by offering this
2007 * method, we ask users only about tradeoffs in overhead vs
2008 * expected throughput and its variance, rather than how finely to
2009 * partition tasks.
2010 *
2011 * In a steady state strict (tree-structured) computation, each
2012 * thread makes available for stealing enough tasks for other
2013 * threads to remain active. Inductively, if all threads play by
2014 * the same rules, each thread should make available only a
2015 * constant number of tasks.
2016 *
2017 * The minimum useful constant is just 1. But using a value of 1
2018 * would require immediate replenishment upon each steal to
2019 * maintain enough tasks, which is infeasible. Further,
2020 * partitionings/granularities of offered tasks should minimize
2021 * steal rates, which in general means that threads nearer the top
2022 * of computation tree should generate more than those nearer the
2023 * bottom. In perfect steady state, each thread is at
2024 * approximately the same level of computation tree. However,
2025 * producing extra tasks amortizes the uncertainty of progress and
2026 * diffusion assumptions.
2027 *
2028 * So, users will want to use values larger (but not much larger)
2029 * than 1 to both smooth over transient shortages and hedge
2030 * against uneven progress; as traded off against the cost of
2031 * extra task overhead. We leave the user to pick a threshold
2032 * value to compare with the results of this call to guide
2033 * decisions, but recommend values such as 3.
2034 *
2035 * When all threads are active, it is on average OK to estimate
2036 * surplus strictly locally. In steady-state, if one thread is
2037 * maintaining say 2 surplus tasks, then so are others. So we can
2038 * just use estimated queue length. However, this strategy alone
2039 * leads to serious mis-estimates in some non-steady-state
2040 * conditions (ramp-up, ramp-down, other stalls). We can detect
2041 * many of these by further considering the number of "idle"
2042 * threads, that are known to have zero queued tasks, so
2043 * compensate by a factor of (#idle/#active) threads.
2044 */
    static int getSurplusQueuedTaskCount() {
        Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
        // Only meaningful when called from a worker of some pool;
        // external callers get 0.
        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
            (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (q = wt.workQueue) != null) {
            int p = pool.mode & SMASK;                // parallelism level
            int a = p + (int)(pool.ctl >> RC_SHIFT);  // active (released) count
            int n = q.top - q.base;                   // local queue size
            // NOTE: each (p >>>= 1) halves p in place, so the chain compares
            // a against p/2, p/4, p/8, p/16 in turn, scaling the surplus
            // target by the idle/active ratio described in the javadoc above.
            return n - (a > (p >>>= 1) ? 0 :
                        a > (p >>>= 1) ? 1 :
                        a > (p >>>= 1) ? 2 :
                        a > (p >>>= 1) ? 4 :
                        8);
        }
        return 0;
    }
2061
2062 // Termination
2063
2064 /**
2065 * Possibly initiates and/or completes termination.
2066 *
2067 * @param now if true, unconditionally terminate, else only
2068 * if no work and no active workers
2069 * @param enable if true, terminate when next possible
2070 * @return true if terminating or terminated
2071 */
    private boolean tryTerminate(boolean now, boolean enable) {
        int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED

        // Phase 1: set SHUTDOWN (refused for the common pool or when
        // enable is false).
        while (((md = mode) & SHUTDOWN) == 0) {
            if (!enable || this == common)        // cannot shutdown
                return false;
            else
                U.compareAndSwapInt(this, MODE, md, md | SHUTDOWN);
        }

        // Phase 2: set STOP, but unless 'now', only after a stable
        // observation (repeated identical checksums) of quiescence.
        while (((md = mode) & STOP) == 0) {       // try to initiate termination
            if (!now) {                           // check if quiescent & empty
                for (long oldSum = 0L;;) {        // repeat until stable
                    boolean running = false;
                    long checkSum = ctl;
                    WorkQueue[] ws = workQueues;
                    if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
                        running = true;           // active workers remain
                    else if (ws != null) {
                        WorkQueue w; int b;
                        for (int i = 0; i < ws.length; ++i) {
                            if ((w = ws[i]) != null) {
                                checkSum += (b = w.base) + w.id;
                                if (b != w.top ||
                                    ((i & 1) == 1 && w.source >= 0)) {
                                    running = true;
                                    break;        // nonempty or busy queue
                                }
                            }
                        }
                    }
                    if (((md = mode) & STOP) != 0)
                        break;                    // already triggered
                    else if (running)
                        return false;
                    else if (workQueues == ws && oldSum == (oldSum = checkSum))
                        break;                    // two stable passes
                }
            }
            if ((md & STOP) == 0)
                U.compareAndSwapInt(this, MODE, md, md | STOP);
        }

        // Phase 3: cancel queued tasks, interrupt workers, and once the
        // total worker count drains to zero, set TERMINATED and wake
        // any awaitTermination waiters.
        while (((md = mode) & TERMINATED) == 0) { // help terminate others
            for (long oldSum = 0L;;) {            // repeat until stable
                WorkQueue[] ws; WorkQueue w;
                long checkSum = ctl;
                if ((ws = workQueues) != null) {
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) != null) {
                            ForkJoinWorkerThread wt = w.owner;
                            w.cancelAll();        // clear queues
                            if (wt != null) {
                                try {             // unblock join or park
                                    wt.interrupt();
                                } catch (Throwable ignore) {
                                }
                            }
                            checkSum += w.base + w.id;
                        }
                    }
                }
                if (((md = mode) & TERMINATED) != 0 ||
                    (workQueues == ws && oldSum == (oldSum = checkSum)))
                    break;
            }
            if ((md & TERMINATED) != 0)
                break;
            else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
                break;                            // workers still exiting
            else if (U.compareAndSwapInt(this, MODE, md, md | TERMINATED)) {
                synchronized (this) {
                    notifyAll();                  // for awaitTermination
                }
                break;
            }
        }
        return true;
    }
2151
2152 // Exported methods
2153
2154 // Constructors
2155
2156 /**
2157 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2158 * java.lang.Runtime#availableProcessors}, using defaults for all
2159 * other parameters.
2160 *
2161 * @throws SecurityException if a security manager exists and
2162 * the caller is not permitted to modify threads
2163 * because it does not hold {@link
2164 * java.lang.RuntimePermission}{@code ("modifyThread")}
2165 */
    public ForkJoinPool() {
        // Default parallelism is availableProcessors, capped at MAX_CAP;
        // all other parameters take their documented defaults.
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2171
2172 /**
2173 * Creates a {@code ForkJoinPool} with the indicated parallelism
2174 * level, using defaults for all other parameters.
2175 *
2176 * @param parallelism the parallelism level
2177 * @throws IllegalArgumentException if parallelism less than or
2178 * equal to zero, or greater than implementation limit
2179 * @throws SecurityException if a security manager exists and
2180 * the caller is not permitted to modify threads
2181 * because it does not hold {@link
2182 * java.lang.RuntimePermission}{@code ("modifyThread")}
2183 */
    public ForkJoinPool(int parallelism) {
        // Delegate with documented defaults for all other parameters.
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2188
2189 /**
2190 * Creates a {@code ForkJoinPool} with the given parameters (using
2191 * defaults for others).
2192 *
2193 * @param parallelism the parallelism level. For default value,
2194 * use {@link java.lang.Runtime#availableProcessors}.
2195 * @param factory the factory for creating new threads. For default value,
2196 * use {@link #defaultForkJoinWorkerThreadFactory}.
2197 * @param handler the handler for internal worker threads that
2198 * terminate due to unrecoverable errors encountered while executing
2199 * tasks. For default value, use {@code null}.
2200 * @param asyncMode if true,
2201 * establishes local first-in-first-out scheduling mode for forked
2202 * tasks that are never joined. This mode may be more appropriate
2203 * than default locally stack-based mode in applications in which
2204 * worker threads only process event-style asynchronous tasks.
2205 * For default value, use {@code false}.
2206 * @throws IllegalArgumentException if parallelism less than or
2207 * equal to zero, or greater than implementation limit
2208 * @throws NullPointerException if the factory is null
2209 * @throws SecurityException if a security manager exists and
2210 * the caller is not permitted to modify threads
2211 * because it does not hold {@link
2212 * java.lang.RuntimePermission}{@code ("modifyThread")}
2213 */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        // Delegate with documented defaults for the extended parameters.
        this(parallelism, factory, handler, asyncMode,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2221
2222 /**
2223 * Creates a {@code ForkJoinPool} with the given parameters.
2224 *
2225 * @param parallelism the parallelism level. For default value,
2226 * use {@link java.lang.Runtime#availableProcessors}.
2227 *
2228 * @param factory the factory for creating new threads. For
2229 * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2230 *
2231 * @param handler the handler for internal worker threads that
2232 * terminate due to unrecoverable errors encountered while
2233 * executing tasks. For default value, use {@code null}.
2234 *
2235 * @param asyncMode if true, establishes local first-in-first-out
2236 * scheduling mode for forked tasks that are never joined. This
2237 * mode may be more appropriate than default locally stack-based
2238 * mode in applications in which worker threads only process
2239 * event-style asynchronous tasks. For default value, use {@code
2240 * false}.
2241 *
2242 * @param corePoolSize the number of threads to keep in the pool
2243 * (unless timed out after an elapsed keep-alive). Normally (and
2244 * by default) this is the same value as the parallelism level,
2245 * but may be set to a larger value to reduce dynamic overhead if
2246 * tasks regularly block. Using a smaller value (for example
2247 * {@code 0}) has the same effect as the default.
2248 *
2249 * @param maximumPoolSize the maximum number of threads allowed.
2250 * When the maximum is reached, attempts to replace blocked
2251 * threads fail. (However, because creation and termination of
2252 * different threads may overlap, and may be managed by the given
2253 * thread factory, this value may be transiently exceeded.) The
2254 * default for the common pool is {@code 256} plus the parallelism
2255 * level. Using a value (for example {@code Integer.MAX_VALUE})
2256 * larger than the implementation's total thread limit has the
2257 * same effect as using this limit.
2258 *
2259 * @param minimumRunnable the minimum allowed number of core
2260 * threads not blocked by a join or {@link ManagedBlocker}. To
2261 * ensure progress, when too few unblocked threads exist and
2262 * unexecuted tasks may exist, new threads are constructed, up to
2263 * the given maximumPoolSize. For the default value, use {@code
2264 * 1}, that ensures liveness. A larger value might improve
2265 * throughput in the presence of blocked activities, but might
2266 * not, due to increased overhead. A value of zero may be
2267 * acceptable when submitted tasks cannot have dependencies
2268 * requiring additional threads.
2269 *
2270 * @param rejectOnSaturation if true, attempts to create more than
2271 * the maximum total allowed threads throw {@link
2272 * RejectedExecutionException}. Otherwise, the pool continues to
2273 * operate, but with fewer than the target number of runnable
2274 * threads, so might not ensure progress. For default value, use
2275 * {@code true}.
2276 *
2277 * @param keepAliveTime the elapsed time since last use before
2278 * a thread is terminated (and then later replaced if needed).
2279 * For the default value, use {@code 60, TimeUnit.SECONDS}.
2280 *
2281 * @param unit the time unit for the {@code keepAliveTime} argument
2282 *
2283 * @throws IllegalArgumentException if parallelism is less than or
2284 * equal to zero, or is greater than implementation limit,
2285 * or if maximumPoolSize is less than parallelism,
     * or if the keepAliveTime is less than or equal to zero.
2287 * @throws NullPointerException if the factory is null
2288 * @throws SecurityException if a security manager exists and
2289 * the caller is not permitted to modify threads
2290 * because it does not hold {@link
2291 * java.lang.RuntimePermission}{@code ("modifyThread")}
2292 * @since 9
2293 */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode,
                        int corePoolSize,
                        int maximumPoolSize,
                        int minimumRunnable,
                        boolean rejectOnSaturation,
                        long keepAliveTime,
                        TimeUnit unit) {
        // check, encode, pack parameters
        if (parallelism <= 0 || parallelism > MAX_CAP ||
            maximumPoolSize < parallelism || keepAliveTime <= 0L)
            throw new IllegalArgumentException();
        if (factory == null)
            throw new NullPointerException();
        // floor keep-alive at TIMEOUT_SLOP milliseconds
        long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);

        String prefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
        // clamp core size into [parallelism, MAX_CAP]
        int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
        // initial ctl: negative total (TC) and release (RC) counts,
        // packed into their bit fields
        long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
        // mode word: parallelism plus FIFO/SATURATE flag bits
        int m = (parallelism |
                 (asyncMode ? FIFO : 0) |
                 (rejectOnSaturation ? 0 : SATURATE));
        int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
        int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
        // bounds packs min-runnable delta (low half) and max spares (high)
        int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
        int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1; // power of two, including space for submission queues

        this.workQueues = new WorkQueue[n];
        this.workerNamePrefix = prefix;
        this.factory = factory;
        this.ueh = handler;
        this.keepAlive = ms;
        this.bounds = b;
        this.mode = m;
        this.ctl = c;
        checkPermission();
    }
2336
    /**
     * Constructor for common pool using parameters possibly
     * overridden by system properties.  Property parsing failures are
     * deliberately ignored, falling back to defaults.
     */
    private ForkJoinPool(byte forCommonPoolOnly) {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory fac = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }

        if (fac == null) {
            if (System.getSecurityManager() == null)
                fac = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                fac = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

        // same field encodings as the public 10-arg constructor, with
        // COMMON_MAX_SPARES as the spare-thread bound
        long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
        int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
        int m = (parallelism < 1) ? 1 : parallelism;
        int n = (parallelism > 1) ? parallelism - 1 : 1;
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1;  // power of two, doubled for submission queues

        this.workQueues = new WorkQueue[n];
        this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
        this.factory = fac;
        this.ueh = handler;
        this.keepAlive = DEFAULT_KEEPALIVE;
        this.bounds = b;
        this.mode = m;
        this.ctl = c;
    }
2392
2393 /**
2394 * Returns the common pool instance. This pool is statically
2395 * constructed; its run state is unaffected by attempts to {@link
2396 * #shutdown} or {@link #shutdownNow}. However this pool and any
2397 * ongoing processing are automatically terminated upon program
2398 * {@link System#exit}. Any program that relies on asynchronous
2399 * task processing to complete before program termination should
2400 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2401 * before exit.
2402 *
2403 * @return the common pool instance
2404 * @since 1.8
2405 */
    public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        // 'common' is initialized in a static initializer (not visible
        // in this chunk), so it is non-null after class loading.
        return common;
    }
2410
2411 // Execution methods
2412
2413 /**
2414 * Performs the given task, returning its result upon completion.
2415 * If the computation encounters an unchecked Exception or Error,
2416 * it is rethrown as the outcome of this invocation. Rethrown
2417 * exceptions behave in the same way as regular exceptions, but,
2418 * when possible, contain stack traces (as displayed for example
2419 * using {@code ex.printStackTrace()}) of both the current thread
2420 * as well as the thread actually encountering the exception;
2421 * minimally only the latter.
2422 *
2423 * @param task the task
2424 * @param <T> the type of the task's result
2425 * @return the task's result
2426 * @throws NullPointerException if the task is null
2427 * @throws RejectedExecutionException if the task cannot be
2428 * scheduled for execution
2429 */
2430 public <T> T invoke(ForkJoinTask<T> task) {
2431 if (task == null)
2432 throw new NullPointerException();
2433 externalSubmit(task);
2434 return task.join();
2435 }
2436
2437 /**
2438 * Arranges for (asynchronous) execution of the given task.
2439 *
2440 * @param task the task
2441 * @throws NullPointerException if the task is null
2442 * @throws RejectedExecutionException if the task cannot be
2443 * scheduled for execution
2444 */
    public void execute(ForkJoinTask<?> task) {
        // externalSubmit performs the null check and routes internal
        // vs external submissions.
        externalSubmit(task);
    }
2448
2449 // AbstractExecutorService methods
2450
2451 /**
2452 * @throws NullPointerException if the task is null
2453 * @throws RejectedExecutionException if the task cannot be
2454 * scheduled for execution
2455 */
2456 public void execute(Runnable task) {
2457 if (task == null)
2458 throw new NullPointerException();
2459 ForkJoinTask<?> job;
2460 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2461 job = (ForkJoinTask<?>) task;
2462 else
2463 job = new ForkJoinTask.RunnableExecuteAction(task);
2464 externalSubmit(job);
2465 }
2466
2467 /**
2468 * Submits a ForkJoinTask for execution.
2469 *
2470 * @param task the task to submit
2471 * @param <T> the type of the task's result
2472 * @return the task
2473 * @throws NullPointerException if the task is null
2474 * @throws RejectedExecutionException if the task cannot be
2475 * scheduled for execution
2476 */
2477 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2478 return externalSubmit(task);
2479 }
2480
2481 /**
2482 * @throws NullPointerException if the task is null
2483 * @throws RejectedExecutionException if the task cannot be
2484 * scheduled for execution
2485 */
2486 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2487 return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
2488 }
2489
2490 /**
2491 * @throws NullPointerException if the task is null
2492 * @throws RejectedExecutionException if the task cannot be
2493 * scheduled for execution
2494 */
2495 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2496 return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
2497 }
2498
2499 /**
2500 * @throws NullPointerException if the task is null
2501 * @throws RejectedExecutionException if the task cannot be
2502 * scheduled for execution
2503 */
2504 public ForkJoinTask<?> submit(Runnable task) {
2505 if (task == null)
2506 throw new NullPointerException();
2507 ForkJoinTask<?> job;
2508 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2509 job = (ForkJoinTask<?>) task;
2510 else
2511 job = new ForkJoinTask.AdaptedRunnableAction(task);
2512 return externalSubmit(job);
2513 }
2514
2515 /**
2516 * @throws NullPointerException {@inheritDoc}
2517 * @throws RejectedExecutionException {@inheritDoc}
2518 */
2519 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2520 // In previous versions of this class, this method constructed
2521 // a task to run ForkJoinTask.invokeAll, but now external
2522 // invocation of multiple tasks is at least as efficient.
2523 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2524
2525 try {
2526 for (Callable<T> t : tasks) {
2527 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2528 futures.add(f);
2529 externalSubmit(f);
2530 }
2531 for (int i = 0, size = futures.size(); i < size; i++)
2532 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2533 return futures;
2534 } catch (Throwable t) {
2535 for (int i = 0, size = futures.size(); i < size; i++)
2536 futures.get(i).cancel(false);
2537 throw t;
2538 }
2539 }
2540
2541 /**
2542 * Returns the factory used for constructing new workers.
2543 *
2544 * @return the factory used for constructing new workers
2545 */
2546 public ForkJoinWorkerThreadFactory getFactory() {
2547 return factory;
2548 }
2549
2550 /**
2551 * Returns the handler for internal worker threads that terminate
2552 * due to unrecoverable errors encountered while executing tasks.
2553 *
2554 * @return the handler, or {@code null} if none
2555 */
2556 public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2557 return ueh;
2558 }
2559
2560 /**
2561 * Returns the targeted parallelism level of this pool.
2562 *
2563 * @return the targeted parallelism level of this pool
2564 */
2565 public int getParallelism() {
2566 return mode & SMASK;
2567 }
2568
2569 /**
2570 * Returns the targeted parallelism level of the common pool.
2571 *
2572 * @return the targeted parallelism level of the common pool
2573 * @since 1.8
2574 */
2575 public static int getCommonPoolParallelism() {
2576 return COMMON_PARALLELISM;
2577 }
2578
2579 /**
2580 * Returns the number of worker threads that have started but not
2581 * yet terminated. The result returned by this method may differ
2582 * from {@link #getParallelism} when threads are created to
2583 * maintain parallelism when others are cooperatively blocked.
2584 *
2585 * @return the number of worker threads
2586 */
2587 public int getPoolSize() {
2588 return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
2589 }
2590
2591 /**
2592 * Returns {@code true} if this pool uses local first-in-first-out
2593 * scheduling mode for forked tasks that are never joined.
2594 *
2595 * @return {@code true} if this pool uses async mode
2596 */
2597 public boolean getAsyncMode() {
2598 return (mode & FIFO) != 0;
2599 }
2600
2601 /**
2602 * Returns an estimate of the number of worker threads that are
2603 * not blocked waiting to join tasks or for other managed
2604 * synchronization. This method may overestimate the
2605 * number of running threads.
2606 *
2607 * @return the number of worker threads
2608 */
2609 public int getRunningThreadCount() {
2610 int rc = 0;
2611 WorkQueue[] ws; WorkQueue w;
2612 if ((ws = workQueues) != null) {
2613 for (int i = 1; i < ws.length; i += 2) {
2614 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2615 ++rc;
2616 }
2617 }
2618 return rc;
2619 }
2620
2621 /**
2622 * Returns an estimate of the number of threads that are currently
2623 * stealing or executing tasks. This method may overestimate the
2624 * number of active threads.
2625 *
2626 * @return the number of active threads
2627 */
2628 public int getActiveThreadCount() {
2629 int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2630 return (r <= 0) ? 0 : r; // suppress momentarily negative values
2631 }
2632
    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        for (;;) {                          // retry until a consistent snapshot is seen
            long c = ctl;                   // snapshot ctl; re-checked before returning true
            int md = mode, pc = md & SMASK;
            int tc = pc + (short)(c >> TC_SHIFT);  // estimated total worker count
            int rc = pc + (int)(c >> RC_SHIFT);    // estimated non-idle worker count
            if ((md & (STOP | TERMINATED)) != 0)   // a (terminating) pool counts as quiescent
                return true;
            else if (rc > 0)                // some worker apparently still running
                return false;
            else {
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) != null) {
                    for (int i = 1; i < ws.length; i += 2) {  // odd slots: worker queues
                        if ((v = ws[i]) != null) {
                            if ((v.source & QUIET) == 0)  // worker not marked quiet
                                return false;
                            --tc;           // account for each registered worker
                        }
                    }
                }
                // Quiescent only if every worker was seen quiet and ctl
                // did not change while scanning (otherwise retry).
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }
2670
2671 /**
2672 * Returns an estimate of the total number of tasks stolen from
2673 * one thread's work queue by another. The reported value
2674 * underestimates the actual total number of steals when the pool
2675 * is not quiescent. This value may be useful for monitoring and
2676 * tuning fork/join programs: in general, steal counts should be
2677 * high enough to keep threads busy, but low enough to avoid
2678 * overhead and contention across threads.
2679 *
2680 * @return the number of steals
2681 */
2682 public long getStealCount() {
2683 long count = stealCount;
2684 WorkQueue[] ws; WorkQueue w;
2685 if ((ws = workQueues) != null) {
2686 for (int i = 1; i < ws.length; i += 2) {
2687 if ((w = ws[i]) != null)
2688 count += (long)w.nsteals & 0xffffffffL;
2689 }
2690 }
2691 return count;
2692 }
2693
2694 /**
2695 * Returns an estimate of the total number of tasks currently held
2696 * in queues by worker threads (but not including tasks submitted
2697 * to the pool that have not begun executing). This value is only
2698 * an approximation, obtained by iterating across all threads in
2699 * the pool. This method may be useful for tuning task
2700 * granularities.
2701 *
2702 * @return the number of queued tasks
2703 */
2704 public long getQueuedTaskCount() {
2705 long count = 0;
2706 WorkQueue[] ws; WorkQueue w;
2707 if ((ws = workQueues) != null) {
2708 for (int i = 1; i < ws.length; i += 2) {
2709 if ((w = ws[i]) != null)
2710 count += w.queueSize();
2711 }
2712 }
2713 return count;
2714 }
2715
2716 /**
2717 * Returns an estimate of the number of tasks submitted to this
2718 * pool that have not yet begun executing. This method may take
2719 * time proportional to the number of submissions.
2720 *
2721 * @return the number of queued submissions
2722 */
2723 public int getQueuedSubmissionCount() {
2724 int count = 0;
2725 WorkQueue[] ws; WorkQueue w;
2726 if ((ws = workQueues) != null) {
2727 for (int i = 0; i < ws.length; i += 2) {
2728 if ((w = ws[i]) != null)
2729 count += w.queueSize();
2730 }
2731 }
2732 return count;
2733 }
2734
2735 /**
2736 * Returns {@code true} if there are any tasks submitted to this
2737 * pool that have not yet begun executing.
2738 *
2739 * @return {@code true} if there are any queued submissions
2740 */
2741 public boolean hasQueuedSubmissions() {
2742 WorkQueue[] ws; WorkQueue w;
2743 if ((ws = workQueues) != null) {
2744 for (int i = 0; i < ws.length; i += 2) {
2745 if ((w = ws[i]) != null && !w.isEmpty())
2746 return true;
2747 }
2748 }
2749 return false;
2750 }
2751
2752 /**
2753 * Removes and returns the next unexecuted submission if one is
2754 * available. This method may be useful in extensions to this
2755 * class that re-assign work in systems with multiple pools.
2756 *
2757 * @return the next submission, or {@code null} if none
2758 */
2759 protected ForkJoinTask<?> pollSubmission() {
2760 return pollScan(true);
2761 }
2762
2763 /**
2764 * Removes all available unexecuted submitted and forked tasks
2765 * from scheduling queues and adds them to the given collection,
2766 * without altering their execution status. These may include
2767 * artificially generated or wrapped tasks. This method is
2768 * designed to be invoked only when the pool is known to be
2769 * quiescent. Invocations at other times may not remove all
2770 * tasks. A failure encountered while attempting to add elements
2771 * to collection {@code c} may result in elements being in
2772 * neither, either or both collections when the associated
2773 * exception is thrown. The behavior of this operation is
2774 * undefined if the specified collection is modified while the
2775 * operation is in progress.
2776 *
2777 * @param c the collection to transfer elements into
2778 * @return the number of elements transferred
2779 */
2780 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2781 int count = 0;
2782 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2783 if ((ws = workQueues) != null) {
2784 for (int i = 0; i < ws.length; ++i) {
2785 if ((w = ws[i]) != null) {
2786 while ((t = w.poll()) != null) {
2787 c.add(t);
2788 ++count;
2789 }
2790 }
2791 }
2792 }
2793 return count;
2794 }
2795
    /**
     * Returns a string identifying this pool, as well as its state,
     * including indications of run state, parallelism level, and
     * worker and task counts.
     *
     * @return a string identifying this pool, as well as its state
     */
    public String toString() {
        // Use a single pass through workQueues to collect counts
        long qt = 0L, qs = 0L; int rc = 0;  // worker-queued tasks, submissions, running workers
        long st = stealCount;               // steals recorded by already-dead workers
        WorkQueue[] ws; WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 0; i < ws.length; ++i) {
                if ((w = ws[i]) != null) {
                    int size = w.queueSize();
                    if ((i & 1) == 0)
                        qs += size;         // even slots: external submission queues
                    else {
                        qt += size;         // odd slots: worker-owned queues
                        st += (long)w.nsteals & 0xffffffffL;  // unsigned 32-bit tally
                        if (w.isApparentlyUnblocked())
                            ++rc;
                    }
                }
            }
        }

        int md = mode;
        int pc = (md & SMASK);                  // configured parallelism
        long c = ctl;
        int tc = pc + (short)(c >>> TC_SHIFT);  // total worker estimate
        int ac = pc + (int)(c >> RC_SHIFT);     // active worker estimate
        if (ac < 0) // ignore transient negative
            ac = 0;
        String level = ((md & TERMINATED) != 0 ? "Terminated" :
                        (md & STOP) != 0 ? "Terminating" :
                        (md & SHUTDOWN) != 0 ? "Shutting down" :
                        "Running");
        return super.toString() +
            "[" + level +
            ", parallelism = " + pc +
            ", size = " + tc +
            ", active = " + ac +
            ", running = " + rc +
            ", steals = " + st +
            ", tasks = " + qt +
            ", submissions = " + qs +
            "]";
    }
2846
2847 /**
2848 * Possibly initiates an orderly shutdown in which previously
2849 * submitted tasks are executed, but no new tasks will be
2850 * accepted. Invocation has no effect on execution state if this
2851 * is the {@link #commonPool()}, and no additional effect if
2852 * already shut down. Tasks that are in the process of being
2853 * submitted concurrently during the course of this method may or
2854 * may not be rejected.
2855 *
2856 * @throws SecurityException if a security manager exists and
2857 * the caller is not permitted to modify threads
2858 * because it does not hold {@link
2859 * java.lang.RuntimePermission}{@code ("modifyThread")}
2860 */
2861 public void shutdown() {
2862 checkPermission();
2863 tryTerminate(false, true);
2864 }
2865
2866 /**
2867 * Possibly attempts to cancel and/or stop all tasks, and reject
2868 * all subsequently submitted tasks. Invocation has no effect on
2869 * execution state if this is the {@link #commonPool()}, and no
2870 * additional effect if already shut down. Otherwise, tasks that
2871 * are in the process of being submitted or executed concurrently
2872 * during the course of this method may or may not be
2873 * rejected. This method cancels both existing and unexecuted
2874 * tasks, in order to permit termination in the presence of task
2875 * dependencies. So the method always returns an empty list
2876 * (unlike the case for some other Executors).
2877 *
2878 * @return an empty list
2879 * @throws SecurityException if a security manager exists and
2880 * the caller is not permitted to modify threads
2881 * because it does not hold {@link
2882 * java.lang.RuntimePermission}{@code ("modifyThread")}
2883 */
2884 public List<Runnable> shutdownNow() {
2885 checkPermission();
2886 tryTerminate(true, true);
2887 return Collections.emptyList();
2888 }
2889
2890 /**
2891 * Returns {@code true} if all tasks have completed following shut down.
2892 *
2893 * @return {@code true} if all tasks have completed following shut down
2894 */
2895 public boolean isTerminated() {
2896 return (mode & TERMINATED) != 0;
2897 }
2898
2899 /**
2900 * Returns {@code true} if the process of termination has
2901 * commenced but not yet completed. This method may be useful for
2902 * debugging. A return of {@code true} reported a sufficient
2903 * period after shutdown may indicate that submitted tasks have
2904 * ignored or suppressed interruption, or are waiting for I/O,
2905 * causing this executor not to properly terminate. (See the
2906 * advisory notes for class {@link ForkJoinTask} stating that
2907 * tasks should not normally entail blocking operations. But if
2908 * they do, they must abort them on interrupt.)
2909 *
2910 * @return {@code true} if terminating but not yet terminated
2911 */
2912 public boolean isTerminating() {
2913 int md = mode;
2914 return (md & STOP) != 0 && (md & TERMINATED) == 0;
2915 }
2916
2917 /**
2918 * Returns {@code true} if this pool has been shut down.
2919 *
2920 * @return {@code true} if this pool has been shut down
2921 */
2922 public boolean isShutdown() {
2923 return (mode & SHUTDOWN) != 0;
2924 }
2925
    /**
     * Blocks until all tasks have completed execution after a
     * shutdown request, or the timeout occurs, or the current thread
     * is interrupted, whichever happens first. Because the {@link
     * #commonPool()} never terminates until program shutdown, when
     * applied to the common pool, this method is equivalent to {@link
     * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        if (Thread.interrupted())        // honor pending interrupt before any waiting
            throw new InterruptedException();
        if (this == common) {            // common pool never terminates: help quiesce instead
            awaitQuiescence(timeout, unit);
            return false;
        }
        long nanos = unit.toNanos(timeout);
        if (isTerminated())
            return true;
        if (nanos <= 0L)                 // non-positive timeout: poll once, don't block
            return false;
        long deadline = System.nanoTime() + nanos;
        // Timed wait on this pool's monitor; NOTE(review): presumably the
        // termination path notifies on this monitor — confirm in tryTerminate.
        synchronized (this) {
            for (;;) {
                if (isTerminated())
                    return true;
                if (nanos <= 0L)         // deadline passed while waiting
                    return false;
                long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
                wait(millis > 0L ? millis : 1L);  // never wait(0), which is untimed
                nanos = deadline - System.nanoTime();
            }
        }
    }
2966
    /**
     * If called by a ForkJoinTask operating in this pool, equivalent
     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
     * waits and/or attempts to assist performing tasks until this
     * pool {@link #isQuiescent} or the indicated timeout elapses.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if quiescent; {@code false} if the
     * timeout elapsed.
     */
    public boolean awaitQuiescence(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        ForkJoinWorkerThread wt;
        Thread thread = Thread.currentThread();
        if ((thread instanceof ForkJoinWorkerThread) &&
            (wt = (ForkJoinWorkerThread)thread).pool == this) {
            // Caller is one of this pool's own workers: help run tasks
            // until quiescence (timeout is not applied on this path).
            helpQuiescePool(wt.workQueue);
            return true;
        }
        else {
            // External caller (or worker of another pool): repeatedly
            // execute polled tasks; yield when none found, since an
            // external thread must not block inside the pool.
            for (long startTime = System.nanoTime();;) {
                ForkJoinTask<?> t;
                if ((t = pollScan(false)) != null)
                    t.doExec();
                else if (isQuiescent())
                    return true;
                else if ((System.nanoTime() - startTime) > nanos)
                    return false;       // timeout elapsed without quiescence
                else
                    Thread.yield(); // cannot block
            }
        }
    }
3001
3002 /**
3003 * Waits and/or attempts to assist performing tasks indefinitely
3004 * until the {@link #commonPool()} {@link #isQuiescent}.
3005 */
3006 static void quiesceCommonPool() {
3007 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3008 }
3009
3010 /**
3011 * Interface for extending managed parallelism for tasks running
3012 * in {@link ForkJoinPool}s.
3013 *
3014 * <p>A {@code ManagedBlocker} provides two methods. Method
3015 * {@link #isReleasable} must return {@code true} if blocking is
3016 * not necessary. Method {@link #block} blocks the current thread
3017 * if necessary (perhaps internally invoking {@code isReleasable}
3018 * before actually blocking). These actions are performed by any
3019 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
3020 * The unusual methods in this API accommodate synchronizers that
3021 * may, but don't usually, block for long periods. Similarly, they
3022 * allow more efficient internal handling of cases in which
3023 * additional workers may be, but usually are not, needed to
3024 * ensure sufficient parallelism. Toward this end,
3025 * implementations of method {@code isReleasable} must be amenable
3026 * to repeated invocation.
3027 *
3028 * <p>For example, here is a ManagedBlocker based on a
3029 * ReentrantLock:
3030 * <pre> {@code
3031 * class ManagedLocker implements ManagedBlocker {
3032 * final ReentrantLock lock;
3033 * boolean hasLock = false;
3034 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3035 * public boolean block() {
3036 * if (!hasLock)
3037 * lock.lock();
3038 * return true;
3039 * }
3040 * public boolean isReleasable() {
3041 * return hasLock || (hasLock = lock.tryLock());
3042 * }
3043 * }}</pre>
3044 *
3045 * <p>Here is a class that possibly blocks waiting for an
3046 * item on a given queue:
3047 * <pre> {@code
3048 * class QueueTaker<E> implements ManagedBlocker {
3049 * final BlockingQueue<E> queue;
3050 * volatile E item = null;
3051 * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3052 * public boolean block() throws InterruptedException {
3053 * if (item == null)
3054 * item = queue.take();
3055 * return true;
3056 * }
3057 * public boolean isReleasable() {
3058 * return item != null || (item = queue.poll()) != null;
3059 * }
3060 * public E getItem() { // call after pool.managedBlock completes
3061 * return item;
3062 * }
3063 * }}</pre>
3064 */
3065 public static interface ManagedBlocker {
3066 /**
3067 * Possibly blocks the current thread, for example waiting for
3068 * a lock or condition.
3069 *
3070 * @return {@code true} if no additional blocking is necessary
3071 * (i.e., if isReleasable would return true)
3072 * @throws InterruptedException if interrupted while waiting
3073 * (the method is not required to do so, but is allowed to)
3074 */
3075 boolean block() throws InterruptedException;
3076
3077 /**
3078 * Returns {@code true} if blocking is unnecessary.
3079 * @return {@code true} if blocking is unnecessary
3080 */
3081 boolean isReleasable();
3082 }
3083
    /**
     * Runs the given possibly blocking task. When {@linkplain
     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
     * method possibly arranges for a spare thread to be activated if
     * necessary to ensure sufficient parallelism while the current
     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
     *
     * <p>This method repeatedly calls {@code blocker.isReleasable()} and
     * {@code blocker.block()} until either method returns {@code true}.
     * Every call to {@code blocker.block()} is preceded by a call to
     * {@code blocker.isReleasable()} that returned {@code false}.
     *
     * <p>If not running in a ForkJoinPool, this method is
     * behaviorally equivalent to
     * <pre> {@code
     * while (!blocker.isReleasable())
     *   if (blocker.block())
     *     break;}</pre>
     *
     * If running in a ForkJoinPool, the pool may first be expanded to
     * ensure sufficient parallelism available during the call to
     * {@code blocker.block()}.
     *
     * @param blocker the blocker task
     * @throws InterruptedException if {@code blocker.block()} did so
     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        WorkQueue w;
        Thread t = Thread.currentThread();
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (w = wt.workQueue) != null) {
            int block;
            while (!blocker.isReleasable()) {
                // Arrange compensation before blocking; a zero return from
                // tryCompensate means no arrangement was made, so re-check
                // releasability and retry rather than block uncompensated.
                if ((block = p.tryCompensate(w)) != 0) {
                    try {
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        // Undo any count adjustment made by tryCompensate.
                        // NOTE(review): a positive return apparently signals
                        // that RC was decremented — confirm in tryCompensate.
                        U.getAndAddLong(p, CTL, (block > 0) ? RC_UNIT : 0L);
                    }
                    break;
                }
            }
        }
        else {
            // Not a ForkJoinPool worker: plain isReleasable/block loop.
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
3137
3138 // AbstractExecutorService overrides. These rely on undocumented
3139 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3140 // implement RunnableFuture.
3141
3142 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3143 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3144 }
3145
3146 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3147 return new ForkJoinTask.AdaptedCallable<T>(callable);
3148 }
3149
    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long CTL;     // offset of field "ctl", for atomic updates
    private static final long MODE;    // offset of field "mode", for atomic updates
    private static final int ABASE;    // base offset of ForkJoinTask[] array data
    private static final int ASHIFT;   // log2 of ForkJoinTask[] element scale
3156
    static {
        try {
            // Resolve field offsets and array geometry used by Unsafe-based
            // atomic operations; any failure is fatal for this class.
            CTL = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("ctl"));
            MODE = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("mode"));
            ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
            int scale = U.arrayIndexScale(ForkJoinTask[].class);
            if ((scale & (scale - 1)) != 0)
                throw new Error("array index scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;

        // Allow tuning of the common-pool spare-thread cap via system property.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        try {
            String p = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (p != null)
                commonMaxSpares = Integer.parseInt(p);
        } catch (Exception ignore) {}  // missing/malformed property: keep default
        COMMON_MAX_SPARES = commonMaxSpares;

        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // Construct the common pool inside doPrivileged so it can be created
        // even when the initiating caller has restricted permissions.
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() {
                    return new ForkJoinPool((byte)0); }});

        COMMON_PARALLELISM = common.mode & SMASK;
    }
3196
    /**
     * Factory for innocuous worker threads.
     */
    private static final class InnocuousForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {

        /**
         * An ACC to restrict permissions for the factory itself.
         * The constructed workers have no permissions set.
         */
        private static final AccessControlContext innocuousAcc;
        static {
            // The factory itself needs only the minimal permissions
            // required to create and configure worker threads.
            Permissions innocuousPerms = new Permissions();
            innocuousPerms.add(modifyThreadPermission);
            innocuousPerms.add(new RuntimePermission(
                                   "enableContextClassLoaderOverride"));
            innocuousPerms.add(new RuntimePermission(
                                   "modifyThreadGroup"));
            innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
                    new ProtectionDomain(null, innocuousPerms)
                });
        }

        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            // Create the worker under the restricted ACC so the new
            // thread does not inherit the caller's access control context.
            return java.security.AccessController.doPrivileged(
                new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
                    public ForkJoinWorkerThread run() {
                        return new ForkJoinWorkerThread.
                            InnocuousForkJoinWorkerThread(pool);
                    }}, innocuousAcc);
        }
    }
3229
3230 }