root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.215
Committed: Sun Jul 13 22:43:54 2014 UTC by dl
Branch: MAIN
Changes since 1.214: +66 -40 lines
Log Message:
Box steal counter

File Contents

1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.Thread.UncaughtExceptionHandler;
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.List;
15 import java.util.concurrent.AbstractExecutorService;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.RunnableFuture;
21 import java.util.concurrent.ThreadLocalRandom;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicLong;
24 import java.security.AccessControlContext;
25 import java.security.ProtectionDomain;
26 import java.security.Permissions;
27
28 /**
29 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
30 * A {@code ForkJoinPool} provides the entry point for submissions
31 * from non-{@code ForkJoinTask} clients, as well as management and
32 * monitoring operations.
33 *
34 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
35 * ExecutorService} mainly by virtue of employing
36 * <em>work-stealing</em>: all threads in the pool attempt to find and
37 * execute tasks submitted to the pool and/or created by other active
38 * tasks (eventually blocking waiting for work if none exist). This
39 * enables efficient processing when most tasks spawn other subtasks
40 * (as do most {@code ForkJoinTask}s), as well as when many small
41 * tasks are submitted to the pool from external clients. Especially
42 * when setting <em>asyncMode</em> to true in constructors, {@code
43 * ForkJoinPool}s may also be appropriate for use with event-style
44 * tasks that are never joined.
45 *
46 * <p>A static {@link #commonPool()} is available and appropriate for
47 * most applications. The common pool is used by any ForkJoinTask that
48 * is not explicitly submitted to a specified pool. Using the common
49 * pool normally reduces resource usage (its threads are slowly
50 * reclaimed during periods of non-use, and reinstated upon subsequent
51 * use).
52 *
53 * <p>For applications that require separate or custom pools, a {@code
54 * ForkJoinPool} may be constructed with a given target parallelism
55 * level; by default, equal to the number of available processors.
56 * The pool attempts to maintain enough active (or available) threads
57 * by dynamically adding, suspending, or resuming internal worker
58 * threads, even if some tasks are stalled waiting to join others.
59 * However, no such adjustments are guaranteed in the face of blocked
60 * I/O or other unmanaged synchronization. The nested {@link
61 * ManagedBlocker} interface enables extension of the kinds of
62 * synchronization accommodated.
63 *
64 * <p>In addition to execution and lifecycle control methods, this
65 * class provides status check methods (for example
66 * {@link #getStealCount}) that are intended to aid in developing,
67 * tuning, and monitoring fork/join applications. Also, method
68 * {@link #toString} returns indications of pool state in a
69 * convenient form for informal monitoring.
70 *
71 * <p>As is the case with other ExecutorServices, there are three
72 * main task execution methods summarized in the following table.
73 * These are designed to be used primarily by clients not already
74 * engaged in fork/join computations in the current pool. The main
75 * forms of these methods accept instances of {@code ForkJoinTask},
76 * but overloaded forms also allow mixed execution of plain {@code
77 * Runnable}- or {@code Callable}-based activities as well. However,
78 * tasks that are already executing in a pool should normally instead
79 * use the within-computation forms listed in the table unless using
80 * async event-style tasks that are not usually joined, in which case
81 * there is little difference among choice of methods.
82 *
83 * <table BORDER CELLPADDING=3 CELLSPACING=1>
84 * <caption>Summary of task execution methods</caption>
85 * <tr>
86 * <td></td>
87 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
88 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
89 * </tr>
90 * <tr>
91 * <td> <b>Arrange async execution</b></td>
92 * <td> {@link #execute(ForkJoinTask)}</td>
93 * <td> {@link ForkJoinTask#fork}</td>
94 * </tr>
95 * <tr>
96 * <td> <b>Await and obtain result</b></td>
97 * <td> {@link #invoke(ForkJoinTask)}</td>
98 * <td> {@link ForkJoinTask#invoke}</td>
99 * </tr>
100 * <tr>
101 * <td> <b>Arrange exec and obtain Future</b></td>
102 * <td> {@link #submit(ForkJoinTask)}</td>
103 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
104 * </tr>
105 * </table>
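 *
 * <p>For example (an illustrative sketch only, not a prescription;
 * the lambda bodies and values here are arbitrary):
 * <pre> {@code
 * ForkJoinPool pool = ForkJoinPool.commonPool();
 * // Arrange async execution of a plain Runnable:
 * pool.execute(() -> System.out.println("hello"));
 * // Await and obtain a result via an adapted ForkJoinTask:
 * Long v = pool.invoke(ForkJoinTask.adapt(() -> 6L * 7L));
 * // Arrange execution and obtain a Future:
 * Future<Long> f = pool.submit(() -> 6L * 7L);}</pre>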
106 *
107 * <p>The common pool is by default constructed with default
108 * parameters, but these may be controlled by setting three
109 * {@linkplain System#getProperty system properties}:
110 * <ul>
111 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
112 * - the parallelism level, a non-negative integer
113 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
114 * - the class name of a {@link ForkJoinWorkerThreadFactory}
115 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
116 * - the class name of a {@link UncaughtExceptionHandler}
117 * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
118 * - the maximum number of allowed extra threads to maintain target
119 * parallelism (default 256).
120 * </ul>
121 * If a {@link SecurityManager} is present and no factory is
122 * specified, then the default pool uses a factory supplying
123 * threads that have no {@link Permissions} enabled.
124 * The system class loader is used to load these classes.
125 * Upon any error in establishing these settings, default parameters
126 * are used. It is possible to disable or limit the use of threads in
127 * the common pool by setting the parallelism property to zero, and/or
128 * using a factory that may return {@code null}. However, doing so may
129 * cause unjoined tasks to never be executed.
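 *
 * <p>For example, an illustrative command line setting one of these
 * properties (the {@code MyApp} class name is a placeholder):
 * <pre> {@code
 * java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 MyApp}</pre>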
130 *
131 * <p><b>Implementation notes</b>: This implementation restricts the
132 * maximum number of running threads to 32767. Attempts to create
133 * pools with greater than the maximum number result in
134 * {@code IllegalArgumentException}.
135 *
136 * <p>This implementation rejects submitted tasks (that is, by throwing
137 * {@link RejectedExecutionException}) only when the pool is shut down
138 * or internal resources have been exhausted.
139 *
140 * @since 1.7
141 * @author Doug Lea
142 */
143 @sun.misc.Contended
144 public class ForkJoinPool extends AbstractExecutorService {
145
146 /*
147 * Implementation Overview
148 *
149 * This class and its nested classes provide the main
150 * functionality and control for a set of worker threads:
151 * Submissions from non-FJ threads enter into submission queues.
152 * Workers take these tasks and typically split them into subtasks
153 * that may be stolen by other workers. Preference rules give
154 * first priority to processing tasks from their own queues (LIFO
155 * or FIFO, depending on mode), then to randomized FIFO steals of
156 * tasks in other queues. This framework began as a vehicle for
157 * supporting tree-structured parallelism using work-stealing.
158 * Over time, its scalability advantages led to extensions and
159 * changes to better support more diverse usage contexts. Because
160 * most internal methods and nested classes are interrelated,
161 * their main rationale and descriptions are presented here;
162 * individual methods and nested classes contain only brief
163 * comments about details.
164 *
165 * WorkQueues
166 * ==========
167 *
168 * Most operations occur within work-stealing queues (in nested
169 * class WorkQueue). These are special forms of Deques that
170 * support only three of the four possible end-operations -- push,
171 * pop, and poll (aka steal), under the further constraints that
172 * push and pop are called only from the owning thread (or, as
173 * extended here, under a lock), while poll may be called from
174 * other threads. (If you are unfamiliar with them, you probably
175 * want to read Herlihy and Shavit's book "The Art of
176 * Multiprocessor Programming", chapter 16 describing these in
177 * more detail before proceeding.) The main work-stealing queue
178 * design is roughly similar to those in the papers "Dynamic
179 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
180 * (http://research.sun.com/scalable/pubs/index.html) and
181 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
182 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
183 * The main differences ultimately stem from GC requirements that
184 * we null out taken slots as soon as we can, to maintain as small
185 * a footprint as possible even in programs generating huge
186 * numbers of tasks. To accomplish this, we shift the CAS
187 * arbitrating pop vs poll (steal) from being on the indices
188 * ("base" and "top") to the slots themselves.
189 *
190 * Adding tasks then takes the form of a classic array push(task):
191 * q.array[q.top] = task; ++q.top;
192 *
193 * (The actual code needs to null-check and size-check the array,
194 * properly fence the accesses, and possibly signal waiting
195 * workers to start scanning -- see below.) Both a successful pop
196 * and poll mainly entail a CAS of a slot from non-null to null.
197 *
198 * The pop operation (always performed by owner) is:
199 * if ((base != top) and
200 * (the task at top slot is not null) and
201 * (CAS slot to null))
202 * decrement top and return task;
203 *
204 * And the poll operation (usually by a stealer) is:
205 * if ((base != top) and
206 * (the task at base slot is not null) and
207 * (base has not changed) and
208 * (CAS slot to null))
209 * increment base and return task;
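 *
 * For concreteness, a simplified standalone model of this slot-CAS
 * arbitration, using a hypothetical AtomicReferenceArray "slots" in
 * place of the Unsafe intrinsics actually used below (resizing,
 * fencing details, and signalling omitted):
 *
 * ForkJoinTask<?> pop() {                  // owner only
 *   int m = slots.length() - 1, s = top - 1;
 *   ForkJoinTask<?> t = slots.get(m & s);
 *   if (base - s <= 0 && t != null &&
 *       slots.compareAndSet(m & s, t, null)) {
 *     top = s;                             // decrement top
 *     return t;
 *   }
 *   return null;
 * }
 *
 * ForkJoinTask<?> poll() {                 // usually a stealer
 *   int m = slots.length() - 1, b = base;
 *   ForkJoinTask<?> t = slots.get(m & b);
 *   if (b - top < 0 && t != null && base == b &&
 *       slots.compareAndSet(m & b, t, null)) {
 *     base = b + 1;                        // increment base
 *     return t;
 *   }
 *   return null;
 * }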
210 *
211 * Because we rely on CASes of references, we do not need tag bits
212 * on base or top. They are simple ints as used in any circular
213 * array-based queue (see for example ArrayDeque). Updates to the
214 * indices guarantee that top == base means the queue is empty,
215 * but otherwise may err on the side of possibly making the queue
216 * appear nonempty when a push, pop, or poll have not fully
217 * committed. (Method isEmpty() checks the case of a partially
218 * completed removal of the last element.) Because of this, the
219 * poll operation, considered individually, is not wait-free. One
220 * thief cannot successfully continue until another in-progress
221 * one (or, if previously empty, a push) completes. However, in
222 * the aggregate, we ensure at least probabilistic
223 * non-blockingness. If an attempted steal fails, a thief always
224 * chooses a different random victim target to try next. So, in
225 * order for one thief to progress, it suffices for any
226 * in-progress poll or new push on any empty queue to
227 * complete. (This is why we normally use method pollAt and its
228 * variants that try once at the apparent base index, else
229 * consider alternative actions, rather than method poll, which
230 * retries.)
231 *
232 * This approach also enables support of a user mode in which
233 * local task processing is in FIFO, not LIFO order, simply by
234 * using poll rather than pop. This can be useful in
235 * message-passing frameworks in which tasks are never joined.
236 * However, neither mode considers affinities, loads, cache
237 * localities, etc., so each rarely provides the best possible
238 * performance on a given machine, but both portably provide good
239 * throughput by averaging over these factors. Further, even if
240 * we did try to use such information, we do not usually have a
241 * basis for exploiting it. For example, some sets of tasks
242 * profit from cache affinities, but others are harmed by cache
243 * pollution effects. Additionally, even though it requires
244 * scanning, long-term throughput is often best using random
245 * selection rather than directed selection policies, so cheap
246 * randomization of sufficient quality is used whenever
247 * applicable. Various Marsaglia XorShifts (some with different
248 * shift constants) are inlined at use points.
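 *
 * For reference, one such inlined xorshift step (this shift triple
 * is one common choice; constants vary by use point):
 *
 * r ^= r << 13; r ^= r >>> 17; r ^= r << 5;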
249 *
250 * WorkQueues are also used in a similar way for tasks submitted
251 * to the pool. We cannot mix these tasks in the same queues used
252 * by workers. Instead, we randomly associate submission queues
253 * with submitting threads, using a form of hashing. The
254 * ThreadLocalRandom probe value serves as a hash code for
255 * choosing existing queues, and may be randomly repositioned upon
256 * contention with other submitters. In essence, submitters act
257 * like workers except that they are restricted to executing local
258 * tasks that they submitted (or in the case of CountedCompleters,
259 * others with the same root task). Insertion of tasks in shared
260 * mode requires a lock (mainly to protect in the case of
261 * resizing) but we use only a simple spinlock (using field
262 * qlock), because submitters encountering a busy queue move on to
263 * try or create other queues -- they block only when creating and
264 * registering new queues. Additionally, "qlock" saturates to an
265 * unlockable value (-1) at shutdown. Unlocking still can be and
266 * is performed by cheaper ordered writes of "qlock" in successful
267 * cases, but uses CAS in unsuccessful cases.
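 *
 * A sketch of the resulting qlock protocol (simplified; the
 * saturating shutdown value and the move-on/create fallbacks are
 * elided):
 *
 * if (U.compareAndSwapInt(q, QLOCK, 0, 1)) { // try spinlock
 *   ... push task, resizing if necessary ...
 *   U.putOrderedInt(q, QLOCK, 0);            // cheap ordered unlock
 * }
 * else
 *   ... move on to try or create another queue ...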
268 *
269 * Management
270 * ==========
271 *
272 * The main throughput advantages of work-stealing stem from
273 * decentralized control -- workers mostly take tasks from
274 * themselves or each other, at rates that can exceed a billion
275 * per second. The pool itself creates, activates (enables
276 * scanning for and running tasks), deactivates, blocks, and
277 * terminates threads, all with minimal central information.
278 * There are only a few properties that we can globally track or
279 * maintain, so we pack them into a small number of variables,
280 * often maintaining atomicity without blocking or locking.
281 * Nearly all essentially atomic control state is held in two
282 * volatile variables that are by far most often read (not
283 * written) as status and consistency checks. (Also, field
284 * "config" holds unchanging configuration state.)
285 *
286 * Field "ctl" contains 64 bits holding information needed to
287 * atomically decide to add, inactivate, enqueue (on an event
288 * queue), dequeue, and/or re-activate workers. To enable this
289 * packing, we restrict maximum parallelism to (1<<15)-1 (which is
290 * far in excess of normal operating range) to allow ids, counts,
291 * and their negations (used for thresholding) to fit into 16-bit
292 * subfields.
293 *
294 * Field "runState" holds lockable state bits (STARTED, STOP, etc)
295 * also protecting updates to the workQueues array. When used as
296 * a lock, it is normally held only for a few instructions (the
297 * only exceptions are one-time array initialization and uncommon
298 * resizing), so is nearly always available after at most a brief
299 * spin. But to be extra-cautious, after spinning, method
300 * awaitRunStateLock (called only if an initial CAS fails), uses
301 * wait/notify mechanics on a builtin monitor to block when
302 * (rarely) needed. This would be a terrible idea for a highly
303 * contended lock, but most pools run without the lock ever
304 * contending after the spin limit, so this works fine as a more
305 * conservative alternative. Because we don't otherwise have an
306 * internal Object to use as a monitor, the "stealCounter" (an
307 * AtomicLong) is used when available (it too must be lazily
308 * initialized; see externalSubmit).
309 *
310 * Usages of "runState" vs "ctl" interact in only one case:
311 * deciding to add a worker thread (see tryAddWorker), in which
312 * case the ctl CAS is performed while the lock is held.
313 *
314 * Recording WorkQueues. WorkQueues are recorded in the
315 * "workQueues" array. The array is created upon first use (see
316 * externalSubmit) and expanded if necessary. Updates to the
317 * array while recording new workers and unrecording terminated
318 * ones are protected from each other by the runState lock, but
319 * the array is otherwise concurrently readable, and accessed
320 * directly. We also ensure that reads of the array reference
321 * itself never become too stale. To simplify index-based
322 * operations, the array size is always a power of two, and all
323 * readers must tolerate null slots. Worker queues are at odd
324 * indices. Shared (submission) queues are at even indices, up to
325 * a maximum of 64 slots, to limit growth even if the array needs to
326 * expand to add more workers. Grouping them together in this way
327 * simplifies and speeds up task scanning.
328 *
329 * All worker thread creation is on-demand, triggered by task
330 * submissions, replacement of terminated workers, and/or
331 * compensation for blocked workers. However, all other support
332 * code is set up to work with other policies. To ensure that we
333 * do not hold on to worker references that would prevent GC, all
334 * accesses to workQueues are via indices into the workQueues
335 * array (which is one source of some of the messy code
336 * constructions here). In essence, the workQueues array serves as
337 * a weak reference mechanism. Thus for example the stack top
338 * subfield of ctl stores indices, not references.
339 *
340 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
341 * cannot let workers spin indefinitely scanning for tasks when
342 * none can be found immediately, and we cannot start/resume
343 * workers unless there appear to be tasks available. On the
344 * other hand, we must quickly prod them into action when new
345 * tasks are submitted or generated. In many usages, ramp-up time
346 * to activate workers is the main limiting factor in overall
347 * performance, which is compounded at program start-up by JIT
348 * compilation and allocation. So we streamline this as much as
349 * possible.
350 *
351 * The "ctl" field atomically maintains active and total worker
352 * counts as well as a queue to place waiting threads so they can
353 * be located for signalling. Active counts also play the role of
354 * quiescence indicators, so are decremented when workers believe
355 * that there are no more tasks to execute. The "queue" is
356 * actually a form of Treiber stack. A stack is ideal for
357 * activating threads in most-recently used order. This improves
358 * performance and locality, outweighing the disadvantages of
359 * being prone to contention and inability to release a worker
360 * unless it is topmost on stack. We park/unpark workers after
361 * pushing on the idle worker stack (represented by the lower
362 * 32-bit subfield of ctl) when they cannot find work. The top
363 * stack state holds the value of the "scanState" field of the
364 * worker: its index and status, plus a version counter that, in
365 * addition to the count subfields (also serving as version
366 * stamps), provides protection against Treiber stack ABA effects.
367 *
368 * Field scanState is used by both workers and the pool to manage
369 * and track whether a worker is INACTIVE (possibly blocked
370 * waiting for a signal), or SCANNING for tasks (when neither hold
371 * it is busy running tasks). When a worker is inactivated, its
372 * scanState field is set, and is prevented from executing tasks,
373 * even though it must scan once for them to avoid queuing
374 * races. Note that scanState updates lag queue CAS releases so
375 * usage requires care. When queued, the lower 16 bits of
376 * scanState must hold its pool index. So we place the index there
377 * upon initialization (see registerWorker) and otherwise keep it
378 * there or restore it when necessary.
379 *
380 * Memory ordering. See "Correct and Efficient Work-Stealing for
381 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
382 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
383 * analysis of memory ordering requirements in work-stealing
384 * algorithms similar to the one used here. We usually need
385 * stronger than minimal ordering because we must sometimes signal
386 * workers, requiring Dekker-like full-fences to avoid lost
387 * signals. Arranging for enough ordering without expensive
388 * over-fencing requires tradeoffs among the supported means of
389 * expressing access constraints. The most central operations,
390 * taking from queues and updating ctl state, require full-fence
391 * CAS. Array slots are read using the emulation of volatiles
392 * provided by Unsafe. Access from other threads to WorkQueue
393 * base, top, and array requires a volatile load of whichever of
394 * these is read first. We use the convention of declaring the
395 * "base" index volatile, and always read it before other fields.
396 * The owner thread must ensure ordered updates, so writes use
397 * ordered intrinsics unless they can piggyback on those for other
398 * writes. Similar conventions and rationales hold for other
399 * WorkQueue fields (such as "currentSteal") that are only written
400 * by owners but observed by others.
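 *
 * So, for example, a non-owner reads queue fields in this order (a
 * schematic fragment, not a quotation of any method below):
 *
 * int b = q.base;                // volatile read first
 * ForkJoinTask<?>[] a = q.array; // then plain reads of array, top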
401 *
402 * Creating workers. To create a worker, we pre-increment total
403 * count (serving as a reservation), and attempt to construct a
404 * ForkJoinWorkerThread via its factory. Upon construction, the
405 * new thread invokes registerWorker, where it constructs a
406 * WorkQueue and is assigned an index in the workQueues array
407 * (expanding the array if necessary). The thread is then
408 * started. Upon any exception across these steps, or null return
409 * from factory, deregisterWorker adjusts counts and records
410 * accordingly. Upon a null return, the pool continues running with
411 * fewer than the target number of workers. Upon an exception, the
412 * exception is propagated, generally to some external caller.
413 * Worker index assignment avoids the bias in scanning that would
414 * occur if entries were sequentially packed starting at the front
415 * of the workQueues array. We treat the array as a simple
416 * power-of-two hash table, expanding as needed. The seedIndex
417 * increment ensures no collisions until a resize is needed or a
418 * worker is deregistered and replaced, and thereafter keeps
419 * probability of collision low. We cannot use
420 * ThreadLocalRandom.getProbe() for similar purposes here because
421 * the thread has not started yet, but do so for creating
422 * submission queues for existing external threads.
423 *
424 * Deactivation and waiting. Queuing encounters several intrinsic
425 * races; most notably that a task-producing thread can miss
426 * seeing (and signalling) another thread that gave up looking for
427 * work but has not yet entered the wait queue. When a worker
428 * cannot find a task to steal, it deactivates and enqueues. Very
429 * often, the lack of tasks is transient due to GC or OS
430 * scheduling. To reduce false-alarm deactivation, scanners
431 * compute checksums of queue states during sweeps. (The
432 * stability checks used here and elsewhere are probabilistic
433 * variants of snapshot techniques -- see Herlihy & Shavit.)
434 * Workers give up and try to deactivate only after the sum is
435 * stable across scans. Further, to avoid missed signals, they
436 * repeat this scanning process after successful enqueuing until
437 * again stable. In this state, the worker cannot take/run a task
438 * it sees until it is released from the queue, so the worker
439 * itself eventually tries to release itself or any successor (see
440 * tryRelease). Otherwise, upon an empty scan, a deactivated
441 * worker uses an adaptive local spin construction (see awaitWork)
442 * before blocking (via park). Note the unusual conventions about
443 * Thread.interrupts surrounding parking and other blocking:
444 * Because interrupts are used solely to alert threads to check
445 * termination, which is checked anyway upon blocking, we clear
446 * status (using Thread.interrupted) before any call to park, so
447 * that park does not immediately return due to status being set
448 * via some other unrelated call to interrupt in user code.
449 *
450 * Signalling and activation. Workers are created or activated
451 * only when there appears to be at least one task they might be
452 * able to find and execute. Upon push (either by a worker or an
453 * external submission) to a previously (possibly) empty queue,
454 * workers are signalled if idle, or created if fewer exist than
455 * the given parallelism level. These primary signals are
456 * buttressed by others whenever other threads remove a task from
457 * a queue and notice that there are other tasks there as well.
458 * On most platforms, signalling (unpark) overhead time is
459 * noticeably long, and the time between signalling a thread and
460 * it actually making progress can be very noticeably long, so it
461 * is worth offloading these delays from critical paths as much as
462 * possible. Also, because inactive workers are often rescanning
463 * or spinning rather than blocking, we set and clear the "parker"
464 * field of WorkQueues to reduce unnecessary calls to unpark.
465 * (This requires a secondary recheck to avoid missed signals.)
466 *
467 * Trimming workers. To release resources after periods of lack of
468 * use, a worker starting to wait when the pool is quiescent will
469 * time out and terminate (see awaitWork) if the pool has remained
470 * quiescent for period IDLE_TIMEOUT, increasing the period as the
471 * number of threads decreases, eventually removing all workers.
472 * Also, when more than two spare threads exist, excess threads
473 * are immediately terminated at the next quiescent point.
474 * (Padding by two avoids hysteresis.)
475 *
476 * Shutdown and Termination. A call to shutdownNow invokes
477 * tryTerminate to atomically set a runState bit. The calling
478 * thread, as well as every other worker thereafter terminating,
479 * helps terminate others by setting their (qlock) status,
480 * cancelling their unprocessed tasks, and waking them up, doing
481 * so repeatedly until stable (but with a loop bounded by the
482 * number of workers). Calls to non-abrupt shutdown() preface
483 * this by checking whether termination should commence. This
484 * relies primarily on the active count bits of "ctl" maintaining
485 * consensus -- tryTerminate is called from awaitWork whenever
486 * quiescent. However, external submitters do not take part in
487 * this consensus. So, tryTerminate sweeps through queues (until
488 * stable) to ensure lack of in-flight submissions and workers
489 * about to process them before triggering the "STOP" phase of
490 * termination. (Note: there is an intrinsic conflict if
491 * helpQuiescePool is called when shutdown is enabled. Both wait
492 * for quiescence, but tryTerminate is biased to not trigger until
493 * helpQuiescePool completes.)
494 *
495 *
496 * Joining Tasks
497 * =============
498 *
499 * Any of several actions may be taken when one worker is waiting
500 * to join a task stolen (or always held) by another. Because we
501 * are multiplexing many tasks on to a pool of workers, we can't
502 * just let them block (as in Thread.join). We also cannot just
503 * reassign the joiner's run-time stack with another and replace
504 * it later, which would be a form of "continuation", that even if
505 * possible is not necessarily a good idea since we may need both
506 * an unblocked task and its continuation to progress. Instead we
507 * combine two tactics:
508 *
509 * Helping: Arranging for the joiner to execute some task that it
510 * would be running if the steal had not occurred.
511 *
512 * Compensating: Unless there are already enough live threads,
513 * method tryCompensate() may create or re-activate a spare
514 * thread to compensate for blocked joiners until they unblock.
515 *
516 * A third form (implemented in tryRemoveAndExec) amounts to
517 * helping a hypothetical compensator: If we can readily tell that
518 * a possible action of a compensator is to steal and execute the
519 * task being joined, the joining thread can do so directly,
520 * without the need for a compensation thread (although at the
521 * expense of larger run-time stacks; the tradeoff is
522 * typically worthwhile).
523 *
524 * The ManagedBlocker extension API can't use helping, so it relies
525 * only on compensation in method awaitBlocker.
526 *
527 * The algorithm in helpStealer entails a form of "linear
528 * helping". Each worker records (in field currentSteal) the most
529 * recent task it stole from some other worker (or a submission).
530 * It also records (in field currentJoin) the task it is currently
531 * actively joining. Method helpStealer uses these markers to try
532 * to find a worker to help (i.e., steal back a task from and
533 * execute it) that could hasten completion of the actively joined
534 * task. Thus, the joiner executes a task that would be on its
535 * own local deque had the to-be-joined task not been stolen. This
536 * is a conservative variant of the approach described in Wagner &
537 * Calder "Leapfrogging: a portable technique for implementing
538 * efficient futures" SIGPLAN Notices, 1993
539 * (http://portal.acm.org/citation.cfm?id=155354). It differs in
540 * that: (1) We only maintain dependency links across workers upon
541 * steals, rather than use per-task bookkeeping. This sometimes
542 * requires a linear scan of workQueues array to locate stealers,
543 * but often doesn't because stealers leave hints (that may become
544 * stale/wrong) of where to locate them. It is only a hint
545 * because a worker might have had multiple steals and the hint
546 * records only one of them (usually the most current). Hinting
547 * isolates cost to when it is needed, rather than adding to
548 * per-task overhead. (2) It is "shallow", ignoring nesting and
549 * potentially cyclic mutual steals. (3) It is intentionally
550 * racy: field currentJoin is updated only while actively joining,
551 * which means that we miss links in the chain during long-lived
552 * tasks, GC stalls etc (which is OK since blocking in such cases
553 * is usually a good idea). (4) We bound the number of attempts
554 * to find work using checksums and fall back to suspending the
555 * worker and if necessary replacing it with another.
556 *
557 * Helping actions for CountedCompleters do not require tracking
558 * currentJoins: Method helpComplete takes and executes any task
559 * with the same root as the task being waited on (preferring
560 * local pops to non-local polls). However, this still entails
561 * some traversal of completer chains, so is less efficient than
562 * using CountedCompleters without explicit joins.
563 *
564 * Compensation does not aim to keep exactly the target
565 * parallelism number of unblocked threads running at any given
566 * time. Some previous versions of this class employed immediate
567 * compensations for any blocked join. However, in practice, the
568 * vast majority of blockages are transient byproducts of GC and
569 * other JVM or OS activities that are made worse by replacement.
570 * Currently, compensation is attempted only after validating that
571 * all purportedly active threads are processing tasks by checking
572 * field WorkQueue.scanState, which eliminates most false
573 * positives. Also, compensation is bypassed (tolerating fewer
574 * threads) in the most common case in which it is rarely
575 * beneficial: when a worker with an empty queue (thus no
576 * continuation tasks) blocks on a join and there still remain
577 * enough threads to ensure liveness.
578 *
579 * The compensation mechanism may be bounded. Bounds for the
580 * commonPool (see commonMaxSpares) better enable JVMs to cope
581 * with programming errors and abuse before running out of
582 * resources to do so. In other cases, users may supply factories
583 * that limit thread construction. The effects of bounding in this
584 * pool (like all others) are imprecise. Total worker counts are
585 * decremented when threads deregister, not when they exit and
586 * resources are reclaimed by the JVM and OS. So the number of
587 * simultaneously live threads may transiently exceed bounds.
588 *
589 * Common Pool
590 * ===========
591 *
592 * The static common pool always exists after static
593 * initialization. Since it (or any other created pool) need
594 * never be used, we minimize initial construction overhead and
595 * footprint to the setup of about a dozen fields, with no nested
596 * allocation. Most bootstrapping occurs within method
597 * externalSubmit during the first submission to the pool.
598 *
599 * When external threads submit to the common pool, they can
600 * perform subtask processing (see externalHelpComplete and
601 * related methods) upon joins. This caller-helps policy makes it
602 * sensible to set common pool parallelism level to one (or more)
603 * less than the total number of available cores, or even zero for
604 * pure caller-runs. We do not need to record whether external
605 * submissions are to the common pool -- if not, external help
606 * methods return quickly. These submitters would otherwise be
607 * blocked waiting for completion, so the extra effort (with
608 * liberally sprinkled task status checks) in inapplicable cases
609 * amounts to an odd form of limited spin-wait before blocking in
610 * ForkJoinTask.join.
611 *
612 * As a more appropriate default in managed environments, unless
613 * overridden by system properties, we use workers of subclass
614 * InnocuousForkJoinWorkerThread when there is a SecurityManager
615 * present. These workers have no permissions set, do not belong
616 * to any user-defined ThreadGroup, and erase all ThreadLocals
617 * after executing any top-level task (see WorkQueue.runTask).
618 * The associated mechanics (mainly in ForkJoinWorkerThread) may
619 * be JVM-dependent and must access particular Thread class fields
620 * to achieve this effect.
621 *
622 * Style notes
623 * ===========
624 *
625 * Memory ordering relies mainly on Unsafe intrinsics that carry
626 * the further responsibility of explicitly performing null- and
627 * bounds- checks otherwise carried out implicitly by JVMs. This
628 * can be awkward and ugly, but also reflects the need to control
629 * outcomes across the unusual cases that arise in very racy code
630 * with very few invariants. So these explicit checks would exist
631 * in some form anyway. All fields are read into locals before
632 * use, and null-checked if they are references. This is usually
633 * done in a "C"-like style of listing declarations at the heads
634 * of methods or blocks, and using inline assignments on first
635 * encounter. Array bounds-checks are usually performed by
636 * masking with array.length-1, which relies on the invariant that
637 * these arrays are created with positive lengths, which is itself
638 * paranoically checked. Nearly all explicit checks lead to
639 * bypass/return, not exception throws, because they may
640 * legitimately arise due to cancellation/revocation during
641 * shutdown.
642 *
643 * There is a lot of representation-level coupling among classes
644 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
645 * fields of WorkQueue maintain data structures managed by
646 * ForkJoinPool, so are directly accessed. There is little point
647 * trying to reduce this, since any associated future changes in
648 * representations will need to be accompanied by algorithmic
649 * changes anyway. Several methods intrinsically sprawl because
650 * they must accumulate sets of consistent reads of fields held in
651 * local variables. There are also other coding oddities
652 * (including several unnecessary-looking hoisted null checks)
653 * that help some methods perform reasonably even when interpreted
654 * (not compiled).
655 *
656 * The order of declarations in this file is (with a few exceptions):
657 * (1) Static utility functions
658 * (2) Nested (static) classes
659 * (3) Static fields
660 * (4) Fields, along with constants used when unpacking some of them
661 * (5) Internal control methods
662 * (6) Callbacks and other support for ForkJoinTask methods
663 * (7) Exported methods
664 * (8) Static block initializing statics in minimally dependent order
665 */
666
667 // Static utilities
668
669 /**
670 * If there is a security manager, makes sure caller has
671 * permission to modify threads.
672 */
673 private static void checkPermission() {
674 SecurityManager security = System.getSecurityManager();
675 if (security != null)
676 security.checkPermission(modifyThreadPermission);
677 }
678
679 // Nested classes
680
681 /**
682 * Factory for creating new {@link ForkJoinWorkerThread}s.
683 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
684 * for {@code ForkJoinWorkerThread} subclasses that extend base
685 * functionality or initialize threads with different contexts.
686 */
687 public static interface ForkJoinWorkerThreadFactory {
688 /**
689 * Returns a new worker thread operating in the given pool.
690 *
691 * @param pool the pool this thread works in
692 * @return the new worker thread
693 * @throws NullPointerException if the pool is null
694 */
695 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
696 }
697
698 /**
699 * Default ForkJoinWorkerThreadFactory implementation; creates a
700 * new ForkJoinWorkerThread.
701 */
702 static final class DefaultForkJoinWorkerThreadFactory
703 implements ForkJoinWorkerThreadFactory {
704 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
705 return new ForkJoinWorkerThread(pool);
706 }
707 }
708
709 /**
710 * Class for artificial tasks that are used to replace the target
711 * of local joins if they are removed from an interior queue slot
712 * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
713 * actually do anything beyond having a unique identity.
714 */
715 static final class EmptyTask extends ForkJoinTask<Void> {
716 private static final long serialVersionUID = -7721805057305804111L;
717 EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
718 public final Void getRawResult() { return null; }
719 public final void setRawResult(Void x) {}
720 public final boolean exec() { return true; }
721 }
722
723 // Constants shared across ForkJoinPool and WorkQueue
724
725 // Bounds
726 static final int SMASK = 0xffff; // short bits == max index
727 static final int MAX_CAP = 0x7fff; // max #workers - 1
728 static final int EVENMASK = 0xfffe; // even short bits
729 static final int SQMASK = 0x007e; // max 64 (even) slots
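// For illustration: a submitter with probe value r maps to slot
// (r & SQMASK) -- an even index in [0, 126] -- before masking with
// the workQueues array length (see externalSubmit and externalPush).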
730
731 // Masks and units for WorkQueue.scanState and ctl sp subfield
732 static final int SCANNING = 1; // false when running tasks
733 static final int INACTIVE = 1 << 31; // must be negative
734 static final int SS_SEQ = 1 << 16; // version count
735
736 // Mode bits for ForkJoinPool.config and WorkQueue.config
737 static final int MODE_MASK = 0xffff << 16; // top half of int
738 static final int LIFO_QUEUE = 0;
739 static final int FIFO_QUEUE = 1 << 16;
740 static final int SHARED_QUEUE = 1 << 31; // must be negative
741
742 /**
743 * Queues supporting work-stealing as well as external task
744 * submission. See above for descriptions and algorithms.
745 * Performance on most platforms is very sensitive to placement of
746 * instances of both WorkQueues and their arrays -- we absolutely
747 * do not want multiple WorkQueue instances or multiple queue
748 * arrays sharing cache lines. The @Contended annotation alerts
749 * JVMs to try to keep instances apart.
750 */
751 @sun.misc.Contended
752 static final class WorkQueue {
753
754 /**
755 * Capacity of work-stealing queue array upon initialization.
756 * Must be a power of two; at least 4, but should be larger to
757 * reduce or eliminate cacheline sharing among queues.
758 * Currently, it is much larger, as a partial workaround for
759 * the fact that JVMs often place arrays in locations that
760 * share GC bookkeeping (especially cardmarks) such that
761 * per-write accesses encounter serious memory contention.
762 */
763 static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
764
765 /**
766 * Maximum size for queue arrays. Must be a power of two less
767 * than or equal to 1 << (31 - width of array entry) to ensure
768 * lack of wraparound of index calculations, but defined to a
769 * value a bit less than this to help users trap runaway
770 * programs before saturating systems.
771 */
772 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
773
774 // Instance fields
775 volatile int scanState; // versioned, <0: inactive; odd:scanning
776 int stackPred; // pool stack (ctl) predecessor
777 int nsteals; // number of steals
778 int hint; // randomization and stealer index hint
779 int config; // pool index and mode
780 volatile int qlock; // 1: locked, < 0: terminate; else 0
781 volatile int base; // index of next slot for poll
782 int top; // index of next slot for push
783 ForkJoinTask<?>[] array; // the elements (initially unallocated)
784 final ForkJoinPool pool; // the containing pool (may be null)
785 final ForkJoinWorkerThread owner; // owning thread or null if shared
786 volatile Thread parker; // == owner during call to park; else null
787 volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
788 volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
789
790 WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
791 this.pool = pool;
792 this.owner = owner;
793 // Place indices in the center of array (that is not yet allocated)
794 base = top = INITIAL_QUEUE_CAPACITY >>> 1;
795 }
796
797 /**
798 * Returns an exportable index (used by ForkJoinWorkerThread)
799 */
800 final int getPoolIndex() {
801 return (config & 0xffff) >>> 1; // ignore odd/even tag bit
802 }
803
804 /**
805 * Returns the approximate number of tasks in the queue.
806 */
807 final int queueSize() {
808 int n = base - top; // non-owner callers must read base first
809 return (n >= 0) ? 0 : -n; // ignore transient negative
810 }
811
812 /**
813 * Provides a more accurate estimate of whether this queue has
814 * any tasks than does queueSize, by checking whether a
815 * near-empty queue has at least one unclaimed task.
816 */
817 final boolean isEmpty() {
818 ForkJoinTask<?>[] a; int n, m, s;
819 return ((n = base - (s = top)) >= 0 ||
820 (n == -1 && // possibly one task
821 ((a = array) == null || (m = a.length - 1) < 0 ||
822 U.getObject
823 (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
824 }
825
826 /**
827 * Pushes a task. Call only by owner in unshared queues. (The
828 * shared-queue version is embedded in method externalPush.)
829 *
830 * @param task the task. Caller must ensure non-null.
831 * @throws RejectedExecutionException if array cannot be resized
832 */
833 final void push(ForkJoinTask<?> task) {
834 ForkJoinTask<?>[] a; ForkJoinPool p;
835 int b = base, s = top, n;
836 if ((a = array) != null) { // ignore if queue removed
837 int m = a.length - 1; // fenced write for task visibility
838 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
839 U.putOrderedInt(this, QTOP, s + 1);
840 if ((n = s - b) <= 1) {
841 if ((p = pool) != null)
842 p.signalWork(p.workQueues, this);
843 }
844 else if (n >= m)
845 growArray();
846 }
847 }
848
849 /**
850 * Initializes or doubles the capacity of array. Call either
851 * by owner or with lock held -- it is OK for base, but not
852 * top, to move while resizings are in progress.
853 */
854 final ForkJoinTask<?>[] growArray() {
855 ForkJoinTask<?>[] oldA = array;
856 int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
857 if (size > MAXIMUM_QUEUE_CAPACITY)
858 throw new RejectedExecutionException("Queue capacity exceeded");
859 int oldMask, t, b;
860 ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
861 if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
862 (t = top) - (b = base) > 0) {
863 int mask = size - 1;
864 do { // emulate poll from old array, push to new array
865 ForkJoinTask<?> x;
866 int oldj = ((b & oldMask) << ASHIFT) + ABASE;
867 int j = ((b & mask) << ASHIFT) + ABASE;
868 x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
869 if (x != null &&
870 U.compareAndSwapObject(oldA, oldj, x, null))
871 U.putObjectVolatile(a, j, x);
872 } while (++b != t);
873 }
874 return a;
875 }
876
877 /**
878 * Takes next task, if one exists, in LIFO order. Call only
879 * by owner in unshared queues.
880 */
881 final ForkJoinTask<?> pop() {
882 ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
883 if ((a = array) != null && (m = a.length - 1) >= 0) {
884 for (int s; (s = top - 1) - base >= 0;) {
885 long j = ((m & s) << ASHIFT) + ABASE;
886 if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
887 break;
888 if (U.compareAndSwapObject(a, j, t, null)) {
889 U.putOrderedInt(this, QTOP, s);
890 return t;
891 }
892 }
893 }
894 return null;
895 }
896
897 /**
898 * Takes a task in FIFO order if b is base of queue and a task
899 * can be claimed without contention. Specialized versions
900 * appear in ForkJoinPool methods scan and helpStealer.
901 */
902 final ForkJoinTask<?> pollAt(int b) {
903 ForkJoinTask<?> t; ForkJoinTask<?>[] a;
904 if ((a = array) != null) {
905 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
906 if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
907 base == b && U.compareAndSwapObject(a, j, t, null)) {
908 base = b + 1;
909 return t;
910 }
911 }
912 return null;
913 }
914
915 /**
916 * Takes next task, if one exists, in FIFO order.
917 */
918 final ForkJoinTask<?> poll() {
919 ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
920 while ((b = base) - top < 0 && (a = array) != null) {
921 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
922 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
923 if (base == b) {
924 if (t != null) {
925 if (U.compareAndSwapObject(a, j, t, null)) {
926 base = b + 1;
927 return t;
928 }
929 }
930 else if (b + 1 == top) // now empty
931 break;
932 }
933 }
934 return null;
935 }
936
937 /**
938 * Takes next task, if one exists, in order specified by mode.
939 */
940 final ForkJoinTask<?> nextLocalTask() {
941 return (config & FIFO_QUEUE) == 0 ? pop() : poll();
942 }
943
944 /**
945 * Returns next task, if one exists, in order specified by mode.
946 */
947 final ForkJoinTask<?> peek() {
948 ForkJoinTask<?>[] a = array; int m;
949 if (a == null || (m = a.length - 1) < 0)
950 return null;
951 int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;
952 int j = ((i & m) << ASHIFT) + ABASE;
953 return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
954 }
955
956 /**
957 * Pops the given task only if it is at the current top.
958 * (A shared version is available only via FJP.tryExternalUnpush)
959 */
960 final boolean tryUnpush(ForkJoinTask<?> t) {
961 ForkJoinTask<?>[] a; int s;
962 if ((a = array) != null && (s = top) != base &&
963 U.compareAndSwapObject
964 (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
965 U.putOrderedInt(this, QTOP, s);
966 return true;
967 }
968 return false;
969 }
970
971 /**
972 * Removes and cancels all known tasks, ignoring any exceptions.
973 */
974 final void cancelAll() {
975 ForkJoinTask<?> t;
976 if ((t = currentJoin) != null) {
977 currentJoin = null;
978 ForkJoinTask.cancelIgnoringExceptions(t);
979 }
980 if ((t = currentSteal) != null) {
981 currentSteal = null;
982 ForkJoinTask.cancelIgnoringExceptions(t);
983 }
984 while ((t = poll()) != null)
985 ForkJoinTask.cancelIgnoringExceptions(t);
986 }
987
988 // Specialized execution methods
989
990 /**
991 * Polls and runs tasks until empty.
992 */
993 final void pollAndExecAll() {
994 for (ForkJoinTask<?> t; (t = poll()) != null;)
995 t.doExec();
996 }
997
998 /**
999 * Removes and executes all local tasks. If LIFO, invokes
1000 * pollAndExecAll. Otherwise implements a specialized pop loop
1001 * to exec until empty.
1002 */
1003 final void execLocalTasks() {
1004 int b = base, m, s;
1005 ForkJoinTask<?>[] a = array;
1006 if (b - (s = top - 1) <= 0 && a != null &&
1007 (m = a.length - 1) >= 0) {
1008 if ((config & FIFO_QUEUE) == 0) {
1009 for (ForkJoinTask<?> t;;) {
1010 if ((t = (ForkJoinTask<?>)U.getAndSetObject
1011 (a, ((m & s) << ASHIFT) + ABASE, null)) == null)
1012 break;
1013 U.putOrderedInt(this, QTOP, s);
1014 t.doExec();
1015 if (base - (s = top - 1) > 0)
1016 break;
1017 }
1018 }
1019 else
1020 pollAndExecAll();
1021 }
1022 }
1023
1024 /**
1025 * Executes the given task and any remaining local tasks.
1026 */
1027 final void runTask(ForkJoinTask<?> task) {
1028 if (task != null) {
1029 scanState &= ~SCANNING; // mark as busy
1030 (currentSteal = task).doExec();
1031 U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
1032 execLocalTasks();
1033 ForkJoinWorkerThread thread = owner;
1034 if (++nsteals < 0) // collect on overflow
1035 transferStealCount(pool);
1036 scanState |= SCANNING;
1037 if (thread != null)
1038 thread.afterTopLevelExec();
1039 }
1040 }
1041
1042 /**
1043 * Adds steal count to pool stealCounter if it exists, and resets.
1044 */
1045 final void transferStealCount(ForkJoinPool p) {
1046 AtomicLong sc;
1047 if (p != null && (sc = p.stealCounter) != null) {
1048 int s = nsteals;
1049 nsteals = 0; // if negative, correct for overflow
1050 sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s));
1051 }
1052 }
1053
1054 /**
1055 * If present, removes from queue and executes the given task,
1056 * or any other cancelled task. Used only by awaitJoin.
1057 *
1058 * @return true if queue empty and task not known to be done
1059 */
1060 final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
1061 ForkJoinTask<?>[] a; int m, s, b, n;
1062 if ((a = array) != null && (m = a.length - 1) >= 0 &&
1063 task != null) {
1064 while ((n = (s = top) - (b = base)) > 0) {
1065 for (ForkJoinTask<?> t;;) { // traverse from s to b
1066 long j = ((--s & m) << ASHIFT) + ABASE;
1067 if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
1068 return s + 1 == top; // shorter than expected
1069 else if (t == task) {
1070 boolean removed = false;
1071 if (s + 1 == top) { // pop
1072 if (U.compareAndSwapObject(a, j, task, null)) {
1073 U.putOrderedInt(this, QTOP, s);
1074 removed = true;
1075 }
1076 }
1077 else if (base == b) // replace with proxy
1078 removed = U.compareAndSwapObject(
1079 a, j, task, new EmptyTask());
1080 if (removed)
1081 task.doExec();
1082 break;
1083 }
1084 else if (t.status < 0 && s + 1 == top) {
1085 if (U.compareAndSwapObject(a, j, t, null))
1086 U.putOrderedInt(this, QTOP, s);
1087 break; // was cancelled
1088 }
1089 if (--n == 0)
1090 return false;
1091 }
1092 if (task.status < 0)
1093 return false;
1094 }
1095 }
1096 return true;
1097 }
1098
1099 /**
1100 * Pops task if in the same CC computation as the given task,
1101 * in either shared or owned mode. Used only by helpComplete.
1102 */
1103 final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
1104 int s; ForkJoinTask<?>[] a; Object o;
1105 if (base - (s = top) < 0 && (a = array) != null) {
1106 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
1107 if ((o = U.getObjectVolatile(a, j)) != null &&
1108 (o instanceof CountedCompleter)) {
1109 CountedCompleter<?> t = (CountedCompleter<?>)o;
1110 for (CountedCompleter<?> r = t;;) {
1111 if (r == task) {
1112 if (mode < 0) { // must lock
1113 if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
1114 if (top == s && array == a &&
1115 U.compareAndSwapObject(a, j, t, null)) {
1116 U.putOrderedInt(this, QTOP, s - 1);
1117 U.putOrderedInt(this, QLOCK, 0);
1118 return t;
1119 }
1120 U.compareAndSwapInt(this, QLOCK, 1, 0);
1121 }
1122 }
1123 else if (U.compareAndSwapObject(a, j, t, null)) {
1124 U.putOrderedInt(this, QTOP, s - 1);
1125 return t;
1126 }
1127 break;
1128 }
1129 else if ((r = r.completer) == null) // try parent
1130 break;
1131 }
1132 }
1133 }
1134 return null;
1135 }
1136
1137 /**
1138 * Steals and runs a task in the same CC computation as the
1139 * given task if one exists and can be taken without
1140 * contention. Otherwise returns a checksum/control value for
1141 * use by method helpComplete.
1142 *
1143 * @return 1 if successful, 2 if retryable (lost to another
1144 * stealer), -1 if non-empty but no matching task found, else
1145 * the base index, forced negative.
1146 */
1147 final int pollAndExecCC(CountedCompleter<?> task) {
1148 int b, h; ForkJoinTask<?>[] a; Object o;
1149 if ((b = base) - top >= 0 || (a = array) == null)
1150 h = b | Integer.MIN_VALUE; // to sense movement on re-poll
1151 else {
1152 long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
1153 if ((o = U.getObjectVolatile(a, j)) == null)
1154 h = 2; // retryable
1155 else if (!(o instanceof CountedCompleter))
1156 h = -1; // unmatchable
1157 else {
1158 CountedCompleter<?> t = (CountedCompleter<?>)o;
1159 for (CountedCompleter<?> r = t;;) {
1160 if (r == task) {
1161 if (base == b &&
1162 U.compareAndSwapObject(a, j, t, null)) {
1163 base = b + 1;
1164 t.doExec();
1165 h = 1; // success
1166 }
1167 else
1168 h = 2; // lost CAS
1169 break;
1170 }
1171 else if ((r = r.completer) == null) {
1172 h = -1; // unmatched
1173 break;
1174 }
1175 }
1176 }
1177 }
1178 return h;
1179 }
1180
1181 /**
1182 * Returns true if owned and not known to be blocked.
1183 */
1184 final boolean isApparentlyUnblocked() {
1185 Thread wt; Thread.State s;
1186 return (scanState >= 0 &&
1187 (wt = owner) != null &&
1188 (s = wt.getState()) != Thread.State.BLOCKED &&
1189 s != Thread.State.WAITING &&
1190 s != Thread.State.TIMED_WAITING);
1191 }
1192
1193 // Unsafe mechanics. Note that some are (and must be) the same as in FJP
1194 private static final sun.misc.Unsafe U;
1195 private static final int ABASE;
1196 private static final int ASHIFT;
1197 private static final long QBASE;
1198 private static final long QTOP;
1199 private static final long QLOCK;
1200 private static final long QSCANSTATE;
1201 private static final long QCURRENTSTEAL;
1202 static {
1203 try {
1204 U = sun.misc.Unsafe.getUnsafe();
1205 Class<?> wk = WorkQueue.class;
1206 Class<?> ak = ForkJoinTask[].class;
1207 QBASE = U.objectFieldOffset
1208 (wk.getDeclaredField("base"));
1209 QTOP = U.objectFieldOffset
1210 (wk.getDeclaredField("top"));
1211 QLOCK = U.objectFieldOffset
1212 (wk.getDeclaredField("qlock"));
1213 QSCANSTATE = U.objectFieldOffset
1214 (wk.getDeclaredField("scanState"));
1215 QCURRENTSTEAL = U.objectFieldOffset
1216 (wk.getDeclaredField("currentSteal"));
1217 ABASE = U.arrayBaseOffset(ak);
1218 int scale = U.arrayIndexScale(ak);
1219 if ((scale & (scale - 1)) != 0)
1220 throw new Error("data type scale not a power of two");
1221 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1222 } catch (Exception e) {
1223 throw new Error(e);
1224 }
1225 }
1226 }
1227
1228 // static fields (initialized in static initializer below)
1229
1230 /**
1231 * Creates a new ForkJoinWorkerThread. This factory is used unless
1232 * overridden in ForkJoinPool constructors.
1233 */
1234 public static final ForkJoinWorkerThreadFactory
1235 defaultForkJoinWorkerThreadFactory;
1236
1237 /**
1238 * Permission required for callers of methods that may start or
1239 * kill threads.
1240 */
1241 private static final RuntimePermission modifyThreadPermission;
1242
1243 /**
1244 * Common (static) pool. Non-null for public use unless a static
1245 * construction exception, but internal usages null-check on use
1246 * to paranoically avoid potential initialization circularities
1247 * as well as to simplify generated code.
1248 */
1249 static final ForkJoinPool common;
1250
1251 /**
1252 * Common pool parallelism. To allow simpler use and management
1253 * when common pool threads are disabled, we allow the underlying
1254 * common.parallelism field to be zero, but in that case still report
1255 * parallelism as 1 to reflect resulting caller-runs mechanics.
1256 */
1257 static final int commonParallelism;
1258
1259 /**
1260 * Limit on spare thread construction in tryCompensate.
1261 */
1262 private static int commonMaxSpares;
1263
1264 /**
1265 * Sequence number for creating workerNamePrefix.
1266 */
1267 private static int poolNumberSequence;
1268
1269 /**
1270 * Returns the next sequence number. We don't expect this to
1271 * ever contend, so use simple builtin sync.
1272 */
1273 private static final synchronized int nextPoolId() {
1274 return ++poolNumberSequence;
1275 }
1276
1277 // static configuration constants
1278
1279 /**
1280 * Initial timeout value (in nanoseconds) for the thread
1281 * triggering quiescence to park waiting for new work. On timeout,
1282 * the thread will instead try to shrink the number of
1283 * workers. The value should be large enough to avoid overly
1284 * aggressive shrinkage during most transient stalls (long GCs
1285 * etc).
1286 */
1287 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
1288
1289 /**
1290 * Tolerance for idle timeouts, to cope with timer undershoots
1291 */
1292 private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms
1293
1294 /**
1295 * The initial value for commonMaxSpares during static
1296 * initialization unless overridden using System property
1297 * "java.util.concurrent.ForkJoinPool.common.maximumSpares". The
1298 * default value is far in excess of normal requirements, but also
1299 * far short of MAX_CAP and typical OS thread limits, so it allows
1300 * JVMs to catch misuse/abuse before running out of resources
1301 * needed to do so.
1302 */
1303 private static final int DEFAULT_COMMON_MAX_SPARES = 256;
1304
1305 /**
1306 * Number of times to spin-wait before blocking. The spins (in
1307 * awaitRunStateLock and awaitWork) currently use randomized
1308 * spins. If/when MWAIT-like intrinsics become available, they
1309 * may allow quieter spinning. The value of SPINS must be a power
1310 * of two, at least 4. The current value causes spinning for a
1311 * small fraction of typical context-switch times, well worthwhile
1312 * given the typical likelihoods that blocking is not necessary.
1313 */
1314 private static final int SPINS = 1 << 11;
1315
1316 /**
1317 * Increment for seed generators. See class ThreadLocal for
1318 * explanation.
1319 */
1320 private static final int SEED_INCREMENT = 0x9e3779b9;
1321
1322 /*
1323 * Bits and masks for field ctl, packed with 4 16 bit subfields:
1324 * AC: Number of active running workers minus target parallelism
1325 * TC: Number of total workers minus target parallelism
1326 * SS: version count and status of top waiting thread
1327 * ID: poolIndex of top of Treiber stack of waiters
1328 *
1329 * When convenient, we can extract the lower 32 stack top bits
1330 * (including version bits) as sp=(int)ctl. The offsets of counts
1331 * by the target parallelism and the positionings of fields make
1332 * it possible to perform the most common checks via sign tests of
1333 * fields: When ac is negative, there are not enough active
1334 * workers; when tc is negative, there are not enough total
1335 * workers. When sp is non-zero, there are waiting workers. To
1336 * deal with possibly negative fields, we use casts in and out of
1337 * "short" and/or signed shifts to maintain signedness.
1338 *
1339 * Because it occupies uppermost bits, we can add one active count
1340 * using getAndAddLong of AC_UNIT, rather than CAS, when returning
1341 * from a blocked join. Other updates entail multiple subfields
1342 * and masking, requiring CAS.
1343 */
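/*
 * A worked illustration of this packing (for exposition only; not
 * used by the implementation): with target parallelism 4, all four
 * workers active, and none waiting, AC == TC == 0 and ctl == 0L.
 * If one worker inactivates, AC becomes -1 and the sign test
 * reports too few active workers. Writing p for (config & SMASK),
 * the subfields unpack as in the methods below:
 *
 *   int ac = (int)(ctl >> AC_SHIFT) + p;    // active workers
 *   int tc = (short)(ctl >>> TC_SHIFT) + p; // total workers
 *   int sp = (int)ctl;          // nonzero if workers are waiting
 */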
1344
1345 // Lower and upper word masks
1346 private static final long SP_MASK = 0xffffffffL;
1347 private static final long UC_MASK = ~SP_MASK;
1348
1349 // Active counts
1350 private static final int AC_SHIFT = 48;
1351 private static final long AC_UNIT = 0x0001L << AC_SHIFT;
1352 private static final long AC_MASK = 0xffffL << AC_SHIFT;
1353
1354 // Total counts
1355 private static final int TC_SHIFT = 32;
1356 private static final long TC_UNIT = 0x0001L << TC_SHIFT;
1357 private static final long TC_MASK = 0xffffL << TC_SHIFT;
1358 private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1359
1360 // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
1361 private static final int RSLOCK = 1;
1362 private static final int RSIGNAL = 1 << 1;
1363 private static final int STARTED = 1 << 2;
1364 private static final int STOP = 1 << 29;
1365 private static final int TERMINATED = 1 << 30;
1366 private static final int SHUTDOWN = 1 << 31;
1367
1368 // Instance fields
1369 volatile long ctl; // main pool control
1370 volatile int runState; // lockable status
1371 final int config; // parallelism, mode
1372 int indexSeed; // to generate worker index
1373 volatile WorkQueue[] workQueues; // main registry
1374 final ForkJoinWorkerThreadFactory factory;
1375 final UncaughtExceptionHandler ueh; // per-worker UEH
1376 final String workerNamePrefix; // to create worker name string
1377 volatile AtomicLong stealCounter; // also used as sync monitor
1378
1379 /**
1380 * Acquires the runState lock; returns current (locked) runState.
1381 */
1382 private int lockRunState() {
1383 int rs;
1384 return ((((rs = runState) & RSLOCK) != 0 ||
1385 !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
1386 awaitRunStateLock() : rs);
1387 }
1388
1389 /**
1390 * Spins and/or blocks until the runState lock is available. See
1391 * above for explanation.
1392 */
1393 private int awaitRunStateLock() {
1394 Object lock;
1395 boolean wasInterrupted = false;
1396 for (int spins = SPINS, r = 0, rs, ns;;) {
1397 if (((rs = runState) & RSLOCK) == 0) {
1398 if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
1399 if (wasInterrupted) {
1400 try {
1401 Thread.currentThread().interrupt();
1402 } catch (SecurityException ignore) {
1403 }
1404 }
1405 return ns;
1406 }
1407 }
1408 else if (r == 0)
1409 r = ThreadLocalRandom.nextSecondarySeed();
1410 else if (spins > 0) {
1411 r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
1412 if (r >= 0)
1413 --spins;
1414 }
1415 else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
1416 Thread.yield(); // initialization race
1417 else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
1418 synchronized (lock) {
1419 if ((runState & RSIGNAL) != 0) {
1420 try {
1421 lock.wait();
1422 } catch (InterruptedException ie) {
1423 if (!(Thread.currentThread() instanceof
1424 ForkJoinWorkerThread))
1425 wasInterrupted = true;
1426 }
1427 }
1428 else
1429 lock.notifyAll();
1430 }
1431 }
1432 }
1433 }
1434
1435 /**
1436 * Unlocks and sets runState to newRunState.
1437 *
1438 * @param oldRunState a value returned from lockRunState
1439 * @param newRunState the next value (must have lock bit clear).
1440 */
1441 private void unlockRunState(int oldRunState, int newRunState) {
1442 if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
1443 Object lock = stealCounter;
1444 runState = newRunState; // clears RSIGNAL bit
1445 if (lock != null)
1446 synchronized (lock) { lock.notifyAll(); }
1447 }
1448 }
1449
1450 // Creating, registering and deregistering workers
1451
1452 /**
1453 * Tries to construct and start one worker. Assumes that total
1454 * count has already been incremented as a reservation. Invokes
1455 * deregisterWorker on any failure.
1456 *
1457 * @return true if successful
1458 */
1459 private boolean createWorker() {
1460 ForkJoinWorkerThreadFactory fac = factory;
1461 Throwable ex = null;
1462 ForkJoinWorkerThread wt = null;
1463 try {
1464 if (fac != null && (wt = fac.newThread(this)) != null) {
1465 wt.start();
1466 return true;
1467 }
1468 } catch (Throwable rex) {
1469 ex = rex;
1470 }
1471 deregisterWorker(wt, ex);
1472 return false;
1473 }
1474
1475 /**
1476 * Tries to add one worker, incrementing ctl counts before doing
1477 * so, relying on createWorker to back out on failure.
1478 *
1479 * @param c incoming ctl value, with total count negative and no
1480 * idle workers. On CAS failure, c is refreshed and retried if
1481 * this holds (otherwise, a new worker is not needed).
1482 */
1483 private void tryAddWorker(long c) {
1484 boolean add = false;
1485 do {
1486 long nc = ((AC_MASK & (c + AC_UNIT)) |
1487 (TC_MASK & (c + TC_UNIT)));
1488 if (ctl == c) {
1489 int rs, stop; // check if terminating
1490 if ((stop = (rs = lockRunState()) & STOP) == 0)
1491 add = U.compareAndSwapLong(this, CTL, c, nc);
1492 unlockRunState(rs, rs & ~RSLOCK);
1493 if (stop != 0)
1494 break;
1495 if (add) {
1496 createWorker();
1497 break;
1498 }
1499 }
1500 } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
1501 }
1502
1503 /**
1504 * Callback from ForkJoinWorkerThread constructor to establish and
1505 * record its WorkQueue.
1506 *
1507 * @param wt the worker thread
1508 * @return the worker's queue
1509 */
1510 final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1511 UncaughtExceptionHandler handler;
1512 wt.setDaemon(true); // configure thread
1513 if ((handler = ueh) != null)
1514 wt.setUncaughtExceptionHandler(handler);
1515 WorkQueue w = new WorkQueue(this, wt);
1516 int i = 0; // assign a pool index
1517 int mode = config & MODE_MASK;
1518 int rs = lockRunState();
1519 try {
1520 WorkQueue[] ws; int n; // skip if no array
1521 if ((ws = workQueues) != null && (n = ws.length) > 0) {
1522 int s = indexSeed += SEED_INCREMENT; // unlikely to collide
1523 int m = n - 1;
1524 i = ((s << 1) | 1) & m; // odd-numbered indices
1525 if (ws[i] != null) { // collision
1526 int probes = 0; // step by approx half n
1527 int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1528 while (ws[i = (i + step) & m] != null) {
1529 if (++probes >= n) {
1530 workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1531 m = n - 1;
1532 probes = 0;
1533 }
1534 }
1535 }
1536 w.hint = s; // use as random seed
1537 w.config = i | mode;
1538 w.scanState = i; // publication fence
1539 ws[i] = w;
1540 }
1541 } finally {
1542 unlockRunState(rs, rs & ~RSLOCK);
1543 }
1544 wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
1545 return w;
1546 }
1547
1548 /**
1549 * Final callback from terminating worker, as well as upon failure
1550 * to construct or start a worker. Removes record of worker from
1551 * array, and adjusts counts. If pool is shutting down, tries to
1552 * complete termination.
1553 *
1554 * @param wt the worker thread, or null if construction failed
1555 * @param ex the exception causing failure, or null if none
1556 */
1557 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1558 WorkQueue w = null;
1559 if (wt != null && (w = wt.workQueue) != null) {
1560 WorkQueue[] ws; // remove index from array
1561 int idx = w.config & SMASK;
1562 int rs = lockRunState();
1563 if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
1564 ws[idx] = null;
1565 unlockRunState(rs, rs & ~RSLOCK);
1566 }
1567 long c; // decrement counts
1568 do {} while (!U.compareAndSwapLong
1569 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
1570 (TC_MASK & (c - TC_UNIT)) |
1571 (SP_MASK & c))));
1572 if (w != null) {
1573 w.qlock = -1; // ensure set
1574 w.transferStealCount(this);
1575 w.cancelAll(); // cancel remaining tasks
1576 }
1577 for (;;) { // possibly replace
1578 WorkQueue[] ws; int m, sp;
1579 if (tryTerminate(false, false) || w == null || w.array == null ||
1580 (runState & STOP) != 0 || (ws = workQueues) == null ||
1581 (m = ws.length - 1) < 0) // already terminating
1582 break;
1583 if ((sp = (int)(c = ctl)) != 0) { // wake up replacement
1584 if (tryRelease(c, ws[sp & m], AC_UNIT))
1585 break;
1586 }
1587 else if (ex != null && (c & ADD_WORKER) != 0L) {
1588 tryAddWorker(c); // create replacement
1589 break;
1590 }
1591 else // don't need replacement
1592 break;
1593 }
1594 if (ex == null) // help clean on way out
1595 ForkJoinTask.helpExpungeStaleExceptions();
1596 else // rethrow
1597 ForkJoinTask.rethrow(ex);
1598 }
1599
1600 // Signalling
1601
1602 /**
1603 * Tries to create or activate a worker if too few are active.
1604 *
1605 * @param ws the worker array to use to find signallees
1606 * @param q a WorkQueue -- if non-null, don't retry if now empty
1607 */
1608 final void signalWork(WorkQueue[] ws, WorkQueue q) {
1609 long c; int sp, i; WorkQueue v; Thread p;
1610 while ((c = ctl) < 0L) { // too few active
1611 if ((sp = (int)c) == 0) { // no idle workers
1612 if ((c & ADD_WORKER) != 0L) // too few workers
1613 tryAddWorker(c);
1614 break;
1615 }
1616 if (ws == null) // unstarted/terminated
1617 break;
1618 if (ws.length <= (i = sp & SMASK)) // terminated
1619 break;
1620 if ((v = ws[i]) == null) // terminating
1621 break;
1622 int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
1623 int d = sp - v.scanState; // screen CAS
1624 long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
1625 if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
1626 v.scanState = vs; // activate v
1627 if ((p = v.parker) != null)
1628 U.unpark(p);
1629 break;
1630 }
1631 if (q != null && q.base == q.top) // no more work
1632 break;
1633 }
1634 }
1635
1636 /**
1637 * Signals and releases worker v if it is top of idle worker
1638 * stack. This performs a one-shot version of signalWork only if
1639 * there is (apparently) at least one idle worker.
1640 *
1641 * @param c incoming ctl value
1642 * @param v if non-null, a worker
1643 * @param inc the increment to active count (zero when compensating)
1644 * @return true if successful
1645 */
1646 private boolean tryRelease(long c, WorkQueue v, long inc) {
1647 int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
1648 if (v != null && v.scanState == sp) { // v is at top of stack
1649 long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
1650 if (U.compareAndSwapLong(this, CTL, c, nc)) {
1651 v.scanState = vs;
1652 if ((p = v.parker) != null)
1653 U.unpark(p);
1654 return true;
1655 }
1656 }
1657 return false;
1658 }
1659
1660 // Scanning for tasks
1661
1662 /**
1663 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1664 */
1665 final void runWorker(WorkQueue w) {
1666 w.growArray(); // allocate queue
1667 int seed = w.hint; // initially holds randomization hint
1668 int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
1669 for (ForkJoinTask<?> t;;) {
1670 if ((t = scan(w, r)) != null)
1671 w.runTask(t);
1672 else if (!awaitWork(w, r))
1673 break;
1674 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1675 }
1676 }
1677
1678 /**
1679 * Scans for and tries to steal a top-level task. Scans start at a
1680 * random location, randomly moving on apparent contention,
1681 * otherwise continuing linearly until reaching two consecutive
1682 * empty passes over all queues with the same checksum (summing
1683 * each base index of each queue, which moves on each steal), at
1684 * which point the worker tries to inactivate and then re-scans,
1685 * attempting to re-activate (itself or some other worker) if
1686 * finding a task; otherwise returning null to await work. Scans
1687 * otherwise touch as little memory as possible, to reduce
1688 * disruption on other scanning threads.
1689 *
1690 * @param w the worker (via its WorkQueue)
1691 * @param r a random seed
1692 * @return a task, or null if none found
1693 */
1694 private ForkJoinTask<?> scan(WorkQueue w, int r) {
1695 WorkQueue[] ws; int m;
1696 if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
1697 int ss = w.scanState; // initially non-negative
1698 for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
1699 WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1700 int b, n; long c;
1701 if ((q = ws[k]) != null) {
1702 if ((n = (b = q.base) - q.top) < 0 &&
1703 (a = q.array) != null) { // non-empty
1704 long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1705 if ((t = ((ForkJoinTask<?>)
1706 U.getObjectVolatile(a, i))) != null &&
1707 q.base == b) {
1708 if (ss >= 0) {
1709 if (U.compareAndSwapObject(a, i, t, null)) {
1710 q.base = b + 1;
1711 if (n < -1) // signal others
1712 signalWork(ws, q);
1713 return t;
1714 }
1715 }
1716 else if (oldSum == 0 && // try to activate
1717 w.scanState < 0)
1718 tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
1719 }
1720 if (ss < 0) // refresh
1721 ss = w.scanState;
1722 r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
1723 origin = k = r & m; // move and rescan
1724 oldSum = checkSum = 0;
1725 continue;
1726 }
1727 checkSum += b;
1728 }
1729 if ((k = (k + 1) & m) == origin) { // continue until stable
1730 if ((ss >= 0 || (ss == (ss = w.scanState))) &&
1731 oldSum == (oldSum = checkSum)) {
1732 if (ss < 0 || w.qlock < 0) // already inactive
1733 break;
1734 int ns = ss | INACTIVE; // try to inactivate
1735 long nc = ((SP_MASK & ns) |
1736 (UC_MASK & ((c = ctl) - AC_UNIT)));
1737 w.stackPred = (int)c; // hold prev stack top
1738 U.putInt(w, QSCANSTATE, ns);
1739 if (U.compareAndSwapLong(this, CTL, c, nc))
1740 ss = ns;
1741 else
1742 w.scanState = ss; // back out
1743 }
1744 checkSum = 0;
1745 }
1746 }
1747 }
1748 return null;
1749 }
1750
1751 /**
1752 * Possibly blocks worker w waiting for a task to steal, or
1753 * returns false if the worker should terminate. If inactivating
1754 * w has caused the pool to become quiescent, checks for pool
1755 * termination, and, so long as this is not the only worker, waits
1756 * for up to a given duration. On timeout, if ctl has not
1757 * changed, terminates the worker, which will in turn wake up
1758 * another worker to possibly repeat this process.
1759 *
1760 * @param w the calling worker
1761 * @param r a random seed (for spins)
1762 * @return false if the worker should terminate
1763 */
1764 private boolean awaitWork(WorkQueue w, int r) {
1765 if (w == null || w.qlock < 0) // w is terminating
1766 return false;
1767 for (int pred = w.stackPred, spins = SPINS, ss;;) {
1768 if ((ss = w.scanState) >= 0)
1769 break;
1770 else if (spins > 0) {
1771 r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
1772 if (r >= 0 && --spins == 0) { // randomize spins
1773 WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
1774 if (pred != 0 && (ws = workQueues) != null &&
1775 (j = pred & SMASK) < ws.length &&
1776 (v = ws[j]) != null && // see if pred parking
1777 (v.parker == null || v.scanState >= 0))
1778 spins = SPINS; // continue spinning
1779 }
1780 }
1781 else if (w.qlock < 0) // recheck after spins
1782 return false;
1783 else if (!Thread.interrupted()) {
1784 long c, prevctl, parkTime, deadline;
1785 int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
1786 if ((ac <= 0 && tryTerminate(false, false)) ||
1787 (runState & STOP) != 0) // pool terminating
1788 return false;
1789 if (ac <= 0 && ss == (int)c) { // is last waiter
1790 prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
1791 int t = (short)(c >>> TC_SHIFT); // shrink excess spares
1792 if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
1793 return false; // else use timed wait
1794 parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
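// e.g., t == -2 (two below target) gives 3 * IDLE_TIMEOUT,
// so waits lengthen as fewer workers remain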
1795 deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1796 }
1797 else
1798 prevctl = parkTime = deadline = 0L;
1799 Thread wt = Thread.currentThread();
1800 U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
1801 w.parker = wt;
1802 if (w.scanState < 0 && ctl == c) // recheck before park
1803 U.park(false, parkTime);
1804 U.putOrderedObject(w, QPARKER, null);
1805 U.putObject(wt, PARKBLOCKER, null);
1806 if (w.scanState >= 0)
1807 break;
1808 if (parkTime != 0L && ctl == c &&
1809 deadline - System.nanoTime() <= 0L &&
1810 U.compareAndSwapLong(this, CTL, c, prevctl))
1811 return false; // shrink pool
1812 }
1813 }
1814 return true;
1815 }
1816
1817 // Joining tasks
1818
1819 /**
1820 * Tries to steal and run tasks within the target's computation.
1821 * Uses a variant of the top-level algorithm, restricted to tasks
1822 * with the given task as ancestor: It prefers taking and running
1823 * eligible tasks popped from the worker's own queue (via
1824 * popCC). Otherwise it scans others, randomly moving on
1825 * contention or execution, deciding to give up based on a
1826 * checksum (via return codes from pollAndExecCC). The maxTasks
1827 * argument supports external usages; internal calls use zero,
1828 * allowing unbounded steps (external calls trap non-positive
1829 * values).
1830 *
1831 * @param w caller
1832 * @param maxTasks if non-zero, the maximum number of other tasks to run
1833 * @return task status on exit
1834 */
1835 final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1836 int maxTasks) {
1837 WorkQueue[] ws; int s = 0, m;
1838 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
1839 task != null && w != null) {
1840 int mode = w.config; // for popCC
1841 int r = w.hint ^ w.top; // arbitrary seed for origin
1842 int origin = r & m; // first queue to scan
1843 int h = 1; // 1:ran, >1:contended, <0:hash
1844 for (int k = origin, oldSum = 0, checkSum = 0;;) {
1845 CountedCompleter<?> p; WorkQueue q;
1846 if ((s = task.status) < 0)
1847 break;
1848 if (h == 1 && (p = w.popCC(task, mode)) != null) {
1849 p.doExec(); // run local task
1850 if (maxTasks != 0 && --maxTasks == 0)
1851 break;
1852 origin = k; // reset
1853 oldSum = checkSum = 0;
1854 }
1855 else { // poll other queues
1856 if ((q = ws[k]) == null)
1857 h = 0;
1858 else if ((h = q.pollAndExecCC(task)) < 0)
1859 checkSum += h;
1860 if (h > 0) {
1861 if (h == 1 && maxTasks != 0 && --maxTasks == 0)
1862 break;
1863 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1864 origin = k = r & m; // move and restart
1865 oldSum = checkSum = 0;
1866 }
1867 else if ((k = (k + 1) & m) == origin) {
1868 if (oldSum == (oldSum = checkSum))
1869 break;
1870 checkSum = 0;
1871 }
1872 }
1873 }
1874 }
1875 return s;
1876 }
1877
1878 /**
1879 * Tries to locate and execute tasks for a stealer of the given
1880 * task, or in turn one of its stealers. Traces currentSteal ->
1881 * currentJoin links looking for a thread working on a descendant
1882 * of the given task and with a non-empty queue to steal back and
1883 * execute tasks from. The first call to this method upon a
1884 * waiting join will often entail scanning/search (which is OK
1885 * because the joiner has nothing better to do), but this method
1886 * leaves hints in workers to speed up subsequent calls.
1887 *
1888 * @param w caller
1889 * @param task the task to join
1890 */
1891 private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
1892 WorkQueue[] ws = workQueues;
1893 int oldSum = 0, checkSum, m;
1894 if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
1895 task != null) {
1896 do { // restart point
1897 checkSum = 0; // for stability check
1898 ForkJoinTask<?> subtask;
1899 WorkQueue j = w, v; // v is subtask stealer
1900 descent: for (subtask = task; subtask.status >= 0; ) {
1901 for (int h = j.hint | 1, k = 0, i; ; k += 2) {
1902 if (k > m) // can't find stealer
1903 break descent;
1904 if ((v = ws[i = (h + k) & m]) != null) {
1905 if (v.currentSteal == subtask) {
1906 j.hint = i;
1907 break;
1908 }
1909 checkSum += v.base;
1910 }
1911 }
1912 for (;;) { // help v or descend
1913 ForkJoinTask<?>[] a; int b;
1914 checkSum += (b = v.base);
1915 ForkJoinTask<?> next = v.currentJoin;
1916 if (subtask.status < 0 || j.currentJoin != subtask ||
1917 v.currentSteal != subtask) // stale
1918 break descent;
1919 if (b - v.top >= 0 || (a = v.array) == null) {
1920 if ((subtask = next) == null)
1921 break descent;
1922 j = v;
1923 break;
1924 }
1925 int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1926 ForkJoinTask<?> t = ((ForkJoinTask<?>)
1927 U.getObjectVolatile(a, i));
1928 if (v.base == b) {
1929 if (t == null) // stale
1930 break descent;
1931 if (U.compareAndSwapObject(a, i, t, null)) {
1932 v.base = b + 1;
1933 ForkJoinTask<?> ps = w.currentSteal;
1934 int top = w.top;
1935 do {
1936 U.putOrderedObject(w, QCURRENTSTEAL, t);
1937 t.doExec(); // clear local tasks too
1938 } while (task.status >= 0 &&
1939 w.top != top &&
1940 (t = w.pop()) != null);
1941 U.putOrderedObject(w, QCURRENTSTEAL, ps);
1942 if (w.base != w.top)
1943 return; // can't further help
1944 }
1945 }
1946 }
1947 }
1948 } while (task.status >= 0 && oldSum != (oldSum = checkSum));
1949 }
1950 }
1951
1952 /**
1953 * Tries to decrement active count (sometimes implicitly) and
1954 * possibly release or create a compensating worker in preparation
1955 * for blocking. Returns false (retryable by caller) on
1956 * contention, detected staleness, instability, or termination.
1957 *
1958 * @param w caller
1959 */
1960 private boolean tryCompensate(WorkQueue w) {
1961 boolean canBlock;
1962 WorkQueue[] ws; long c; int m, pc, sp;
1963 if (w == null || w.qlock < 0 || // caller terminating
1964 (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
1965 (pc = config & SMASK) == 0) // parallelism disabled
1966 canBlock = false;
1967 else if ((sp = (int)(c = ctl)) != 0) // release idle worker
1968 canBlock = tryRelease(c, ws[sp & m], 0L);
1969 else {
1970 int ac = (int)(c >> AC_SHIFT) + pc;
1971 int tc = (short)(c >> TC_SHIFT) + pc;
1972 int nbusy = 0; // validate saturation
1973 for (int i = 0; i <= m; ++i) { // two passes of odd indices
1974 WorkQueue v;
1975 if ((v = ws[((i << 1) | 1) & m]) != null) {
1976 if ((v.scanState & SCANNING) != 0)
1977 break;
1978 ++nbusy;
1979 }
1980 }
1981 if (nbusy != (tc << 1) || ctl != c)
1982 canBlock = false; // unstable or stale
1983 else if (tc >= pc && ac > 1 && w.isEmpty()) {
1984 long nc = ((AC_MASK & (c - AC_UNIT)) |
1985 (~AC_MASK & c)); // uncompensated
1986 canBlock = U.compareAndSwapLong(this, CTL, c, nc);
1987 }
1988 else if (tc >= MAX_CAP ||
1989 (this == common && tc >= pc + commonMaxSpares))
1990 throw new RejectedExecutionException(
1991 "Thread limit exceeded replacing blocked worker");
1992 else { // similar to tryAddWorker
1993 boolean add = false; int rs; // CAS within lock
1994 long nc = ((AC_MASK & c) |
1995 (TC_MASK & (c + TC_UNIT)));
1996 if (((rs = lockRunState()) & STOP) == 0)
1997 add = U.compareAndSwapLong(this, CTL, c, nc);
1998 unlockRunState(rs, rs & ~RSLOCK);
1999 canBlock = add && createWorker(); // throws on exception
2000 }
2001 }
2002 return canBlock;
2003 }
2004
2005 /**
2006 * Helps and/or blocks until the given task is done or the timeout elapses.
2007 *
2008 * @param w caller
2009 * @param task the task
2010 * @param deadline if nonzero, the deadline for timed waits
2011 * @return task status on exit
2012 */
2013 final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
2014 int s = 0;
2015 if (task != null && w != null) {
2016 ForkJoinTask<?> prevJoin = w.currentJoin;
2017 U.putOrderedObject(w, QCURRENTJOIN, task);
2018 CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
2019 (CountedCompleter<?>)task : null;
2020 for (;;) {
2021 if ((s = task.status) < 0)
2022 break;
2023 if (cc != null)
2024 helpComplete(w, cc, 0);
2025 else if (w.base == w.top || w.tryRemoveAndExec(task))
2026 helpStealer(w, task);
2027 if ((s = task.status) < 0)
2028 break;
2029 long ms, ns;
2030 if (deadline == 0L)
2031 ms = 0L;
2032 else if ((ns = deadline - System.nanoTime()) <= 0L)
2033 break;
2034 else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
2035 ms = 1L;
2036 if (tryCompensate(w)) {
2037 task.internalWait(ms);
2038 U.getAndAddLong(this, CTL, AC_UNIT);
2039 }
2040 }
2041 U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
2042 }
2043 return s;
2044 }
2045
2046 // Specialized scanning
2047
2048 /**
2049 * Returns a (probably) non-empty steal queue, if one is found
2050 * during a scan, else null. This method must be retried by
2051 * caller if, by the time it tries to use the queue, it is empty.
2052 */
2053 private WorkQueue findNonEmptyStealQueue() {
2054 WorkQueue[] ws; int m; // one-shot version of scan loop
2055 int r = ThreadLocalRandom.nextSecondarySeed();
2056 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
2057 for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
2058 WorkQueue q; int b;
2059 if ((q = ws[k]) != null) {
2060 if ((b = q.base) - q.top < 0)
2061 return q;
2062 checkSum += b;
2063 }
2064 if ((k = (k + 1) & m) == origin) {
2065 if (oldSum == (oldSum = checkSum))
2066 break;
2067 checkSum = 0;
2068 }
2069 }
2070 }
2071 return null;
2072 }
2073
2074 /**
2075 * Runs tasks until {@code isQuiescent()}. We piggyback on
2076 * active count ctl maintenance, but rather than blocking
2077 * when tasks cannot be found, we rescan until all others cannot
2078 * find tasks either.
2079 */
2080 final void helpQuiescePool(WorkQueue w) {
2081 ForkJoinTask<?> ps = w.currentSteal; // save context
2082 for (boolean active = true;;) {
2083 long c; WorkQueue q; ForkJoinTask<?> t; int b;
2084 w.execLocalTasks(); // run locals before each scan
2085 if ((q = findNonEmptyStealQueue()) != null) {
2086 if (!active) { // re-establish active count
2087 active = true;
2088 U.getAndAddLong(this, CTL, AC_UNIT);
2089 }
2090 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2091 U.putOrderedObject(w, QCURRENTSTEAL, t);
2092 t.doExec();
2093 if (++w.nsteals < 0)
2094 w.transferStealCount(this);
2095 }
2096 }
2097 else if (active) { // decrement active count without queuing
2098 long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c);
2099 if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0)
2100 break; // bypass decrement-then-increment
2101 if (U.compareAndSwapLong(this, CTL, c, nc))
2102 active = false;
2103 }
2104 else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 &&
2105 U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
2106 break;
2107 }
2108 U.putOrderedObject(w, QCURRENTSTEAL, ps);
2109 }
2110
2111 /**
2112 * Gets and removes a local or stolen task for the given worker.
2113 *
2114 * @return a task, if available
2115 */
2116 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2117 for (ForkJoinTask<?> t;;) {
2118 WorkQueue q; int b;
2119 if ((t = w.nextLocalTask()) != null)
2120 return t;
2121 if ((q = findNonEmptyStealQueue()) == null)
2122 return null;
2123 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2124 return t;
2125 }
2126 }
2127
2128 /**
2129 * Returns a cheap heuristic guide for task partitioning when
2130 * programmers, frameworks, tools, or languages have little or no
2131 * idea about task granularity. In essence by offering this
2132 * method, we ask users only about tradeoffs in overhead vs
2133 * expected throughput and its variance, rather than how finely to
2134 * partition tasks.
2135 *
2136 * In a steady state strict (tree-structured) computation, each
2137 * thread makes available for stealing enough tasks for other
2138 * threads to remain active. Inductively, if all threads play by
2139 * the same rules, each thread should make available only a
2140 * constant number of tasks.
2141 *
2142 * The minimum useful constant is just 1. But using a value of 1
2143 * would require immediate replenishment upon each steal to
2144 * maintain enough tasks, which is infeasible. Further,
2145 * partitionings/granularities of offered tasks should minimize
2146 * steal rates, which in general means that threads nearer the top
2147 * of the computation tree should generate more than those nearer
2148 * the bottom. In perfect steady state, each thread is at
2149 * approximately the same level of the computation tree. However,
2150 * producing extra tasks amortizes the uncertainty of progress and
2151 * diffusion assumptions.
2152 *
2153 * So, users will want to use values larger (but not much larger)
2154 * than 1 to both smooth over transient shortages and hedge
2155 * against uneven progress, as traded off against the cost of
2156 * extra task overhead. We leave the user to pick a threshold
2157 * value to compare with the results of this call to guide
2158 * decisions, but recommend values such as 3.
2159 *
2160 * When all threads are active, it is on average OK to estimate
2161 * surplus strictly locally. In steady-state, if one thread is
2162 * maintaining say 2 surplus tasks, then so are others. So we can
2163 * just use estimated queue length. However, this strategy alone
2164 * leads to serious mis-estimates in some non-steady-state
2165 * conditions (ramp-up, ramp-down, other stalls). We can detect
2166 * many of these by further considering the number of "idle"
2167 * threads, which are known to have zero queued tasks, and
2168 * compensating by a factor of (#idle/#active) threads.
2169 */
2170 static int getSurplusQueuedTaskCount() {
2171 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2172 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
2173 int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).
2174 config & SMASK;
2175 int n = (q = wt.workQueue).top - q.base;
2176 int a = (int)(pool.ctl >> AC_SHIFT) + p;
2177 return n - (a > (p >>>= 1) ? 0 :
2178 a > (p >>>= 1) ? 1 :
2179 a > (p >>>= 1) ? 2 :
2180 a > (p >>>= 1) ? 4 :
2181 8);
2182 }
2183 return 0;
2184 }
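/*
 * A sketch of the intended usage pattern inside a recursive
 * computation, via the public counterpart of this method in
 * ForkJoinTask; the task class, cutoff, and threshold (3, as
 * recommended above) are illustrative, not part of this API:
 *
 *   class SumTask extends RecursiveTask<Long> {
 *     final long[] a; final int lo, hi;
 *     SumTask(long[] a, int lo, int hi) {
 *       this.a = a; this.lo = lo; this.hi = hi;
 *     }
 *     protected Long compute() {
 *       if (hi - lo < 1000 ||
 *           ForkJoinTask.getSurplusQueuedTaskCount() >= 3) {
 *         long s = 0;                   // enough tasks are queued;
 *         for (int i = lo; i < hi; ++i) // finish sequentially
 *           s += a[i];
 *         return s;
 *       }
 *       int mid = (lo + hi) >>> 1;
 *       SumTask left = new SumTask(a, lo, mid);
 *       left.fork();                    // offer half for stealing
 *       return new SumTask(a, mid, hi).compute() + left.join();
 *     }
 *   }
 */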
2185
2186 // Termination
2187
2188 /**
2189 * Possibly initiates and/or completes termination.
2190 *
2191 * @param now if true, unconditionally terminate, else only
2192 * if no work and no active workers
2193 * @param enable if true, enable shutdown when next possible
2194 * @return true if now terminating or terminated
2195 */
2196 private boolean tryTerminate(boolean now, boolean enable) {
2197 int rs;
2198 if (this == common) // cannot shut down
2199 return false;
2200 if ((rs = runState) >= 0) {
2201 if (!enable)
2202 return false;
2203 rs = lockRunState(); // enter SHUTDOWN phase
2204 unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
2205 }
2206
2207 if ((rs & STOP) == 0) {
2208 if (!now) { // check quiescence
2209 for (long oldSum = 0L;;) { // repeat until stable
2210 WorkQueue[] ws; WorkQueue w; int m, b; long c;
2211 long checkSum = ctl;
2212 if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
2213 return false; // still active workers
2214 if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
2215 break; // check queues
2216 for (int i = 0; i <= m; ++i) {
2217 if ((w = ws[i]) != null) {
2218 if ((b = w.base) != w.top || w.scanState >= 0 ||
2219 w.currentSteal != null) {
2220 tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
2221 return false; // arrange for recheck
2222 }
2223 checkSum += b;
2224 if ((i & 1) == 0)
2225 w.qlock = -1; // try to disable external
2226 }
2227 }
2228 if (oldSum == (oldSum = checkSum))
2229 break;
2230 }
2231 }
2232 if ((runState & STOP) == 0) {
2233 rs = lockRunState(); // enter STOP phase
2234 unlockRunState(rs, (rs & ~RSLOCK) | STOP);
2235 }
2236 }
2237
2238 int pass = 0; // 3 passes to help terminate
2239 for (long oldSum = 0L;;) { // or until done or stable
2240 WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
2241 long checkSum = ctl;
2242 if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
2243 (ws = workQueues) == null || (m = ws.length - 1) <= 0) {
2244 if ((runState & TERMINATED) == 0) {
2245 rs = lockRunState(); // done
2246 unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
2247 synchronized (this) { notifyAll(); } // for awaitTermination
2248 }
2249 break;
2250 }
2251 for (int i = 0; i <= m; ++i) {
2252 if ((w = ws[i]) != null) {
2253 checkSum += w.base;
2254 w.qlock = -1; // try to disable
2255 if (pass > 0) {
2256 w.cancelAll(); // clear queue
2257 if (pass > 1 && (wt = w.owner) != null) {
2258 if (!wt.isInterrupted()) {
2259 try { // unblock join
2260 wt.interrupt();
2261 } catch (Throwable ignore) {
2262 }
2263 }
2264 if (w.scanState < 0)
2265 U.unpark(wt); // wake up
2266 }
2267 }
2268 }
2269 }
2270 if (checkSum != oldSum) { // unstable
2271 oldSum = checkSum;
2272 pass = 0;
2273 }
2274 else if (pass > 3 && pass > m) // can't further help
2275 break;
2276 else if (++pass > 1) { // try to dequeue
2277 long c; int j = 0, sp; // bound attempts
2278 while (j++ <= m && (sp = (int)(c = ctl)) != 0)
2279 tryRelease(c, ws[sp & m], AC_UNIT);
2280 }
2281 }
2282 return true;
2283 }
2284
2285 // External operations
2286
2287 /**
2288 * Full version of externalPush, handling uncommon cases, as well
2289 * as performing secondary initialization upon the first
2290 * submission of the first task to the pool. It also detects
2291 * the first submission by an external thread and creates a new
2292 * shared queue if the one at that index is empty or contended.
2293 *
2294 * @param task the task. Caller must ensure non-null.
2295 */
2296 private void externalSubmit(ForkJoinTask<?> task) {
2297 int r; // initialize caller's probe
2298 if ((r = ThreadLocalRandom.getProbe()) == 0) {
2299 ThreadLocalRandom.localInit();
2300 r = ThreadLocalRandom.getProbe();
2301 }
2302 for (;;) {
2303 WorkQueue[] ws; WorkQueue q; int rs, m, k;
2304 boolean move = false;
2305 if ((rs = runState) < 0) {
2306 tryTerminate(false, false); // help terminate
2307 throw new RejectedExecutionException();
2308 }
2309 else if ((rs & STARTED) == 0 || // initialize
2310 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
2311 int ns = 0;
2312 rs = lockRunState();
2313 try {
2314 if ((rs & STARTED) == 0) {
2315 U.compareAndSwapObject(this, STEALCOUNTER, null,
2316 new AtomicLong());
2317 // create workQueues array with size a power of two
2318 int p = config & SMASK; // ensure at least 2 slots
2319 int n = (p > 1) ? p - 1 : 1;
2320 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
2321 n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
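// e.g., p == 6: n starts at 5, smears to 7, (7 + 1) << 1 == 16,
// the smallest power of two with at least p odd (worker) slots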
2322 workQueues = new WorkQueue[n];
2323 ns = STARTED;
2324 }
2325 } finally {
2326 unlockRunState(rs, (rs & ~RSLOCK) | ns);
2327 }
2328 }
2329 else if ((q = ws[k = r & m & SQMASK]) != null) {
2330 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2331 ForkJoinTask<?>[] a = q.array;
2332 int s = q.top;
2333 boolean submitted = false; // initial submission or resizing
2334 try { // locked version of push
2335 if ((a != null && a.length > s + 1 - q.base) ||
2336 (a = q.growArray()) != null) {
2337 int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
2338 U.putOrderedObject(a, j, task);
2339 U.putOrderedInt(q, QTOP, s + 1);
2340 submitted = true;
2341 }
2342 } finally {
2343 U.compareAndSwapInt(q, QLOCK, 1, 0);
2344 }
2345 if (submitted) {
2346 signalWork(ws, q);
2347 return;
2348 }
2349 }
2350 move = true; // move on failure
2351 }
2352 else if (((rs = runState) & RSLOCK) == 0) { // create new queue
2353 q = new WorkQueue(this, null);
2354 q.hint = r;
2355 q.config = k | SHARED_QUEUE;
2356 q.scanState = INACTIVE;
2357 rs = lockRunState(); // publish index
2358 if (rs > 0 && (ws = workQueues) != null &&
2359 k < ws.length && ws[k] == null)
2360 ws[k] = q; // else terminated
2361 unlockRunState(rs, rs & ~RSLOCK);
2362 }
2363 else
2364 move = true; // move if busy
2365 if (move)
2366 r = ThreadLocalRandom.advanceProbe(r);
2367 }
2368 }
2369
2370 /**
2371 * Tries to add the given task to the submitter's current
2372 * submission queue. Only the (vastly) most common path is
2373 * handled directly in this method, while screening for the
2374 * need for externalSubmit.
2375 *
2376 * @param task the task. Caller must ensure non-null.
2377 */
2378 final void externalPush(ForkJoinTask<?> task) {
2379 WorkQueue[] ws; WorkQueue q; int m;
2380 int r = ThreadLocalRandom.getProbe();
2381 int rs = runState;
2382 if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
2383 (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
2384 U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2385 ForkJoinTask<?>[] a; int am, n, s;
2386 if ((a = q.array) != null &&
2387 (am = a.length - 1) > (n = (s = q.top) - q.base)) {
2388 int j = ((am & s) << ASHIFT) + ABASE;
2389 U.putOrderedObject(a, j, task);
2390 U.putOrderedInt(q, QTOP, s + 1);
2391 U.putOrderedInt(q, QLOCK, 0);
2392 if (n <= 1)
2393 signalWork(ws, q);
2394 return;
2395 }
2396 U.compareAndSwapInt(q, QLOCK, 1, 0);
2397 }
2398 externalSubmit(task);
2399 }
2400
2401 /**
2402 * Returns common pool queue for an external thread.
2403 */
2404 static WorkQueue commonSubmitterQueue() {
2405 ForkJoinPool p = common;
2406 int r = ThreadLocalRandom.getProbe();
2407 WorkQueue[] ws; int m;
2408 return (p != null && (ws = p.workQueues) != null &&
2409 (m = ws.length - 1) >= 0) ?
2410 ws[m & r & SQMASK] : null;
2411 }
2412
2413 /**
2414 * Performs tryUnpush for an external submitter: Finds queue,
2415 * locks if apparently non-empty, validates upon locking, and
2416 * adjusts top. Each check can fail but rarely does.
2417 */
2418 final boolean tryExternalUnpush(ForkJoinTask<?> task) {
2419 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?>[] a; int m, s;
2420 int r = ThreadLocalRandom.getProbe();
2421 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
2422 (w = ws[m & r & SQMASK]) != null &&
2423 (a = w.array) != null && (s = w.top) != w.base) {
2424 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
2425 if (U.compareAndSwapInt(w, QLOCK, 0, 1)) {
2426 if (w.top == s && w.array == a &&
2427 U.getObject(a, j) == task &&
2428 U.compareAndSwapObject(a, j, task, null)) {
2429 U.putOrderedInt(w, QTOP, s - 1);
2430 U.putOrderedInt(w, QLOCK, 0);
2431 return true;
2432 }
2433 U.compareAndSwapInt(w, QLOCK, 1, 0);
2434 }
2435 }
2436 return false;
2437 }
2438
2439 /**
2440 * Performs helpComplete for an external submitter.
2441 */
2442 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
2443 WorkQueue[] ws; int n;
2444 int r = ThreadLocalRandom.getProbe();
2445 return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 :
2446 helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks);
2447 }
2448
2449 // Exported methods
2450
2451 // Constructors
2452
2453 /**
2454 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2455 * java.lang.Runtime#availableProcessors}, using the {@linkplain
2456 * #defaultForkJoinWorkerThreadFactory default thread factory},
2457 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2458 *
2459 * @throws SecurityException if a security manager exists and
2460 * the caller is not permitted to modify threads
2461 * because it does not hold {@link
2462 * java.lang.RuntimePermission}{@code ("modifyThread")}
2463 */
2464 public ForkJoinPool() {
2465 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2466 defaultForkJoinWorkerThreadFactory, null, false);
2467 }
2468
2469 /**
2470 * Creates a {@code ForkJoinPool} with the indicated parallelism
2471 * level, the {@linkplain
2472 * #defaultForkJoinWorkerThreadFactory default thread factory},
2473 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2474 *
2475 * @param parallelism the parallelism level
2476 * @throws IllegalArgumentException if parallelism less than or
2477 * equal to zero, or greater than implementation limit
2478 * @throws SecurityException if a security manager exists and
2479 * the caller is not permitted to modify threads
2480 * because it does not hold {@link
2481 * java.lang.RuntimePermission}{@code ("modifyThread")}
2482 */
2483 public ForkJoinPool(int parallelism) {
2484 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
2485 }
2486
2487 /**
2488 * Creates a {@code ForkJoinPool} with the given parameters.
2489 *
2490 * @param parallelism the parallelism level. For default value,
2491 * use {@link java.lang.Runtime#availableProcessors}.
2492 * @param factory the factory for creating new threads. For default value,
2493 * use {@link #defaultForkJoinWorkerThreadFactory}.
2494 * @param handler the handler for internal worker threads that
2495 * terminate due to unrecoverable errors encountered while executing
2496 * tasks. For default value, use {@code null}.
2497 * @param asyncMode if true,
2498 * establishes local first-in-first-out scheduling mode for forked
2499 * tasks that are never joined. This mode may be more appropriate
2500 * than default locally stack-based mode in applications in which
2501 * worker threads only process event-style asynchronous tasks.
2502 * For default value, use {@code false}.
2503 * @throws IllegalArgumentException if parallelism less than or
2504 * equal to zero, or greater than implementation limit
2505 * @throws NullPointerException if the factory is null
2506 * @throws SecurityException if a security manager exists and
2507 * the caller is not permitted to modify threads
2508 * because it does not hold {@link
2509 * java.lang.RuntimePermission}{@code ("modifyThread")}
2510 */
2511 public ForkJoinPool(int parallelism,
2512 ForkJoinWorkerThreadFactory factory,
2513 UncaughtExceptionHandler handler,
2514 boolean asyncMode) {
2515 this(checkParallelism(parallelism),
2516 checkFactory(factory),
2517 handler,
2518 asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
2519 "ForkJoinPool-" + nextPoolId() + "-worker-");
2520 checkPermission();
2521 }
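/*
 * For example, a pool suited to event-style tasks (a sketch; the
 * parallelism level 4 is arbitrary):
 *
 *   ForkJoinPool asyncPool = new ForkJoinPool(
 *       4, ForkJoinPool.defaultForkJoinWorkerThreadFactory,
 *       null, true); // async FIFO mode, default factory, no UEH
 */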
2522
2523 private static int checkParallelism(int parallelism) {
2524 if (parallelism <= 0 || parallelism > MAX_CAP)
2525 throw new IllegalArgumentException();
2526 return parallelism;
2527 }
2528
2529 private static ForkJoinWorkerThreadFactory checkFactory
2530 (ForkJoinWorkerThreadFactory factory) {
2531 if (factory == null)
2532 throw new NullPointerException();
2533 return factory;
2534 }
2535
2536 /**
2537 * Creates a {@code ForkJoinPool} with the given parameters, without
2538 * any security checks or parameter validation. Invoked directly by
2539 * makeCommonPool.
2540 */
2541 private ForkJoinPool(int parallelism,
2542 ForkJoinWorkerThreadFactory factory,
2543 UncaughtExceptionHandler handler,
2544 int mode,
2545 String workerNamePrefix) {
2546 this.workerNamePrefix = workerNamePrefix;
2547 this.factory = factory;
2548 this.ueh = handler;
2549 this.config = (parallelism & SMASK) | mode;
2550 long np = (long)(-parallelism); // offset ctl counts
2551 this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2552 }
2553
2554 /**
2555 * Returns the common pool instance. This pool is statically
2556 * constructed; its run state is unaffected by attempts to {@link
2557 * #shutdown} or {@link #shutdownNow}. However this pool and any
2558 * ongoing processing are automatically terminated upon program
2559 * {@link System#exit}. Any program that relies on asynchronous
2560 * task processing to complete before program termination should
2561 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}
2562 * before exit.
2563 *
2564 * @return the common pool instance
2565 * @since 1.8
2566 */
2567 public static ForkJoinPool commonPool() {
2568 // assert common != null : "static init error";
2569 return common;
2570 }
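/*
 * Usage sketch (someTask is hypothetical), ensuring asynchronously
 * submitted work completes before JVM exit:
 *
 *   ForkJoinPool.commonPool().execute(someTask);
 *   // ... later, before exiting:
 *   ForkJoinPool.commonPool().awaitQuiescence(1L, TimeUnit.MINUTES);
 */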
2571
2572 // Execution methods
2573
2574 /**
2575 * Performs the given task, returning its result upon completion.
2576 * If the computation encounters an unchecked Exception or Error,
2577 * it is rethrown as the outcome of this invocation. Rethrown
2578 * exceptions behave in the same way as regular exceptions, but,
2579 * when possible, contain stack traces (as displayed for example
2580 * using {@code ex.printStackTrace()}) of both the current thread
2581 * and the thread actually encountering the exception;
2582 * minimally only the latter.
2583 *
2584 * @param task the task
2585 * @param <T> the type of the task's result
2586 * @return the task's result
2587 * @throws NullPointerException if the task is null
2588 * @throws RejectedExecutionException if the task cannot be
2589 * scheduled for execution
2590 */
2591 public <T> T invoke(ForkJoinTask<T> task) {
2592 if (task == null)
2593 throw new NullPointerException();
2594 externalPush(task);
2595 return task.join();
2596 }
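/*
 * Usage sketch, reusing the hypothetical SumTask from the
 * getSurplusQueuedTaskCount discussion above:
 *
 *   ForkJoinPool pool = new ForkJoinPool();
 *   long total = pool.invoke(new SumTask(array, 0, array.length));
 */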
2597
2598 /**
2599 * Arranges for (asynchronous) execution of the given task.
2600 *
2601 * @param task the task
2602 * @throws NullPointerException if the task is null
2603 * @throws RejectedExecutionException if the task cannot be
2604 * scheduled for execution
2605 */
2606 public void execute(ForkJoinTask<?> task) {
2607 if (task == null)
2608 throw new NullPointerException();
2609 externalPush(task);
2610 }
2611
2612 // AbstractExecutorService methods
2613
2614 /**
2615 * @throws NullPointerException if the task is null
2616 * @throws RejectedExecutionException if the task cannot be
2617 * scheduled for execution
2618 */
2619 public void execute(Runnable task) {
2620 if (task == null)
2621 throw new NullPointerException();
2622 ForkJoinTask<?> job;
2623 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2624 job = (ForkJoinTask<?>) task;
2625 else
2626 job = new ForkJoinTask.RunnableExecuteAction(task);
2627 externalPush(job);
2628 }
2629
2630 /**
2631 * Submits a ForkJoinTask for execution.
2632 *
2633 * @param task the task to submit
2634 * @param <T> the type of the task's result
2635 * @return the task
2636 * @throws NullPointerException if the task is null
2637 * @throws RejectedExecutionException if the task cannot be
2638 * scheduled for execution
2639 */
2640 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2641 if (task == null)
2642 throw new NullPointerException();
2643 externalPush(task);
2644 return task;
2645 }
2646
2647 /**
2648 * @throws NullPointerException if the task is null
2649 * @throws RejectedExecutionException if the task cannot be
2650 * scheduled for execution
2651 */
2652 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2653 ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
2654 externalPush(job);
2655 return job;
2656 }
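/*
 * Usage sketch (pool and expensiveComputation are hypothetical):
 *
 *   ForkJoinTask<Integer> f =
 *       pool.submit(() -> expensiveComputation());
 *   // ... do other work ...
 *   int result = f.join(); // unlike Future.get, join does not
 *                          // throw checked exceptions
 */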
2657
2658 /**
2659 * @throws NullPointerException if the task is null
2660 * @throws RejectedExecutionException if the task cannot be
2661 * scheduled for execution
2662 */
2663 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2664 ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
2665 externalPush(job);
2666 return job;
2667 }
2668
2669 /**
2670 * @throws NullPointerException if the task is null
2671 * @throws RejectedExecutionException if the task cannot be
2672 * scheduled for execution
2673 */
2674 public ForkJoinTask<?> submit(Runnable task) {
2675 if (task == null)
2676 throw new NullPointerException();
2677 ForkJoinTask<?> job;
2678 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2679 job = (ForkJoinTask<?>) task;
2680 else
2681 job = new ForkJoinTask.AdaptedRunnableAction(task);
2682 externalPush(job);
2683 return job;
2684 }
2685
2686 /**
2687 * @throws NullPointerException {@inheritDoc}
2688 * @throws RejectedExecutionException {@inheritDoc}
2689 */
2690 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2691 // In previous versions of this class, this method constructed
2692 // a task to run ForkJoinTask.invokeAll, but now external
2693 // invocation of multiple tasks is at least as efficient.
2694 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2695
2696 boolean done = false;
2697 try {
2698 for (Callable<T> t : tasks) {
2699 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2700 futures.add(f);
2701 externalPush(f);
2702 }
2703 for (int i = 0, size = futures.size(); i < size; i++)
2704 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2705 done = true;
2706 return futures;
2707 } finally {
2708 if (!done)
2709 for (int i = 0, size = futures.size(); i < size; i++)
2710 futures.get(i).cancel(false);
2711 }
2712 }
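/*
 * Usage sketch (fetch and handle are hypothetical); all returned
 * futures are already complete, so the get() calls do not block:
 *
 *   List<Callable<String>> tasks = Arrays.asList(
 *       () -> fetch("a"), () -> fetch("b"));
 *   for (Future<String> f : pool.invokeAll(tasks))
 *     handle(f.get());
 */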
2713
2714 /**
2715 * Returns the factory used for constructing new workers.
2716 *
2717 * @return the factory used for constructing new workers
2718 */
2719 public ForkJoinWorkerThreadFactory getFactory() {
2720 return factory;
2721 }
2722
2723 /**
2724 * Returns the handler for internal worker threads that terminate
2725 * due to unrecoverable errors encountered while executing tasks.
2726 *
2727 * @return the handler, or {@code null} if none
2728 */
2729 public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2730 return ueh;
2731 }
2732
2733 /**
2734 * Returns the targeted parallelism level of this pool.
2735 *
2736 * @return the targeted parallelism level of this pool
2737 */
2738 public int getParallelism() {
2739 int par;
2740 return ((par = config & SMASK) > 0) ? par : 1;
2741 }
2742
2743 /**
2744 * Returns the targeted parallelism level of the common pool.
2745 *
2746 * @return the targeted parallelism level of the common pool
2747 * @since 1.8
2748 */
2749 public static int getCommonPoolParallelism() {
2750 return commonParallelism;
2751 }
2752
2753 /**
2754 * Returns the number of worker threads that have started but not
2755 * yet terminated. The result returned by this method may differ
2756 * from {@link #getParallelism} when threads are created to
2757 * maintain parallelism when others are cooperatively blocked.
2758 *
2759 * @return the number of worker threads
2760 */
2761 public int getPoolSize() {
2762 return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2763 }
2764
2765 /**
2766 * Returns {@code true} if this pool uses local first-in-first-out
2767 * scheduling mode for forked tasks that are never joined.
2768 *
2769 * @return {@code true} if this pool uses async mode
2770 */
2771 public boolean getAsyncMode() {
2772 return (config & FIFO_QUEUE) != 0;
2773 }
2774
2775 /**
2776 * Returns an estimate of the number of worker threads that are
2777 * not blocked waiting to join tasks or for other managed
2778 * synchronization. This method may overestimate the
2779 * number of running threads.
2780 *
2781 * @return the number of worker threads
2782 */
2783 public int getRunningThreadCount() {
2784 int rc = 0;
2785 WorkQueue[] ws; WorkQueue w;
2786 if ((ws = workQueues) != null) {
2787 for (int i = 1; i < ws.length; i += 2) {
2788 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2789 ++rc;
2790 }
2791 }
2792 return rc;
2793 }
2794
2795 /**
2796 * Returns an estimate of the number of threads that are currently
2797 * stealing or executing tasks. This method may overestimate the
2798 * number of active threads.
2799 *
2800 * @return the number of active threads
2801 */
2802 public int getActiveThreadCount() {
2803 int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
2804 return (r <= 0) ? 0 : r; // suppress momentarily negative values
2805 }
2806
2807 /**
2808 * Returns {@code true} if all worker threads are currently idle.
2809 * An idle worker is one that cannot obtain a task to execute
2810 * because none are available to steal from other threads, and
2811 * there are no pending submissions to the pool. This method is
2812 * conservative; it might not return {@code true} immediately upon
2813 * idleness of all threads, but will eventually become true if
2814 * threads remain inactive.
2815 *
2816 * @return {@code true} if all threads are currently idle
2817 */
2818 public boolean isQuiescent() {
2819 return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
2820 }
2821
2822 /**
2823 * Returns an estimate of the total number of tasks stolen from
2824 * one thread's work queue by another. The reported value
2825 * underestimates the actual total number of steals when the pool
2826 * is not quiescent. This value may be useful for monitoring and
2827 * tuning fork/join programs: in general, steal counts should be
2828 * high enough to keep threads busy, but low enough to avoid
2829 * overhead and contention across threads.
2830 *
2831 * @return the number of steals
2832 */
2833 public long getStealCount() {
2834 AtomicLong sc = stealCounter;
2835 long count = (sc == null) ? 0L : sc.get();
2836 WorkQueue[] ws; WorkQueue w;
2837 if ((ws = workQueues) != null) {
2838 for (int i = 1; i < ws.length; i += 2) {
2839 if ((w = ws[i]) != null)
2840 count += w.nsteals;
2841 }
2842 }
2843 return count;
2844 }
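/*
 * Monitoring sketch, sampling the steal rate over an interval
 * (approximate, per the caveats above):
 *
 *   long before = pool.getStealCount();
 *   Thread.sleep(1000L);
 *   long stealsPerSecond = pool.getStealCount() - before;
 */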
2845
2846 /**
2847 * Returns an estimate of the total number of tasks currently held
2848 * in queues by worker threads (but not including tasks submitted
2849 * to the pool that have not begun executing). This value is only
2850 * an approximation, obtained by iterating across all threads in
2851 * the pool. This method may be useful for tuning task
2852 * granularities.
2853 *
2854 * @return the number of queued tasks
2855 */
2856 public long getQueuedTaskCount() {
2857 long count = 0;
2858 WorkQueue[] ws; WorkQueue w;
2859 if ((ws = workQueues) != null) {
2860 for (int i = 1; i < ws.length; i += 2) {
2861 if ((w = ws[i]) != null)
2862 count += w.queueSize();
2863 }
2864 }
2865 return count;
2866 }
2867
2868 /**
2869 * Returns an estimate of the number of tasks submitted to this
2870 * pool that have not yet begun executing. This method may take
2871 * time proportional to the number of submissions.
2872 *
2873 * @return the number of queued submissions
2874 */
2875 public int getQueuedSubmissionCount() {
2876 int count = 0;
2877 WorkQueue[] ws; WorkQueue w;
2878 if ((ws = workQueues) != null) {
2879 for (int i = 0; i < ws.length; i += 2) {
2880 if ((w = ws[i]) != null)
2881 count += w.queueSize();
2882 }
2883 }
2884 return count;
2885 }
2886
2887 /**
2888 * Returns {@code true} if there are any tasks submitted to this
2889 * pool that have not yet begun executing.
2890 *
2891 * @return {@code true} if there are any queued submissions
2892 */
2893 public boolean hasQueuedSubmissions() {
2894 WorkQueue[] ws; WorkQueue w;
2895 if ((ws = workQueues) != null) {
2896 for (int i = 0; i < ws.length; i += 2) {
2897 if ((w = ws[i]) != null && !w.isEmpty())
2898 return true;
2899 }
2900 }
2901 return false;
2902 }
2903
2904 /**
2905 * Removes and returns the next unexecuted submission if one is
2906 * available. This method may be useful in extensions to this
2907 * class that re-assign work in systems with multiple pools.
2908 *
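* <p>A minimal sketch of such an extension (the class and method
* names are hypothetical):
* <pre> {@code
* class TransferablePool extends ForkJoinPool {
*   // Move one pending submission to another pool, if present
*   boolean transferOneTo(ForkJoinPool other) {
*     ForkJoinTask<?> t = pollSubmission();
*     if (t == null)
*       return false;
*     other.execute(t);
*     return true;
*   }
* }
* }</pre>
*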
2909 * @return the next submission, or {@code null} if none
2910 */
2911 protected ForkJoinTask<?> pollSubmission() {
2912 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2913 if ((ws = workQueues) != null) {
2914 for (int i = 0; i < ws.length; i += 2) {
2915 if ((w = ws[i]) != null && (t = w.poll()) != null)
2916 return t;
2917 }
2918 }
2919 return null;
2920 }
2921
2922 /**
2923 * Removes all available unexecuted submitted and forked tasks
2924 * from scheduling queues and adds them to the given collection,
2925 * without altering their execution status. These may include
2926 * artificially generated or wrapped tasks. This method is
2927 * designed to be invoked only when the pool is known to be
2928 * quiescent. Invocations at other times may not remove all
2929 * tasks. A failure encountered while attempting to add elements
2930 * to collection {@code c} may result in elements being in
2931 * neither, either, or both collections when the associated
2932 * exception is thrown. The behavior of this operation is
2933 * undefined if the specified collection is modified while the
2934 * operation is in progress.
2935 *
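* <p>A minimal usage sketch, assuming it runs inside a hypothetical
* subclass after the pool has been quiesced:
* <pre> {@code
* List<ForkJoinTask<?>> drained = new ArrayList<>();
* if (isQuiescent())
*   drainTasksTo(drained);
* }</pre>
*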
2936 * @param c the collection to transfer elements into
2937 * @return the number of elements transferred
2938 */
2939 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2940 int count = 0;
2941 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2942 if ((ws = workQueues) != null) {
2943 for (int i = 0; i < ws.length; ++i) {
2944 if ((w = ws[i]) != null) {
2945 while ((t = w.poll()) != null) {
2946 c.add(t);
2947 ++count;
2948 }
2949 }
2950 }
2951 }
2952 return count;
2953 }
2954
2955 /**
2956 * Returns a string identifying this pool, as well as its state,
2957 * including indications of run state, parallelism level, and
2958 * worker and task counts.
2959 *
2960 * @return a string identifying this pool, as well as its state
2961 */
2962 public String toString() {
2963 // Use a single pass through workQueues to collect counts
2964 long qt = 0L, qs = 0L; int rc = 0;
2965 AtomicLong sc = stealCounter;
2966 long st = (sc == null) ? 0L : sc.get();
2967 long c = ctl;
2968 WorkQueue[] ws; WorkQueue w;
2969 if ((ws = workQueues) != null) {
2970 for (int i = 0; i < ws.length; ++i) {
2971 if ((w = ws[i]) != null) {
2972 int size = w.queueSize();
2973 if ((i & 1) == 0)
2974 qs += size;
2975 else {
2976 qt += size;
2977 st += w.nsteals;
2978 if (w.isApparentlyUnblocked())
2979 ++rc;
2980 }
2981 }
2982 }
2983 }
2984 int pc = (config & SMASK);
2985 int tc = pc + (short)(c >>> TC_SHIFT);
2986 int ac = pc + (int)(c >> AC_SHIFT);
2987 if (ac < 0) // ignore transient negative
2988 ac = 0;
2989 int rs = runState;
2990 String level = ((rs & TERMINATED) != 0 ? "Terminated" :
2991 (rs & STOP) != 0 ? "Terminating" :
2992 (rs & SHUTDOWN) != 0 ? "Shutting down" :
2993 "Running");
2994 return super.toString() +
2995 "[" + level +
2996 ", parallelism = " + pc +
2997 ", size = " + tc +
2998 ", active = " + ac +
2999 ", running = " + rc +
3000 ", steals = " + st +
3001 ", tasks = " + qt +
3002 ", submissions = " + qs +
3003 "]";
3004 }
3005
3006 /**
3007 * Possibly initiates an orderly shutdown in which previously
3008 * submitted tasks are executed, but no new tasks will be
3009 * accepted. Invocation has no effect on execution state if this
3010 * is the {@link #commonPool()}, and no additional effect if
3011 * already shut down. Tasks that are in the process of being
3012 * submitted concurrently during the course of this method may or
3013 * may not be rejected.
3014 *
3015 * @throws SecurityException if a security manager exists and
3016 * the caller is not permitted to modify threads
3017 * because it does not hold {@link
3018 * java.lang.RuntimePermission}{@code ("modifyThread")}
3019 */
3020 public void shutdown() {
3021 checkPermission();
3022 tryTerminate(false, true);
3023 }
3024
3025 /**
3026 * Possibly attempts to cancel and/or stop all tasks, and reject
3027 * all subsequently submitted tasks. Invocation has no effect on
3028 * execution state if this is the {@link #commonPool()}, and no
3029 * additional effect if already shut down. Otherwise, tasks that
3030 * are in the process of being submitted or executed concurrently
3031 * during the course of this method may or may not be
3032 * rejected. This method cancels both existing and unexecuted
3033 * tasks, in order to permit termination in the presence of task
3034 * dependencies. So the method always returns an empty list
3035 * (unlike the case for some other Executors).
3036 *
3037 * @return an empty list
3038 * @throws SecurityException if a security manager exists and
3039 * the caller is not permitted to modify threads
3040 * because it does not hold {@link
3041 * java.lang.RuntimePermission}{@code ("modifyThread")}
3042 */
3043 public List<Runnable> shutdownNow() {
3044 checkPermission();
3045 tryTerminate(true, true);
3046 return Collections.emptyList();
3047 }
3048
3049 /**
3050 * Returns {@code true} if all tasks have completed following shut down.
3051 *
3052 * @return {@code true} if all tasks have completed following shut down
3053 */
3054 public boolean isTerminated() {
3055 return (runState & TERMINATED) != 0;
3056 }
3057
3058 /**
3059 * Returns {@code true} if the process of termination has
3060 * commenced but not yet completed. This method may be useful for
3061 * debugging. A return of {@code true} reported a sufficient
3062 * period after shutdown may indicate that submitted tasks have
3063 * ignored or suppressed interruption, or are waiting for I/O,
3064 * causing this executor not to properly terminate. (See the
3065 * advisory notes for class {@link ForkJoinTask} stating that
3066 * tasks should not normally entail blocking operations. But if
3067 * they do, they must abort them on interrupt.)
3068 *
3069 * @return {@code true} if terminating but not yet terminated
3070 */
3071 public boolean isTerminating() {
3072 int rs = runState;
3073 return (rs & STOP) != 0 && (rs & TERMINATED) == 0;
3074 }
3075
3076 /**
3077 * Returns {@code true} if this pool has been shut down.
3078 *
3079 * @return {@code true} if this pool has been shut down
3080 */
3081 public boolean isShutdown() {
3082 return (runState & SHUTDOWN) != 0;
3083 }
3084
3085 /**
3086 * Blocks until all tasks have completed execution after a
3087 * shutdown request, or the timeout occurs, or the current thread
3088 * is interrupted, whichever happens first. Because the {@link
3089 * #commonPool()} never terminates until program shutdown, when
3090 * applied to the common pool, this method is equivalent to {@link
3091 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
3092 *
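* <p>For non-common pools, the usual two-phase shutdown idiom
* applies (an illustrative sketch; interrupt handling elided):
* <pre> {@code
* pool.shutdown();                // stop accepting new tasks
* if (!pool.awaitTermination(60L, TimeUnit.SECONDS)) {
*   pool.shutdownNow();           // cancel lingering tasks
*   pool.awaitTermination(60L, TimeUnit.SECONDS);
* }
* }</pre>
*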
3093 * @param timeout the maximum time to wait
3094 * @param unit the time unit of the timeout argument
3095 * @return {@code true} if this executor terminated and
3096 * {@code false} if the timeout elapsed before termination
3097 * @throws InterruptedException if interrupted while waiting
3098 */
3099 public boolean awaitTermination(long timeout, TimeUnit unit)
3100 throws InterruptedException {
3101 if (Thread.interrupted())
3102 throw new InterruptedException();
3103 if (this == common) {
3104 awaitQuiescence(timeout, unit);
3105 return false;
3106 }
3107 long nanos = unit.toNanos(timeout);
3108 if (isTerminated())
3109 return true;
3110 if (nanos <= 0L)
3111 return false;
3112 long deadline = System.nanoTime() + nanos;
3113 synchronized (this) {
3114 for (;;) {
3115 if (isTerminated())
3116 return true;
3117 if (nanos <= 0L)
3118 return false;
3119 long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
3120 wait(millis > 0L ? millis : 1L);
3121 nanos = deadline - System.nanoTime();
3122 }
3123 }
3124 }
3125
3126 /**
3127 * If called by a ForkJoinTask operating in this pool, this method
3128 * is equivalent in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3129 * waits and/or attempts to assist performing tasks until this
3130 * pool {@link #isQuiescent} or the indicated timeout elapses.
3131 *
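* <p>For example, a client might bound its wait for outstanding
* event-style tasks (an illustrative sketch; the timeout value is
* arbitrary):
* <pre> {@code
* boolean quiet = pool.awaitQuiescence(10L, TimeUnit.SECONDS);
* // if false, tasks were still pending when the timeout elapsed
* }</pre>
*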
3132 * @param timeout the maximum time to wait
3133 * @param unit the time unit of the timeout argument
3134 * @return {@code true} if quiescent; {@code false} if the
3135 * timeout elapsed.
3136 */
3137 public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3138 long nanos = unit.toNanos(timeout);
3139 ForkJoinWorkerThread wt;
3140 Thread thread = Thread.currentThread();
3141 if ((thread instanceof ForkJoinWorkerThread) &&
3142 (wt = (ForkJoinWorkerThread)thread).pool == this) {
3143 helpQuiescePool(wt.workQueue);
3144 return true;
3145 }
3146 long startTime = System.nanoTime();
3147 WorkQueue[] ws;
3148 int r = 0, m;
3149 boolean found = true;
3150 while (!isQuiescent() && (ws = workQueues) != null &&
3151 (m = ws.length - 1) >= 0) {
3152 if (!found) {
3153 if ((System.nanoTime() - startTime) > nanos)
3154 return false;
3155 Thread.yield(); // cannot block
3156 }
3157 found = false;
3158 for (int j = (m + 1) << 2; j >= 0; --j) {
3159 ForkJoinTask<?> t; WorkQueue q; int b, k;
3160 if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
3161 (b = q.base) - q.top < 0) {
3162 found = true;
3163 if ((t = q.pollAt(b)) != null)
3164 t.doExec();
3165 break;
3166 }
3167 }
3168 }
3169 return true;
3170 }
3171
3172 /**
3173 * Waits and/or attempts to assist performing tasks indefinitely
3174 * until the {@link #commonPool()} {@link #isQuiescent}.
3175 */
3176 static void quiesceCommonPool() {
3177 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3178 }
3179
3180 /**
3181 * Interface for extending managed parallelism for tasks running
3182 * in {@link ForkJoinPool}s.
3183 *
3184 * <p>A {@code ManagedBlocker} provides two methods. Method
3185 * {@code isReleasable} must return {@code true} if blocking is
3186 * not necessary. Method {@code block} blocks the current thread
3187 * if necessary (perhaps internally invoking {@code isReleasable}
3188 * before actually blocking). These actions are performed by any
3189 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
3190 * The unusual methods in this API accommodate synchronizers that
3191 * may, but don't usually, block for long periods. Similarly, they
3192 * allow more efficient internal handling of cases in which
3193 * additional workers may be, but usually are not, needed to
3194 * ensure sufficient parallelism. Toward this end,
3195 * implementations of method {@code isReleasable} must be amenable
3196 * to repeated invocation.
3197 *
3198 * <p>For example, here is a ManagedBlocker based on a
3199 * ReentrantLock:
3200 * <pre> {@code
3201 * class ManagedLocker implements ManagedBlocker {
3202 * final ReentrantLock lock;
3203 * boolean hasLock = false;
3204 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3205 * public boolean block() {
3206 * if (!hasLock)
3207 * lock.lock();
3208 * return true;
3209 * }
3210 * public boolean isReleasable() {
3211 * return hasLock || (hasLock = lock.tryLock());
3212 * }
3213 * }}</pre>
3214 *
3215 * <p>Here is a class that possibly blocks waiting for an
3216 * item on a given queue:
3217 * <pre> {@code
3218 * class QueueTaker<E> implements ManagedBlocker {
3219 * final BlockingQueue<E> queue;
3220 * volatile E item = null;
3221 * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3222 * public boolean block() throws InterruptedException {
3223 * if (item == null)
3224 * item = queue.take();
3225 * return true;
3226 * }
3227 * public boolean isReleasable() {
3228 * return item != null || (item = queue.poll()) != null;
3229 * }
3230 * public E getItem() { // call after pool.managedBlock completes
3231 * return item;
3232 * }
3233 * }}</pre>
3234 */
3235 public static interface ManagedBlocker {
3236 /**
3237 * Possibly blocks the current thread, for example waiting for
3238 * a lock or condition.
3239 *
3240 * @return {@code true} if no additional blocking is necessary
3241 * (i.e., if isReleasable would return true)
3242 * @throws InterruptedException if interrupted while waiting
3243 * (the method is not required to do so, but is allowed to)
3244 */
3245 boolean block() throws InterruptedException;
3246
3247 /**
3248 * Returns {@code true} if blocking is unnecessary.
3249 * @return {@code true} if blocking is unnecessary
3250 */
3251 boolean isReleasable();
3252 }
3253
3254 /**
3255 * Blocks in accord with the given blocker. If the current thread
3256 * is a {@link ForkJoinWorkerThread}, this method possibly
3257 * arranges for a spare thread to be activated if necessary to
3258 * ensure sufficient parallelism while the current thread is blocked.
3259 *
3260 * <p>If the caller is not a {@link ForkJoinTask}, this method is
3261 * behaviorally equivalent to
3262 * <pre> {@code
3263 * while (!blocker.isReleasable())
3264 * if (blocker.block())
3265 * return;}</pre>
3266 *
3267 * If the caller is a {@code ForkJoinTask}, then the pool may
3268 * first be expanded to ensure parallelism, and later adjusted.
3269 *
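* <p>For example, using the {@code ManagedLocker} sketched in the
* {@link ManagedBlocker} documentation (illustrative only;
* InterruptedException handling elided):
* <pre> {@code
* ManagedLocker locker = new ManagedLocker(lock);
* ForkJoinPool.managedBlock(locker); // may activate a spare worker
* try {
*   // ... actions while holding lock ...
* } finally {
*   lock.unlock();
* }
* }</pre>
*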
3270 * @param blocker the blocker
3271 * @throws InterruptedException if blocker.block did so
3272 */
3273 public static void managedBlock(ManagedBlocker blocker)
3274 throws InterruptedException {
3275 ForkJoinPool p;
3276 ForkJoinWorkerThread wt;
3277 Thread t = Thread.currentThread();
3278 if ((t instanceof ForkJoinWorkerThread) &&
3279 (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
3280 WorkQueue w = wt.workQueue;
3281 while (!blocker.isReleasable()) {
3282 if (p.tryCompensate(w)) {
3283 try {
3284 do {} while (!blocker.isReleasable() &&
3285 !blocker.block());
3286 } finally {
3287 U.getAndAddLong(p, CTL, AC_UNIT);
3288 }
3289 break;
3290 }
3291 }
3292 }
3293 else {
3294 do {} while (!blocker.isReleasable() &&
3295 !blocker.block());
3296 }
3297 }
3298
3299 // AbstractExecutorService overrides. These rely on the undocumented
3300 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3301 // implement RunnableFuture.
3302
3303 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3304 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3305 }
3306
3307 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3308 return new ForkJoinTask.AdaptedCallable<T>(callable);
3309 }
3310
3311 // Unsafe mechanics
3312 private static final sun.misc.Unsafe U;
3313 private static final int ABASE;
3314 private static final int ASHIFT;
3315 private static final long CTL;
3316 private static final long RUNSTATE;
3317 private static final long STEALCOUNTER;
3318 private static final long PARKBLOCKER;
3319 private static final long QBASE; // these must be same as in WorkQueue
3320 private static final long QTOP;
3321 private static final long QLOCK;
3322 private static final long QSCANSTATE;
3323 private static final long QPARKER;
3324 private static final long QCURRENTSTEAL;
3325 private static final long QCURRENTJOIN;
3326
3327 static {
3328 // initialize field offsets for CAS etc
3329 try {
3330 U = sun.misc.Unsafe.getUnsafe();
3331 Class<?> k = ForkJoinPool.class;
3332 CTL = U.objectFieldOffset
3333 (k.getDeclaredField("ctl"));
3334 RUNSTATE = U.objectFieldOffset
3335 (k.getDeclaredField("runState"));
3336 STEALCOUNTER = U.objectFieldOffset
3337 (k.getDeclaredField("stealCounter"));
3338 Class<?> tk = Thread.class;
3339 PARKBLOCKER = U.objectFieldOffset
3340 (tk.getDeclaredField("parkBlocker"));
3341 Class<?> wk = WorkQueue.class;
3342 QBASE = U.objectFieldOffset
3343 (wk.getDeclaredField("base"));
3344 QTOP = U.objectFieldOffset
3345 (wk.getDeclaredField("top"));
3346 QLOCK = U.objectFieldOffset
3347 (wk.getDeclaredField("qlock"));
3348 QSCANSTATE = U.objectFieldOffset
3349 (wk.getDeclaredField("scanState"));
3350 QPARKER = U.objectFieldOffset
3351 (wk.getDeclaredField("parker"));
3352 QCURRENTSTEAL = U.objectFieldOffset
3353 (wk.getDeclaredField("currentSteal"));
3354 QCURRENTJOIN = U.objectFieldOffset
3355 (wk.getDeclaredField("currentJoin"));
3356 Class<?> ak = ForkJoinTask[].class;
3357 ABASE = U.arrayBaseOffset(ak);
3358 int scale = U.arrayIndexScale(ak);
3359 if ((scale & (scale - 1)) != 0)
3360 throw new Error("data type scale not a power of two");
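// ASHIFT = log2(scale); element offsets take the form ((long)i << ASHIFT) + ABASE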
3361 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
3362 } catch (Exception e) {
3363 throw new Error(e);
3364 }
3365
3366 commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
3367 defaultForkJoinWorkerThreadFactory =
3368 new DefaultForkJoinWorkerThreadFactory();
3369 modifyThreadPermission = new RuntimePermission("modifyThread");
3370
3371 common = java.security.AccessController.doPrivileged
3372 (new java.security.PrivilegedAction<ForkJoinPool>() {
3373 public ForkJoinPool run() { return makeCommonPool(); }});
3374 int par = common.config & SMASK; // report 1 even if threads disabled
3375 commonParallelism = par > 0 ? par : 1;
3376 }
3377
3378 /**
3379 * Creates and returns the common pool, respecting user settings
3380 * specified via system properties.
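*
* <p>These properties must be set before the common pool is first
* used; an illustrative sketch (the property name is the one read
* below):
* <pre> {@code
* // equivalently: -Djava.util.concurrent.ForkJoinPool.common.parallelism=4
* System.setProperty(
*     "java.util.concurrent.ForkJoinPool.common.parallelism", "4");
* }</pre>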
3381 */
3382 private static ForkJoinPool makeCommonPool() {
3383 int parallelism = -1;
3384 ForkJoinWorkerThreadFactory factory = null;
3385 UncaughtExceptionHandler handler = null;
3386 try { // ignore exceptions in accessing/parsing properties
3387 String pp = System.getProperty
3388 ("java.util.concurrent.ForkJoinPool.common.parallelism");
3389 String fp = System.getProperty
3390 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3391 String hp = System.getProperty
3392 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3393 String mp = System.getProperty
3394 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
3395 if (pp != null)
3396 parallelism = Integer.parseInt(pp);
3397 if (fp != null)
3398 factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
3399 getSystemClassLoader().loadClass(fp).newInstance());
3400 if (hp != null)
3401 handler = ((UncaughtExceptionHandler)ClassLoader.
3402 getSystemClassLoader().loadClass(hp).newInstance());
3403 if (mp != null)
3404 commonMaxSpares = Integer.parseInt(mp);
3405 } catch (Exception ignore) {
3406 }
3407 if (factory == null) {
3408 if (System.getSecurityManager() == null)
3409 factory = defaultForkJoinWorkerThreadFactory;
3410 else // use security-managed default
3411 factory = new InnocuousForkJoinWorkerThreadFactory();
3412 }
3413 if (parallelism < 0 && // default 1 less than #cores
3414 (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
3415 parallelism = 1;
3416 if (parallelism > MAX_CAP)
3417 parallelism = MAX_CAP;
3418 return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
3419 "ForkJoinPool.commonPool-worker-");
3420 }
3421
3422 /**
3423 * Factory for innocuous worker threads.
3424 */
3425 static final class InnocuousForkJoinWorkerThreadFactory
3426 implements ForkJoinWorkerThreadFactory {
3427
3428 /**
3429 * An ACC to restrict permissions for the factory itself.
3430 * The constructed workers have no permissions set.
3431 */
3432 private static final AccessControlContext innocuousAcc;
3433 static {
3434 Permissions innocuousPerms = new Permissions();
3435 innocuousPerms.add(modifyThreadPermission);
3436 innocuousPerms.add(new RuntimePermission(
3437 "enableContextClassLoaderOverride"));
3438 innocuousPerms.add(new RuntimePermission(
3439 "modifyThreadGroup"));
3440 innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
3441 new ProtectionDomain(null, innocuousPerms)
3442 });
3443 }
3444
3445 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
3446 return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)
3447 java.security.AccessController.doPrivileged(
3448 new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
3449 public ForkJoinWorkerThread run() {
3450 return new ForkJoinWorkerThread.
3451 InnocuousForkJoinWorkerThread(pool);
3452 }}, innocuousAcc);
3453 }
3454 }
3455
3456 }