ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.207
Committed: Tue Jul 8 19:09:41 2014 UTC (9 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.206: +3 -1 lines
Log Message:
More termination compatibility

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.Thread.UncaughtExceptionHandler;
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.List;
15 import java.util.concurrent.AbstractExecutorService;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.RunnableFuture;
21 import java.util.concurrent.ThreadLocalRandom;
22 import java.util.concurrent.TimeUnit;
23 import java.security.AccessControlContext;
24 import java.security.ProtectionDomain;
25 import java.security.Permissions;
26
27 /**
28 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
29 * A {@code ForkJoinPool} provides the entry point for submissions
30 * from non-{@code ForkJoinTask} clients, as well as management and
31 * monitoring operations.
32 *
33 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
34 * ExecutorService} mainly by virtue of employing
35 * <em>work-stealing</em>: all threads in the pool attempt to find and
36 * execute tasks submitted to the pool and/or created by other active
37 * tasks (eventually blocking waiting for work if none exist). This
38 * enables efficient processing when most tasks spawn other subtasks
39 * (as do most {@code ForkJoinTask}s), as well as when many small
40 * tasks are submitted to the pool from external clients. Especially
41 * when setting <em>asyncMode</em> to true in constructors, {@code
42 * ForkJoinPool}s may also be appropriate for use with event-style
43 * tasks that are never joined.
44 *
45 * <p>A static {@link #commonPool()} is available and appropriate for
46 * most applications. The common pool is used by any ForkJoinTask that
47 * is not explicitly submitted to a specified pool. Using the common
48 * pool normally reduces resource usage (its threads are slowly
49 * reclaimed during periods of non-use, and reinstated upon subsequent
50 * use).
51 *
52 * <p>For applications that require separate or custom pools, a {@code
53 * ForkJoinPool} may be constructed with a given target parallelism
54 * level; by default, equal to the number of available processors. The
55 * pool attempts to maintain enough active (or available) threads by
56 * dynamically adding, suspending, or resuming internal worker
57 * threads, even if some tasks are stalled waiting to join others.
58 * However, no such adjustments are guaranteed in the face of blocked
59 * I/O or other unmanaged synchronization. The nested {@link
60 * ManagedBlocker} interface enables extension of the kinds of
61 * synchronization accommodated.
62 *
63 * <p>In addition to execution and lifecycle control methods, this
64 * class provides status check methods (for example
65 * {@link #getStealCount}) that are intended to aid in developing,
66 * tuning, and monitoring fork/join applications. Also, method
67 * {@link #toString} returns indications of pool state in a
68 * convenient form for informal monitoring.
69 *
70 * <p>As is the case with other ExecutorServices, there are three
71 * main task execution methods summarized in the following table.
72 * These are designed to be used primarily by clients not already
73 * engaged in fork/join computations in the current pool. The main
74 * forms of these methods accept instances of {@code ForkJoinTask},
75 * but overloaded forms also allow mixed execution of plain {@code
76 * Runnable}- or {@code Callable}- based activities as well. However,
77 * tasks that are already executing in a pool should normally instead
78 * use the within-computation forms listed in the table unless using
79 * async event-style tasks that are not usually joined, in which case
80 * there is little difference among choice of methods.
81 *
82 * <table BORDER CELLPADDING=3 CELLSPACING=1>
83 * <caption>Summary of task execution methods</caption>
84 * <tr>
85 * <td></td>
86 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
87 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
88 * </tr>
89 * <tr>
90 * <td> <b>Arrange async execution</b></td>
91 * <td> {@link #execute(ForkJoinTask)}</td>
92 * <td> {@link ForkJoinTask#fork}</td>
93 * </tr>
94 * <tr>
95 * <td> <b>Await and obtain result</b></td>
96 * <td> {@link #invoke(ForkJoinTask)}</td>
97 * <td> {@link ForkJoinTask#invoke}</td>
98 * </tr>
99 * <tr>
100 * <td> <b>Arrange exec and obtain Future</b></td>
101 * <td> {@link #submit(ForkJoinTask)}</td>
102 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
103 * </tr>
104 * </table>
105 *
106 * <p>The common pool is by default constructed with default
107 * parameters, but these may be controlled by setting three
108 * {@linkplain System#getProperty system properties}:
109 * <ul>
110 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
111 * - the parallelism level, a non-negative integer
112 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
113 * - the class name of a {@link ForkJoinWorkerThreadFactory}
114 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
115 * - the class name of a {@link UncaughtExceptionHandler}
116 * </ul>
117 * If a {@link SecurityManager} is present and no factory is
118 * specified, then the default pool uses a factory supplying
119 * threads that have no {@link Permissions} enabled.
120 * The system class loader is used to load these classes.
121 * Upon any error in establishing these settings, default parameters
122 * are used. It is possible to disable or limit the use of threads in
123 * the common pool by setting the parallelism property to zero, and/or
124 * using a factory that may return {@code null}. However, doing so may
125 * cause unjoined tasks to never be executed.
126 *
127 * <p><b>Implementation notes</b>: This implementation restricts the
128 * maximum number of running threads to 32767. Attempts to create
129 * pools with greater than the maximum number result in
130 * {@code IllegalArgumentException}.
131 *
132 * <p>This implementation rejects submitted tasks (that is, by throwing
133 * {@link RejectedExecutionException}) only when the pool is shut down
134 * or internal resources have been exhausted.
135 *
136 * @since 1.7
137 * @author Doug Lea
138 */
139 @sun.misc.Contended
140 public class ForkJoinPool extends AbstractExecutorService {
141
142 /*
143 * Implementation Overview
144 *
145 * This class and its nested classes provide the main
146 * functionality and control for a set of worker threads:
147 * Submissions from non-FJ threads enter into submission queues.
148 * Workers take these tasks and typically split them into subtasks
149 * that may be stolen by other workers. Preference rules give
150 * first priority to processing tasks from their own queues (LIFO
151 * or FIFO, depending on mode), then to randomized FIFO steals of
152 * tasks in other queues. This framework began as a vehicle for
153 * supporting tree-structured parallelism using work-stealing.
154 * Over time, its scalability advantages led to extensions and
155 * changes to better support more diverse usage contexts.
156 *
157 * WorkQueues
158 * ==========
159 *
160 * Most operations occur within work-stealing queues (in nested
161 * class WorkQueue). These are special forms of Deques that
162 * support only three of the four possible end-operations -- push,
163 * pop, and poll (aka steal), under the further constraints that
164 * push and pop are called only from the owning thread (or, as
165 * extended here, under a lock), while poll may be called from
166 * other threads. (If you are unfamiliar with them, you probably
167 * want to read Herlihy and Shavit's book "The Art of
168 * Multiprocessor Programming", chapter 16 describing these in
169 * more detail before proceeding.) The main work-stealing queue
170 * design is roughly similar to those in the papers "Dynamic
171 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
172 * (http://research.sun.com/scalable/pubs/index.html) and
173 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
174 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
175 * The main differences ultimately stem from GC requirements that
176 * we null out taken slots as soon as we can, to maintain as small
177 * a footprint as possible even in programs generating huge
178 * numbers of tasks. To accomplish this, we shift the CAS
179 * arbitrating pop vs poll (steal) from being on the indices
180 * ("base" and "top") to the slots themselves.
181 *
182 * Adding tasks then takes the form of a classic array push(task):
183 * q.array[q.top] = task; ++q.top;
184 *
185 * (The actual code needs to null-check and size-check the array,
186 * properly fence the accesses, and possibly signal waiting
187 * workers to start scanning -- see below.) Both a successful pop
188 * and poll mainly entail a CAS of a slot from non-null to null.
189 *
190 * The pop operation (always performed by owner) is:
191 * if ((base != top) and
192 * (the task at top slot is not null) and
193 * (CAS slot to null))
194 * decrement top and return task;
195 *
196 * And the poll operation (usually by a stealer) is
197 * if ((base != top) and
198 * (the task at base slot is not null) and
199 * (base has not changed) and
200 * (CAS slot to null))
201 * increment base and return task;
202 *
203 * Because we rely on CASes of references, we do not need tag bits
204 * on base or top. They are simple ints as used in any circular
205 * array-based queue (see for example ArrayDeque). Updates to the
206 * indices guarantee that top == base means the queue is empty,
207 * but otherwise may err on the side of possibly making the queue
208 * appear nonempty when a push, pop, or poll has not fully
209 * committed. (Method isEmpty() checks the case of a partially
210 * completed removal of the last element.) Note that this means
211 * that the poll operation, considered individually, is not
212 * wait-free. One thief cannot successfully continue until another
213 * in-progress one (or, if previously empty, a push) completes.
214 * However, in the aggregate, we ensure at least probabilistic
215 * non-blockingness. If an attempted steal fails, a thief always
216 * chooses a different random victim target to try next. So, in
217 * order for one thief to progress, it suffices for any
218 * in-progress poll or new push on any empty queue to
219 * complete. (This is why we normally use method pollAt and its
220 * variants that try once at the apparent base index, else
221 * consider alternative actions, rather than method poll, which
222 * retries.)
223 *
224 * This approach also enables support of a user mode in which
225 * local task processing is in FIFO, not LIFO order, simply by
226 * using poll rather than pop. This can be useful in
227 * message-passing frameworks in which tasks are never joined.
228 * However neither mode considers affinities, loads, cache
229 * localities, etc, so rarely provide the best possible
230 * performance on a given machine, but portably provide good
231 * throughput by averaging over these factors. Further, even if
232 * we did try to use such information, we do not usually have a
233 * basis for exploiting it. For example, some sets of tasks
234 * profit from cache affinities, but others are harmed by cache
235 * pollution effects. Additionally, even though it requires
236 * scanning, long-term throughput is often best using random
237 * selection rather than directed selection policies, so cheap
238 * randomization of sufficient quality is used whenever
239 * applicable. Various Marsaglia XorShifts (some with different
240 * shift constants) are inlined at use points.
241 *
242 * WorkQueues are also used in a similar way for tasks submitted
243 * to the pool. We cannot mix these tasks in the same queues used
244 * by workers. Instead, we randomly associate submission queues
245 * with submitting threads, using a form of hashing. The
246 * ThreadLocalRandom probe value serves as a hash code for
247 * choosing existing queues, and may be randomly repositioned upon
248 * contention with other submitters. In essence, submitters act
249 * like workers except that they are restricted to executing local
250 * tasks that they submitted (or in the case of CountedCompleters,
251 * others with the same root task). Insertion of tasks in shared
252 * mode requires a lock (mainly to protect in the case of
253 * resizing) but we use only a simple spinlock (using field
254 * qlock), because submitters encountering a busy queue move on to
255 * try or create other queues -- they block only when creating and
256 * registering new queues.
257 *
258 * Management
259 * ==========
260 *
261 * The main throughput advantages of work-stealing stem from
262 * decentralized control -- workers mostly take tasks from
263 * themselves or each other, at rates that can exceed a billion
264 * per second. The pool itself creates, activates (enables
265 * scanning for and running tasks), deactivates, blocks, and
266 * terminates threads, all with minimal central information.
267 * There are only a few properties that we can globally track or
268 * maintain, so we pack them into a small number of variables,
269 * often maintaining atomicity without blocking or locking.
270 * Nearly all essentially atomic control state is held in two
271 * volatile variables that are by far most often read (not
272 * written) as status and consistency checks.
273 *
274 * Field "ctl" contains 64 bits holding information needed to
275 * atomically decide to add, inactivate, enqueue (on an event
276 * queue), dequeue, and/or re-activate workers. To enable this
277 * packing, we restrict maximum parallelism to (1<<15)-1 (which is
278 * far in excess of normal operating range) to allow ids, counts,
279 * and their negations (used for thresholding) to fit into 16bit
280 * subfields. Field "runState" holds lockable state bits
281 * (STARTED, STOP, etc) also protecting updates to the workQueues
282 * array. When used as a lock, it is normally held only for a few
283 * instructions (the only exceptions are one-time array
284 * initialization and uncommon resizing), so is nearly always
285 * available after at most a brief spin. But to be extra-cautious,
286 * we use a monitor-based backup strategy to block when needed
287 * (see awaitRunStateLock). Usages of "runState" vs "ctl"
288 * interact in only one case: deciding to add a worker thread (see
289 * tryAddWorker), in which case the ctl CAS is performed while the
290 * lock is held. Field "config" holds unchanging configuration
291 * state.
292 *
293 * Recording WorkQueues. WorkQueues are recorded in the
294 * "workQueues" array. The array is created upon first use (see
295 * externalSubmit) and expanded if necessary. Updates to the
296 * array while recording new workers and unrecording terminated
297 * ones are protected from each other by the runState lock, but
298 * the array is otherwise concurrently readable, and accessed
299 * directly. We also ensure that reads of the array reference
300 * itself never become too stale. To simplify index-based
301 * operations, the array size is always a power of two, and all
302 * readers must tolerate null slots. Worker queues are at odd
303 * indices. Shared (submission) queues are at even indices, up to
304 * a maximum of 64 slots, to limit growth even if array needs to
305 * expand to add more workers. Grouping them together in this way
306 * simplifies and speeds up task scanning.
307 *
308 * All worker thread creation is on-demand, triggered by task
309 * submissions, replacement of terminated workers, and/or
310 * compensation for blocked workers. However, all other support
311 * code is set up to work with other policies. To ensure that we
312 * do not hold on to worker references that would prevent GC, all
313 * accesses to workQueues are via indices into the workQueues
314 * array (which is one source of some of the messy code
315 * constructions here). In essence, the workQueues array serves as
316 * a weak reference mechanism. Thus for example the stack top
317 * subfield of ctl stores indices, not references.
318 *
319 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
320 * cannot let workers spin indefinitely scanning for tasks when
321 * none can be found immediately, and we cannot start/resume
322 * workers unless there appear to be tasks available. On the
323 * other hand, we must quickly prod them into action when new
324 * tasks are submitted or generated. In many usages, ramp-up time
325 * to activate workers is the main limiting factor in overall
326 * performance, which is compounded at program start-up by JIT
327 * compilation and allocation. So we streamline this as much as
328 * possible.
329 *
330 * The "ctl" field atomically maintains active and total worker
331 * counts as well as a queue to place waiting threads so they can
332 * be located for signalling. Active counts also play the role of
333 * quiescence indicators, so are decremented when workers believe
334 * that there are no more tasks to execute. The "queue" is
335 * actually a form of Treiber stack. A stack is ideal for
336 * activating threads in most-recently used order. This improves
337 * performance and locality, outweighing the disadvantages of
338 * being prone to contention and inability to release a worker
339 * unless it is topmost on stack. We park/unpark workers after
340 * pushing on the idle worker stack (represented by the lower
341 * 32bit subfield of ctl) when they cannot find work. The top
342 * stack state holds the value of the "scanState" field of the
343 * worker: its index and status, plus a version counter that, in
344 * addition to the count subfields (also serving as version
345 * stamps), provides protection against Treiber stack ABA effects.
346 *
347 * Field scanState is used by both workers and the pool to manage
348 * and track whether a worker is INACTIVE (possibly blocked
349 * waiting for a signal), or SCANNING for tasks (when neither holds,
350 * it is busy running tasks). When a worker is inactivated, its
351 * scanState field is set, and it is prevented from executing tasks,
352 * even though it must scan once for them to avoid queuing
353 * races. Note that scanState updates lag queue CAS releases so
354 * usage requires care. When queued, the lower 16 bits of
355 * scanState must hold its pool index. So we place the index there
356 * upon initialization (see registerWorker) and otherwise keep it
357 * there or restore it when necessary.
358 *
359 * Memory ordering. See "Correct and Efficient Work-Stealing for
360 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
361 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
362 * analysis of memory ordering requirements in work-stealing
363 * algorithms similar to the one used here. We usually need
364 * stronger than minimal ordering because we must sometimes signal
365 * workers, requiring Dekker-like full-fences to avoid lost
366 * signals. Arranging for enough ordering without expensive
367 * over-fencing requires tradeoffs among the supported means of
368 * expressing access constraints. The most central operations,
369 * taking from queues and updating ctl state, require full-fence
370 * CAS. Array slots are read using the emulation of volatiles
371 * provided by Unsafe. Access from other threads to WorkQueue
372 * base, top, and array requires a volatile load of the first of
373 * any of these read. We use the convention of declaring the
374 * "base" index volatile, and always read it before other fields.
375 * The owner thread must ensure ordered updates, so writes use
376 * ordered intrinsics unless they can piggyback on those for other
377 * writes. Similar conventions and rationales hold for other
378 * WorkQueue fields (such as "currentSteal") that are only written
379 * by owners but observed by others.
380 *
381 * Creating workers. To create a worker, we pre-increment total
382 * count (serving as a reservation), and attempt to construct a
383 * ForkJoinWorkerThread via its factory. Upon construction, the
384 * new thread invokes registerWorker, where it constructs a
385 * WorkQueue and is assigned an index in the workQueues array
386 * (expanding the array if necessary). The thread is then
387 * started. Upon any exception across these steps, or null return
388 * from factory, deregisterWorker adjusts counts and records
389 * accordingly. If a null return, the pool continues running with
390 * fewer than the target number of workers. If exceptional, the
391 * exception is propagated, generally to some external caller.
392 * Worker index assignment avoids the bias in scanning that would
393 * occur if entries were sequentially packed starting at the front
394 * of the workQueues array. We treat the array as a simple
395 * power-of-two hash table, expanding as needed. The seedIndex
396 * increment ensures no collisions until a resize is needed or a
397 * worker is deregistered and replaced, and thereafter keeps
398 * probability of collision low. We cannot use
399 * ThreadLocalRandom.getProbe() for similar purposes here because
400 * the thread has not started yet, but do so for creating
401 * submission queues for existing external threads.
402 *
403 * Deactivation and waiting. Queuing encounters several intrinsic
404 * races; most notably that a task-producing thread can miss
405 * seeing (and signalling) another thread that gave up looking for
406 * work but has not yet entered the wait queue. When a worker
407 * cannot find a task to steal, it deactivates and enqueues. Very
408 * often, the lack of tasks is transient due to GC or OS
409 * scheduling. To reduce false-alarm deactivation, scanners
410 * compute checksums of queue states during sweeps. They give up
411 * and try to deactivate only after the sum is stable across
412 * scans. Further, to avoid missed signals, they repeat this
413 * scanning process after successful enqueuing until again stable.
414 * In this state, the worker cannot take/run a task it sees until
415 * it is released from the queue, so the worker itself eventually
416 * tries to release itself or any successor (see tryRelease).
417 * Otherwise, upon an empty scan, a deactivated worker uses an
418 * adaptive local spin construction (see awaitWork) before
419 * blocking (via park). Note the unusual conventions about
420 * Thread.interrupts surrounding parking and other blocking:
421 * Because interrupts are used solely to alert threads to check
422 * termination, which is checked anyway upon blocking, we clear
423 * status (using Thread.interrupted) before any call to park, so
424 * that park does not immediately return due to status being set
425 * via some other unrelated call to interrupt in user code.
426 *
427 * Signalling and activation. Workers are created or activated
428 * only when there appears to be at least one task they might be
429 * able to find and execute. Upon push (either by a worker or an
430 * external submission) to a previously (possibly) empty queue,
431 * workers are signalled if idle, or created if fewer exist than
432 * the given parallelism level. These primary signals are
433 * buttressed by others whenever other threads remove a task from
434 * a queue and notice that there are other tasks there as well.
435 * On most platforms, signalling (unpark) overhead time is
436 * noticeably long, and the time between signalling a thread and
437 * it actually making progress can be very noticeably long, so it
438 * is worth offloading these delays from critical paths as much as
439 * possible. Also, because enqueued workers are often rescanning
440 * or spinning rather than blocking, we set and clear the "parker"
441 * field of WorkQueues to reduce unnecessary calls to unpark.
442 * (This requires a secondary recheck to avoid missed signals.)
443 *
444 * Trimming workers. To release resources after periods of lack of
445 * use, a worker starting to wait when the pool is quiescent will
446 * time out and terminate if the pool has remained quiescent for a
447 * given period -- a short period if there are more threads than
448 * parallelism, longer as the number of threads decreases. This
449 * eventually terminates all workers after periods of non-use.
450 *
451 * Shutdown and Termination. A call to shutdownNow atomically sets
452 * a runState bit and then (non-atomically) sets each worker's
453 * qlock status, cancels all unprocessed tasks, and wakes up all
454 * waiting workers (see tryTerminate). Detecting whether
455 * termination should commence after a non-abrupt shutdown() call
456 * relies on the active count bits of "ctl" maintaining consensus
457 * about quiescence. However, external submitters do not take part
458 * in this consensus. So, tryTerminate sweeps through submission
459 * queues to ensure lack of in-flight submissions before
460 * triggering the "STOP" phase of termination.
461 *
462 * Joining Tasks
463 * =============
464 *
465 * Any of several actions may be taken when one worker is waiting
466 * to join a task stolen (or always held) by another. Because we
467 * are multiplexing many tasks on to a pool of workers, we can't
468 * just let them block (as in Thread.join). We also cannot just
469 * reassign the joiner's run-time stack with another and replace
470 * it later, which would be a form of "continuation", that even if
471 * possible is not necessarily a good idea since we may need both
472 * an unblocked task and its continuation to progress. Instead we
473 * combine two tactics:
474 *
475 * Helping: Arranging for the joiner to execute some task that it
476 * would be running if the steal had not occurred.
477 *
478 * Compensating: Unless there are already enough live threads,
479 * method tryCompensate() may create or re-activate a spare
480 * thread to compensate for blocked joiners until they unblock.
481 *
482 * A third form (implemented in tryRemoveAndExec) amounts to
483 * helping a hypothetical compensator: If we can readily tell that
484 * a possible action of a compensator is to steal and execute the
485 * task being joined, the joining thread can do so directly,
486 * without the need for a compensation thread (although at the
487 * expense of larger run-time stacks, but the tradeoff is
488 * typically worthwhile).
489 *
490 * The ManagedBlocker extension API can't use helping so relies
491 * only on compensation in method awaitBlocker.
492 *
493 * The algorithm in helpStealer entails a form of "linear
494 * helping". Each worker records (in field currentSteal) the most
495 * recent task it stole from some other worker (or a submission).
496 * It also records (in field currentJoin) the task it is currently
497 * actively joining. Method helpStealer uses these markers to try
498 * to find a worker to help (i.e., steal back a task from and
499 * execute it) that could hasten completion of the actively joined
500 * task. Thus, the joiner executes a task that would be on its
501 * own local deque had the to-be-joined task not been stolen. This
502 * is a conservative variant of the approach described in Wagner &
503 * Calder "Leapfrogging: a portable technique for implementing
504 * efficient futures" SIGPLAN Notices, 1993
505 * (http://portal.acm.org/citation.cfm?id=155354). It differs in
506 * that: (1) We only maintain dependency links across workers upon
507 * steals, rather than use per-task bookkeeping. This sometimes
508 * requires a linear scan of workQueues array to locate stealers,
509 * but often doesn't because stealers leave hints (that may become
510 * stale/wrong) of where to locate them. It is only a hint
511 * because a worker might have had multiple steals and the hint
512 * records only one of them (usually the most current). Hinting
513 * isolates cost to when it is needed, rather than adding to
514 * per-task overhead. (2) It is "shallow", ignoring nesting and
515 * potentially cyclic mutual steals. (3) It is intentionally
516 * racy: field currentJoin is updated only while actively joining,
517 * which means that we miss links in the chain during long-lived
518 * tasks, GC stalls etc (which is OK since blocking in such cases
519 * is usually a good idea). (4) We bound the number of attempts
520 * to find work using checksums and fall back to suspending the
521 * worker and if necessary replacing it with another.
522 *
523 * Helping actions for CountedCompleters do not require tracking
524 * currentJoins: Method helpComplete takes and executes any task
525 * with the same root as the task being waited on (preferring
526 * local pops to non-local polls). However, this still entails
527 * some traversal of completer chains, so is less efficient than
528 * using CountedCompleters without explicit joins.
529 *
530 * Compensation does not aim to keep exactly the target
531 * parallelism number of unblocked threads running at any given
532 * time. Some previous versions of this class employed immediate
533 * compensations for any blocked join. However, in practice, the
534 * vast majority of blockages are transient byproducts of GC and
535 * other JVM or OS activities that are made worse by replacement.
536 * Currently, compensation is attempted only after validating that
537 * all purportedly active threads are processing tasks by checking
538 * field WorkQueue.scanState, which eliminates most false
539 * positives. Also, compensation is bypassed (tolerating fewer
540 * threads) in the most common case in which it is rarely
541 * beneficial: when a worker with an empty queue (thus no
542 * continuation tasks) blocks on a join and there still remain
543 * enough threads to ensure liveness. Also, whenever more than two
544 * spare threads are generated, they are killed (see awaitWork) at
545 * the next quiescent point (padding by two avoids hysteresis).
546 *
547 * Bounds. The compensation mechanism is bounded (see MAX_SPARES),
548 * to better enable JVMs to cope with programming errors and abuse
549 * before running out of resources to do so, and may be further
550 * bounded via factories that limit thread construction. The
551 * effects of bounding in this pool (like all others) are
552 * imprecise. Total worker counts are decremented when threads
553 * deregister, not when they exit and resources are reclaimed by
554 * the JVM and OS. So the number of simultaneously live threads
555 * may transiently exceed bounds.
556 *
557 *
558 * Common Pool
559 * ===========
560 *
561 * The static common pool always exists after static
562 * initialization. Since it (or any other created pool) need
563 * never be used, we minimize initial construction overhead and
564 * footprint to the setup of about a dozen fields, with no nested
565 * allocation. Most bootstrapping occurs within method
566 * externalSubmit during the first submission to the pool.
567 *
568 * When external threads submit to the common pool, they can
569 * perform subtask processing (see externalHelpComplete and
570 * related methods) upon joins. This caller-helps policy makes it
571 * sensible to set common pool parallelism level to one (or more)
572 * less than the total number of available cores, or even zero for
573 * pure caller-runs. We do not need to record whether external
574 * submissions are to the common pool -- if not, external help
575 * methods return quickly. These submitters would otherwise be
576 * blocked waiting for completion, so the extra effort (with
577 * liberally sprinkled task status checks) in inapplicable cases
578 * amounts to an odd form of limited spin-wait before blocking in
579 * ForkJoinTask.join.
580 *
581 * As a more appropriate default in managed environments, unless
582 * overridden by system properties, we use workers of subclass
583 * InnocuousForkJoinWorkerThread when there is a SecurityManager
584 * present. These workers have no permissions set, do not belong
585 * to any user-defined ThreadGroup, and erase all ThreadLocals
586 * after executing any top-level task (see WorkQueue.runTask).
587 * The associated mechanics (mainly in ForkJoinWorkerThread) may
588 * be JVM-dependent and must access particular Thread class fields
589 * to achieve this effect.
590 *
591 * Style notes
592 * ===========
593 *
594 * Memory ordering relies mainly on Unsafe intrinsics that carry
595 * the further responsibility of explicitly performing null- and
596 * bounds- checks otherwise carried out implicitly by JVMs. This
597 * can be awkward and ugly, but also reflects the need to control
598 * outcomes across the unusual cases that arise in very racy code
599 * with very few invariants. So these explicit checks would exist
600 * in some form anyway. All fields are read into locals before
601 * use, and null-checked if they are references. This is usually
602 * done in a "C"-like style of listing declarations at the heads
603 * of methods or blocks, and using inline assignments on first
604 * encounter. Array bounds-checks are usually performed by
605 * masking with array.length-1, which relies on the invariant that
606 * these arrays are created with positive lengths, which is itself
607 * paranoically checked. Nearly all explicit checks lead to
608 * bypass/return, not exception throws, because they may
609 * legitimately arise due to cancellation/revocation during
610 * shutdown.
611 *
612 * There is a lot of representation-level coupling among classes
613 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
614 * fields of WorkQueue maintain data structures managed by
615 * ForkJoinPool, so are directly accessed. There is little point
616 * trying to reduce this, since any associated future changes in
617 * representations will need to be accompanied by algorithmic
618 * changes anyway. Several methods intrinsically sprawl because
619 * they must accumulate sets of consistent reads of fields held in
620 * local variables. There are also other coding oddities
621 * (including several unnecessary-looking hoisted null checks)
622 * that help some methods perform reasonably even when interpreted
623 * (not compiled).
624 *
625 * The order of declarations in this file is:
626 * (1) Static utility functions
627 * (2) Nested (static) classes
628 * (3) Static fields
629 * (4) Fields, along with constants used when unpacking some of them
630 * (5) Internal control methods
631 * (6) Callbacks and other support for ForkJoinTask methods
632 * (7) Exported methods
633 * (8) Static block initializing statics in minimally dependent order
634 */
635
636 // Static utilities
637
638 /**
639 * If there is a security manager, makes sure caller has
640 * permission to modify threads.
641 */
642 private static void checkPermission() {
643 SecurityManager security = System.getSecurityManager();
644 if (security != null)
645 security.checkPermission(modifyThreadPermission);
646 }
647
648 // Nested classes
649
650 /**
651 * Factory for creating new {@link ForkJoinWorkerThread}s.
652 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
653 * for {@code ForkJoinWorkerThread} subclasses that extend base
654 * functionality or initialize threads with different contexts.
655 */
public interface ForkJoinWorkerThreadFactory {
    /**
     * Creates and returns a new worker thread that will run tasks
     * for the given pool.
     *
     * @param pool the pool this thread works in
     * @return the new worker thread
     * @throws NullPointerException if the pool is null
     */
    ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
666
667 /**
668 * Default ForkJoinWorkerThreadFactory implementation; creates a
669 * new ForkJoinWorkerThread.
670 */
671 static final class DefaultForkJoinWorkerThreadFactory
672 implements ForkJoinWorkerThreadFactory {
673 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
674 return new ForkJoinWorkerThread(pool);
675 }
676 }
677
678 /**
679 * Class for artificial tasks that are used to replace the target
680 * of local joins if they are removed from an interior queue slot
681 * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
682 * actually do anything beyond having a unique identity.
683 */
684 static final class EmptyTask extends ForkJoinTask<Void> {
685 private static final long serialVersionUID = -7721805057305804111L;
686 EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
687 public final Void getRawResult() { return null; }
688 public final void setRawResult(Void x) {}
689 public final boolean exec() { return true; }
690 }
691
// Constants shared across ForkJoinPool and WorkQueue

// Bounds
static final int SMASK        = 0xffff;        // short bits == max index
static final int MAX_CAP      = 0x7fff;        // max #workers - 1
static final int EVENMASK     = 0xfffe;        // even short bits
static final int SQMASK       = 0x007e;        // max 64 (even) slots

// Masks and units for WorkQueue.scanState and ctl sp subfield
static final int SCANNING     = 1;             // false when running tasks
static final int INACTIVE     = 1 << 31;       // must be negative
static final int SS_SHIFT     = 16;            // shift for version count
static final int SS_SEQ       = 1 << SS_SHIFT; // version number
static final int SS_MASK      = 0x7fffffff;    // mask on update (all bits except the sign bit)

// Mode bits for ForkJoinPool.config and WorkQueue.config
static final int MODE_MASK    = SMASK << 16;   // upper 16 bits of a config word
static final int LIFO_QUEUE   = 0;             // FIFO bit clear: nextLocalTask uses pop()
static final int FIFO_QUEUE   = 1 << 16;       // FIFO bit set: nextLocalTask uses poll()
static final int SHARED_QUEUE = 1 << 31;       // must be negative
712
713 /**
714 * Queues supporting work-stealing as well as external task
715 * submission. See above for descriptions and algorithms.
716 * Performance on most platforms is very sensitive to placement of
717 * instances of both WorkQueues and their arrays -- we absolutely
718 * do not want multiple WorkQueue instances or multiple queue
719 * arrays sharing cache lines. The @Contended annotation alerts
720 * JVMs to try to keep instances apart.
721 */
722 @sun.misc.Contended
723 static final class WorkQueue {
724
/**
 * Capacity of work-stealing queue array upon initialization.
 * Must be a power of two; at least 4, but should be larger to
 * reduce or eliminate cacheline sharing among queues.
 * Currently, it is much larger, as a partial workaround for
 * the fact that JVMs often place arrays in locations that
 * share GC bookkeeping (especially cardmarks) such that
 * per-write accesses encounter serious memory contention.
 */
static final int INITIAL_QUEUE_CAPACITY = 1 << 13; // 8192 slots

/**
 * Maximum size for queue arrays. Must be a power of two less
 * than or equal to 1 << (31 - width of array entry) to ensure
 * lack of wraparound of index calculations, but defined to a
 * value a bit less than this to help users trap runaway
 * programs before saturating systems.
 */
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

// Instance fields
volatile int scanState;    // versioned, <0: inactive; odd:scanning
int stackPred;             // pool stack (ctl) predecessor
int nsteals;               // number of steals
int hint;                  // randomization and stealer index hint
int config;                // pool index and mode
volatile int qlock;        // 1: locked, < 0: terminate; else 0
volatile int base;         // index of next slot for poll
int top;                   // index of next slot for push (updated via ordered writes to QTOP)
ForkJoinTask<?>[] array;   // the elements (initially unallocated)
final ForkJoinPool pool;   // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker;    // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
760
761 WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
762 this.pool = pool;
763 this.owner = owner;
764 // Place indices in the center of array (that is not yet allocated)
765 base = top = INITIAL_QUEUE_CAPACITY >>> 1;
766 }
767
768 /**
769 * Returns an exportable index (used by ForkJoinWorkerThread)
770 */
771 final int getPoolIndex() {
772 return (config & 0xffff) >>> 1; // ignore odd/even tag bit
773 }
774
775 /**
776 * Returns the approximate number of tasks in the queue.
777 */
778 final int queueSize() {
779 int n = base - top; // non-owner callers must read base first
780 return (n >= 0) ? 0 : -n; // ignore transient negative
781 }
782
783 /**
784 * Provides a more accurate estimate of whether this queue has
785 * any tasks than does queueSize, by checking whether a
786 * near-empty queue has at least one unclaimed task.
787 */
788 final boolean isEmpty() {
789 ForkJoinTask<?>[] a; int n, m, s;
790 return ((n = base - (s = top)) >= 0 ||
791 (n == -1 && // possibly one task
792 ((a = array) == null || (m = a.length - 1) < 0 ||
793 U.getObject
794 (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
795 }
796
797 /**
798 * Pushes a task. Call only by owner in unshared queues. (The
799 * shared-queue version is embedded in method externalPush.)
800 *
801 * @param task the task. Caller must ensure non-null.
802 * @throws RejectedExecutionException if array cannot be resized
803 */
804 final void push(ForkJoinTask<?> task) {
805 ForkJoinTask<?>[] a; ForkJoinPool p;
806 int b = base, s = top, n;
807 if ((a = array) != null) { // ignore if queue removed
808 int m = a.length - 1; // fenced write for task visibility
809 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
810 U.putOrderedInt(this, QTOP, s + 1);
811 if ((n = s - b) <= 1) {
812 if ((p = pool) != null)
813 p.signalWork(p.workQueues, this);
814 }
815 else if (n == m)
816 growArray();
817 }
818 }
819
820 /**
821 * Initializes or doubles the capacity of array. Call either
822 * by owner or with lock held -- it is OK for base, but not
823 * top, to move while resizings are in progress.
824 */
final ForkJoinTask<?>[] growArray() {
    ForkJoinTask<?>[] oldA = array;
    // First call allocates the initial array; later calls double the length.
    int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
    if (size > MAXIMUM_QUEUE_CAPACITY)
        throw new RejectedExecutionException("Queue capacity exceeded");
    int oldMask, t, b;
    // The new (still empty) array is installed before elements are moved.
    ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
    if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
        (t = top) - (b = base) > 0) {
        int mask = size - 1;
        do { // emulate poll from old array, push to new array
            ForkJoinTask<?> x;
            int oldj = ((b & oldMask) << ASHIFT) + ABASE;
            int j = ((b & mask) << ASHIFT) + ABASE;
            x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
            // Copy a slot only if this thread wins the CAS that clears it
            // in the old array; null slots and lost CASes are skipped.
            if (x != null &&
                U.compareAndSwapObject(oldA, oldj, x, null))
                U.putObjectVolatile(a, j, x);
        } while (++b != t);
    }
    return a;
}
847
848 /**
849 * Takes next task, if one exists, in LIFO order. Call only
850 * by owner in unshared queues.
851 */
852 final ForkJoinTask<?> pop() {
853 ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
854 if ((a = array) != null && (m = a.length - 1) >= 0) {
855 for (int s; (s = top - 1) - base >= 0;) {
856 long j = ((m & s) << ASHIFT) + ABASE;
857 if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
858 break;
859 if (U.compareAndSwapObject(a, j, t, null)) {
860 U.putOrderedInt(this, QTOP, s);
861 return t;
862 }
863 }
864 }
865 return null;
866 }
867
868 /**
869 * Takes a task in FIFO order if b is base of queue and a task
870 * can be claimed without contention. Specialized versions
871 * appear in ForkJoinPool methods scan and helpStealer.
872 */
873 final ForkJoinTask<?> pollAt(int b) {
874 ForkJoinTask<?> t; ForkJoinTask<?>[] a;
875 if ((a = array) != null) {
876 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
877 if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
878 base == b && U.compareAndSwapObject(a, j, t, null)) {
879 base = b + 1;
880 return t;
881 }
882 }
883 return null;
884 }
885
886 /**
887 * Takes next task, if one exists, in FIFO order.
888 */
889 final ForkJoinTask<?> poll() {
890 ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
891 while ((b = base) - top < 0 && (a = array) != null) {
892 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
893 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
894 if (base == b) {
895 if (t != null) {
896 if (U.compareAndSwapObject(a, j, t, null)) {
897 base = b + 1;
898 return t;
899 }
900 }
901 else if (b + 1 == top) // now empty
902 break;
903 }
904 }
905 return null;
906 }
907
908 /**
909 * Takes next task, if one exists, in order specified by mode.
910 */
911 final ForkJoinTask<?> nextLocalTask() {
912 return (config & FIFO_QUEUE) == 0 ? pop() : poll();
913 }
914
915 /**
916 * Returns next task, if one exists, in order specified by mode.
917 */
918 final ForkJoinTask<?> peek() {
919 ForkJoinTask<?>[] a = array; int m;
920 if (a == null || (m = a.length - 1) < 0)
921 return null;
922 int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;
923 int j = ((i & m) << ASHIFT) + ABASE;
924 return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
925 }
926
927 /**
928 * Pops the given task only if it is at the current top.
929 * (A shared version is available only via FJP.tryExternalUnpush)
930 */
931 final boolean tryUnpush(ForkJoinTask<?> t) {
932 ForkJoinTask<?>[] a; int s;
933 if ((a = array) != null && (s = top) != base &&
934 U.compareAndSwapObject
935 (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
936 U.putOrderedInt(this, QTOP, s);
937 return true;
938 }
939 return false;
940 }
941
942 /**
943 * Removes and cancels all known tasks, ignoring any exceptions.
944 */
945 final void cancelAll() {
946 ForkJoinTask<?> t;
947 if ((t = currentJoin) != null) {
948 currentJoin = null;
949 ForkJoinTask.cancelIgnoringExceptions(t);
950 }
951 if ((t = currentSteal) != null) {
952 currentSteal = null;
953 ForkJoinTask.cancelIgnoringExceptions(t);
954 }
955 while ((t = poll()) != null)
956 ForkJoinTask.cancelIgnoringExceptions(t);
957 }
958
959 // Specialized execution methods
960
961 /**
962 * Polls and runs tasks until empty.
963 */
964 final void pollAndExecAll() {
965 for (ForkJoinTask<?> t; (t = poll()) != null;)
966 t.doExec();
967 }
968
969 /**
970 * Removes and executes all local tasks. If FIFO, invokes
971 * pollAndExecAll. Otherwise (LIFO) implements a specialized pop
972 * loop to exec until empty.
973 */
final void execLocalTasks() {
    int b = base, m, s;
    ForkJoinTask<?>[] a = array;
    // Proceed only if at least one task appears queued and the array is usable.
    if (b - (s = top - 1) <= 0 && a != null &&
        (m = a.length - 1) >= 0) {
        if ((config & FIFO_QUEUE) == 0) {      // FIFO bit clear (LIFO mode): pop loop
            for (ForkJoinTask<?> t;;) {
                // Atomically swap null into the top slot; a null result
                // means there was no task to take there, so stop.
                if ((t = (ForkJoinTask<?>)U.getAndSetObject
                     (a, ((m & s) << ASHIFT) + ABASE, null)) == null)
                    break;
                U.putOrderedInt(this, QTOP, s); // record the removal in top
                t.doExec();
                // Re-read top (doExec may have pushed more) and stop once
                // base has passed the new top slot.
                if (base - (s = top - 1) > 0)
                    break;
            }
        }
        else                                   // FIFO mode: drain from base
            pollAndExecAll();
    }
}
994
995 /**
996 * Executes the given task and any remaining local tasks
997 */
998 final void runTask(ForkJoinTask<?> task) {
999 if (task != null) {
1000 scanState &= ~SCANNING; // mark as busy
1001 (currentSteal = task).doExec();
1002 U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
1003 execLocalTasks();
1004 ForkJoinWorkerThread thread = owner;
1005 ++nsteals;
1006 scanState |= SCANNING;
1007 if (thread != null)
1008 thread.afterTopLevelExec();
1009 }
1010 }
1011
1012 /**
1013 * If present, removes from queue and executes the given task,
1014 * or any other cancelled task. Used only by awaitJoin.
1015 *
1016 * @return true if queue empty and task not known to be done
1017 */
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; int m, s, b, n;
    if ((a = array) != null && (m = a.length - 1) >= 0 &&
        task != null) {
        // Outer loop: restart the scan while the queue appears non-empty.
        while ((n = (s = top) - (b = base)) > 0) {
            for (ForkJoinTask<?> t;;) {      // traverse from s to b
                long j = ((--s & m) << ASHIFT) + ABASE;
                if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                    return s + 1 == top;     // shorter than expected
                else if (t == task) {
                    boolean removed = false;
                    if (s + 1 == top) {      // pop
                        if (U.compareAndSwapObject(a, j, task, null)) {
                            U.putOrderedInt(this, QTOP, s);
                            removed = true;
                        }
                    }
                    else if (base == b)      // replace with proxy
                        removed = U.compareAndSwapObject(
                            a, j, task, new EmptyTask());
                    // Run the task only if this thread actually claimed it.
                    if (removed)
                        task.doExec();
                    break;
                }
                // A completed/cancelled task (status < 0) at the very top
                // is popped, then the scan restarts from the outer loop.
                else if (t.status < 0 && s + 1 == top) {
                    if (U.compareAndSwapObject(a, j, t, null))
                        U.putOrderedInt(this, QTOP, s);
                    break;                  // was cancelled
                }
                // All n slots seen at loop entry examined without a match.
                if (--n == 0)
                    return false;
            }
            // Stop as soon as the target task is known to be done.
            if (task.status < 0)
                return false;
        }
    }
    return true;
}
1056
1057 /**
1058 * Pops task if in the same CC computation as the given task,
1059 * in either shared or owned mode. Used only by helpComplete.
1060 */
final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
    int s; ForkJoinTask<?>[] a; Object o;
    // Only the single topmost element is examined.
    if (base - (s = top) < 0 && (a = array) != null) {
        long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
        if ((o = U.getObjectVolatile(a, j)) != null &&
            (o instanceof CountedCompleter)) {
            CountedCompleter<?> t = (CountedCompleter<?>)o;
            // Walk t's completer chain looking for the given task.
            for (CountedCompleter<?> r = t;;) {
                if (r == task) {
                    if (mode < 0) { // must lock
                        if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
                            // Re-validate top and array under the lock
                            // before claiming the slot.
                            if (top == s && array == a &&
                                U.compareAndSwapObject(a, j, t, null)) {
                                U.putOrderedInt(this, QTOP, s - 1);
                                U.putOrderedInt(this, QLOCK, 0);
                                return t;
                            }
                            qlock = 0;      // validation or CAS failed: unlock
                        }
                    }
                    else if (U.compareAndSwapObject(a, j, t, null)) {
                        U.putOrderedInt(this, QTOP, s - 1);
                        return t;
                    }
                    break;                  // matched but could not claim
                }
                else if ((r = r.completer) == null) // try parent
                    break;
            }
        }
    }
    return null;
}
1094
1095 /**
1096 * Steals and runs a task in the same CC computation as the
1097 * given task if one exists and can be taken without
1098 * contention. Otherwise returns a checksum/control value for
1099 * use by method helpComplete.
1100 *
1101 * @return 1 if successful, 2 if retryable (lost to another
1102 * stealer), -1 if non-empty but no matching task found, else
1103 * the base index, forced negative.
1104 */
final int pollAndExecCC(CountedCompleter<?> task) {
    int b, h; ForkJoinTask<?>[] a; Object o;
    // Empty (or unallocated) queue: return base with the sign bit forced on.
    if ((b = base) - top >= 0 || (a = array) == null)
        h = b | Integer.MIN_VALUE;  // to sense movement on re-poll
    else {
        long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
        if ((o = U.getObjectVolatile(a, j)) == null)
            h = 2;                  // retryable
        else if (!(o instanceof CountedCompleter))
            h = -1;                 // unmatchable
        else {
            CountedCompleter<?> t = (CountedCompleter<?>)o;
            // Walk t's completer chain looking for the given task.
            for (CountedCompleter<?> r = t;;) {
                if (r == task) {
                    // Claim the base slot only if base has not moved.
                    if (base == b &&
                        U.compareAndSwapObject(a, j, t, null)) {
                        base = b + 1;
                        t.doExec();
                        h = 1;      // success
                    }
                    else
                        h = 2;      // lost CAS
                    break;
                }
                else if ((r = r.completer) == null) {
                    h = -1;         // unmatched
                    break;
                }
            }
        }
    }
    return h;
}
1138
1139 /**
1140 * Returns true if owned and not known to be blocked.
1141 */
1142 final boolean isApparentlyUnblocked() {
1143 Thread wt; Thread.State s;
1144 return (scanState >= 0 &&
1145 (wt = owner) != null &&
1146 (s = wt.getState()) != Thread.State.BLOCKED &&
1147 s != Thread.State.WAITING &&
1148 s != Thread.State.TIMED_WAITING);
1149 }
1150
// Unsafe mechanics: raw field offsets and array addressing constants
private static final sun.misc.Unsafe U;
private static final int  ABASE;
private static final int  ASHIFT;
private static final long QBASE;
private static final long QTOP;
private static final long QLOCK;
private static final long QSCANSTATE;
private static final long QCURRENTSTEAL;
static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        final Class<?> queueClass = WorkQueue.class;
        final Class<?> arrayClass = ForkJoinTask[].class;
        QBASE = U.objectFieldOffset(queueClass.getDeclaredField("base"));
        QTOP = U.objectFieldOffset(queueClass.getDeclaredField("top"));
        QLOCK = U.objectFieldOffset(queueClass.getDeclaredField("qlock"));
        QSCANSTATE = U.objectFieldOffset(queueClass.getDeclaredField("scanState"));
        QCURRENTSTEAL = U.objectFieldOffset(queueClass.getDeclaredField("currentSteal"));
        ABASE = U.arrayBaseOffset(arrayClass);
        // Element addresses are computed as (index << ASHIFT) + ABASE,
        // which requires the element size to be a power of two.
        final int elementScale = U.arrayIndexScale(arrayClass);
        if ((elementScale & (elementScale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(elementScale);
    } catch (Exception e) {
        throw new Error(e);
    }
}
1184 }
1185
// static fields (initialized in static initializer below)

/**
 * Creates a new ForkJoinWorkerThread. This factory is used unless
 * overridden in ForkJoinPool constructors.
 */
public static final ForkJoinWorkerThreadFactory
    defaultForkJoinWorkerThreadFactory;

/**
 * Permission required for callers of methods that may start or
 * kill threads. Checked by {@code checkPermission()}.
 */
private static final RuntimePermission modifyThreadPermission;

/**
 * Common (static) pool. Non-null for public use unless a static
 * construction exception, but internal usages null-check on use
 * to paranoically avoid potential initialization circularities
 * as well as to simplify generated code.
 */
static final ForkJoinPool common;

/**
 * Common pool parallelism. To allow simpler use and management
 * when common pool threads are disabled, we allow the underlying
 * common.parallelism field to be zero, but in that case still report
 * parallelism as 1 to reflect resulting caller-runs mechanics.
 */
static final int commonParallelism;

/**
 * Sequence number for creating workerNamePrefix. Incremented only
 * by the synchronized method {@code nextPoolId()}.
 */
private static int poolNumberSequence;
1221
1222 /**
1223 * Returns the next sequence number. We don't expect this to
1224 * ever contend, so use simple builtin sync.
1225 */
1226 private static final synchronized int nextPoolId() {
1227 return ++poolNumberSequence;
1228 }
1229
// static configuration constants

/**
 * Initial timeout value (in nanoseconds) for the thread
 * triggering quiescence to park waiting for new work. On timeout,
 * the thread will instead try to shrink the number of
 * workers. The value should be large enough to avoid overly
 * aggressive shrinkage during most transient stalls (long GCs
 * etc).
 */
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec

/**
 * Tolerance (in nanoseconds) for idle timeouts, to cope with
 * timer undershoots.
 */
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms

/**
 * Limit on spare thread construction in tryCompensate. The
 * current value is far in excess of normal requirements, but also
 * far short of MAX_CAP and typical OS thread limits, so allows
 * JVMs to catch misuse/abuse before running out of resources
 * needed to do so.
 */
private static int MAX_SPARES = 256; // NOTE(review): not final -- confirm whether it is reassigned elsewhere

/**
 * Number of times to spin-wait before blocking. The spins (in
 * awaitRunStateLock and awaitWork) currently use randomized
 * spins. If/when MWAIT-like intrinsics become available, they
 * may allow quieter spinning. The value of SPINS must be a power
 * of two, at least 4. The current value causes spinning for a
 * small fraction of context-switch times that is worthwhile given
 * the typical likelihoods that blocking is not necessary.
 */
private static final int SPINS  = 1 << 11; // 2048

/**
 * Increment for seed generators. See class ThreadLocal for
 * explanation.
 */
private static final int SEED_INCREMENT = 0x9e3779b9;
1272
/*
 * Bits and masks for field ctl, packed with 4 16 bit subfields:
 * AC: Number of active running workers minus target parallelism
 * TC: Number of total workers minus target parallelism
 * SS: version count and status of top waiting thread
 * ID: poolIndex of top of Treiber stack of waiters
 *
 * When convenient, we can extract the lower 32 stack top bits
 * (including version bits) as sp=(int)ctl. The offsets of counts
 * by the target parallelism and the positionings of fields make
 * it possible to perform the most common checks via sign tests of
 * fields: When ac is negative, there are not enough active
 * workers, when tc is negative, there are not enough total
 * workers. When sp is non-zero, there are waiting workers. To
 * deal with possibly negative fields, we use casts in and out of
 * "short" and/or signed shifts to maintain signedness.
 *
 * Because it occupies uppermost bits, we can add one active count
 * using getAndAddLong of AC_UNIT, rather than CAS, when returning
 * from a blocked join. Other updates entail multiple subfields
 * and masking, requiring CAS.
 */

// Lower and upper word masks
private static final long SP_MASK    = 0xffffffffL;  // low 32 bits (SS and ID subfields)
private static final long UC_MASK    = ~SP_MASK;     // high 32 bits (AC and TC subfields)

// Active counts (bits 48..63)
private static final int  AC_SHIFT   = 48;
private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
private static final long AC_MASK    = 0xffffL << AC_SHIFT;

// Total counts (bits 32..47)
private static final int  TC_SHIFT   = 32;
private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
private static final long TC_MASK    = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign (bit 47, top bit of TC)

// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int  RSLOCK     = 1;            // lock bit (see lockRunState)
private static final int  RSIGNAL    = 1 << 1;       // set by a thread about to wait for the lock
private static final int  STARTED    = 1 << 2;
private static final int  STOP       = 1 << 29;
private static final int  TERMINATED = 1 << 30;
private static final int  SHUTDOWN   = 1 << 31;
1318
// Instance fields
volatile long stealCount;            // collects worker counts (nsteals added in deregisterWorker)
volatile long ctl;                   // main pool control
volatile int runState;               // lockable status
final int config;                    // parallelism, mode
int indexSeed;                       // to generate worker index (advanced under the runState lock)
volatile WorkQueue[] workQueues;     // main registry
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh;  // per-worker UEH
final String workerNamePrefix;       // to create worker name string
1329
1330 /**
1331 * Acquires the runState lock; returns current (locked) runState
1332 */
1333 private int lockRunState() {
1334 int rs;
1335 return ((((rs = runState) & RSLOCK) != 0 ||
1336 !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
1337 awaitRunStateLock() : rs);
1338 }
1339
1340 /**
1341 * Spins and/or blocks until runstate lock is available. This
1342 * method is called only if an initial CAS fails. This acts as a
1343 * spinlock for normal cases, but falls back to builtin monitor to
1344 * block when (rarely) needed. This would be a terrible idea for a
1345 * highly contended lock, but most pools run without the lock ever
1346 * contending after the spin limit, so this works fine as a more
1347 * conservative alternative to a pure spinlock.
1348 */
1349 private int awaitRunStateLock() {
1350 for (int spins = SPINS, r = 0, rs, nrs;;) {
1351 if (((rs = runState) & RSLOCK) == 0) {
1352 if (U.compareAndSwapInt(this, RUNSTATE, rs, nrs = rs | RSLOCK))
1353 return nrs;
1354 }
1355 else if (r == 0)
1356 r = ThreadLocalRandom.nextSecondarySeed();
1357 else if (spins > 0) {
1358 r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
1359 if (r >= 0)
1360 --spins;
1361 }
1362 else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
1363 synchronized (this) {
1364 if ((runState & RSIGNAL) != 0) {
1365 try {
1366 wait();
1367 } catch (InterruptedException ie) {
1368 try {
1369 Thread.currentThread().interrupt();
1370 } catch (SecurityException ignore) {
1371 }
1372 }
1373 }
1374 else
1375 notifyAll();
1376 }
1377 }
1378 }
1379 }
1380
1381 /**
1382 * Unlocks and sets runState to newRunState.
1383 *
1384 * @param oldRunState a value returned from lockRunState
1385 * @param newRunState the next value (must have lock bit clear).
1386 */
1387 private void unlockRunState(int oldRunState, int newRunState) {
1388 if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
1389 runState = newRunState; // clears RSIGNAL bit
1390 synchronized (this) { notifyAll(); }
1391 }
1392 }
1393
1394 // Creating, registering and deregistering workers
1395
1396 /**
1397 * Tries to construct and start one worker. Assumes that total
1398 * count has already been incremented as a reservation. Invokes
1399 * deregisterWorker on any failure.
1400 *
1401 * @return true if successful
1402 */
1403 private boolean createWorker() {
1404 ForkJoinWorkerThreadFactory fac = factory;
1405 Throwable ex = null;
1406 ForkJoinWorkerThread wt = null;
1407 try {
1408 if (fac != null && (wt = fac.newThread(this)) != null) {
1409 wt.start();
1410 return true;
1411 }
1412 } catch (Throwable rex) {
1413 ex = rex;
1414 }
1415 deregisterWorker(wt, ex);
1416 return false;
1417 }
1418
1419 /**
1420 * Tries to add one worker, incrementing ctl counts before doing
1421 * so, relying on createWorker to back out on failure.
1422 *
1423 * @param c incoming ctl value, with total count negative and no
1424 * idle workers. On CAS failure, c is refreshed and retried if
1425 * this holds (otherwise, a new worker is not needed).
1426 */
private void tryAddWorker(long c) {
    boolean add = false;
    do {
        // Next ctl value: AC and TC each incremented by one. Only the
        // upper two subfields are kept (the loop runs only while the
        // low 32 bits of c are zero).
        long nc = ((AC_MASK & (c + AC_UNIT)) |
                   (TC_MASK & (c + TC_UNIT)));
        if (ctl == c) {
            int rs, stop;                 // check if terminating
            // The CAS on ctl is attempted while holding the runState
            // lock, and only if STOP is not set.
            if ((stop = (rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);
            if (stop != 0)
                break;
            if (add) {
                createWorker();
                break;
            }
        }
        // Retry only while a worker is still needed (TC sign bit set)
        // and there are no waiting workers (low 32 bits zero).
    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
1446
1447 /**
1448 * Callback from ForkJoinWorkerThread constructor to establish and
1449 * record its WorkQueue.
1450 *
1451 * @param wt the worker thread
1452 * @return the worker's queue
1453 */
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    wt.setDaemon(true);                           // configure thread
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);
    WorkQueue w = new WorkQueue(this, wt);
    int i = 0;                                    // assign a pool index
    int mode = config & MODE_MASK;
    int rs = lockRunState();                      // registry updates happen under the lock
    try {
        WorkQueue[] ws; int n;                    // skip if no array
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
            int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
            int m = n - 1;
            i = ((s << 1) | 1) & m;               // odd-numbered indices
            if (ws[i] != null) {                  // collision
                int probes = 0;                   // step by approx half n
                // step is always even, so probing stays on odd indices
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                while (ws[i = (i + step) & m] != null) {
                    // After n unsuccessful probes, double the registry
                    // and keep probing in the larger array.
                    if (++probes >= n) {
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                        m = n - 1;
                        probes = 0;
                    }
                }
            }
            w.hint = s;                           // use as random seed
            w.config = i | mode;
            w.scanState = i;                      // publication fence
            ws[i] = w;
        }
    } finally {
        unlockRunState(rs, rs & ~RSLOCK);
    }
    // Thread name suffix is the exported index (i with the tag bit dropped).
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    return w;
}
1491
1492 /**
1493 * Final callback from terminating worker, as well as upon failure
1494 * to construct or start a worker. Removes record of worker from
1495 * array, and adjusts counts. If pool is shutting down, tries to
1496 * complete termination. Otherwise, if the worker had an allocated
 * task array, tries to wake an idle worker or (when an exception
 * caused the exit and ADD_WORKER is set) to create a replacement.
 * Ends by rethrowing {@code ex} if it is non-null.
1497 *
1498 * @param wt the worker thread, or null if construction failed
1499 * @param ex the exception causing failure, or null if none
1500 */
1501 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1502 WorkQueue w = null;
1503 if (wt != null && (w = wt.workQueue) != null) {
1504 WorkQueue[] ws; // remove index from array
1505 int idx = w.config & SMASK; // slot index held in the low bits of the queue config
1506 int rs = lockRunState();
1507 if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) // clear the slot only if it still holds w
1508 ws[idx] = null;
1509 unlockRunState(rs, rs & ~RSLOCK);
1510 }
1511 long c; // decrement counts
1512 do {} while (!U.compareAndSwapLong // spin until the CAS succeeds
1513 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | // active count minus one unit
1514 (TC_MASK & (c - TC_UNIT)) | // total count minus one unit
1515 (SP_MASK & c)))); // SP_MASK bits carried over unchanged
1516 if (w != null) {
1517 w.qlock = -1; // ensure set
1518 w.cancelAll(); // cancel remaining tasks
1519 U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
1520 }
1521 if (!tryTerminate(false, false) && w != null && w.array != null) { // not terminating, and w had an allocated array
1522 WorkQueue[] ws; int m, sp;
1523 while ((runState & STOP) == 0 &&
1524 (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1525 if ((sp = (int)(c = ctl)) != 0) { // wake up replacement
1526 if (tryRelease(c, ws[sp & m], AC_UNIT)) // on failure the loop re-reads ctl and retries
1527 break;
1528 }
1529 else if (ex != null && (c & ADD_WORKER) != 0L) {
1530 tryAddWorker(c); // create replacement
1531 break;
1532 }
1533 else // nothing to wake and no replacement warranted
1534 break;
1535 }
1536 }
1537 if (ex == null) // help clean on way out
1538 ForkJoinTask.helpExpungeStaleExceptions();
1539 else // rethrow
1540 ForkJoinTask.rethrow(ex);
1541 }
1542
1543 // Signalling
1544
1545 /**
1546 * Tries to create or activate a worker if too few are active.
 * Loops while ctl is negative: if the low 32 bits of ctl are zero
 * (no idle workers) it tries to add a worker when ADD_WORKER is
 * set and then stops; otherwise it tries to CAS ctl so as to
 * release the worker indexed by those bits, and unparks it.
1547 *
1548 * @param ws the worker array to use to find signallees
1549 * @param q a WorkQueue --if non-null, don't retry if now empty
1550 */
1551 final void signalWork(WorkQueue[] ws, WorkQueue q) {
1552 long c; int sp, i; WorkQueue v; Thread p;
1553 while ((c = ctl) < 0L) { // re-read ctl on each attempt
1554 if ((sp = (int)c) == 0) { // no idle workers
1555 if ((c & ADD_WORKER) != 0L) // too few workers
1556 tryAddWorker(c);
1557 break;
1558 }
1559 if (ws == null) // unstarted/terminated
1560 break;
1561 if (ws.length <= (i = sp & SMASK)) // terminated
1562 break;
1563 if ((v = ws[i]) == null) // terminating
1564 break;
1565 int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
1566 int d = sp - v.scanState; // screen CAS
1567 long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); // active count plus one; low bits replaced by v's recorded predecessor
1568 if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { // v.scanState matched sp and the CAS won
1569 v.scanState = vs; // activate v
1570 if ((p = v.parker) != null) // unpark only if a parker thread is recorded
1571 U.unpark(p);
1572 break;
1573 }
1574 if (q != null && q.base == q.top) // no more work
1575 break;
1576 }
1577 }
1578
1579 /**
1580 * Signals and releases worker v if it is top of idle worker
1581 * stack. This performs a one-shot version of signalWork only if
1582 * there is (apparently) at least one idle worker.
1583 *
1584 * @param c incoming ctl value
1585 * @param v if non-null, a worker
1586 * @param inc the increment to active count (zero when compensating)
1587 * @return true if successful
1588 */
1589 private boolean tryRelease(long c, WorkQueue v, long inc) {
1590 int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
1591 if (v != null && v.scanState == sp) { // v is at top of stack
1592 long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
1593 if (U.compareAndSwapLong(this, CTL, c, nc)) {
1594 v.scanState = vs;
1595 if ((p = v.parker) != null)
1596 U.unpark(p);
1597 return true;
1598 }
1599 }
1600 return false;
1601 }
1602
1603 // Scanning for tasks
1604
1605 /**
1606 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1607 */
1608 final void runWorker(WorkQueue w) {
1609 w.growArray(); // allocate queue
1610 int seed = w.hint; // initially holds randomization hint
1611 int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
1612 for (ForkJoinTask<?> t;;) {
1613 if ((t = scan(w, r)) != null)
1614 w.runTask(t);
1615 else if (!awaitWork(w, r))
1616 break;
1617 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1618 }
1619 }
1620
1621 /**
1622 * Scans for and tries to steal a top-level task. Scans start at a
1623 * random location, randomly moving on apparent contention,
1624 * otherwise continuing linearly until reaching two consecutive
1625 * empty passes over all queues with the same checksum (summing
1626 * each base index of each queue, that moves on each steal), at
1627 * which point the worker tries to inactivate and then re-scans,
1628 * attempting to re-activate (itself or some other worker) if
1629 * finding a task; otherwise returning null to await work. Scans
1630 * otherwise touch as little memory as possible, to reduce
1631 * disruption on other scanning threads.
1632 *
1633 * @param w the worker (via its WorkQueue)
1634 * @param r a random seed
1635 * @return a task, or null if none found
1636 */
1637 private ForkJoinTask<?> scan(WorkQueue w, int r) {
1638 WorkQueue[] ws; int m;
1639 if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { // otherwise falls through to return null
1640 int ss = w.scanState; // initially non-negative
1641 for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { // k: current slot; oldSum/checkSum: previous and current pass checksums
1642 WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1643 int b, n; long c;
1644 if ((q = ws[k]) != null) {
1645 if ((n = (b = q.base) - q.top) < 0 && // n is negative when base is behind top
1646 (a = q.array) != null) { // non-empty
1647 long i = (((a.length - 1) & b) << ASHIFT) + ABASE; // raw offset of the slot for index b
1648 if ((t = ((ForkJoinTask<?>)
1649 U.getObjectVolatile(a, i))) != null &&
1650 q.base == b) { // slot holds a task and base has not moved since it was read
1651 if (ss >= 0) { // this worker is active: try to take the task
1652 if (U.compareAndSwapObject(a, i, t, null)) {
1653 q.base = b + 1; // advance base past the claimed slot
1654 if (n < -1) // signal others
1655 signalWork(ws, q);
1656 return t;
1657 }
1658 }
1659 else if (oldSum == 0 && // try to activate
1660 w.scanState < 0)
1661 tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); // releases whichever worker the low bits of ctl index
1662 }
1663 if (ss < 0) // refresh
1664 ss = w.scanState;
1665 r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // perturb r to pick a new origin
1666 origin = k = r & m; // move and rescan
1667 oldSum = checkSum = 0;
1668 continue;
1669 }
1670 checkSum += b; // queue looked empty: fold its base into the checksum
1671 }
1672 if ((k = (k + 1) & m) == origin) { // continue until stable
1673 if ((ss >= 0 || (ss == (ss = w.scanState))) && // active, or scanState unchanged since last read
1674 oldSum == (oldSum = checkSum)) { // and checksum same as previous pass
1675 if (ss < 0) // already inactive
1676 break;
1677 int ns = ss | INACTIVE; // try to inactivate
1678 long nc = ((SP_MASK & ns) | // new ctl: ns in the low bits,
1679 (UC_MASK & ((c = ctl) - AC_UNIT))); // active count minus one in the upper bits
1680 w.stackPred = (int)c; // hold prev stack top
1681 U.putInt(w, QSCANSTATE, ns); // plain write of the inactive scanState before the CAS
1682 if (U.compareAndSwapLong(this, CTL, c, nc))
1683 ss = ns; // now inactive; continue scanning in that state
1684 else
1685 w.scanState = ss; // back out
1686 }
1687 checkSum = 0; // start a fresh pass
1688 }
1689 }
1690 }
1691 return null;
1692 }
1693
1694 /**
1695 * Possibly blocks worker w waiting for a task to steal, or
1696 * returns false if the worker should terminate. If inactivating
1697 * w has caused the pool to become quiescent, checks for pool
1698 * termination, and, so long as this is not the only worker, waits
1699 * for up to a given duration. On timeout, if ctl has not
1700 * changed, terminates the worker, which will in turn wake up
1701 * another worker to possibly repeat this process.
1702 *
1703 * @param w the calling worker
1704 * @param r a random seed (for spins)
1705 * @return false if the worker should terminate
1706 */
1707 private boolean awaitWork(WorkQueue w, int r) {
1708 if (w == null || w.qlock < 0) // w is terminating
1709 return false;
1710 for (int pred = w.stackPred, spins = SPINS, ss;;) { // pred: value recorded in w.stackPred
1711 if ((ss = w.scanState) >= 0) // w has been re-activated: leave the loop and return true
1712 break;
1713 else if (spins > 0) {
1714 r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // advance the random value
1715 if (r >= 0 && --spins == 0) { // randomize spins
1716 WorkQueue v; WorkQueue[] ws; int s, j;
1717 if (pred != 0 && (ws = workQueues) != null &&
1718 (j = pred & SMASK) < ws.length &&
1719 (v = ws[j]) != null && // see if pred parking
1720 (v.parker == null || v.scanState >= 0))
1721 spins = SPINS; // continue spinning
1722 else if ((s = w.nsteals) != 0) {
1723 w.nsteals = 0; // collect steals
1724 U.getAndAddLong(this, STEALCOUNT, s);
1725 }
1726 }
1727 }
1728 else if (w.qlock < 0) // recheck after spins
1729 return false;
1730 else if (!Thread.interrupted()) { // clears interrupt status; if it was set, loop again without parking
1731 long c, prevctl, parkTime, deadline;
1732 if ((runState & STOP) != 0) // pool terminating
1733 return false;
1734 int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK); // active-count field of ctl plus the low config bits
1735 if (ac <= 0 && tryTerminate(false, false))
1736 return false;
1737 if (ac <= 0 && ss == (int)c) { // is last waiter
1738 prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); // ctl value with active count plus one and pred in the low bits
1739 int t = (short)(c >>> TC_SHIFT); // shrink excess spares
1740 if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
1741 return false;
1742 parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); // timeout scaled by (1 - t) when t is negative
1743 deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1744 }
1745 else
1746 prevctl = parkTime = deadline = 0L; // zero parkTime: park without a timeout
1747 Thread wt = Thread.currentThread();
1748 U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
1749 w.parker = wt; // record the thread so that releasers can unpark it
1750 if (w.scanState < 0 && ctl == c) // recheck before park
1751 U.park(false, parkTime);
1752 U.putOrderedObject(w, QPARKER, null); // clear the parker and blocker records after waking
1753 U.putObject(wt, PARKBLOCKER, null);
1754 if (w.scanState >= 0) // released: leave the loop and return true
1755 break;
1756 if (parkTime != 0L && ctl == c && // timed wait, ctl unchanged,
1757 deadline - System.nanoTime() <= 0L && // deadline passed,
1758 U.compareAndSwapLong(this, CTL, c, prevctl)) // and ctl successfully set to prevctl
1759 return false; // shrink pool
1760 }
1761 }
1762 return true;
1763 }
1764
1765 // Joining tasks
1766
1767 /**
1768 * Tries to steal and run tasks within the target's computation.
1769 * Uses a variant of the top-level algorithm, restricted to tasks
1770 * with the given task as ancestor: It prefers taking and running
1771 * eligible tasks popped from the worker's own queue (via
1772 * popCC). Otherwise it scans others, randomly moving on
1773 * contention or execution, deciding to give up based on a
1774 * checksum (via return codes from pollAndExecCC). The maxTasks
1775 * argument supports external usages; internal calls use zero,
1776 * allowing unbounded steps (external calls trap non-positive
1777 * values).
1778 *
1779 * @param w caller
 * @param task the completer whose status is polled and whose
 * computation is helped
1780 * @param maxTasks if non-zero, the maximum number of other tasks to run
1781 * @return task status on exit
1782 */
1783 final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1784 int maxTasks) {
1785 WorkQueue[] ws; int s = 0, m; // s stays 0 if the preconditions below fail
1786 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
1787 task != null && w != null) {
1788 int mode = w.config; // for popCC
1789 int r = w.hint ^ w.top; // arbitrary seed for origin
1790 int origin = r & m; // first queue to scan
1791 int h = 1; // 1:ran, >1:contended, <0:hash
1792 for (int k = origin, oldSum = 0, checkSum = 0;;) {
1793 CountedCompleter<?> p; WorkQueue q;
1794 if ((s = task.status) < 0) // negative status: stop helping
1795 break;
1796 if (h == 1 && (p = w.popCC(task, mode)) != null) { // try the local queue only when the last step ran a task
1797 p.doExec(); // run local task
1798 if (maxTasks != 0 && --maxTasks == 0) // task budget used up
1799 break;
1800 origin = k; // reset
1801 oldSum = checkSum = 0;
1802 }
1803 else { // poll other queues
1804 if ((q = ws[k]) == null)
1805 h = 0; // empty slot
1806 else if ((h = q.pollAndExecCC(task)) < 0)
1807 checkSum += h; // negative result is folded into the checksum
1808 if (h > 0) { // ran a task or hit contention
1809 if (h == 1 && maxTasks != 0 && --maxTasks == 0)
1810 break;
1811 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1812 origin = k = r & m; // move and restart
1813 oldSum = checkSum = 0;
1814 }
1815 else if ((k = (k + 1) & m) == origin) { // completed a full pass
1816 if (oldSum == (oldSum = checkSum)) // same checksum as the previous pass: give up
1817 break;
1818 checkSum = 0;
1819 }
1820 }
1821 }
1822 }
1823 return s;
1824 }
1825
1826 /**
1827 * Tries to locate and execute tasks for a stealer of the given
1828 * task, or in turn one of its stealers. Traces currentSteal ->
1829 * currentJoin links looking for a thread working on a descendant
1830 * of the given task and with a non-empty queue to steal back and
1831 * execute tasks from. The first call to this method upon a
1832 * waiting join will often entail scanning/search, (which is OK
1833 * because the joiner has nothing better to do), but this method
1834 * leaves hints in workers to speed up subsequent calls.
1835 *
1836 * @param w caller
1837 * @param task the task to join
1838 */
1839 private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
1840 WorkQueue[] ws = workQueues;
1841 int oldSum = 0, checkSum, m;
1842 if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
1843 task != null) {
1844 do { // restart point
1845 checkSum = 0; // for stability check
1846 ForkJoinTask<?> subtask;
1847 WorkQueue j = w, v; // v is subtask stealer
1848 descent: for (subtask = task; subtask.status >= 0; ) { // follow the chain while the current subtask's status is non-negative
1849 for (int h = j.hint | 1, k = 0, i; ; k += 2) { // probe odd slots, starting from j's hint
1850 if (k > m) // can't find stealer
1851 break descent;
1852 if ((v = ws[i = (h + k) & m]) != null) {
1853 if (v.currentSteal == subtask) { // found the queue whose current steal is subtask
1854 j.hint = i; // remember the slot for later calls
1855 break;
1856 }
1857 checkSum += v.base;
1858 }
1859 }
1860 for (;;) { // help v or descend
1861 ForkJoinTask<?>[] a; int b;
1862 checkSum += (b = v.base);
1863 ForkJoinTask<?> next = v.currentJoin; // read before the staleness checks below
1864 if (subtask.status < 0 || j.currentJoin != subtask ||
1865 v.currentSteal != subtask) // stale
1866 break descent;
1867 if (b - v.top >= 0 || (a = v.array) == null) { // v has nothing to steal
1868 if ((subtask = next) == null) // v is not joining anything: stop
1869 break descent;
1870 j = v; // descend: look for the stealer of v's current join
1871 break;
1872 }
1873 int i = (((a.length - 1) & b) << ASHIFT) + ABASE; // raw offset of the slot for index b
1874 ForkJoinTask<?> t = ((ForkJoinTask<?>)
1875 U.getObjectVolatile(a, i));
1876 if (v.base == b) { // base unchanged since it was read
1877 if (t == null) // stale
1878 break descent;
1879 if (U.compareAndSwapObject(a, i, t, null)) { // claimed the task at v's base
1880 v.base = b + 1;
1881 ForkJoinTask<?> ps = w.currentSteal; // saved so it can be restored afterwards
1882 int top = w.top;
1883 do {
1884 U.putOrderedObject(w, QCURRENTSTEAL, t);
1885 t.doExec(); // clear local tasks too
1886 } while (task.status >= 0 &&
1887 w.top != top &&
1888 (t = w.pop()) != null);
1889 U.putOrderedObject(w, QCURRENTSTEAL, ps);
1890 if (w.base != w.top)
1891 return; // can't further help
1892 }
1893 }
1894 }
1895 }
1896 } while (task.status >= 0 && oldSum != (oldSum = checkSum)); // repeat until task status is negative or the checksum repeats
1897 }
1898 }
1899
1900 /**
1901 * Tries to decrement active count (sometimes implicitly) and
1902 * possibly release or create a compensating worker in preparation
1903 * for blocking. Returns false (retryable by caller), on
1904 * contention, detected staleness, instability or termination.
1905 *
1906 * @param w caller
 * @return true if the caller may block
 * @throws RejectedExecutionException if a replacement worker would
 * be needed but the thread limit has been reached
1907 */
1908 private boolean tryCompensate(WorkQueue w) {
1909 boolean canBlock;
1910 WorkQueue[] ws; long c; int m, pc, sp;
1911 if (w == null || w.qlock < 0 || // caller terminating
1912 (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
1913 (pc = config & SMASK) == 0) // parallelism disabled
1914 canBlock = false;
1915 else if ((sp = (int)(c = ctl)) != 0) // release idle worker
1916 canBlock = tryRelease(c, ws[sp & m], 0L); // zero increment: active count is left as is
1917 else {
1918 int ac = (int)(c >> AC_SHIFT) + pc; // active-count field of ctl plus pc
1919 int tc = (short)(c >> TC_SHIFT) + pc; // total-count field of ctl plus pc
1920 int nbusy = 0; // validate saturation
1921 for (int i = 0; i <= m; ++i) { // two passes of odd indices
1922 WorkQueue v;
1923 if ((v = ws[((i << 1) | 1) & m]) != null) {
1924 if ((v.scanState & SCANNING) != 0) // a worker is still scanning: stop counting
1925 break;
1926 ++nbusy;
1927 }
1928 }
1929 if (nbusy != (tc << 1) || ctl != c) // each worker is visited twice, hence tc << 1
1930 canBlock = false; // unstable or stale
1931 else if (tc >= pc && ac > 1 && w.isEmpty()) {
1932 long nc = ((AC_MASK & (c - AC_UNIT)) |
1933 (~AC_MASK & c)); // uncompensated
1934 canBlock = U.compareAndSwapLong(this, CTL, c, nc); // only the active count is decremented
1935 }
1936 else if (tc >= MAX_CAP || tc >= pc + MAX_SPARES)
1937 throw new RejectedExecutionException(
1938 "Thread limit exceeded replacing blocked worker");
1939 else { // similar to tryAddWorker
1940 boolean add = false; int rs; // CAS within lock
1941 long nc = ((AC_MASK & c) | // active count unchanged,
1942 (TC_MASK & (c + TC_UNIT))); // total count plus one unit
1943 if (((rs = lockRunState()) & STOP) == 0)
1944 add = U.compareAndSwapLong(this, CTL, c, nc);
1945 unlockRunState(rs, rs & ~RSLOCK);
1946 canBlock = add && createWorker(); // throws on exception
1947 }
1948 }
1949 return canBlock;
1950 }
1951
1952 /**
1953 * Helps and/or blocks until the given task is done or timeout.
 * While waiting, the task is recorded as the caller's currentJoin;
 * the previous value is restored before returning.
1954 *
1955 * @param w caller
1956 * @param task the task
1957 * @param deadline if nonzero, deadline for timed waits
1958 * @return task status on exit
1959 */
1960 final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1961 int s = 0; // returned as is when task or w is null
1962 if (task != null && w != null) {
1963 ForkJoinTask<?> prevJoin = w.currentJoin; // saved for restoration on exit
1964 U.putOrderedObject(w, QCURRENTJOIN, task);
1965 CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
1966 (CountedCompleter<?>)task : null;
1967 for (;;) {
1968 if ((s = task.status) < 0) // negative status: stop waiting
1969 break;
1970 if (cc != null)
1971 helpComplete(w, cc, 0); // zero maxTasks: no limit on helped tasks
1972 else if (w.base == w.top || w.tryRemoveAndExec(task)) // local queue empty, or tryRemoveAndExec returned true
1973 helpStealer(w, task);
1974 if ((s = task.status) < 0)
1975 break;
1976 long ms, ns;
1977 if (deadline == 0L)
1978 ms = 0L; // untimed: zero is passed to internalWait
1979 else if ((ns = deadline - System.nanoTime()) <= 0L) // deadline passed: return current status
1980 break;
1981 else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1982 ms = 1L; // remaining time rounds to 0 ms: use 1 ms instead
1983 if (tryCompensate(w)) { // on false, the loop simply retries
1984 task.internalWait(ms);
1985 U.getAndAddLong(this, CTL, AC_UNIT); // add one active-count unit back after waiting
1986 }
1987 }
1988 U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
1989 }
1990 return s;
1991 }
1992
1993 // Specialized scanning
1994
1995 /**
1996 * Returns a (probably) non-empty steal queue, if one is found
1997 * during a scan, else null. This method must be retried by
1998 * caller if, by the time it tries to use the queue, it is empty.
 * Scanning repeats until two consecutive passes produce the same
 * sum of queue base indices.
 *
 * @return a queue whose base was behind its top when examined, or
 * null if none was found
1999 */
2000 private WorkQueue findNonEmptyStealQueue() {
2001 int r = ThreadLocalRandom.nextSecondarySeed(), oldSum = 0, checkSum; // r: random starting offset for each pass
2002 do {
2003 checkSum = 0;
2004 WorkQueue[] ws; WorkQueue q; int m, k, b;
2005 if ((ws = workQueues) != null && (m = ws.length - 1) > 0) {
2006 for (int i = 0; i <= m; ++i) { // one pass over every slot
2007 if ((k = (i + r + m) & m) <= m && k >= 0 && // both range tests always hold after masking with m; NOTE(review): presumably kept to help bounds-check elimination -- confirm
2008 (q = ws[k]) != null) {
2009 if ((b = q.base) - q.top < 0) // base behind top: apparently non-empty
2010 return q;
2011 checkSum += b;
2012 }
2013 }
2014 }
2015 } while (oldSum != (oldSum = checkSum)); // stop once the checksum matches the previous pass
2016 return null;
2017 }
2018
2019 /**
2020 * Runs tasks until {@code isQuiescent()}. We piggyback on
2021 * active count ctl maintenance, but rather than blocking
2022 * when tasks cannot be found, we rescan until all others cannot
2023 * find tasks either.
 *
 * @param w the caller's queue, used for local tasks and steal
 * bookkeeping
2024 */
2025 final void helpQuiescePool(WorkQueue w) {
2026 for (boolean active = true;;) { // active: whether this caller is currently included in the active count
2027 long c; WorkQueue q; ForkJoinTask<?> t; int b;
2028 while ((t = w.nextLocalTask()) != null) // drain local tasks first
2029 t.doExec();
2030 if ((q = findNonEmptyStealQueue()) != null) {
2031 if (!active) { // re-establish active count
2032 active = true;
2033 U.getAndAddLong(this, CTL, AC_UNIT);
2034 }
2035 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { // queue still non-empty and the poll succeeded
2036 ForkJoinTask<?> ps = w.currentSteal; // saved and restored around the stolen task
2037 U.putOrderedObject(w, QCURRENTSTEAL, t);
2038 t.doExec();
2039 U.putOrderedObject(w, QCURRENTSTEAL, ps);
2040 ++w.nsteals;
2041 }
2042 }
2043 else if (active) { // decrement active count without queuing
2044 long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c);
2045 if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0)
2046 break; // bypass decrement-then-increment
2047 if (U.compareAndSwapLong(this, CTL, c, nc))
2048 active = false;
2049 }
2050 else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 && // no active workers remain
2051 U.compareAndSwapLong(this, CTL, c, c + AC_UNIT)) // add back the unit removed earlier, then exit
2052 break;
2053 }
2054 }
2055
2056 /**
2057 * Gets and removes a local or stolen task for the given worker.
2058 *
2059 * @return a task, if available
2060 */
2061 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2062 for (ForkJoinTask<?> t;;) {
2063 WorkQueue q; int b;
2064 if ((t = w.nextLocalTask()) != null)
2065 return t;
2066 if ((q = findNonEmptyStealQueue()) == null)
2067 return null;
2068 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2069 return t;
2070 }
2071 }
2072
2073 /**
2074 * Returns a cheap heuristic guide for task partitioning when
2075 * programmers, frameworks, tools, or languages have little or no
2076 * idea about task granularity. In essence by offering this
2077 * method, we ask users only about tradeoffs in overhead vs
2078 * expected throughput and its variance, rather than how finely to
2079 * partition tasks.
2080 *
2081 * In a steady state strict (tree-structured) computation, each
2082 * thread makes available for stealing enough tasks for other
2083 * threads to remain active. Inductively, if all threads play by
2084 * the same rules, each thread should make available only a
2085 * constant number of tasks.
2086 *
2087 * The minimum useful constant is just 1. But using a value of 1
2088 * would require immediate replenishment upon each steal to
2089 * maintain enough tasks, which is infeasible. Further,
2090 * partitionings/granularities of offered tasks should minimize
2091 * steal rates, which in general means that threads nearer the top
2092 * of computation tree should generate more than those nearer the
2093 * bottom. In perfect steady state, each thread is at
2094 * approximately the same level of computation tree. However,
2095 * producing extra tasks amortizes the uncertainty of progress and
2096 * diffusion assumptions.
2097 *
2098 * So, users will want to use values larger (but not much larger)
2099 * than 1 to both smooth over transient shortages and hedge
2100 * against uneven progress; as traded off against the cost of
2101 * extra task overhead. We leave the user to pick a threshold
2102 * value to compare with the results of this call to guide
2103 * decisions, but recommend values such as 3.
2104 *
2105 * When all threads are active, it is on average OK to estimate
2106 * surplus strictly locally. In steady-state, if one thread is
2107 * maintaining say 2 surplus tasks, then so are others. So we can
2108 * just use estimated queue length. However, this strategy alone
2109 * leads to serious mis-estimates in some non-steady-state
2110 * conditions (ramp-up, ramp-down, other stalls). We can detect
2111 * many of these by further considering the number of "idle"
2112 * threads, that are known to have zero queued tasks, so
2113 * compensate by a factor of (#idle/#active) threads.
2114 */
2115 static int getSurplusQueuedTaskCount() {
2116 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2117 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
2118 int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).
2119 config & SMASK;
2120 int n = (q = wt.workQueue).top - q.base;
2121 int a = (int)(pool.ctl >> AC_SHIFT) + p;
2122 return n - (a > (p >>>= 1) ? 0 :
2123 a > (p >>>= 1) ? 1 :
2124 a > (p >>>= 1) ? 2 :
2125 a > (p >>>= 1) ? 4 :
2126 8);
2127 }
2128 return 0;
2129 }
2130
2131 // Termination
2132
2133 /**
2134 * Possibly initiates and/or completes termination. When
2135 * terminating (STOP phase), runs three passes through workQueues:
2136 * (0) Setting termination status (which also stops external
2137 * submitters by locking queues), (1) cancelling all tasks; (2)
2138 * interrupting lagging threads (likely in external tasks, but
2139 * possibly also blocked in joins). Each pass repeats previous
2140 * steps because of potential lagging thread creation.
 * Always returns false for the common pool.
2141 *
2142 * @param now if true, unconditionally terminate, else only
2143 * if no work and no active workers
2144 * @param enable if true, enable shutdown when next possible
2145 * @return true if now terminating or terminated
2146 */
2147 private boolean tryTerminate(boolean now, boolean enable) {
2148 int rs;
2149 if (this == common) // cannot shut down
2150 return false;
2151 if ((rs = runState) >= 0) { // else already shutdown
2152 if (!enable)
2153 return false;
2154 rs = lockRunState(); // enable
2155 unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN); // set SHUTDOWN while releasing the lock
2156 }
2157 if ((rs & STOP) == 0) {
2158 if (!now) {
2159 if ((int)(ctl >> AC_SHIFT) + (config & SMASK) > 0) // some workers are still active
2160 return false;
2161 WorkQueue[] ws; WorkQueue w; // check external submissions
2162 if ((ws = workQueues) != null) {
2163 for (int i = 0; i < ws.length; ++i) {
2164 if ((w = ws[i]) != null &&
2165 (!w.isEmpty() || // queue still holds tasks,
2166 ((i & 1) != 0 && w.scanState >= 0))) { // or an odd-indexed queue has a non-negative scanState
2167 signalWork(ws, w);
2168 return false;
2169 }
2170 }
2171 }
2172 if ((int)(ctl >> AC_SHIFT) + (config & SMASK) > 0)
2173 return false; // recheck
2174 }
2175 rs = lockRunState(); // enter STOP phase
2176 unlockRunState(rs, (rs & ~RSLOCK) | STOP);
2177 }
2178 for (int pass = 0; pass < 3; ++pass) { // clobber other workers
2179 WorkQueue[] ws; int n;
2180 if ((ws = workQueues) != null && (n = ws.length) > 0) { // array is re-read on every pass
2181 WorkQueue w; Thread wt;
2182 for (int i = 0; i < n; ++i) {
2183 if ((w = ws[i]) != null) {
2184 w.qlock = -1; // done on every pass, including pass 0
2185 if (pass > 0) {
2186 w.cancelAll(); // clear queue
2187 if (pass > 1 && (wt = w.owner) != null) { // final pass: interrupt and unpark owner threads
2188 if (!wt.isInterrupted()) {
2189 try {
2190 wt.interrupt();
2191 } catch (Throwable ignore) { // anything thrown by interrupt() is deliberately ignored
2192 }
2193 }
2194 U.unpark(wt); // wake up
2195 }
2196 }
2197 }
2198 }
2199 }
2200 }
2201 if ((short)(ctl >>> TC_SHIFT) + (config & SMASK) <= 0) { // total-count field plus low config bits is non-positive
2202 rs = lockRunState(); // done -- no more workers
2203 unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
2204 synchronized (this) { // release awaitTermination
2205 notifyAll();
2206 }
2207 }
2208 return true;
2209 }
2210
2211 // External operations
2212
2213 /**
2214 * Full version of externalPush, handling uncommon cases, as well
2215 * as performing secondary initialization upon the first
2216 * submission of the first task to the pool. It also detects
2217 * first submission by an external thread and creates a new shared
2218 * queue if the one at index is empty or contended.
2219 *
2220 * @param task the task. Caller must ensure non-null.
 * @throws RejectedExecutionException if runState is negative
2221 */
2222 private void externalSubmit(ForkJoinTask<?> task) {
2223 int r; // initialize caller's probe
2224 if ((r = ThreadLocalRandom.getProbe()) == 0) {
2225 ThreadLocalRandom.localInit();
2226 r = ThreadLocalRandom.getProbe();
2227 }
2228 for (;;) { // each iteration takes exactly one of the branches below
2229 WorkQueue[] ws; WorkQueue q; int rs, m, k;
2230 boolean move = false; // set when the probe should be advanced to try another slot
2231 if ((rs = runState) < 0)
2232 throw new RejectedExecutionException();
2233 else if ((rs & STARTED) == 0 || // initialize workQueues array
2234 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
2235 int ns = 0; // bits to OR into runState on unlock
2236 rs = lockRunState();
2237 try {
2238 if ((rs & STARTED) == 0) { // find power of two table size
2239 int p = config & SMASK; // ensure at least 2 slots
2240 int n = (p > 1) ? p - 1 : 1;
2241 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; // smear the highest set bit downwards
2242 n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; // round up to a power of two, then double
2243 workQueues = new WorkQueue[n];
2244 ns = STARTED;
2245 }
2246 } finally {
2247 unlockRunState(rs, (rs & ~RSLOCK) | ns);
2248 }
2249 }
2250 else if ((q = ws[k = r & m & SQMASK]) != null) { // a queue already exists at the probed slot
2251 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { // acquired the queue lock
2252 ForkJoinTask<?>[] a = q.array;
2253 int s = q.top;
2254 boolean submitted = false; // initial submission or resizing
2255 try { // locked version of push
2256 if ((a != null && a.length > s + 1 - q.base) || // room in the existing array,
2257 (a = q.growArray()) != null) { // or a grown/newly created array
2258 int j = (((a.length - 1) & s) << ASHIFT) + ABASE; // raw offset of the slot for index s
2259 U.putOrderedObject(a, j, task);
2260 U.putOrderedInt(q, QTOP, s + 1);
2261 submitted = true;
2262 }
2263 } finally {
2264 q.qlock = 0; // always release the queue lock
2265 }
2266 if (submitted) {
2267 signalWork(ws, q);
2268 return;
2269 }
2270 }
2271 move = true; // move on failure
2272 }
2273 else if (((rs = runState) & RSLOCK) == 0) { // create new queue
2274 q = new WorkQueue(this, null); // null owner thread
2275 q.hint = r;
2276 q.config = k | SHARED_QUEUE;
2277 rs = lockRunState(); // publish index
2278 if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
2279 ws[k] = q; // else terminated
2280 unlockRunState(rs, rs & ~RSLOCK);
2281 }
2282 else
2283 move = true; // move if busy
2284 if (move)
2285 r = ThreadLocalRandom.advanceProbe(r);
2286 }
2287 }
2288
2289 /**
2290 * Tries to add the given task to a submission queue at
2291 * submitter's current queue. Only the (vastly) most common path
2292 * is directly handled in this method, while screening for need
2293 * for externalSubmit. Every other case falls through to
 * externalSubmit.
2294 *
2295 * @param task the task. Caller must ensure non-null.
2296 */
2297 final void externalPush(ForkJoinTask<?> task) {
2298 WorkQueue[] ws; WorkQueue q; int m;
2299 int r = ThreadLocalRandom.getProbe(); // zero probe is rejected by the r != 0 test below
2300 int rs = runState;
2301 if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
2302 (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
2303 U.compareAndSwapInt(q, QLOCK, 0, 1)) { // fast path requires winning the queue lock
2304 ForkJoinTask<?>[] a; int am, n, s;
2305 if ((a = q.array) != null &&
2306 (am = a.length - 1) > (n = (s = q.top) - q.base)) { // n: current size; there must be room
2307 int j = ((am & s) << ASHIFT) + ABASE; // raw offset of the slot for index s
2308 U.putOrderedObject(a, j, task);
2309 U.putOrderedInt(q, QTOP, s + 1);
2310 U.putOrderedInt(q, QLOCK, 0); // release the queue lock
2311 if (n <= 1) // signal only when the queue held at most one task before this push
2312 signalWork(ws, q);
2313 return;
2314 }
2315 q.qlock = 0; // no array or no room: unlock and take the slow path
2316 }
2317 externalSubmit(task);
2318 }
2319
2320 /**
2321 * Returns common pool queue for an external thread
2322 */
2323 static WorkQueue commonSubmitterQueue() {
2324 ForkJoinPool p = common;
2325 int r = ThreadLocalRandom.getProbe();
2326 WorkQueue[] ws; int m;
2327 return (p != null && (ws = p.workQueues) != null &&
2328 (m = ws.length - 1) >= 0) ?
2329 ws[m & r & SQMASK] : null;
2330 }
2331
2332 /**
2333 * Performs tryUnpush for an external submitter: Finds queue,
2334 * locks if apparently non-empty, validates upon locking, and
2335 * adjusts top. Each check can fail but rarely does.
 *
 * @param task the task expected to be at the top of the queue
 * @return true if the task was removed
2336 */
2337 final boolean tryExternalUnpush(ForkJoinTask<?> task) {
2338 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?>[] a; int m, s;
2339 int r = ThreadLocalRandom.getProbe();
2340 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
2341 (w = ws[m & r & SQMASK]) != null &&
2342 (a = w.array) != null && (s = w.top) != w.base) { // queue exists and looks non-empty
2343 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; // raw offset of the slot just below top
2344 if (U.compareAndSwapInt(w, QLOCK, 0, 1)) {
2345 if (w.top == s && w.array == a && // top and array unchanged since they were read
2346 U.getObject(a, j) == task &&
2347 U.compareAndSwapObject(a, j, task, null)) { // slot held the task and was cleared
2348 U.putOrderedInt(w, QTOP, s - 1);
2349 U.putOrderedInt(w, QLOCK, 0); // release the queue lock
2350 return true;
2351 }
2352 w.qlock = 0; // validation failed: just unlock
2353 }
2354 }
2355 return false;
2356 }
2357
2358 /**
2359 * Performs helpComplete for an external submitter
2360 */
2361 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
2362 WorkQueue[] ws; int n;
2363 int r = ThreadLocalRandom.getProbe();
2364 return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 :
2365 helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks);
2366 }
2367
2368 // Exported methods
2369
2370 // Constructors
2371
2372 /**
2373 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2374 * java.lang.Runtime#availableProcessors}, using the {@linkplain
2375 * #defaultForkJoinWorkerThreadFactory default thread factory},
2376 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
 * The processor count is capped at the implementation limit
 * before being used as the parallelism level.
2377 *
2378 * @throws SecurityException if a security manager exists and
2379 * the caller is not permitted to modify threads
2380 * because it does not hold {@link
2381 * java.lang.RuntimePermission}{@code ("modifyThread")}
2382 */
2383 public ForkJoinPool() {
2384 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), // never exceed MAX_CAP
2385 defaultForkJoinWorkerThreadFactory, null, false); // null handler, asyncMode false
2386 }
2387
2388 /**
2389 * Creates a {@code ForkJoinPool} with the indicated parallelism
2390 * level, the {@linkplain
2391 * #defaultForkJoinWorkerThreadFactory default thread factory},
2392 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2393 *
2394 * @param parallelism the parallelism level
2395 * @throws IllegalArgumentException if parallelism less than or
2396 * equal to zero, or greater than implementation limit
2397 * @throws SecurityException if a security manager exists and
2398 * the caller is not permitted to modify threads
2399 * because it does not hold {@link
2400 * java.lang.RuntimePermission}{@code ("modifyThread")}
2401 */
2402 public ForkJoinPool(int parallelism) {
2403 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); // delegates with null handler and asyncMode false
2404 }
2405
/**
 * Creates a {@code ForkJoinPool} configured by the given parameters.
 * The parallelism and factory arguments are validated first; the
 * permission check runs after the delegated construction.
 *
 * @param parallelism the parallelism level. For default value,
 *        use {@link java.lang.Runtime#availableProcessors}.
 * @param factory the factory for creating new threads. For default
 *        value, use {@link #defaultForkJoinWorkerThreadFactory}.
 * @param handler the handler for internal worker threads that
 *        terminate due to unrecoverable errors encountered while
 *        executing tasks. For default value, use {@code null}.
 * @param asyncMode if true, establishes local first-in-first-out
 *        scheduling mode for forked tasks that are never joined.
 *        This mode may be more appropriate than default locally
 *        stack-based mode in applications in which worker threads
 *        only process event-style asynchronous tasks.
 *        For default value, use {@code false}.
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws NullPointerException if the factory is null
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism), checkFactory(factory), handler,
         (asyncMode ? FIFO_QUEUE : LIFO_QUEUE),
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    // The delegating call must come first, so the permission check
    // can only follow it.
    checkPermission();
}
2441
2442 private static int checkParallelism(int parallelism) {
2443 if (parallelism <= 0 || parallelism > MAX_CAP)
2444 throw new IllegalArgumentException();
2445 return parallelism;
2446 }
2447
2448 private static ForkJoinWorkerThreadFactory checkFactory
2449 (ForkJoinWorkerThreadFactory factory) {
2450 if (factory == null)
2451 throw new NullPointerException();
2452 return factory;
2453 }
2454
2455 /**
2456 * Creates a {@code ForkJoinPool} with the given parameters, without
2457 * any security checks or parameter validation. Invoked directly by
2458 * makeCommonPool.
2459 */
2460 private ForkJoinPool(int parallelism,
2461 ForkJoinWorkerThreadFactory factory,
2462 UncaughtExceptionHandler handler,
2463 int mode,
2464 String workerNamePrefix) {
2465 this.workerNamePrefix = workerNamePrefix;
2466 this.factory = factory;
2467 this.ueh = handler;
2468 this.config = (parallelism & SMASK) | mode;
2469 long np = (long)(-parallelism); // offset ctl counts
2470 this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2471 }
2472
2473 /**
2474 * Returns the common pool instance. This pool is statically
2475 * constructed; its run state is unaffected by attempts to {@link
2476 * #shutdown} or {@link #shutdownNow}. However this pool and any
2477 * ongoing processing are automatically terminated upon program
2478 * {@link System#exit}. Any program that relies on asynchronous
2479 * task processing to complete before program termination should
2480 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2481 * before exit.
2482 *
2483 * @return the common pool instance
2484 * @since 1.8
2485 */
2486 public static ForkJoinPool commonPool() {
2487 // assert common != null : "static init error";
2488 return common;
2489 }
2490
2491 // Execution methods
2492
2493 /**
2494 * Performs the given task, returning its result upon completion.
2495 * If the computation encounters an unchecked Exception or Error,
2496 * it is rethrown as the outcome of this invocation. Rethrown
2497 * exceptions behave in the same way as regular exceptions, but,
2498 * when possible, contain stack traces (as displayed for example
2499 * using {@code ex.printStackTrace()}) of both the current thread
2500 * as well as the thread actually encountering the exception;
2501 * minimally only the latter.
2502 *
2503 * @param task the task
2504 * @param <T> the type of the task's result
2505 * @return the task's result
2506 * @throws NullPointerException if the task is null
2507 * @throws RejectedExecutionException if the task cannot be
2508 * scheduled for execution
2509 */
2510 public <T> T invoke(ForkJoinTask<T> task) {
2511 if (task == null)
2512 throw new NullPointerException();
2513 externalPush(task);
2514 return task.join();
2515 }
2516
2517 /**
2518 * Arranges for (asynchronous) execution of the given task.
2519 *
2520 * @param task the task
2521 * @throws NullPointerException if the task is null
2522 * @throws RejectedExecutionException if the task cannot be
2523 * scheduled for execution
2524 */
2525 public void execute(ForkJoinTask<?> task) {
2526 if (task == null)
2527 throw new NullPointerException();
2528 externalPush(task);
2529 }
2530
2531 // AbstractExecutorService methods
2532
2533 /**
2534 * @throws NullPointerException if the task is null
2535 * @throws RejectedExecutionException if the task cannot be
2536 * scheduled for execution
2537 */
2538 public void execute(Runnable task) {
2539 if (task == null)
2540 throw new NullPointerException();
2541 ForkJoinTask<?> job;
2542 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2543 job = (ForkJoinTask<?>) task;
2544 else
2545 job = new ForkJoinTask.RunnableExecuteAction(task);
2546 externalPush(job);
2547 }
2548
2549 /**
2550 * Submits a ForkJoinTask for execution.
2551 *
2552 * @param task the task to submit
2553 * @param <T> the type of the task's result
2554 * @return the task
2555 * @throws NullPointerException if the task is null
2556 * @throws RejectedExecutionException if the task cannot be
2557 * scheduled for execution
2558 */
2559 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2560 if (task == null)
2561 throw new NullPointerException();
2562 externalPush(task);
2563 return task;
2564 }
2565
2566 /**
2567 * @throws NullPointerException if the task is null
2568 * @throws RejectedExecutionException if the task cannot be
2569 * scheduled for execution
2570 */
2571 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2572 ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
2573 externalPush(job);
2574 return job;
2575 }
2576
2577 /**
2578 * @throws NullPointerException if the task is null
2579 * @throws RejectedExecutionException if the task cannot be
2580 * scheduled for execution
2581 */
2582 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2583 ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
2584 externalPush(job);
2585 return job;
2586 }
2587
2588 /**
2589 * @throws NullPointerException if the task is null
2590 * @throws RejectedExecutionException if the task cannot be
2591 * scheduled for execution
2592 */
2593 public ForkJoinTask<?> submit(Runnable task) {
2594 if (task == null)
2595 throw new NullPointerException();
2596 ForkJoinTask<?> job;
2597 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2598 job = (ForkJoinTask<?>) task;
2599 else
2600 job = new ForkJoinTask.AdaptedRunnableAction(task);
2601 externalPush(job);
2602 return job;
2603 }
2604
2605 /**
2606 * @throws NullPointerException {@inheritDoc}
2607 * @throws RejectedExecutionException {@inheritDoc}
2608 */
2609 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2610 // In previous versions of this class, this method constructed
2611 // a task to run ForkJoinTask.invokeAll, but now external
2612 // invocation of multiple tasks is at least as efficient.
2613 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2614
2615 boolean done = false;
2616 try {
2617 for (Callable<T> t : tasks) {
2618 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2619 futures.add(f);
2620 externalPush(f);
2621 }
2622 for (int i = 0, size = futures.size(); i < size; i++)
2623 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2624 done = true;
2625 return futures;
2626 } finally {
2627 if (!done)
2628 for (int i = 0, size = futures.size(); i < size; i++)
2629 futures.get(i).cancel(false);
2630 }
2631 }
2632
2633 /**
2634 * Returns the factory used for constructing new workers.
2635 *
2636 * @return the factory used for constructing new workers
2637 */
2638 public ForkJoinWorkerThreadFactory getFactory() {
2639 return factory;
2640 }
2641
2642 /**
2643 * Returns the handler for internal worker threads that terminate
2644 * due to unrecoverable errors encountered while executing tasks.
2645 *
2646 * @return the handler, or {@code null} if none
2647 */
2648 public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2649 return ueh;
2650 }
2651
2652 /**
2653 * Returns the targeted parallelism level of this pool.
2654 *
2655 * @return the targeted parallelism level of this pool
2656 */
2657 public int getParallelism() {
2658 int par;
2659 return ((par = config & SMASK) > 0) ? par : 1;
2660 }
2661
2662 /**
2663 * Returns the targeted parallelism level of the common pool.
2664 *
2665 * @return the targeted parallelism level of the common pool
2666 * @since 1.8
2667 */
2668 public static int getCommonPoolParallelism() {
2669 return commonParallelism;
2670 }
2671
2672 /**
2673 * Returns the number of worker threads that have started but not
2674 * yet terminated. The result returned by this method may differ
2675 * from {@link #getParallelism} when threads are created to
2676 * maintain parallelism when others are cooperatively blocked.
2677 *
2678 * @return the number of worker threads
2679 */
2680 public int getPoolSize() {
2681 return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2682 }
2683
2684 /**
2685 * Returns {@code true} if this pool uses local first-in-first-out
2686 * scheduling mode for forked tasks that are never joined.
2687 *
2688 * @return {@code true} if this pool uses async mode
2689 */
2690 public boolean getAsyncMode() {
2691 return (config & FIFO_QUEUE) != 0;
2692 }
2693
2694 /**
2695 * Returns an estimate of the number of worker threads that are
2696 * not blocked waiting to join tasks or for other managed
2697 * synchronization. This method may overestimate the
2698 * number of running threads.
2699 *
2700 * @return the number of worker threads
2701 */
2702 public int getRunningThreadCount() {
2703 int rc = 0;
2704 WorkQueue[] ws; WorkQueue w;
2705 if ((ws = workQueues) != null) {
2706 for (int i = 1; i < ws.length; i += 2) {
2707 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2708 ++rc;
2709 }
2710 }
2711 return rc;
2712 }
2713
2714 /**
2715 * Returns an estimate of the number of threads that are currently
2716 * stealing or executing tasks. This method may overestimate the
2717 * number of active threads.
2718 *
2719 * @return the number of active threads
2720 */
2721 public int getActiveThreadCount() {
2722 int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
2723 return (r <= 0) ? 0 : r; // suppress momentarily negative values
2724 }
2725
2726 /**
2727 * Returns {@code true} if all worker threads are currently idle.
2728 * An idle worker is one that cannot obtain a task to execute
2729 * because none are available to steal from other threads, and
2730 * there are no pending submissions to the pool. This method is
2731 * conservative; it might not return {@code true} immediately upon
2732 * idleness of all threads, but will eventually become true if
2733 * threads remain inactive.
2734 *
2735 * @return {@code true} if all threads are currently idle
2736 */
2737 public boolean isQuiescent() {
2738 return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
2739 }
2740
2741 /**
2742 * Returns an estimate of the total number of tasks stolen from
2743 * one thread's work queue by another. The reported value
2744 * underestimates the actual total number of steals when the pool
2745 * is not quiescent. This value may be useful for monitoring and
2746 * tuning fork/join programs: in general, steal counts should be
2747 * high enough to keep threads busy, but low enough to avoid
2748 * overhead and contention across threads.
2749 *
2750 * @return the number of steals
2751 */
2752 public long getStealCount() {
2753 long count = stealCount;
2754 WorkQueue[] ws; WorkQueue w;
2755 if ((ws = workQueues) != null) {
2756 for (int i = 1; i < ws.length; i += 2) {
2757 if ((w = ws[i]) != null)
2758 count += w.nsteals;
2759 }
2760 }
2761 return count;
2762 }
2763
2764 /**
2765 * Returns an estimate of the total number of tasks currently held
2766 * in queues by worker threads (but not including tasks submitted
2767 * to the pool that have not begun executing). This value is only
2768 * an approximation, obtained by iterating across all threads in
2769 * the pool. This method may be useful for tuning task
2770 * granularities.
2771 *
2772 * @return the number of queued tasks
2773 */
2774 public long getQueuedTaskCount() {
2775 long count = 0;
2776 WorkQueue[] ws; WorkQueue w;
2777 if ((ws = workQueues) != null) {
2778 for (int i = 1; i < ws.length; i += 2) {
2779 if ((w = ws[i]) != null)
2780 count += w.queueSize();
2781 }
2782 }
2783 return count;
2784 }
2785
2786 /**
2787 * Returns an estimate of the number of tasks submitted to this
2788 * pool that have not yet begun executing. This method may take
2789 * time proportional to the number of submissions.
2790 *
2791 * @return the number of queued submissions
2792 */
2793 public int getQueuedSubmissionCount() {
2794 int count = 0;
2795 WorkQueue[] ws; WorkQueue w;
2796 if ((ws = workQueues) != null) {
2797 for (int i = 0; i < ws.length; i += 2) {
2798 if ((w = ws[i]) != null)
2799 count += w.queueSize();
2800 }
2801 }
2802 return count;
2803 }
2804
2805 /**
2806 * Returns {@code true} if there are any tasks submitted to this
2807 * pool that have not yet begun executing.
2808 *
2809 * @return {@code true} if there are any queued submissions
2810 */
2811 public boolean hasQueuedSubmissions() {
2812 WorkQueue[] ws; WorkQueue w;
2813 if ((ws = workQueues) != null) {
2814 for (int i = 0; i < ws.length; i += 2) {
2815 if ((w = ws[i]) != null && !w.isEmpty())
2816 return true;
2817 }
2818 }
2819 return false;
2820 }
2821
2822 /**
2823 * Removes and returns the next unexecuted submission if one is
2824 * available. This method may be useful in extensions to this
2825 * class that re-assign work in systems with multiple pools.
2826 *
2827 * @return the next submission, or {@code null} if none
2828 */
2829 protected ForkJoinTask<?> pollSubmission() {
2830 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2831 if ((ws = workQueues) != null) {
2832 for (int i = 0; i < ws.length; i += 2) {
2833 if ((w = ws[i]) != null && (t = w.poll()) != null)
2834 return t;
2835 }
2836 }
2837 return null;
2838 }
2839
2840 /**
2841 * Removes all available unexecuted submitted and forked tasks
2842 * from scheduling queues and adds them to the given collection,
2843 * without altering their execution status. These may include
2844 * artificially generated or wrapped tasks. This method is
2845 * designed to be invoked only when the pool is known to be
2846 * quiescent. Invocations at other times may not remove all
2847 * tasks. A failure encountered while attempting to add elements
2848 * to collection {@code c} may result in elements being in
2849 * neither, either or both collections when the associated
2850 * exception is thrown. The behavior of this operation is
2851 * undefined if the specified collection is modified while the
2852 * operation is in progress.
2853 *
2854 * @param c the collection to transfer elements into
2855 * @return the number of elements transferred
2856 */
2857 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2858 int count = 0;
2859 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2860 if ((ws = workQueues) != null) {
2861 for (int i = 0; i < ws.length; ++i) {
2862 if ((w = ws[i]) != null) {
2863 while ((t = w.poll()) != null) {
2864 c.add(t);
2865 ++count;
2866 }
2867 }
2868 }
2869 }
2870 return count;
2871 }
2872
2873 /**
2874 * Returns a string identifying this pool, as well as its state,
2875 * including indications of run state, parallelism level, and
2876 * worker and task counts.
2877 *
2878 * @return a string identifying this pool, as well as its state
2879 */
2880 public String toString() {
2881 // Use a single pass through workQueues to collect counts
2882 long qt = 0L, qs = 0L; int rc = 0;
2883 long st = stealCount;
2884 long c = ctl;
2885 WorkQueue[] ws; WorkQueue w;
2886 if ((ws = workQueues) != null) {
2887 for (int i = 0; i < ws.length; ++i) {
2888 if ((w = ws[i]) != null) {
2889 int size = w.queueSize();
2890 if ((i & 1) == 0)
2891 qs += size;
2892 else {
2893 qt += size;
2894 st += w.nsteals;
2895 if (w.isApparentlyUnblocked())
2896 ++rc;
2897 }
2898 }
2899 }
2900 }
2901 int pc = (config & SMASK);
2902 int tc = pc + (short)(c >>> TC_SHIFT);
2903 int ac = pc + (int)(c >> AC_SHIFT);
2904 if (ac < 0) // ignore transient negative
2905 ac = 0;
2906 int rs = runState;
2907 String level = ((rs & TERMINATED) != 0 ? "Terminated" :
2908 (rs & STOP) != 0 ? "Terminating" :
2909 (rs & SHUTDOWN) != 0 ? "Shutting down" :
2910 "Running");
2911 return super.toString() +
2912 "[" + level +
2913 ", parallelism = " + pc +
2914 ", size = " + tc +
2915 ", active = " + ac +
2916 ", running = " + rc +
2917 ", steals = " + st +
2918 ", tasks = " + qt +
2919 ", submissions = " + qs +
2920 "]";
2921 }
2922
2923 /**
2924 * Possibly initiates an orderly shutdown in which previously
2925 * submitted tasks are executed, but no new tasks will be
2926 * accepted. Invocation has no effect on execution state if this
2927 * is the {@link #commonPool()}, and no additional effect if
2928 * already shut down. Tasks that are in the process of being
2929 * submitted concurrently during the course of this method may or
2930 * may not be rejected.
2931 *
2932 * @throws SecurityException if a security manager exists and
2933 * the caller is not permitted to modify threads
2934 * because it does not hold {@link
2935 * java.lang.RuntimePermission}{@code ("modifyThread")}
2936 */
2937 public void shutdown() {
2938 checkPermission();
2939 tryTerminate(false, true);
2940 }
2941
2942 /**
2943 * Possibly attempts to cancel and/or stop all tasks, and reject
2944 * all subsequently submitted tasks. Invocation has no effect on
2945 * execution state if this is the {@link #commonPool()}, and no
2946 * additional effect if already shut down. Otherwise, tasks that
2947 * are in the process of being submitted or executed concurrently
2948 * during the course of this method may or may not be
2949 * rejected. This method cancels both existing and unexecuted
2950 * tasks, in order to permit termination in the presence of task
2951 * dependencies. So the method always returns an empty list
2952 * (unlike the case for some other Executors).
2953 *
2954 * @return an empty list
2955 * @throws SecurityException if a security manager exists and
2956 * the caller is not permitted to modify threads
2957 * because it does not hold {@link
2958 * java.lang.RuntimePermission}{@code ("modifyThread")}
2959 */
2960 public List<Runnable> shutdownNow() {
2961 checkPermission();
2962 tryTerminate(true, true);
2963 return Collections.emptyList();
2964 }
2965
2966 /**
2967 * Returns {@code true} if all tasks have completed following shut down.
2968 *
2969 * @return {@code true} if all tasks have completed following shut down
2970 */
2971 public boolean isTerminated() {
2972 return (runState & TERMINATED) != 0;
2973 }
2974
2975 /**
2976 * Returns {@code true} if the process of termination has
2977 * commenced but not yet completed. This method may be useful for
2978 * debugging. A return of {@code true} reported a sufficient
2979 * period after shutdown may indicate that submitted tasks have
2980 * ignored or suppressed interruption, or are waiting for I/O,
2981 * causing this executor not to properly terminate. (See the
2982 * advisory notes for class {@link ForkJoinTask} stating that
2983 * tasks should not normally entail blocking operations. But if
2984 * they do, they must abort them on interrupt.)
2985 *
2986 * @return {@code true} if terminating but not yet terminated
2987 */
2988 public boolean isTerminating() {
2989 int rs = runState;
2990 return (rs & STOP) != 0 && (rs & TERMINATED) == 0;
2991 }
2992
2993 /**
2994 * Returns {@code true} if this pool has been shut down.
2995 *
2996 * @return {@code true} if this pool has been shut down
2997 */
2998 public boolean isShutdown() {
2999 return (runState & SHUTDOWN) != 0;
3000 }
3001
3002 /**
3003 * Blocks until all tasks have completed execution after a
3004 * shutdown request, or the timeout occurs, or the current thread
3005 * is interrupted, whichever happens first. Because the {@link
3006 * #commonPool()} never terminates until program shutdown, when
3007 * applied to the common pool, this method is equivalent to {@link
3008 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
3009 *
3010 * @param timeout the maximum time to wait
3011 * @param unit the time unit of the timeout argument
3012 * @return {@code true} if this executor terminated and
3013 * {@code false} if the timeout elapsed before termination
3014 * @throws InterruptedException if interrupted while waiting
3015 */
3016 public boolean awaitTermination(long timeout, TimeUnit unit)
3017 throws InterruptedException {
3018 if (Thread.interrupted())
3019 throw new InterruptedException();
3020 if (this == common) {
3021 awaitQuiescence(timeout, unit);
3022 return false;
3023 }
3024 long nanos = unit.toNanos(timeout);
3025 if (isTerminated())
3026 return true;
3027 if (nanos <= 0L)
3028 return false;
3029 long deadline = System.nanoTime() + nanos;
3030 synchronized (this) {
3031 for (;;) {
3032 if (isTerminated())
3033 return true;
3034 if (nanos <= 0L)
3035 return false;
3036 long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
3037 wait(millis > 0L ? millis : 1L);
3038 nanos = deadline - System.nanoTime();
3039 }
3040 }
3041 }
3042
3043 /**
3044 * If called by a ForkJoinTask operating in this pool, equivalent
3045 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3046 * waits and/or attempts to assist performing tasks until this
3047 * pool {@link #isQuiescent} or the indicated timeout elapses.
3048 *
3049 * @param timeout the maximum time to wait
3050 * @param unit the time unit of the timeout argument
3051 * @return {@code true} if quiescent; {@code false} if the
3052 * timeout elapsed.
3053 */
3054 public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3055 long nanos = unit.toNanos(timeout);
3056 ForkJoinWorkerThread wt;
3057 Thread thread = Thread.currentThread();
3058 if ((thread instanceof ForkJoinWorkerThread) &&
3059 (wt = (ForkJoinWorkerThread)thread).pool == this) {
3060 helpQuiescePool(wt.workQueue);
3061 return true;
3062 }
3063 long startTime = System.nanoTime();
3064 WorkQueue[] ws;
3065 int r = 0, m;
3066 boolean found = true;
3067 while (!isQuiescent() && (ws = workQueues) != null &&
3068 (m = ws.length - 1) >= 0) {
3069 if (!found) {
3070 if ((System.nanoTime() - startTime) > nanos)
3071 return false;
3072 Thread.yield(); // cannot block
3073 }
3074 found = false;
3075 for (int j = (m + 1) << 2; j >= 0; --j) {
3076 ForkJoinTask<?> t; WorkQueue q; int b, k;
3077 if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
3078 (b = q.base) - q.top < 0) {
3079 found = true;
3080 if ((t = q.pollAt(b)) != null)
3081 t.doExec();
3082 break;
3083 }
3084 }
3085 }
3086 return true;
3087 }
3088
3089 /**
3090 * Waits and/or attempts to assist performing tasks indefinitely
3091 * until the {@link #commonPool()} {@link #isQuiescent}.
3092 */
3093 static void quiesceCommonPool() {
3094 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3095 }
3096
/**
 * Interface for extending managed parallelism for tasks running
 * in {@link ForkJoinPool}s.
 *
 * <p>A {@code ManagedBlocker} provides two methods.  Method
 * {@code isReleasable} must return {@code true} if blocking is
 * not necessary. Method {@code block} blocks the current thread
 * if necessary (perhaps internally invoking {@code isReleasable}
 * before actually blocking). These actions are performed by any
 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)},
 * which keeps calling {@code isReleasable} and then {@code block}
 * until one of them returns {@code true}.
 * The unusual methods in this API accommodate synchronizers that
 * may, but don't usually, block for long periods. Similarly, they
 * allow more efficient internal handling of cases in which
 * additional workers may be, but usually are not, needed to
 * ensure sufficient parallelism.  Toward this end,
 * implementations of method {@code isReleasable} must be amenable
 * to repeated invocation.
 *
 * <p>For example, here is a ManagedBlocker based on a
 * ReentrantLock:
 *  <pre> {@code
 * class ManagedLocker implements ManagedBlocker {
 *   final ReentrantLock lock;
 *   boolean hasLock = false;
 *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
 *   public boolean block() {
 *     if (!hasLock)
 *       lock.lock();
 *     return true;
 *   }
 *   public boolean isReleasable() {
 *     return hasLock || (hasLock = lock.tryLock());
 *   }
 * }}</pre>
 *
 * <p>Here is a class that possibly blocks waiting for an
 * item on a given queue:
 *  <pre> {@code
 * class QueueTaker<E> implements ManagedBlocker {
 *   final BlockingQueue<E> queue;
 *   volatile E item = null;
 *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
 *   public boolean block() throws InterruptedException {
 *     if (item == null)
 *       item = queue.take();
 *     return true;
 *   }
 *   public boolean isReleasable() {
 *     return item != null || (item = queue.poll()) != null;
 *   }
 *   public E getItem() { // call after pool.managedBlock completes
 *     return item;
 *   }
 * }}</pre>
 */
public static interface ManagedBlocker {
    /**
     * Possibly blocks the current thread, for example waiting for
     * a lock or condition.
     *
     * @return {@code true} if no additional blocking is necessary
     * (i.e., if isReleasable would return true); {@code false}
     * causes {@code managedBlock} to check {@code isReleasable}
     * again and, if still not releasable, to call this method again
     * @throws InterruptedException if interrupted while waiting
     * (the method is not required to do so, but is allowed to)
     */
    boolean block() throws InterruptedException;

    /**
     * Returns {@code true} if blocking is unnecessary. May be
     * invoked repeatedly for a single blocking episode.
     *
     * @return {@code true} if blocking is unnecessary
     */
    boolean isReleasable();
}
3170
3171 /**
3172 * Blocks in accord with the given blocker. If the current thread
3173 * is a {@link ForkJoinWorkerThread}, this method possibly
3174 * arranges for a spare thread to be activated if necessary to
3175 * ensure sufficient parallelism while the current thread is blocked.
3176 *
3177 * <p>If the caller is not a {@link ForkJoinTask}, this method is
3178 * behaviorally equivalent to
3179 * <pre> {@code
3180 * while (!blocker.isReleasable())
3181 * if (blocker.block())
3182 * return;}</pre>
3183 *
3184 * If the caller is a {@code ForkJoinTask}, then the pool may
3185 * first be expanded to ensure parallelism, and later adjusted.
3186 *
3187 * @param blocker the blocker
3188 * @throws InterruptedException if blocker.block did so
3189 */
3190 public static void managedBlock(ManagedBlocker blocker)
3191 throws InterruptedException {
3192 ForkJoinPool p;
3193 ForkJoinWorkerThread wt;
3194 Thread t = Thread.currentThread();
3195 if ((t instanceof ForkJoinWorkerThread) &&
3196 (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
3197 WorkQueue w = wt.workQueue;
3198 while (!blocker.isReleasable()) {
3199 if (p.tryCompensate(w)) {
3200 try {
3201 do {} while (!blocker.isReleasable() &&
3202 !blocker.block());
3203 } finally {
3204 U.getAndAddLong(p, CTL, AC_UNIT);
3205 }
3206 break;
3207 }
3208 }
3209 }
3210 else {
3211 do {} while (!blocker.isReleasable() &&
3212 !blocker.block());
3213 }
3214 }
3215
3216 // AbstractExecutorService overrides. These rely on undocumented
3217 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3218 // implement RunnableFuture.
3219
3220 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3221 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3222 }
3223
3224 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3225 return new ForkJoinTask.AdaptedCallable<T>(callable);
3226 }
3227
// Unsafe mechanics: the Unsafe instance plus the array and field
// offsets looked up once in the static initializer below.
private static final sun.misc.Unsafe U;
private static final int  ABASE;
private static final int  ASHIFT;
private static final long CTL;
private static final long PARKBLOCKER;
private static final long STEALCOUNT;
private static final long RUNSTATE;
private static final long QBASE;
private static final long QTOP;
private static final long QLOCK;
private static final long QSCANSTATE;
private static final long QPARKER;
private static final long QCURRENTSTEAL;
private static final long QCURRENTJOIN;

static {
    // initialize field offsets for CAS etc
    try {
        U = sun.misc.Unsafe.getUnsafe();

        // Fields of ForkJoinPool itself
        Class<?> poolClass = ForkJoinPool.class;
        CTL = U.objectFieldOffset(poolClass.getDeclaredField("ctl"));
        STEALCOUNT = U.objectFieldOffset(poolClass.getDeclaredField("stealCount"));
        RUNSTATE = U.objectFieldOffset(poolClass.getDeclaredField("runState"));

        // Field of Thread
        Class<?> threadClass = Thread.class;
        PARKBLOCKER = U.objectFieldOffset(threadClass.getDeclaredField("parkBlocker"));

        // Fields of WorkQueue
        Class<?> queueClass = WorkQueue.class;
        QBASE = U.objectFieldOffset(queueClass.getDeclaredField("base"));
        QTOP = U.objectFieldOffset(queueClass.getDeclaredField("top"));
        QLOCK = U.objectFieldOffset(queueClass.getDeclaredField("qlock"));
        QSCANSTATE = U.objectFieldOffset(queueClass.getDeclaredField("scanState"));
        QPARKER = U.objectFieldOffset(queueClass.getDeclaredField("parker"));
        QCURRENTSTEAL = U.objectFieldOffset(queueClass.getDeclaredField("currentSteal"));
        QCURRENTJOIN = U.objectFieldOffset(queueClass.getDeclaredField("currentJoin"));

        // Base offset and index shift for ForkJoinTask[] elements
        Class<?> arrayClass = ForkJoinTask[].class;
        ABASE = U.arrayBaseOffset(arrayClass);
        int scale = U.arrayIndexScale(arrayClass);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }

    defaultForkJoinWorkerThreadFactory =
        new DefaultForkJoinWorkerThreadFactory();
    modifyThreadPermission = new RuntimePermission("modifyThread");

    // The common pool is built inside a privileged action.
    common = java.security.AccessController.doPrivileged(
        new java.security.PrivilegedAction<ForkJoinPool>() {
            public ForkJoinPool run() {
                return makeCommonPool();
            }
        });
    // report 1 even if threads disabled
    int commonPar = common.config & SMASK;
    commonParallelism = (commonPar > 0) ? commonPar : 1;
}
3293
3294 /**
3295 * Creates and returns the common pool, respecting user settings
3296 * specified via system properties.
3297 */
3298 private static ForkJoinPool makeCommonPool() {
3299 int parallelism = -1;
3300 ForkJoinWorkerThreadFactory factory = null;
3301 UncaughtExceptionHandler handler = null;
3302 try { // ignore exceptions in accessing/parsing properties
3303 String pp = System.getProperty
3304 ("java.util.concurrent.ForkJoinPool.common.parallelism");
3305 String fp = System.getProperty
3306 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3307 String hp = System.getProperty
3308 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3309 if (pp != null)
3310 parallelism = Integer.parseInt(pp);
3311 if (fp != null)
3312 factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
3313 getSystemClassLoader().loadClass(fp).newInstance());
3314 if (hp != null)
3315 handler = ((UncaughtExceptionHandler)ClassLoader.
3316 getSystemClassLoader().loadClass(hp).newInstance());
3317 } catch (Exception ignore) {
3318 }
3319 if (factory == null) {
3320 if (System.getSecurityManager() == null)
3321 factory = defaultForkJoinWorkerThreadFactory;
3322 else // use security-managed default
3323 factory = new InnocuousForkJoinWorkerThreadFactory();
3324 }
3325 if (parallelism < 0 && // default 1 less than #cores
3326 (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
3327 parallelism = 1;
3328 if (parallelism > MAX_CAP)
3329 parallelism = MAX_CAP;
3330 return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
3331 "ForkJoinPool.commonPool-worker-");
3332 }
3333
3334 /**
3335 * Factory for innocuous worker threads
3336 */
3337 static final class InnocuousForkJoinWorkerThreadFactory
3338 implements ForkJoinWorkerThreadFactory {
3339
3340 /**
3341 * An ACC to restrict permissions for the factory itself.
3342 * The constructed workers have no permissions set.
3343 */
3344 private static final AccessControlContext innocuousAcc;
3345 static {
3346 Permissions innocuousPerms = new Permissions();
3347 innocuousPerms.add(modifyThreadPermission);
3348 innocuousPerms.add(new RuntimePermission(
3349 "enableContextClassLoaderOverride"));
3350 innocuousPerms.add(new RuntimePermission(
3351 "modifyThreadGroup"));
3352 innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
3353 new ProtectionDomain(null, innocuousPerms)
3354 });
3355 }
3356
3357 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
3358 return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)
3359 java.security.AccessController.doPrivileged(
3360 new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
3361 public ForkJoinWorkerThread run() {
3362 return new ForkJoinWorkerThread.
3363 InnocuousForkJoinWorkerThread(pool);
3364 }}, innocuousAcc);
3365 }
3366 }
3367
3368 }