ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.302
Committed: Mon Mar 14 17:54:54 2016 UTC (8 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.301: +3 -3 lines
Log Message:
lint

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.Thread.UncaughtExceptionHandler;
10 import java.security.AccessControlContext;
11 import java.security.Permissions;
12 import java.security.ProtectionDomain;
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.List;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.CountedCompleter;
20 import java.util.concurrent.ForkJoinTask;
21 import java.util.concurrent.ForkJoinWorkerThread;
22 import java.util.concurrent.locks.LockSupport;
23
24 /**
25 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
26 * A {@code ForkJoinPool} provides the entry point for submissions
27 * from non-{@code ForkJoinTask} clients, as well as management and
28 * monitoring operations.
29 *
30 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
31 * ExecutorService} mainly by virtue of employing
32 * <em>work-stealing</em>: all threads in the pool attempt to find and
33 * execute tasks submitted to the pool and/or created by other active
34 * tasks (eventually blocking waiting for work if none exist). This
35 * enables efficient processing when most tasks spawn other subtasks
36 * (as do most {@code ForkJoinTask}s), as well as when many small
37 * tasks are submitted to the pool from external clients. Especially
38 * when setting <em>asyncMode</em> to true in constructors, {@code
39 * ForkJoinPool}s may also be appropriate for use with event-style
40 * tasks that are never joined.
41 *
42 * <p>A static {@link #commonPool()} is available and appropriate for
43 * most applications. The common pool is used by any ForkJoinTask that
44 * is not explicitly submitted to a specified pool. Using the common
45 * pool normally reduces resource usage (its threads are slowly
46 * reclaimed during periods of non-use, and reinstated upon subsequent
47 * use).
48 *
49 * <p>For applications that require separate or custom pools, a {@code
50 * ForkJoinPool} may be constructed with a given target parallelism
51 * level; by default, equal to the number of available processors.
52 * The pool attempts to maintain enough active (or available) threads
53 * by dynamically adding, suspending, or resuming internal worker
54 * threads, even if some tasks are stalled waiting to join others.
55 * However, no such adjustments are guaranteed in the face of blocked
56 * I/O or other unmanaged synchronization. The nested {@link
57 * ManagedBlocker} interface enables extension of the kinds of
58 * synchronization accommodated. The default policies may be
59 * overridden using a constructor with parameters corresponding to
60 * those documented in class {@link ThreadPoolExecutor}.
61 *
62 * <p>In addition to execution and lifecycle control methods, this
63 * class provides status check methods (for example
64 * {@link #getStealCount}) that are intended to aid in developing,
65 * tuning, and monitoring fork/join applications. Also, method
66 * {@link #toString} returns indications of pool state in a
67 * convenient form for informal monitoring.
68 *
69 * <p>As is the case with other ExecutorServices, there are three
70 * main task execution methods summarized in the following table.
71 * These are designed to be used primarily by clients not already
72 * engaged in fork/join computations in the current pool. The main
73 * forms of these methods accept instances of {@code ForkJoinTask},
74 * but overloaded forms also allow mixed execution of plain {@code
75 * Runnable}- or {@code Callable}- based activities as well. However,
76 * tasks that are already executing in a pool should normally instead
77 * use the within-computation forms listed in the table unless using
78 * async event-style tasks that are not usually joined, in which case
79 * there is little difference among choice of methods.
80 *
81 * <table BORDER CELLPADDING=3 CELLSPACING=1>
82 * <caption>Summary of task execution methods</caption>
83 * <tr>
84 * <td></td>
85 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
86 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
87 * </tr>
88 * <tr>
89 * <td> <b>Arrange async execution</b></td>
90 * <td> {@link #execute(ForkJoinTask)}</td>
91 * <td> {@link ForkJoinTask#fork}</td>
92 * </tr>
93 * <tr>
94 * <td> <b>Await and obtain result</b></td>
95 * <td> {@link #invoke(ForkJoinTask)}</td>
96 * <td> {@link ForkJoinTask#invoke}</td>
97 * </tr>
98 * <tr>
99 * <td> <b>Arrange exec and obtain Future</b></td>
100 * <td> {@link #submit(ForkJoinTask)}</td>
101 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
102 * </tr>
103 * </table>
104 *
105 * <p>The common pool is by default constructed with default
106 * parameters, but these may be controlled by setting three
107 * {@linkplain System#getProperty system properties}:
108 * <ul>
109 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
110 * - the parallelism level, a non-negative integer
111 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
112 * - the class name of a {@link ForkJoinWorkerThreadFactory}
113 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
114 * - the class name of a {@link UncaughtExceptionHandler}
115 * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
116 * - the maximum number of allowed extra threads to maintain target
117 * parallelism (default 256).
118 * </ul>
119 * If a {@link SecurityManager} is present and no factory is
120 * specified, then the default pool uses a factory supplying
121 * threads that have no {@link Permissions} enabled.
122 * The system class loader is used to load these classes.
123 * Upon any error in establishing these settings, default parameters
124 * are used. It is possible to disable or limit the use of threads in
125 * the common pool by setting the parallelism property to zero, and/or
126 * using a factory that may return {@code null}. However doing so may
127 * cause unjoined tasks to never be executed.
128 *
129 * <p><b>Implementation notes</b>: This implementation restricts the
130 * maximum number of running threads to 32767. Attempts to create
131 * pools with greater than the maximum number result in
132 * {@code IllegalArgumentException}.
133 *
134 * <p>This implementation rejects submitted tasks (that is, by throwing
135 * {@link RejectedExecutionException}) only when the pool is shut down
136 * or internal resources have been exhausted.
137 *
138 * @since 1.7
139 * @author Doug Lea
140 */
141 public class ForkJoinPool extends AbstractExecutorService {
142
143 /*
144 * Implementation Overview
145 *
146 * This class and its nested classes provide the main
147 * functionality and control for a set of worker threads:
148 * Submissions from non-FJ threads enter into submission queues.
149 * Workers take these tasks and typically split them into subtasks
150 * that may be stolen by other workers. Preference rules give
151 * first priority to processing tasks from their own queues (LIFO
152 * or FIFO, depending on mode), then to randomized FIFO steals of
153 * tasks in other queues. This framework began as a vehicle for
154 * supporting tree-structured parallelism using work-stealing.
155 * Over time, its scalability advantages led to extensions and
156 * changes to better support more diverse usage contexts. Because
157 * most internal methods and nested classes are interrelated,
158 * their main rationale and descriptions are presented here;
159 * individual methods and nested classes contain only brief
160 * comments about details.
161 *
162 * WorkQueues
163 * ==========
164 *
165 * Most operations occur within work-stealing queues (in nested
166 * class WorkQueue). These are special forms of Deques that
167 * support only three of the four possible end-operations -- push,
168 * pop, and poll (aka steal), under the further constraints that
169 * push and pop are called only from the owning thread (or, as
170 * extended here, under a lock), while poll may be called from
171 * other threads. (If you are unfamiliar with them, you probably
172 * want to read Herlihy and Shavit's book "The Art of
173 * Multiprocessor programming", chapter 16 describing these in
174 * more detail before proceeding.) The main work-stealing queue
175 * design is roughly similar to those in the papers "Dynamic
176 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
177 * (http://research.sun.com/scalable/pubs/index.html) and
178 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
179 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
180 * The main differences ultimately stem from GC requirements that
181 * we null out taken slots as soon as we can, to maintain as small
182 * a footprint as possible even in programs generating huge
183 * numbers of tasks. To accomplish this, we shift the CAS
184 * arbitrating pop vs poll (steal) from being on the indices
185 * ("base" and "top") to the slots themselves.
186 *
187 * Adding tasks then takes the form of a classic array push(task)
188 * in a circular buffer:
189 * q.array[q.top++ % length] = task;
190 *
191 * (The actual code needs to null-check and size-check the array,
192 * uses masking, not mod, for indexing a power-of-two-sized array,
193 * properly fences accesses, and possibly signals waiting workers
194 * to start scanning -- see below.) Both a successful pop and
195 * poll mainly entail a CAS of a slot from non-null to null.
196 *
197 * The pop operation (always performed by owner) is:
198 * if ((the task at top slot is not null) and
199 * (CAS slot to null))
200 * decrement top and return task;
201 *
202 * And the poll operation (usually by a stealer) is
203 * if ((the task at base slot is not null) and
204 * (CAS slot to null))
205 * increment base and return task;
206 *
207 * There are several variants of each of these. In particular,
208 * almost all uses of poll occur within scan operations that also
209 * interleave contention tracking (with associated code sprawl.)
210 *
211 * Memory ordering. See "Correct and Efficient Work-Stealing for
212 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
213 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
214 * analysis of memory ordering requirements in work-stealing
215 * algorithms similar to (but different than) the one used here.
216 * Extracting tasks in array slots via (fully fenced) CAS provides
217 * primary synchronization. The base and top indices imprecisely
218 * guide where to extract from. We do not always require strict
219 * orderings of array and index updates, so sometimes let them be
220 * subject to compiler and processor reorderings. However, the
221 * volatile "base" index also serves as a basis for memory
222 * ordering: Slot accesses are preceded by a read of base,
223 * ensuring happens-before ordering with respect to stealers (so
224 * the slots themselves can be read via plain array reads.) The
225 * only other memory orderings relied on are maintained in the
226 * course of signalling and activation (see below). A check that
227 * base == top indicates (momentary) emptiness, but otherwise may
228 * err on the side of possibly making the queue appear nonempty
229 * when a push, pop, or poll have not fully committed, or making
230 * it appear empty when an update of top has not yet been visibly
231 * written. (Method isEmpty() checks the case of a partially
232 * completed removal of the last element.) Because of this, the
233 * poll operation, considered individually, is not wait-free. One
234 * thief cannot successfully continue until another in-progress
235 * one (or, if previously empty, a push) visibly completes.
236 * However, in the aggregate, we ensure at least probabilistic
237 * non-blockingness. If an attempted steal fails, a scanning
238 * thief chooses a different random victim target to try next. So,
239 * in order for one thief to progress, it suffices for any
240 * in-progress poll or new push on any empty queue to
241 * complete.
242 *
243 * This approach also enables support of a user mode in which
244 * local task processing is in FIFO, not LIFO order, simply by
245 * using poll rather than pop. This can be useful in
246 * message-passing frameworks in which tasks are never joined.
247 *
248 * WorkQueues are also used in a similar way for tasks submitted
249 * to the pool. We cannot mix these tasks in the same queues used
250 * by workers. Instead, we randomly associate submission queues
251 * with submitting threads, using a form of hashing. The
252 * ThreadLocalRandom probe value serves as a hash code for
253 * choosing existing queues, and may be randomly repositioned upon
254 * contention with other submitters. In essence, submitters act
255 * like workers except that they are restricted to executing local
256 * tasks that they submitted. Insertion of tasks in shared mode
257 * requires a lock but we use only a simple spinlock (using field
258 * phase), because submitters encountering a busy queue move on to
259 * try or create other queues -- they block only when creating and
260 * registering new queues. Because it is used only as a spinlock,
261 * unlocking requires only a "releasing" store (using
262 * putOrderedInt).
263 *
264 * Management
265 * ==========
266 *
267 * The main throughput advantages of work-stealing stem from
268 * decentralized control -- workers mostly take tasks from
269 * themselves or each other, at rates that can exceed a billion
270 * per second. The pool itself creates, activates (enables
271 * scanning for and running tasks), deactivates, blocks, and
272 * terminates threads, all with minimal central information.
273 * There are only a few properties that we can globally track or
274 * maintain, so we pack them into a small number of variables,
275 * often maintaining atomicity without blocking or locking.
276 * Nearly all essentially atomic control state is held in a few
277 * volatile variables that are by far most often read (not
278 * written) as status and consistency checks. We pack as much
279 * information into them as we can.
280 *
281 * Field "ctl" contains 64 bits holding information needed to
282 * atomically decide to add, enqueue (on an event queue), and
283 * dequeue (and release)-activate workers. To enable this
284 * packing, we restrict maximum parallelism to (1<<15)-1 (which is
285 * far in excess of normal operating range) to allow ids, counts,
286 * and their negations (used for thresholding) to fit into 16bit
287 * subfields.
288 *
289 * Field "mode" holds configuration parameters as well as lifetime
290 * status, atomically and monotonically setting SHUTDOWN, STOP,
291 * and finally TERMINATED bits.
292 *
293 * Field "workQueues" holds references to WorkQueues. It is
294 * updated (only during worker creation and termination) under
295 * lock (using field workerNamePrefix as lock), but is otherwise
296 * concurrently readable, and accessed directly. We also ensure
297 * that uses of the array reference itself never become too stale
298 * in case of resizing. To simplify index-based operations, the
299 * array size is always a power of two, and all readers must
300 * tolerate null slots. Worker queues are at odd indices. Shared
301 * (submission) queues are at even indices, up to a maximum of 64
302 * slots, to limit growth even if array needs to expand to add
303 * more workers. Grouping them together in this way simplifies and
304 * speeds up task scanning.
305 *
306 * All worker thread creation is on-demand, triggered by task
307 * submissions, replacement of terminated workers, and/or
308 * compensation for blocked workers. However, all other support
309 * code is set up to work with other policies. To ensure that we
310 * do not hold on to worker references that would prevent GC, all
311 * accesses to workQueues are via indices into the workQueues
312 * array (which is one source of some of the messy code
313 * constructions here). In essence, the workQueues array serves as
314 * a weak reference mechanism. Thus for example the stack top
315 * subfield of ctl stores indices, not references.
316 *
317 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
318 * cannot let workers spin indefinitely scanning for tasks when
319 * none can be found immediately, and we cannot start/resume
320 * workers unless there appear to be tasks available. On the
321 * other hand, we must quickly prod them into action when new
322 * tasks are submitted or generated. In many usages, ramp-up time
323 * is the main limiting factor in overall performance, which is
324 * compounded at program start-up by JIT compilation and
325 * allocation. So we streamline this as much as possible.
326 *
327 * The "ctl" field atomically maintains total worker and
328 * "released" worker counts, plus the head of the available worker
329 * queue (actually stack, represented by the lower 32bit subfield
330 * of ctl). Released workers are those known to be scanning for
331 * and/or running tasks. Unreleased ("available") workers are
332 * recorded in the ctl stack. These workers are made available for
333 * signalling by enqueuing in ctl (see method runWorker). The
334 * "queue" is a form of Treiber stack. This is ideal for
335 * activating threads in most-recently used order, and improves
336 * performance and locality, outweighing the disadvantages of
337 * being prone to contention and inability to release a worker
338 * unless it is topmost on stack. To avoid missed signal problems
339 * inherent in any wait/signal design, available workers rescan
340 * for (and if found run) tasks after enqueuing. Normally their
341 * release status will be updated while doing so, but the released
342 * worker ctl count may underestimate the number of active
343 * threads. (However, it is still possible to determine quiescence
344 * via a validation traversal -- see isQuiescent). After an
345 * unsuccessful rescan, available workers are blocked until
346 * signalled (see signalWork). The top stack state holds the
347 * value of the "phase" field of the worker: its index and status,
348 * plus a version counter that, in addition to the count subfields
349 * (also serving as version stamps) provide protection against
350 * Treiber stack ABA effects.
351 *
352 * Creating workers. To create a worker, we pre-increment counts
353 * (serving as a reservation), and attempt to construct a
354 * ForkJoinWorkerThread via its factory. Upon construction, the
355 * new thread invokes registerWorker, where it constructs a
356 * WorkQueue and is assigned an index in the workQueues array
357 * (expanding the array if necessary). The thread is then started.
358 * Upon any exception across these steps, or null return from
359 * factory, deregisterWorker adjusts counts and records
360 * accordingly. If a null return, the pool continues running with
361 * fewer than the target number of workers. If exceptional, the
362 * exception is propagated, generally to some external caller.
363 * Worker index assignment avoids the bias in scanning that would
364 * occur if entries were sequentially packed starting at the front
365 * of the workQueues array. We treat the array as a simple
366 * power-of-two hash table, expanding as needed. The seedIndex
367 * increment ensures no collisions until a resize is needed or a
368 * worker is deregistered and replaced, and thereafter keeps
369 * probability of collision low. We cannot use
370 * ThreadLocalRandom.getProbe() for similar purposes here because
371 * the thread has not started yet, but do so for creating
372 * submission queues for existing external threads (see
373 * externalPush).
374 *
375 * WorkQueue field "phase" is used by both workers and the pool to
376 * manage and track whether a worker is UNSIGNALLED (possibly
377 * blocked waiting for a signal). When a worker is enqueued its
378 * phase field is set. Note that phase field updates lag queue CAS
379 * releases so usage requires care -- seeing a negative phase does
380 * not guarantee that the worker is available. When queued, the
381 * lower 16 bits of scanState must hold its pool index. So we
382 * place the index there upon initialization (see registerWorker)
383 * and otherwise keep it there or restore it when necessary.
384 *
385 * The ctl field also serves as the basis for memory
386 * synchronization surrounding activation. This uses a more
387 * efficient version of a Dekker-like rule that task producers and
388 * consumers sync with each other by both writing/CASing ctl (even
389 * if to its current value). This would be extremely costly. So
390 * we relax it in several ways: (1) Producers only signal when
391 * their queue is empty. Other workers propagate this signal (in
392 * method scan) when they find tasks; to further reduce flailing,
393 * each worker signals only one other per activation. (2) Workers
394 * only enqueue after scanning (see below) and not finding any
395 * tasks. (3) Rather than CASing ctl to its current value in the
396 * common case where no action is required, we reduce write
397 * contention by equivalently prefacing signalWork when called by
398 * an external task producer using a memory access with
399 * full-volatile semantics or a "fullFence".
400 *
401 * Almost always, too many signals are issued. A task producer
402 * cannot in general tell if some existing worker is in the midst
403 * of finishing one task (or already scanning) and ready to take
404 * another without being signalled. So the producer might instead
405 * activate a different worker that does not find any work, and
406 * then inactivates. This scarcely matters in steady-state
407 * computations involving all workers, but can create contention
408 * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
409 * computations involving only a few workers.
410 *
411 * Scanning. Method runWorker performs top-level scanning for
412 * tasks. Each scan traverses and tries to poll from each queue
413 * starting at a random index and circularly stepping. Scans are
414 * not performed in ideal random permutation order, to reduce
415 * cacheline contention. The pseudorandom generator need not have
416 * high-quality statistical properties in the long term, but just
417 * within computations; We use Marsaglia XorShifts (often via
418 * ThreadLocalRandom.nextSecondarySeed), which are cheap and
419 * suffice. Scanning also employs contention reduction: When
420 * scanning workers fail to extract an apparently existing task,
421 * they soon restart at a different pseudorandom index. This
422 * improves throughput when many threads are trying to take tasks
423 * from few queues, which can be common in some usages. Scans do
424 * not otherwise explicitly take into account core affinities,
425 * loads, cache localities, etc. However, they do exploit temporal
426 * locality (which usually approximates these) by preferring to
427 * re-poll (at most #workers times) from the same queue after a
428 * successful poll before trying others.
429 *
430 * Trimming workers. To release resources after periods of lack of
431 * use, a worker starting to wait when the pool is quiescent will
432 * time out and terminate (see method scan) if the pool has
433 * remained quiescent for period given by field keepAlive.
434 *
435 * Shutdown and Termination. A call to shutdownNow invokes
436 * tryTerminate to atomically set a runState bit. The calling
437 * thread, as well as every other worker thereafter terminating,
438 * helps terminate others by cancelling their unprocessed tasks,
439 * and waking them up, doing so repeatedly until stable. Calls to
440 * non-abrupt shutdown() preface this by checking whether
441 * termination should commence by sweeping through queues (until
442 * stable) to ensure lack of in-flight submissions and workers
443 * about to process them before triggering the "STOP" phase of
444 * termination.
445 *
446 * Joining Tasks
447 * =============
448 *
449 * Any of several actions may be taken when one worker is waiting
450 * to join a task stolen (or always held) by another. Because we
451 * are multiplexing many tasks on to a pool of workers, we can't
452 * always just let them block (as in Thread.join). We also cannot
453 * just reassign the joiner's run-time stack with another and
454 * replace it later, which would be a form of "continuation", that
455 * even if possible is not necessarily a good idea since we may
456 * need both an unblocked task and its continuation to progress.
457 * Instead we combine two tactics:
458 *
459 * Helping: Arranging for the joiner to execute some task that it
460 * would be running if the steal had not occurred.
461 *
462 * Compensating: Unless there are already enough live threads,
463 * method tryCompensate() may create or re-activate a spare
464 * thread to compensate for blocked joiners until they unblock.
465 *
466 * A third form (implemented in tryRemoveAndExec) amounts to
467 * helping a hypothetical compensator: If we can readily tell that
468 * a possible action of a compensator is to steal and execute the
469 * task being joined, the joining thread can do so directly,
470 * without the need for a compensation thread.
471 *
472 * The ManagedBlocker extension API can't use helping so relies
473 * only on compensation in method awaitBlocker.
474 *
475 * The algorithm in awaitJoin entails a form of "linear helping".
476 * Each worker records (in field source) the id of the queue from
477 * which it last stole a task. The scan in method awaitJoin uses
478 * these markers to try to find a worker to help (i.e., steal back
479 * a task from and execute it) that could hasten completion of the
480 * actively joined task. Thus, the joiner executes a task that
481 * would be on its own local deque if the to-be-joined task had
482 * not been stolen. This is a conservative variant of the approach
483 * described in Wagner & Calder "Leapfrogging: a portable
484 * technique for implementing efficient futures" SIGPLAN Notices,
485 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
486 * mainly in that we only record queue ids, not full dependency
487 * links. This requires a linear scan of the workQueues array to
488 * locate stealers, but isolates cost to when it is needed, rather
489 * than adding to per-task overhead. Searches can fail to locate
490 * stealers when GC stalls and the like delay recording sources.
491 * Further, even when accurately identified, stealers might not
492 * ever produce a task that the joiner can in turn help with. So,
493 * compensation is tried upon failure to find tasks to run.
494 *
495 * Compensation does not by default aim to keep exactly the target
496 * parallelism number of unblocked threads running at any given
497 * time. Some previous versions of this class employed immediate
498 * compensations for any blocked join. However, in practice, the
499 * vast majority of blockages are transient byproducts of GC and
500 * other JVM or OS activities that are made worse by replacement.
501 * Rather than impose arbitrary policies, we allow users to
502 * override the default of only adding threads upon apparent
503 * starvation. The compensation mechanism may also be bounded.
504 * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
505 * JVMs to cope with programming errors and abuse before running
506 * out of resources to do so.
507 *
508 * Common Pool
509 * ===========
510 *
511 * The static common pool always exists after static
512 * initialization. Since it (or any other created pool) need
513 * never be used, we minimize initial construction overhead and
514 * footprint to the setup of about a dozen fields.
515 *
516 * When external threads submit to the common pool, they can
517 * perform subtask processing (see externalHelpComplete and
518 * related methods) upon joins. This caller-helps policy makes it
519 * sensible to set common pool parallelism level to one (or more)
520 * less than the total number of available cores, or even zero for
521 * pure caller-runs. We do not need to record whether external
522 * submissions are to the common pool -- if not, external help
523 * methods return quickly. These submitters would otherwise be
524 * blocked waiting for completion, so the extra effort (with
525 * liberally sprinkled task status checks) in inapplicable cases
526 * amounts to an odd form of limited spin-wait before blocking in
527 * ForkJoinTask.join.
528 *
529 * As a more appropriate default in managed environments, unless
530 * overridden by system properties, we use workers of subclass
531 * InnocuousForkJoinWorkerThread when there is a SecurityManager
532 * present. These workers have no permissions set, do not belong
533 * to any user-defined ThreadGroup, and erase all ThreadLocals
534 * after executing any top-level task (see
535 * WorkQueue.afterTopLevelExec). The associated mechanics (mainly
536 * in ForkJoinWorkerThread) may be JVM-dependent and must access
537 * particular Thread class fields to achieve this effect.
538 *
539 * Style notes
540 * ===========
541 *
542 * Memory ordering relies mainly on Unsafe intrinsics that carry
543 * the further responsibility of explicitly performing null- and
544 * bounds- checks otherwise carried out implicitly by JVMs. This
545 * can be awkward and ugly, but also reflects the need to control
546 * outcomes across the unusual cases that arise in very racy code
547 * with very few invariants. So these explicit checks would exist
548 * in some form anyway. All fields are read into locals before
549 * use, and null-checked if they are references. This is usually
550 * done in a "C"-like style of listing declarations at the heads
551 * of methods or blocks, and using inline assignments on first
552 * encounter. Array bounds-checks are usually performed by
553 * masking with array.length-1, which relies on the invariant that
554 * these arrays are created with positive lengths, which is itself
555 * paranoically checked. Nearly all explicit checks lead to
556 * bypass/return, not exception throws, because they may
557 * legitimately arise due to cancellation/revocation during
558 * shutdown.
559 *
560 * There is a lot of representation-level coupling among classes
561 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
562 * fields of WorkQueue maintain data structures managed by
563 * ForkJoinPool, so are directly accessed. There is little point
564 * trying to reduce this, since any associated future changes in
565 * representations will need to be accompanied by algorithmic
566 * changes anyway. Several methods intrinsically sprawl because
567 * they must accumulate sets of consistent reads of fields held in
568 * local variables. There are also other coding oddities
569 * (including several unnecessary-looking hoisted null checks)
570 * that help some methods perform reasonably even when interpreted
571 * (not compiled).
572 *
573 * The order of declarations in this file is (with a few exceptions):
574 * (1) Static utility functions
575 * (2) Nested (static) classes
576 * (3) Static fields
577 * (4) Fields, along with constants used when unpacking some of them
578 * (5) Internal control methods
579 * (6) Callbacks and other support for ForkJoinTask methods
580 * (7) Exported methods
581 * (8) Static block initializing statics in minimally dependent order
582 */
583
584 // Static utilities
585
586 /**
587 * If there is a security manager, makes sure caller has
588 * permission to modify threads.
589 */
590 private static void checkPermission() {
591 SecurityManager security = System.getSecurityManager();
592 if (security != null)
593 security.checkPermission(modifyThreadPermission);
594 }
595
596 // Nested classes
597
598 /**
599 * Factory for creating new {@link ForkJoinWorkerThread}s.
600 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
601 * for {@code ForkJoinWorkerThread} subclasses that extend base
602 * functionality or initialize threads with different contexts.
603 */
604 public static interface ForkJoinWorkerThreadFactory {
605 /**
606 * Returns a new worker thread operating in the given pool.
607 * Returning null or throwing an exception may result in tasks
608 * never being executed. If this method throws an exception,
609 * it is relayed to the caller of the method (for example
610 * {@code execute}) causing attempted thread creation. If this
611 * method returns null or throws an exception, it is not
612 * retried until the next attempted creation (for example
613 * another call to {@code execute}).
614 *
615 * @param pool the pool this thread works in
616 * @return the new worker thread, or {@code null} if the request
617 * to create a thread is rejected.
618 * @throws NullPointerException if the pool is null
619 */
620 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
621 }
622
623 /**
624 * Default ForkJoinWorkerThreadFactory implementation; creates a
625 * new ForkJoinWorkerThread.
626 */
627 private static final class DefaultForkJoinWorkerThreadFactory
628 implements ForkJoinWorkerThreadFactory {
629 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
630 return new ForkJoinWorkerThread(pool);
631 }
632 }
633
    // Constants shared across ForkJoinPool and WorkQueue

    // Bounds
    static final int SWIDTH = 16; // width of short
    static final int SMASK = 0xffff; // short bits == max index
    static final int MAX_CAP = 0x7fff; // max #workers - 1
    static final int SQMASK = 0x007e; // max 64 (even) slots

    // Masks and units for WorkQueue.phase and ctl sp subfield
    static final int UNSIGNALLED = 1 << 31; // must be negative
    static final int SS_SEQ = 1 << 16; // version count
    static final int QLOCK = 1; // must be 1

    // Mode bits and sentinels, some also used in WorkQueue id and source fields
    static final int OWNED = 1; // queue has owner thread
    static final int FIFO = 1 << 16; // fifo queue or access mode
    static final int SATURATE = 1 << 17; // for tryCompensate
    static final int SHUTDOWN = 1 << 18; // pool no longer accepts submissions
    static final int TERMINATED = 1 << 19; // pool has fully terminated
    static final int STOP = 1 << 31; // must be negative
    static final int QUIET = 1 << 30; // not scanning or working
    static final int DORMANT = QUIET | UNSIGNALLED; // inactive and unsignalled

    /**
     * The maximum number of local polls from the same queue before
     * checking others. This is a safeguard against infinitely unfair
     * looping under unbounded user task recursion, and must be larger
     * than plausible cases of intentional bounded task recursion.
     */
    static final int POLL_LIMIT = 1 << 10;
664
    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. The @Contended annotation alerts
     * JVMs to try to keep instances apart.
     */
    // For now, using manual padding.
    // @jdk.internal.vm.annotation.Contended
    // @sun.misc.Contended
    static final class WorkQueue {

        /**
         * Capacity of work-stealing queue array upon initialization.
         * Must be a power of two; at least 4, but should be larger to
         * reduce or eliminate cacheline sharing among queues.
         * Currently, it is much larger, as a partial workaround for
         * the fact that JVMs often place arrays in locations that
         * share GC bookkeeping (especially cardmarks) such that
         * per-write accesses encounter serious memory contention.
         */
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

        /**
         * Maximum size for queue arrays. Must be a power of two less
         * than or equal to 1 << (31 - width of array entry) to ensure
         * lack of wraparound of index calculations, but defined to a
         * value a bit less than this to help users trap runaway
         * programs before saturating systems.
         */
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

        // Instance fields

        // Manual cache-line padding around the hot fields below.
        volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
        volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
        volatile int phase;        // versioned, negative: queued, 1: locked
        int stackPred;             // pool stack (ctl) predecessor link
        int nsteals;               // number of steals
        int id;                    // index, mode, tag
        volatile int source;       // source queue id, or sentinel
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
        volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e, pad1f;

        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

        /**
         * Returns an exportable index (used by ForkJoinWorkerThread).
         */
        final int getPoolIndex() {
            return (id & 0xffff) >>> 1; // ignore odd/even tag bit
        }

        /**
         * Returns the approximate number of tasks in the queue.
         * May transiently under-report while a push or poll is racing.
         */
        final int queueSize() {
            int n = base - top;       // read base first
            return (n >= 0) ? 0 : -n; // ignore transient negative
        }

        /**
         * Provides a more accurate estimate of whether this queue has
         * any tasks than does queueSize, by checking whether a
         * near-empty queue has at least one unclaimed task.
         */
        final boolean isEmpty() {
            ForkJoinTask<?>[] a; int n, al, b;
            return ((n = (b = base) - top) >= 0 || // possibly one task
                    (n == -1 && ((a = array) == null ||
                                 (al = a.length) == 0 ||
                                 a[(al - 1) & b] == null)));
        }


        /**
         * Pushes a task. Call only by owner in unshared queues.
         *
         * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task) {
            int s = top; ForkJoinTask<?>[] a; int al, d;
            if ((a = array) != null && (al = a.length) > 0) {
                int index = (al - 1) & s;
                long offset = ((long)index << ASHIFT) + ABASE;
                ForkJoinPool p = pool;
                top = s + 1;
                U.putOrderedObject(a, offset, task);
                if ((d = base - s) == 0 && p != null) { // was empty: signal
                    U.fullFence();
                    p.signalWork();
                }
                else if (d + al == 1) // now full: double capacity
                    growArray();
            }
        }

        /**
         * Initializes or doubles the capacity of array. Call either
         * by owner or with lock held -- it is OK for base, but not
         * top, to move while resizings are in progress.
         */
        final ForkJoinTask<?>[] growArray() {
            ForkJoinTask<?>[] oldA = array;
            int oldSize = oldA != null ? oldA.length : 0;
            int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
            if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
                throw new RejectedExecutionException("Queue capacity exceeded");
            int oldMask, t, b;
            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
            if (oldA != null && (oldMask = oldSize - 1) > 0 &&
                (t = top) - (b = base) > 0) {
                int mask = size - 1;
                do { // emulate poll from old array, push to new array
                    int index = b & oldMask;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> x = (ForkJoinTask<?>)
                        U.getObjectVolatile(oldA, offset);
                    if (x != null && // CAS vs concurrent stealers of old array
                        U.compareAndSwapObject(oldA, offset, x, null))
                        a[b & mask] = x;
                } while (++b != t);
                U.storeFence();
            }
            return a;
        }

        /**
         * Takes next task, if one exists, in LIFO order. Call only
         * by owner in unshared queues.
         */
        final ForkJoinTask<?> pop() {
            int b = base, s = top, al, i; ForkJoinTask<?>[] a;
            if ((a = array) != null && b != s && (al = a.length) > 0) {
                int index = (al - 1) & --s;
                long offset = ((long)index << ASHIFT) + ABASE;
                ForkJoinTask<?> t = (ForkJoinTask<?>)
                    U.getObject(a, offset);
                if (t != null && // CAS may lose to a stealer of the last task
                    U.compareAndSwapObject(a, offset, t, null)) {
                    top = s;
                    U.storeFence();
                    return t;
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in FIFO order.
         */
        final ForkJoinTask<?> poll() {
            for (;;) {
                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && (d = b - s) < 0 &&
                    (al = a.length) > 0) {
                    int index = (al - 1) & b;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getObjectVolatile(a, offset);
                    if (b++ == base) { // base unchanged: read was consistent
                        if (t != null) {
                            if (U.compareAndSwapObject(a, offset, t, null)) {
                                base = b;
                                return t;
                            }
                        }
                        else if (d == -1)
                            break; // now empty
                    }
                }
                else
                    break;
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> nextLocalTask() {
            return ((id & FIFO) != 0) ? poll() : pop();
        }

        /**
         * Returns next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> peek() {
            int al; ForkJoinTask<?>[] a;
            return ((a = array) != null && (al = a.length) > 0) ?
                a[(al - 1) &
                  ((id & FIFO) != 0 ? base : top - 1)] : null;
        }

        /**
         * Pops the given task only if it is at the current top.
         */
        final boolean tryUnpush(ForkJoinTask<?> task) {
            int b = base, s = top, al; ForkJoinTask<?>[] a;
            if ((a = array) != null && b != s && (al = a.length) > 0) {
                int index = (al - 1) & --s;
                long offset = ((long)index << ASHIFT) + ABASE;
                if (U.compareAndSwapObject(a, offset, task, null)) {
                    top = s;
                    U.storeFence();
                    return true;
                }
            }
            return false;
        }

        /**
         * Removes and cancels all known tasks, ignoring any exceptions.
         */
        final void cancelAll() {
            for (ForkJoinTask<?> t; (t = poll()) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t);
        }

        // Specialized execution methods

        /**
         * Pops and executes up to limit consecutive tasks or until empty.
         *
         * @param limit max runs, or zero for no limit
         */
        final void localPopAndExec(int limit) {
            for (;;) {
                int b = base, s = top, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && b != s && (al = a.length) > 0) {
                    int index = (al - 1) & --s;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getAndSetObject(a, offset, null);
                    if (t != null) {
                        top = s;
                        U.storeFence();
                        t.doExec();
                        if (limit != 0 && --limit == 0)
                            break;
                    }
                    else
                        break;
                }
                else
                    break;
            }
        }

        /**
         * Polls and executes up to limit consecutive tasks or until empty.
         *
         * @param limit max runs, or zero for no limit
         */
        final void localPollAndExec(int limit) {
            for (int polls = 0;;) {
                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && (d = b - s) < 0 &&
                    (al = a.length) > 0) {
                    int index = (al - 1) & b++;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getAndSetObject(a, offset, null);
                    if (t != null) {
                        base = b;
                        t.doExec();
                        if (limit != 0 && ++polls == limit)
                            break;
                    }
                    else if (d == -1)
                        break; // now empty
                    else
                        polls = 0; // stolen; reset
                }
                else
                    break;
            }
        }

        /**
         * If present, removes task from queue and executes it.
         * Traverses from top; on a hit, shifts the tasks above the
         * removed slot down one position to close the gap.
         */
        final void tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] wa; int s, wal;
            if (base - (s = top) < 0 && // traverse from top
                (wa = array) != null && (wal = wa.length) > 0) {
                for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
                    int index = i & m;
                    // int shift suffices: index < MAXIMUM_QUEUE_CAPACITY
                    long offset = (index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getObject(wa, offset);
                    if (t == null)
                        break;
                    else if (t == task) {
                        if (U.compareAndSwapObject(wa, offset, t, null)) {
                            top = ns;   // safely shift down
                            for (int j = i; j != ns; ++j) {
                                ForkJoinTask<?> f;
                                int pindex = (j + 1) & m;
                                long pOffset = (pindex << ASHIFT) + ABASE;
                                f = (ForkJoinTask<?>)U.getObject(wa, pOffset);
                                U.putObjectVolatile(wa, pOffset, null);

                                int jindex = j & m;
                                long jOffset = (jindex << ASHIFT) + ABASE;
                                U.putOrderedObject(wa, jOffset, f);
                            }
                            U.storeFence();
                            t.doExec();
                        }
                        break;
                    }
                }
            }
        }

        /**
         * Tries to steal and run tasks within the target's
         * computation until done, not found, or limit exceeded.
         *
         * @param task root of CountedCompleter computation
         * @param limit max runs, or zero for no limit
         * @return task status on exit
         */
        final int localHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false;
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        long offset = ((long)index << ASHIFT) + ABASE;
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            U.getObject(a, offset);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null) // try parent
                                        break;
                                }
                                else {
                                    if (U.compareAndSwapObject(a, offset,
                                                               t, null)) {
                                        top = s - 1;
                                        U.storeFence();
                                        t.doExec();
                                        help = true;
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }

        // Operations on shared queues

        /**
         * Tries to lock shared queue by CASing phase field.
         */
        final boolean tryLockSharedQueue() {
            return U.compareAndSwapInt(this, PHASE, 0, QLOCK);
        }

        /**
         * Shared version of tryUnpush. Locks via the phase field,
         * revalidates top and array, then pops the matching task.
         */
        final boolean trySharedUnpush(ForkJoinTask<?> task) {
            boolean popped = false;
            int s = top - 1, al; ForkJoinTask<?>[] a;
            if ((a = array) != null && (al = a.length) > 0) {
                int index = (al - 1) & s;
                long offset = ((long)index << ASHIFT) + ABASE;
                ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
                if (t == task &&
                    U.compareAndSwapInt(this, PHASE, 0, QLOCK)) {
                    if (top == s + 1 && array == a && // recheck under lock
                        U.compareAndSwapObject(a, offset, task, null)) {
                        popped = true;
                        top = s;
                    }
                    U.putOrderedInt(this, PHASE, 0); // release lock
                }
            }
            return popped;
        }

        /**
         * Shared version of localHelpCC.
         */
        final int sharedHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false;
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        long offset = ((long)index << ASHIFT) + ABASE;
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            U.getObject(a, offset);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null)
                                        break;
                                }
                                else {
                                    if (U.compareAndSwapInt(this, PHASE,
                                                            0, QLOCK)) {
                                        if (top == s && array == a &&
                                            U.compareAndSwapObject(a, offset,
                                                                   t, null)) {
                                            help = true;
                                            top = s - 1;
                                        }
                                        U.putOrderedInt(this, PHASE, 0);
                                        if (help)
                                            t.doExec();
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }

        /**
         * Returns true if owned and not known to be blocked.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            return ((wt = owner) != null &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // Unsafe mechanics. Note that some are (and must be) the same as in FJP
        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        private static final long PHASE;
        private static final int ABASE;
        private static final int ASHIFT;
        static {
            try {
                PHASE = U.objectFieldOffset
                    (WorkQueue.class.getDeclaredField("phase"));
                ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
                int scale = U.arrayIndexScale(ForkJoinTask[].class);
                if ((scale & (scale - 1)) != 0)
                    throw new Error("array index scale not a power of two");
                ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }
1149
    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     */
    private static final int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.
     * Guarded by class-level synchronization in nextPoolId().
     */
    private static int poolNumberSequence;
1190
1191 /**
1192 * Returns the next sequence number. We don't expect this to
1193 * ever contend, so use simple builtin sync.
1194 */
1195 private static final synchronized int nextPoolId() {
1196 return ++poolNumberSequence;
1197 }
1198
    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work.
     */
    private static final long DEFAULT_KEEPALIVE = 60000L;

    /**
     * Undershoot tolerance for idle timeouts, in milliseconds.
     */
    private static final long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES. Overridable using the
     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
     * property. The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private static final int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private static final int SEED_INCREMENT = 0x9e3779b9;
1227
    /*
     * Bits and masks for field ctl, packed with 4 16 bit subfields:
     * RC: Number of released (unqueued) workers minus target parallelism
     * TC: Number of total workers minus target parallelism
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl. The offsets of counts
     * by the target parallelism and the positionings of fields makes
     * it possible to perform the most common checks via sign tests of
     * fields: When rc is negative, there are not enough unqueued
     * workers, when tc is negative, there are not enough total
     * workers. When sp is non-zero, there are waiting workers. To
     * deal with possibly negative fields, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     *
     * Because it occupies uppermost bits, we can add one release count
     * using getAndAddLong of RC_UNIT, rather than CAS, when returning
     * from a blocked join. Other updates entail multiple subfields
     * and masking, requiring CAS.
     *
     * The limits packed in field "bounds" are also offset by the
     * parallelism level to make them comparable to the ctl rc and tc
     * fields.
     */

    // Lower and upper word masks
    private static final long SP_MASK = 0xffffffffL;
    private static final long UC_MASK = ~SP_MASK;

    // Release counts
    private static final int RC_SHIFT = 48;
    private static final long RC_UNIT = 0x0001L << RC_SHIFT;
    private static final long RC_MASK = 0xffffL << RC_SHIFT;

    // Total counts
    private static final int TC_SHIFT = 32;
    private static final long TC_UNIT = 0x0001L << TC_SHIFT;
    private static final long TC_MASK = 0xffffL << TC_SHIFT;
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1269
    // Instance fields

    // Segregate ctl field. For now using padding vs @Contended
    // @jdk.internal.vm.annotation.Contended("fjpctl")
    // @sun.misc.Contended("fjpctl")
    volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
    volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
    volatile long ctl;                   // main pool control
    volatile long pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
    // NOTE(review): trailing row has 7 pads, not 8 -- presumably the
    // adjacent fields below complete the isolation region; confirm.
    volatile long pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e;

    volatile long stealCount;            // collects worker nsteals
    final long keepAlive;                // milliseconds before dropping if idle
    int indexSeed;                       // next worker index
    final int bounds;                    // min, max threads packed as shorts
    volatile int mode;                   // parallelism, runstate, queue mode
    WorkQueue[] workQueues;              // main registry
    final String workerNamePrefix;       // for worker thread string; sync lock
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;  // per-worker UEH
1290
1291 // Creating, registering and deregistering workers
1292
1293 /**
1294 * Tries to construct and start one worker. Assumes that total
1295 * count has already been incremented as a reservation. Invokes
1296 * deregisterWorker on any failure.
1297 *
1298 * @return true if successful
1299 */
1300 private boolean createWorker() {
1301 ForkJoinWorkerThreadFactory fac = factory;
1302 Throwable ex = null;
1303 ForkJoinWorkerThread wt = null;
1304 try {
1305 if (fac != null && (wt = fac.newThread(this)) != null) {
1306 wt.start();
1307 return true;
1308 }
1309 } catch (Throwable rex) {
1310 ex = rex;
1311 }
1312 deregisterWorker(wt, ex);
1313 return false;
1314 }
1315
    /**
     * Tries to add one worker, incrementing ctl counts before doing
     * so, relying on createWorker to back out on failure.
     *
     * @param c incoming ctl value, with total count negative and no
     * idle workers. On CAS failure, c is refreshed and retried if
     * this holds (otherwise, a new worker is not needed).
     */
    private void tryAddWorker(long c) {
        do {
            // bump both release and total counts in one CAS
            long nc = ((RC_MASK & (c + RC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
                createWorker();
                break;
            }
            // retry only while still short of workers (ADD_WORKER set)
            // and no idle workers are queued ((int)c == 0)
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }
1334
    /**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.setDaemon(true);                      // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int tid = 0;                             // for thread name
        int fifo = mode & FIFO;
        String prefix = workerNamePrefix;
        if (prefix != null) {
            synchronized (prefix) {              // prefix doubles as registry lock
                WorkQueue[] ws = workQueues; int n;
                int s = indexSeed += SEED_INCREMENT;
                if (ws != null && (n = ws.length) > 1) {
                    int m = n - 1;
                    tid = s & m;
                    int i = m & ((s << 1) | 1);  // odd-numbered indices
                    for (int probes = n >>> 1;;) { // find empty slot
                        WorkQueue q;
                        if ((q = ws[i]) == null || q.phase == QUIET)
                            break;
                        else if (--probes == 0) {
                            i = n | 1;           // resize below
                            break;
                        }
                        else
                            i = (i + 2) & m;     // step to next odd slot
                    }

                    int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
                    w.phase = w.id = id;         // now publishable

                    if (i < n)
                        ws[i] = w;
                    else {                       // expand array
                        int an = n << 1;
                        WorkQueue[] as = new WorkQueue[an];
                        as[i] = w;
                        int am = an - 1;
                        // j advances by 2 per iteration: even slots hold
                        // external queues (rehashed by id), odd slots hold
                        // workers (copied in place)
                        for (int j = 0; j < n; ++j) {
                            WorkQueue v;         // copy external queue
                            if ((v = ws[j]) != null) // position may change
                                as[v.id & am & SQMASK] = v;
                            if (++j >= n)
                                break;
                            as[j] = ws[j];       // copy worker
                        }
                        workQueues = as;
                    }
                }
            }
            wt.setName(prefix.concat(Integer.toString(tid)));
        }
        return w;
    }
1397
    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker. Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
     *
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        int phase = 0;
        if (wt != null && (w = wt.workQueue) != null) {
            Object lock = workerNamePrefix;
            long ns = (long)w.nsteals & 0xffffffffL; // unsigned steal count
            int idx = w.id & SMASK;
            if (lock != null) {
                WorkQueue[] ws;                  // remove index from array
                synchronized (lock) {
                    if ((ws = workQueues) != null && ws.length > idx &&
                        ws[idx] == w)
                        ws[idx] = null;
                    stealCount += ns;            // fold steals into pool total
                }
            }
            phase = w.phase;
        }
        if (phase != QUIET) {                    // else pre-adjusted
            long c;                              // decrement counts
            do {} while (!U.compareAndSwapLong
                         (this, CTL, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
                                               (TC_MASK & (c - TC_UNIT)) |
                                               (SP_MASK & c))));
        }
        if (w != null)
            w.cancelAll();                       // cancel remaining tasks

        if (!tryTerminate(false, false) &&       // possibly replace worker
            w != null && w.array != null)        // avoid repeated failures
            signalWork();

        if (ex == null)                          // help clean on way out
            ForkJoinTask.helpExpungeStaleExceptions();
        else                                     // rethrow
            ForkJoinTask.rethrow(ex);
    }
1444
    /**
     * Tries to create or release a worker if too few are running.
     * Pops the top waiter off the Treiber stack encoded in the low
     * half of ctl, or asks tryAddWorker for a new thread when the
     * stack is empty and the total count is still below target.
     */
    final void signalWork() {
        for (;;) {
            long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
            if ((c = ctl) >= 0L)                 // enough workers
                break;
            else if ((sp = (int)c) == 0) {       // no idle workers
                if ((c & ADD_WORKER) != 0L)      // too few workers
                    tryAddWorker(c);
                break;
            }
            else if ((ws = workQueues) == null)
                break;                           // unstarted/terminated
            else if (ws.length <= (i = sp & SMASK))
                break;                           // terminated
            else if ((v = ws[i]) == null)
                break;                           // terminating
            else {
                int np = sp & ~UNSIGNALLED;      // next phase, signalled
                int vp = v.phase;
                // new ctl: successor from v's stack link, release count +1
                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
                Thread vt = v.owner;
                if (sp == vp && U.compareAndSwapLong(this, CTL, c, nc)) {
                    v.phase = np;
                    if (v.source < 0)            // unpark only if parked/parking
                        LockSupport.unpark(vt);
                    break;
                }
            }
        }
    }
1478
    /**
     * Tries to decrement counts (sometimes implicitly) and possibly
     * arrange for a compensating worker in preparation for blocking:
     * If not all core workers yet exist, creates one, else if any are
     * unreleased (possibly including caller) releases one, else if
     * fewer than the minimum allowed number of workers running,
     * checks to see that they are all active, and if so creates an
     * extra worker unless over maximum limit and policy is to
     * saturate. Most of these steps can fail due to interference, in
     * which case 0 is returned so caller will retry. A negative
     * return value indicates that the caller doesn't need to
     * re-adjust counts when later unblocked.
     *
     * @return 1: block then adjust, -1: block without adjust, 0 : retry
     */
    private int tryCompensate(WorkQueue w) {
        int t, n, sp;
        long c = ctl;
        WorkQueue[] ws = workQueues;
        if ((t = (short)(c >> TC_SHIFT)) >= 0) { // at or above target total
            if (ws == null || (n = ws.length) <= 0 || w == null)
                return 0;                        // disabled
            else if ((sp = (int)c) != 0) {       // replace or release
                WorkQueue v = ws[sp & (n - 1)];
                int wp = w.phase;
                long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
                int np = sp & ~UNSIGNALLED;
                if (v != null) {
                    int vp = v.phase;
                    Thread vt = v.owner;
                    long nc = ((long)v.stackPred & SP_MASK) | uc;
                    if (vp == sp && U.compareAndSwapLong(this, CTL, c, nc)) {
                        v.phase = np;
                        if (v.source < 0)
                            LockSupport.unpark(vt);
                        return (wp < 0) ? -1 : 1;
                    }
                }
                return 0;
            }
            else if ((int)(c >> RC_SHIFT) -      // reduce parallelism
                     (short)(bounds & SMASK) > 0) {
                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
                return U.compareAndSwapLong(this, CTL, c, nc) ? 1 : 0;
            }
            else {                               // validate
                int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
                boolean unstable = false;
                for (int i = 1; i < n; i += 2) { // scan worker (odd) slots
                    WorkQueue q; Thread wt; Thread.State ts;
                    if ((q = ws[i]) != null) {
                        if (q.source == 0) {     // idle worker: unsettled
                            unstable = true;
                            break;
                        }
                        else {
                            --tc;
                            if ((wt = q.owner) != null &&
                                ((ts = wt.getState()) == Thread.State.BLOCKED ||
                                 ts == Thread.State.WAITING))
                                ++bc;            // worker is blocking
                        }
                    }
                }
                if (unstable || tc != 0 || ctl != c)
                    return 0;                    // inconsistent
                else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
                    if ((md & SATURATE) != 0)
                        return -1;
                    else if (bc < pc) {          // lagging
                        Thread.yield();          // for retry spins
                        return 0;
                    }
                    else
                        throw new RejectedExecutionException(
                            "Thread limit exceeded replacing blocked worker");
                }
            }
        }

        long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
        return U.compareAndSwapLong(this, CTL, c, nc) && createWorker() ? 1 : 0;
    }
1562
    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     * See above for explanation.
     *
     * Repeatedly scans the workQueues array for stealable tasks; after
     * each successful steal, runs the stolen task and then drains this
     * worker's own local queue before rescanning. When a full scan
     * finds nothing, the worker pushes itself onto the ctl wait stack
     * and parks; a parked worker at the top of the stack may use a
     * timed park and be dropped on timeout.
     *
     * @param w this worker's queue (never null)
     */
    final void runWorker(WorkQueue w) {
        WorkQueue[] ws;
        w.growArray();                   // allocate queue
        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
        if (r == 0)                      // initial nonzero seed
            r = 1;
        int lastSignalId = 0;            // avoid unneeded signals
        while ((ws = workQueues) != null) {
            boolean nonempty = false;    // scan
            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                if ((i = r & m) >= 0 && i < n && // always true
                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
                    (a = q.array) != null && (al = a.length) > 0) {
                    int qid = q.id;      // (never zero)
                    // try to CAS-claim the task at queue q's base slot
                    int index = (al - 1) & b;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getObjectVolatile(a, offset);
                    if (t != null && b++ == q.base &&
                        U.compareAndSwapObject(a, offset, t, null)) {
                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
                            signalWork();           // propagate signal
                        w.source = lastSignalId = qid;
                        t.doExec();
                        if ((w.id & FIFO) != 0)     // run remaining locals
                            w.localPollAndExec(POLL_LIMIT);
                        else
                            w.localPopAndExec(POLL_LIMIT);
                        ForkJoinWorkerThread thread = w.owner;
                        ++w.nsteals;
                        w.source = 0;               // now idle
                        if (thread != null)
                            thread.afterTopLevelExec();
                    }
                    nonempty = true;
                }
                else if (nonempty)
                    break;
                else
                    ++r;
            }

            if (nonempty) {              // move (xorshift)
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
            }
            else {
                int phase;
                lastSignalId = 0;        // clear for next scan
                if ((phase = w.phase) >= 0) {   // enqueue on ctl wait stack
                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
                    long c, nc;
                    do {
                        w.stackPred = (int)(c = ctl);
                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
                    } while (!U.compareAndSwapLong(this, CTL, c, nc));
                }
                else {                   // already queued
                    int pred = w.stackPred;
                    w.source = DORMANT;  // enable signal
                    for (int steps = 0;;) {
                        int md, rc; long c;
                        if (w.phase >= 0) {       // signalled; rescan
                            w.source = 0;
                            break;
                        }
                        else if ((md = mode) < 0) // shutting down
                            return;
                        else if ((rc = ((md & SMASK) + // possibly quiescent
                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
                                 (md & SHUTDOWN) != 0 &&
                                 tryTerminate(false, false))
                            return;      // help terminate
                        else if ((++steps & 1) == 0)
                            Thread.interrupted(); // clear between parks
                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
                            // top of wait stack with no active workers:
                            // timed park, then try to drop this worker
                            long d = keepAlive + System.currentTimeMillis();
                            LockSupport.parkUntil(this, d);
                            if (ctl == c &&
                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
                                long nc = ((UC_MASK & (c - TC_UNIT)) |
                                           (SP_MASK & pred));
                                if (U.compareAndSwapLong(this, CTL, c, nc)) {
                                    w.phase = QUIET;
                                    return;       // drop on timeout
                                }
                            }
                        }
                        else
                            LockSupport.park(this);
                    }
                }
            }
        }
    }
1662
    /**
     * Helps and/or blocks until the given task is done or timeout.
     * First tries locally helping, then scans other queues for a task
     * produced by one of w's stealers; compensating and blocking if
     * none are found (rescanning if tryCompensate fails).
     *
     * NOTE(review): this looks like a retained earlier variant of
     * {@code awaitJoin} below (note the "x" prefix); confirm it is
     * unused and consider removing it.
     *
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero
     * @return task status on exit
     */
    final int xawaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (w != null && task != null &&
            (!(task instanceof CountedCompleter) ||
             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
            w.tryRemoveAndExec(task);
            int src = w.source, id = w.id, block = 0;
            s = task.status;
            while (s >= 0) {
                WorkQueue[] ws;
                boolean nonempty = false;
                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
                if ((ws = workQueues) != null) { // scan for matching id
                    for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                        if ((i = r & m) >= 0 && i < n &&
                            (q = ws[i]) != null && q.source == id &&
                            (b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            if (block > 0)  // reactivate before running
                                U.getAndAddLong(this, CTL, RC_UNIT);
                            block = 0;
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            if (t != null && b++ == q.base && id == q.source &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = qid;
                                t.doExec();     // run the stealer's task
                                w.source = src;
                            }
                            nonempty = true;
                            break;
                        }
                        else
                            r += 2;
                    }
                }
                if ((s = task.status) < 0)
                    break;
                else if (!nonempty) {
                    long ms, ns;
                    if (deadline == 0L)
                        ms = 0L;                // untimed
                    else if ((ns = deadline - System.nanoTime()) <= 0L)
                        break;                  // timeout
                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                        ms = 1L;                // avoid 0 for timed wait
                    if (block == 0)
                        block = tryCompensate(w);
                    else
                        task.internalWait(ms);
                    s = task.status;
                }
            }
            if (block > 0)                      // restore released count
                U.getAndAddLong(this, CTL, RC_UNIT);
        }
        return s;
    }
1737
    /**
     * Helps and/or blocks until the given task is done or timeout.
     * First tries locally helping (including removing and running the
     * task if still queued), then scans other queues for a task
     * produced by one of w's stealers; compensating and blocking if
     * none are found (rescanning if tryCompensate fails).
     *
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero
     * @return task status on exit
     */
    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (w != null && task != null &&
            (!(task instanceof CountedCompleter) ||
             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
            w.tryRemoveAndExec(task);
            int src = w.source, id = w.id;
            s = task.status;
            while (s >= 0) {
                WorkQueue[] ws;
                boolean nonempty = false;
                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
                if ((ws = workQueues) != null) { // scan for matching id
                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                        if ((i = (r + j) & m) >= 0 && i < n &&
                            (q = ws[i]) != null && q.source == id &&
                            (b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            if (t != null && b++ == q.base && id == q.source &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = qid;
                                t.doExec();     // run the stealer's task
                                w.source = src;
                            }
                            nonempty = true;
                            break;
                        }
                    }
                }
                if ((s = task.status) < 0)
                    break;
                else if (!nonempty) {
                    long ms, ns; int block;
                    if (deadline == 0L)
                        ms = 0L;                // untimed
                    else if ((ns = deadline - System.nanoTime()) <= 0L)
                        break;                  // timeout
                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                        ms = 1L;                // avoid 0 for timed wait
                    if ((block = tryCompensate(w)) != 0) {
                        task.internalWait(ms);
                        // re-add RC only if compensation decremented it
                        U.getAndAddLong(this, CTL, (block > 0) ? RC_UNIT : 0L);
                    }
                    s = task.status;
                }
            }
        }
        return s;
    }
1794
1795 /**
1796 * Runs tasks until {@code isQuiescent()}. Rather than blocking
1797 * when tasks cannot be found, rescans until all others cannot
1798 * find tasks either.
1799 */
1800 final void helpQuiescePool(WorkQueue w) {
1801 int prevSrc = w.source, fifo = w.id & FIFO;
1802 for (int source = prevSrc, released = -1;;) { // -1 until known
1803 WorkQueue[] ws;
1804 if (fifo != 0)
1805 w.localPollAndExec(0);
1806 else
1807 w.localPopAndExec(0);
1808 if (released == -1 && w.phase >= 0)
1809 released = 1;
1810 boolean quiet = true, empty = true;
1811 int r = ThreadLocalRandom.nextSecondarySeed();
1812 if ((ws = workQueues) != null) {
1813 for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1814 WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1815 if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
1816 if ((b = q.base) - q.top < 0 &&
1817 (a = q.array) != null && (al = a.length) > 0) {
1818 int qid = q.id;
1819 if (released == 0) { // increment
1820 released = 1;
1821 U.getAndAddLong(this, CTL, RC_UNIT);
1822 }
1823 int index = (al - 1) & b;
1824 long offset = ((long)index << ASHIFT) + ABASE;
1825 ForkJoinTask<?> t = (ForkJoinTask<?>)
1826 U.getObjectVolatile(a, offset);
1827 if (t != null && b++ == q.base &&
1828 U.compareAndSwapObject(a, offset, t, null)) {
1829 q.base = b;
1830 w.source = source = q.id;
1831 t.doExec();
1832 w.source = source = prevSrc;
1833 }
1834 quiet = empty = false;
1835 break;
1836 }
1837 else if ((q.source & QUIET) == 0)
1838 quiet = false;
1839 }
1840 }
1841 }
1842 if (quiet) {
1843 if (released == 0)
1844 U.getAndAddLong(this, CTL, RC_UNIT);
1845 w.source = prevSrc;
1846 break;
1847 }
1848 else if (empty) {
1849 if (source != QUIET)
1850 w.source = source = QUIET;
1851 if (released == 1) { // decrement
1852 released = 0;
1853 U.getAndAddLong(this, CTL, RC_MASK & -RC_UNIT);
1854 }
1855 }
1856 }
1857 }
1858
    /**
     * Scans for and returns a polled task, if available.
     * Used only for untracked polls. Restarts the traversal on any
     * apparent contention, and gives up when two consecutive full
     * traversals produce the same checksum of queue base indices
     * (i.e., no activity was observed).
     *
     * @param submissionsOnly if true, only scan submission queues
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
        WorkQueue[] ws; int n;
        rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
                      (n = ws.length) > 0) {
            int m = n - 1;
            int r = ThreadLocalRandom.nextSecondarySeed();
            int h = r >>> 16;
            int origin, step;
            if (submissionsOnly) {          // submission queues are at
                origin = (r & ~1) & m;      // even indices and steps
                step = (h & ~1) | 2;
            }
            else {
                origin = r & m;
                step = h | 1;
            }
            for (int k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; int b, al; ForkJoinTask<?>[] a;
                if ((q = ws[k]) != null) {
                    checkSum += b = q.base;
                    if (b - q.top < 0 &&
                        (a = q.array) != null && (al = a.length) > 0) {
                        int index = (al - 1) & b;
                        long offset = ((long)index << ASHIFT) + ABASE;
                        ForkJoinTask<?> t = (ForkJoinTask<?>)
                            U.getObjectVolatile(a, offset);
                        if (t != null && b++ == q.base &&
                            U.compareAndSwapObject(a, offset, t, null)) {
                            q.base = b;
                            return t;
                        }
                        else
                            break; // restart
                    }
                }
                if ((k = (k + step) & m) == origin) {
                    if (oldSum == (oldSum = checkSum))
                        break rescan;       // stable: nothing to poll
                    checkSum = 0;
                }
            }
        }
        return null;
    }
1909
1910 /**
1911 * Gets and removes a local or stolen task for the given worker.
1912 *
1913 * @return a task, if available
1914 */
1915 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1916 ForkJoinTask<?> t;
1917 if (w != null &&
1918 (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1919 return t;
1920 else
1921 return pollScan(false);
1922 }
1923
1924 // External operations
1925
    /**
     * Adds the given task to a submission queue at submitter's
     * current queue, creating one if null or contended.
     *
     * Uses the caller's ThreadLocalRandom probe to choose a
     * submission slot; on contention, advances the probe and retries
     * at another slot.
     *
     * @param task the task. Caller must ensure non-null.
     */
    final void externalPush(ForkJoinTask<?> task) {
        int r;                            // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            int md = mode, n;
            WorkQueue[] ws = workQueues;
            if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
                throw new RejectedExecutionException();
            else {
                WorkQueue q;
                boolean push = false, grow = false;
                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
                    // no queue at this slot: create and install one
                    Object lock = workerNamePrefix;
                    int qid = (r | QUIET) & ~(FIFO | OWNED);
                    q = new WorkQueue(this, null);
                    q.id = qid;
                    q.source = QUIET;
                    q.phase = QLOCK;      // lock queue
                    if (lock != null) {
                        synchronized (lock) { // lock pool to install
                            int i;
                            if ((ws = workQueues) != null &&
                                (n = ws.length) > 0 &&
                                ws[i = qid & (n - 1) & SQMASK] == null) {
                                ws[i] = q;
                                push = grow = true;
                            }
                        }
                    }
                }
                else if (q.tryLockSharedQueue()) {
                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
                    if ((a = q.array) != null && (al = a.length) > 0 &&
                        al - 1 + (d = b - s) > 0) {   // room for task
                        a[(al - 1) & s] = task;
                        q.top = s + 1;    // relaxed writes OK here
                        q.phase = 0;      // unlock
                        if (d < 0 && q.base - s < 0)
                            break;        // no signal needed
                    }
                    else
                        grow = true;      // full: resize below, still locked
                    push = true;
                }
                if (push) {
                    if (grow) {
                        try {
                            q.growArray();
                            int s = q.top, al; ForkJoinTask<?>[] a;
                            if ((a = q.array) != null && (al = a.length) > 0) {
                                a[(al - 1) & s] = task;
                                q.top = s + 1;
                            }
                        } finally {
                            q.phase = 0;  // unlock even on failure
                        }
                    }
                    signalWork();
                    break;
                }
                else                      // move if busy
                    r = ThreadLocalRandom.advanceProbe(r);
            }
        }
    }
2000
2001 /**
2002 * Pushes a possibly-external submission.
2003 */
2004 private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2005 Thread t; ForkJoinWorkerThread w; WorkQueue q;
2006 if (task == null)
2007 throw new NullPointerException();
2008 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2009 (w = (ForkJoinWorkerThread)t).pool == this &&
2010 (q = w.workQueue) != null)
2011 q.push(task);
2012 else
2013 externalPush(task);
2014 return task;
2015 }
2016
2017 /**
2018 * Returns common pool queue for an external thread.
2019 */
2020 static WorkQueue commonSubmitterQueue() {
2021 ForkJoinPool p = common;
2022 int r = ThreadLocalRandom.getProbe();
2023 WorkQueue[] ws; int n;
2024 return (p != null && (ws = p.workQueues) != null &&
2025 (n = ws.length) > 0) ?
2026 ws[(n - 1) & r & SQMASK] : null;
2027 }
2028
2029 /**
2030 * Performs tryUnpush for an external submitter.
2031 */
2032 final boolean tryExternalUnpush(ForkJoinTask<?> task) {
2033 int r = ThreadLocalRandom.getProbe();
2034 WorkQueue[] ws; WorkQueue w; int n;
2035 return ((ws = workQueues) != null &&
2036 (n = ws.length) > 0 &&
2037 (w = ws[(n - 1) & r & SQMASK]) != null &&
2038 w.trySharedUnpush(task));
2039 }
2040
2041 /**
2042 * Performs helpComplete for an external submitter.
2043 */
2044 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
2045 int r = ThreadLocalRandom.getProbe();
2046 WorkQueue[] ws; WorkQueue w; int n;
2047 return ((ws = workQueues) != null && (n = ws.length) > 0 &&
2048 (w = ws[(n - 1) & r & SQMASK]) != null) ?
2049 w.sharedHelpCC(task, maxTasks) : 0;
2050 }
2051
2052 /**
2053 * Tries to steal and run tasks within the target's computation.
2054 * The maxTasks argument supports external usages; internal calls
2055 * use zero, allowing unbounded steps (external calls trap
2056 * non-positive values).
2057 *
2058 * @param w caller
2059 * @param maxTasks if non-zero, the maximum number of other tasks to run
2060 * @return task status on exit
2061 */
2062 final int helpComplete(WorkQueue w, CountedCompleter<?> task,
2063 int maxTasks) {
2064 return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
2065 }
2066
2067 /**
2068 * Returns a cheap heuristic guide for task partitioning when
2069 * programmers, frameworks, tools, or languages have little or no
2070 * idea about task granularity. In essence, by offering this
2071 * method, we ask users only about tradeoffs in overhead vs
2072 * expected throughput and its variance, rather than how finely to
2073 * partition tasks.
2074 *
2075 * In a steady state strict (tree-structured) computation, each
2076 * thread makes available for stealing enough tasks for other
2077 * threads to remain active. Inductively, if all threads play by
2078 * the same rules, each thread should make available only a
2079 * constant number of tasks.
2080 *
2081 * The minimum useful constant is just 1. But using a value of 1
2082 * would require immediate replenishment upon each steal to
2083 * maintain enough tasks, which is infeasible. Further,
2084 * partitionings/granularities of offered tasks should minimize
2085 * steal rates, which in general means that threads nearer the top
2086 * of computation tree should generate more than those nearer the
2087 * bottom. In perfect steady state, each thread is at
2088 * approximately the same level of computation tree. However,
2089 * producing extra tasks amortizes the uncertainty of progress and
2090 * diffusion assumptions.
2091 *
2092 * So, users will want to use values larger (but not much larger)
2093 * than 1 to both smooth over transient shortages and hedge
2094 * against uneven progress; as traded off against the cost of
2095 * extra task overhead. We leave the user to pick a threshold
2096 * value to compare with the results of this call to guide
2097 * decisions, but recommend values such as 3.
2098 *
2099 * When all threads are active, it is on average OK to estimate
2100 * surplus strictly locally. In steady-state, if one thread is
2101 * maintaining say 2 surplus tasks, then so are others. So we can
2102 * just use estimated queue length. However, this strategy alone
2103 * leads to serious mis-estimates in some non-steady-state
2104 * conditions (ramp-up, ramp-down, other stalls). We can detect
2105 * many of these by further considering the number of "idle"
2106 * threads, that are known to have zero queued tasks, so
2107 * compensate by a factor of (#idle/#active) threads.
2108 */
    static int getSurplusQueuedTaskCount() {
        Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
            (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (q = wt.workQueue) != null) {
            int p = pool.mode & SMASK;               // parallelism level
            int a = p + (int)(pool.ctl >> RC_SHIFT); // active worker count
            int n = q.top - q.base;                  // local queue size
            // subtract a target surplus of 0,1,2,4 or 8 tasks: the
            // target shrinks as the active count a falls below
            // successive halvings of p (note p is destructively
            // shifted by each comparison)
            return n - (a > (p >>>= 1) ? 0 :
                        a > (p >>>= 1) ? 1 :
                        a > (p >>>= 1) ? 2 :
                        a > (p >>>= 1) ? 4 :
                        8);
        }
        return 0;
    }
2125
2126 // Termination
2127
    /**
     * Possibly initiates and/or completes termination.
     *
     * @param now if true, unconditionally terminate, else only
     * if no work and no active workers
     * @param enable if true, terminate when next possible
     * @return true if terminating or terminated
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED

        // Phase 1: set SHUTDOWN (refused for the common pool, or when
        // shutdown has not been enabled)
        while (((md = mode) & SHUTDOWN) == 0) {
            if (!enable || this == common)        // cannot shutdown
                return false;
            else
                U.compareAndSwapInt(this, MODE, md, md | SHUTDOWN);
        }

        // Phase 2: set STOP, unless (when !now) the pool is neither
        // quiescent nor empty
        while (((md = mode) & STOP) == 0) {       // try to initiate termination
            if (!now) {                           // check if quiescent & empty
                for (long oldSum = 0L;;) {        // repeat until stable
                    boolean running = false;
                    long checkSum = ctl;
                    WorkQueue[] ws = workQueues;
                    if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
                        running = true;           // active workers remain
                    else if (ws != null) {
                        WorkQueue w; int b;
                        for (int i = 0; i < ws.length; ++i) {
                            if ((w = ws[i]) != null) {
                                checkSum += (b = w.base) + w.id;
                                if (b != w.top ||
                                    ((i & 1) == 1 && w.source >= 0)) {
                                    running = true;
                                    break;
                                }
                            }
                        }
                    }
                    if (((md = mode) & STOP) != 0)
                        break;                    // already triggered
                    else if (running)
                        return false;
                    else if (workQueues == ws && oldSum == (oldSum = checkSum))
                        break;                    // two stable passes
                }
            }
            if ((md & STOP) == 0)
                U.compareAndSwapInt(this, MODE, md, md | STOP);
        }

        // Phase 3: cancel queued tasks, interrupt workers, and set
        // TERMINATED once no workers remain
        while (((md = mode) & TERMINATED) == 0) { // help terminate others
            for (long oldSum = 0L;;) {            // repeat until stable
                WorkQueue[] ws; WorkQueue w;
                long checkSum = ctl;
                if ((ws = workQueues) != null) {
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) != null) {
                            ForkJoinWorkerThread wt = w.owner;
                            w.cancelAll();        // clear queues
                            if (wt != null) {
                                try {             // unblock join or park
                                    wt.interrupt();
                                } catch (Throwable ignore) {
                                }
                            }
                            checkSum += w.base + w.id;
                        }
                    }
                }
                if (((md = mode) & TERMINATED) != 0 ||
                    (workQueues == ws && oldSum == (oldSum = checkSum)))
                    break;
            }
            if ((md & TERMINATED) != 0)
                break;
            else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
                break;                            // workers still exist
            else if (U.compareAndSwapInt(this, MODE, md, md | TERMINATED)) {
                synchronized (this) {
                    notifyAll();                  // for awaitTermination
                }
                break;
            }
        }
        return true;
    }
2215
2216 // Exported methods
2217
2218 // Constructors
2219
2220 /**
2221 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2222 * java.lang.Runtime#availableProcessors}, using defaults for all
2223 * other parameters.
2224 *
2225 * @throws SecurityException if a security manager exists and
2226 * the caller is not permitted to modify threads
2227 * because it does not hold {@link
2228 * java.lang.RuntimePermission}{@code ("modifyThread")}
2229 */
    public ForkJoinPool() {
        // parallelism = min(#cores, MAX_CAP); defaults for all others
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2235
2236 /**
2237 * Creates a {@code ForkJoinPool} with the indicated parallelism
2238 * level, using defaults for all other parameters.
2239 *
2240 * @param parallelism the parallelism level
2241 * @throws IllegalArgumentException if parallelism less than or
2242 * equal to zero, or greater than implementation limit
2243 * @throws SecurityException if a security manager exists and
2244 * the caller is not permitted to modify threads
2245 * because it does not hold {@link
2246 * java.lang.RuntimePermission}{@code ("modifyThread")}
2247 */
    public ForkJoinPool(int parallelism) {
        // defaults for all parameters other than parallelism
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2252
2253 /**
2254 * Creates a {@code ForkJoinPool} with the given parameters (using
2255 * defaults for others).
2256 *
2257 * @param parallelism the parallelism level. For default value,
2258 * use {@link java.lang.Runtime#availableProcessors}.
2259 * @param factory the factory for creating new threads. For default value,
2260 * use {@link #defaultForkJoinWorkerThreadFactory}.
2261 * @param handler the handler for internal worker threads that
2262 * terminate due to unrecoverable errors encountered while executing
2263 * tasks. For default value, use {@code null}.
2264 * @param asyncMode if true,
2265 * establishes local first-in-first-out scheduling mode for forked
2266 * tasks that are never joined. This mode may be more appropriate
2267 * than default locally stack-based mode in applications in which
2268 * worker threads only process event-style asynchronous tasks.
2269 * For default value, use {@code false}.
2270 * @throws IllegalArgumentException if parallelism less than or
2271 * equal to zero, or greater than implementation limit
2272 * @throws NullPointerException if the factory is null
2273 * @throws SecurityException if a security manager exists and
2274 * the caller is not permitted to modify threads
2275 * because it does not hold {@link
2276 * java.lang.RuntimePermission}{@code ("modifyThread")}
2277 */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        // defaults for the sizing and keep-alive parameters
        this(parallelism, factory, handler, asyncMode,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2285
2286 /**
2287 * Creates a {@code ForkJoinPool} with the given parameters.
2288 *
2289 * @param parallelism the parallelism level. For default value,
2290 * use {@link java.lang.Runtime#availableProcessors}.
2291 *
2292 * @param factory the factory for creating new threads. For
2293 * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2294 *
2295 * @param handler the handler for internal worker threads that
2296 * terminate due to unrecoverable errors encountered while
2297 * executing tasks. For default value, use {@code null}.
2298 *
2299 * @param asyncMode if true, establishes local first-in-first-out
2300 * scheduling mode for forked tasks that are never joined. This
2301 * mode may be more appropriate than default locally stack-based
2302 * mode in applications in which worker threads only process
2303 * event-style asynchronous tasks. For default value, use {@code
2304 * false}.
2305 *
2306 * @param corePoolSize the number of threads to keep in the pool
2307 * (unless timed out after an elapsed keep-alive). Normally (and
2308 * by default) this is the same value as the parallelism level,
2309 * but may be set to a larger value to reduce dynamic overhead if
2310 * tasks regularly block. Using a smaller value (for example
2311 * {@code 0}) has the same effect as the default.
2312 *
2313 * @param maximumPoolSize the maximum number of threads allowed.
2314 * When the maximum is reached, attempts to replace blocked
2315 * threads fail. (However, because creation and termination of
2316 * different threads may overlap, and may be managed by the given
2317 * thread factory, this value may be transiently exceeded.) The
2318 * default for the common pool is {@code 256} plus the parallelism
2319 * level. Using a value (for example {@code Integer.MAX_VALUE})
2320 * larger than the implementation's total thread limit has the
2321 * same effect as using this limit.
2322 *
2323 * @param minimumRunnable the minimum allowed number of core
2324 * threads not blocked by a join or {@link ManagedBlocker}. To
2325 * ensure progress, when too few unblocked threads exist and
2326 * unexecuted tasks may exist, new threads are constructed, up to
2327 * the given maximumPoolSize. For the default value, use {@code
2328 * 1}, that ensures liveness. A larger value might improve
2329 * throughput in the presence of blocked activities, but might
2330 * not, due to increased overhead. A value of zero may be
2331 * acceptable when submitted tasks cannot have dependencies
2332 * requiring additional threads.
2333 *
2334 * @param rejectOnSaturation if true, attempts to create more than
2335 * the maximum total allowed threads throw {@link
2336 * RejectedExecutionException}. Otherwise, the pool continues to
2337 * operate, but with fewer than the target number of runnable
2338 * threads, so might not ensure progress. For default value, use
2339 * {@code true}.
2340 *
2341 * @param keepAliveTime the elapsed time since last use before
2342 * a thread is terminated (and then later replaced if needed).
2343 * For the default value, use {@code 60, TimeUnit.SECONDS}.
2344 *
2345 * @param unit the time unit for the {@code keepAliveTime} argument
2346 *
2347 * @throws IllegalArgumentException if parallelism is less than or
2348 * equal to zero, or is greater than implementation limit,
2349 * or if maximumPoolSize is less than parallelism,
2350  * or if the keepAliveTime is less than or equal to zero.
2351 * @throws NullPointerException if the factory is null
2352 * @throws SecurityException if a security manager exists and
2353 * the caller is not permitted to modify threads
2354 * because it does not hold {@link
2355 * java.lang.RuntimePermission}{@code ("modifyThread")}
2356 */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode,
                        int corePoolSize,
                        int maximumPoolSize,
                        int minimumRunnable,
                        boolean rejectOnSaturation,
                        long keepAliveTime,
                        TimeUnit unit) {
        // check, encode, pack parameters
        if (parallelism <= 0 || parallelism > MAX_CAP ||
            maximumPoolSize < parallelism || keepAliveTime <= 0L)
            throw new IllegalArgumentException();
        if (factory == null)
            throw new NullPointerException();
        long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);

        String prefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
        int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
        // ctl seed: negated core and parallelism counts in the TC/RC fields
        long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
        // mode packs parallelism with the FIFO and SATURATE flag bits
        int m = (parallelism |
                 (asyncMode ? FIFO : 0) |
                 (rejectOnSaturation ? 0 : SATURATE));
        int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
        int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
        // bounds packs the minimum-runnable delta (low bits) and
        // maximum spare thread count (high bits)
        int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
        int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1; // power of two, including space for submission queues

        this.workQueues = new WorkQueue[n];
        this.workerNamePrefix = prefix;
        this.factory = factory;
        this.ueh = handler;
        this.keepAlive = ms;
        this.bounds = b;
        this.mode = m;
        this.ctl = c;
        checkPermission();
    }
2399
    /**
     * Constructor for common pool using parameters possibly
     * overridden by system properties.
     *
     * @param forCommonPoolOnly unused; distinguishes this signature
     * from the public constructors
     */
    private ForkJoinPool(byte forCommonPoolOnly) {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory fac = null;
        UncaughtExceptionHandler handler = null;
        try { // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }

        if (fac == null) {
            if (System.getSecurityManager() == null)
                fac = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                fac = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

        // pack fields as in the main constructor, but with fixed
        // defaults (FIFO/SATURATE bits unset; COMMON_MAX_SPARES)
        long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
        int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
        int m = (parallelism < 1) ? 1 : parallelism;
        int n = (parallelism > 1) ? parallelism - 1 : 1;
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1; // power of two, including submission queue slots

        this.workQueues = new WorkQueue[n];
        this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
        this.factory = fac;
        this.ueh = handler;
        this.keepAlive = DEFAULT_KEEPALIVE;
        this.bounds = b;
        this.mode = m;
        this.ctl = c;
    }
2455
2456 /**
2457 * Returns the common pool instance. This pool is statically
2458 * constructed; its run state is unaffected by attempts to {@link
2459 * #shutdown} or {@link #shutdownNow}. However this pool and any
2460 * ongoing processing are automatically terminated upon program
2461 * {@link System#exit}. Any program that relies on asynchronous
2462 * task processing to complete before program termination should
2463 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2464 * before exit.
2465 *
2466 * @return the common pool instance
2467 * @since 1.8
2468 */
    public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        return common; // created in static initializer; never null here
    }
2473
2474 // Execution methods
2475
2476 /**
2477 * Performs the given task, returning its result upon completion.
2478 * If the computation encounters an unchecked Exception or Error,
2479 * it is rethrown as the outcome of this invocation. Rethrown
2480 * exceptions behave in the same way as regular exceptions, but,
2481 * when possible, contain stack traces (as displayed for example
2482 * using {@code ex.printStackTrace()}) of both the current thread
2483 * as well as the thread actually encountering the exception;
2484 * minimally only the latter.
2485 *
2486 * @param task the task
2487 * @param <T> the type of the task's result
2488 * @return the task's result
2489 * @throws NullPointerException if the task is null
2490 * @throws RejectedExecutionException if the task cannot be
2491 * scheduled for execution
2492 */
2493 public <T> T invoke(ForkJoinTask<T> task) {
2494 if (task == null)
2495 throw new NullPointerException();
2496 externalSubmit(task);
2497 return task.join();
2498 }
2499
    /**
     * Arranges for (asynchronous) execution of the given task.
     *
     * @param task the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public void execute(ForkJoinTask<?> task) {
        externalSubmit(task); // externalSubmit performs the null check
    }
2511
2512 // AbstractExecutorService methods
2513
2514 /**
2515 * @throws NullPointerException if the task is null
2516 * @throws RejectedExecutionException if the task cannot be
2517 * scheduled for execution
2518 */
2519 public void execute(Runnable task) {
2520 if (task == null)
2521 throw new NullPointerException();
2522 ForkJoinTask<?> job;
2523 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2524 job = (ForkJoinTask<?>) task;
2525 else
2526 job = new ForkJoinTask.RunnableExecuteAction(task);
2527 externalSubmit(job);
2528 }
2529
    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        return externalSubmit(task); // externalSubmit performs the null check
    }
2543
2544 /**
2545 * @throws NullPointerException if the task is null
2546 * @throws RejectedExecutionException if the task cannot be
2547 * scheduled for execution
2548 */
2549 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2550 return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
2551 }
2552
2553 /**
2554 * @throws NullPointerException if the task is null
2555 * @throws RejectedExecutionException if the task cannot be
2556 * scheduled for execution
2557 */
2558 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2559 return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
2560 }
2561
2562 /**
2563 * @throws NullPointerException if the task is null
2564 * @throws RejectedExecutionException if the task cannot be
2565 * scheduled for execution
2566 */
2567 public ForkJoinTask<?> submit(Runnable task) {
2568 if (task == null)
2569 throw new NullPointerException();
2570 ForkJoinTask<?> job;
2571 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2572 job = (ForkJoinTask<?>) task;
2573 else
2574 job = new ForkJoinTask.AdaptedRunnableAction(task);
2575 return externalSubmit(job);
2576 }
2577
    /**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results when all complete.
     *
     * @throws NullPointerException {@inheritDoc}
     * @throws RejectedExecutionException {@inheritDoc}
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
        // In previous versions of this class, this method constructed
        // a task to run ForkJoinTask.invokeAll, but now external
        // invocation of multiple tasks is at least as efficient.
        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());

        try {
            for (Callable<T> t : tasks) {
                ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
                // Add to the result list BEFORE submitting so that the
                // catch block below can cancel every task already created,
                // even if this particular submission throws.
                futures.add(f);
                externalSubmit(f);
            }
            // Wait for all submitted tasks without propagating their
            // individual exceptions here (Future.get reports them).
            for (int i = 0, size = futures.size(); i < size; i++)
                ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
            return futures;
        } catch (Throwable t) {
            // Best-effort cancellation of everything created so far;
            // false => do not interrupt tasks that already started.
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(false);
            throw t;
        }
    }
2603
    /**
     * Returns the factory used for constructing new workers.
     *
     * @return the factory used for constructing new workers
     */
    public ForkJoinWorkerThreadFactory getFactory() {
        return factory; // assigned once in the constructor
    }
2612
    /**
     * Returns the handler for internal worker threads that terminate
     * due to unrecoverable errors encountered while executing tasks.
     *
     * @return the handler, or {@code null} if none
     */
    public UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return ueh; // assigned in the constructor; may be null
    }
2622
2623 /**
2624 * Returns the targeted parallelism level of this pool.
2625 *
2626 * @return the targeted parallelism level of this pool
2627 */
2628 public int getParallelism() {
2629 return mode & SMASK;
2630 }
2631
    /**
     * Returns the targeted parallelism level of the common pool.
     *
     * @return the targeted parallelism level of the common pool
     * @since 1.8
     */
    public static int getCommonPoolParallelism() {
        // Cached from common.mode & SMASK in the static initializer.
        return COMMON_PARALLELISM;
    }
2641
2642 /**
2643 * Returns the number of worker threads that have started but not
2644 * yet terminated. The result returned by this method may differ
2645 * from {@link #getParallelism} when threads are created to
2646 * maintain parallelism when others are cooperatively blocked.
2647 *
2648 * @return the number of worker threads
2649 */
2650 public int getPoolSize() {
2651 return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
2652 }
2653
2654 /**
2655 * Returns {@code true} if this pool uses local first-in-first-out
2656 * scheduling mode for forked tasks that are never joined.
2657 *
2658 * @return {@code true} if this pool uses async mode
2659 */
2660 public boolean getAsyncMode() {
2661 return (mode & FIFO) != 0;
2662 }
2663
2664 /**
2665 * Returns an estimate of the number of worker threads that are
2666 * not blocked waiting to join tasks or for other managed
2667 * synchronization. This method may overestimate the
2668 * number of running threads.
2669 *
2670 * @return the number of worker threads
2671 */
2672 public int getRunningThreadCount() {
2673 int rc = 0;
2674 WorkQueue[] ws; WorkQueue w;
2675 if ((ws = workQueues) != null) {
2676 for (int i = 1; i < ws.length; i += 2) {
2677 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2678 ++rc;
2679 }
2680 }
2681 return rc;
2682 }
2683
2684 /**
2685 * Returns an estimate of the number of threads that are currently
2686 * stealing or executing tasks. This method may overestimate the
2687 * number of active threads.
2688 *
2689 * @return the number of active threads
2690 */
2691 public int getActiveThreadCount() {
2692 int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2693 return (r <= 0) ? 0 : r; // suppress momentarily negative values
2694 }
2695
    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        // Retry loop: a definitive answer requires that ctl not change
        // between the initial read and the final consistency check.
        for (;;) {
            long c = ctl;
            int md = mode, pc = md & SMASK;
            int tc = pc + (short)(c >> TC_SHIFT); // total worker count
            int rc = pc + (int)(c >> RC_SHIFT);   // released (active) count
            if ((md & (STOP | TERMINATED)) != 0)
                return true;   // terminating pools are quiescent by definition
            else if (rc > 0)
                return false;  // at least one worker is active
            else {
                // No released workers: verify every registered worker
                // queue has marked itself quiet.
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) != null) {
                    for (int i = 1; i < ws.length; i += 2) { // odd = workers
                        if ((v = ws[i]) != null) {
                            if ((v.source & QUIET) == 0)
                                return false;
                            --tc; // account for each registered worker seen
                        }
                    }
                }
                // Quiescent only if every worker was accounted for and
                // ctl did not change while we scanned; otherwise retry.
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }
2733
2734 /**
2735 * Returns an estimate of the total number of tasks stolen from
2736 * one thread's work queue by another. The reported value
2737 * underestimates the actual total number of steals when the pool
2738 * is not quiescent. This value may be useful for monitoring and
2739 * tuning fork/join programs: in general, steal counts should be
2740 * high enough to keep threads busy, but low enough to avoid
2741 * overhead and contention across threads.
2742 *
2743 * @return the number of steals
2744 */
2745 public long getStealCount() {
2746 long count = stealCount;
2747 WorkQueue[] ws; WorkQueue w;
2748 if ((ws = workQueues) != null) {
2749 for (int i = 1; i < ws.length; i += 2) {
2750 if ((w = ws[i]) != null)
2751 count += (long)w.nsteals & 0xffffffffL;
2752 }
2753 }
2754 return count;
2755 }
2756
2757 /**
2758 * Returns an estimate of the total number of tasks currently held
2759 * in queues by worker threads (but not including tasks submitted
2760 * to the pool that have not begun executing). This value is only
2761 * an approximation, obtained by iterating across all threads in
2762 * the pool. This method may be useful for tuning task
2763 * granularities.
2764 *
2765 * @return the number of queued tasks
2766 */
2767 public long getQueuedTaskCount() {
2768 long count = 0;
2769 WorkQueue[] ws; WorkQueue w;
2770 if ((ws = workQueues) != null) {
2771 for (int i = 1; i < ws.length; i += 2) {
2772 if ((w = ws[i]) != null)
2773 count += w.queueSize();
2774 }
2775 }
2776 return count;
2777 }
2778
2779 /**
2780 * Returns an estimate of the number of tasks submitted to this
2781 * pool that have not yet begun executing. This method may take
2782 * time proportional to the number of submissions.
2783 *
2784 * @return the number of queued submissions
2785 */
2786 public int getQueuedSubmissionCount() {
2787 int count = 0;
2788 WorkQueue[] ws; WorkQueue w;
2789 if ((ws = workQueues) != null) {
2790 for (int i = 0; i < ws.length; i += 2) {
2791 if ((w = ws[i]) != null)
2792 count += w.queueSize();
2793 }
2794 }
2795 return count;
2796 }
2797
2798 /**
2799 * Returns {@code true} if there are any tasks submitted to this
2800 * pool that have not yet begun executing.
2801 *
2802 * @return {@code true} if there are any queued submissions
2803 */
2804 public boolean hasQueuedSubmissions() {
2805 WorkQueue[] ws; WorkQueue w;
2806 if ((ws = workQueues) != null) {
2807 for (int i = 0; i < ws.length; i += 2) {
2808 if ((w = ws[i]) != null && !w.isEmpty())
2809 return true;
2810 }
2811 }
2812 return false;
2813 }
2814
    /**
     * Removes and returns the next unexecuted submission if one is
     * available.  This method may be useful in extensions to this
     * class that re-assign work in systems with multiple pools.
     *
     * @return the next submission, or {@code null} if none
     */
    protected ForkJoinTask<?> pollSubmission() {
        // NOTE(review): the true argument appears to restrict the scan to
        // submission queues — confirm against pollScan's definition.
        return pollScan(true);
    }
2825
2826 /**
2827 * Removes all available unexecuted submitted and forked tasks
2828 * from scheduling queues and adds them to the given collection,
2829 * without altering their execution status. These may include
2830 * artificially generated or wrapped tasks. This method is
2831 * designed to be invoked only when the pool is known to be
2832 * quiescent. Invocations at other times may not remove all
2833 * tasks. A failure encountered while attempting to add elements
2834 * to collection {@code c} may result in elements being in
2835 * neither, either or both collections when the associated
2836 * exception is thrown. The behavior of this operation is
2837 * undefined if the specified collection is modified while the
2838 * operation is in progress.
2839 *
2840 * @param c the collection to transfer elements into
2841 * @return the number of elements transferred
2842 */
2843 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2844 int count = 0;
2845 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2846 if ((ws = workQueues) != null) {
2847 for (int i = 0; i < ws.length; ++i) {
2848 if ((w = ws[i]) != null) {
2849 while ((t = w.poll()) != null) {
2850 c.add(t);
2851 ++count;
2852 }
2853 }
2854 }
2855 }
2856 return count;
2857 }
2858
    /**
     * Returns a string identifying this pool, as well as its state,
     * including indications of run state, parallelism level, and
     * worker and task counts.
     *
     * @return a string identifying this pool, as well as its state
     */
    public String toString() {
        // Use a single pass through workQueues to collect counts
        long qt = 0L, qs = 0L; int rc = 0; // tasks, submissions, running
        long st = stealCount;              // steals from deregistered workers
        WorkQueue[] ws; WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 0; i < ws.length; ++i) {
                if ((w = ws[i]) != null) {
                    int size = w.queueSize();
                    if ((i & 1) == 0)
                        qs += size; // even index: external submission queue
                    else {
                        // odd index: worker queue; also fold in its steal
                        // count (unsigned) and running status
                        qt += size;
                        st += (long)w.nsteals & 0xffffffffL;
                        if (w.isApparentlyUnblocked())
                            ++rc;
                    }
                }
            }
        }

        int md = mode;
        int pc = (md & SMASK);                  // target parallelism
        long c = ctl;
        int tc = pc + (short)(c >>> TC_SHIFT);  // total workers
        int ac = pc + (int)(c >> RC_SHIFT);     // active workers
        if (ac < 0) // ignore transient negative
            ac = 0;
        String level = ((md & TERMINATED) != 0 ? "Terminated" :
                        (md & STOP)       != 0 ? "Terminating" :
                        (md & SHUTDOWN)   != 0 ? "Shutting down" :
                        "Running");
        return super.toString() +
            "[" + level +
            ", parallelism = " + pc +
            ", size = " + tc +
            ", active = " + ac +
            ", running = " + rc +
            ", steals = " + st +
            ", tasks = " + qt +
            ", submissions = " + qs +
            "]";
    }
2909
    /**
     * Possibly initiates an orderly shutdown in which previously
     * submitted tasks are executed, but no new tasks will be
     * accepted. Invocation has no effect on execution state if this
     * is the {@link #commonPool()}, and no additional effect if
     * already shut down.  Tasks that are in the process of being
     * submitted concurrently during the course of this method may or
     * may not be rejected.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public void shutdown() {
        checkPermission();
        // First argument false: orderly shutdown, letting queued tasks
        // run (cf. shutdownNow, which passes true).
        tryTerminate(false, true);
    }
2928
    /**
     * Possibly attempts to cancel and/or stop all tasks, and reject
     * all subsequently submitted tasks.  Invocation has no effect on
     * execution state if this is the {@link #commonPool()}, and no
     * additional effect if already shut down. Otherwise, tasks that
     * are in the process of being submitted or executed concurrently
     * during the course of this method may or may not be
     * rejected. This method cancels both existing and unexecuted
     * tasks, in order to permit termination in the presence of task
     * dependencies. So the method always returns an empty list
     * (unlike the case for some other Executors).
     *
     * @return an empty list
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public List<Runnable> shutdownNow() {
        checkPermission();
        // First argument true: immediate termination attempt (cf. shutdown).
        tryTerminate(true, true);
        // Unexecuted tasks are cancelled rather than returned.
        return Collections.emptyList();
    }
2952
    /**
     * Returns {@code true} if all tasks have completed following shut down.
     *
     * @return {@code true} if all tasks have completed following shut down
     */
    public boolean isTerminated() {
        return (mode & TERMINATED) != 0; // TERMINATED bit of the mode field
    }
2961
2962 /**
2963 * Returns {@code true} if the process of termination has
2964 * commenced but not yet completed. This method may be useful for
2965 * debugging. A return of {@code true} reported a sufficient
2966 * period after shutdown may indicate that submitted tasks have
2967 * ignored or suppressed interruption, or are waiting for I/O,
2968 * causing this executor not to properly terminate. (See the
2969 * advisory notes for class {@link ForkJoinTask} stating that
2970 * tasks should not normally entail blocking operations. But if
2971 * they do, they must abort them on interrupt.)
2972 *
2973 * @return {@code true} if terminating but not yet terminated
2974 */
2975 public boolean isTerminating() {
2976 int md = mode;
2977 return (md & STOP) != 0 && (md & TERMINATED) == 0;
2978 }
2979
    /**
     * Returns {@code true} if this pool has been shut down.
     *
     * @return {@code true} if this pool has been shut down
     */
    public boolean isShutdown() {
        return (mode & SHUTDOWN) != 0; // SHUTDOWN bit of the mode field
    }
2988
    /**
     * Blocks until all tasks have completed execution after a
     * shutdown request, or the timeout occurs, or the current thread
     * is interrupted, whichever happens first. Because the {@link
     * #commonPool()} never terminates until program shutdown, when
     * applied to the common pool, this method is equivalent to {@link
     * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (this == common) {
            // The common pool never terminates; fall back to quiescence.
            awaitQuiescence(timeout, unit);
            return false;
        }
        long nanos = unit.toNanos(timeout);
        if (isTerminated())
            return true;
        if (nanos <= 0L)
            return false;
        long deadline = System.nanoTime() + nanos;
        synchronized (this) {
            // Standard wait loop guarding against spurious wakeups;
            // termination logic notifies waiters on this pool object.
            for (;;) {
                if (isTerminated())
                    return true;
                if (nanos <= 0L)
                    return false;
                long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
                // Wait at least 1ms so sub-millisecond remainders still block.
                wait(millis > 0L ? millis : 1L);
                nanos = deadline - System.nanoTime();
            }
        }
    }
3029
    /**
     * If called by a ForkJoinTask operating in this pool, equivalent
     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
     * waits and/or attempts to assist performing tasks until this
     * pool {@link #isQuiescent} or the indicated timeout elapses.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if quiescent; {@code false} if the
     *         timeout elapsed.
     */
    public boolean awaitQuiescence(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        ForkJoinWorkerThread wt;
        Thread thread = Thread.currentThread();
        if ((thread instanceof ForkJoinWorkerThread) &&
            (wt = (ForkJoinWorkerThread)thread).pool == this) {
            // Called from one of this pool's own workers: help run tasks
            // rather than blocking (which could deadlock the pool).
            helpQuiescePool(wt.workQueue);
            return true;
        }
        else {
            // External caller: repeatedly poll and execute available
            // tasks, yielding when none are found, until quiescent or
            // the timeout elapses.
            for (long startTime = System.nanoTime();;) {
                ForkJoinTask<?> t;
                if ((t = pollScan(false)) != null)
                    t.doExec();
                else if (isQuiescent())
                    return true;
                else if ((System.nanoTime() - startTime) > nanos)
                    return false;
                else
                    Thread.yield(); // cannot block
            }
        }
    }
3064
    /**
     * Waits and/or attempts to assist performing tasks indefinitely
     * until the {@link #commonPool()} {@link #isQuiescent}.
     */
    static void quiesceCommonPool() {
        // Effectively unbounded wait: Long.MAX_VALUE nanoseconds.
        common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }
3072
3073 /**
3074 * Interface for extending managed parallelism for tasks running
3075 * in {@link ForkJoinPool}s.
3076 *
3077 * <p>A {@code ManagedBlocker} provides two methods. Method
3078 * {@link #isReleasable} must return {@code true} if blocking is
3079 * not necessary. Method {@link #block} blocks the current thread
3080 * if necessary (perhaps internally invoking {@code isReleasable}
3081 * before actually blocking). These actions are performed by any
3082 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
3083 * The unusual methods in this API accommodate synchronizers that
3084 * may, but don't usually, block for long periods. Similarly, they
3085 * allow more efficient internal handling of cases in which
3086 * additional workers may be, but usually are not, needed to
3087 * ensure sufficient parallelism. Toward this end,
3088 * implementations of method {@code isReleasable} must be amenable
3089 * to repeated invocation.
3090 *
3091 * <p>For example, here is a ManagedBlocker based on a
3092 * ReentrantLock:
3093 * <pre> {@code
3094 * class ManagedLocker implements ManagedBlocker {
3095 * final ReentrantLock lock;
3096 * boolean hasLock = false;
3097 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3098 * public boolean block() {
3099 * if (!hasLock)
3100 * lock.lock();
3101 * return true;
3102 * }
3103 * public boolean isReleasable() {
3104 * return hasLock || (hasLock = lock.tryLock());
3105 * }
3106 * }}</pre>
3107 *
3108 * <p>Here is a class that possibly blocks waiting for an
3109 * item on a given queue:
3110 * <pre> {@code
3111 * class QueueTaker<E> implements ManagedBlocker {
3112 * final BlockingQueue<E> queue;
3113 * volatile E item = null;
3114 * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3115 * public boolean block() throws InterruptedException {
3116 * if (item == null)
3117 * item = queue.take();
3118 * return true;
3119 * }
3120 * public boolean isReleasable() {
3121 * return item != null || (item = queue.poll()) != null;
3122 * }
3123 * public E getItem() { // call after pool.managedBlock completes
3124 * return item;
3125 * }
3126 * }}</pre>
3127 */
3128 public static interface ManagedBlocker {
3129 /**
3130 * Possibly blocks the current thread, for example waiting for
3131 * a lock or condition.
3132 *
3133 * @return {@code true} if no additional blocking is necessary
3134 * (i.e., if isReleasable would return true)
3135 * @throws InterruptedException if interrupted while waiting
3136 * (the method is not required to do so, but is allowed to)
3137 */
3138 boolean block() throws InterruptedException;
3139
3140 /**
3141 * Returns {@code true} if blocking is unnecessary.
3142 * @return {@code true} if blocking is unnecessary
3143 */
3144 boolean isReleasable();
3145 }
3146
    /**
     * Runs the given possibly blocking task.  When {@linkplain
     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
     * method possibly arranges for a spare thread to be activated if
     * necessary to ensure sufficient parallelism while the current
     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
     *
     * <p>This method repeatedly calls {@code blocker.isReleasable()} and
     * {@code blocker.block()} until either method returns {@code true}.
     * Every call to {@code blocker.block()} is preceded by a call to
     * {@code blocker.isReleasable()} that returned {@code false}.
     *
     * <p>If not running in a ForkJoinPool, this method is
     * behaviorally equivalent to
     * <pre> {@code
     * while (!blocker.isReleasable())
     *   if (blocker.block())
     *     break;}</pre>
     *
     * If running in a ForkJoinPool, the pool may first be expanded to
     * ensure sufficient parallelism available during the call to
     * {@code blocker.block()}.
     *
     * @param blocker the blocker task
     * @throws InterruptedException if {@code blocker.block()} did so
     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        WorkQueue w;
        Thread t = Thread.currentThread();
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (w = wt.workQueue) != null) {
            int block;
            // Worker thread: before blocking, try to compensate for the
            // lost parallelism; retry until releasable or compensated.
            while (!blocker.isReleasable()) {
                if ((block = p.tryCompensate(w)) != 0) {
                    try {
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        // Undo the compensation: a positive return from
                        // tryCompensate is restored by re-adding RC_UNIT
                        // to ctl; otherwise add 0 (no adjustment needed).
                        U.getAndAddLong(p, CTL, (block > 0) ? RC_UNIT : 0L);
                    }
                    break;
                }
            }
        }
        else {
            // Non-worker thread: plain releasable/block loop.
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
3200
3201 // AbstractExecutorService overrides. These rely on undocumented
3202 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3203 // implement RunnableFuture.
3204
3205 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3206 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3207 }
3208
3209 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3210 return new ForkJoinTask.AdaptedCallable<T>(callable);
3211 }
3212
    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long CTL;    // field offset of ctl, for atomic updates
    private static final long MODE;   // field offset of mode
    private static final int ABASE;   // base offset of ForkJoinTask[] arrays
    private static final int ASHIFT;  // log2 of the array element scale
3219
    static {
        try {
            // Resolve field offsets and array geometry for Unsafe access.
            CTL = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("ctl"));
            MODE = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("mode"));
            ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
            int scale = U.arrayIndexScale(ForkJoinTask[].class);
            // A power-of-two scale is required so that index arithmetic
            // can use shifts.
            if ((scale & (scale - 1)) != 0)
                throw new Error("array index scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;

        // Max spares for the common pool, overridable via system property;
        // parse failures silently fall back to the default.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        try {
            String p = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (p != null)
                commonMaxSpares = Integer.parseInt(p);
        } catch (Exception ignore) {}
        COMMON_MAX_SPARES = commonMaxSpares;

        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // Construct the common pool inside doPrivileged so it gets full
        // permissions regardless of the caller's context.
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() {
                    return new ForkJoinPool((byte)0); }});

        // Cache the common pool's parallelism for getCommonPoolParallelism.
        COMMON_PARALLELISM = common.mode & SMASK;
    }
3259
    /**
     * Factory for innocuous worker threads.
     */
    private static final class InnocuousForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {

        /**
         * An ACC to restrict permissions for the factory itself.
         * The constructed workers have no permissions set.
         */
        private static final AccessControlContext innocuousAcc;
        static {
            // Grant only the minimum permissions the factory needs to
            // create and configure worker threads.
            Permissions innocuousPerms = new Permissions();
            innocuousPerms.add(modifyThreadPermission);
            innocuousPerms.add(new RuntimePermission(
                                   "enableContextClassLoaderOverride"));
            innocuousPerms.add(new RuntimePermission(
                                   "modifyThreadGroup"));
            innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
                    new ProtectionDomain(null, innocuousPerms)
                });
        }

        // Creates each worker under the restricted context above so the
        // thread itself carries no caller permissions.
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return java.security.AccessController.doPrivileged(
                new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
                    public ForkJoinWorkerThread run() {
                        return new ForkJoinWorkerThread.
                            InnocuousForkJoinWorkerThread(pool);
                    }}, innocuousAcc);
        }
    }
3292
3293 }