ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/ForkJoinPool.java
Revision: 1.8
Committed: Sat Mar 11 18:37:21 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.7: +1 -1 lines
Log Message:
make some methods static as suggested by errorprone [MethodCanBeStatic]

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     import java.lang.Thread.UncaughtExceptionHandler;
10     import java.security.AccessControlContext;
11     import java.security.Permissions;
12     import java.security.ProtectionDomain;
13     import java.util.ArrayList;
14     import java.util.Collection;
15     import java.util.Collections;
16     import java.util.List;
17     import java.util.function.Predicate;
18     import java.util.concurrent.TimeUnit;
19     import java.util.concurrent.CountedCompleter;
20     import java.util.concurrent.ForkJoinTask;
21     import java.util.concurrent.ForkJoinWorkerThread;
22     import java.util.concurrent.locks.LockSupport;
23    
24     /**
25     * An {@link ExecutorService} for running {@link ForkJoinTask}s.
26     * A {@code ForkJoinPool} provides the entry point for submissions
27     * from non-{@code ForkJoinTask} clients, as well as management and
28     * monitoring operations.
29     *
30     * <p>A {@code ForkJoinPool} differs from other kinds of {@link
31     * ExecutorService} mainly by virtue of employing
32     * <em>work-stealing</em>: all threads in the pool attempt to find and
33     * execute tasks submitted to the pool and/or created by other active
34     * tasks (eventually blocking waiting for work if none exist). This
35     * enables efficient processing when most tasks spawn other subtasks
36     * (as do most {@code ForkJoinTask}s), as well as when many small
37     * tasks are submitted to the pool from external clients. Especially
38     * when setting <em>asyncMode</em> to true in constructors, {@code
39     * ForkJoinPool}s may also be appropriate for use with event-style
40     * tasks that are never joined.
41     *
42     * <p>A static {@link #commonPool()} is available and appropriate for
43     * most applications. The common pool is used by any ForkJoinTask that
44     * is not explicitly submitted to a specified pool. Using the common
45     * pool normally reduces resource usage (its threads are slowly
46     * reclaimed during periods of non-use, and reinstated upon subsequent
47     * use).
48     *
49     * <p>For applications that require separate or custom pools, a {@code
50     * ForkJoinPool} may be constructed with a given target parallelism
51     * level; by default, equal to the number of available processors.
52     * The pool attempts to maintain enough active (or available) threads
53     * by dynamically adding, suspending, or resuming internal worker
54     * threads, even if some tasks are stalled waiting to join others.
55     * However, no such adjustments are guaranteed in the face of blocked
56     * I/O or other unmanaged synchronization. The nested {@link
57     * ManagedBlocker} interface enables extension of the kinds of
58     * synchronization accommodated. The default policies may be
59     * overridden using a constructor with parameters corresponding to
60     * those documented in class {@link ThreadPoolExecutor}.
61     *
62     * <p>In addition to execution and lifecycle control methods, this
63     * class provides status check methods (for example
64     * {@link #getStealCount}) that are intended to aid in developing,
65     * tuning, and monitoring fork/join applications. Also, method
66     * {@link #toString} returns indications of pool state in a
67     * convenient form for informal monitoring.
68     *
69     * <p>As is the case with other ExecutorServices, there are three
70     * main task execution methods summarized in the following table.
71     * These are designed to be used primarily by clients not already
72     * engaged in fork/join computations in the current pool. The main
73     * forms of these methods accept instances of {@code ForkJoinTask},
74     * but overloaded forms also allow mixed execution of plain {@code
75     * Runnable}- or {@code Callable}- based activities as well. However,
76     * tasks that are already executing in a pool should normally instead
77     * use the within-computation forms listed in the table unless using
78     * async event-style tasks that are not usually joined, in which case
79     * there is little difference among choice of methods.
80     *
81     * <table BORDER CELLPADDING=3 CELLSPACING=1>
82     * <caption>Summary of task execution methods</caption>
83     * <tr>
84     * <td></td>
85     * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
86     * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
87     * </tr>
88     * <tr>
89     * <td> <b>Arrange async execution</b></td>
90     * <td> {@link #execute(ForkJoinTask)}</td>
91     * <td> {@link ForkJoinTask#fork}</td>
92     * </tr>
93     * <tr>
94     * <td> <b>Await and obtain result</b></td>
95     * <td> {@link #invoke(ForkJoinTask)}</td>
96     * <td> {@link ForkJoinTask#invoke}</td>
97     * </tr>
98     * <tr>
99     * <td> <b>Arrange exec and obtain Future</b></td>
100     * <td> {@link #submit(ForkJoinTask)}</td>
101     * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
102     * </tr>
103     * </table>
104     *
105     * <p>The common pool is by default constructed with default
106     * parameters, but these may be controlled by setting the following
107     * {@linkplain System#getProperty system properties}:
108     * <ul>
109     * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
110     * - the parallelism level, a non-negative integer
111     * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
112     * - the class name of a {@link ForkJoinWorkerThreadFactory}
113     * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
114     * - the class name of a {@link UncaughtExceptionHandler}
115     * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
116     * - the maximum number of allowed extra threads to maintain target
117     * parallelism (default 256).
118     * </ul>
119     * If a {@link SecurityManager} is present and no factory is
120     * specified, then the default pool uses a factory supplying
121     * threads that have no {@link Permissions} enabled.
122     * The system class loader is used to load these classes.
123     * Upon any error in establishing these settings, default parameters
124     * are used. It is possible to disable or limit the use of threads in
125     * the common pool by setting the parallelism property to zero, and/or
126     * using a factory that may return {@code null}. However, doing so may
127     * cause unjoined tasks to never be executed.
128     *
129     * <p><b>Implementation notes</b>: This implementation restricts the
130     * maximum number of running threads to 32767. Attempts to create
131     * pools with greater than the maximum number result in
132     * {@code IllegalArgumentException}.
133     *
134     * <p>This implementation rejects submitted tasks (that is, by throwing
135     * {@link RejectedExecutionException}) only when the pool is shut down
136     * or internal resources have been exhausted.
137     *
138     * @since 1.7
139     * @author Doug Lea
140     */
141     public class ForkJoinPool extends AbstractExecutorService {
142    
143     /*
144     * Implementation Overview
145     *
146     * This class and its nested classes provide the main
147     * functionality and control for a set of worker threads:
148     * Submissions from non-FJ threads enter into submission queues.
149     * Workers take these tasks and typically split them into subtasks
150     * that may be stolen by other workers. Preference rules give
151     * first priority to processing tasks from their own queues (LIFO
152     * or FIFO, depending on mode), then to randomized FIFO steals of
153     * tasks in other queues. This framework began as a vehicle for
154     * supporting tree-structured parallelism using work-stealing.
155     * Over time, its scalability advantages led to extensions and
156     * changes to better support more diverse usage contexts. Because
157     * most internal methods and nested classes are interrelated,
158     * their main rationale and descriptions are presented here;
159     * individual methods and nested classes contain only brief
160     * comments about details.
161     *
162     * WorkQueues
163     * ==========
164     *
165     * Most operations occur within work-stealing queues (in nested
166     * class WorkQueue). These are special forms of Deques that
167     * support only three of the four possible end-operations -- push,
168     * pop, and poll (aka steal), under the further constraints that
169     * push and pop are called only from the owning thread (or, as
170     * extended here, under a lock), while poll may be called from
171     * other threads. (If you are unfamiliar with them, you probably
172     * want to read Herlihy and Shavit's book "The Art of
173     * Multiprocessor Programming", chapter 16 describing these in
174     * more detail before proceeding.) The main work-stealing queue
175     * design is roughly similar to those in the papers "Dynamic
176     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
177     * (http://research.sun.com/scalable/pubs/index.html) and
178     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
179     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
180     * The main differences ultimately stem from GC requirements that
181     * we null out taken slots as soon as we can, to maintain as small
182     * a footprint as possible even in programs generating huge
183     * numbers of tasks. To accomplish this, we shift the CAS
184     * arbitrating pop vs poll (steal) from being on the indices
185     * ("base" and "top") to the slots themselves.
186     *
187     * Adding tasks then takes the form of a classic array push(task)
188     * in a circular buffer:
189     * q.array[q.top++ % length] = task;
190     *
191     * (The actual code needs to null-check and size-check the array,
192     * uses masking, not mod, for indexing a power-of-two-sized array,
193     * properly fences accesses, and possibly signals waiting workers
194     * to start scanning -- see below.) Both a successful pop and
195     * poll mainly entail a CAS of a slot from non-null to null.
196     *
197     * The pop operation (always performed by owner) is:
198     * if ((the task at top slot is not null) and
199     * (CAS slot to null))
200     * decrement top and return task;
201     *
202     * And the poll operation (usually by a stealer) is
203     * if ((the task at base slot is not null) and
204     * (CAS slot to null))
205     * increment base and return task;
206     *
207     * There are several variants of each of these. In particular,
208     * almost all uses of poll occur within scan operations that also
209     * interleave contention tracking (with associated code sprawl.)
210     *
211     * Memory ordering. See "Correct and Efficient Work-Stealing for
212     * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
213     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
214     * analysis of memory ordering requirements in work-stealing
215     * algorithms similar to (but different than) the one used here.
216     * Extracting tasks in array slots via (fully fenced) CAS provides
217     * primary synchronization. The base and top indices imprecisely
218     * guide where to extract from. We do not always require strict
219     * orderings of array and index updates, so sometimes let them be
220     * subject to compiler and processor reorderings. However, the
221     * volatile "base" index also serves as a basis for memory
222     * ordering: Slot accesses are preceded by a read of base,
223     * ensuring happens-before ordering with respect to stealers (so
224     * the slots themselves can be read via plain array reads.) The
225     * only other memory orderings relied on are maintained in the
226     * course of signalling and activation (see below). A check that
227     * base == top indicates (momentary) emptiness, but otherwise may
228     * err on the side of possibly making the queue appear nonempty
229     * when a push, pop, or poll have not fully committed, or making
230     * it appear empty when an update of top has not yet been visibly
231     * written. (Method isEmpty() checks the case of a partially
232     * completed removal of the last element.) Because of this, the
233     * poll operation, considered individually, is not wait-free. One
234     * thief cannot successfully continue until another in-progress
235     * one (or, if previously empty, a push) visibly completes.
236     * However, in the aggregate, we ensure at least probabilistic
237     * non-blockingness. If an attempted steal fails, a scanning
238     * thief chooses a different random victim target to try next. So,
239     * in order for one thief to progress, it suffices for any
240     * in-progress poll or new push on any empty queue to
241     * complete.
242     *
243     * This approach also enables support of a user mode in which
244     * local task processing is in FIFO, not LIFO order, simply by
245     * using poll rather than pop. This can be useful in
246     * message-passing frameworks in which tasks are never joined.
247     *
248     * WorkQueues are also used in a similar way for tasks submitted
249     * to the pool. We cannot mix these tasks in the same queues used
250     * by workers. Instead, we randomly associate submission queues
251     * with submitting threads, using a form of hashing. The
252     * ThreadLocalRandom probe value serves as a hash code for
253     * choosing existing queues, and may be randomly repositioned upon
254     * contention with other submitters. In essence, submitters act
255     * like workers except that they are restricted to executing local
256     * tasks that they submitted. Insertion of tasks in shared mode
257     * requires a lock but we use only a simple spinlock (using field
258     * phase), because submitters encountering a busy queue move to a
259     * different position to use or create other queues -- they block
260     * only when creating and registering new queues. Because it is
261     * used only as a spinlock, unlocking requires only a "releasing"
262     * store (using putOrderedInt).
263     *
264     * Management
265     * ==========
266     *
267     * The main throughput advantages of work-stealing stem from
268     * decentralized control -- workers mostly take tasks from
269     * themselves or each other, at rates that can exceed a billion
270     * per second. The pool itself creates, activates (enables
271     * scanning for and running tasks), deactivates, blocks, and
272     * terminates threads, all with minimal central information.
273     * There are only a few properties that we can globally track or
274     * maintain, so we pack them into a small number of variables,
275     * often maintaining atomicity without blocking or locking.
276     * Nearly all essentially atomic control state is held in a few
277     * volatile variables that are by far most often read (not
278     * written) as status and consistency checks. We pack as much
279     * information into them as we can.
280     *
281     * Field "ctl" contains 64 bits holding information needed to
282     * atomically decide to add, enqueue (on an event queue), and
283     * dequeue (and release)-activate workers. To enable this
284     * packing, we restrict maximum parallelism to (1<<15)-1 (which is
285     * far in excess of normal operating range) to allow ids, counts,
286     * and their negations (used for thresholding) to fit into 16bit
287     * subfields.
288     *
289     * Field "mode" holds configuration parameters as well as lifetime
290     * status, atomically and monotonically setting SHUTDOWN, STOP,
291     * and finally TERMINATED bits.
292     *
293     * Field "workQueues" holds references to WorkQueues. It is
294     * updated (only during worker creation and termination) under
295     * lock (using field workerNamePrefix as lock), but is otherwise
296     * concurrently readable, and accessed directly. We also ensure
297     * that uses of the array reference itself never become too stale
298     * in case of resizing. To simplify index-based operations, the
299     * array size is always a power of two, and all readers must
300     * tolerate null slots. Worker queues are at odd indices. Shared
301     * (submission) queues are at even indices, up to a maximum of 64
302     * slots, to limit growth even if array needs to expand to add
303     * more workers. Grouping them together in this way simplifies and
304     * speeds up task scanning.
305     *
306     * All worker thread creation is on-demand, triggered by task
307     * submissions, replacement of terminated workers, and/or
308     * compensation for blocked workers. However, all other support
309     * code is set up to work with other policies. To ensure that we
310     * do not hold on to worker references that would prevent GC, all
311     * accesses to workQueues are via indices into the workQueues
312     * array (which is one source of some of the messy code
313     * constructions here). In essence, the workQueues array serves as
314     * a weak reference mechanism. Thus for example the stack top
315     * subfield of ctl stores indices, not references.
316     *
317     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
318     * cannot let workers spin indefinitely scanning for tasks when
319     * none can be found immediately, and we cannot start/resume
320     * workers unless there appear to be tasks available. On the
321     * other hand, we must quickly prod them into action when new
322     * tasks are submitted or generated. In many usages, ramp-up time
323     * is the main limiting factor in overall performance, which is
324     * compounded at program start-up by JIT compilation and
325     * allocation. So we streamline this as much as possible.
326     *
327     * The "ctl" field atomically maintains total worker and
328     * "released" worker counts, plus the head of the available worker
329     * queue (actually stack, represented by the lower 32bit subfield
330     * of ctl). Released workers are those known to be scanning for
331     * and/or running tasks. Unreleased ("available") workers are
332     * recorded in the ctl stack. These workers are made available for
333     * signalling by enqueuing in ctl (see method runWorker). The
334     * "queue" is a form of Treiber stack. This is ideal for
335     * activating threads in most-recently used order, and improves
336     * performance and locality, outweighing the disadvantages of
337     * being prone to contention and inability to release a worker
338     * unless it is topmost on stack. To avoid missed signal problems
339     * inherent in any wait/signal design, available workers rescan
340     * for (and if found run) tasks after enqueuing. Normally their
341     * release status will be updated while doing so, but the released
342     * worker ctl count may underestimate the number of active
343     * threads. (However, it is still possible to determine quiescence
344     * via a validation traversal -- see isQuiescent). After an
345     * unsuccessful rescan, available workers are blocked until
346     * signalled (see signalWork). The top stack state holds the
347     * value of the "phase" field of the worker: its index and status,
348     * plus a version counter that, in addition to the count subfields
349     * (also serving as version stamps) provide protection against
350     * Treiber stack ABA effects.
351     *
352     * Creating workers. To create a worker, we pre-increment counts
353     * (serving as a reservation), and attempt to construct a
354     * ForkJoinWorkerThread via its factory. Upon construction, the
355     * new thread invokes registerWorker, where it constructs a
356     * WorkQueue and is assigned an index in the workQueues array
357     * (expanding the array if necessary). The thread is then started.
358     * Upon any exception across these steps, or null return from
359     * factory, deregisterWorker adjusts counts and records
360     * accordingly. If a null return, the pool continues running with
361     * fewer than the target number of workers. If exceptional, the
362     * exception is propagated, generally to some external caller.
363     * Worker index assignment avoids the bias in scanning that would
364     * occur if entries were sequentially packed starting at the front
365     * of the workQueues array. We treat the array as a simple
366     * power-of-two hash table, expanding as needed. The seedIndex
367     * increment ensures no collisions until a resize is needed or a
368     * worker is deregistered and replaced, and thereafter keeps
369     * probability of collision low. We cannot use
370     * ThreadLocalRandom.getProbe() for similar purposes here because
371     * the thread has not started yet, but do so for creating
372     * submission queues for existing external threads (see
373     * externalPush).
374     *
375     * WorkQueue field "phase" is used by both workers and the pool to
376     * manage and track whether a worker is UNSIGNALLED (possibly
377     * blocked waiting for a signal). When a worker is enqueued its
378     * phase field is set. Note that phase field updates lag queue CAS
379     * releases so usage requires care -- seeing a negative phase does
380     * not guarantee that the worker is available. When queued, the
381     * lower 16 bits of phase must hold its pool index. So we
382     * place the index there upon initialization (see registerWorker)
383     * and otherwise keep it there or restore it when necessary.
384     *
385     * The ctl field also serves as the basis for memory
386     * synchronization surrounding activation. This uses a more
387     * efficient version of a Dekker-like rule that task producers and
388     * consumers sync with each other by both writing/CASing ctl (even
389     * if to its current value). This would be extremely costly. So
390     * we relax it in several ways: (1) Producers only signal when
391     * their queue is empty. Other workers propagate this signal (in
392     * method scan) when they find tasks; to further reduce flailing,
393     * each worker signals only one other per activation. (2) Workers
394     * only enqueue after scanning (see below) and not finding any
395     * tasks. (3) Rather than CASing ctl to its current value in the
396     * common case where no action is required, we reduce write
397     * contention by equivalently prefacing signalWork when called by
398     * an external task producer using a memory access with
399     * full-volatile semantics or a "fullFence".
400     *
401     * Almost always, too many signals are issued. A task producer
402     * cannot in general tell if some existing worker is in the midst
403     * of finishing one task (or already scanning) and ready to take
404     * another without being signalled. So the producer might instead
405     * activate a different worker that does not find any work, and
406     * then inactivates. This scarcely matters in steady-state
407     * computations involving all workers, but can create contention
408     * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
409     * computations involving only a few workers.
410     *
411     * Scanning. Method runWorker performs top-level scanning for
412     * tasks. Each scan traverses and tries to poll from each queue
413     * starting at a random index and circularly stepping. Scans are
414     * not performed in ideal random permutation order, to reduce
415     * cacheline contention. The pseudorandom generator need not have
416     * high-quality statistical properties in the long term, but just
417     * within computations; we use Marsaglia XorShifts (often via
418     * ThreadLocalRandom.nextSecondarySeed), which are cheap and
419     * suffice. Scanning also employs contention reduction: When
420     * scanning workers fail to extract an apparently existing task,
421     * they soon restart at a different pseudorandom index. This
422     * improves throughput when many threads are trying to take tasks
423     * from few queues, which can be common in some usages. Scans do
424     * not otherwise explicitly take into account core affinities,
425     * loads, cache localities, etc. However, they do exploit temporal
426     * locality (which usually approximates these) by preferring to
427     * re-poll (at most #workers times) from the same queue after a
428     * successful poll before trying others.
429     *
430     * Trimming workers. To release resources after periods of lack of
431     * use, a worker starting to wait when the pool is quiescent will
432     * time out and terminate (see method scan) if the pool has
433     * remained quiescent for the period given by field keepAlive.
434     *
435     * Shutdown and Termination. A call to shutdownNow invokes
436     * tryTerminate to atomically set a runState bit. The calling
437     * thread, as well as every other worker thereafter terminating,
438     * helps terminate others by cancelling their unprocessed tasks,
439     * and waking them up, doing so repeatedly until stable. Calls to
440     * non-abrupt shutdown() preface this by checking whether
441     * termination should commence by sweeping through queues (until
442     * stable) to ensure lack of in-flight submissions and workers
443     * about to process them before triggering the "STOP" phase of
444     * termination.
445     *
446     * Joining Tasks
447     * =============
448     *
449     * Any of several actions may be taken when one worker is waiting
450     * to join a task stolen (or always held) by another. Because we
451     * are multiplexing many tasks on to a pool of workers, we can't
452     * always just let them block (as in Thread.join). We also cannot
453     * just reassign the joiner's run-time stack with another and
454     * replace it later, which would be a form of "continuation", that
455     * even if possible is not necessarily a good idea since we may
456     * need both an unblocked task and its continuation to progress.
457     * Instead we combine two tactics:
458     *
459     * Helping: Arranging for the joiner to execute some task that it
460     * would be running if the steal had not occurred.
461     *
462     * Compensating: Unless there are already enough live threads,
463     * method tryCompensate() may create or re-activate a spare
464     * thread to compensate for blocked joiners until they unblock.
465     *
466     * A third form (implemented in tryRemoveAndExec) amounts to
467     * helping a hypothetical compensator: If we can readily tell that
468     * a possible action of a compensator is to steal and execute the
469     * task being joined, the joining thread can do so directly,
470     * without the need for a compensation thread.
471     *
472     * The ManagedBlocker extension API can't use helping so relies
473     * only on compensation in method awaitBlocker.
474     *
475     * The algorithm in awaitJoin entails a form of "linear helping".
476     * Each worker records (in field source) the id of the queue from
477     * which it last stole a task. The scan in method awaitJoin uses
478     * these markers to try to find a worker to help (i.e., steal back
479     * a task from and execute it) that could hasten completion of the
480     * actively joined task. Thus, the joiner executes a task that
481     * would be on its own local deque if the to-be-joined task had
482     * not been stolen. This is a conservative variant of the approach
483     * described in Wagner & Calder "Leapfrogging: a portable
484     * technique for implementing efficient futures" SIGPLAN Notices,
485     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
486     * mainly in that we only record queue ids, not full dependency
487     * links. This requires a linear scan of the workQueues array to
488     * locate stealers, but isolates cost to when it is needed, rather
489     * than adding to per-task overhead. Searches can fail to locate
490     * stealers when GC stalls and the like delay recording sources.
491     * Further, even when accurately identified, stealers might not
492     * ever produce a task that the joiner can in turn help with. So,
493     * compensation is tried upon failure to find tasks to run.
494     *
495     * Compensation does not by default aim to keep exactly the target
496     * parallelism number of unblocked threads running at any given
497     * time. Some previous versions of this class employed immediate
498     * compensations for any blocked join. However, in practice, the
499     * vast majority of blockages are transient byproducts of GC and
500     * other JVM or OS activities that are made worse by replacement.
501     * Rather than impose arbitrary policies, we allow users to
502     * override the default of only adding threads upon apparent
503     * starvation. The compensation mechanism may also be bounded.
504     * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
505     * JVMs to cope with programming errors and abuse before running
506     * out of resources to do so.
507     *
508     * Common Pool
509     * ===========
510     *
511     * The static common pool always exists after static
512     * initialization. Since it (or any other created pool) need
513     * never be used, we minimize initial construction overhead and
514     * footprint to the setup of about a dozen fields.
515     *
516     * When external threads submit to the common pool, they can
517     * perform subtask processing (see externalHelpComplete and
518     * related methods) upon joins. This caller-helps policy makes it
519     * sensible to set common pool parallelism level to one (or more)
520     * less than the total number of available cores, or even zero for
521     * pure caller-runs. We do not need to record whether external
522     * submissions are to the common pool -- if not, external help
523     * methods return quickly. These submitters would otherwise be
524     * blocked waiting for completion, so the extra effort (with
525     * liberally sprinkled task status checks) in inapplicable cases
526     * amounts to an odd form of limited spin-wait before blocking in
527     * ForkJoinTask.join.
528     *
529     * As a more appropriate default in managed environments, unless
530     * overridden by system properties, we use workers of subclass
531     * InnocuousForkJoinWorkerThread when there is a SecurityManager
532     * present. These workers have no permissions set, do not belong
533     * to any user-defined ThreadGroup, and erase all ThreadLocals
534     * after executing any top-level task (see
535     * WorkQueue.afterTopLevelExec). The associated mechanics (mainly
536     * in ForkJoinWorkerThread) may be JVM-dependent and must access
537     * particular Thread class fields to achieve this effect.
538     *
539     * Style notes
540     * ===========
541     *
542     * Memory ordering relies mainly on Unsafe intrinsics that carry
543     * the further responsibility of explicitly performing null- and
544     * bounds- checks otherwise carried out implicitly by JVMs. This
545     * can be awkward and ugly, but also reflects the need to control
546     * outcomes across the unusual cases that arise in very racy code
547     * with very few invariants. So these explicit checks would exist
548     * in some form anyway. All fields are read into locals before
549     * use, and null-checked if they are references. This is usually
550     * done in a "C"-like style of listing declarations at the heads
551     * of methods or blocks, and using inline assignments on first
552     * encounter. Array bounds-checks are usually performed by
553     * masking with array.length-1, which relies on the invariant that
554     * these arrays are created with positive lengths, which is itself
555     * paranoically checked. Nearly all explicit checks lead to
556     * bypass/return, not exception throws, because they may
557     * legitimately arise due to cancellation/revocation during
558     * shutdown.
559     *
560     * There is a lot of representation-level coupling among classes
561     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
562     * fields of WorkQueue maintain data structures managed by
563     * ForkJoinPool, so are directly accessed. There is little point
564     * trying to reduce this, since any associated future changes in
565     * representations will need to be accompanied by algorithmic
566     * changes anyway. Several methods intrinsically sprawl because
567     * they must accumulate sets of consistent reads of fields held in
568     * local variables. There are also other coding oddities
569     * (including several unnecessary-looking hoisted null checks)
570     * that help some methods perform reasonably even when interpreted
571     * (not compiled).
572     *
573     * The order of declarations in this file is (with a few exceptions):
574     * (1) Static utility functions
575     * (2) Nested (static) classes
576     * (3) Static fields
577     * (4) Fields, along with constants used when unpacking some of them
578     * (5) Internal control methods
579     * (6) Callbacks and other support for ForkJoinTask methods
580     * (7) Exported methods
581     * (8) Static block initializing statics in minimally dependent order
582     */
583    
584     // Static utilities
585    
586     /**
587     * If there is a security manager, makes sure caller has
588     * permission to modify threads.
589     */
590     private static void checkPermission() {
591     SecurityManager security = System.getSecurityManager();
592     if (security != null)
593     security.checkPermission(modifyThreadPermission);
594     }
595    
596     // Nested classes
597    
/**
 * Supplier of new {@link ForkJoinWorkerThread}s for a pool.
 * Pools that use {@code ForkJoinWorkerThread} subclasses, whether
 * to extend base functionality or to initialize threads with
 * different contexts, must define and use their own
 * {@code ForkJoinWorkerThreadFactory}.
 */
public interface ForkJoinWorkerThreadFactory {
    /**
     * Creates a worker thread that will operate in the given pool.
     *
     * <p>A {@code null} return value or a thrown exception may
     * result in tasks never being executed. A thrown exception is
     * relayed to the caller of the method that triggered the
     * attempted thread creation (for example {@code execute}).
     * In either failure case no retry happens until the next
     * attempted creation (for example another call to
     * {@code execute}).
     *
     * @param pool the pool this thread works in
     * @return the new worker thread, or {@code null} if the request
     *         to create a thread is rejected
     * @throws NullPointerException if the pool is null
     */
    ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
622    
623     /**
624     * Default ForkJoinWorkerThreadFactory implementation; creates a
625     * new ForkJoinWorkerThread.
626     */
627     private static final class DefaultForkJoinWorkerThreadFactory
628     implements ForkJoinWorkerThreadFactory {
629     public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
630     return new ForkJoinWorkerThread(pool);
631     }
632     }
633    
634     // Constants shared across ForkJoinPool and WorkQueue
635    
636     // Bounds
637     static final int SWIDTH = 16; // width of short
638     static final int SMASK = 0xffff; // short bits == max index
639     static final int MAX_CAP = 0x7fff; // max #workers - 1
640     static final int SQMASK = 0x007e; // max 64 (even) slots
641    
642     // Masks and units for WorkQueue.phase and ctl sp subfield
643     static final int UNSIGNALLED = 1 << 31; // must be negative
644     static final int SS_SEQ = 1 << 16; // version count
645     static final int QLOCK = 1; // must be 1
646    
647     // Mode bits and sentinels, some also used in WorkQueue id and source fields
648     static final int OWNED = 1; // queue has owner thread
649     static final int FIFO = 1 << 16; // fifo queue or access mode
650     static final int SHUTDOWN = 1 << 18;
651     static final int TERMINATED = 1 << 19;
652     static final int STOP = 1 << 31; // must be negative
653     static final int QUIET = 1 << 30; // not scanning or working
654     static final int DORMANT = QUIET | UNSIGNALLED;
655    
656     /**
657     * The maximum number of local polls from the same queue before
658     * checking others. This is a safeguard against infinitely unfair
659     * looping under unbounded user task recursion, and must be larger
660     * than plausible cases of intentional bounded task recursion.
661     */
662     static final int POLL_LIMIT = 1 << 10;
663    
664     /**
665     * Queues supporting work-stealing as well as external task
666     * submission. See above for descriptions and algorithms.
667     * Performance on most platforms is very sensitive to placement of
668     * instances of both WorkQueues and their arrays -- we absolutely
669     * do not want multiple WorkQueue instances or multiple queue
670     * arrays sharing cache lines. The @Contended annotation alerts
671     * JVMs to try to keep instances apart.
672     */
673     // For now, using manual padding.
674     // @jdk.internal.vm.annotation.Contended
675     // @sun.misc.Contended
676     static final class WorkQueue {
677    
678     /**
679     * Capacity of work-stealing queue array upon initialization.
680     * Must be a power of two; at least 4, but should be larger to
681     * reduce or eliminate cacheline sharing among queues.
682     * Currently, it is much larger, as a partial workaround for
683     * the fact that JVMs often place arrays in locations that
684     * share GC bookkeeping (especially cardmarks) such that
685     * per-write accesses encounter serious memory contention.
686     */
687     static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
688    
689     /**
690     * Maximum size for queue arrays. Must be a power of two less
691     * than or equal to 1 << (31 - width of array entry) to ensure
692     * lack of wraparound of index calculations, but defined to a
693     * value a bit less than this to help users trap runaway
694     * programs before saturating systems.
695     */
696     static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
697    
698     // Instance fields
699     volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
700     volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
701     volatile int phase; // versioned, negative: queued, 1: locked
702     int stackPred; // pool stack (ctl) predecessor link
703     int nsteals; // number of steals
704     int id; // index, mode, tag
705     volatile int source; // source queue id, or sentinel
706     volatile int base; // index of next slot for poll
707     int top; // index of next slot for push
708     ForkJoinTask<?>[] array; // the elements (initially unallocated)
709     final ForkJoinPool pool; // the containing pool (may be null)
710     final ForkJoinWorkerThread owner; // owning thread or null if shared
711     volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
712     volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e, pad1f;
713    
714     WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
715     this.pool = pool;
716     this.owner = owner;
717     // Place indices in the center of array (that is not yet allocated)
718     base = top = INITIAL_QUEUE_CAPACITY >>> 1;
719     }
720    
721     /**
722     * Returns an exportable index (used by ForkJoinWorkerThread).
723     */
724     final int getPoolIndex() {
725     return (id & 0xffff) >>> 1; // ignore odd/even tag bit
726     }
727    
728     /**
729     * Returns the approximate number of tasks in the queue.
730     */
731     final int queueSize() {
732     int n = base - top; // read base first
733     return (n >= 0) ? 0 : -n; // ignore transient negative
734     }
735    
736     /**
737     * Provides a more accurate estimate of whether this queue has
738     * any tasks than does queueSize, by checking whether a
739     * near-empty queue has at least one unclaimed task.
740     */
741     final boolean isEmpty() {
742     ForkJoinTask<?>[] a; int n, al, b;
743     return ((n = (b = base) - top) >= 0 || // possibly one task
744     (n == -1 && ((a = array) == null ||
745     (al = a.length) == 0 ||
746     a[(al - 1) & b] == null)));
747     }
748    
749    
750     /**
751     * Pushes a task. Call only by owner in unshared queues.
752     *
753     * @param task the task. Caller must ensure non-null.
754     * @throws RejectedExecutionException if array cannot be resized
755     */
756     final void push(ForkJoinTask<?> task) {
757     int s = top; ForkJoinTask<?>[] a; int al, d;
758     if ((a = array) != null && (al = a.length) > 0) {
759     int index = (al - 1) & s;
760     long offset = ((long)index << ASHIFT) + ABASE;
761     ForkJoinPool p = pool;
762     top = s + 1;
763     U.putOrderedObject(a, offset, task);
764     if ((d = base - s) == 0 && p != null) {
765     U.fullFence();
766     p.signalWork();
767     }
768     else if (d + al == 1)
769     growArray();
770     }
771     }
772    
773     /**
774     * Initializes or doubles the capacity of array. Call either
775     * by owner or with lock held -- it is OK for base, but not
776     * top, to move while resizings are in progress.
         *
         * <p>The new array is stored in the {@code array} field before
         * any tasks are transferred. Tasks between base and top are
         * then moved one at a time by CASing each old slot to null;
         * slots that are already null, or whose CAS fails, are skipped.
         *
         * @return the newly allocated array
         * @throws RejectedExecutionException if the new size would be
         * less than INITIAL_QUEUE_CAPACITY (which also covers a doubling
         * that wraps around) or greater than MAXIMUM_QUEUE_CAPACITY
777     */
778     final ForkJoinTask<?>[] growArray() {
779     ForkJoinTask<?>[] oldA = array;
780     int oldSize = oldA != null ? oldA.length : 0;
781     int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY; // double, or first allocation
782     if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
783     throw new RejectedExecutionException("Queue capacity exceeded");
784     int oldMask, t, b;
785     ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; // install new array before copying
786     if (oldA != null && (oldMask = oldSize - 1) > 0 &&
787     (t = top) - (b = base) > 0) { // only copy when the old queue holds tasks
788     int mask = size - 1;
789     do { // emulate poll from old array, push to new array
790     int index = b & oldMask;
791     long offset = ((long)index << ASHIFT) + ABASE;
792     ForkJoinTask<?> x = (ForkJoinTask<?>)
793     U.getObjectVolatile(oldA, offset);
794     if (x != null &&
795     U.compareAndSwapObject(oldA, offset, x, null)) // claim the slot before moving the task
796     a[b & mask] = x;
797     } while (++b != t);
798     U.storeFence();
799     }
800     return a;
801     }
802    
803     /**
804     * Takes next task, if one exists, in LIFO order. Call only
805     * by owner in unshared queues.
806     */
807     final ForkJoinTask<?> pop() {
808     int b = base, s = top, al, i; ForkJoinTask<?>[] a;
809     if ((a = array) != null && b != s && (al = a.length) > 0) {
810     int index = (al - 1) & --s;
811     long offset = ((long)index << ASHIFT) + ABASE;
812     ForkJoinTask<?> t = (ForkJoinTask<?>)
813     U.getObject(a, offset);
814     if (t != null &&
815     U.compareAndSwapObject(a, offset, t, null)) {
816     top = s;
817     U.storeFence();
818     return t;
819     }
820     }
821     return null;
822     }
823    
824     /**
825     * Takes next task, if one exists, in FIFO order.
826     */
827     final ForkJoinTask<?> poll() {
828     for (;;) {
829     int b = base, s = top, d, al; ForkJoinTask<?>[] a;
830     if ((a = array) != null && (d = b - s) < 0 &&
831     (al = a.length) > 0) {
832     int index = (al - 1) & b;
833     long offset = ((long)index << ASHIFT) + ABASE;
834     ForkJoinTask<?> t = (ForkJoinTask<?>)
835     U.getObjectVolatile(a, offset);
836     if (b++ == base) {
837     if (t != null) {
838     if (U.compareAndSwapObject(a, offset, t, null)) {
839     base = b;
840     return t;
841     }
842     }
843     else if (d == -1)
844     break; // now empty
845     }
846     }
847     else
848     break;
849     }
850     return null;
851     }
852    
853     /**
854     * Takes next task, if one exists, in order specified by mode.
855     */
856     final ForkJoinTask<?> nextLocalTask() {
857     return ((id & FIFO) != 0) ? poll() : pop();
858     }
859    
860     /**
861     * Returns next task, if one exists, in order specified by mode.
862     */
863     final ForkJoinTask<?> peek() {
864     int al; ForkJoinTask<?>[] a;
865     return ((a = array) != null && (al = a.length) > 0) ?
866     a[(al - 1) &
867     ((id & FIFO) != 0 ? base : top - 1)] : null;
868     }
869    
870     /**
871     * Pops the given task only if it is at the current top.
872     */
873     final boolean tryUnpush(ForkJoinTask<?> task) {
874     int b = base, s = top, al; ForkJoinTask<?>[] a;
875     if ((a = array) != null && b != s && (al = a.length) > 0) {
876     int index = (al - 1) & --s;
877     long offset = ((long)index << ASHIFT) + ABASE;
878     if (U.compareAndSwapObject(a, offset, task, null)) {
879     top = s;
880     U.storeFence();
881     return true;
882     }
883     }
884     return false;
885     }
886    
887     /**
888     * Removes and cancels all known tasks, ignoring any exceptions.
889     */
890     final void cancelAll() {
891     for (ForkJoinTask<?> t; (t = poll()) != null; )
892     ForkJoinTask.cancelIgnoringExceptions(t);
893     }
894    
895     // Specialized execution methods
896    
897     /**
898     * Pops and executes up to limit consecutive tasks or until empty.
899     *
900     * @param limit max runs, or zero for no limit
901     */
902     final void localPopAndExec(int limit) {
903     for (;;) {
904     int b = base, s = top, al; ForkJoinTask<?>[] a;
905     if ((a = array) != null && b != s && (al = a.length) > 0) {
906     int index = (al - 1) & --s;
907     long offset = ((long)index << ASHIFT) + ABASE;
908     ForkJoinTask<?> t = (ForkJoinTask<?>)
909     U.getAndSetObject(a, offset, null);
910     if (t != null) {
911     top = s;
912     U.storeFence();
913     t.doExec();
914     if (limit != 0 && --limit == 0)
915     break;
916     }
917     else
918     break;
919     }
920     else
921     break;
922     }
923     }
924    
925     /**
926     * Polls and executes up to limit consecutive tasks or until empty.
927     *
928     * @param limit, or zero for no limit
929     */
930     final void localPollAndExec(int limit) {
931     for (int polls = 0;;) {
932     int b = base, s = top, d, al; ForkJoinTask<?>[] a;
933     if ((a = array) != null && (d = b - s) < 0 &&
934     (al = a.length) > 0) {
935     int index = (al - 1) & b++;
936     long offset = ((long)index << ASHIFT) + ABASE;
937     ForkJoinTask<?> t = (ForkJoinTask<?>)
938     U.getAndSetObject(a, offset, null);
939     if (t != null) {
940     base = b;
941     t.doExec();
942     if (limit != 0 && ++polls == limit)
943     break;
944     }
945     else if (d == -1)
946     break; // now empty
947     else
948     polls = 0; // stolen; reset
949     }
950     else
951     break;
952     }
953     }
954    
955     /**
956     * If present, removes task from queue and executes it.
957     */
958     final void tryRemoveAndExec(ForkJoinTask<?> task) {
959     ForkJoinTask<?>[] wa; int s, wal;
960     if (base - (s = top) < 0 && // traverse from top
961     (wa = array) != null && (wal = wa.length) > 0) {
962     for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
963     int index = i & m;
964     long offset = (index << ASHIFT) + ABASE;
965     ForkJoinTask<?> t = (ForkJoinTask<?>)
966     U.getObject(wa, offset);
967     if (t == null)
968     break;
969     else if (t == task) {
970     if (U.compareAndSwapObject(wa, offset, t, null)) {
971     top = ns; // safely shift down
972     for (int j = i; j != ns; ++j) {
973     ForkJoinTask<?> f;
974     int pindex = (j + 1) & m;
975     long pOffset = (pindex << ASHIFT) + ABASE;
976     f = (ForkJoinTask<?>)U.getObject(wa, pOffset);
977     U.putObjectVolatile(wa, pOffset, null);
978    
979     int jindex = j & m;
980     long jOffset = (jindex << ASHIFT) + ABASE;
981     U.putOrderedObject(wa, jOffset, f);
982     }
983     U.storeFence();
984     t.doExec();
985     }
986     break;
987     }
988     }
989     }
990     }
991    
992     /**
993     * Tries to steal and run tasks within the target's
994     * computation until done, not found, or limit exceeded.
         *
         * <p>Each round looks only at the task in the slot just below
         * top. If it is a CountedCompleter whose completer chain reaches
         * {@code task}, it is claimed by CAS, removed from the top, and
         * run. The loop ends when {@code task.status} becomes negative,
         * when a round runs nothing, or when the limit is used up.
         * A null task, or one whose status is already negative, is
         * returned immediately without scanning.
995     *
996     * @param task root of CountedCompleter computation
997     * @param limit max runs, or zero for no limit
998     * @return task status on exit
999     */
1000     final int localHelpCC(CountedCompleter<?> task, int limit) {
1001     int status = 0;
1002     if (task != null && (status = task.status) >= 0) {
1003     for (;;) {
1004     boolean help = false; // set when this round ran a task
1005     int b = base, s = top, al; ForkJoinTask<?>[] a;
1006     if ((a = array) != null && b != s && (al = a.length) > 0) {
1007     int index = (al - 1) & (s - 1); // slot holding the top task
1008     long offset = ((long)index << ASHIFT) + ABASE;
1009     ForkJoinTask<?> o = (ForkJoinTask<?>)
1010     U.getObject(a, offset);
1011     if (o instanceof CountedCompleter) {
1012     CountedCompleter<?> t = (CountedCompleter<?>)o;
1013     for (CountedCompleter<?> f = t;;) { // walk completer chain looking for task
1014     if (f != task) {
1015     if ((f = f.completer) == null) // try parent
1016     break;
1017     }
1018     else {
1019     if (U.compareAndSwapObject(a, offset,
1020     t, null)) {
1021     top = s - 1;
1022     U.storeFence();
1023     t.doExec();
1024     help = true;
1025     }
1026     break;
1027     }
1028     }
1029     }
1030     }
1031     if ((status = task.status) < 0 || !help ||
1032     (limit != 0 && --limit == 0))
1033     break;
1034     }
1035     }
1036     return status;
1037     }
1038    
1039     // Operations on shared queues
1040    
1041     /**
1042     * Tries to lock shared queue by CASing phase field.
1043     */
1044     final boolean tryLockSharedQueue() {
1045     return U.compareAndSwapInt(this, PHASE, 0, QLOCK);
1046     }
1047    
1048     /**
1049     * Shared version of tryUnpush.
1050     */
1051     final boolean trySharedUnpush(ForkJoinTask<?> task) {
1052     boolean popped = false;
1053     int s = top - 1, al; ForkJoinTask<?>[] a;
1054     if ((a = array) != null && (al = a.length) > 0) {
1055     int index = (al - 1) & s;
1056     long offset = ((long)index << ASHIFT) + ABASE;
1057     ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
1058     if (t == task &&
1059     U.compareAndSwapInt(this, PHASE, 0, QLOCK)) {
1060     if (top == s + 1 && array == a &&
1061     U.compareAndSwapObject(a, offset, task, null)) {
1062     popped = true;
1063     top = s;
1064     }
1065     U.putOrderedInt(this, PHASE, 0);
1066     }
1067     }
1068     return popped;
1069     }
1070    
1071     /**
1072     * Shared version of localHelpCC.
          *
          * <p>Works like localHelpCC, except that the top task is removed
          * while holding the queue lock (phase CASed from 0 to QLOCK),
          * after rechecking that top and array are unchanged. The lock is
          * released before the claimed task is run.
          *
          * @param task root of CountedCompleter computation
          * @param limit max runs, or zero for no limit
          * @return task status on exit
1073     */
1074     final int sharedHelpCC(CountedCompleter<?> task, int limit) {
1075     int status = 0;
1076     if (task != null && (status = task.status) >= 0) {
1077     for (;;) {
1078     boolean help = false; // set when this round claimed a task
1079     int b = base, s = top, al; ForkJoinTask<?>[] a;
1080     if ((a = array) != null && b != s && (al = a.length) > 0) {
1081     int index = (al - 1) & (s - 1); // slot holding the top task
1082     long offset = ((long)index << ASHIFT) + ABASE;
1083     ForkJoinTask<?> o = (ForkJoinTask<?>)
1084     U.getObject(a, offset);
1085     if (o instanceof CountedCompleter) {
1086     CountedCompleter<?> t = (CountedCompleter<?>)o;
1087     for (CountedCompleter<?> f = t;;) { // walk completer chain looking for task
1088     if (f != task) {
1089     if ((f = f.completer) == null)
1090     break;
1091     }
1092     else {
1093     if (U.compareAndSwapInt(this, PHASE,
1094     0, QLOCK)) { // acquire queue lock
1095     if (top == s && array == a &&
1096     U.compareAndSwapObject(a, offset,
1097     t, null)) {
1098     help = true;
1099     top = s - 1;
1100     }
1101     U.putOrderedInt(this, PHASE, 0); // release queue lock
1102     if (help)
1103     t.doExec(); // run outside the lock
1104     }
1105     break;
1106     }
1107     }
1108     }
1109     }
1110     if ((status = task.status) < 0 || !help ||
1111     (limit != 0 && --limit == 0))
1112     break;
1113     }
1114     }
1115     return status;
1116     }
1117    
1118     /**
1119     * Returns true if owned and not known to be blocked.
1120     */
1121     final boolean isApparentlyUnblocked() {
1122     Thread wt; Thread.State s;
1123     return ((wt = owner) != null &&
1124     (s = wt.getState()) != Thread.State.BLOCKED &&
1125     s != Thread.State.WAITING &&
1126     s != Thread.State.TIMED_WAITING);
1127     }
1128    
1129     // Unsafe mechanics. Note that some are (and must be) the same as in FJP
         // Array slots are addressed throughout this class as
         // ((long)index << ASHIFT) + ABASE.
1130     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1131     private static final long PHASE; // offset of the "phase" field
1132     private static final int ABASE; // base offset of ForkJoinTask[] elements
1133     private static final int ASHIFT; // log2 of the ForkJoinTask[] index scale
1134     static {
1135     try {
1136     PHASE = U.objectFieldOffset
1137     (WorkQueue.class.getDeclaredField("phase"));
1138     ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
1139     int scale = U.arrayIndexScale(ForkJoinTask[].class);
1140     if ((scale & (scale - 1)) != 0) // shifting only works for power-of-two scales
1141     throw new Error("array index scale not a power of two");
1142     ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1143     } catch (ReflectiveOperationException e) {
1144     throw new Error(e);
1145     }
1146     }
1147     }
1148    
1149     // static fields (initialized in static initializer below)
1150    
1151     /**
1152     * Creates a new ForkJoinWorkerThread. This factory is used unless
1153     * overridden in ForkJoinPool constructors.
1154     */
1155     public static final ForkJoinWorkerThreadFactory
1156     defaultForkJoinWorkerThreadFactory;
1157    
1158     /**
1159     * Permission required for callers of methods that may start or
1160     * kill threads.
1161     */
1162     static final RuntimePermission modifyThreadPermission;
1163    
1164     /**
1165     * Common (static) pool. Non-null for public use unless a static
1166     * construction exception, but internal usages null-check on use
1167     * to paranoically avoid potential initialization circularities
1168     * as well as to simplify generated code.
1169     */
1170     static final ForkJoinPool common;
1171    
1172     /**
1173     * Common pool parallelism. To allow simpler use and management
1174     * when common pool threads are disabled, we allow the underlying
1175     * common.parallelism field to be zero, but in that case still report
1176     * parallelism as 1 to reflect resulting caller-runs mechanics.
1177     */
1178     static final int COMMON_PARALLELISM;
1179    
1180     /**
1181     * Limit on spare thread construction in tryCompensate.
1182     */
1183     private static final int COMMON_MAX_SPARES;
1184    
1185     /**
1186     * Sequence number for creating workerNamePrefix.
1187     */
1188     private static int poolNumberSequence;
1189    
1190     /**
1191     * Returns the next sequence number. We don't expect this to
1192     * ever contend, so use simple builtin sync.
1193     */
1194     private static final synchronized int nextPoolId() {
1195     return ++poolNumberSequence;
1196     }
1197    
1198     // static configuration constants
1199    
1200     /**
1201     * Default idle timeout value (in milliseconds) for the thread
1202     * triggering quiescence to park waiting for new work
1203     */
1204     private static final long DEFAULT_KEEPALIVE = 60000L;
1205    
1206     /**
1207     * Undershoot tolerance for idle timeouts
1208     */
1209     private static final long TIMEOUT_SLOP = 20L;
1210    
1211     /**
1212     * The default value for COMMON_MAX_SPARES. Overridable using the
1213     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
1214     * property. The default value is far in excess of normal
1215     * requirements, but also far short of MAX_CAP and typical OS
1216     * thread limits, so allows JVMs to catch misuse/abuse before
1217     * running out of resources needed to do so.
1218     */
1219     private static final int DEFAULT_COMMON_MAX_SPARES = 256;
1220    
1221     /**
1222     * Increment for seed generators. See class ThreadLocal for
1223     * explanation.
1224     */
1225     private static final int SEED_INCREMENT = 0x9e3779b9;
1226    
1227     /*
1228     * Bits and masks for field ctl, packed with 4 16 bit subfields:
1229     * RC: Number of released (unqueued) workers minus target parallelism
1230     * TC: Number of total workers minus target parallelism
1231     * SS: version count and status of top waiting thread
1232     * ID: poolIndex of top of Treiber stack of waiters
1233     *
1234     * When convenient, we can extract the lower 32 stack top bits
1235     * (including version bits) as sp=(int)ctl. The offsets of counts
1236     * by the target parallelism and the positionings of fields makes
1237     * it possible to perform the most common checks via sign tests of
1238     * fields: When rc is negative, there are not enough unqueued
1239     * workers, when tc is negative, there are not enough total
1240     * workers. When sp is non-zero, there are waiting workers. To
1241     * deal with possibly negative fields, we use casts in and out of
1242     * "short" and/or signed shifts to maintain signedness.
1243     *
1244     * Because it occupies uppermost bits, we can add one release count
1245     * using getAndAddLong of RC_UNIT, rather than CAS, when returning
1246     * from a blocked join. Other updates entail multiple subfields
1247     * and masking, requiring CAS.
1248     *
1249     * The limits packed in field "bounds" are also offset by the
1250     * parallelism level to make them comparable to the ctl rc and tc
1251     * fields.
1252     */
1253    
1254     // Lower and upper word masks
1255     private static final long SP_MASK = 0xffffffffL;
1256     private static final long UC_MASK = ~SP_MASK;
1257    
1258     // Release counts
1259     private static final int RC_SHIFT = 48;
1260     private static final long RC_UNIT = 0x0001L << RC_SHIFT;
1261     private static final long RC_MASK = 0xffffL << RC_SHIFT;
1262    
1263     // Total counts
1264     private static final int TC_SHIFT = 32;
1265     private static final long TC_UNIT = 0x0001L << TC_SHIFT;
1266     private static final long TC_MASK = 0xffffL << TC_SHIFT;
1267     private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1268    
1269     // Instance fields
1270    
1271     // Segregate ctl field, For now using padding vs @Contended
1272     // @jdk.internal.vm.annotation.Contended("fjpctl")
1273     // @sun.misc.Contended("fjpctl")
1274     volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
1275     volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
1276     volatile long ctl; // main pool control
1277     volatile long pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
1278     volatile long pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e;
1279    
1280     volatile long stealCount; // collects worker nsteals
1281     final long keepAlive; // milliseconds before dropping if idle
1282     int indexSeed; // next worker index
1283     final int bounds; // min, max threads packed as shorts
1284     volatile int mode; // parallelism, runstate, queue mode
1285     WorkQueue[] workQueues; // main registry
1286     final String workerNamePrefix; // for worker thread string; sync lock
1287     final ForkJoinWorkerThreadFactory factory;
1288     final UncaughtExceptionHandler ueh; // per-worker UEH
1289     final Predicate<? super ForkJoinPool> saturate;
1290    
1291     // Creating, registering and deregistering workers
1292    
1293     /**
1294     * Tries to construct and start one worker. Assumes that total
1295     * count has already been incremented as a reservation. Invokes
1296     * deregisterWorker on any failure.
1297     *
1298     * @return true if successful
1299     */
1300     private boolean createWorker() {
1301     ForkJoinWorkerThreadFactory fac = factory;
1302     Throwable ex = null;
1303     ForkJoinWorkerThread wt = null;
1304     try {
1305     if (fac != null && (wt = fac.newThread(this)) != null) {
1306     wt.start();
1307     return true;
1308     }
1309     } catch (Throwable rex) {
1310     ex = rex;
1311     }
1312     deregisterWorker(wt, ex);
1313     return false;
1314     }
1315    
1316     /**
1317     * Tries to add one worker, incrementing ctl counts before doing
1318     * so, relying on createWorker to back out on failure.
1319     *
1320     * @param c incoming ctl value, with total count negative and no
1321     * idle workers. On CAS failure, c is refreshed and retried if
1322     * this holds (otherwise, a new worker is not needed).
1323     */
1324     private void tryAddWorker(long c) {
1325     do {
1326     long nc = ((RC_MASK & (c + RC_UNIT)) |
1327     (TC_MASK & (c + TC_UNIT)));
1328     if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
1329     createWorker();
1330     break;
1331     }
1332     } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
1333     }
1334    
1335     /**
1336     * Callback from ForkJoinWorkerThread constructor to establish and
1337     * record its WorkQueue.
          *
          * <p>The thread is made a daemon and, if the pool has one, given
          * the pool's uncaught exception handler. While holding the lock
          * on workerNamePrefix, an odd-numbered slot of workQueues is
          * chosen starting from a position derived from indexSeed; if no
          * free slot is found within n/2 probes the array is doubled.
          * The thread is then named with the prefix plus an index.
          * If workerNamePrefix is null, or workQueues is null or has
          * length of at most 1, the new queue is returned without being
          * recorded in the array.
1338     *
1339     * @param wt the worker thread
1340     * @return the worker's queue
1341     */
1342     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1343     UncaughtExceptionHandler handler;
1344     wt.setDaemon(true); // configure thread
1345     if ((handler = ueh) != null)
1346     wt.setUncaughtExceptionHandler(handler);
1347     WorkQueue w = new WorkQueue(this, wt);
1348     int tid = 0; // for thread name
1349     int fifo = mode & FIFO;
1350     String prefix = workerNamePrefix;
1351     if (prefix != null) {
1352     synchronized (prefix) { // the prefix string doubles as the registry lock
1353     WorkQueue[] ws = workQueues; int n;
1354     int s = indexSeed += SEED_INCREMENT;
1355     if (ws != null && (n = ws.length) > 1) {
1356     int m = n - 1;
1357     tid = s & m;
1358     int i = m & ((s << 1) | 1); // odd-numbered indices
1359     for (int probes = n >>> 1;;) { // find empty slot
1360     WorkQueue q;
1361     if ((q = ws[i]) == null || q.phase == QUIET) // free, or occupant's phase is QUIET
1362     break;
1363     else if (--probes == 0) {
1364     i = n | 1; // resize below
1365     break;
1366     }
1367     else
1368     i = (i + 2) & m; // step to the next odd slot, wrapping
1369     }
1370    
1371     int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT)); // index, mode bit, remaining seed bits
1372     w.phase = w.id = id; // now publishable
1373    
1374     if (i < n)
1375     ws[i] = w;
1376     else { // expand array
1377     int an = n << 1;
1378     WorkQueue[] as = new WorkQueue[an];
1379     as[i] = w;
1380     int am = an - 1;
1381     for (int j = 0; j < n; ++j) { // j advances twice per pass: even slot, then odd slot
1382     WorkQueue v; // copy external queue
1383     if ((v = ws[j]) != null) // position may change
1384     as[v.id & am & SQMASK] = v;
1385     if (++j >= n)
1386     break;
1387     as[j] = ws[j]; // copy worker
1388     }
1389     workQueues = as;
1390     }
1391     }
1392     }
1393     wt.setName(prefix.concat(Integer.toString(tid)));
1394     }
1395     return w;
1396     }
1397    
1398     /**
1399     * Final callback from terminating worker, as well as upon failure
1400     * to construct or start a worker. Removes record of worker from
1401     * array, and adjusts counts. If pool is shutting down, tries to
1402     * complete termination.
1403     *
         * <p>The ctl counts are left untouched when the worker's phase is
         * {@code QUIET} (the value {@code runWorker} stores when a worker
         * drops out on timeout after adjusting ctl itself). After cleanup,
         * a non-null {@code ex} is passed to {@code ForkJoinTask.rethrow};
         * otherwise {@code ForkJoinTask.helpExpungeStaleExceptions} is
         * called.
         *
1404     * @param wt the worker thread, or null if construction failed
1405     * @param ex the exception causing failure, or null if none
1406     */
1407     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1408     WorkQueue w = null;
1409     int phase = 0;
1410     if (wt != null && (w = wt.workQueue) != null) {
1411     Object lock = workerNamePrefix;
         // widen nsteals as an unsigned 32-bit value before adding to stealCount
1412     long ns = (long)w.nsteals & 0xffffffffL;
1413     int idx = w.id & SMASK;
1414     if (lock != null) {
1415     WorkQueue[] ws; // remove index from array
1416     synchronized (lock) {
         // clear the slot only if it still holds this exact queue
1417     if ((ws = workQueues) != null && ws.length > idx &&
1418     ws[idx] == w)
1419     ws[idx] = null;
1420     stealCount += ns;
1421     }
1422     }
1423     phase = w.phase;
1424     }
1425     if (phase != QUIET) { // else pre-adjusted
1426     long c; // decrement counts
         // retry until the CAS succeeds: subtract one RC_UNIT and one TC_UNIT
         // within their masked fields and keep the SP_MASK bits as they are
1427     do {} while (!U.compareAndSwapLong
1428     (this, CTL, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
1429     (TC_MASK & (c - TC_UNIT)) |
1430     (SP_MASK & c))));
1431     }
1432     if (w != null)
1433     w.cancelAll(); // cancel remaining tasks
1434    
         // a worker whose array was never allocated is not replaced
1435     if (!tryTerminate(false, false) && // possibly replace worker
1436     w != null && w.array != null) // avoid repeated failures
1437     signalWork();
1438    
1439     if (ex == null) // help clean on way out
1440     ForkJoinTask.helpExpungeStaleExceptions();
1441     else // rethrow
1442     ForkJoinTask.rethrow(ex);
1443     }
1444    
1445     /**
1446     * Tries to create or release a worker if too few are running.
         *
         * <p>Loops on a fresh read of {@code ctl}: returns at once when
         * ctl is non-negative; calls {@code tryAddWorker} when the low 32
         * bits of ctl are zero and the {@code ADD_WORKER} bit is set;
         * otherwise tries to release the queue indexed by those low bits
         * with a single CAS on ctl, retrying the whole loop if the CAS or
         * the phase check fails.
1447     */
1448     final void signalWork() {
1449     for (;;) {
1450     long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1451     if ((c = ctl) >= 0L) // enough workers
1452     break;
         // sp is the low 32 bits of ctl; its SMASK bits index the queue to release
1453     else if ((sp = (int)c) == 0) { // no idle workers
1454     if ((c & ADD_WORKER) != 0L) // too few workers
1455     tryAddWorker(c);
1456     break;
1457     }
1458     else if ((ws = workQueues) == null)
1459     break; // unstarted/terminated
1460     else if (ws.length <= (i = sp & SMASK))
1461     break; // terminated
1462     else if ((v = ws[i]) == null)
1463     break; // terminating
1464     else {
         // np: the phase v gets once released (UNSIGNALLED bit cleared)
1465     int np = sp & ~UNSIGNALLED;
1466     int vp = v.phase;
         // new ctl: SP_MASK bits taken from v.stackPred, UC_MASK bits from
         // c with one RC_UNIT added
1467     long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1468     Thread vt = v.owner;
         // proceed only if v's phase still matches the ctl snapshot;
         // otherwise fall through and retry with a fresh read of ctl
1469     if (sp == vp && U.compareAndSwapLong(this, CTL, c, nc)) {
1470     v.phase = np;
         // unpark only when v.source is negative (runWorker stores
         // DORMANT there just before its park loop)
1471     if (v.source < 0)
1472     LockSupport.unpark(vt);
1473     break;
1474     }
1475     }
1476     }
1477     }
1478    
1479     /**
1480     * Tries to decrement counts (sometimes implicitly) and possibly
1481     * arrange for a compensating worker in preparation for blocking:
1482     * If not all core workers yet exist, creates one, else if any are
1483     * unreleased (possibly including caller) releases one, else if
1484     * fewer than the minimum allowed number of workers running,
1485     * checks to see that they are all active, and if so creates an
1486     * extra worker unless over maximum limit and policy is to
1487     * saturate. Most of these steps can fail due to interference, in
1488     * which case 0 is returned so caller will retry. A negative
1489     * return value indicates that the caller doesn't need to
1490     * re-adjust counts when later unblocked.
1491     *
         * @param w the caller's queue; when it is null (and the total-count
         * field of ctl is non-negative) 0 is returned
1492     * @return 1: block then adjust, -1: block without adjust, 0: retry
         * @throws RejectedExecutionException if a thread limit is reached,
         * the saturate predicate is absent or returns false, and at least
         * as many workers as the parallelism level are observed blocked
1493     */
1494     private int tryCompensate(WorkQueue w) {
1495     int t, n, sp;
1496     long c = ctl;
1497     WorkQueue[] ws = workQueues;
         // t: the 16-bit ctl field at TC_SHIFT read as a signed short; when it
         // is negative the checks are skipped and the pool is expanded below
1498 dl 1.2 if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
1499 jsr166 1.1 if (ws == null || (n = ws.length) <= 0 || w == null)
1500     return 0; // disabled
1501     else if ((sp = (int)c) != 0) { // replace or release
1502     WorkQueue v = ws[sp & (n - 1)];
1503     int wp = w.phase;
         // if the caller itself is unreleased (wp < 0), RC_UNIT is added,
         // and -1 is returned on success so the caller does not re-adjust
1504     long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
1505     int np = sp & ~UNSIGNALLED;
1506     if (v != null) {
1507     int vp = v.phase;
1508     Thread vt = v.owner;
1509     long nc = ((long)v.stackPred & SP_MASK) | uc;
1510     if (vp == sp && U.compareAndSwapLong(this, CTL, c, nc)) {
1511     v.phase = np;
1512     if (v.source < 0)
1513     LockSupport.unpark(vt);
1514     return (wp < 0) ? -1 : 1;
1515     }
1516     }
1517     return 0;
1518     }
         // compares the ctl value above RC_SHIFT with the low 16 bits of bounds
1519     else if ((int)(c >> RC_SHIFT) - // reduce parallelism
1520     (short)(bounds & SMASK) > 0) {
1521     long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1522     return U.compareAndSwapLong(this, CTL, c, nc) ? 1 : 0;
1523     }
1524     else { // validate
         // tc starts at pc + t and is decremented once per odd-indexed queue
         // found; it must reach exactly 0 for the snapshot to be trusted
1525     int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
1526     boolean unstable = false;
1527     for (int i = 1; i < n; i += 2) {
1528     WorkQueue q; Thread wt; Thread.State ts;
1529     if ((q = ws[i]) != null) {
         // a queue with source == 0 makes the snapshot unstable
1530     if (q.source == 0) {
1531     unstable = true;
1532     break;
1533     }
1534     else {
1535     --tc;
1536     if ((wt = q.owner) != null &&
1537     ((ts = wt.getState()) == Thread.State.BLOCKED ||
1538     ts == Thread.State.WAITING))
1539     ++bc; // worker is blocking
1540     }
1541     }
1542     }
1543     if (unstable || tc != 0 || ctl != c)
1544     return 0; // inconsistent
         // limit reached: every path in this branch returns or throws
1545     else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
1546     Predicate<? super ForkJoinPool> sat;
1547     if ((sat = saturate) != null && sat.test(this))
1548     return -1;
1549     else if (bc < pc) { // lagging
1550     Thread.yield(); // for retry spins
1551     return 0;
1552     }
1553     else
1554     throw new RejectedExecutionException(
1555     "Thread limit exceeded replacing blocked worker");
1556     }
1557     }
1558     }
1559    
         // reached when t < 0, or when validation passed below the limits:
         // add one TC_UNIT within the TC_MASK field, then create the worker
1560     long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1561     return U.compareAndSwapLong(this, CTL, c, nc) && createWorker() ? 1 : 0;
1562     }
1563    
1564     /**
1565     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1566     * See above for explanation.
         *
         * <p>In outline: allocates the worker's task array, then repeatedly
         * scans {@code workQueues} from a pseudo-random index, taking and
         * running the task at the base of any non-empty queue found. When a
         * scan finds nothing, a released worker (phase &gt;= 0) first
         * enqueues itself by CASing its new phase into the SP_MASK bits of
         * ctl; on a later empty scan it parks until its phase becomes
         * non-negative again. The method returns when {@code workQueues} is
         * null, when {@code mode} is negative, when {@code tryTerminate}
         * succeeds, or when the worker drops out after a keep-alive timeout.
         *
         * @param w the queue of the calling worker
1567     */
1568     final void runWorker(WorkQueue w) {
1569     WorkQueue[] ws;
1570     w.growArray(); // allocate queue
1571     int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
1572     if (r == 0) // initial nonzero seed
1573     r = 1;
1574     int lastSignalId = 0; // avoid unneeded signals
1575     while ((ws = workQueues) != null) {
1576     boolean nonempty = false; // scan
         // at most n probes per scan
1577     for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1578     WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1579     if ((i = r & m) >= 0 && i < n && // always true
1580     (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
1581     (a = q.array) != null && (al = a.length) > 0) {
1582     int qid = q.id; // (never zero)
1583     int index = (al - 1) & b;
1584     long offset = ((long)index << ASHIFT) + ABASE;
1585     ForkJoinTask<?> t = (ForkJoinTask<?>)
1586     U.getObjectVolatile(a, offset);
         // take the task only if base is unchanged and the slot CAS succeeds
1587     if (t != null && b++ == q.base &&
1588     U.compareAndSwapObject(a, offset, t, null)) {
         // signal only if q still has tasks and it is not the queue
         // this worker last took from
1589     if ((q.base = b) - q.top < 0 && qid != lastSignalId)
1590     signalWork(); // propagate signal
1591     w.source = lastSignalId = qid;
1592     t.doExec();
1593     if ((w.id & FIFO) != 0) // run remaining locals
1594     w.localPollAndExec(POLL_LIMIT);
1595     else
1596     w.localPopAndExec(POLL_LIMIT);
1597     ForkJoinWorkerThread thread = w.owner;
1598     ++w.nsteals;
1599     w.source = 0; // now idle
1600     if (thread != null)
1601     thread.afterTopLevelExec();
1602     }
         // set even when the take failed; r is not advanced in this branch,
         // so the next iteration probes the same index again
1603     nonempty = true;
1604     }
         // first empty probe after a non-empty one ends this scan
1605     else if (nonempty)
1606     break;
1607     else
1608     ++r;
1609     }
1610    
1611     if (nonempty) { // move (xorshift)
1612     r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
1613     }
1614     else {
1615     int phase;
1616     lastSignalId = 0; // clear for next scan
1617     if ((phase = w.phase) >= 0) { // enqueue
1618     int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
1619     long c, nc;
         // remember the previous low 32 bits of ctl in stackPred, then
         // subtract RC_UNIT and install np as the SP_MASK bits of ctl
1620     do {
1621     w.stackPred = (int)(c = ctl);
1622     nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
1623     } while (!U.compareAndSwapLong(this, CTL, c, nc));
1624     }
1625     else { // already queued
1626     int pred = w.stackPred;
1627     w.source = DORMANT; // enable signal
         // steps alternates: odd values park, even values only clear
         // the interrupt status
1628     for (int steps = 0;;) {
1629     int md, rc; long c;
         // a non-negative phase means this worker was released
1630     if (w.phase >= 0) {
1631     w.source = 0;
1632     break;
1633     }
1634     else if ((md = mode) < 0) // shutting down
1635     return;
1636     else if ((rc = ((md & SMASK) + // possibly quiescent
1637     (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
1638     (md & SHUTDOWN) != 0 &&
1639     tryTerminate(false, false))
1640     return; // help terminate
1641     else if ((++steps & 1) == 0)
1642     Thread.interrupted(); // clear between parks
         // timed park only when rc <= 0, a predecessor is recorded, and
         // the phase read above equals the low 32 bits of ctl
1643     else if (rc <= 0 && pred != 0 && phase == (int)c) {
1644     long d = keepAlive + System.currentTimeMillis();
1645     LockSupport.parkUntil(this, d);
         // on (near) timeout with ctl unchanged, try to drop out:
         // subtract TC_UNIT and put pred back as the SP_MASK bits
1646     if (ctl == c &&
1647     d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
1648     long nc = ((UC_MASK & (c - TC_UNIT)) |
1649     (SP_MASK & pred));
1650     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1651     w.phase = QUIET;
1652     return; // drop on timeout
1653     }
1654     }
1655     }
1656     else
1657     LockSupport.park(this);
1658     }
1659     }
1660     }
1661     }
1662     }
1663    
1664     /**
1665     * Helps and/or blocks until the given task is done or timeout.
1666     * First tries locally helping, then scans other queues for a task
1667     * produced by one of w's stealers; compensating and blocking if
1668     * none are found (rescanning if tryCompensate fails).
1669     *
         * <p>If {@code w} or {@code task} is null, 0 is returned without
         * doing anything. If the task is a {@code CountedCompleter} and
         * {@code w.localHelpCC} returns a negative status, that status is
         * returned directly.
         *
1670     * @param w caller
1671     * @param task the task
1672     * @param deadline for timed waits, if nonzero
1673     * @return task status on exit
1674     */
1675     final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1676     int s = 0;
1677     if (w != null && task != null &&
1678     (!(task instanceof CountedCompleter) ||
1679     (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
1680     w.tryRemoveAndExec(task);
         // src is restored into w.source after each helped task
1681     int src = w.source, id = w.id;
1682     s = task.status;
1683     while (s >= 0) {
1684     WorkQueue[] ws;
1685     boolean nonempty = false;
1686     int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
1687     if ((ws = workQueues) != null) { // scan for matching id
         // j runs from -n to n in steps of 2, i.e. n probes
1688     for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
1689     WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
         // only non-empty queues whose source equals this caller's id
1690     if ((i = (r + j) & m) >= 0 && i < n &&
1691     (q = ws[i]) != null && q.source == id &&
1692     (b = q.base) - q.top < 0 &&
1693     (a = q.array) != null && (al = a.length) > 0) {
1694     int qid = q.id;
1695     int index = (al - 1) & b;
1696     long offset = ((long)index << ASHIFT) + ABASE;
1697     ForkJoinTask<?> t = (ForkJoinTask<?>)
1698     U.getObjectVolatile(a, offset);
         // recheck base and source before the CAS that claims the task
1699     if (t != null && b++ == q.base && id == q.source &&
1700     U.compareAndSwapObject(a, offset, t, null)) {
1701     q.base = b;
1702     w.source = qid;
1703     t.doExec();
1704     w.source = src;
1705     }
         // a matching non-empty queue was seen, even if the take failed
1706     nonempty = true;
1707     break;
1708     }
1709     }
1710     }
1711     if ((s = task.status) < 0)
1712     break;
1713     else if (!nonempty) {
1714     long ms, ns; int block;
1715     if (deadline == 0L)
1716     ms = 0L; // untimed
1717     else if ((ns = deadline - System.nanoTime()) <= 0L)
1718     break; // timeout
1719     else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1720     ms = 1L; // avoid 0 for timed wait
         // a result of 0 from tryCompensate means retry: skip the wait and rescan
1721     if ((block = tryCompensate(w)) != 0) {
1722     task.internalWait(ms);
         // RC_UNIT is added back only when tryCompensate returned 1;
         // for -1 the atomic add is still executed, with a delta of 0
1723     U.getAndAddLong(this, CTL, (block > 0) ? RC_UNIT : 0L);
1724     }
1725     s = task.status;
1726     }
1727     }
1728     }
1729     return s;
1730     }
1731    
1732     /**
1733     * Runs tasks until {@code isQuiescent()}. Rather than blocking
1734     * when tasks cannot be found, rescans until all others cannot
1735     * find tasks either.
         *
         * <p>While it finds no work, the caller stores {@code QUIET} in its
         * {@code source} field and adjusts ctl once; that adjustment is
         * undone (by adding {@code RC_UNIT}) when work is seen again or on
         * exit. The original {@code source} value is restored before
         * returning.
         *
         * @param w the calling worker's queue; dereferenced without a null
         * check
1736     */
1737     final void helpQuiescePool(WorkQueue w) {
1738     int prevSrc = w.source, fifo = w.id & FIFO;
         // released: -1 = not yet known, 1 = ctl not adjusted by this call,
         // 0 = this call has applied its "decrement" to ctl
1739     for (int source = prevSrc, released = -1;;) { // -1 until known
1740     WorkQueue[] ws;
         // run local tasks first
1741     if (fifo != 0)
1742     w.localPollAndExec(0);
1743     else
1744     w.localPopAndExec(0);
1745     if (released == -1 && w.phase >= 0)
1746     released = 1;
         // quiet: no other queue looked active; empty: no non-empty queue seen
1747     boolean quiet = true, empty = true;
1748     int r = ThreadLocalRandom.nextSecondarySeed();
1749     if ((ws = workQueues) != null) {
1750     for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1751     WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1752     if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
1753     if ((b = q.base) - q.top < 0 &&
1754     (a = q.array) != null && (al = a.length) > 0) {
         // NOTE(review): qid is not used below; q.id is read again
         // when w.source is assigned
1755     int qid = q.id;
1756     if (released == 0) { // increment
1757     released = 1;
1758     U.getAndAddLong(this, CTL, RC_UNIT);
1759     }
1760     int index = (al - 1) & b;
1761     long offset = ((long)index << ASHIFT) + ABASE;
1762     ForkJoinTask<?> t = (ForkJoinTask<?>)
1763     U.getObjectVolatile(a, offset);
1764     if (t != null && b++ == q.base &&
1765     U.compareAndSwapObject(a, offset, t, null)) {
1766     q.base = b;
1767     w.source = source = q.id;
1768     t.doExec();
1769     w.source = source = prevSrc;
1770     }
         // work was seen (even if the take failed): rescan from the top
1771     quiet = empty = false;
1772     break;
1773     }
         // an empty queue whose source lacks the QUIET bits is
         // treated as still active
1774     else if ((q.source & QUIET) == 0)
1775     quiet = false;
1776     }
1777     }
1778     }
1779     if (quiet) {
         // undo this call's ctl adjustment, if any, before leaving
1780     if (released == 0)
1781     U.getAndAddLong(this, CTL, RC_UNIT);
1782     w.source = prevSrc;
1783     break;
1784     }
1785     else if (empty) {
1786     if (source != QUIET)
1787     w.source = source = QUIET;
1788     if (released == 1) { // decrement
1789     released = 0;
         // presumably equivalent to subtracting one RC_UNIT; the
         // constants are defined outside this part of the file
1790     U.getAndAddLong(this, CTL, RC_MASK & -RC_UNIT);
1791     }
1792     }
1793     }
1794     }
1795    
1796     /**
1797     * Scans for and returns a polled task, if available.
1798     * Used only for untracked polls.
1799     *
         * <p>Starting from a pseudo-random origin, queues are visited with
         * a fixed stride until either a task is taken from the base of a
         * non-empty queue, or two consecutive full sweeps observe the same
         * sum of {@code base} values. Scanning also ends when the
         * {@code STOP} bit of {@code mode} is set or {@code workQueues} is
         * null or empty.
         *
1800     * @param submissionsOnly if true, only scan submission queues
         * @return the task taken, or null if none was found
1801     */
1802     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1803     WorkQueue[] ws; int n;
1804     rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
1805     (n = ws.length) > 0) {
1806     int m = n - 1;
1807     int r = ThreadLocalRandom.nextSecondarySeed();
1808     int h = r >>> 16;
1809     int origin, step;
1810     if (submissionsOnly) {
1811     origin = (r & ~1) & m; // even indices and steps
1812     step = (h & ~1) | 2;
1813     }
1814     else {
         // any index; the step is forced odd
1815     origin = r & m;
1816     step = h | 1;
1817     }
1818     for (int k = origin, oldSum = 0, checkSum = 0;;) {
1819     WorkQueue q; int b, al; ForkJoinTask<?>[] a;
1820     if ((q = ws[k]) != null) {
1821     checkSum += b = q.base;
1822     if (b - q.top < 0 &&
1823     (a = q.array) != null && (al = a.length) > 0) {
1824     int index = (al - 1) & b;
1825     long offset = ((long)index << ASHIFT) + ABASE;
1826     ForkJoinTask<?> t = (ForkJoinTask<?>)
1827     U.getObjectVolatile(a, offset);
1828     if (t != null && b++ == q.base &&
1829     U.compareAndSwapObject(a, offset, t, null)) {
1830     q.base = b;
1831     return t;
1832     }
         // lost a race on a non-empty queue: leave the inner loop so the
         // outer while re-reads mode and workQueues and picks a new origin
1833     else
1834     break; // restart
1835     }
1836     }
         // back at the origin: one full sweep is complete
1837     if ((k = (k + step) & m) == origin) {
         // compares the previous sweep's sum with this one while also
         // storing the new sum for the next comparison
1838     if (oldSum == (oldSum = checkSum))
1839     break rescan;
1840     checkSum = 0;
1841     }
1842     }
1843     }
1844     return null;
1845     }
1846    
1847     /**
1848     * Gets and removes a local or stolen task for the given worker.
1849     *
1850     * @return a task, if available
1851     */
1852     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1853     ForkJoinTask<?> t;
1854     if (w != null &&
1855     (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1856     return t;
1857     else
1858     return pollScan(false);
1859     }
1860    
1861     // External operations
1862    
1863     /**
1864     * Adds the given task to a submission queue at submitter's
1865     * current queue, creating one if null or contended.
1866     *
         * <p>The queue slot is selected from the caller's
         * {@code ThreadLocalRandom} probe. If the slot is empty a new
         * queue is created (already locked) and installed under the
         * {@code workerNamePrefix} monitor; if the slot's queue cannot be
         * locked, or installation fails, the probe is advanced and the
         * attempt is repeated.
         *
1867     * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if the {@code SHUTDOWN} bit of
         * {@code mode} is set or {@code workQueues} is null or empty
1868     */
1869     final void externalPush(ForkJoinTask<?> task) {
1870     int r; // initialize caller's probe
1871     if ((r = ThreadLocalRandom.getProbe()) == 0) {
1872     ThreadLocalRandom.localInit();
1873     r = ThreadLocalRandom.getProbe();
1874     }
1875     for (;;) {
1876     int md = mode, n;
1877     WorkQueue[] ws = workQueues;
1878     if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
1879     throw new RejectedExecutionException();
1880     else {
1881     WorkQueue q;
         // push: the task was (or will be) added in this iteration;
         // grow: the array must be allocated or enlarged before adding
1882     boolean push = false, grow = false;
1883     if ((q = ws[(n - 1) & r & SQMASK]) == null) {
1884     Object lock = workerNamePrefix;
1885     int qid = (r | QUIET) & ~(FIFO | OWNED);
1886     q = new WorkQueue(this, null);
1887     q.id = qid;
1888     q.source = QUIET;
1889     q.phase = QLOCK; // lock queue
1890     if (lock != null) {
1891     synchronized (lock) { // lock pool to install
1892     int i;
         // re-read the array under the lock; install only if the
         // slot is still empty, otherwise push stays false
1893     if ((ws = workQueues) != null &&
1894     (n = ws.length) > 0 &&
1895     ws[i = qid & (n - 1) & SQMASK] == null) {
1896     ws[i] = q;
1897     push = grow = true;
1898     }
1899     }
1900     }
1901     }
1902     else if (q.tryLockSharedQueue()) {
1903     int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
         // d = base - top; the test checks that the array exists and
         // still has room for one more element
1904     if ((a = q.array) != null && (al = a.length) > 0 &&
1905     al - 1 + (d = b - s) > 0) {
1906     a[(al - 1) & s] = task;
1907     q.top = s + 1; // relaxed writes OK here
         // storing 0 in phase releases the queue lock
1908     q.phase = 0;
1909 dl 1.3 if (d < 0 && q.base - s < -1)
1910 jsr166 1.1 break; // no signal needed
1911     }
1912     else
         // the queue stays locked until the finally block below
1913     grow = true;
1914     push = true;
1915     }
1916     if (push) {
1917     if (grow) {
1918     try {
1919     q.growArray();
1920     int s = q.top, al; ForkJoinTask<?>[] a;
1921     if ((a = q.array) != null && (al = a.length) > 0) {
1922     a[(al - 1) & s] = task;
1923     q.top = s + 1;
1924     }
1925     } finally {
         // unlock even if growArray throws
1926     q.phase = 0;
1927     }
1928     }
1929     signalWork();
1930     break;
1931     }
1932     else // move if busy
1933     r = ThreadLocalRandom.advanceProbe(r);
1934     }
1935     }
1936     }
1937    
1938     /**
1939     * Pushes a possibly-external submission.
1940     */
1941     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1942     Thread t; ForkJoinWorkerThread w; WorkQueue q;
1943     if (task == null)
1944     throw new NullPointerException();
1945     if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1946     (w = (ForkJoinWorkerThread)t).pool == this &&
1947     (q = w.workQueue) != null)
1948     q.push(task);
1949     else
1950     externalPush(task);
1951     return task;
1952     }
1953    
1954     /**
1955     * Returns common pool queue for an external thread.
1956     */
1957     static WorkQueue commonSubmitterQueue() {
1958     ForkJoinPool p = common;
1959     int r = ThreadLocalRandom.getProbe();
1960     WorkQueue[] ws; int n;
1961     return (p != null && (ws = p.workQueues) != null &&
1962     (n = ws.length) > 0) ?
1963     ws[(n - 1) & r & SQMASK] : null;
1964     }
1965    
1966     /**
1967     * Performs tryUnpush for an external submitter.
1968     */
1969     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
1970     int r = ThreadLocalRandom.getProbe();
1971     WorkQueue[] ws; WorkQueue w; int n;
1972     return ((ws = workQueues) != null &&
1973     (n = ws.length) > 0 &&
1974     (w = ws[(n - 1) & r & SQMASK]) != null &&
1975     w.trySharedUnpush(task));
1976     }
1977    
1978     /**
1979     * Performs helpComplete for an external submitter.
1980     */
1981     final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
1982     int r = ThreadLocalRandom.getProbe();
1983     WorkQueue[] ws; WorkQueue w; int n;
1984     return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1985     (w = ws[(n - 1) & r & SQMASK]) != null) ?
1986     w.sharedHelpCC(task, maxTasks) : 0;
1987     }
1988    
1989     /**
1990     * Tries to steal and run tasks within the target's computation.
1991     * The maxTasks argument supports external usages; internal calls
1992     * use zero, allowing unbounded steps (external calls trap
1993     * non-positive values).
1994     *
1995     * @param w caller
1996     * @param maxTasks if non-zero, the maximum number of other tasks to run
1997     * @return task status on exit
1998     */
1999     final int helpComplete(WorkQueue w, CountedCompleter<?> task,
2000     int maxTasks) {
2001     return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
2002     }
2003    
2004     /**
2005     * Returns a cheap heuristic guide for task partitioning when
2006     * programmers, frameworks, tools, or languages have little or no
2007     * idea about task granularity. In essence, by offering this
2008     * method, we ask users only about tradeoffs in overhead vs
2009     * expected throughput and its variance, rather than how finely to
2010     * partition tasks.
2011     *
2012     * In a steady state strict (tree-structured) computation, each
2013     * thread makes available for stealing enough tasks for other
2014     * threads to remain active. Inductively, if all threads play by
2015     * the same rules, each thread should make available only a
2016     * constant number of tasks.
2017     *
2018     * The minimum useful constant is just 1. But using a value of 1
2019     * would require immediate replenishment upon each steal to
2020     * maintain enough tasks, which is infeasible. Further,
2021     * partitionings/granularities of offered tasks should minimize
2022     * steal rates, which in general means that threads nearer the top
2023     * of computation tree should generate more than those nearer the
2024     * bottom. In perfect steady state, each thread is at
2025     * approximately the same level of computation tree. However,
2026     * producing extra tasks amortizes the uncertainty of progress and
2027     * diffusion assumptions.
2028     *
2029     * So, users will want to use values larger (but not much larger)
2030     * than 1 to both smooth over transient shortages and hedge
2031     * against uneven progress; as traded off against the cost of
2032     * extra task overhead. We leave the user to pick a threshold
2033     * value to compare with the results of this call to guide
2034     * decisions, but recommend values such as 3.
2035     *
2036     * When all threads are active, it is on average OK to estimate
2037     * surplus strictly locally. In steady-state, if one thread is
2038     * maintaining say 2 surplus tasks, then so are others. So we can
2039     * just use estimated queue length. However, this strategy alone
2040     * leads to serious mis-estimates in some non-steady-state
2041     * conditions (ramp-up, ramp-down, other stalls). We can detect
2042     * many of these by further considering the number of "idle"
2043     * threads, that are known to have zero queued tasks, so
2044     * compensate by a factor of (#idle/#active) threads.
2045     */
2046     static int getSurplusQueuedTaskCount() {
2047     Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2048     if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2049     (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2050     (q = wt.workQueue) != null) {
2051     int p = pool.mode & SMASK;
2052     int a = p + (int)(pool.ctl >> RC_SHIFT);
2053     int n = q.top - q.base;
2054     return n - (a > (p >>>= 1) ? 0 :
2055     a > (p >>>= 1) ? 1 :
2056     a > (p >>>= 1) ? 2 :
2057     a > (p >>>= 1) ? 4 :
2058     8);
2059     }
2060     return 0;
2061     }
2062    
2063     // Termination
2064    
2065     /**
2066     * Possibly initiates and/or completes termination.
2067     *
         * <p>Works in three stages, each a loop on {@code mode}: set the
         * {@code SHUTDOWN} bit (only if {@code enable} and this is not the
         * common pool), set the {@code STOP} bit (immediately if
         * {@code now}, otherwise only after a stable snapshot shows no
         * running workers and no queued tasks), and finally cancel queued
         * tasks, interrupt owners, and set {@code TERMINATED} once the
         * worker-count check passes.
         *
2068     * @param now if true, unconditionally terminate, else only
2069     * if no work and no active workers
2070     * @param enable if true, terminate when next possible
2071     * @return true if terminating or terminated
2072     */
2073     private boolean tryTerminate(boolean now, boolean enable) {
2074     int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
2075    
         // stage 1: a failed CAS just re-reads mode and tries again
2076     while (((md = mode) & SHUTDOWN) == 0) {
2077     if (!enable || this == common) // cannot shutdown
2078     return false;
2079     else
2080     U.compareAndSwapInt(this, MODE, md, md | SHUTDOWN);
2081     }
2082    
2083     while (((md = mode) & STOP) == 0) { // try to initiate termination
2084     if (!now) { // check if quiescent & empty
2085     for (long oldSum = 0L;;) { // repeat until stable
2086     boolean running = false;
2087     long checkSum = ctl;
2088     WorkQueue[] ws = workQueues;
2089     if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
2090     running = true;
2091     else if (ws != null) {
2092     WorkQueue w; int b;
2093     for (int i = 0; i < ws.length; ++i) {
2094     if ((w = ws[i]) != null) {
2095     checkSum += (b = w.base) + w.id;
         // running if any queue holds tasks, or an odd-indexed
         // queue has a non-negative source
2096     if (b != w.top ||
2097     ((i & 1) == 1 && w.source >= 0)) {
2098     running = true;
2099     break;
2100     }
2101     }
2102     }
2103     }
2104     if (((md = mode) & STOP) != 0)
2105     break; // already triggered
2106     else if (running)
2107     return false;
         // stable: same array and same checksum as the previous pass
2108     else if (workQueues == ws && oldSum == (oldSum = checkSum))
2109     break;
2110     }
2111     }
2112     if ((md & STOP) == 0)
2113     U.compareAndSwapInt(this, MODE, md, md | STOP);
2114     }
2115    
2116     while (((md = mode) & TERMINATED) == 0) { // help terminate others
2117     for (long oldSum = 0L;;) { // repeat until stable
2118     WorkQueue[] ws; WorkQueue w;
2119     long checkSum = ctl;
2120     if ((ws = workQueues) != null) {
2121     for (int i = 0; i < ws.length; ++i) {
2122     if ((w = ws[i]) != null) {
2123     ForkJoinWorkerThread wt = w.owner;
2124     w.cancelAll(); // clear queues
2125     if (wt != null) {
         // failures to interrupt are deliberately ignored
2126     try { // unblock join or park
2127     wt.interrupt();
2128     } catch (Throwable ignore) {
2129     }
2130     }
2131     checkSum += w.base + w.id;
2132     }
2133     }
2134     }
2135     if (((md = mode) & TERMINATED) != 0 ||
2136     (workQueues == ws && oldSum == (oldSum = checkSum)))
2137     break;
2138     }
2139     if ((md & TERMINATED) != 0)
2140     break;
         // the worker-count expression is still positive: leave without
         // setting TERMINATED (the method still returns true)
2141     else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
2142     break;
2143     else if (U.compareAndSwapInt(this, MODE, md, md | TERMINATED)) {
2144     synchronized (this) {
2145     notifyAll(); // for awaitTermination
2146     }
2147     break;
2148     }
2149     }
2150     return true;
2151     }
2152    
2153     // Exported methods
2154    
2155     // Constructors
2156    
/**
 * Creates a {@code ForkJoinPool} whose parallelism equals {@link
 * java.lang.Runtime#availableProcessors} (capped at the
 * implementation limit), with default values for every other
 * parameter.
 *
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory,
         null,                         // no uncaught exception handler
         false,                        // not async mode
         0,                            // corePoolSize
         MAX_CAP,                      // maximumPoolSize
         1,                            // minimumRunnable
         null,                         // no saturate predicate
         DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}
2172    
/**
 * Creates a {@code ForkJoinPool} with the requested parallelism
 * level and default values for every other parameter.
 *
 * @param parallelism the parallelism level
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism) {
    this(parallelism,
         defaultForkJoinWorkerThreadFactory,
         null,                         // no uncaught exception handler
         false,                        // not async mode
         0,                            // corePoolSize
         MAX_CAP,                      // maximumPoolSize
         1,                            // minimumRunnable
         null,                         // no saturate predicate
         DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}
2189    
/**
 * Creates a {@code ForkJoinPool} from the four given settings, with
 * default values for the remaining parameters.
 *
 * @param parallelism the parallelism level. For default value,
 * use {@link java.lang.Runtime#availableProcessors}.
 * @param factory the factory used to create new threads. For default
 * value, use {@link #defaultForkJoinWorkerThreadFactory}.
 * @param handler the handler for internal worker threads that
 * terminate due to unrecoverable errors encountered while executing
 * tasks. For default value, use {@code null}.
 * @param asyncMode if true, establishes local first-in-first-out
 * scheduling mode for forked tasks that are never joined. This mode
 * may be more appropriate than the default locally stack-based mode
 * in applications in which worker threads only process event-style
 * asynchronous tasks. For default value, use {@code false}.
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws NullPointerException if the factory is null
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(parallelism, factory, handler, asyncMode,
         0,                            // corePoolSize
         MAX_CAP,                      // maximumPoolSize
         1,                            // minimumRunnable
         null,                         // no saturate predicate
         DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}
2222    
2223     /**
2224     * Creates a {@code ForkJoinPool} with the given parameters.
2225     *
2226     * @param parallelism the parallelism level. For default value,
2227     * use {@link java.lang.Runtime#availableProcessors}.
2228     *
2229     * @param factory the factory for creating new threads. For
2230     * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2231     *
2232     * @param handler the handler for internal worker threads that
2233     * terminate due to unrecoverable errors encountered while
2234     * executing tasks. For default value, use {@code null}.
2235     *
2236     * @param asyncMode if true, establishes local first-in-first-out
2237     * scheduling mode for forked tasks that are never joined. This
2238     * mode may be more appropriate than default locally stack-based
2239     * mode in applications in which worker threads only process
2240     * event-style asynchronous tasks. For default value, use {@code
2241     * false}.
2242     *
2243     * @param corePoolSize the number of threads to keep in the pool
2244     * (unless timed out after an elapsed keep-alive). Normally (and
2245     * by default) this is the same value as the parallelism level,
2246     * but may be set to a larger value to reduce dynamic overhead if
2247     * tasks regularly block. Using a smaller value (for example
2248     * {@code 0}) has the same effect as the default.
2249     *
2250     * @param maximumPoolSize the maximum number of threads allowed.
2251     * When the maximum is reached, attempts to replace blocked
2252     * threads fail. (However, because creation and termination of
2253     * different threads may overlap, and may be managed by the given
2254     * thread factory, this value may be transiently exceeded.) To
2255     * arrange the same value as is used by default for the common
2256     * pool, use {@code 256} plus the parallelism level. Using a value
2257     * (for example {@code Integer.MAX_VALUE}) larger than the
2258     * implementation's total thread limit has the same effect as
2259     * using this limit (which is the default).
2260     *
2261     * @param minimumRunnable the minimum allowed number of core
2262     * threads not blocked by a join or {@link ManagedBlocker}. To
2263     * ensure progress, when too few unblocked threads exist and
2264     * unexecuted tasks may exist, new threads are constructed, up to
2265     * the given maximumPoolSize. For the default value, use {@code
2266     * 1}, that ensures liveness. A larger value might improve
2267     * throughput in the presence of blocked activities, but might
2268     * not, due to increased overhead. A value of zero may be
2269     * acceptable when submitted tasks cannot have dependencies
2270     * requiring additional threads.
2271     *
2272 jsr166 1.4 * @param saturate if non-null, a predicate invoked upon attempts
2273 jsr166 1.1 * to create more than the maximum total allowed threads. By
2274     * default, when a thread is about to block on a join or {@link
2275     * ManagedBlocker}, but cannot be replaced because the
2276     * maximumPoolSize would be exceeded, a {@link
2277     * RejectedExecutionException} is thrown. But if this predicate
2278     * returns {@code true}, then no exception is thrown, so the pool
2279     * continues to operate with fewer than the target number of
2280     * runnable threads, which might not ensure progress.
2281     *
2282     * @param keepAliveTime the elapsed time since last use before
2283     * a thread is terminated (and then later replaced if needed).
2284     * For the default value, use {@code 60, TimeUnit.SECONDS}.
2285     *
2286     * @param unit the time unit for the {@code keepAliveTime} argument
2287     *
2288     * @throws IllegalArgumentException if parallelism is less than or
2289     * equal to zero, or is greater than implementation limit,
2290     * or if maximumPoolSize is less than parallelism,
2291     * of if the keepAliveTime is less than or equal to zero.
2292     * @throws NullPointerException if the factory is null
2293     * @throws SecurityException if a security manager exists and
2294     * the caller is not permitted to modify threads
2295     * because it does not hold {@link
2296     * java.lang.RuntimePermission}{@code ("modifyThread")}
2297     * @since 9
2298     */
2299     public ForkJoinPool(int parallelism,
2300     ForkJoinWorkerThreadFactory factory,
2301     UncaughtExceptionHandler handler,
2302     boolean asyncMode,
2303     int corePoolSize,
2304     int maximumPoolSize,
2305     int minimumRunnable,
2306     Predicate<? super ForkJoinPool> saturate,
2307     long keepAliveTime,
2308     TimeUnit unit) {
2309     // check, encode, pack parameters
2310     if (parallelism <= 0 || parallelism > MAX_CAP ||
2311     maximumPoolSize < parallelism || keepAliveTime <= 0L)
2312     throw new IllegalArgumentException();
2313     if (factory == null)
2314     throw new NullPointerException();
2315     long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2316    
2317     int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
2318     long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
2319     (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2320     int m = parallelism | (asyncMode ? FIFO : 0);
2321     int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
2322     int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
2323     int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
2324     int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
2325     n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2326     n = (n + 1) << 1; // power of two, including space for submission queues
2327    
2328 jsr166 1.6 this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
2329 jsr166 1.1 this.workQueues = new WorkQueue[n];
2330     this.factory = factory;
2331     this.ueh = handler;
2332     this.saturate = saturate;
2333     this.keepAlive = ms;
2334     this.bounds = b;
2335     this.mode = m;
2336     this.ctl = c;
2337     checkPermission();
2338     }
2339    
2340 jsr166 1.8 private static Object newInstanceFromSystemProperty(String property)
2341 jsr166 1.5 throws ReflectiveOperationException {
2342     String className = System.getProperty(property);
2343     return (className == null)
2344     ? null
2345     : ClassLoader.getSystemClassLoader().loadClass(className)
2346     .getConstructor().newInstance();
2347     }
2348    
2349 jsr166 1.1 /**
2350     * Constructor for common pool using parameters possibly
2351     * overridden by system properties
2352     */
2353     private ForkJoinPool(byte forCommonPoolOnly) {
2354     int parallelism = -1;
2355     ForkJoinWorkerThreadFactory fac = null;
2356     UncaughtExceptionHandler handler = null;
2357     try { // ignore exceptions in accessing/parsing properties
2358     String pp = System.getProperty
2359     ("java.util.concurrent.ForkJoinPool.common.parallelism");
2360     if (pp != null)
2361     parallelism = Integer.parseInt(pp);
2362 jsr166 1.5 fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
2363     "java.util.concurrent.ForkJoinPool.common.threadFactory");
2364     handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
2365     "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2366 jsr166 1.1 } catch (Exception ignore) {
2367     }
2368    
2369     if (fac == null) {
2370     if (System.getSecurityManager() == null)
2371     fac = defaultForkJoinWorkerThreadFactory;
2372     else // use security-managed default
2373     fac = new InnocuousForkJoinWorkerThreadFactory();
2374     }
2375     if (parallelism < 0 && // default 1 less than #cores
2376     (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
2377     parallelism = 1;
2378     if (parallelism > MAX_CAP)
2379     parallelism = MAX_CAP;
2380    
2381     long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
2382     (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2383     int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2384     int n = (parallelism > 1) ? parallelism - 1 : 1;
2385     n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2386     n = (n + 1) << 1;
2387    
2388 jsr166 1.6 this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2389 jsr166 1.1 this.workQueues = new WorkQueue[n];
2390     this.factory = fac;
2391     this.ueh = handler;
2392     this.saturate = null;
2393     this.keepAlive = DEFAULT_KEEPALIVE;
2394     this.bounds = b;
2395 dl 1.2 this.mode = parallelism;
2396 jsr166 1.1 this.ctl = c;
2397     }
2398    
2399     /**
2400     * Returns the common pool instance. This pool is statically
2401     * constructed; its run state is unaffected by attempts to {@link
2402     * #shutdown} or {@link #shutdownNow}. However this pool and any
2403     * ongoing processing are automatically terminated upon program
2404     * {@link System#exit}. Any program that relies on asynchronous
2405     * task processing to complete before program termination should
2406     * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2407     * before exit.
2408     *
2409     * @return the common pool instance
2410     * @since 1.8
2411     */
2412     public static ForkJoinPool commonPool() {
2413     // assert common != null : "static init error";
2414     return common;
2415     }
2416    
2417     // Execution methods
2418    
2419     /**
2420     * Performs the given task, returning its result upon completion.
2421     * If the computation encounters an unchecked Exception or Error,
2422     * it is rethrown as the outcome of this invocation. Rethrown
2423     * exceptions behave in the same way as regular exceptions, but,
2424     * when possible, contain stack traces (as displayed for example
2425     * using {@code ex.printStackTrace()}) of both the current thread
2426     * as well as the thread actually encountering the exception;
2427     * minimally only the latter.
2428     *
2429     * @param task the task
2430     * @param <T> the type of the task's result
2431     * @return the task's result
2432     * @throws NullPointerException if the task is null
2433     * @throws RejectedExecutionException if the task cannot be
2434     * scheduled for execution
2435     */
2436     public <T> T invoke(ForkJoinTask<T> task) {
2437     if (task == null)
2438     throw new NullPointerException();
2439     externalSubmit(task);
2440     return task.join();
2441     }
2442    
2443     /**
2444     * Arranges for (asynchronous) execution of the given task.
2445     *
2446     * @param task the task
2447     * @throws NullPointerException if the task is null
2448     * @throws RejectedExecutionException if the task cannot be
2449     * scheduled for execution
2450     */
2451     public void execute(ForkJoinTask<?> task) {
2452     externalSubmit(task);
2453     }
2454    
2455     // AbstractExecutorService methods
2456    
2457     /**
2458     * @throws NullPointerException if the task is null
2459     * @throws RejectedExecutionException if the task cannot be
2460     * scheduled for execution
2461     */
2462     public void execute(Runnable task) {
2463     if (task == null)
2464     throw new NullPointerException();
2465     ForkJoinTask<?> job;
2466     if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2467     job = (ForkJoinTask<?>) task;
2468     else
2469     job = new ForkJoinTask.RunnableExecuteAction(task);
2470     externalSubmit(job);
2471     }
2472    
2473     /**
2474     * Submits a ForkJoinTask for execution.
2475     *
2476     * @param task the task to submit
2477     * @param <T> the type of the task's result
2478     * @return the task
2479     * @throws NullPointerException if the task is null
2480     * @throws RejectedExecutionException if the task cannot be
2481     * scheduled for execution
2482     */
2483     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2484     return externalSubmit(task);
2485     }
2486    
2487     /**
2488     * @throws NullPointerException if the task is null
2489     * @throws RejectedExecutionException if the task cannot be
2490     * scheduled for execution
2491     */
2492     public <T> ForkJoinTask<T> submit(Callable<T> task) {
2493     return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
2494     }
2495    
2496     /**
2497     * @throws NullPointerException if the task is null
2498     * @throws RejectedExecutionException if the task cannot be
2499     * scheduled for execution
2500     */
2501     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2502     return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
2503     }
2504    
2505     /**
2506     * @throws NullPointerException if the task is null
2507     * @throws RejectedExecutionException if the task cannot be
2508     * scheduled for execution
2509     */
2510     public ForkJoinTask<?> submit(Runnable task) {
2511     if (task == null)
2512     throw new NullPointerException();
2513     ForkJoinTask<?> job;
2514     if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2515     job = (ForkJoinTask<?>) task;
2516     else
2517     job = new ForkJoinTask.AdaptedRunnableAction(task);
2518     return externalSubmit(job);
2519     }
2520    
2521     /**
2522     * @throws NullPointerException {@inheritDoc}
2523     * @throws RejectedExecutionException {@inheritDoc}
2524     */
2525     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2526     // In previous versions of this class, this method constructed
2527     // a task to run ForkJoinTask.invokeAll, but now external
2528     // invocation of multiple tasks is at least as efficient.
2529     ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2530    
2531     try {
2532     for (Callable<T> t : tasks) {
2533     ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2534     futures.add(f);
2535     externalSubmit(f);
2536     }
2537     for (int i = 0, size = futures.size(); i < size; i++)
2538     ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2539     return futures;
2540     } catch (Throwable t) {
2541     for (int i = 0, size = futures.size(); i < size; i++)
2542     futures.get(i).cancel(false);
2543     throw t;
2544     }
2545     }
2546    
2547     /**
2548     * Returns the factory used for constructing new workers.
2549     *
2550     * @return the factory used for constructing new workers
2551     */
2552     public ForkJoinWorkerThreadFactory getFactory() {
2553     return factory;
2554     }
2555    
2556     /**
2557     * Returns the handler for internal worker threads that terminate
2558     * due to unrecoverable errors encountered while executing tasks.
2559     *
2560     * @return the handler, or {@code null} if none
2561     */
2562     public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2563     return ueh;
2564     }
2565    
2566     /**
2567     * Returns the targeted parallelism level of this pool.
2568     *
2569     * @return the targeted parallelism level of this pool
2570     */
2571     public int getParallelism() {
2572 dl 1.2 int par = mode & SMASK;
2573     return (par > 0) ? par : 1;
2574 jsr166 1.1 }
2575    
2576     /**
2577     * Returns the targeted parallelism level of the common pool.
2578     *
2579     * @return the targeted parallelism level of the common pool
2580     * @since 1.8
2581     */
2582     public static int getCommonPoolParallelism() {
2583     return COMMON_PARALLELISM;
2584     }
2585    
2586     /**
2587     * Returns the number of worker threads that have started but not
2588     * yet terminated. The result returned by this method may differ
2589     * from {@link #getParallelism} when threads are created to
2590     * maintain parallelism when others are cooperatively blocked.
2591     *
2592     * @return the number of worker threads
2593     */
2594     public int getPoolSize() {
2595     return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
2596     }
2597    
2598     /**
2599     * Returns {@code true} if this pool uses local first-in-first-out
2600     * scheduling mode for forked tasks that are never joined.
2601     *
2602     * @return {@code true} if this pool uses async mode
2603     */
2604     public boolean getAsyncMode() {
2605     return (mode & FIFO) != 0;
2606     }
2607    
2608     /**
2609     * Returns an estimate of the number of worker threads that are
2610     * not blocked waiting to join tasks or for other managed
2611     * synchronization. This method may overestimate the
2612     * number of running threads.
2613     *
2614     * @return the number of worker threads
2615     */
2616     public int getRunningThreadCount() {
2617     int rc = 0;
2618     WorkQueue[] ws; WorkQueue w;
2619     if ((ws = workQueues) != null) {
2620     for (int i = 1; i < ws.length; i += 2) {
2621     if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2622     ++rc;
2623     }
2624     }
2625     return rc;
2626     }
2627    
2628     /**
2629     * Returns an estimate of the number of threads that are currently
2630     * stealing or executing tasks. This method may overestimate the
2631     * number of active threads.
2632     *
2633     * @return the number of active threads
2634     */
2635     public int getActiveThreadCount() {
2636     int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2637     return (r <= 0) ? 0 : r; // suppress momentarily negative values
2638     }
2639    
2640     /**
2641     * Returns {@code true} if all worker threads are currently idle.
2642     * An idle worker is one that cannot obtain a task to execute
2643     * because none are available to steal from other threads, and
2644     * there are no pending submissions to the pool. This method is
2645     * conservative; it might not return {@code true} immediately upon
2646     * idleness of all threads, but will eventually become true if
2647     * threads remain inactive.
2648     *
2649     * @return {@code true} if all threads are currently idle
2650     */
2651     public boolean isQuiescent() {
2652     for (;;) {
2653     long c = ctl;
2654     int md = mode, pc = md & SMASK;
2655 dl 1.2 int tc = pc + (short)(c >>> TC_SHIFT);
2656 jsr166 1.1 int rc = pc + (int)(c >> RC_SHIFT);
2657     if ((md & (STOP | TERMINATED)) != 0)
2658     return true;
2659     else if (rc > 0)
2660     return false;
2661     else {
2662     WorkQueue[] ws; WorkQueue v;
2663     if ((ws = workQueues) != null) {
2664     for (int i = 1; i < ws.length; i += 2) {
2665     if ((v = ws[i]) != null) {
2666     if ((v.source & QUIET) == 0)
2667     return false;
2668     --tc;
2669     }
2670     }
2671     }
2672     if (tc == 0 && ctl == c)
2673     return true;
2674     }
2675     }
2676     }
2677    
2678     /**
2679     * Returns an estimate of the total number of tasks stolen from
2680     * one thread's work queue by another. The reported value
2681     * underestimates the actual total number of steals when the pool
2682     * is not quiescent. This value may be useful for monitoring and
2683     * tuning fork/join programs: in general, steal counts should be
2684     * high enough to keep threads busy, but low enough to avoid
2685     * overhead and contention across threads.
2686     *
2687     * @return the number of steals
2688     */
2689     public long getStealCount() {
2690     long count = stealCount;
2691     WorkQueue[] ws; WorkQueue w;
2692     if ((ws = workQueues) != null) {
2693     for (int i = 1; i < ws.length; i += 2) {
2694     if ((w = ws[i]) != null)
2695     count += (long)w.nsteals & 0xffffffffL;
2696     }
2697     }
2698     return count;
2699     }
2700    
2701     /**
2702     * Returns an estimate of the total number of tasks currently held
2703     * in queues by worker threads (but not including tasks submitted
2704     * to the pool that have not begun executing). This value is only
2705     * an approximation, obtained by iterating across all threads in
2706     * the pool. This method may be useful for tuning task
2707     * granularities.
2708     *
2709     * @return the number of queued tasks
2710     */
2711     public long getQueuedTaskCount() {
2712     long count = 0;
2713     WorkQueue[] ws; WorkQueue w;
2714     if ((ws = workQueues) != null) {
2715     for (int i = 1; i < ws.length; i += 2) {
2716     if ((w = ws[i]) != null)
2717     count += w.queueSize();
2718     }
2719     }
2720     return count;
2721     }
2722    
2723     /**
2724     * Returns an estimate of the number of tasks submitted to this
2725     * pool that have not yet begun executing. This method may take
2726     * time proportional to the number of submissions.
2727     *
2728     * @return the number of queued submissions
2729     */
2730     public int getQueuedSubmissionCount() {
2731     int count = 0;
2732     WorkQueue[] ws; WorkQueue w;
2733     if ((ws = workQueues) != null) {
2734     for (int i = 0; i < ws.length; i += 2) {
2735     if ((w = ws[i]) != null)
2736     count += w.queueSize();
2737     }
2738     }
2739     return count;
2740     }
2741    
2742     /**
2743     * Returns {@code true} if there are any tasks submitted to this
2744     * pool that have not yet begun executing.
2745     *
2746     * @return {@code true} if there are any queued submissions
2747     */
2748     public boolean hasQueuedSubmissions() {
2749     WorkQueue[] ws; WorkQueue w;
2750     if ((ws = workQueues) != null) {
2751     for (int i = 0; i < ws.length; i += 2) {
2752     if ((w = ws[i]) != null && !w.isEmpty())
2753     return true;
2754     }
2755     }
2756     return false;
2757     }
2758    
2759     /**
2760     * Removes and returns the next unexecuted submission if one is
2761     * available. This method may be useful in extensions to this
2762     * class that re-assign work in systems with multiple pools.
2763     *
2764     * @return the next submission, or {@code null} if none
2765     */
2766     protected ForkJoinTask<?> pollSubmission() {
2767     return pollScan(true);
2768     }
2769    
2770     /**
2771     * Removes all available unexecuted submitted and forked tasks
2772     * from scheduling queues and adds them to the given collection,
2773     * without altering their execution status. These may include
2774     * artificially generated or wrapped tasks. This method is
2775     * designed to be invoked only when the pool is known to be
2776     * quiescent. Invocations at other times may not remove all
2777     * tasks. A failure encountered while attempting to add elements
2778     * to collection {@code c} may result in elements being in
2779     * neither, either or both collections when the associated
2780     * exception is thrown. The behavior of this operation is
2781     * undefined if the specified collection is modified while the
2782     * operation is in progress.
2783     *
2784     * @param c the collection to transfer elements into
2785     * @return the number of elements transferred
2786     */
2787     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2788     int count = 0;
2789     WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2790     if ((ws = workQueues) != null) {
2791     for (int i = 0; i < ws.length; ++i) {
2792     if ((w = ws[i]) != null) {
2793     while ((t = w.poll()) != null) {
2794     c.add(t);
2795     ++count;
2796     }
2797     }
2798     }
2799     }
2800     return count;
2801     }
2802    
2803     /**
2804     * Returns a string identifying this pool, as well as its state,
2805     * including indications of run state, parallelism level, and
2806     * worker and task counts.
2807     *
2808     * @return a string identifying this pool, as well as its state
2809     */
2810     public String toString() {
2811     // Use a single pass through workQueues to collect counts
2812     long qt = 0L, qs = 0L; int rc = 0;
2813     long st = stealCount;
2814     WorkQueue[] ws; WorkQueue w;
2815     if ((ws = workQueues) != null) {
2816     for (int i = 0; i < ws.length; ++i) {
2817     if ((w = ws[i]) != null) {
2818     int size = w.queueSize();
2819     if ((i & 1) == 0)
2820     qs += size;
2821     else {
2822     qt += size;
2823     st += (long)w.nsteals & 0xffffffffL;
2824     if (w.isApparentlyUnblocked())
2825     ++rc;
2826     }
2827     }
2828     }
2829     }
2830    
2831     int md = mode;
2832     int pc = (md & SMASK);
2833     long c = ctl;
2834     int tc = pc + (short)(c >>> TC_SHIFT);
2835     int ac = pc + (int)(c >> RC_SHIFT);
2836     if (ac < 0) // ignore transient negative
2837     ac = 0;
2838     String level = ((md & TERMINATED) != 0 ? "Terminated" :
2839     (md & STOP) != 0 ? "Terminating" :
2840     (md & SHUTDOWN) != 0 ? "Shutting down" :
2841     "Running");
2842     return super.toString() +
2843     "[" + level +
2844     ", parallelism = " + pc +
2845     ", size = " + tc +
2846     ", active = " + ac +
2847     ", running = " + rc +
2848     ", steals = " + st +
2849     ", tasks = " + qt +
2850     ", submissions = " + qs +
2851     "]";
2852     }
2853    
2854     /**
2855     * Possibly initiates an orderly shutdown in which previously
2856     * submitted tasks are executed, but no new tasks will be
2857     * accepted. Invocation has no effect on execution state if this
2858     * is the {@link #commonPool()}, and no additional effect if
2859     * already shut down. Tasks that are in the process of being
2860     * submitted concurrently during the course of this method may or
2861     * may not be rejected.
2862     *
2863     * @throws SecurityException if a security manager exists and
2864     * the caller is not permitted to modify threads
2865     * because it does not hold {@link
2866     * java.lang.RuntimePermission}{@code ("modifyThread")}
2867     */
2868     public void shutdown() {
2869     checkPermission();
2870     tryTerminate(false, true);
2871     }
2872    
2873     /**
2874     * Possibly attempts to cancel and/or stop all tasks, and reject
2875     * all subsequently submitted tasks. Invocation has no effect on
2876     * execution state if this is the {@link #commonPool()}, and no
2877     * additional effect if already shut down. Otherwise, tasks that
2878     * are in the process of being submitted or executed concurrently
2879     * during the course of this method may or may not be
2880     * rejected. This method cancels both existing and unexecuted
2881     * tasks, in order to permit termination in the presence of task
2882     * dependencies. So the method always returns an empty list
2883     * (unlike the case for some other Executors).
2884     *
2885     * @return an empty list
2886     * @throws SecurityException if a security manager exists and
2887     * the caller is not permitted to modify threads
2888     * because it does not hold {@link
2889     * java.lang.RuntimePermission}{@code ("modifyThread")}
2890     */
2891     public List<Runnable> shutdownNow() {
2892     checkPermission();
2893     tryTerminate(true, true);
2894     return Collections.emptyList();
2895     }
2896    
2897     /**
2898     * Returns {@code true} if all tasks have completed following shut down.
2899     *
2900     * @return {@code true} if all tasks have completed following shut down
2901     */
2902     public boolean isTerminated() {
2903     return (mode & TERMINATED) != 0;
2904     }
2905    
2906     /**
2907     * Returns {@code true} if the process of termination has
2908     * commenced but not yet completed. This method may be useful for
2909     * debugging. A return of {@code true} reported a sufficient
2910     * period after shutdown may indicate that submitted tasks have
2911     * ignored or suppressed interruption, or are waiting for I/O,
2912     * causing this executor not to properly terminate. (See the
2913     * advisory notes for class {@link ForkJoinTask} stating that
2914     * tasks should not normally entail blocking operations. But if
2915     * they do, they must abort them on interrupt.)
2916     *
2917     * @return {@code true} if terminating but not yet terminated
2918     */
2919     public boolean isTerminating() {
2920     int md = mode;
2921     return (md & STOP) != 0 && (md & TERMINATED) == 0;
2922     }
2923    
2924     /**
2925     * Returns {@code true} if this pool has been shut down.
2926     *
2927     * @return {@code true} if this pool has been shut down
2928     */
2929     public boolean isShutdown() {
2930     return (mode & SHUTDOWN) != 0;
2931     }
2932    
2933     /**
2934     * Blocks until all tasks have completed execution after a
2935     * shutdown request, or the timeout occurs, or the current thread
2936     * is interrupted, whichever happens first. Because the {@link
2937     * #commonPool()} never terminates until program shutdown, when
2938     * applied to the common pool, this method is equivalent to {@link
2939     * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
2940     *
2941     * @param timeout the maximum time to wait
2942     * @param unit the time unit of the timeout argument
2943     * @return {@code true} if this executor terminated and
2944     * {@code false} if the timeout elapsed before termination
2945     * @throws InterruptedException if interrupted while waiting
2946     */
2947     public boolean awaitTermination(long timeout, TimeUnit unit)
2948     throws InterruptedException {
2949     if (Thread.interrupted())
2950     throw new InterruptedException();
2951     if (this == common) {
2952     awaitQuiescence(timeout, unit);
2953     return false;
2954     }
2955     long nanos = unit.toNanos(timeout);
2956     if (isTerminated())
2957     return true;
2958     if (nanos <= 0L)
2959     return false;
2960     long deadline = System.nanoTime() + nanos;
2961     synchronized (this) {
2962     for (;;) {
2963     if (isTerminated())
2964     return true;
2965     if (nanos <= 0L)
2966     return false;
2967     long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
2968     wait(millis > 0L ? millis : 1L);
2969     nanos = deadline - System.nanoTime();
2970     }
2971     }
2972     }
2973    
2974     /**
2975     * If called by a ForkJoinTask operating in this pool, equivalent
2976     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
2977     * waits and/or attempts to assist performing tasks until this
2978     * pool {@link #isQuiescent} or the indicated timeout elapses.
2979     *
2980     * @param timeout the maximum time to wait
2981     * @param unit the time unit of the timeout argument
2982     * @return {@code true} if quiescent; {@code false} if the
2983     * timeout elapsed.
2984     */
2985     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
2986     long nanos = unit.toNanos(timeout);
2987     ForkJoinWorkerThread wt;
2988     Thread thread = Thread.currentThread();
2989     if ((thread instanceof ForkJoinWorkerThread) &&
2990     (wt = (ForkJoinWorkerThread)thread).pool == this) {
2991     helpQuiescePool(wt.workQueue);
2992     return true;
2993     }
2994     else {
2995     for (long startTime = System.nanoTime();;) {
2996     ForkJoinTask<?> t;
2997     if ((t = pollScan(false)) != null)
2998     t.doExec();
2999     else if (isQuiescent())
3000     return true;
3001     else if ((System.nanoTime() - startTime) > nanos)
3002     return false;
3003     else
3004     Thread.yield(); // cannot block
3005     }
3006     }
3007     }
3008    
3009     /**
3010     * Waits and/or attempts to assist performing tasks indefinitely
3011     * until the {@link #commonPool()} {@link #isQuiescent}.
3012     */
3013     static void quiesceCommonPool() {
3014     common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3015     }
3016    
/**
 * Interface for extending managed parallelism for tasks running
 * in {@link ForkJoinPool}s.
 *
 * <p>A {@code ManagedBlocker} supplies two methods. Method
 * {@link #isReleasable} must return {@code true} if blocking is
 * not necessary. Method {@link #block} blocks the current thread
 * if necessary (perhaps internally invoking {@code isReleasable}
 * before actually blocking). Both actions are performed by any
 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
 * The unusual methods in this API accommodate synchronizers that
 * may, but don't usually, block for long periods. Similarly, they
 * allow more efficient internal handling of cases in which
 * additional workers may be, but usually are not, needed to
 * ensure sufficient parallelism. Toward this end,
 * implementations of method {@code isReleasable} must be amenable
 * to repeated invocation.
 *
 * <p>For example, here is a ManagedBlocker based on a
 * ReentrantLock:
 * <pre> {@code
 * class ManagedLocker implements ManagedBlocker {
 *   final ReentrantLock lock;
 *   boolean hasLock = false;
 *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
 *   public boolean block() {
 *     if (!hasLock)
 *       lock.lock();
 *     return true;
 *   }
 *   public boolean isReleasable() {
 *     return hasLock || (hasLock = lock.tryLock());
 *   }
 * }}</pre>
 *
 * <p>Here is a class that possibly blocks waiting for an
 * item on a given queue:
 * <pre> {@code
 * class QueueTaker<E> implements ManagedBlocker {
 *   final BlockingQueue<E> queue;
 *   volatile E item = null;
 *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
 *   public boolean block() throws InterruptedException {
 *     if (item == null)
 *       item = queue.take();
 *     return true;
 *   }
 *   public boolean isReleasable() {
 *     return item != null || (item = queue.poll()) != null;
 *   }
 *   public E getItem() { // call after pool.managedBlock completes
 *     return item;
 *   }
 * }}</pre>
 */
public interface ManagedBlocker {
    /**
     * Possibly blocks the current thread, for example while waiting
     * for a lock or condition.
     *
     * @return {@code true} if no additional blocking is necessary
     *         (i.e., if isReleasable would return true)
     * @throws InterruptedException if interrupted while waiting
     *         (the method is not required to do so, but is allowed to)
     */
    boolean block() throws InterruptedException;

    /**
     * Reports whether blocking is unnecessary.
     *
     * @return {@code true} if blocking is unnecessary
     */
    boolean isReleasable();
}
3090    
3091     /**
3092     * Runs the given possibly blocking task. When {@linkplain
3093     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
3094     * method possibly arranges for a spare thread to be activated if
3095     * necessary to ensure sufficient parallelism while the current
3096     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
3097     *
3098     * <p>This method repeatedly calls {@code blocker.isReleasable()} and
3099     * {@code blocker.block()} until either method returns {@code true}.
3100     * Every call to {@code blocker.block()} is preceded by a call to
3101     * {@code blocker.isReleasable()} that returned {@code false}.
3102     *
3103     * <p>If not running in a ForkJoinPool, this method is
3104     * behaviorally equivalent to
3105     * <pre> {@code
3106     * while (!blocker.isReleasable())
3107     * if (blocker.block())
3108     * break;}</pre>
3109     *
3110     * If running in a ForkJoinPool, the pool may first be expanded to
3111     * ensure sufficient parallelism available during the call to
3112     * {@code blocker.block()}.
3113     *
3114     * @param blocker the blocker task
3115     * @throws InterruptedException if {@code blocker.block()} did so
3116     */
3117     public static void managedBlock(ManagedBlocker blocker)
3118     throws InterruptedException {
3119     ForkJoinPool p;
3120     ForkJoinWorkerThread wt;
3121     WorkQueue w;
3122     Thread t = Thread.currentThread();
3123     if ((t instanceof ForkJoinWorkerThread) &&
3124     (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
3125     (w = wt.workQueue) != null) {
3126     int block;
3127     while (!blocker.isReleasable()) {
3128     if ((block = p.tryCompensate(w)) != 0) {
3129     try {
3130     do {} while (!blocker.isReleasable() &&
3131     !blocker.block());
3132     } finally {
3133     U.getAndAddLong(p, CTL, (block > 0) ? RC_UNIT : 0L);
3134     }
3135     break;
3136     }
3137     }
3138     }
3139     else {
3140     do {} while (!blocker.isReleasable() &&
3141     !blocker.block());
3142     }
3143     }
3144    
3145 dl 1.2 /**
3146     * If the given executor is a ForkJoinPool, poll and execute
3147     * AsynchronousCompletionTasks from worker's queue until none are
3148     * available or blocker is released.
         *
         * @param e the executor to help; nothing is done unless it is a
         *        {@code ForkJoinPool}
         * @param blocker checked with {@code isReleasable()} before each
         *        task is claimed; nothing is done if it is {@code null}
3149     */
3150     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3151     if (blocker != null && (e instanceof ForkJoinPool)) {
3152     WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3153     ForkJoinPool p = (ForkJoinPool)e;
3154     Thread thread = Thread.currentThread();
         // Select the queue to poll. A worker thread of this same pool
         // uses its own work queue.
3155     if (thread instanceof ForkJoinWorkerThread &&
3156     (wt = (ForkJoinWorkerThread)thread).pool == p)
3157     w = wt.workQueue;
         // Any other thread with a nonzero ThreadLocalRandom probe picks
         // a slot of p.workQueues from that probe. SQMASK is defined
         // outside this excerpt; presumably it restricts the index to
         // submission-queue slots -- TODO confirm.
3158     else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
3159     (ws = p.workQueues) != null && (n = ws.length) > 0)
3160     w = ws[(n - 1) & r & SQMASK];
3161     else
3162     w = null;
3163     if (w != null) {
3164     for (;;) {
         // Snapshot base and top; d = base - top is negative while
         // the queue appears to hold at least one task.
3165     int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
3166     if ((a = w.array) != null && (d = b - s) < 0 &&
3167     (al = a.length) > 0) {
         // Volatile read of the slot at the base index (array length
         // is used as a mask, then scaled to a raw offset).
3168     int index = (al - 1) & b;
3169     long offset = ((long)index << ASHIFT) + ABASE;
3170     ForkJoinTask<?> t = (ForkJoinTask<?>)
3171     U.getObjectVolatile(a, offset);
         // Stop helping as soon as the blocker no longer needs to wait.
3172     if (blocker.isReleasable())
3173     break;
         // Act only if base is unchanged since the snapshot; the
         // post-increment leaves b holding the prospective new base.
         // If base moved, fall through and retry the loop.
3174     else if (b++ == w.base) {
3175     if (t == null) {
         // Empty slot: stop if the snapshot showed exactly one
         // element (d == -1), otherwise re-read on the next pass.
3176     if (d == -1)
3177     break;
3178     }
         // Only AsynchronousCompletionTasks are run here; any other
         // task at the base ends the helping loop.
3179     else if (!(t instanceof CompletableFuture.
3180     AsynchronousCompletionTask))
3181     break;
         // Claim the task by CASing its slot to null; on success
         // publish the advanced base and run the task. A failed CAS
         // simply retries the loop.
3182     else if (U.compareAndSwapObject(a, offset,
3183     t, null)) {
3184     w.base = b;
3185     t.doExec();
3186     }
3187     }
3188     }
         // No array, or the queue appears empty: nothing left to help with.
3189     else
3190     break;
3191     }
3192     }
3193     }
3194     }
3195    
3196 jsr166 1.1 // AbstractExecutorService overrides. These rely on the undocumented
3197     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3198     // implement RunnableFuture.
3199    
3200     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3201     return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3202     }
3203    
3204     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3205     return new ForkJoinTask.AdaptedCallable<T>(callable);
3206     }
3207    
3208     // Unsafe mechanics
3209     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
         // Unsafe offsets of this class's "ctl" and "mode" fields.
3210     private static final long CTL;
3211     private static final long MODE;
         // Base offset of a ForkJoinTask[] and the shift (log2 of the
         // element scale) used to turn an array index into a raw offset.
3212     private static final int ABASE;
3213     private static final int ASHIFT;
3214    
         // Class initializer: assigns the Unsafe offsets above, then the
         // remaining static finals, and finally creates the common pool.
3215     static {
3216     try {
3217     CTL = U.objectFieldOffset
3218     (ForkJoinPool.class.getDeclaredField("ctl"));
3219     MODE = U.objectFieldOffset
3220     (ForkJoinPool.class.getDeclaredField("mode"));
3221     ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
3222     int scale = U.arrayIndexScale(ForkJoinTask[].class);
         // (scale & (scale - 1)) is zero only for a power of two, which
         // is required for the index scaling to be expressible as a shift.
3223     if ((scale & (scale - 1)) != 0)
3224     throw new Error("array index scale not a power of two");
         // For a power of two, 31 - numberOfLeadingZeros is its log2.
3225     ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
3226     } catch (ReflectiveOperationException e) {
         // A failed field lookup is rethrown as an Error wrapping the cause.
3227     throw new Error(e);
3228     }
3229    
3230     // Reduce the risk of rare disastrous classloading in first call to
3231     // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
3232     Class<?> ensureLoaded = LockSupport.class;
3233    
         // Spare-thread limit: starts at the default and is overridden by
         // the system property when present. Any exception while reading
         // or parsing the property is deliberately ignored, which leaves
         // the default in place.
3234     int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
3235     try {
3236     String p = System.getProperty
3237     ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
3238     if (p != null)
3239     commonMaxSpares = Integer.parseInt(p);
3240     } catch (Exception ignore) {}
3241     COMMON_MAX_SPARES = commonMaxSpares;
3242    
         // The default factory and the permission are assigned before the
         // common pool is constructed below (presumably because that
         // constructor may use them -- TODO confirm against the
         // ForkJoinPool(byte) constructor).
3243     defaultForkJoinWorkerThreadFactory =
3244     new DefaultForkJoinWorkerThreadFactory();
3245     modifyThreadPermission = new RuntimePermission("modifyThread");
3246    
         // The common pool is built inside a privileged action.
3247     common = java.security.AccessController.doPrivileged
3248     (new java.security.PrivilegedAction<ForkJoinPool>() {
3249     public ForkJoinPool run() {
3250     return new ForkJoinPool((byte)0); }});
3251    
         // Value of the SMASK bits of the common pool's mode, but never
         // less than 1.
3252 dl 1.2 COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
3253 jsr166 1.1 }
3254    
3255     /**
3256     * Factory for innocuous worker threads.
3257     */
3258     private static final class InnocuousForkJoinWorkerThreadFactory
3259     implements ForkJoinWorkerThreadFactory {
3260    
3261     /**
3262     * An ACC to restrict permissions for the factory itself.
3263     * The constructed workers have no permissions set.
3264     */
3265     private static final AccessControlContext innocuousAcc;
3266     static {
3267     Permissions innocuousPerms = new Permissions();
3268     innocuousPerms.add(modifyThreadPermission);
3269     innocuousPerms.add(new RuntimePermission(
3270     "enableContextClassLoaderOverride"));
3271     innocuousPerms.add(new RuntimePermission(
3272     "modifyThreadGroup"));
3273     innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
3274     new ProtectionDomain(null, innocuousPerms)
3275     });
3276     }
3277    
3278     public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
3279     return java.security.AccessController.doPrivileged(
3280     new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
3281     public ForkJoinWorkerThread run() {
3282     return new ForkJoinWorkerThread.
3283     InnocuousForkJoinWorkerThread(pool);
3284     }}, innocuousAcc);
3285     }
3286     }
3287    
3288     }