ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.300
Committed: Mon Mar 14 13:48:40 2016 UTC (8 years, 2 months ago) by dl
Branch: MAIN
Changes since 1.299: +1259 -1517 lines
Log Message:
Initial version with customized constructor

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6 package java.util.concurrent;
7
8 import java.lang.Thread.UncaughtExceptionHandler;
9 import java.security.AccessControlContext;
10 import java.security.Permissions;
11 import java.security.ProtectionDomain;
12 import java.util.ArrayList;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.CountedCompleter;
19 import java.util.concurrent.ForkJoinTask;
20 import java.util.concurrent.ForkJoinWorkerThread;
21 import java.util.concurrent.locks.LockSupport;
22
23 /**
24 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
25 * A {@code ForkJoinPool} provides the entry point for submissions
26 * from non-{@code ForkJoinTask} clients, as well as management and
27 * monitoring operations.
28 *
29 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
30 * ExecutorService} mainly by virtue of employing
31 * <em>work-stealing</em>: all threads in the pool attempt to find and
32 * execute tasks submitted to the pool and/or created by other active
33 * tasks (eventually blocking waiting for work if none exist). This
34 * enables efficient processing when most tasks spawn other subtasks
35 * (as do most {@code ForkJoinTask}s), as well as when many small
36 * tasks are submitted to the pool from external clients. Especially
37 * when setting <em>asyncMode</em> to true in constructors, {@code
38 * ForkJoinPool}s may also be appropriate for use with event-style
39 * tasks that are never joined.
40 *
41 * <p>A static {@link #commonPool()} is available and appropriate for
42 * most applications. The common pool is used by any ForkJoinTask that
43 * is not explicitly submitted to a specified pool. Using the common
44 * pool normally reduces resource usage (its threads are slowly
45 * reclaimed during periods of non-use, and reinstated upon subsequent
46 * use).
47 *
48 * <p>For applications that require separate or custom pools, a {@code
49 * ForkJoinPool} may be constructed with a given target parallelism
50 * level; by default, equal to the number of available processors.
51 * The pool attempts to maintain enough active (or available) threads
52 * by dynamically adding, suspending, or resuming internal worker
53 * threads, even if some tasks are stalled waiting to join others.
54 * However, no such adjustments are guaranteed in the face of blocked
55 * I/O or other unmanaged synchronization. The nested {@link
56 * ManagedBlocker} interface enables extension of the kinds of
57 * synchronization accommodated. The default policies may be
58 * overridden using a constructor with parameters corresponding to
59 * those documented in class {@link ThreadPoolExecutor}.
60 *
61 * <p>In addition to execution and lifecycle control methods, this
62 * class provides status check methods (for example
63 * {@link #getStealCount}) that are intended to aid in developing,
64 * tuning, and monitoring fork/join applications. Also, method
65 * {@link #toString} returns indications of pool state in a
66 * convenient form for informal monitoring.
67 *
68 * <p>As is the case with other ExecutorServices, there are three
69 * main task execution methods summarized in the following table.
70 * These are designed to be used primarily by clients not already
71 * engaged in fork/join computations in the current pool. The main
72 * forms of these methods accept instances of {@code ForkJoinTask},
73 * but overloaded forms also allow mixed execution of plain {@code
74 * Runnable}- or {@code Callable}- based activities as well. However,
75 * tasks that are already executing in a pool should normally instead
76 * use the within-computation forms listed in the table unless using
77 * async event-style tasks that are not usually joined, in which case
78 * there is little difference among choice of methods.
79 *
80 * <table BORDER CELLPADDING=3 CELLSPACING=1>
81 * <caption>Summary of task execution methods</caption>
82 * <tr>
83 * <td></td>
84 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
85 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
86 * </tr>
87 * <tr>
88 * <td> <b>Arrange async execution</b></td>
89 * <td> {@link #execute(ForkJoinTask)}</td>
90 * <td> {@link ForkJoinTask#fork}</td>
91 * </tr>
92 * <tr>
93 * <td> <b>Await and obtain result</b></td>
94 * <td> {@link #invoke(ForkJoinTask)}</td>
95 * <td> {@link ForkJoinTask#invoke}</td>
96 * </tr>
97 * <tr>
98 * <td> <b>Arrange exec and obtain Future</b></td>
99 * <td> {@link #submit(ForkJoinTask)}</td>
100 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
101 * </tr>
102 * </table>
103 *
104 * <p>The common pool is by default constructed with default
105 * parameters, but these may be controlled by setting three
106 * {@linkplain System#getProperty system properties}:
107 * <ul>
108 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
109 * - the parallelism level, a non-negative integer
110 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
111 * - the class name of a {@link ForkJoinWorkerThreadFactory}
112 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
113 * - the class name of a {@link UncaughtExceptionHandler}
114 * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
115 * - the maximum number of allowed extra threads to maintain target
116 * parallelism (default 256).
117 * </ul>
118 * If a {@link SecurityManager} is present and no factory is
119 * specified, then the default pool uses a factory supplying
120 * threads that have no {@link Permissions} enabled.
121 * The system class loader is used to load these classes.
122 * Upon any error in establishing these settings, default parameters
123 * are used. It is possible to disable or limit the use of threads in
124 * the common pool by setting the parallelism property to zero, and/or
125 * using a factory that may return {@code null}. However doing so may
126 * cause unjoined tasks to never be executed.
127 *
128 * <p><b>Implementation notes</b>: This implementation restricts the
129 * maximum number of running threads to 32767. Attempts to create
130 * pools with greater than the maximum number result in
131 * {@code IllegalArgumentException}.
132 *
133 * <p>This implementation rejects submitted tasks (that is, by throwing
134 * {@link RejectedExecutionException}) only when the pool is shut down
135 * or internal resources have been exhausted.
136 *
137 * @since 1.7
138 * @author Doug Lea
139 */
140 public class ForkJoinPool extends AbstractExecutorService {
141
142 /*
143 * Implementation Overview
144 *
145 * This class and its nested classes provide the main
146 * functionality and control for a set of worker threads:
147 * Submissions from non-FJ threads enter into submission queues.
148 * Workers take these tasks and typically split them into subtasks
149 * that may be stolen by other workers. Preference rules give
150 * first priority to processing tasks from their own queues (LIFO
151 * or FIFO, depending on mode), then to randomized FIFO steals of
152 * tasks in other queues. This framework began as a vehicle for
153 * supporting tree-structured parallelism using work-stealing.
154 * Over time, its scalability advantages led to extensions and
155 * changes to better support more diverse usage contexts. Because
156 * most internal methods and nested classes are interrelated,
157 * their main rationale and descriptions are presented here;
158 * individual methods and nested classes contain only brief
159 * comments about details.
160 *
161 * WorkQueues
162 * ==========
163 *
164 * Most operations occur within work-stealing queues (in nested
165 * class WorkQueue). These are special forms of Deques that
166 * support only three of the four possible end-operations -- push,
167 * pop, and poll (aka steal), under the further constraints that
168 * push and pop are called only from the owning thread (or, as
169 * extended here, under a lock), while poll may be called from
170 * other threads. (If you are unfamiliar with them, you probably
171 * want to read Herlihy and Shavit's book "The Art of
172 * Multiprocessor programming", chapter 16 describing these in
173 * more detail before proceeding.) The main work-stealing queue
174 * design is roughly similar to those in the papers "Dynamic
175 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
176 * (http://research.sun.com/scalable/pubs/index.html) and
177 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
178 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
179 * The main differences ultimately stem from GC requirements that
180 * we null out taken slots as soon as we can, to maintain as small
181 * a footprint as possible even in programs generating huge
182 * numbers of tasks. To accomplish this, we shift the CAS
183 * arbitrating pop vs poll (steal) from being on the indices
184 * ("base" and "top") to the slots themselves.
185 *
186 * Adding tasks then takes the form of a classic array push(task)
187 * in a circular buffer:
188 * q.array[q.top++ % length] = task;
189 *
190 * (The actual code needs to null-check and size-check the array,
191 * uses masking, not mod, for indexing a power-of-two-sized array,
192 * properly fences accesses, and possibly signals waiting workers
193 * to start scanning -- see below.) Both a successful pop and
194 * poll mainly entail a CAS of a slot from non-null to null.
195 *
196 * The pop operation (always performed by owner) is:
197 * if ((the task at top slot is not null) and
198 * (CAS slot to null))
199 * decrement top and return task;
200 *
201 * And the poll operation (usually by a stealer) is
202 * if ((the task at base slot is not null) and
203 * (CAS slot to null))
204 * increment base and return task;
205 *
206 * There are several variants of each of these. In particular,
207 * almost all uses of poll occur within scan operations that also
208 * interleave contention tracking (with associated code sprawl.)
209 *
210 * Memory ordering. See "Correct and Efficient Work-Stealing for
211 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
212 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
213 * analysis of memory ordering requirements in work-stealing
214 * algorithms similar to (but different than) the one used here.
215 * Extracting tasks in array slots via (fully fenced) CAS provides
216 * primary synchronization. The base and top indices imprecisely
217 * guide where to extract from. We do not always require strict
218 * orderings of array and index updates, so sometimes let them be
219 * subject to compiler and processor reorderings. However, the
220 * volatile "base" index also serves as a basis for memory
221 * ordering: Slot accesses are preceded by a read of base,
222 * ensuring happens-before ordering with respect to stealers (so
223 * the slots themselves can be read via plain array reads.) The
224 * only other memory orderings relied on are maintained in the
225 * course of signalling and activation (see below). A check that
226 * base == top indicates (momentary) emptiness, but otherwise may
227 * err on the side of possibly making the queue appear nonempty
228 * when a push, pop, or poll have not fully committed, or making
229 * it appear empty when an update of top has not yet been visibly
230 * written. (Method isEmpty() checks the case of a partially
231 * completed removal of the last element.) Because of this, the
232 * poll operation, considered individually, is not wait-free. One
233 * thief cannot successfully continue until another in-progress
234 * one (or, if previously empty, a push) visibly completes.
235 * However, in the aggregate, we ensure at least probabilistic
236 * non-blockingness. If an attempted steal fails, a scanning
237 * thief chooses a different random victim target to try next. So,
238 * in order for one thief to progress, it suffices for any
239 * in-progress poll or new push on any empty queue to
240 * complete.
241 *
242 * This approach also enables support of a user mode in which
243 * local task processing is in FIFO, not LIFO order, simply by
244 * using poll rather than pop. This can be useful in
245 * message-passing frameworks in which tasks are never joined.
246 *
247 * WorkQueues are also used in a similar way for tasks submitted
248 * to the pool. We cannot mix these tasks in the same queues used
249 * by workers. Instead, we randomly associate submission queues
250 * with submitting threads, using a form of hashing. The
251 * ThreadLocalRandom probe value serves as a hash code for
252 * choosing existing queues, and may be randomly repositioned upon
253 * contention with other submitters. In essence, submitters act
254 * like workers except that they are restricted to executing local
255 * tasks that they submitted. Insertion of tasks in shared mode
256 * requires a lock but we use only a simple spinlock (using field
257 * phase), because submitters encountering a busy queue move on to
258 * try or create other queues -- they block only when creating and
259 * registering new queues. Because it is used only as a spinlock,
260 * unlocking requires only a "releasing" store (using
261 * putOrderedInt).
262 *
263 * Management
264 * ==========
265 *
266 * The main throughput advantages of work-stealing stem from
267 * decentralized control -- workers mostly take tasks from
268 * themselves or each other, at rates that can exceed a billion
269 * per second. The pool itself creates, activates (enables
270 * scanning for and running tasks), deactivates, blocks, and
271 * terminates threads, all with minimal central information.
272 * There are only a few properties that we can globally track or
273 * maintain, so we pack them into a small number of variables,
274 * often maintaining atomicity without blocking or locking.
275 * Nearly all essentially atomic control state is held in a few
276 * volatile variables that are by far most often read (not
277 * written) as status and consistency checks. We pack as much
278 * information into them as we can.
279 *
280 * Field "ctl" contains 64 bits holding information needed to
281 * atomically decide to add, enqueue (on an event queue), and
282 * dequeue (and release)-activate workers. To enable this
283 * packing, we restrict maximum parallelism to (1<<15)-1 (which is
284 * far in excess of normal operating range) to allow ids, counts,
285 * and their negations (used for thresholding) to fit into 16bit
286 * subfields.
287 *
288 * Field "mode" holds configuration parameters as well as lifetime
289 * status, atomically and monotonically setting SHUTDOWN, STOP,
290 * and finally TERMINATED bits.
291 *
292 * Field "workQueues" holds references to WorkQueues. It is
293 * updated (only during worker creation and termination) under
294 * lock (using field workerNamePrefix as lock), but is otherwise
295 * concurrently readable, and accessed directly. We also ensure
296 * that uses of the array reference itself never become too stale
297 * in case of resizing. To simplify index-based operations, the
298 * array size is always a power of two, and all readers must
299 * tolerate null slots. Worker queues are at odd indices. Shared
300 * (submission) queues are at even indices, up to a maximum of 64
301 * slots, to limit growth even if array needs to expand to add
302 * more workers. Grouping them together in this way simplifies and
303 * speeds up task scanning.
304 *
305 * All worker thread creation is on-demand, triggered by task
306 * submissions, replacement of terminated workers, and/or
307 * compensation for blocked workers. However, all other support
308 * code is set up to work with other policies. To ensure that we
309 * do not hold on to worker references that would prevent GC, all
310 * accesses to workQueues are via indices into the workQueues
311 * array (which is one source of some of the messy code
312 * constructions here). In essence, the workQueues array serves as
313 * a weak reference mechanism. Thus for example the stack top
314 * subfield of ctl stores indices, not references.
315 *
316 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
317 * cannot let workers spin indefinitely scanning for tasks when
318 * none can be found immediately, and we cannot start/resume
319 * workers unless there appear to be tasks available. On the
320 * other hand, we must quickly prod them into action when new
321 * tasks are submitted or generated. In many usages, ramp-up time
322 * is the main limiting factor in overall performance, which is
323 * compounded at program start-up by JIT compilation and
324 * allocation. So we streamline this as much as possible.
325 *
326 * The "ctl" field atomically maintains total worker and
327 * "released" worker counts, plus the head of the available worker
328 * queue (actually stack, represented by the lower 32bit subfield
329 * of ctl). Released workers are those known to be scanning for
330 * and/or running tasks. Unreleased ("available") workers are
331 * recorded in the ctl stack. These workers are made available for
332 * signalling by enqueuing in ctl (see method runWorker). The
333 * "queue" is a form of Treiber stack. This is ideal for
334 * activating threads in most-recently used order, and improves
335 * performance and locality, outweighing the disadvantages of
336 * being prone to contention and inability to release a worker
337 * unless it is topmost on stack. To avoid missed signal problems
338 * inherent in any wait/signal design, available workers rescan
339 * for (and if found run) tasks after enqueuing. Normally their
340 * release status will be updated while doing so, but the released
341 * worker ctl count may underestimate the number of active
342 * threads. (However, it is still possible to determine quiescence
343 * via a validation traversal -- see isQuiescent). After an
344 * unsuccessful rescan, available workers are blocked until
345 * signalled (see signalWork). The top stack state holds the
346 * value of the "phase" field of the worker: its index and status,
347 * plus a version counter that, in addition to the count subfields
348 * (also serving as version stamps) provide protection against
349 * Treiber stack ABA effects.
350 *
351 * Creating workers. To create a worker, we pre-increment counts
352 * (serving as a reservation), and attempt to construct a
353 * ForkJoinWorkerThread via its factory. Upon construction, the
354 * new thread invokes registerWorker, where it constructs a
355 * WorkQueue and is assigned an index in the workQueues array
356 * (expanding the array if necessary). The thread is then started.
357 * Upon any exception across these steps, or null return from
358 * factory, deregisterWorker adjusts counts and records
359 * accordingly. If a null return, the pool continues running with
360 * fewer than the target number workers. If exceptional, the
361 * exception is propagated, generally to some external caller.
362 * Worker index assignment avoids the bias in scanning that would
363 * occur if entries were sequentially packed starting at the front
364 * of the workQueues array. We treat the array as a simple
365 * power-of-two hash table, expanding as needed. The seedIndex
366 * increment ensures no collisions until a resize is needed or a
367 * worker is deregistered and replaced, and thereafter keeps
368 * probability of collision low. We cannot use
369 * ThreadLocalRandom.getProbe() for similar purposes here because
370 * the thread has not started yet, but do so for creating
371 * submission queues for existing external threads (see
372 * externalPush).
373 *
374 * WorkQueue field "phase" is used by both workers and the pool to
375 * manage and track whether a worker is UNSIGNALLED (possibly
376 * blocked waiting for a signal). When a worker is enqueued its
377 * phase field is set. Note that phase field updates lag queue CAS
378 * releases so usage requires care -- seeing a negative phase does
379 * not guarantee that the worker is available. When queued, the
380 * lower 16 bits of phase must hold its pool index. So we
381 * place the index there upon initialization (see registerWorker)
382 * and otherwise keep it there or restore it when necessary.
383 *
384 * The ctl field also serves as the basis for memory
385 * synchronization surrounding activation. This uses a more
386 * efficient version of a Dekker-like rule that task producers and
387 * consumers sync with each other by both writing/CASing ctl (even
388 * if to its current value). This would be extremely costly. So
389 * we relax it in several ways: (1) Producers only signal when
390 * their queue is empty. Other workers propagate this signal (in
391 * method scan) when they find tasks; to further reduce flailing,
392 * each worker signals only one other per activation. (2) Workers
393 * only enqueue after scanning (see below) and not finding any
394 * tasks. (3) Rather than CASing ctl to its current value in the
395 * common case where no action is required, we reduce write
396 * contention by equivalently prefacing signalWork when called by
397 * an external task producer using a memory access with
398 * full-volatile semantics or a "fullFence".
399 *
400 * Almost always, too many signals are issued. A task producer
401 * cannot in general tell if some existing worker is in the midst
402 * of finishing one task (or already scanning) and ready to take
403 * another without being signalled. So the producer might instead
404 * activate a different worker that does not find any work, and
405 * then inactivates. This scarcely matters in steady-state
406 * computations involving all workers, but can create contention
407 * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
408 * computations involving only a few workers.
409 *
410 * Scanning. Method runWorker performs top-level scanning for
411 * tasks. Each scan traverses and tries to poll from each queue
412 * starting at a random index and circularly stepping. Scans are
413 * not performed in ideal random permutation order, to reduce
414 * cacheline contention. The pseudorandom generator need not have
415 * high-quality statistical properties in the long term, but just
416 * within computations; we use Marsaglia XorShifts (often via
417 * ThreadLocalRandom.nextSecondarySeed), which are cheap and
418 * suffice. Scanning also employs contention reduction: When
419 * scanning workers fail to extract an apparently existing task,
420 * they soon restart at a different pseudorandom index. This
421 * improves throughput when many threads are trying to take tasks
422 * from few queues, which can be common in some usages. Scans do
423 * not otherwise explicitly take into account core affinities,
424 * loads, cache localities, etc. However, they do exploit temporal
425 * locality (which usually approximates these) by preferring to
426 * re-poll (at most #workers times) from the same queue after a
427 * successful poll before trying others.
428 *
429 * Trimming workers. To release resources after periods of lack of
430 * use, a worker starting to wait when the pool is quiescent will
431 * time out and terminate (see method scan) if the pool has
432 * remained quiescent for period given by field keepAlive.
433 *
434 * Shutdown and Termination. A call to shutdownNow invokes
435 * tryTerminate to atomically set a runState bit. The calling
436 * thread, as well as every other worker thereafter terminating,
437 * helps terminate others by cancelling their unprocessed tasks,
438 * and waking them up, doing so repeatedly until stable. Calls to
439 * non-abrupt shutdown() preface this by checking whether
440 * termination should commence by sweeping through queues (until
441 * stable) to ensure lack of in-flight submissions and workers
442 * about to process them before triggering the "STOP" phase of
443 * termination.
444 *
445 * Joining Tasks
446 * =============
447 *
448 * Any of several actions may be taken when one worker is waiting
449 * to join a task stolen (or always held) by another. Because we
450 * are multiplexing many tasks on to a pool of workers, we can't
451 * always just let them block (as in Thread.join). We also cannot
452 * just reassign the joiner's run-time stack with another and
453 * replace it later, which would be a form of "continuation", that
454 * even if possible is not necessarily a good idea since we may
455 * need both an unblocked task and its continuation to progress.
456 * Instead we combine two tactics:
457 *
458 * Helping: Arranging for the joiner to execute some task that it
459 * would be running if the steal had not occurred.
460 *
461 * Compensating: Unless there are already enough live threads,
462 * method tryCompensate() may create or re-activate a spare
463 * thread to compensate for blocked joiners until they unblock.
464 *
465 * A third form (implemented in tryRemoveAndExec) amounts to
466 * helping a hypothetical compensator: If we can readily tell that
467 * a possible action of a compensator is to steal and execute the
468 * task being joined, the joining thread can do so directly,
469 * without the need for a compensation thread.
470 *
471 * The ManagedBlocker extension API can't use helping so relies
472 * only on compensation in method awaitBlocker.
473 *
474 * The algorithm in awaitJoin entails a form of "linear helping".
475 * Each worker records (in field source) the id of the queue from
476 * which it last stole a task. The scan in method awaitJoin uses
477 * these markers to try to find a worker to help (i.e., steal back
478 * a task from and execute it) that could hasten completion of the
479 * actively joined task. Thus, the joiner executes a task that
480 * would be on its own local deque if the to-be-joined task had
481 * not been stolen. This is a conservative variant of the approach
482 * described in Wagner & Calder "Leapfrogging: a portable
483 * technique for implementing efficient futures" SIGPLAN Notices,
484 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
485 * mainly in that we only record queue ids, not full dependency
486 * links. This requires a linear scan of the workQueues array to
487 * locate stealers, but isolates cost to when it is needed, rather
488 * than adding to per-task overhead. Searches can fail to locate
489 * stealers when GC stalls and the like delay recording sources.
490 * Further, even when accurately identified, stealers might not
491 * ever produce a task that the joiner can in turn help with. So,
492 * compensation is tried upon failure to find tasks to run.
493 *
494 * Compensation does not by default aim to keep exactly the target
495 * parallelism number of unblocked threads running at any given
496 * time. Some previous versions of this class employed immediate
497 * compensations for any blocked join. However, in practice, the
498 * vast majority of blockages are transient byproducts of GC and
499 * other JVM or OS activities that are made worse by replacement.
500 * Rather than impose arbitrary policies, we allow users to
501 * override the default of only adding threads upon apparent
502 * starvation. The compensation mechanism may also be bounded.
503 * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
504 * JVMs to cope with programming errors and abuse before running
505 * out of resources to do so.
506 *
507 * Common Pool
508 * ===========
509 *
510 * The static common pool always exists after static
511 * initialization. Since it (or any other created pool) need
512 * never be used, we minimize initial construction overhead and
513 * footprint to the setup of about a dozen fields.
514 *
515 * When external threads submit to the common pool, they can
516 * perform subtask processing (see externalHelpComplete and
517 * related methods) upon joins. This caller-helps policy makes it
518 * sensible to set common pool parallelism level to one (or more)
519 * less than the total number of available cores, or even zero for
520 * pure caller-runs. We do not need to record whether external
521 * submissions are to the common pool -- if not, external help
522 * methods return quickly. These submitters would otherwise be
523 * blocked waiting for completion, so the extra effort (with
524 * liberally sprinkled task status checks) in inapplicable cases
525 * amounts to an odd form of limited spin-wait before blocking in
526 * ForkJoinTask.join.
527 *
528 * As a more appropriate default in managed environments, unless
529 * overridden by system properties, we use workers of subclass
530 * InnocuousForkJoinWorkerThread when there is a SecurityManager
531 * present. These workers have no permissions set, do not belong
532 * to any user-defined ThreadGroup, and erase all ThreadLocals
533 * after executing any top-level task (see
534 * WorkQueue.afterTopLevelExec). The associated mechanics (mainly
535 * in ForkJoinWorkerThread) may be JVM-dependent and must access
536 * particular Thread class fields to achieve this effect.
537 *
538 * Style notes
539 * ===========
540 *
541 * Memory ordering relies mainly on Unsafe intrinsics that carry
542 * the further responsibility of explicitly performing null- and
543 * bounds- checks otherwise carried out implicitly by JVMs. This
544 * can be awkward and ugly, but also reflects the need to control
545 * outcomes across the unusual cases that arise in very racy code
546 * with very few invariants. So these explicit checks would exist
547 * in some form anyway. All fields are read into locals before
548 * use, and null-checked if they are references. This is usually
549 * done in a "C"-like style of listing declarations at the heads
550 * of methods or blocks, and using inline assignments on first
551 * encounter. Array bounds-checks are usually performed by
552 * masking with array.length-1, which relies on the invariant that
553 * these arrays are created with positive lengths, which is itself
554 * paranoically checked. Nearly all explicit checks lead to
555 * bypass/return, not exception throws, because they may
556 * legitimately arise due to cancellation/revocation during
557 * shutdown.
558 *
559 * There is a lot of representation-level coupling among classes
560 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
561 * fields of WorkQueue maintain data structures managed by
562 * ForkJoinPool, so are directly accessed. There is little point
563 * trying to reduce this, since any associated future changes in
564 * representations will need to be accompanied by algorithmic
565 * changes anyway. Several methods intrinsically sprawl because
566 * they must accumulate sets of consistent reads of fields held in
567 * local variables. There are also other coding oddities
568 * (including several unnecessary-looking hoisted null checks)
569 * that help some methods perform reasonably even when interpreted
570 * (not compiled).
571 *
572 * The order of declarations in this file is (with a few exceptions):
573 * (1) Static utility functions
574 * (2) Nested (static) classes
575 * (3) Static fields
576 * (4) Fields, along with constants used when unpacking some of them
577 * (5) Internal control methods
578 * (6) Callbacks and other support for ForkJoinTask methods
579 * (7) Exported methods
580 * (8) Static block initializing statics in minimally dependent order
581 */
582
583 // Static utilities
584
585 /**
586 * If there is a security manager, makes sure caller has
587 * permission to modify threads.
588 */
589 private static void checkPermission() {
590 SecurityManager security = System.getSecurityManager();
591 if (security != null)
592 security.checkPermission(modifyThreadPermission);
593 }
594
595 // Nested classes
596
    /**
     * Factory for creating new {@link ForkJoinWorkerThread}s.
     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
     * for {@code ForkJoinWorkerThread} subclasses that extend base
     * functionality or initialize threads with different contexts.
     */
    public static interface ForkJoinWorkerThreadFactory {
        /**
         * Returns a new worker thread operating in the given pool.
         * Returning null or throwing an exception may result in tasks
         * never being executed. If this method throws an exception,
         * it is relayed to the caller of the method (for example
         * {@code execute}) that triggered the attempted thread
         * creation. If this method returns null or throws an
         * exception, it is not retried until the next attempted
         * creation (for example another call to {@code execute}).
         *
         * @param pool the pool this thread works in
         * @return the new worker thread, or {@code null} if the request
         * to create a thread is rejected
         * @throws NullPointerException if the pool is null
         */
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }
621
    /**
     * Default ForkJoinWorkerThreadFactory implementation; creates a
     * new ForkJoinWorkerThread using the default constructor. This
     * implementation is installed as {@code
     * defaultForkJoinWorkerThreadFactory} by the static initializer.
     */
    private static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }
632
    // Constants shared across ForkJoinPool and WorkQueue

    // Bounds
    static final int SWIDTH       = 16;            // width of short
    static final int SMASK        = 0xffff;        // short bits == max index
    static final int MAX_CAP      = 0x7fff;        // max #workers - 1
    static final int SQMASK       = 0x007e;        // max 64 (even) slots

    // Masks and units for WorkQueue.phase and ctl sp subfield
    static final int UNSIGNALLED  = 1 << 31;       // must be negative
    static final int SS_SEQ       = 1 << 16;       // version count
    static final int QLOCK        = 1;             // must be 1

    // Mode bits and sentinels, some also used in WorkQueue id and source fields
    static final int OWNED        = 1;             // queue has owner thread
    static final int FIFO         = 1 << 16;       // fifo queue or access mode
    static final int SATURATE     = 1 << 17;       // for tryCompensate
    static final int SHUTDOWN     = 1 << 18;       // runstate bit (in mode)
    static final int TERMINATED   = 1 << 19;       // runstate bit (in mode)
    static final int STOP         = 1 << 31;       // must be negative
    static final int QUIET        = 1 << 30;       // not scanning or working
    static final int DORMANT      = QUIET | UNSIGNALLED; // idle and unsignalled

    /**
     * The maximum number of local polls from the same queue before
     * checking others. This is a safeguard against infinitely unfair
     * looping under unbounded user task recursion, and must be larger
     * than plausible cases of intentional bounded task recursion.
     */
    static final int POLL_LIMIT = 1 << 10;
663
664 /**
665 * Queues supporting work-stealing as well as external task
666 * submission. See above for descriptions and algorithms.
667 * Performance on most platforms is very sensitive to placement of
668 * instances of both WorkQueues and their arrays -- we absolutely
669 * do not want multiple WorkQueue instances or multiple queue
670 * arrays sharing cache lines. The @Contended annotation alerts
671 * JVMs to try to keep instances apart.
672 */
673 // For now, using manual padding.
674 // @jdk.internal.vm.annotation.Contended
675 // @sun.misc.Contended
676 static final class WorkQueue {
677
678 /**
679 * Capacity of work-stealing queue array upon initialization.
680 * Must be a power of two; at least 4, but should be larger to
681 * reduce or eliminate cacheline sharing among queues.
682 * Currently, it is much larger, as a partial workaround for
683 * the fact that JVMs often place arrays in locations that
684 * share GC bookkeeping (especially cardmarks) such that
685 * per-write accesses encounter serious memory contention.
686 */
687 static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
688
689 /**
690 * Maximum size for queue arrays. Must be a power of two less
691 * than or equal to 1 << (31 - width of array entry) to ensure
692 * lack of wraparound of index calculations, but defined to a
693 * value a bit less than this to help users trap runaway
694 * programs before saturating systems.
695 */
696 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
697
698 // Instance fields
699 volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
700 volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
701 volatile int phase; // versioned, negative: queued, 1: locked
702 int stackPred; // pool stack (ctl) predecessor link
703 int nsteals; // number of steals
704 int id; // index, mode, tag
705 volatile int source; // source queue id, or sentinel
706 volatile int base; // index of next slot for poll
707 int top; // index of next slot for push
708 ForkJoinTask<?>[] array; // the elements (initially unallocated)
709 final ForkJoinPool pool; // the containing pool (may be null)
710 final ForkJoinWorkerThread owner; // owning thread or null if shared
711 volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
712 volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e, pad1f;
713
714 WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
715 this.pool = pool;
716 this.owner = owner;
717 // Place indices in the center of array (that is not yet allocated)
718 base = top = INITIAL_QUEUE_CAPACITY >>> 1;
719 }
720
721 /**
722 * Returns an exportable index (used by ForkJoinWorkerThread).
723 */
724 final int getPoolIndex() {
725 return (id & 0xffff) >>> 1; // ignore odd/even tag bit
726 }
727
728 /**
729 * Returns the approximate number of tasks in the queue.
730 */
731 final int queueSize() {
732 int n = base - top; // read base first
733 return (n >= 0) ? 0 : -n; // ignore transient negative
734 }
735
736 /**
737 * Provides a more accurate estimate of whether this queue has
738 * any tasks than does queueSize, by checking whether a
739 * near-empty queue has at least one unclaimed task.
740 */
741 final boolean isEmpty() {
742 ForkJoinTask<?>[] a; int n, al, b;
743 return ((n = (b = base) - top) >= 0 || // possibly one task
744 (n == -1 && ((a = array) == null ||
745 (al = a.length) == 0 ||
746 a[(al - 1) & b] == null)));
747 }
748
749
750 /**
751 * Pushes a task. Call only by owner in unshared queues.
752 *
753 * @param task the task. Caller must ensure non-null.
754 * @throws RejectedExecutionException if array cannot be resized
755 */
756 final void push(ForkJoinTask<?> task) {
757 int s = top; ForkJoinTask<?>[] a; int al, d;
758 if ((a = array) != null && (al = a.length) > 0) {
759 int index = (al - 1) & s;
760 long offset = ((long)index << ASHIFT) + ABASE;
761 ForkJoinPool p = pool;
762 top = s + 1;
763 U.putOrderedObject(a, offset, task);
764 if ((d = base - s) == 0 && p != null) {
765 U.fullFence();
766 p.signalWork();
767 }
768 else if (d + al == 1)
769 growArray();
770 }
771 }
772
773 /**
774 * Initializes or doubles the capacity of array. Call either
775 * by owner or with lock held -- it is OK for base, but not
776 * top, to move while resizings are in progress.
777 */
778 final ForkJoinTask<?>[] growArray() {
779 ForkJoinTask<?>[] oldA = array;
780 int oldSize = oldA != null ? oldA.length : 0;
781 int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
782 if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
783 throw new RejectedExecutionException("Queue capacity exceeded");
784 int oldMask, t, b;
785 ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
786 if (oldA != null && (oldMask = oldSize - 1) > 0 &&
787 (t = top) - (b = base) > 0) {
788 int mask = size - 1;
789 do { // emulate poll from old array, push to new array
790 int index = b & oldMask;
791 long offset = ((long)index << ASHIFT) + ABASE;
792 ForkJoinTask<?> x = (ForkJoinTask<?>)
793 U.getObjectVolatile(oldA, offset);
794 if (x != null &&
795 U.compareAndSwapObject(oldA, offset, x, null))
796 a[b & mask] = x;
797 } while (++b != t);
798 U.storeFence();
799 }
800 return a;
801 }
802
803 /**
804 * Takes next task, if one exists, in LIFO order. Call only
805 * by owner in unshared queues.
806 */
807 final ForkJoinTask<?> pop() {
808 int b = base, s = top, al, i; ForkJoinTask<?>[] a;
809 if ((a = array) != null && b != s && (al = a.length) > 0) {
810 int index = (al - 1) & --s;
811 long offset = ((long)index << ASHIFT) + ABASE;
812 ForkJoinTask<?> t = (ForkJoinTask<?>)
813 U.getObject(a, offset);
814 if (t != null &&
815 U.compareAndSwapObject(a, offset, t, null)) {
816 top = s;
817 U.storeFence();
818 return t;
819 }
820 }
821 return null;
822 }
823
824 /**
825 * Takes next task, if one exists, in FIFO order.
826 */
827 final ForkJoinTask<?> poll() {
828 for (;;) {
829 int b = base, s = top, d, al; ForkJoinTask<?>[] a;
830 if ((a = array) != null && (d = b - s) < 0 &&
831 (al = a.length) > 0) {
832 int index = (al - 1) & b;
833 long offset = ((long)index << ASHIFT) + ABASE;
834 ForkJoinTask<?> t = (ForkJoinTask<?>)
835 U.getObjectVolatile(a, offset);
836 if (b++ == base) {
837 if (t != null) {
838 if (U.compareAndSwapObject(a, offset, t, null)) {
839 base = b;
840 return t;
841 }
842 }
843 else if (d == -1)
844 break; // now empty
845 }
846 }
847 else
848 break;
849 }
850 return null;
851 }
852
853 /**
854 * Takes next task, if one exists, in order specified by mode.
855 */
856 final ForkJoinTask<?> nextLocalTask() {
857 return ((id & FIFO) != 0) ? poll() : pop();
858 }
859
860 /**
861 * Returns next task, if one exists, in order specified by mode.
862 */
863 final ForkJoinTask<?> peek() {
864 int al; ForkJoinTask<?>[] a;
865 return ((a = array) != null && (al = a.length) > 0) ?
866 a[(al - 1) &
867 ((id & FIFO) != 0 ? base : top - 1)] : null;
868 }
869
870 /**
871 * Pops the given task only if it is at the current top.
872 */
873 final boolean tryUnpush(ForkJoinTask<?> task) {
874 int b = base, s = top, al; ForkJoinTask<?>[] a;
875 if ((a = array) != null && b != s && (al = a.length) > 0) {
876 int index = (al - 1) & --s;
877 long offset = ((long)index << ASHIFT) + ABASE;
878 if (U.compareAndSwapObject(a, offset, task, null)) {
879 top = s;
880 U.storeFence();
881 return true;
882 }
883 }
884 return false;
885 }
886
887 /**
888 * Removes and cancels all known tasks, ignoring any exceptions.
889 */
890 final void cancelAll() {
891 for (ForkJoinTask<?> t; (t = poll()) != null; )
892 ForkJoinTask.cancelIgnoringExceptions(t);
893 }
894
895 // Specialized execution methods
896
897 /**
898 * Pops and executes up to limit consecutive tasks or until empty.
899 *
900 * @param limit max runs, or zero for no limit
901 */
902 final void localPopAndExec(int limit) {
903 for (;;) {
904 int b = base, s = top, al; ForkJoinTask<?>[] a;
905 if ((a = array) != null && b != s && (al = a.length) > 0) {
906 int index = (al - 1) & --s;
907 long offset = ((long)index << ASHIFT) + ABASE;
908 ForkJoinTask<?> t = (ForkJoinTask<?>)
909 U.getAndSetObject(a, offset, null);
910 if (t != null) {
911 top = s;
912 U.storeFence();
913 t.doExec();
914 if (limit != 0 && --limit == 0)
915 break;
916 }
917 else
918 break;
919 }
920 else
921 break;
922 }
923 }
924
925 /**
926 * Polls and executes up to limit consecutive tasks or until empty.
927 *
928 * @param limit, or zero for no limit
929 */
930 final void localPollAndExec(int limit) {
931 for (int polls = 0;;) {
932 int b = base, s = top, d, al; ForkJoinTask<?>[] a;
933 if ((a = array) != null && (d = b - s) < 0 &&
934 (al = a.length) > 0) {
935 int index = (al - 1) & b++;
936 long offset = ((long)index << ASHIFT) + ABASE;
937 ForkJoinTask<?> t = (ForkJoinTask<?>)
938 U.getAndSetObject(a, offset, null);
939 if (t != null) {
940 base = b;
941 t.doExec();
942 if (limit != 0 && ++polls == limit)
943 break;
944 }
945 else if (d == -1)
946 break; // now empty
947 else
948 polls = 0; // stolen; reset
949 }
950 else
951 break;
952 }
953 }
954
955 /**
956 * If present, removes task from queue and executes
957 */
958 final void tryRemoveAndExec(ForkJoinTask<?> task) {
959 ForkJoinTask<?>[] wa; int s, wal;
960 if (base - (s = top) < 0 && // traverse from top
961 (wa = array) != null && (wal = wa.length) > 0) {
962 for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
963 int index = i & m;
964 long offset = (index << ASHIFT) + ABASE;
965 ForkJoinTask<?> t = (ForkJoinTask<?>)
966 U.getObject(wa, offset);
967 if (t == null)
968 break;
969 else if (t == task) {
970 if (U.compareAndSwapObject(wa, offset, t, null)) {
971 top = ns; // safely shift down
972 for (int j = i; j != ns; ++j) {
973 ForkJoinTask<?> f;
974 int pindex = (j + 1) & m;
975 long pOffset = (pindex << ASHIFT) + ABASE;
976 f = (ForkJoinTask<?>)U.getObject(wa, pOffset);
977 U.putObjectVolatile(wa, pOffset, null);
978
979 int jindex = j & m;
980 long jOffset = (jindex << ASHIFT) + ABASE;
981 U.putOrderedObject(wa, jOffset, f);
982 }
983 U.storeFence();
984 t.doExec();
985 }
986 break;
987 }
988 }
989 }
990 }
991
992 /**
993 * Tries to steal and run tasks within the target's
994 * computation until done, not found, or limit exceeded
995 *
996 * @param task root of CountedCompleter computation
997 * @param limit max runs, or zero for no limit
998 * @return task status on exit
999 */
1000 final int localHelpCC(CountedCompleter<?> task, int limit) {
1001 int status = 0;
1002 if (task != null && (status = task.status) >= 0) {
1003 for (;;) {
1004 boolean help = false;
1005 int b = base, s = top, al; ForkJoinTask<?>[] a;
1006 if ((a = array) != null && b != s && (al = a.length) > 0) {
1007 int index = (al - 1) & (s - 1);
1008 long offset = ((long)index << ASHIFT) + ABASE;
1009 ForkJoinTask<?> o = (ForkJoinTask<?>)
1010 U.getObject(a, offset);
1011 if (o instanceof CountedCompleter) {
1012 CountedCompleter<?> t = (CountedCompleter<?>)o;
1013 for (CountedCompleter<?> f = t;;) {
1014 if (f != task) {
1015 if ((f = f.completer) == null) // try parent
1016 break;
1017 }
1018 else {
1019 if (U.compareAndSwapObject(a, offset,
1020 t, null)) {
1021 top = s - 1;
1022 U.storeFence();
1023 t.doExec();
1024 help = true;
1025 }
1026 break;
1027 }
1028 }
1029 }
1030 }
1031 if ((status = task.status) < 0 || !help ||
1032 (limit != 0 && --limit == 0))
1033 break;
1034 }
1035 }
1036 return status;
1037 }
1038
1039 // Operations on shared queues
1040
1041 /**
1042 * Tries to lock shared queue by CASing phase field
1043 */
1044 final boolean tryLockSharedQueue() {
1045 return U.compareAndSwapInt(this, PHASE, 0, QLOCK);
1046 }
1047
1048 /**
1049 * Shared version of tryUnpush.
1050 */
1051 final boolean trySharedUnpush(ForkJoinTask<?> task) {
1052 boolean popped = false;
1053 int s = top - 1, al; ForkJoinTask<?>[] a;
1054 if ((a = array) != null && (al = a.length) > 0) {
1055 int index = (al - 1) & s;
1056 long offset = ((long)index << ASHIFT) + ABASE;
1057 ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
1058 if (t == task &&
1059 U.compareAndSwapInt(this, PHASE, 0, QLOCK)) {
1060 if (top == s + 1 && array == a &&
1061 U.compareAndSwapObject(a, offset, task, null)) {
1062 popped = true;
1063 top = s;
1064 }
1065 U.putOrderedInt(this, PHASE, 0);
1066 }
1067 }
1068 return popped;
1069 }
1070
1071 /**
1072 * Shared version of localHelpCC.
1073 */
1074 final int sharedHelpCC(CountedCompleter<?> task, int limit) {
1075 int status = 0;
1076 if (task != null && (status = task.status) >= 0) {
1077 for (;;) {
1078 boolean help = false;
1079 int b = base, s = top, al; ForkJoinTask<?>[] a;
1080 if ((a = array) != null && b != s && (al = a.length) > 0) {
1081 int index = (al - 1) & (s - 1);
1082 long offset = ((long)index << ASHIFT) + ABASE;
1083 ForkJoinTask<?> o = (ForkJoinTask<?>)
1084 U.getObject(a, offset);
1085 if (o instanceof CountedCompleter) {
1086 CountedCompleter<?> t = (CountedCompleter<?>)o;
1087 for (CountedCompleter<?> f = t;;) {
1088 if (f != task) {
1089 if ((f = f.completer) == null)
1090 break;
1091 }
1092 else {
1093 if (U.compareAndSwapInt(this, PHASE,
1094 0, QLOCK)) {
1095 if (top == s && array == a &&
1096 U.compareAndSwapObject(a, offset,
1097 t, null)) {
1098 help = true;
1099 top = s - 1;
1100 }
1101 U.putOrderedInt(this, PHASE, 0);
1102 if (help)
1103 t.doExec();
1104 }
1105 break;
1106 }
1107 }
1108 }
1109 }
1110 if ((status = task.status) < 0 || !help ||
1111 (limit != 0 && --limit == 0))
1112 break;
1113 }
1114 }
1115 return status;
1116 }
1117
1118 /**
1119 * Returns true if owned and not known to be blocked.
1120 */
1121 final boolean isApparentlyUnblocked() {
1122 Thread wt; Thread.State s;
1123 return ((wt = owner) != null &&
1124 (s = wt.getState()) != Thread.State.BLOCKED &&
1125 s != Thread.State.WAITING &&
1126 s != Thread.State.TIMED_WAITING);
1127 }
1128
1129 // Unsafe mechanics. Note that some are (and must be) the same as in FJP
1130 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1131 private static final long PHASE;
1132 private static final int ABASE;
1133 private static final int ASHIFT;
1134 static {
1135 try {
1136 PHASE = U.objectFieldOffset
1137 (WorkQueue.class.getDeclaredField("phase"));
1138 ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
1139 int scale = U.arrayIndexScale(ForkJoinTask[].class);
1140 if ((scale & (scale - 1)) != 0)
1141 throw new Error("array index scale not a power of two");
1142 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1143 } catch (ReflectiveOperationException e) {
1144 throw new Error(e);
1145 }
1146 }
1147 }
1148
    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     */
    private static final int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.
     * Updated only under class-level synchronization (nextPoolId).
     */
    private static int poolNumberSequence;
1189
1190 /**
1191 * Returns the next sequence number. We don't expect this to
1192 * ever contend, so use simple builtin sync.
1193 */
1194 private static final synchronized int nextPoolId() {
1195 return ++poolNumberSequence;
1196 }
1197
    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work
     */
    private static final long DEFAULT_KEEPALIVE = 60000L;

    /**
     * Undershoot tolerance for idle timeouts
     */
    private static final long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES. Overridable using the
     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
     * property. The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private static final int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private static final int SEED_INCREMENT = 0x9e3779b9;

    /*
     * Bits and masks for field ctl, packed with 4 16 bit subfields:
     * RC: Number of released (unqueued) workers minus target parallelism
     * TC: Number of total workers minus target parallelism
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl. The offsets of counts
     * by the target parallelism and the positionings of fields makes
     * it possible to perform the most common checks via sign tests of
     * fields: When ac is negative, there are not enough unqueued
     * workers, when tc is negative, there are not enough total
     * workers. When sp is non-zero, there are waiting workers. To
     * deal with possibly negative fields, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     *
     * Because it occupies uppermost bits, we can add one release count
     * using getAndAddLong of RC_UNIT, rather than CAS, when returning
     * from a blocked join. Other updates entail multiple subfields
     * and masking, requiring CAS.
     *
     * The limits packed in field "bounds" are also offset by the
     * parallelism level to make them comparable to the ctl rc and tc
     * fields.
     */

    // Lower and upper word masks
    private static final long SP_MASK = 0xffffffffL;
    private static final long UC_MASK = ~SP_MASK;

    // Release counts
    private static final int RC_SHIFT = 48;
    private static final long RC_UNIT = 0x0001L << RC_SHIFT;
    private static final long RC_MASK = 0xffffL << RC_SHIFT;

    // Total counts
    private static final int TC_SHIFT = 32;
    private static final long TC_UNIT = 0x0001L << TC_SHIFT;
    private static final long TC_MASK = 0xffffL << TC_SHIFT;
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

    // Instance fields

    // Manual cache-line padding to segregate the heavily-updated ctl
    // field, for now used instead of a @Contended("fjpctl") annotation.
    // @jdk.internal.vm.annotation.Contended("fjpctl")
    // @sun.misc.Contended("fjpctl")
    volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
    volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
    volatile long ctl;                   // main pool control
    volatile long pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
    volatile long pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e;

    volatile long stealCount;            // collects worker nsteals
    final long keepAlive;                // milliseconds before dropping if idle
    int indexSeed;                       // next worker index
    final int bounds;                    // min, max threads packed as shorts
    volatile int mode;                   // parallelism, runstate, queue mode
    WorkQueue[] workQueues;              // main registry
    final String workerNamePrefix;       // for worker thread string; sync lock
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;  // per-worker UEH
1289
1290 // Creating, registering and deregistering workers
1291
1292 /**
1293 * Tries to construct and start one worker. Assumes that total
1294 * count has already been incremented as a reservation. Invokes
1295 * deregisterWorker on any failure.
1296 *
1297 * @return true if successful
1298 */
1299 private boolean createWorker() {
1300 ForkJoinWorkerThreadFactory fac = factory;
1301 Throwable ex = null;
1302 ForkJoinWorkerThread wt = null;
1303 try {
1304 if (fac != null && (wt = fac.newThread(this)) != null) {
1305 wt.start();
1306 return true;
1307 }
1308 } catch (Throwable rex) {
1309 ex = rex;
1310 }
1311 deregisterWorker(wt, ex);
1312 return false;
1313 }
1314
    /**
     * Tries to add one worker, incrementing ctl counts before doing
     * so, relying on createWorker to back out on failure.
     *
     * @param c incoming ctl value, with total count negative and no
     * idle workers. On CAS failure, c is refreshed and retried if
     * this holds (otherwise, a new worker is not needed).
     */
    private void tryAddWorker(long c) {
        do {
            // bump both release and total counts by one unit
            long nc = ((RC_MASK & (c + RC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
                createWorker();   // backs out the counts itself on failure
                break;
            }
            // retry only while total count still negative (ADD_WORKER
            // sign bit set) and there are still no idle workers (sp==0)
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }
1333
    /**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.setDaemon(true);                             // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int tid = 0;                                    // for thread name
        int fifo = mode & FIFO;
        String prefix = workerNamePrefix;
        if (prefix != null) {
            // workerNamePrefix doubles as the registration lock
            synchronized(prefix) {
                WorkQueue[] ws = workQueues; int n;
                int s = indexSeed += SEED_INCREMENT;
                if (ws != null && (n = ws.length) > 1) {
                    int m = n - 1;
                    tid = s & m;
                    int i = m & ((s << 1) | 1); // odd-numbered indices
                    for (int probes = n >>> 1;;) { // find empty slot
                        WorkQueue q;
                        if ((q = ws[i]) == null || q.phase == QUIET)
                            break;
                        else if (--probes == 0) {
                            i = n | 1;          // resize below
                            break;
                        }
                        else
                            i = (i + 2) & m;    // next odd slot
                    }

                    int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
                    w.phase = w.id = id;        // now publishable

                    if (i < n)
                        ws[i] = w;
                    else {                      // expand array
                        int an = n << 1;
                        WorkQueue[] as = new WorkQueue[an];
                        as[i] = w;
                        int am = an - 1;
                        // j advances by two per pass: even slots hold
                        // external queues (recompute position under new
                        // mask), odd slots hold workers (copy directly)
                        for (int j = 0; j < n; ++j) {
                            WorkQueue v;        // copy external queue
                            if ((v = ws[j]) != null) // position may change
                                as[v.id & am & SQMASK] = v;
                            if (++j >= n)
                                break;
                            as[j] = ws[j];      // copy worker
                        }
                        workQueues = as;
                    }
                }
            }
            wt.setName(prefix.concat(Integer.toString(tid)));
        }
        return w;
    }
1396
    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker. Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
     *
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        int phase = 0;
        if (wt != null && (w = wt.workQueue) != null) {
            Object lock = workerNamePrefix;  // registration lock (see registerWorker)
            long ns = (long)w.nsteals & 0xffffffffL;
            int idx = w.id & SMASK;
            if (lock != null) {
                WorkQueue[] ws;              // remove index from array
                synchronized(lock) {
                    if ((ws = workQueues) != null && ws.length > idx &&
                        ws[idx] == w)
                        ws[idx] = null;
                    stealCount += ns;        // fold in this worker's steals
                }
            }
            phase = w.phase;
        }
        if (phase != QUIET) {                // else pre-adjusted
            long c;                          // decrement counts
            do {} while (!U.compareAndSwapLong
                         (this, CTL, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
                                               (TC_MASK & (c - TC_UNIT)) |
                                               (SP_MASK & c))));
        }
        if (w != null)
            w.cancelAll();                   // cancel remaining tasks

        if (!tryTerminate(false, false) &&   // possibly replace worker
            w != null && w.array != null)    // avoid repeated failures
            signalWork();

        if (ex == null)                      // help clean on way out
            ForkJoinTask.helpExpungeStaleExceptions();
        else                                 // rethrow
            ForkJoinTask.rethrow(ex);
    }
1443
    /**
     * Tries to create or release a worker if too few are running.
     */
    final void signalWork() {
        for (;;) {
            long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
            if ((c = ctl) >= 0L)                      // enough workers
                break;
            else if ((sp = (int)c) == 0) {            // no idle workers
                if ((c & ADD_WORKER) != 0L)           // too few workers
                    tryAddWorker(c);
                break;
            }
            else if ((ws = workQueues) == null)
                break;                                // unstarted/terminated
            else if (ws.length <= (i = sp & SMASK))
                break;                                // terminated
            else if ((v = ws[i]) == null)
                break;                                // terminating
            else {                                    // release top waiter
                // pop v off the Treiber stack of waiters encoded in ctl:
                // new sp is v's stackPred; release count goes up by one
                int np = sp & ~UNSIGNALLED;
                int vp = v.phase;
                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
                Thread vt = v.owner;
                if (sp == vp && U.compareAndSwapLong(this, CTL, c, nc)) {
                    v.phase = np;
                    if (v.source < 0)   // negative source apparently marks a
                        LockSupport.unpark(vt); // parked waiter -- see runWorker
                    break;
                }
            }
        }
    }
1477
    /**
     * Tries to decrement counts (sometimes implicitly) and possibly
     * arrange for a compensating worker in preparation for blocking:
     * If not all core workers yet exist, creates one, else if any are
     * unreleased (possibly including caller) releases one, else if
     * fewer than the minimum allowed number of workers running,
     * checks to see that they are all active, and if so creates an
     * extra worker unless over maximum limit and policy is to
     * saturate. Most of these steps can fail due to interference, in
     * which case 0 is returned so caller will retry. A negative
     * return value indicates that the caller doesn't need to
     * re-adjust counts when later unblocked.
     *
     * @return 1: block then adjust, -1: block without adjust, 0 : retry
     */
    private int tryCompensate(WorkQueue w) {
        int t, n, sp;
        long c = ctl;
        WorkQueue[] ws = workQueues;
        if ((t = (short)(c >> TC_SHIFT)) >= 0) {  // at or above target total
            if (ws == null || (n = ws.length) <= 0 || w == null)
                return 0;                        // disabled
            else if ((sp = (int)c) != 0) {       // replace or release
                WorkQueue v = ws[sp & (n - 1)];
                int wp = w.phase;
                long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
                int np = sp & ~UNSIGNALLED;
                if (v != null) {
                    int vp = v.phase;
                    Thread vt = v.owner;
                    long nc = ((long)v.stackPred & SP_MASK) | uc;
                    if (vp == sp && U.compareAndSwapLong(this, CTL, c, nc)) {
                        v.phase = np;
                        if (v.source < 0)        // possibly parked; wake it
                            LockSupport.unpark(vt);
                        return (wp < 0) ? -1 : 1;
                    }
                }
                return 0;
            }
            else if ((int)(c >> RC_SHIFT) -      // reduce parallelism
                     (short)(bounds & SMASK) > 0) {
                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
                return U.compareAndSwapLong(this, CTL, c, nc) ? 1 : 0;
            }
            else {                               // validate
                // scan odd (worker) slots to confirm all are active and
                // counts consistent before deciding to add a spare
                int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
                boolean unstable = false;
                for (int i = 1; i < n; i += 2) {
                    WorkQueue q; Thread wt; Thread.State ts;
                    if ((q = ws[i]) != null) {
                        if (q.source == 0) {
                            unstable = true;
                            break;
                        }
                        else {
                            --tc;
                            if ((wt = q.owner) != null &&
                                ((ts = wt.getState()) == Thread.State.BLOCKED ||
                                 ts == Thread.State.WAITING))
                                ++bc;            // worker is blocking
                        }
                    }
                }
                if (unstable || tc != 0 || ctl != c)
                    return 0;                    // inconsistent
                else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
                    if ((md & SATURATE) != 0)    // policy: cap silently
                        return -1;
                    else if (bc < pc) {          // lagging
                        Thread.yield();          // for retry spins
                        return 0;
                    }
                    else
                        throw new RejectedExecutionException(
                            "Thread limit exceeded replacing blocked worker");
                }
            }
        }

        long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
        return U.compareAndSwapLong(this, CTL, c, nc) && createWorker() ? 1 : 0;
    }
1561
    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     * See above for explanation.
     *
     * Repeatedly scans all queues for stealable tasks, running each
     * claimed task (plus the worker's remaining local tasks) before
     * rescanning.  When a full scan finds nothing, the worker pushes
     * itself onto the idle-worker stack encoded in ctl and parks;
     * a quiescent worker at the top of that stack uses a timed park
     * and deregisters itself if the keepAlive elapses.
     *
     * @param w this worker's queue (owned by the calling thread)
     */
    final void runWorker(WorkQueue w) {
        WorkQueue[] ws;
        w.growArray();                                  // allocate queue
        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
        if (r == 0)                                     // initial nonzero seed
            r = 1;
        int lastSignalId = 0;                           // avoid unneeded signals
        while ((ws = workQueues) != null) {
            boolean nonempty = false;                   // scan
            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                if ((i = r & m) >= 0 && i < n &&        // always true
                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
                    (a = q.array) != null && (al = a.length) > 0) {
                    int qid = q.id;                     // (never zero)
                    // volatile-read the task at base, then claim it by
                    // CASing the slot to null
                    int index = (al - 1) & b;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getObjectVolatile(a, offset);
                    if (t != null && b++ == q.base &&
                        U.compareAndSwapObject(a, offset, t, null)) {
                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
                            signalWork();               // propagate signal
                        w.source = lastSignalId = qid;  // record steal source
                        t.doExec();
                        if ((w.id & FIFO) != 0)         // run remaining locals
                            w.localPollAndExec(POLL_LIMIT);
                        else
                            w.localPopAndExec(POLL_LIMIT);
                        ForkJoinWorkerThread thread = w.owner;
                        ++w.nsteals;
                        w.source = 0;                   // now idle
                        if (thread != null)
                            thread.afterTopLevelExec();
                    }
                    nonempty = true;                    // queue had work; rescan
                }
                else if (nonempty)
                    break;
                else
                    ++r;                                // nothing yet; next index
            }

            if (nonempty) {                             // move (xorshift)
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
            }
            else {
                int phase;
                lastSignalId = 0;                       // clear for next scan
                if ((phase = w.phase) >= 0) {           // enqueue on idle stack
                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
                    long c, nc;
                    do {                                // CAS self as new stack top
                        w.stackPred = (int)(c = ctl);
                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
                    } while (!U.compareAndSwapLong(this, CTL, c, nc));
                }
                else {                                  // already queued
                    int pred = w.stackPred;
                    w.source = DORMANT;                 // enable signal
                    for (int steps = 0;;) {
                        int md, rc; long c;
                        if (w.phase >= 0) {             // signalled; resume scan
                            w.source = 0;
                            break;
                        }
                        else if ((md = mode) < 0)       // shutting down
                            return;
                        else if ((rc = ((md & SMASK) +  // possibly quiescent
                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
                                 (md & SHUTDOWN) != 0 &&
                                 tryTerminate(false, false))
                            return;                     // help terminate
                        else if ((++steps & 1) == 0)
                            Thread.interrupted();       // clear between parks
                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
                            // quiescent and at top of idle stack: timed park,
                            // dropping this worker if the deadline passes
                            long d = keepAlive + System.currentTimeMillis();
                            LockSupport.parkUntil(this, d);
                            if (ctl == c &&
                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
                                long nc = ((UC_MASK & (c - TC_UNIT)) |
                                           (SP_MASK & pred));
                                if (U.compareAndSwapLong(this, CTL, c, nc)) {
                                    w.phase = QUIET;
                                    return;             // drop on timeout
                                }
                            }
                        }
                        else
                            LockSupport.park(this);     // untimed; await signal
                    }
                }
            }
        }
    }
1661
    /**
     * Helps and/or blocks until the given task is done or timeout.
     * First tries locally helping, then scans other queues for a task
     * produced by one of w's stealers; compensating and blocking if
     * none are found (rescanning if tryCompensate fails).
     *
     * NOTE(review): the "x" prefix suggests this is a retained prior
     * version of awaitJoin below (which differs only in scan and
     * compensation bookkeeping); confirm whether anything still calls
     * it before relying on or removing it.
     *
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero
     * @return task status on exit
     */
    final int xawaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (w != null && task != null &&
            (!(task instanceof CountedCompleter) ||
             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
            w.tryRemoveAndExec(task);       // run it now if still queued locally
            int src = w.source, id = w.id, block = 0;
            s = task.status;
            while (s >= 0) {
                WorkQueue[] ws;
                boolean nonempty = false;
                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
                if ((ws = workQueues) != null) { // scan for matching id
                    for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                        if ((i = r & m) >= 0 && i < n &&
                            (q = ws[i]) != null && q.source == id &&
                            (b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            if (block > 0)  // found work: undo compensation
                                U.getAndAddLong(this, CTL, RC_UNIT);
                            block = 0;
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            if (t != null && b++ == q.base && id == q.source &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = qid;
                                t.doExec(); // run stealer's task to unblock join
                                w.source = src;
                            }
                            nonempty = true;
                            break;
                        }
                        else
                            r += 2;         // step to next odd (worker) index
                    }
                }
                if ((s = task.status) < 0)
                    break;
                else if (!nonempty) {
                    long ms, ns;
                    if (deadline == 0L)
                        ms = 0L;            // untimed
                    else if ((ns = deadline - System.nanoTime()) <= 0L)
                        break;              // timeout
                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                        ms = 1L;            // avoid 0 for timed wait
                    if (block == 0)
                        block = tryCompensate(w);
                    else
                        task.internalWait(ms);
                    s = task.status;
                }
            }
            if (block > 0)                  // undo outstanding compensation
                U.getAndAddLong(this, CTL, RC_UNIT);
        }
        return s;
    }
1736
    /**
     * Helps and/or blocks until the given task is done or timeout.
     * First tries locally helping, then scans other queues for a task
     * produced by one of w's stealers (a queue whose source field
     * matches w's id); compensating and blocking if none are found.
     *
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero
     * @return task status on exit
     */
    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (w != null && task != null &&
            (!(task instanceof CountedCompleter) ||
             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
            w.tryRemoveAndExec(task);       // run it now if still queued locally
            int src = w.source, id = w.id;
            s = task.status;
            while (s >= 0) {
                WorkQueue[] ws;
                boolean nonempty = false;
                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
                if ((ws = workQueues) != null) { // scan for matching id
                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                        if ((i = (r + j) & m) >= 0 && i < n &&
                            (q = ws[i]) != null && q.source == id &&
                            (b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            if (t != null && b++ == q.base && id == q.source &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = qid;
                                t.doExec(); // run stealer's task to unblock join
                                w.source = src;
                            }
                            nonempty = true;
                            break;
                        }
                    }
                }
                if ((s = task.status) < 0)
                    break;                  // task completed
                else if (!nonempty) {
                    long ms, ns; int block;
                    if (deadline == 0L)
                        ms = 0L;            // untimed
                    else if ((ns = deadline - System.nanoTime()) <= 0L)
                        break;              // timeout
                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                        ms = 1L;            // avoid 0 for timed wait
                    if ((block = tryCompensate(w)) != 0) {
                        task.internalWait(ms);
                        // undo compensation only when we decremented (block > 0)
                        U.getAndAddLong(this, CTL, (block > 0) ? RC_UNIT : 0L);
                    }
                    s = task.status;
                }
            }
        }
        return s;
    }
1793
    /**
     * Runs tasks until {@code isQuiescent()}. Rather than blocking
     * when tasks cannot be found, rescans until all others cannot
     * find tasks either.
     *
     * The "released" variable tracks whether this worker currently
     * counts in ctl's release count (-1 until first known): it is
     * decremented while this worker finds nothing to do and
     * re-incremented when it finds work again.
     *
     * @param w the calling worker's queue
     */
    final void helpQuiescePool(WorkQueue w) {
        int prevSrc = w.source, fifo = w.id & FIFO;
        for (int source = prevSrc, released = -1;;) { // -1 until known
            WorkQueue[] ws;
            if (fifo != 0)                      // drain local tasks first
                w.localPollAndExec(0);
            else
                w.localPopAndExec(0);
            if (released == -1 && w.phase >= 0)
                released = 1;
            boolean quiet = true, empty = true;
            int r = ThreadLocalRandom.nextSecondarySeed();
            if ((ws = workQueues) != null) {
                for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                    WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                    if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
                        if ((b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;     // NOTE(review): unused here
                            if (released == 0) {    // increment
                                released = 1;
                                U.getAndAddLong(this, CTL, RC_UNIT);
                            }
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            if (t != null && b++ == q.base &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = source = q.id;
                                t.doExec();
                                w.source = source = prevSrc;
                            }
                            quiet = empty = false;
                            break;
                        }
                        else if ((q.source & QUIET) == 0)
                            quiet = false;      // someone else still active
                    }
                }
            }
            if (quiet) {                        // all queues quiescent: done
                if (released == 0)
                    U.getAndAddLong(this, CTL, RC_UNIT);
                w.source = prevSrc;
                break;
            }
            else if (empty) {                   // nothing to steal; mark self quiet
                if (source != QUIET)
                    w.source = source = QUIET;
                if (released == 1) {            // decrement
                    released = 0;
                    U.getAndAddLong(this, CTL, RC_MASK & -RC_UNIT);
                }
            }
        }
    }
1857
    /**
     * Scans for and returns a polled task, if available.
     * Used only for untracked polls.
     *
     * The scan terminates when a full cycle over the queue array
     * observes the same base checksum twice in a row, indicating no
     * task appeared while scanning.
     *
     * @param submissionsOnly if true, only scan submission queues
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
        WorkQueue[] ws; int n;
        rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
                       (n = ws.length) > 0) {
            int m = n - 1;
            int r = ThreadLocalRandom.nextSecondarySeed();
            int h = r >>> 16;
            int origin, step;
            if (submissionsOnly) {              // shared queues (even slots) only
                origin = (r & ~1) & m;          // even indices and steps
                step = (h & ~1) | 2;
            }
            else {
                origin = r & m;
                step = h | 1;                   // odd step visits every slot
            }
            for (int k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; int b, al; ForkJoinTask<?>[] a;
                if ((q = ws[k]) != null) {
                    checkSum += b = q.base;
                    if (b - q.top < 0 &&
                        (a = q.array) != null && (al = a.length) > 0) {
                        // claim the base task by CASing its slot to null
                        int index = (al - 1) & b;
                        long offset = ((long)index << ASHIFT) + ABASE;
                        ForkJoinTask<?> t = (ForkJoinTask<?>)
                            U.getObjectVolatile(a, offset);
                        if (t != null && b++ == q.base &&
                            U.compareAndSwapObject(a, offset, t, null)) {
                            q.base = b;
                            return t;
                        }
                        else
                            break;              // restart
                    }
                }
                if ((k = (k + step) & m) == origin) {
                    if (oldSum == (oldSum = checkSum))
                        break rescan;           // two identical passes: empty
                    checkSum = 0;
                }
            }
        }
        return null;
    }
1908
1909 /**
1910 * Gets and removes a local or stolen task for the given worker.
1911 *
1912 * @return a task, if available
1913 */
1914 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1915 ForkJoinTask<?> t;
1916 if (w != null &&
1917 (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1918 return t;
1919 else
1920 return pollScan(false);
1921 }
1922
1923 // External operations
1924
    /**
     * Adds the given task to a submission queue at submitter's
     * current queue, creating one if null or contended.
     *
     * @param task the task. Caller must ensure non-null.
     * @throws RejectedExecutionException if the pool is shut down or
     *         its queue array is unavailable
     */
    final void externalPush(ForkJoinTask<?> task) {
        int r;                                // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            int md = mode, n;
            WorkQueue[] ws = workQueues;
            if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
                throw new RejectedExecutionException();
            else {
                WorkQueue q;
                boolean push = false, grow = false;
                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
                    // no queue at probed slot: create one, then install it
                    // under the pool lock (workerNamePrefix doubles as the
                    // lock object)
                    Object lock = workerNamePrefix;
                    int qid = (r | QUIET) & ~(FIFO | OWNED);
                    q = new WorkQueue(this, null);
                    q.id = qid;
                    q.source = QUIET;
                    q.phase = QLOCK;          // lock queue
                    if (lock != null) {
                        synchronized(lock) {  // lock pool to install
                            int i;
                            if ((ws = workQueues) != null &&
                                (n = ws.length) > 0 &&
                                ws[i = qid & (n - 1) & SQMASK] == null) {
                                ws[i] = q;
                                push = grow = true;
                            }
                        }
                    }
                }
                else if (q.tryLockSharedQueue()) {
                    // queue exists and we hold its lock: append if room
                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
                    if ((a = q.array) != null && (al = a.length) > 0 &&
                        al - 1 + (d = b - s) > 0) {
                        a[(al - 1) & s] = task;
                        q.top = s + 1;        // relaxed writes OK here
                        q.phase = 0;          // unlock
                        if (d < 0 && q.base - s < 0)
                            break;            // no signal needed
                    }
                    else
                        grow = true;          // array full: resize below
                    push = true;
                }
                if (push) {
                    if (grow) {               // queue still locked here
                        try {
                            q.growArray();
                            int s = q.top, al; ForkJoinTask<?>[] a;
                            if ((a = q.array) != null && (al = a.length) > 0) {
                                a[(al - 1) & s] = task;
                                q.top = s + 1;
                            }
                        } finally {
                            q.phase = 0;      // always unlock
                        }
                    }
                    signalWork();
                    break;
                }
                else                          // move if busy
                    r = ThreadLocalRandom.advanceProbe(r);
            }
        }
    }
1999
2000 /**
2001 * Pushes a possibly-external submission.
2002 */
2003 private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2004 Thread t; ForkJoinWorkerThread w; WorkQueue q;
2005 if (task == null)
2006 throw new NullPointerException();
2007 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2008 (w = (ForkJoinWorkerThread)t).pool == this &&
2009 (q = w.workQueue) != null)
2010 q.push(task);
2011 else
2012 externalPush(task);
2013 return task;
2014 }
2015
2016 /**
2017 * Returns common pool queue for an external thread.
2018 */
2019 static WorkQueue commonSubmitterQueue() {
2020 ForkJoinPool p = common;
2021 int r = ThreadLocalRandom.getProbe();
2022 WorkQueue[] ws; int n;
2023 return (p != null && (ws = p.workQueues) != null &&
2024 (n = ws.length) > 0) ?
2025 ws[(n - 1) & r & SQMASK] : null;
2026 }
2027
2028 /**
2029 * Performs tryUnpush for an external submitter.
2030 */
2031 final boolean tryExternalUnpush(ForkJoinTask<?> task) {
2032 int r = ThreadLocalRandom.getProbe();
2033 WorkQueue[] ws; WorkQueue w; int n;
2034 return ((ws = workQueues) != null &&
2035 (n = ws.length) > 0 &&
2036 (w = ws[(n - 1) & r & SQMASK]) != null &&
2037 w.trySharedUnpush(task));
2038 }
2039
2040 /**
2041 * Performs helpComplete for an external submitter.
2042 */
2043 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
2044 int r = ThreadLocalRandom.getProbe();
2045 WorkQueue[] ws; WorkQueue w; int n;
2046 return ((ws = workQueues) != null && (n = ws.length) > 0 &&
2047 (w = ws[(n - 1) & r & SQMASK]) != null) ?
2048 w.sharedHelpCC(task, maxTasks) : 0;
2049 }
2050
2051 /**
2052 * Tries to steal and run tasks within the target's computation.
2053 * The maxTasks argument supports external usages; internal calls
2054 * use zero, allowing unbounded steps (external calls trap
2055 * non-positive values).
2056 *
2057 * @param w caller
2058 * @param maxTasks if non-zero, the maximum number of other tasks to run
2059 * @return task status on exit
2060 */
2061 final int helpComplete(WorkQueue w, CountedCompleter<?> task,
2062 int maxTasks) {
2063 return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
2064 }
2065
2066 /**
2067 * Returns a cheap heuristic guide for task partitioning when
2068 * programmers, frameworks, tools, or languages have little or no
2069 * idea about task granularity. In essence, by offering this
2070 * method, we ask users only about tradeoffs in overhead vs
2071 * expected throughput and its variance, rather than how finely to
2072 * partition tasks.
2073 *
2074 * In a steady state strict (tree-structured) computation, each
2075 * thread makes available for stealing enough tasks for other
2076 * threads to remain active. Inductively, if all threads play by
2077 * the same rules, each thread should make available only a
2078 * constant number of tasks.
2079 *
2080 * The minimum useful constant is just 1. But using a value of 1
2081 * would require immediate replenishment upon each steal to
2082 * maintain enough tasks, which is infeasible. Further,
2083 * partitionings/granularities of offered tasks should minimize
2084 * steal rates, which in general means that threads nearer the top
2085 * of computation tree should generate more than those nearer the
2086 * bottom. In perfect steady state, each thread is at
2087 * approximately the same level of computation tree. However,
2088 * producing extra tasks amortizes the uncertainty of progress and
2089 * diffusion assumptions.
2090 *
2091 * So, users will want to use values larger (but not much larger)
2092 * than 1 to both smooth over transient shortages and hedge
2093 * against uneven progress; as traded off against the cost of
2094 * extra task overhead. We leave the user to pick a threshold
2095 * value to compare with the results of this call to guide
2096 * decisions, but recommend values such as 3.
2097 *
2098 * When all threads are active, it is on average OK to estimate
2099 * surplus strictly locally. In steady-state, if one thread is
2100 * maintaining say 2 surplus tasks, then so are others. So we can
2101 * just use estimated queue length. However, this strategy alone
2102 * leads to serious mis-estimates in some non-steady-state
2103 * conditions (ramp-up, ramp-down, other stalls). We can detect
2104 * many of these by further considering the number of "idle"
2105 * threads, that are known to have zero queued tasks, so
2106 * compensate by a factor of (#idle/#active) threads.
2107 */
2108 static int getSurplusQueuedTaskCount() {
2109 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2110 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2111 (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2112 (q = wt.workQueue) != null) {
2113 int p = pool.mode & SMASK;
2114 int a = p + (int)(pool.ctl >> RC_SHIFT);
2115 int n = q.top - q.base;
2116 return n - (a > (p >>>= 1) ? 0 :
2117 a > (p >>>= 1) ? 1 :
2118 a > (p >>>= 1) ? 2 :
2119 a > (p >>>= 1) ? 4 :
2120 8);
2121 }
2122 return 0;
2123 }
2124
2125 // Termination
2126
    /**
     * Possibly initiates and/or completes termination.
     *
     * @param now if true, unconditionally terminate, else only
     * if no work and no active workers
     * @param enable if true, terminate when next possible
     * @return true if terminating or terminated
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED

        while (((md = mode) & SHUTDOWN) == 0) {
            if (!enable || this == common)        // cannot shutdown
                return false;
            else
                U.compareAndSwapInt(this, MODE, md, md | SHUTDOWN);
        }

        while (((md = mode) & STOP) == 0) {       // try to initiate termination
            if (!now) {                           // check if quiescent & empty
                for (long oldSum = 0L;;) {        // repeat until stable
                    boolean running = false;
                    long checkSum = ctl;
                    WorkQueue[] ws = workQueues;
                    if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
                        running = true;           // released workers remain
                    else if (ws != null) {
                        WorkQueue w; int b;
                        for (int i = 0; i < ws.length; ++i) {
                            if ((w = ws[i]) != null) {
                                checkSum += (b = w.base) + w.id;
                                // odd slots are worker queues; nonnegative
                                // source means the worker is not idle
                                if (b != w.top ||
                                    ((i & 1) == 1 && w.source >= 0)) {
                                    running = true;
                                    break;
                                }
                            }
                        }
                    }
                    if (((md = mode) & STOP) != 0)
                        break;                    // already triggered
                    else if (running)
                        return false;
                    else if (workQueues == ws && oldSum == (oldSum = checkSum))
                        break;                    // two identical stable passes
                }
            }
            if ((md & STOP) == 0)
                U.compareAndSwapInt(this, MODE, md, md | STOP);
        }

        while (((md = mode) & TERMINATED) == 0) { // help terminate others
            for (long oldSum = 0L;;) {            // repeat until stable
                WorkQueue[] ws; WorkQueue w;
                long checkSum = ctl;
                if ((ws = workQueues) != null) {
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) != null) {
                            ForkJoinWorkerThread wt = w.owner;
                            w.cancelAll();        // clear queues
                            if (wt != null) {
                                try {             // unblock join or park
                                    wt.interrupt();
                                } catch (Throwable ignore) {
                                }
                            }
                            checkSum += w.base + w.id;
                        }
                    }
                }
                if (((md = mode) & TERMINATED) != 0 ||
                    (workQueues == ws && oldSum == (oldSum = checkSum)))
                    break;
            }
            if ((md & TERMINATED) != 0)
                break;
            else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
                break;                            // workers still registered
            else if (U.compareAndSwapInt(this, MODE, md, md | TERMINATED)) {
                synchronized (this) {
                    notifyAll();                  // for awaitTermination
                }
                break;
            }
        }
        return true;
    }
2214
2215 // Exported methods
2216
2217 // Constructors
2218
    /**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using defaults for all
     * other parameters.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool() {
        // Defaults: LIFO mode (asyncMode false), corePoolSize 0
        // (treated as parallelism), max size MAX_CAP, minimumRunnable 1,
        // saturating (no rejection on thread-limit), default keep-alive.
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2234
    /**
     * Creates a {@code ForkJoinPool} with the indicated parallelism
     * level, using defaults for all other parameters.
     *
     * @param parallelism the parallelism level
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism) {
        // Same defaults as the no-arg constructor; the delegate
        // validates the given parallelism level.
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2251
    /**
     * Creates a {@code ForkJoinPool} with the given parameters (using
     * defaults for others).
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     * @param factory the factory for creating new threads. For default value,
     * use {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while executing
     * tasks. For default value, use {@code null}.
     * @param asyncMode if true,
     * establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined. This mode may be more appropriate
     * than default locally stack-based mode in applications in which
     * worker threads only process event-style asynchronous tasks.
     * For default value, use {@code false}.
     * @throws IllegalArgumentException if parallelism less than or
     * equal to zero, or greater than implementation limit
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     * the caller is not permitted to modify threads
     * because it does not hold {@link
     * java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        // Remaining parameters take the same defaults as the
        // no-arg constructor.
        this(parallelism, factory, handler, asyncMode,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2284
2285 /**
2286 * Creates a {@code ForkJoinPool} with the given parameters.
2287 *
2288 * @param parallelism the parallelism level. For default value,
2289 * use {@link java.lang.Runtime#availableProcessors}.
2290 *
2291 * @param factory the factory for creating new threads. For
2292 * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2293 *
2294 * @param handler the handler for internal worker threads that
2295 * terminate due to unrecoverable errors encountered while
2296 * executing tasks. For default value, use {@code null}.
2297 *
2298 * @param asyncMode if true, establishes local first-in-first-out
2299 * scheduling mode for forked tasks that are never joined. This
2300 * mode may be more appropriate than default locally stack-based
2301 * mode in applications in which worker threads only process
2302 * event-style asynchronous tasks. For default value, use {@code
2303 * false}.
2304 *
2305 * @param corePoolSize the number of threads to keep in the pool
2306 * (unless timed out after an elapsed keep-alive). Normally (and
2307 * by default) this is the same value as the parallelism level,
2308 * but may be set to a larger value to reduce dynamic overhead if
2309 * tasks regularly block. Using a smaller value (for example
2310 * {@code 0}) has the same effect as the default.
2311 *
2312 * @param maximumPoolSize the maximum number of threads allowed.
2313 * When the maximum is reached, attempts to replace blocked
2314 * threads fail. (However, because creation and termination of
2315 * different threads may overlap, and may be managed by the given
2316 * thread factory, this value may be transiently exceeded.) The
2317 * default for the common pool is {@code 256} plus the parallelism
2318 * level. Using a value (for example {@code Integer.MAX_VALUE})
2319 * larger than the implementation's total thread limit has the
2320 * same effect as using this limit.
2321 *
2322 * @param minimumRunnable the minimum allowed number of core
2323 * threads not blocked by a join or {@link ManagedBlocker}. To
2324 * ensure progress, when too few unblocked threads exist and
2325 * unexecuted tasks may exist, new threads are constructed, up to
2326 * the given maximumPoolSize. For the default value, use {@code
2327 * 1}, that ensures liveness. A larger value might improve
2328 * throughput in the presence of blocked activities, but might
2329 * not, due to increased overhead. A value of zero may be
2330 * acceptable when submitted tasks cannot have dependencies
2331 * requiring additional threads.
2332 *
2333 * @param rejectOnSaturation if true, attempts to create more than
2334 * the maximum total allowed threads throw {@link
2335 * RejectedExecutionException}. Otherwise, the pool continues to
2336 * operate, but with fewer than the target number of runnable
2337 * threads, so might not ensure progress. For default value, use
2338 * {@code true}.
2339 *
2340 * @param keepAliveTime the elapsed time since last use before
2341 * a thread is terminated (and then later replaced if needed).
2342 * For the default value, use {@code 60, TimeUnit.SECONDS}.
2343 *
2344 * @param unit the time unit for the {@code keepAliveTime} argument
2345 *
2346 * @throws IllegalArgumentException if parallelism is less than or
2347 * equal to zero, or is greater than implementation limit,
2348 * or if maximumPoolSize is less than parallelism,
2349 * of if the keepAliveTime is less than or equal to zero.
2350 * @throws NullPointerException if the factory is null
2351 * @throws SecurityException if a security manager exists and
2352 * the caller is not permitted to modify threads
2353 * because it does not hold {@link
2354 * java.lang.RuntimePermission}{@code ("modifyThread")}
2355 */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode,
                        int corePoolSize,
                        int maximumPoolSize,
                        int minimumRunnable,
                        boolean rejectOnSaturation,
                        long keepAliveTime,
                        TimeUnit unit) {
        // check, encode, pack parameters
        if (parallelism <= 0 || parallelism > MAX_CAP ||
            maximumPoolSize < parallelism || keepAliveTime <= 0L)
            throw new IllegalArgumentException();
        if (factory == null)
            throw new NullPointerException();
        long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);

        String prefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
        // core count clamped to [parallelism, MAX_CAP]; ctl's total and
        // release counts start negative so adding workers counts toward 0
        int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
        long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
        // mode packs the parallelism level with FIFO/SATURATE flag bits
        int m = (parallelism |
                 (asyncMode ? FIFO : 0) |
                 (rejectOnSaturation ? 0 : SATURATE));
        // bounds packs the min-runnable delta (low bits) with max spares
        int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
        int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
        int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
        int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1; // power of two, including space for submission queues

        this.workQueues = new WorkQueue[n];
        this.workerNamePrefix = prefix;
        this.factory = factory;
        this.ueh = handler;
        this.keepAlive = ms;
        this.bounds = b;
        this.mode = m;
        this.ctl = c;
        checkPermission();
    }
2398
    /**
     * Constructor for common pool using parameters possibly
     * overridden by system properties
     */
    private ForkJoinPool(byte forCommonPoolOnly) {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory fac = null;
        UncaughtExceptionHandler handler = null;
        try { // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }

        if (fac == null) {
            if (System.getSecurityManager() == null)
                fac = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                fac = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

        // ctl counts start negative, as in the public constructor
        long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
        int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
        // NOTE(review): mode here is the bare parallelism level (no
        // FIFO/SATURATE bits), and the (parallelism < 1) guard is
        // unreachable given the clamping above — confirm intended.
        int m = (parallelism < 1) ? 1 : parallelism;
        int n = (parallelism > 1) ? parallelism - 1 : 1;
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1;               // power of two table size

        this.workQueues = new WorkQueue[n];
        this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
        this.factory = fac;
        this.ueh = handler;
        this.keepAlive = DEFAULT_KEEPALIVE;
        this.bounds = b;
        this.mode = m;
        this.ctl = c;
    }
2454
    /**
     * Returns the common pool instance. This pool is statically
     * constructed; its run state is unaffected by attempts to {@link
     * #shutdown} or {@link #shutdownNow}. However this pool and any
     * ongoing processing are automatically terminated upon program
     * {@link System#exit}. Any program that relies on asynchronous
     * task processing to complete before program termination should
     * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
     * before exit.
     *
     * @return the common pool instance
     * @since 1.8
     */
    public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        return common;          // set once during class initialization
    }
2472
2473 // Execution methods
2474
2475 /**
2476 * Performs the given task, returning its result upon completion.
2477 * If the computation encounters an unchecked Exception or Error,
2478 * it is rethrown as the outcome of this invocation. Rethrown
2479 * exceptions behave in the same way as regular exceptions, but,
2480 * when possible, contain stack traces (as displayed for example
2481 * using {@code ex.printStackTrace()}) of both the current thread
2482 * as well as the thread actually encountering the exception;
2483 * minimally only the latter.
2484 *
2485 * @param task the task
2486 * @param <T> the type of the task's result
2487 * @return the task's result
2488 * @throws NullPointerException if the task is null
2489 * @throws RejectedExecutionException if the task cannot be
2490 * scheduled for execution
2491 */
2492 public <T> T invoke(ForkJoinTask<T> task) {
2493 if (task == null)
2494 throw new NullPointerException();
2495 externalSubmit(task);
2496 return task.join();
2497 }
2498
    /**
     * Arranges for (asynchronous) execution of the given task.
     *
     * @param task the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public void execute(ForkJoinTask<?> task) {
        // Fire-and-forget: submit externally without joining.
        externalSubmit(task);
    }
2510
2511 // AbstractExecutorService methods
2512
2513 /**
2514 * @throws NullPointerException if the task is null
2515 * @throws RejectedExecutionException if the task cannot be
2516 * scheduled for execution
2517 */
2518 public void execute(Runnable task) {
2519 if (task == null)
2520 throw new NullPointerException();
2521 ForkJoinTask<?> job;
2522 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2523 job = (ForkJoinTask<?>) task;
2524 else
2525 job = new ForkJoinTask.RunnableExecuteAction(task);
2526 externalSubmit(job);
2527 }
2528
    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        // externalSubmit returns its argument, allowing chained use.
        return externalSubmit(task);
    }
2542
2543 /**
2544 * @throws NullPointerException if the task is null
2545 * @throws RejectedExecutionException if the task cannot be
2546 * scheduled for execution
2547 */
2548 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2549 return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
2550 }
2551
2552 /**
2553 * @throws NullPointerException if the task is null
2554 * @throws RejectedExecutionException if the task cannot be
2555 * scheduled for execution
2556 */
2557 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2558 return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
2559 }
2560
2561 /**
2562 * @throws NullPointerException if the task is null
2563 * @throws RejectedExecutionException if the task cannot be
2564 * scheduled for execution
2565 */
2566 public ForkJoinTask<?> submit(Runnable task) {
2567 if (task == null)
2568 throw new NullPointerException();
2569 ForkJoinTask<?> job;
2570 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2571 job = (ForkJoinTask<?>) task;
2572 else
2573 job = new ForkJoinTask.AdaptedRunnableAction(task);
2574 return externalSubmit(job);
2575 }
2576
2577 /**
2578 * @throws NullPointerException {@inheritDoc}
2579 * @throws RejectedExecutionException {@inheritDoc}
2580 */
2581 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2582 // In previous versions of this class, this method constructed
2583 // a task to run ForkJoinTask.invokeAll, but now external
2584 // invocation of multiple tasks is at least as efficient.
2585 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2586
2587 try {
2588 for (Callable<T> t : tasks) {
2589 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2590 futures.add(f);
2591 externalSubmit(f);
2592 }
2593 for (int i = 0, size = futures.size(); i < size; i++)
2594 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2595 return futures;
2596 } catch (Throwable t) {
2597 for (int i = 0, size = futures.size(); i < size; i++)
2598 futures.get(i).cancel(false);
2599 throw t;
2600 }
2601 }
2602
    /**
     * Returns the factory used for constructing new workers.
     *
     * @return the factory used for constructing new workers
     */
    public ForkJoinWorkerThreadFactory getFactory() {
        // factory is set once at construction and never changes.
        return factory;
    }
2611
    /**
     * Returns the handler for internal worker threads that terminate
     * due to unrecoverable errors encountered while executing tasks.
     *
     * @return the handler, or {@code null} if none
     */
    public UncaughtExceptionHandler getUncaughtExceptionHandler() {
        // ueh is set once at construction; may legitimately be null.
        return ueh;
    }
2621
    /**
     * Returns the targeted parallelism level of this pool.
     *
     * @return the targeted parallelism level of this pool
     */
    public int getParallelism() {
        // The parallelism level occupies the low (SMASK) bits of mode.
        return mode & SMASK;
    }
2630
    /**
     * Returns the targeted parallelism level of the common pool.
     *
     * @return the targeted parallelism level of the common pool
     * @since 1.8
     */
    public static int getCommonPoolParallelism() {
        // Cached from common.mode during class static initialization.
        return COMMON_PARALLELISM;
    }
2640
    /**
     * Returns the number of worker threads that have started but not
     * yet terminated. The result returned by this method may differ
     * from {@link #getParallelism} when threads are created to
     * maintain parallelism when others are cooperatively blocked.
     *
     * @return the number of worker threads
     */
    public int getPoolSize() {
        // Total workers = target parallelism plus the (signed) total
        // count delta stored in the TC field of ctl.
        return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
    }
2652
    /**
     * Returns {@code true} if this pool uses local first-in-first-out
     * scheduling mode for forked tasks that are never joined.
     *
     * @return {@code true} if this pool uses async mode
     */
    public boolean getAsyncMode() {
        // Async (FIFO) mode is recorded as a bit in the mode field.
        return (mode & FIFO) != 0;
    }
2662
2663 /**
2664 * Returns an estimate of the number of worker threads that are
2665 * not blocked waiting to join tasks or for other managed
2666 * synchronization. This method may overestimate the
2667 * number of running threads.
2668 *
2669 * @return the number of worker threads
2670 */
2671 public int getRunningThreadCount() {
2672 int rc = 0;
2673 WorkQueue[] ws; WorkQueue w;
2674 if ((ws = workQueues) != null) {
2675 for (int i = 1; i < ws.length; i += 2) {
2676 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2677 ++rc;
2678 }
2679 }
2680 return rc;
2681 }
2682
2683 /**
2684 * Returns an estimate of the number of threads that are currently
2685 * stealing or executing tasks. This method may overestimate the
2686 * number of active threads.
2687 *
2688 * @return the number of active threads
2689 */
2690 public int getActiveThreadCount() {
2691 int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2692 return (r <= 0) ? 0 : r; // suppress momentarily negative values
2693 }
2694
    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        for (;;) {
            long c = ctl;                          // snapshot; rechecked below
            int md = mode, pc = md & SMASK;        // pc: target parallelism
            int tc = pc + (short)(c >> TC_SHIFT);  // total worker count
            int rc = pc + (int)(c >> RC_SHIFT);    // released (active) count
            if ((md & (STOP | TERMINATED)) != 0)
                return true;                       // terminating pools are quiescent
            else if (rc > 0)
                return false;                      // some worker is active
            else {
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) != null) {
                    // Scan worker queues (odd slots); any non-QUIET
                    // source means a worker may still be scanning.
                    for (int i = 1; i < ws.length; i += 2) {
                        if ((v = ws[i]) != null) {
                            if ((v.source & QUIET) == 0)
                                return false;
                            --tc;
                        }
                    }
                }
                // Quiescent only if every worker was accounted for and
                // ctl did not change during the scan (consistent read);
                // otherwise retry.
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }
2732
2733 /**
2734 * Returns an estimate of the total number of tasks stolen from
2735 * one thread's work queue by another. The reported value
2736 * underestimates the actual total number of steals when the pool
2737 * is not quiescent. This value may be useful for monitoring and
2738 * tuning fork/join programs: in general, steal counts should be
2739 * high enough to keep threads busy, but low enough to avoid
2740 * overhead and contention across threads.
2741 *
2742 * @return the number of steals
2743 */
2744 public long getStealCount() {
2745 long count = stealCount;
2746 WorkQueue[] ws; WorkQueue w;
2747 if ((ws = workQueues) != null) {
2748 for (int i = 1; i < ws.length; i += 2) {
2749 if ((w = ws[i]) != null)
2750 count += (long)w.nsteals & 0xffffffffL;
2751 }
2752 }
2753 return count;
2754 }
2755
2756 /**
2757 * Returns an estimate of the total number of tasks currently held
2758 * in queues by worker threads (but not including tasks submitted
2759 * to the pool that have not begun executing). This value is only
2760 * an approximation, obtained by iterating across all threads in
2761 * the pool. This method may be useful for tuning task
2762 * granularities.
2763 *
2764 * @return the number of queued tasks
2765 */
2766 public long getQueuedTaskCount() {
2767 long count = 0;
2768 WorkQueue[] ws; WorkQueue w;
2769 if ((ws = workQueues) != null) {
2770 for (int i = 1; i < ws.length; i += 2) {
2771 if ((w = ws[i]) != null)
2772 count += w.queueSize();
2773 }
2774 }
2775 return count;
2776 }
2777
2778 /**
2779 * Returns an estimate of the number of tasks submitted to this
2780 * pool that have not yet begun executing. This method may take
2781 * time proportional to the number of submissions.
2782 *
2783 * @return the number of queued submissions
2784 */
2785 public int getQueuedSubmissionCount() {
2786 int count = 0;
2787 WorkQueue[] ws; WorkQueue w;
2788 if ((ws = workQueues) != null) {
2789 for (int i = 0; i < ws.length; i += 2) {
2790 if ((w = ws[i]) != null)
2791 count += w.queueSize();
2792 }
2793 }
2794 return count;
2795 }
2796
2797 /**
2798 * Returns {@code true} if there are any tasks submitted to this
2799 * pool that have not yet begun executing.
2800 *
2801 * @return {@code true} if there are any queued submissions
2802 */
2803 public boolean hasQueuedSubmissions() {
2804 WorkQueue[] ws; WorkQueue w;
2805 if ((ws = workQueues) != null) {
2806 for (int i = 0; i < ws.length; i += 2) {
2807 if ((w = ws[i]) != null && !w.isEmpty())
2808 return true;
2809 }
2810 }
2811 return false;
2812 }
2813
    /**
     * Removes and returns the next unexecuted submission if one is
     * available. This method may be useful in extensions to this
     * class that re-assign work in systems with multiple pools.
     *
     * @return the next submission, or {@code null} if none
     */
    protected ForkJoinTask<?> pollSubmission() {
        // pollScan(true) restricts the scan to submission queues.
        return pollScan(true);
    }
2824
2825 /**
2826 * Removes all available unexecuted submitted and forked tasks
2827 * from scheduling queues and adds them to the given collection,
2828 * without altering their execution status. These may include
2829 * artificially generated or wrapped tasks. This method is
2830 * designed to be invoked only when the pool is known to be
2831 * quiescent. Invocations at other times may not remove all
2832 * tasks. A failure encountered while attempting to add elements
2833 * to collection {@code c} may result in elements being in
2834 * neither, either or both collections when the associated
2835 * exception is thrown. The behavior of this operation is
2836 * undefined if the specified collection is modified while the
2837 * operation is in progress.
2838 *
2839 * @param c the collection to transfer elements into
2840 * @return the number of elements transferred
2841 */
2842 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2843 int count = 0;
2844 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2845 if ((ws = workQueues) != null) {
2846 for (int i = 0; i < ws.length; ++i) {
2847 if ((w = ws[i]) != null) {
2848 while ((t = w.poll()) != null) {
2849 c.add(t);
2850 ++count;
2851 }
2852 }
2853 }
2854 }
2855 return count;
2856 }
2857
2858 /**
2859 * Returns a string identifying this pool, as well as its state,
2860 * including indications of run state, parallelism level, and
2861 * worker and task counts.
2862 *
2863 * @return a string identifying this pool, as well as its state
2864 */
2865 public String toString() {
2866 // Use a single pass through workQueues to collect counts
2867 long qt = 0L, qs = 0L; int rc = 0;
2868 long st = stealCount;
2869 WorkQueue[] ws; WorkQueue w;
2870 if ((ws = workQueues) != null) {
2871 for (int i = 0; i < ws.length; ++i) {
2872 if ((w = ws[i]) != null) {
2873 int size = w.queueSize();
2874 if ((i & 1) == 0)
2875 qs += size;
2876 else {
2877 qt += size;
2878 st += (long)w.nsteals & 0xffffffffL;
2879 if (w.isApparentlyUnblocked())
2880 ++rc;
2881 }
2882 }
2883 }
2884 }
2885
2886 int md = mode;
2887 int pc = (md & SMASK);
2888 long c = ctl;
2889 int tc = pc + (short)(c >>> TC_SHIFT);
2890 int ac = pc + (int)(c >> RC_SHIFT);
2891 if (ac < 0) // ignore transient negative
2892 ac = 0;
2893 String level = ((md & TERMINATED) != 0 ? "Terminated" :
2894 (md & STOP) != 0 ? "Terminating" :
2895 (md & SHUTDOWN) != 0 ? "Shutting down" :
2896 "Running");
2897 return super.toString() +
2898 "[" + level +
2899 ", parallelism = " + pc +
2900 ", size = " + tc +
2901 ", active = " + ac +
2902 ", running = " + rc +
2903 ", steals = " + st +
2904 ", tasks = " + qt +
2905 ", submissions = " + qs +
2906 "]";
2907 }
2908
    /**
     * Possibly initiates an orderly shutdown in which previously
     * submitted tasks are executed, but no new tasks will be
     * accepted. Invocation has no effect on execution state if this
     * is the {@link #commonPool()}, and no additional effect if
     * already shut down. Tasks that are in the process of being
     * submitted concurrently during the course of this method may or
     * may not be rejected.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public void shutdown() {
        checkPermission();
        // now=false: let queued tasks finish; enable=true: allow
        // shutdown to proceed when the pool becomes quiescent.
        tryTerminate(false, true);
    }
2927
    /**
     * Possibly attempts to cancel and/or stop all tasks, and reject
     * all subsequently submitted tasks. Invocation has no effect on
     * execution state if this is the {@link #commonPool()}, and no
     * additional effect if already shut down. Otherwise, tasks that
     * are in the process of being submitted or executed concurrently
     * during the course of this method may or may not be
     * rejected. This method cancels both existing and unexecuted
     * tasks, in order to permit termination in the presence of task
     * dependencies. So the method always returns an empty list
     * (unlike the case for some other Executors).
     *
     * @return an empty list
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public List<Runnable> shutdownNow() {
        checkPermission();
        // now=true: cancel queued and in-progress tasks immediately.
        tryTerminate(true, true);
        // Always empty: cancelled tasks are not returned (see javadoc).
        return Collections.emptyList();
    }
2951
    /**
     * Returns {@code true} if all tasks have completed following shut down.
     *
     * @return {@code true} if all tasks have completed following shut down
     */
    public boolean isTerminated() {
        // TERMINATED is set only after termination fully completes.
        return (mode & TERMINATED) != 0;
    }
2960
2961 /**
2962 * Returns {@code true} if the process of termination has
2963 * commenced but not yet completed. This method may be useful for
2964 * debugging. A return of {@code true} reported a sufficient
2965 * period after shutdown may indicate that submitted tasks have
2966 * ignored or suppressed interruption, or are waiting for I/O,
2967 * causing this executor not to properly terminate. (See the
2968 * advisory notes for class {@link ForkJoinTask} stating that
2969 * tasks should not normally entail blocking operations. But if
2970 * they do, they must abort them on interrupt.)
2971 *
2972 * @return {@code true} if terminating but not yet terminated
2973 */
2974 public boolean isTerminating() {
2975 int md = mode;
2976 return (md & STOP) != 0 && (md & TERMINATED) == 0;
2977 }
2978
    /**
     * Returns {@code true} if this pool has been shut down.
     *
     * @return {@code true} if this pool has been shut down
     */
    public boolean isShutdown() {
        // SHUTDOWN remains set through termination, so this also
        // reports true for terminating and terminated pools.
        return (mode & SHUTDOWN) != 0;
    }
2987
    /**
     * Blocks until all tasks have completed execution after a
     * shutdown request, or the timeout occurs, or the current thread
     * is interrupted, whichever happens first. Because the {@link
     * #commonPool()} never terminates until program shutdown, when
     * applied to the common pool, this method is equivalent to {@link
     * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (this == common) {
            // The common pool never terminates until JVM exit; merely
            // quiesce and report non-termination (see javadoc).
            awaitQuiescence(timeout, unit);
            return false;
        }
        long nanos = unit.toNanos(timeout);
        if (isTerminated())
            return true;
        if (nanos <= 0L)
            return false;
        long deadline = System.nanoTime() + nanos;
        synchronized (this) {
            for (;;) {
                if (isTerminated())
                    return true;
                if (nanos <= 0L)
                    return false;
                // Object.wait takes millis; wait at least 1ms so a
                // sub-millisecond remainder is not an untimed wait.
                long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
                wait(millis > 0L ? millis : 1L);
                nanos = deadline - System.nanoTime();
            }
        }
    }
3028
    /**
     * If called by a ForkJoinTask operating in this pool, equivalent
     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
     * waits and/or attempts to assist performing tasks until this
     * pool {@link #isQuiescent} or the indicated timeout elapses.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if quiescent; {@code false} if the
     * timeout elapsed.
     */
    public boolean awaitQuiescence(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        ForkJoinWorkerThread wt;
        Thread thread = Thread.currentThread();
        if ((thread instanceof ForkJoinWorkerThread) &&
            (wt = (ForkJoinWorkerThread)thread).pool == this) {
            // Worker of this pool: help run tasks until quiescent
            // (ignores the timeout in this case).
            helpQuiescePool(wt.workQueue);
            return true;
        }
        else {
            // External caller: repeatedly poll and run tasks, yielding
            // when none are available, until quiescence or timeout.
            for (long startTime = System.nanoTime();;) {
                ForkJoinTask<?> t;
                if ((t = pollScan(false)) != null)
                    t.doExec();
                else if (isQuiescent())
                    return true;
                else if ((System.nanoTime() - startTime) > nanos)
                    return false;
                else
                    Thread.yield(); // cannot block
            }
        }
    }
3063
    /**
     * Waits and/or attempts to assist performing tasks indefinitely
     * until the {@link #commonPool()} {@link #isQuiescent}.
     */
    static void quiesceCommonPool() {
        // Effectively unbounded wait: Long.MAX_VALUE nanoseconds.
        common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }
3071
    /**
     * Interface for extending managed parallelism for tasks running
     * in {@link ForkJoinPool}s.
     *
     * <p>A {@code ManagedBlocker} provides two methods. Method
     * {@link #isReleasable} must return {@code true} if blocking is
     * not necessary. Method {@link #block} blocks the current thread
     * if necessary (perhaps internally invoking {@code isReleasable}
     * before actually blocking). These actions are performed by any
     * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
     * The unusual methods in this API accommodate synchronizers that
     * may, but don't usually, block for long periods. Similarly, they
     * allow more efficient internal handling of cases in which
     * additional workers may be, but usually are not, needed to
     * ensure sufficient parallelism. Toward this end,
     * implementations of method {@code isReleasable} must be amenable
     * to repeated invocation.
     *
     * <p>For example, here is a ManagedBlocker based on a
     * ReentrantLock:
     * <pre> {@code
     * class ManagedLocker implements ManagedBlocker {
     *   final ReentrantLock lock;
     *   boolean hasLock = false;
     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
     *   public boolean block() {
     *     if (!hasLock)
     *       lock.lock();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return hasLock || (hasLock = lock.tryLock());
     *   }
     * }}</pre>
     *
     * <p>Here is a class that possibly blocks waiting for an
     * item on a given queue:
     * <pre> {@code
     * class QueueTaker<E> implements ManagedBlocker {
     *   final BlockingQueue<E> queue;
     *   volatile E item = null;
     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *   public boolean block() throws InterruptedException {
     *     if (item == null)
     *       item = queue.take();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return item != null || (item = queue.poll()) != null;
     *   }
     *   public E getItem() { // call after pool.managedBlock completes
     *     return item;
     *   }
     * }}</pre>
     */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         * (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         * @return {@code true} if blocking is unnecessary
         */
        boolean isReleasable();
    }
3145
    /**
     * Runs the given possibly blocking task. When {@linkplain
     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
     * method possibly arranges for a spare thread to be activated if
     * necessary to ensure sufficient parallelism while the current
     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
     *
     * <p>This method repeatedly calls {@code blocker.isReleasable()} and
     * {@code blocker.block()} until either method returns {@code true}.
     * Every call to {@code blocker.block()} is preceded by a call to
     * {@code blocker.isReleasable()} that returned {@code false}.
     *
     * <p>If not running in a ForkJoinPool, this method is
     * behaviorally equivalent to
     * <pre> {@code
     * while (!blocker.isReleasable())
     *   if (blocker.block())
     *     break;}</pre>
     *
     * If running in a ForkJoinPool, the pool may first be expanded to
     * ensure sufficient parallelism available during the call to
     * {@code blocker.block()}.
     *
     * @param blocker the blocker task
     * @throws InterruptedException if {@code blocker.block()} did so
     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        WorkQueue w;
        Thread t = Thread.currentThread();
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (w = wt.workQueue) != null) {
            int block;
            while (!blocker.isReleasable()) {
                // tryCompensate returns nonzero when blocking may
                // proceed (possibly after activating a spare worker).
                if ((block = p.tryCompensate(w)) != 0) {
                    try {
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        // Restore the release count if tryCompensate
                        // decremented it (indicated by block > 0).
                        U.getAndAddLong(p, CTL, (block > 0) ? RC_UNIT : 0L);
                    }
                    break;
                }
            }
        }
        else {
            // Not a pool worker: plain releasable/block loop.
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
3199
3200 // AbstractExecutorService overrides. These rely on undocumented
3201 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3202 // implement RunnableFuture.
3203
3204 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3205 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3206 }
3207
3208 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3209 return new ForkJoinTask.AdaptedCallable<T>(callable);
3210 }
3211
    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long CTL;    // field offset of "ctl" (for CAS/add)
    private static final long MODE;   // field offset of "mode"
    private static final int ABASE;   // base offset of ForkJoinTask[] arrays
    private static final int ASHIFT;  // log2 of ForkJoinTask[] element scale
3218
    static {
        try {
            // Resolve Unsafe offsets used for atomic field/array access.
            CTL = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("ctl"));
            MODE = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("mode"));
            ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
            int scale = U.arrayIndexScale(ForkJoinTask[].class);
            if ((scale & (scale - 1)) != 0)
                throw new Error("array index scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;

        // Spare-thread cap, optionally overridden by system property;
        // a malformed value silently falls back to the default.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        try {
            String p = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (p != null)
                commonMaxSpares = Integer.parseInt(p);
        } catch (Exception ignore) {}
        COMMON_MAX_SPARES = commonMaxSpares;

        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // Construct the common pool in a privileged action so it is
        // created regardless of the caller's permissions.
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() {
                    return new ForkJoinPool((byte)0); }});

        // Cache the common pool's parallelism (low bits of mode).
        COMMON_PARALLELISM = common.mode & SMASK;
    }
3258
    /**
     * Factory for innocuous worker threads.
     */
    private static final class InnocuousForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {

        /**
         * An ACC to restrict permissions for the factory itself.
         * The constructed workers have no permissions set.
         */
        private static final AccessControlContext innocuousAcc;
        static {
            // The factory itself needs only these three permissions to
            // create and configure worker threads; workers get none.
            Permissions innocuousPerms = new Permissions();
            innocuousPerms.add(modifyThreadPermission);
            innocuousPerms.add(new RuntimePermission(
                                   "enableContextClassLoaderOverride"));
            innocuousPerms.add(new RuntimePermission(
                                   "modifyThreadGroup"));
            innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
                    new ProtectionDomain(null, innocuousPerms)
                });
        }

        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            // Construct the worker under the restricted ACC so its
            // inherited context carries only the permissions above.
            return java.security.AccessController.doPrivileged(
                new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
                    public ForkJoinWorkerThread run() {
                        return new ForkJoinWorkerThread.
                            InnocuousForkJoinWorkerThread(pool);
                    }}, innocuousAcc);
        }
    }
3291
3292 }