ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.304
Committed: Mon Mar 14 18:11:23 2016 UTC (8 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.303: +5 -5 lines
Log Message:
clarify implementation comment

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 jsr166 1.1 */
6 jsr166 1.301
7 jsr166 1.1 package java.util.concurrent;
8    
9 jsr166 1.156 import java.lang.Thread.UncaughtExceptionHandler;
10 jsr166 1.228 import java.security.AccessControlContext;
11     import java.security.Permissions;
12     import java.security.ProtectionDomain;
13 jsr166 1.1 import java.util.ArrayList;
14     import java.util.Arrays;
15     import java.util.Collection;
16     import java.util.Collections;
17     import java.util.List;
18 dl 1.300 import java.util.concurrent.TimeUnit;
19     import java.util.concurrent.CountedCompleter;
20     import java.util.concurrent.ForkJoinTask;
21     import java.util.concurrent.ForkJoinWorkerThread;
22 dl 1.243 import java.util.concurrent.locks.LockSupport;
23 jsr166 1.1
24     /**
25 jsr166 1.4 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
26 jsr166 1.8 * A {@code ForkJoinPool} provides the entry point for submissions
27 dl 1.18 * from non-{@code ForkJoinTask} clients, as well as management and
28 jsr166 1.11 * monitoring operations.
29 jsr166 1.1 *
30 jsr166 1.9 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
31     * ExecutorService} mainly by virtue of employing
32     * <em>work-stealing</em>: all threads in the pool attempt to find and
33 dl 1.78 * execute tasks submitted to the pool and/or created by other active
34     * tasks (eventually blocking waiting for work if none exist). This
35     * enables efficient processing when most tasks spawn other subtasks
36     * (as do most {@code ForkJoinTask}s), as well as when many small
37     * tasks are submitted to the pool from external clients. Especially
38     * when setting <em>asyncMode</em> to true in constructors, {@code
39     * ForkJoinPool}s may also be appropriate for use with event-style
40     * tasks that are never joined.
41 jsr166 1.1 *
42 dl 1.112 * <p>A static {@link #commonPool()} is available and appropriate for
43 dl 1.101 * most applications. The common pool is used by any ForkJoinTask that
44     * is not explicitly submitted to a specified pool. Using the common
45     * pool normally reduces resource usage (its threads are slowly
46     * reclaimed during periods of non-use, and reinstated upon subsequent
47 dl 1.105 * use).
48 dl 1.100 *
49     * <p>For applications that require separate or custom pools, a {@code
50     * ForkJoinPool} may be constructed with a given target parallelism
51 jsr166 1.214 * level; by default, equal to the number of available processors.
52     * The pool attempts to maintain enough active (or available) threads
53     * by dynamically adding, suspending, or resuming internal worker
54 jsr166 1.187 * threads, even if some tasks are stalled waiting to join others.
55     * However, no such adjustments are guaranteed in the face of blocked
56     * I/O or other unmanaged synchronization. The nested {@link
57 dl 1.100 * ManagedBlocker} interface enables extension of the kinds of
58 dl 1.300 * synchronization accommodated. The default policies may be
59     * overridden using a constructor with parameters corresponding to
60     * those documented in class {@link ThreadPoolExecutor}.
61 jsr166 1.1 *
62     * <p>In addition to execution and lifecycle control methods, this
63     * class provides status check methods (for example
64 jsr166 1.4 * {@link #getStealCount}) that are intended to aid in developing,
65 jsr166 1.1 * tuning, and monitoring fork/join applications. Also, method
66 jsr166 1.4 * {@link #toString} returns indications of pool state in a
67 jsr166 1.1 * convenient form for informal monitoring.
68     *
69 jsr166 1.109 * <p>As is the case with other ExecutorServices, there are three
70 jsr166 1.84 * main task execution methods summarized in the following table.
71     * These are designed to be used primarily by clients not already
72     * engaged in fork/join computations in the current pool. The main
73     * forms of these methods accept instances of {@code ForkJoinTask},
74     * but overloaded forms also allow mixed execution of plain {@code
75     * Runnable}- or {@code Callable}- based activities as well. However,
76     * tasks that are already executing in a pool should normally instead
77     * use the within-computation forms listed in the table unless using
78     * async event-style tasks that are not usually joined, in which case
79     * there is little difference among choice of methods.
80 dl 1.18 *
81     * <table BORDER CELLPADDING=3 CELLSPACING=1>
82 jsr166 1.159 * <caption>Summary of task execution methods</caption>
83 dl 1.18 * <tr>
84     * <td></td>
85     * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
86     * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
87     * </tr>
88     * <tr>
89 jsr166 1.153 * <td> <b>Arrange async execution</b></td>
90 dl 1.18 * <td> {@link #execute(ForkJoinTask)}</td>
91     * <td> {@link ForkJoinTask#fork}</td>
92     * </tr>
93     * <tr>
94 jsr166 1.153 * <td> <b>Await and obtain result</b></td>
95 dl 1.18 * <td> {@link #invoke(ForkJoinTask)}</td>
96     * <td> {@link ForkJoinTask#invoke}</td>
97     * </tr>
98     * <tr>
99 jsr166 1.153 * <td> <b>Arrange exec and obtain Future</b></td>
100 dl 1.18 * <td> {@link #submit(ForkJoinTask)}</td>
101     * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
102     * </tr>
103     * </table>
104 dl 1.19 *
105 dl 1.105 * <p>The common pool is by default constructed with default
106 jsr166 1.155 * parameters, but these may be controlled by setting three
107 jsr166 1.162 * {@linkplain System#getProperty system properties}:
108     * <ul>
109     * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
110     * - the parallelism level, a non-negative integer
111     * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
112     * - the class name of a {@link ForkJoinWorkerThreadFactory}
113     * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
114     * - the class name of a {@link UncaughtExceptionHandler}
115 dl 1.208 * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
116 dl 1.223 * - the maximum number of allowed extra threads to maintain target
117 dl 1.208 * parallelism (default 256).
118 jsr166 1.162 * </ul>
119 dl 1.197 * If a {@link SecurityManager} is present and no factory is
120     * specified, then the default pool uses a factory supplying
121     * threads that have no {@link Permissions} enabled.
122 jsr166 1.165 * The system class loader is used to load these classes.
123 jsr166 1.156 * Upon any error in establishing these settings, default parameters
124 dl 1.160 * are used. It is possible to disable or limit the use of threads in
125     * the common pool by setting the parallelism property to zero, and/or
126 dl 1.193 * using a factory that may return {@code null}. However doing so may
127     * cause unjoined tasks to never be executed.
128 dl 1.105 *
129 jsr166 1.1 * <p><b>Implementation notes</b>: This implementation restricts the
130     * maximum number of running threads to 32767. Attempts to create
131 jsr166 1.11 * pools with greater than the maximum number result in
132 jsr166 1.8 * {@code IllegalArgumentException}.
133 jsr166 1.1 *
134 jsr166 1.11 * <p>This implementation rejects submitted tasks (that is, by throwing
135 dl 1.19 * {@link RejectedExecutionException}) only when the pool is shut down
136 dl 1.20 * or internal resources have been exhausted.
137 jsr166 1.11 *
138 jsr166 1.1 * @since 1.7
139     * @author Doug Lea
140     */
141     public class ForkJoinPool extends AbstractExecutorService {
142    
143     /*
144 dl 1.14 * Implementation Overview
145     *
146 dl 1.78 * This class and its nested classes provide the main
147     * functionality and control for a set of worker threads:
148 jsr166 1.84 * Submissions from non-FJ threads enter into submission queues.
149     * Workers take these tasks and typically split them into subtasks
150     * that may be stolen by other workers. Preference rules give
151     * first priority to processing tasks from their own queues (LIFO
152     * or FIFO, depending on mode), then to randomized FIFO steals of
153 dl 1.200 * tasks in other queues. This framework began as a vehicle for
154     * supporting tree-structured parallelism using work-stealing.
155     * Over time, its scalability advantages led to extensions and
156 dl 1.208 * changes to better support more diverse usage contexts. Because
157     * most internal methods and nested classes are interrelated,
158     * their main rationale and descriptions are presented here;
159     * individual methods and nested classes contain only brief
160     * comments about details.
161 dl 1.78 *
162 jsr166 1.84 * WorkQueues
163 dl 1.78 * ==========
164     *
165     * Most operations occur within work-stealing queues (in nested
166     * class WorkQueue). These are special forms of Deques that
167     * support only three of the four possible end-operations -- push,
168     * pop, and poll (aka steal), under the further constraints that
169     * push and pop are called only from the owning thread (or, as
170     * extended here, under a lock), while poll may be called from
171     * other threads. (If you are unfamiliar with them, you probably
172     * want to read Herlihy and Shavit's book "The Art of
173     * Multiprocessor Programming", chapter 16 describing these in
174     * more detail before proceeding.) The main work-stealing queue
175     * design is roughly similar to those in the papers "Dynamic
176     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
177     * (http://research.sun.com/scalable/pubs/index.html) and
178     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
179     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
180 dl 1.200 * The main differences ultimately stem from GC requirements that
181     * we null out taken slots as soon as we can, to maintain as small
182     * a footprint as possible even in programs generating huge
183     * numbers of tasks. To accomplish this, we shift the CAS
184     * arbitrating pop vs poll (steal) from being on the indices
185     * ("base" and "top") to the slots themselves.
186     *
187 dl 1.243 * Adding tasks then takes the form of a classic array push(task)
188     * in a circular buffer:
189     * q.array[q.top++ % length] = task;
190 dl 1.200 *
191     * (The actual code needs to null-check and size-check the array,
192 jsr166 1.247 * uses masking, not mod, for indexing a power-of-two-sized array,
193 dl 1.243 * properly fences accesses, and possibly signals waiting workers
194     * to start scanning -- see below.) Both a successful pop and
195     * poll mainly entail a CAS of a slot from non-null to null.
196 dl 1.200 *
197 jsr166 1.202 * The pop operation (always performed by owner) is:
198 dl 1.243 * if ((the task at top slot is not null) and
199 dl 1.200 * (CAS slot to null))
200     * decrement top and return task;
201     *
202     * And the poll operation (usually by a stealer) is
203 dl 1.243 * if ((the task at base slot is not null) and
204 dl 1.200 * (CAS slot to null))
205     * increment base and return task;
206     *
207 dl 1.300 * There are several variants of each of these. In particular,
208     * almost all uses of poll occur within scan operations that also
209     * interleave contention tracking (with associated code sprawl.)
210 dl 1.243 *
211     * Memory ordering. See "Correct and Efficient Work-Stealing for
212     * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
213     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
214     * analysis of memory ordering requirements in work-stealing
215     * algorithms similar to (but different than) the one used here.
216     * Extracting tasks in array slots via (fully fenced) CAS provides
217     * primary synchronization. The base and top indices imprecisely
218     * guide where to extract from. We do not always require strict
219     * orderings of array and index updates, so sometimes let them be
220     * subject to compiler and processor reorderings. However, the
221     * volatile "base" index also serves as a basis for memory
222     * ordering: Slot accesses are preceded by a read of base,
223 jsr166 1.247 * ensuring happens-before ordering with respect to stealers (so
224 dl 1.243 * the slots themselves can be read via plain array reads.) The
225     * only other memory orderings relied on are maintained in the
226     * course of signalling and activation (see below). A check that
227     * base == top indicates (momentary) emptiness, but otherwise may
228     * err on the side of possibly making the queue appear nonempty
229     * when a push, pop, or poll have not fully committed, or making
230     * it appear empty when an update of top has not yet been visibly
231     * written. (Method isEmpty() checks the case of a partially
232 dl 1.211 * completed removal of the last element.) Because of this, the
233     * poll operation, considered individually, is not wait-free. One
234     * thief cannot successfully continue until another in-progress
235 dl 1.243 * one (or, if previously empty, a push) visibly completes.
236     * However, in the aggregate, we ensure at least probabilistic
237     * non-blockingness. If an attempted steal fails, a scanning
238     * thief chooses a different random victim target to try next. So,
239     * in order for one thief to progress, it suffices for any
240 dl 1.205 * in-progress poll or new push on any empty queue to
241 dl 1.300 * complete.
242 dl 1.200 *
243     * This approach also enables support of a user mode in which
244     * local task processing is in FIFO, not LIFO order, simply by
245     * using poll rather than pop. This can be useful in
246     * message-passing frameworks in which tasks are never joined.
247 dl 1.78 *
248     * WorkQueues are also used in a similar way for tasks submitted
249     * to the pool. We cannot mix these tasks in the same queues used
250 dl 1.200 * by workers. Instead, we randomly associate submission queues
251 dl 1.83 * with submitting threads, using a form of hashing. The
252 dl 1.139 * ThreadLocalRandom probe value serves as a hash code for
253     * choosing existing queues, and may be randomly repositioned upon
254     * contention with other submitters. In essence, submitters act
255     * like workers except that they are restricted to executing local
256 dl 1.300 * tasks that they submitted. Insertion of tasks in shared mode
257     * requires a lock but we use only a simple spinlock (using field
258 jsr166 1.304 * phase), because submitters encountering a busy queue move to a
259     * different position to use or create other queues -- they block
260     * only when creating and registering new queues. Because it is
261     * used only as a spinlock, unlocking requires only a "releasing"
262     * store (using putOrderedInt).
263 dl 1.78 *
264 jsr166 1.84 * Management
265 dl 1.78 * ==========
266 dl 1.52 *
267     * The main throughput advantages of work-stealing stem from
268     * decentralized control -- workers mostly take tasks from
269 dl 1.200 * themselves or each other, at rates that can exceed a billion
270     * per second. The pool itself creates, activates (enables
271     * scanning for and running tasks), deactivates, blocks, and
272     * terminates threads, all with minimal central information.
273     * There are only a few properties that we can globally track or
274     * maintain, so we pack them into a small number of variables,
275     * often maintaining atomicity without blocking or locking.
276 dl 1.300 * Nearly all essentially atomic control state is held in a few
277 dl 1.200 * volatile variables that are by far most often read (not
278 dl 1.300 * written) as status and consistency checks. We pack as much
279     * information into them as we can.
280 dl 1.78 *
281 dl 1.200 * Field "ctl" contains 64 bits holding information needed to
282 dl 1.300 * atomically decide to add, enqueue (on an event queue), and
283     * dequeue (and release)-activate workers. To enable this
284 dl 1.78 * packing, we restrict maximum parallelism to (1<<15)-1 (which is
285     * far in excess of normal operating range) to allow ids, counts,
286     * and their negations (used for thresholding) to fit into 16bit
287 dl 1.215 * subfields.
288     *
289 dl 1.300 * Field "mode" holds configuration parameters as well as lifetime
290     * status, atomically and monotonically setting SHUTDOWN, STOP,
291     * and finally TERMINATED bits.
292 dl 1.258 *
293     * Field "workQueues" holds references to WorkQueues. It is
294 dl 1.300 * updated (only during worker creation and termination) under
295     * lock (using field workerNamePrefix as lock), but is otherwise
296     * concurrently readable, and accessed directly. We also ensure
297     * that uses of the array reference itself never become too stale
298     * in case of resizing. To simplify index-based operations, the
299     * array size is always a power of two, and all readers must
300     * tolerate null slots. Worker queues are at odd indices. Shared
301     * (submission) queues are at even indices, up to a maximum of 64
302     * slots, to limit growth even if array needs to expand to add
303     * more workers. Grouping them together in this way simplifies and
304 dl 1.262 * speeds up task scanning.
305 dl 1.86 *
306     * All worker thread creation is on-demand, triggered by task
307     * submissions, replacement of terminated workers, and/or
308 dl 1.78 * compensation for blocked workers. However, all other support
309     * code is set up to work with other policies. To ensure that we
310 jsr166 1.264 * do not hold on to worker references that would prevent GC, all
311 dl 1.78 * accesses to workQueues are via indices into the workQueues
312     * array (which is one source of some of the messy code
313     * constructions here). In essence, the workQueues array serves as
314 dl 1.200 * a weak reference mechanism. Thus for example the stack top
315     * subfield of ctl stores indices, not references.
316     *
317     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
318     * cannot let workers spin indefinitely scanning for tasks when
319     * none can be found immediately, and we cannot start/resume
320     * workers unless there appear to be tasks available. On the
321     * other hand, we must quickly prod them into action when new
322     * tasks are submitted or generated. In many usages, ramp-up time
323 dl 1.300 * is the main limiting factor in overall performance, which is
324     * compounded at program start-up by JIT compilation and
325     * allocation. So we streamline this as much as possible.
326     *
327     * The "ctl" field atomically maintains total worker and
328     * "released" worker counts, plus the head of the available worker
329     * queue (actually stack, represented by the lower 32bit subfield
330     * of ctl). Released workers are those known to be scanning for
331     * and/or running tasks. Unreleased ("available") workers are
332     * recorded in the ctl stack. These workers are made available for
333     * signalling by enqueuing in ctl (see method runWorker). The
334     * "queue" is a form of Treiber stack. This is ideal for
335     * activating threads in most-recently used order, and improves
336 dl 1.200 * performance and locality, outweighing the disadvantages of
337     * being prone to contention and inability to release a worker
338 dl 1.300 * unless it is topmost on stack. To avoid missed signal problems
339     * inherent in any wait/signal design, available workers rescan
340     * for (and if found run) tasks after enqueuing. Normally their
341     * release status will be updated while doing so, but the released
342     * worker ctl count may underestimate the number of active
343     * threads. (However, it is still possible to determine quiescence
344     * via a validation traversal -- see isQuiescent). After an
345     * unsuccessful rescan, available workers are blocked until
346     * signalled (see signalWork). The top stack state holds the
347     * value of the "phase" field of the worker: its index and status,
348     * plus a version counter that, in addition to the count subfields
349     * (also serving as version stamps) provide protection against
350     * Treiber stack ABA effects.
351 dl 1.200 *
352 dl 1.300 * Creating workers. To create a worker, we pre-increment counts
353     * (serving as a reservation), and attempt to construct a
354 dl 1.200 * ForkJoinWorkerThread via its factory. Upon construction, the
355     * new thread invokes registerWorker, where it constructs a
356     * WorkQueue and is assigned an index in the workQueues array
357 jsr166 1.266 * (expanding the array if necessary). The thread is then started.
358     * Upon any exception across these steps, or null return from
359     * factory, deregisterWorker adjusts counts and records
360 dl 1.200 * accordingly. If a null return, the pool continues running with
361     * fewer than the target number of workers. If exceptional, the
362     * exception is propagated, generally to some external caller.
363     * Worker index assignment avoids the bias in scanning that would
364     * occur if entries were sequentially packed starting at the front
365     * of the workQueues array. We treat the array as a simple
366     * power-of-two hash table, expanding as needed. The seedIndex
367     * increment ensures no collisions until a resize is needed or a
368     * worker is deregistered and replaced, and thereafter keeps
369 jsr166 1.202 * probability of collision low. We cannot use
370 dl 1.200 * ThreadLocalRandom.getProbe() for similar purposes here because
371     * the thread has not started yet, but do so for creating
372 dl 1.243 * submission queues for existing external threads (see
373     * externalPush).
374     *
375 dl 1.300 * WorkQueue field "phase" is used by both workers and the pool to
376     * manage and track whether a worker is UNSIGNALLED (possibly
377     * blocked waiting for a signal). When a worker is enqueued its
378     * phase field is set. Note that phase field updates lag queue CAS
379     * releases so usage requires care -- seeing a negative phase does
380     * not guarantee that the worker is available. When queued, the
381     * lower 16 bits of phase must hold its pool index. So we
382     * place the index there upon initialization (see registerWorker)
383     * and otherwise keep it there or restore it when necessary.
384 dl 1.243 *
385     * The ctl field also serves as the basis for memory
386     * synchronization surrounding activation. This uses a more
387     * efficient version of a Dekker-like rule that task producers and
388     * consumers sync with each other by both writing/CASing ctl (even
389 dl 1.253 * if to its current value). This would be extremely costly. So
390     * we relax it in several ways: (1) Producers only signal when
391     * their queue is empty. Other workers propagate this signal (in
392 dl 1.300 * method scan) when they find tasks; to further reduce flailing,
393     * each worker signals only one other per activation. (2) Workers
394     * only enqueue after scanning (see below) and not finding any
395     * tasks. (3) Rather than CASing ctl to its current value in the
396     * common case where no action is required, we reduce write
397     * contention by equivalently prefacing signalWork when called by
398     * an external task producer using a memory access with
399     * full-volatile semantics or a "fullFence".
400 dl 1.243 *
401 jsr166 1.249 * Almost always, too many signals are issued. A task producer
402 dl 1.243 * cannot in general tell if some existing worker is in the midst
403     * of finishing one task (or already scanning) and ready to take
404     * another without being signalled. So the producer might instead
405     * activate a different worker that does not find any work, and
406     * then inactivates. This scarcely matters in steady-state
407     * computations involving all workers, but can create contention
408 jsr166 1.247 * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
409 dl 1.243 * computations involving only a few workers.
410     *
411 dl 1.300 * Scanning. Method runWorker performs top-level scanning for
412     * tasks. Each scan traverses and tries to poll from each queue
413     * starting at a random index and circularly stepping. Scans are
414     * not performed in ideal random permutation order, to reduce
415     * cacheline contention. The pseudorandom generator need not have
416     * high-quality statistical properties in the long term, but just
417     * within computations; We use Marsaglia XorShifts (often via
418     * ThreadLocalRandom.nextSecondarySeed), which are cheap and
419     * suffice. Scanning also employs contention reduction: When
420     * scanning workers fail to extract an apparently existing task,
421     * they soon restart at a different pseudorandom index. This
422     * improves throughput when many threads are trying to take tasks
423     * from few queues, which can be common in some usages. Scans do
424     * not otherwise explicitly take into account core affinities,
425     * loads, cache localities, etc. However, they do exploit temporal
426     * locality (which usually approximates these) by preferring to
427     * re-poll (at most #workers times) from the same queue after a
428     * successful poll before trying others.
429 dl 1.52 *
430     * Trimming workers. To release resources after periods of lack of
431     * use, a worker starting to wait when the pool is quiescent will
432 dl 1.300 * time out and terminate (see method scan) if the pool has
433     * remained quiescent for period given by field keepAlive.
434 dl 1.52 *
435 dl 1.210 * Shutdown and Termination. A call to shutdownNow invokes
436     * tryTerminate to atomically set a runState bit. The calling
437     * thread, as well as every other worker thereafter terminating,
438 dl 1.300 * helps terminate others by cancelling their unprocessed tasks,
439     * and waking them up, doing so repeatedly until stable. Calls to
440     * non-abrupt shutdown() preface this by checking whether
441     * termination should commence by sweeping through queues (until
442     * stable) to ensure lack of in-flight submissions and workers
443     * about to process them before triggering the "STOP" phase of
444     * termination.
445 dl 1.211 *
446 jsr166 1.84 * Joining Tasks
447     * =============
448 dl 1.78 *
449     * Any of several actions may be taken when one worker is waiting
450 jsr166 1.84 * to join a task stolen (or always held) by another. Because we
451 dl 1.78 * are multiplexing many tasks on to a pool of workers, we can't
452 dl 1.300 * always just let them block (as in Thread.join). We also cannot
453     * just reassign the joiner's run-time stack with another and
454     * replace it later, which would be a form of "continuation", that
455     * even if possible is not necessarily a good idea since we may
456     * need both an unblocked task and its continuation to progress.
457     * Instead we combine two tactics:
458 dl 1.19 *
459     * Helping: Arranging for the joiner to execute some task that it
460 dl 1.78 * would be running if the steal had not occurred.
461 dl 1.19 *
462     * Compensating: Unless there are already enough live threads,
463 dl 1.78 * method tryCompensate() may create or re-activate a spare
464     * thread to compensate for blocked joiners until they unblock.
465     *
466 dl 1.105 * A third form (implemented in tryRemoveAndExec) amounts to
467     * helping a hypothetical compensator: If we can readily tell that
468     * a possible action of a compensator is to steal and execute the
469     * task being joined, the joining thread can do so directly,
470 dl 1.300 * without the need for a compensation thread.
471 dl 1.52 *
472     * The ManagedBlocker extension API can't use helping so relies
473     * only on compensation in method awaitBlocker.
474 dl 1.19 *
475 dl 1.300 * The algorithm in awaitJoin entails a form of "linear helping".
476     * Each worker records (in field source) the id of the queue from
477     * which it last stole a task. The scan in method awaitJoin uses
478     * these markers to try to find a worker to help (i.e., steal back
479     * a task from and execute it) that could hasten completion of the
480     * actively joined task. Thus, the joiner executes a task that
481     * would be on its own local deque if the to-be-joined task had
482     * not been stolen. This is a conservative variant of the approach
483     * described in Wagner & Calder "Leapfrogging: a portable
484     * technique for implementing efficient futures" SIGPLAN Notices,
485     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
486     * mainly in that we only record queue ids, not full dependency
487     * links. This requires a linear scan of the workQueues array to
488     * locate stealers, but isolates cost to when it is needed, rather
489     * than adding to per-task overhead. Searches can fail to locate
490     * stealers when GC stalls and the like delay recording sources.
491     * Further, even when accurately identified, stealers might not
492     * ever produce a task that the joiner can in turn help with. So,
493     * compensation is tried upon failure to find tasks to run.
494 dl 1.105 *
495 dl 1.300 * Compensation does not by default aim to keep exactly the target
496 dl 1.200 * parallelism number of unblocked threads running at any given
497     * time. Some previous versions of this class employed immediate
498     * compensations for any blocked join. However, in practice, the
499     * vast majority of blockages are transient byproducts of GC and
500     * other JVM or OS activities that are made worse by replacement.
501 dl 1.300 * Rather than impose arbitrary policies, we allow users to
502     * override the default of only adding threads upon apparent
503     * starvation. The compensation mechanism may also be bounded.
504     * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
505     * JVMs to cope with programming errors and abuse before running
506     * out of resources to do so.
507 jsr166 1.301 *
508 dl 1.105 * Common Pool
509     * ===========
510     *
511 jsr166 1.175 * The static common pool always exists after static
512 dl 1.105 * initialization. Since it (or any other created pool) need
513     * never be used, we minimize initial construction overhead and
514 dl 1.300 * footprint to the setup of about a dozen fields.
515 dl 1.105 *
516     * When external threads submit to the common pool, they can
517 dl 1.200 * perform subtask processing (see externalHelpComplete and
518     * related methods) upon joins. This caller-helps policy makes it
519     * sensible to set common pool parallelism level to one (or more)
520     * less than the total number of available cores, or even zero for
521     * pure caller-runs. We do not need to record whether external
522     * submissions are to the common pool -- if not, external help
523     * methods return quickly. These submitters would otherwise be
524     * blocked waiting for completion, so the extra effort (with
525     * liberally sprinkled task status checks) in inapplicable cases
526     * amounts to an odd form of limited spin-wait before blocking in
527     * ForkJoinTask.join.
528 dl 1.105 *
529 dl 1.197 * As a more appropriate default in managed environments, unless
530     * overridden by system properties, we use workers of subclass
531     * InnocuousForkJoinWorkerThread when there is a SecurityManager
532     * present. These workers have no permissions set, do not belong
533     * to any user-defined ThreadGroup, and erase all ThreadLocals
534 dl 1.300 * after executing any top-level task (see
535     * WorkQueue.afterTopLevelExec). The associated mechanics (mainly
536     * in ForkJoinWorkerThread) may be JVM-dependent and must access
537     * particular Thread class fields to achieve this effect.
538 jsr166 1.198 *
539 dl 1.105 * Style notes
540     * ===========
541     *
542 dl 1.200 * Memory ordering relies mainly on Unsafe intrinsics that carry
543     * the further responsibility of explicitly performing null- and
544     * bounds- checks otherwise carried out implicitly by JVMs. This
545     * can be awkward and ugly, but also reflects the need to control
546     * outcomes across the unusual cases that arise in very racy code
547     * with very few invariants. So these explicit checks would exist
548     * in some form anyway. All fields are read into locals before
549     * use, and null-checked if they are references. This is usually
550     * done in a "C"-like style of listing declarations at the heads
551     * of methods or blocks, and using inline assignments on first
552     * encounter. Array bounds-checks are usually performed by
553     * masking with array.length-1, which relies on the invariant that
554     * these arrays are created with positive lengths, which is itself
555     * paranoically checked. Nearly all explicit checks lead to
556     * bypass/return, not exception throws, because they may
557     * legitimately arise due to cancellation/revocation during
558     * shutdown.
559     *
560 dl 1.105 * There is a lot of representation-level coupling among classes
561     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
562     * fields of WorkQueue maintain data structures managed by
563     * ForkJoinPool, so are directly accessed. There is little point
564     * trying to reduce this, since any associated future changes in
565     * representations will need to be accompanied by algorithmic
566     * changes anyway. Several methods intrinsically sprawl because
567 dl 1.200 * they must accumulate sets of consistent reads of fields held in
568     * local variables. There are also other coding oddities
569     * (including several unnecessary-looking hoisted null checks)
570     * that help some methods perform reasonably even when interpreted
571     * (not compiled).
572 dl 1.52 *
573 dl 1.208 * The order of declarations in this file is (with a few exceptions):
574 dl 1.86 * (1) Static utility functions
575     * (2) Nested (static) classes
576     * (3) Static fields
577     * (4) Fields, along with constants used when unpacking some of them
578     * (5) Internal control methods
579     * (6) Callbacks and other support for ForkJoinTask methods
580     * (7) Exported methods
581     * (8) Static block initializing statics in minimally dependent order
582     */
583    
584     // Static utilities
585    
586     /**
587     * If there is a security manager, makes sure caller has
588     * permission to modify threads.
589 jsr166 1.1 */
590 dl 1.86 private static void checkPermission() {
591     SecurityManager security = System.getSecurityManager();
592     if (security != null)
593     security.checkPermission(modifyThreadPermission);
594     }
595    
596     // Nested classes
597 jsr166 1.1
598     /**
599 jsr166 1.8 * Factory for creating new {@link ForkJoinWorkerThread}s.
600     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
601     * for {@code ForkJoinWorkerThread} subclasses that extend base
602     * functionality or initialize threads with different contexts.
603 jsr166 1.1 */
604     public static interface ForkJoinWorkerThreadFactory {
605     /**
606     * Returns a new worker thread operating in the given pool.
607 dl 1.300 * Returning null or throwing an exception may result in tasks
608     * never being executed. If this method throws an exception,
609     * it is relayed to the caller of the method (for example
610     * {@code execute}) causing attempted thread creation. If this
611     * method returns null or throws an exception, it is not
612     * retried until the next attempted creation (for example
613     * another call to {@code execute}).
614 jsr166 1.1 *
615     * @param pool the pool this thread works in
616 jsr166 1.296 * @return the new worker thread, or {@code null} if the request
617 dl 1.300 * to create a thread is rejected.
618 jsr166 1.11 * @throws NullPointerException if the pool is null
619 jsr166 1.1 */
620     public ForkJoinWorkerThread newThread(ForkJoinPool pool);
621     }
622    
623     /**
624     * Default ForkJoinWorkerThreadFactory implementation; creates a
625     * new ForkJoinWorkerThread.
626     */
627 jsr166 1.278 private static final class DefaultForkJoinWorkerThreadFactory
628 jsr166 1.1 implements ForkJoinWorkerThreadFactory {
629 dl 1.112 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
630 dl 1.14 return new ForkJoinWorkerThread(pool);
631 jsr166 1.1 }
632     }
633    
634 dl 1.200 // Constants shared across ForkJoinPool and WorkQueue
635    
636     // Bounds
637 dl 1.300 static final int SWIDTH = 16; // width of short
638 dl 1.200 static final int SMASK = 0xffff; // short bits == max index
639     static final int MAX_CAP = 0x7fff; // max #workers - 1
640     static final int SQMASK = 0x007e; // max 64 (even) slots
641    
642 dl 1.300 // Masks and units for WorkQueue.phase and ctl sp subfield
643 dl 1.243 static final int UNSIGNALLED = 1 << 31; // must be negative
644 dl 1.211 static final int SS_SEQ = 1 << 16; // version count
645 dl 1.300 static final int QLOCK = 1; // must be 1
646 dl 1.200
647 dl 1.300 // Mode bits and sentinels, some also used in WorkQueue id and.source fields
648     static final int OWNED = 1; // queue has owner thread
649     static final int FIFO = 1 << 16; // fifo queue or access mode
650     static final int SATURATE = 1 << 17; // for tryCompensate
651     static final int SHUTDOWN = 1 << 18;
652     static final int TERMINATED = 1 << 19;
653     static final int STOP = 1 << 31; // must be negative
654     static final int QUIET = 1 << 30; // not scanning or working
655     static final int DORMANT = QUIET | UNSIGNALLED;
656    
657     /**
658     * The maximum number of local polls from the same queue before
659     * checking others. This is a safeguard against infinitely unfair
660     * looping under unbounded user task recursion, and must be larger
661     * than plausible cases of intentional bounded task recursion.
662 dl 1.253 */
663 dl 1.300 static final int POLL_LIMIT = 1 << 10;
664 dl 1.253
665     /**
666 dl 1.78 * Queues supporting work-stealing as well as external task
667 jsr166 1.202 * submission. See above for descriptions and algorithms.
668 dl 1.78 * Performance on most platforms is very sensitive to placement of
669     * instances of both WorkQueues and their arrays -- we absolutely
670     * do not want multiple WorkQueue instances or multiple queue
671 dl 1.200 * arrays sharing cache lines. The @Contended annotation alerts
672     * JVMs to try to keep instances apart.
673 dl 1.78 */
674 dl 1.300 // For now, using manual padding.
675     // @jdk.internal.vm.annotation.Contended
676     // @sun.misc.Contended
677 dl 1.78 static final class WorkQueue {
678 dl 1.200
679 dl 1.78 /**
680     * Capacity of work-stealing queue array upon initialization.
681 dl 1.90 * Must be a power of two; at least 4, but should be larger to
682     * reduce or eliminate cacheline sharing among queues.
683     * Currently, it is much larger, as a partial workaround for
684     * the fact that JVMs often place arrays in locations that
685     * share GC bookkeeping (especially cardmarks) such that
686     * per-write accesses encounter serious memory contention.
687 dl 1.78 */
688 dl 1.90 static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
689 dl 1.78
690     /**
691     * Maximum size for queue arrays. Must be a power of two less
692     * than or equal to 1 << (31 - width of array entry) to ensure
693     * lack of wraparound of index calculations, but defined to a
694     * value a bit less than this to help users trap runaway
695     * programs before saturating systems.
696     */
697     static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
698    
699 dl 1.200 // Instance fields
700 dl 1.300 volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
701     volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
702     volatile int phase; // versioned, negative: queued, 1: locked
703     int stackPred; // pool stack (ctl) predecessor link
704 dl 1.178 int nsteals; // number of steals
705 dl 1.300 int id; // index, mode, tag
706     volatile int source; // source queue id, or sentinel
707 dl 1.78 volatile int base; // index of next slot for poll
708     int top; // index of next slot for push
709     ForkJoinTask<?>[] array; // the elements (initially unallocated)
710 dl 1.90 final ForkJoinPool pool; // the containing pool (may be null)
711 dl 1.78 final ForkJoinWorkerThread owner; // owning thread or null if shared
712 dl 1.300 volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
713     volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e, pad1f;
714 dl 1.112
715 dl 1.200 WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
716 dl 1.90 this.pool = pool;
717 dl 1.78 this.owner = owner;
718 dl 1.115 // Place indices in the center of array (that is not yet allocated)
719 dl 1.78 base = top = INITIAL_QUEUE_CAPACITY >>> 1;
720     }
721    
722     /**
723 jsr166 1.220 * Returns an exportable index (used by ForkJoinWorkerThread).
724 dl 1.200 */
725     final int getPoolIndex() {
726 dl 1.300 return (id & 0xffff) >>> 1; // ignore odd/even tag bit
727 dl 1.200 }
728    
729     /**
730 dl 1.115 * Returns the approximate number of tasks in the queue.
731     */
732     final int queueSize() {
733 dl 1.243 int n = base - top; // read base first
734 dl 1.115 return (n >= 0) ? 0 : -n; // ignore transient negative
735     }
736    
737 jsr166 1.180 /**
738 dl 1.115 * Provides a more accurate estimate of whether this queue has
739     * any tasks than does queueSize, by checking whether a
740     * near-empty queue has at least one unclaimed task.
741     */
742     final boolean isEmpty() {
743 dl 1.300 ForkJoinTask<?>[] a; int n, al, b;
744     return ((n = (b = base) - top) >= 0 || // possibly one task
745 dl 1.243 (n == -1 && ((a = array) == null ||
746     (al = a.length) == 0 ||
747 dl 1.300 a[(al - 1) & b] == null)));
748 dl 1.115 }
749    
750 dl 1.300
751 dl 1.115 /**
752 dl 1.256 * Pushes a task. Call only by owner in unshared queues.
753 dl 1.78 *
754     * @param task the task. Caller must ensure non-null.
755 jsr166 1.146 * @throws RejectedExecutionException if array cannot be resized
756 dl 1.78 */
757 dl 1.90 final void push(ForkJoinTask<?> task) {
758 dl 1.300 int s = top; ForkJoinTask<?>[] a; int al, d;
759 dl 1.243 if ((a = array) != null && (al = a.length) > 0) {
760 dl 1.300 int index = (al - 1) & s;
761     long offset = ((long)index << ASHIFT) + ABASE;
762     ForkJoinPool p = pool;
763 dl 1.243 top = s + 1;
764 dl 1.300 U.putOrderedObject(a, offset, task);
765 dl 1.292 if ((d = base - s) == 0 && p != null) {
766 dl 1.284 U.fullFence();
767 dl 1.253 p.signalWork();
768 dl 1.284 }
769 dl 1.300 else if (d + al == 1)
770 dl 1.243 growArray();
771 dl 1.78 }
772     }
773    
774 dl 1.178 /**
775 dl 1.112 * Initializes or doubles the capacity of array. Call either
776     * by owner or with lock held -- it is OK for base, but not
777     * top, to move while resizings are in progress.
778     */
779     final ForkJoinTask<?>[] growArray() {
780     ForkJoinTask<?>[] oldA = array;
781 dl 1.300 int oldSize = oldA != null ? oldA.length : 0;
782     int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
783 dl 1.225 if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
784 dl 1.112 throw new RejectedExecutionException("Queue capacity exceeded");
785     int oldMask, t, b;
786     ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
787 dl 1.300 if (oldA != null && (oldMask = oldSize - 1) > 0 &&
788 dl 1.112 (t = top) - (b = base) > 0) {
789     int mask = size - 1;
790 dl 1.200 do { // emulate poll from old array, push to new array
791 dl 1.256 int index = b & oldMask;
792     long offset = ((long)index << ASHIFT) + ABASE;
793 dl 1.243 ForkJoinTask<?> x = (ForkJoinTask<?>)
794     U.getObjectVolatile(oldA, offset);
795     if (x != null &&
796     U.compareAndSwapObject(oldA, offset, x, null))
797     a[b & mask] = x;
798 dl 1.112 } while (++b != t);
799 dl 1.243 U.storeFence();
800 dl 1.78 }
801 dl 1.112 return a;
802 dl 1.78 }
803    
804     /**
805 dl 1.90 * Takes next task, if one exists, in LIFO order. Call only
806 dl 1.102 * by owner in unshared queues.
807 dl 1.90 */
808     final ForkJoinTask<?> pop() {
809 dl 1.243 int b = base, s = top, al, i; ForkJoinTask<?>[] a;
810 dl 1.256 if ((a = array) != null && b != s && (al = a.length) > 0) {
811     int index = (al - 1) & --s;
812     long offset = ((long)index << ASHIFT) + ABASE;
813 dl 1.262 ForkJoinTask<?> t = (ForkJoinTask<?>)
814     U.getObject(a, offset);
815     if (t != null &&
816     U.compareAndSwapObject(a, offset, t, null)) {
817 dl 1.256 top = s;
818 dl 1.300 U.storeFence();
819 dl 1.78 return t;
820     }
821     }
822     return null;
823     }
824    
825     /**
826 dl 1.90 * Takes next task, if one exists, in FIFO order.
827 dl 1.78 */
828 dl 1.90 final ForkJoinTask<?> poll() {
829 dl 1.243 for (;;) {
830     int b = base, s = top, d, al; ForkJoinTask<?>[] a;
831     if ((a = array) != null && (d = b - s) < 0 &&
832     (al = a.length) > 0) {
833 dl 1.256 int index = (al - 1) & b;
834     long offset = ((long)index << ASHIFT) + ABASE;
835 dl 1.243 ForkJoinTask<?> t = (ForkJoinTask<?>)
836     U.getObjectVolatile(a, offset);
837     if (b++ == base) {
838     if (t != null) {
839     if (U.compareAndSwapObject(a, offset, t, null)) {
840     base = b;
841     return t;
842     }
843 dl 1.200 }
844 dl 1.243 else if (d == -1)
845     break; // now empty
846 dl 1.78 }
847 dl 1.90 }
848 dl 1.243 else
849     break;
850 dl 1.78 }
851     return null;
852     }
853    
854     /**
855     * Takes next task, if one exists, in order specified by mode.
856     */
857     final ForkJoinTask<?> nextLocalTask() {
858 dl 1.300 return ((id & FIFO) != 0) ? poll() : pop();
859 dl 1.78 }
860    
861     /**
862     * Returns next task, if one exists, in order specified by mode.
863     */
864     final ForkJoinTask<?> peek() {
865 dl 1.292 int al; ForkJoinTask<?>[] a;
866 dl 1.243 return ((a = array) != null && (al = a.length) > 0) ?
867 dl 1.300 a[(al - 1) &
868     ((id & FIFO) != 0 ? base : top - 1)] : null;
869 dl 1.78 }
870    
871     /**
872     * Pops the given task only if it is at the current top.
873 jsr166 1.251 */
874 dl 1.243 final boolean tryUnpush(ForkJoinTask<?> task) {
875 dl 1.256 int b = base, s = top, al; ForkJoinTask<?>[] a;
876     if ((a = array) != null && b != s && (al = a.length) > 0) {
877     int index = (al - 1) & --s;
878     long offset = ((long)index << ASHIFT) + ABASE;
879     if (U.compareAndSwapObject(a, offset, task, null)) {
880 dl 1.243 top = s;
881 dl 1.300 U.storeFence();
882 dl 1.224 return true;
883     }
884 dl 1.78 }
885     return false;
886     }
887    
888     /**
889 jsr166 1.84 * Removes and cancels all known tasks, ignoring any exceptions.
890 dl 1.78 */
891     final void cancelAll() {
892 dl 1.300 for (ForkJoinTask<?> t; (t = poll()) != null; )
893 dl 1.78 ForkJoinTask.cancelIgnoringExceptions(t);
894     }
895    
896 dl 1.104 // Specialized execution methods
897 dl 1.78
898     /**
899 dl 1.300 * Pops and executes up to limit consecutive tasks or until empty.
900     *
901     * @param limit max runs, or zero for no limit
902 dl 1.253 */
903 dl 1.300 final void localPopAndExec(int limit) {
904     for (;;) {
905 dl 1.253 int b = base, s = top, al; ForkJoinTask<?>[] a;
906     if ((a = array) != null && b != s && (al = a.length) > 0) {
907 dl 1.256 int index = (al - 1) & --s;
908     long offset = ((long)index << ASHIFT) + ABASE;
909 dl 1.253 ForkJoinTask<?> t = (ForkJoinTask<?>)
910     U.getAndSetObject(a, offset, null);
911     if (t != null) {
912     top = s;
913 dl 1.300 U.storeFence();
914     t.doExec();
915     if (limit != 0 && --limit == 0)
916 dl 1.253 break;
917     }
918     else
919     break;
920     }
921     else
922     break;
923     }
924     }
925    
926     /**
927 dl 1.300 * Polls and executes up to limit consecutive tasks or until empty.
928     *
929     * @param limit, or zero for no limit
930 dl 1.253 */
931 dl 1.300 final void localPollAndExec(int limit) {
932     for (int polls = 0;;) {
933     int b = base, s = top, d, al; ForkJoinTask<?>[] a;
934     if ((a = array) != null && (d = b - s) < 0 &&
935     (al = a.length) > 0) {
936 dl 1.256 int index = (al - 1) & b++;
937     long offset = ((long)index << ASHIFT) + ABASE;
938 dl 1.253 ForkJoinTask<?> t = (ForkJoinTask<?>)
939     U.getAndSetObject(a, offset, null);
940     if (t != null) {
941     base = b;
942 dl 1.255 t.doExec();
943 dl 1.300 if (limit != 0 && ++polls == limit)
944 dl 1.253 break;
945     }
946 dl 1.300 else if (d == -1)
947     break; // now empty
948     else
949     polls = 0; // stolen; reset
950 dl 1.253 }
951     else
952     break;
953     }
954     }
955    
956     /**
957 jsr166 1.302 * If present, removes task from queue and executes it.
958 dl 1.94 */
959 dl 1.300 final void tryRemoveAndExec(ForkJoinTask<?> task) {
960     ForkJoinTask<?>[] wa; int s, wal;
961     if (base - (s = top) < 0 && // traverse from top
962     (wa = array) != null && (wal = wa.length) > 0) {
963     for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
964     int index = i & m;
965     long offset = (index << ASHIFT) + ABASE;
966     ForkJoinTask<?> t = (ForkJoinTask<?>)
967     U.getObject(wa, offset);
968     if (t == null)
969     break;
970     else if (t == task) {
971     if (U.compareAndSwapObject(wa, offset, t, null)) {
972     top = ns; // safely shift down
973     for (int j = i; j != ns; ++j) {
974     ForkJoinTask<?> f;
975     int pindex = (j + 1) & m;
976     long pOffset = (pindex << ASHIFT) + ABASE;
977     f = (ForkJoinTask<?>)U.getObject(wa, pOffset);
978     U.putObjectVolatile(wa, pOffset, null);
979    
980     int jindex = j & m;
981     long jOffset = (jindex << ASHIFT) + ABASE;
982     U.putOrderedObject(wa, jOffset, f);
983     }
984     U.storeFence();
985     t.doExec();
986     }
987     break;
988     }
989 dl 1.262 }
990 dl 1.215 }
991     }
992    
993     /**
994 dl 1.300 * Tries to steal and run tasks within the target's
995 jsr166 1.302 * computation until done, not found, or limit exceeded.
996 dl 1.94 *
997 dl 1.300 * @param task root of CountedCompleter computation
998     * @param limit max runs, or zero for no limit
999     * @return task status on exit
1000     */
1001     final int localHelpCC(CountedCompleter<?> task, int limit) {
1002     int status = 0;
1003     if (task != null && (status = task.status) >= 0) {
1004     for (;;) {
1005     boolean help = false;
1006     int b = base, s = top, al; ForkJoinTask<?>[] a;
1007     if ((a = array) != null && b != s && (al = a.length) > 0) {
1008     int index = (al - 1) & (s - 1);
1009     long offset = ((long)index << ASHIFT) + ABASE;
1010     ForkJoinTask<?> o = (ForkJoinTask<?>)
1011     U.getObject(a, offset);
1012     if (o instanceof CountedCompleter) {
1013     CountedCompleter<?> t = (CountedCompleter<?>)o;
1014     for (CountedCompleter<?> f = t;;) {
1015     if (f != task) {
1016     if ((f = f.completer) == null) // try parent
1017     break;
1018     }
1019     else {
1020     if (U.compareAndSwapObject(a, offset,
1021     t, null)) {
1022     top = s - 1;
1023     U.storeFence();
1024     t.doExec();
1025     help = true;
1026     }
1027     break;
1028 dl 1.200 }
1029     }
1030 dl 1.243 }
1031 dl 1.104 }
1032 dl 1.300 if ((status = task.status) < 0 || !help ||
1033     (limit != 0 && --limit == 0))
1034     break;
1035 dl 1.104 }
1036     }
1037 dl 1.300 return status;
1038     }
1039    
1040     // Operations on shared queues
1041    
1042     /**
1043 jsr166 1.302 * Tries to lock shared queue by CASing phase field.
1044 dl 1.300 */
1045     final boolean tryLockSharedQueue() {
1046     return U.compareAndSwapInt(this, PHASE, 0, QLOCK);
1047 dl 1.104 }
1048    
1049     /**
1050 dl 1.300 * Shared version of tryUnpush.
1051 dl 1.78 */
1052 dl 1.300 final boolean trySharedUnpush(ForkJoinTask<?> task) {
1053     boolean popped = false;
1054     int s = top - 1, al; ForkJoinTask<?>[] a;
1055     if ((a = array) != null && (al = a.length) > 0) {
1056     int index = (al - 1) & s;
1057 dl 1.256 long offset = ((long)index << ASHIFT) + ABASE;
1058 dl 1.300 ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
1059     if (t == task &&
1060     U.compareAndSwapInt(this, PHASE, 0, QLOCK)) {
1061     if (top == s + 1 && array == a &&
1062     U.compareAndSwapObject(a, offset, task, null)) {
1063     popped = true;
1064     top = s;
1065 dl 1.178 }
1066 dl 1.300 U.putOrderedInt(this, PHASE, 0);
1067 dl 1.94 }
1068 dl 1.78 }
1069 dl 1.300 return popped;
1070 dl 1.78 }
1071    
1072     /**
1073 dl 1.300 * Shared version of localHelpCC.
1074     */
1075     final int sharedHelpCC(CountedCompleter<?> task, int limit) {
1076     int status = 0;
1077     if (task != null && (status = task.status) >= 0) {
1078     for (;;) {
1079     boolean help = false;
1080     int b = base, s = top, al; ForkJoinTask<?>[] a;
1081     if ((a = array) != null && b != s && (al = a.length) > 0) {
1082     int index = (al - 1) & (s - 1);
1083     long offset = ((long)index << ASHIFT) + ABASE;
1084     ForkJoinTask<?> o = (ForkJoinTask<?>)
1085     U.getObject(a, offset);
1086     if (o instanceof CountedCompleter) {
1087     CountedCompleter<?> t = (CountedCompleter<?>)o;
1088     for (CountedCompleter<?> f = t;;) {
1089     if (f != task) {
1090     if ((f = f.completer) == null)
1091     break;
1092     }
1093     else {
1094     if (U.compareAndSwapInt(this, PHASE,
1095     0, QLOCK)) {
1096     if (top == s && array == a &&
1097     U.compareAndSwapObject(a, offset,
1098     t, null)) {
1099     help = true;
1100     top = s - 1;
1101     }
1102     U.putOrderedInt(this, PHASE, 0);
1103     if (help)
1104     t.doExec();
1105     }
1106     break;
1107     }
1108 dl 1.243 }
1109 dl 1.200 }
1110 dl 1.178 }
1111 dl 1.300 if ((status = task.status) < 0 || !help ||
1112     (limit != 0 && --limit == 0))
1113     break;
1114 dl 1.178 }
1115 dl 1.78 }
1116 dl 1.300 return status;
1117 dl 1.78 }
1118    
1119     /**
1120 dl 1.86 * Returns true if owned and not known to be blocked.
1121     */
1122     final boolean isApparentlyUnblocked() {
1123     Thread wt; Thread.State s;
1124 dl 1.300 return ((wt = owner) != null &&
1125 dl 1.86 (s = wt.getState()) != Thread.State.BLOCKED &&
1126     s != Thread.State.WAITING &&
1127     s != Thread.State.TIMED_WAITING);
1128     }
1129    
1130 dl 1.211 // Unsafe mechanics. Note that some are (and must be) the same as in FJP
1131 jsr166 1.233 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1132 dl 1.300 private static final long PHASE;
1133 jsr166 1.291 private static final int ABASE;
1134     private static final int ASHIFT;
1135 dl 1.78 static {
1136     try {
1137 dl 1.300 PHASE = U.objectFieldOffset
1138     (WorkQueue.class.getDeclaredField("phase"));
1139 jsr166 1.233 ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
1140     int scale = U.arrayIndexScale(ForkJoinTask[].class);
1141 jsr166 1.142 if ((scale & (scale - 1)) != 0)
1142 jsr166 1.232 throw new Error("array index scale not a power of two");
1143 jsr166 1.142 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1144 jsr166 1.231 } catch (ReflectiveOperationException e) {
1145 dl 1.78 throw new Error(e);
1146     }
1147     }
1148     }
1149 dl 1.14
1150 dl 1.112 // static fields (initialized in static initializer below)
1151    
1152     /**
1153     * Creates a new ForkJoinWorkerThread. This factory is used unless
1154     * overridden in ForkJoinPool constructors.
1155     */
1156     public static final ForkJoinWorkerThreadFactory
1157     defaultForkJoinWorkerThreadFactory;
1158    
1159 jsr166 1.1 /**
1160 dl 1.115 * Permission required for callers of methods that may start or
1161 dl 1.300 * kill threads.
1162 dl 1.115 */
1163 jsr166 1.276 static final RuntimePermission modifyThreadPermission;
1164 dl 1.115
1165     /**
1166 dl 1.101 * Common (static) pool. Non-null for public use unless a static
1167 dl 1.105 * construction exception, but internal usages null-check on use
1168     * to paranoically avoid potential initialization circularities
1169     * as well as to simplify generated code.
1170 dl 1.101 */
1171 dl 1.134 static final ForkJoinPool common;
1172 dl 1.101
1173     /**
1174 dl 1.160 * Common pool parallelism. To allow simpler use and management
1175     * when common pool threads are disabled, we allow the underlying
1176 dl 1.185 * common.parallelism field to be zero, but in that case still report
1177 dl 1.160 * parallelism as 1 to reflect resulting caller-runs mechanics.
1178 dl 1.90 */
1179 jsr166 1.274 static final int COMMON_PARALLELISM;
1180 dl 1.90
1181     /**
1182 dl 1.208 * Limit on spare thread construction in tryCompensate.
1183     */
1184 jsr166 1.273 private static final int COMMON_MAX_SPARES;
1185 dl 1.208
1186     /**
1187 dl 1.105 * Sequence number for creating workerNamePrefix.
1188 dl 1.86 */
1189 dl 1.105 private static int poolNumberSequence;
1190 dl 1.86
1191 jsr166 1.1 /**
1192 jsr166 1.132 * Returns the next sequence number. We don't expect this to
1193     * ever contend, so use simple builtin sync.
1194 dl 1.83 */
1195 dl 1.105 private static final synchronized int nextPoolId() {
1196     return ++poolNumberSequence;
1197     }
1198 dl 1.86
1199 dl 1.200 // static configuration constants
1200 dl 1.86
1201     /**
1202 dl 1.300 * Default idle timeout value (in milliseconds) for the thread
1203     * triggering quiescence to park waiting for new work
1204 dl 1.86 */
1205 dl 1.300 private static final long DEFAULT_KEEPALIVE = 60000L;
1206 dl 1.86
1207     /**
1208 dl 1.300 * Undershoot tolerance for idle timeouts
1209 dl 1.120 */
1210 dl 1.300 private static final long TIMEOUT_SLOP = 20L;
1211 dl 1.200
1212     /**
1213 jsr166 1.273 * The default value for COMMON_MAX_SPARES. Overridable using the
1214     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
1215     * property. The default value is far in excess of normal
1216     * requirements, but also far short of MAX_CAP and typical OS
1217     * thread limits, so allows JVMs to catch misuse/abuse before
1218     * running out of resources needed to do so.
1219 dl 1.200 */
1220 dl 1.208 private static final int DEFAULT_COMMON_MAX_SPARES = 256;
1221 dl 1.120
1222     /**
1223 dl 1.90 * Increment for seed generators. See class ThreadLocal for
1224     * explanation.
1225     */
1226 dl 1.193 private static final int SEED_INCREMENT = 0x9e3779b9;
1227 dl 1.83
1228 jsr166 1.163 /*
1229 dl 1.200 * Bits and masks for field ctl, packed with 4 16 bit subfields:
1230 dl 1.300 * RC: Number of released (unqueued) workers minus target parallelism
1231 dl 1.200 * TC: Number of total workers minus target parallelism
1232     * SS: version count and status of top waiting thread
1233     * ID: poolIndex of top of Treiber stack of waiters
1234     *
1235     * When convenient, we can extract the lower 32 stack top bits
1236     * (including version bits) as sp=(int)ctl. The offsets of counts
1237     * by the target parallelism and the positionings of fields makes
1238     * it possible to perform the most common checks via sign tests of
1239 dl 1.300 * fields: When ac is negative, there are not enough unqueued
1240 dl 1.200 * workers, when tc is negative, there are not enough total
1241     * workers. When sp is non-zero, there are waiting workers. To
1242     * deal with possibly negative fields, we use casts in and out of
1243     * "short" and/or signed shifts to maintain signedness.
1244     *
1245 dl 1.300 * Because it occupies uppermost bits, we can add one release count
1246     * using getAndAddLong of RC_UNIT, rather than CAS, when returning
1247 dl 1.200 * from a blocked join. Other updates entail multiple subfields
1248     * and masking, requiring CAS.
1249 dl 1.300 *
1250     * The limits packed in field "bounds" are also offset by the
1251     * parallelism level to make them comparable to the ctl rc and tc
1252     * fields.
1253 dl 1.200 */
1254    
1255     // Lower and upper word masks
1256     private static final long SP_MASK = 0xffffffffL;
1257     private static final long UC_MASK = ~SP_MASK;
1258 dl 1.86
1259 dl 1.300 // Release counts
1260     private static final int RC_SHIFT = 48;
1261     private static final long RC_UNIT = 0x0001L << RC_SHIFT;
1262     private static final long RC_MASK = 0xffffL << RC_SHIFT;
1263 dl 1.200
1264     // Total counts
1265 dl 1.86 private static final int TC_SHIFT = 32;
1266 dl 1.200 private static final long TC_UNIT = 0x0001L << TC_SHIFT;
1267     private static final long TC_MASK = 0xffffL << TC_SHIFT;
1268     private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1269    
1270 dl 1.300 // Instance fields
1271 dl 1.86
1272 dl 1.300 // Segregate ctl field, For now using padding vs @Contended
1273     // @jdk.internal.vm.annotation.Contended("fjpctl")
1274     // @sun.misc.Contended("fjpctl")
1275     volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
1276     volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
1277 dl 1.200 volatile long ctl; // main pool control
1278 dl 1.300 volatile long pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
1279     volatile long pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e;
1280    
1281     volatile long stealCount; // collects worker nsteals
1282     final long keepAlive; // milliseconds before dropping if idle
1283     int indexSeed; // next worker index
1284     final int bounds; // min, max threads packed as shorts
1285     volatile int mode; // parallelism, runstate, queue mode
1286     WorkQueue[] workQueues; // main registry
1287     final String workerNamePrefix; // for worker thread string; sync lock
1288 dl 1.112 final ForkJoinWorkerThreadFactory factory;
1289 dl 1.200 final UncaughtExceptionHandler ueh; // per-worker UEH
1290 dl 1.101
1291 dl 1.200 // Creating, registering and deregistering workers
1292    
1293 dl 1.112 /**
1294 dl 1.200 * Tries to construct and start one worker. Assumes that total
1295     * count has already been incremented as a reservation. Invokes
1296     * deregisterWorker on any failure.
1297     *
1298     * @return true if successful
1299 dl 1.115 */
1300 dl 1.300 private boolean createWorker() {
1301 dl 1.200 ForkJoinWorkerThreadFactory fac = factory;
1302     Throwable ex = null;
1303     ForkJoinWorkerThread wt = null;
1304     try {
1305     if (fac != null && (wt = fac.newThread(this)) != null) {
1306     wt.start();
1307     return true;
1308 dl 1.115 }
1309 dl 1.200 } catch (Throwable rex) {
1310     ex = rex;
1311 dl 1.112 }
1312 dl 1.200 deregisterWorker(wt, ex);
1313     return false;
1314 dl 1.112 }
1315    
1316 dl 1.200 /**
1317     * Tries to add one worker, incrementing ctl counts before doing
1318     * so, relying on createWorker to back out on failure.
1319     *
1320     * @param c incoming ctl value, with total count negative and no
1321     * idle workers. On CAS failure, c is refreshed and retried if
1322 jsr166 1.202 * this holds (otherwise, a new worker is not needed).
1323 dl 1.200 */
1324     private void tryAddWorker(long c) {
1325     do {
1326 dl 1.300 long nc = ((RC_MASK & (c + RC_UNIT)) |
1327 dl 1.200 (TC_MASK & (c + TC_UNIT)));
1328 dl 1.243 if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
1329 dl 1.300 createWorker();
1330 dl 1.243 break;
1331 dl 1.200 }
1332     } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
1333     }
1334 dl 1.112
1335     /**
1336 dl 1.200 * Callback from ForkJoinWorkerThread constructor to establish and
1337     * record its WorkQueue.
1338 dl 1.112 *
1339     * @param wt the worker thread
1340 dl 1.115 * @return the worker's queue
1341 dl 1.112 */
1342 dl 1.115 final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1343 dl 1.200 UncaughtExceptionHandler handler;
1344 dl 1.300 wt.setDaemon(true); // configure thread
1345 dl 1.115 if ((handler = ueh) != null)
1346     wt.setUncaughtExceptionHandler(handler);
1347 dl 1.200 WorkQueue w = new WorkQueue(this, wt);
1348 dl 1.300 int tid = 0; // for thread name
1349     int fifo = mode & FIFO;
1350     String prefix = workerNamePrefix;
1351     if (prefix != null) {
1352 jsr166 1.301 synchronized (prefix) {
1353 dl 1.300 WorkQueue[] ws = workQueues; int n;
1354     int s = indexSeed += SEED_INCREMENT;
1355     if (ws != null && (n = ws.length) > 1) {
1356     int m = n - 1;
1357     tid = s & m;
1358     int i = m & ((s << 1) | 1); // odd-numbered indices
1359     for (int probes = n >>> 1;;) { // find empty slot
1360     WorkQueue q;
1361     if ((q = ws[i]) == null || q.phase == QUIET)
1362     break;
1363     else if (--probes == 0) {
1364     i = n | 1; // resize below
1365     break;
1366     }
1367     else
1368     i = (i + 2) & m;
1369     }
1370    
1371     int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
1372     w.phase = w.id = id; // now publishable
1373    
1374     if (i < n)
1375     ws[i] = w;
1376     else { // expand array
1377     int an = n << 1;
1378     WorkQueue[] as = new WorkQueue[an];
1379     as[i] = w;
1380     int am = an - 1;
1381     for (int j = 0; j < n; ++j) {
1382     WorkQueue v; // copy external queue
1383     if ((v = ws[j]) != null) // position may change
1384     as[v.id & am & SQMASK] = v;
1385     if (++j >= n)
1386     break;
1387     as[j] = ws[j]; // copy worker
1388 dl 1.94 }
1389 dl 1.300 workQueues = as;
1390 dl 1.94 }
1391     }
1392 dl 1.78 }
1393 dl 1.300 wt.setName(prefix.concat(Integer.toString(tid)));
1394 dl 1.78 }
1395 dl 1.115 return w;
1396 dl 1.78 }
1397 dl 1.19
1398 jsr166 1.1 /**
1399 dl 1.86 * Final callback from terminating worker, as well as upon failure
1400 dl 1.105 * to construct or start a worker. Removes record of worker from
1401     * array, and adjusts counts. If pool is shutting down, tries to
1402     * complete termination.
1403 dl 1.78 *
1404 jsr166 1.151 * @param wt the worker thread, or null if construction failed
1405 dl 1.78 * @param ex the exception causing failure, or null if none
1406 dl 1.45 */
1407 dl 1.78 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1408     WorkQueue w = null;
1409 dl 1.300 int phase = 0;
1410 dl 1.78 if (wt != null && (w = wt.workQueue) != null) {
1411 dl 1.300 Object lock = workerNamePrefix;
1412     long ns = (long)w.nsteals & 0xffffffffL;
1413     int idx = w.id & SMASK;
1414     if (lock != null) {
1415     WorkQueue[] ws; // remove index from array
1416 jsr166 1.301 synchronized (lock) {
1417 dl 1.243 if ((ws = workQueues) != null && ws.length > idx &&
1418     ws[idx] == w)
1419     ws[idx] = null;
1420 dl 1.300 stealCount += ns;
1421 dl 1.243 }
1422     }
1423 dl 1.300 phase = w.phase;
1424 dl 1.243 }
1425 dl 1.300 if (phase != QUIET) { // else pre-adjusted
1426 dl 1.243 long c; // decrement counts
1427     do {} while (!U.compareAndSwapLong
1428 dl 1.300 (this, CTL, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
1429 dl 1.243 (TC_MASK & (c - TC_UNIT)) |
1430     (SP_MASK & c))));
1431     }
1432 dl 1.300 if (w != null)
1433 dl 1.200 w.cancelAll(); // cancel remaining tasks
1434 dl 1.300
1435     if (!tryTerminate(false, false) && // possibly replace worker
1436     w != null && w.array != null) // avoid repeated failures
1437     signalWork();
1438    
1439 dl 1.200 if (ex == null) // help clean on way out
1440 dl 1.120 ForkJoinTask.helpExpungeStaleExceptions();
1441 dl 1.200 else // rethrow
1442 dl 1.104 ForkJoinTask.rethrow(ex);
1443 dl 1.78 }
1444 dl 1.52
1445 dl 1.19 /**
1446 dl 1.300 * Tries to create or release a worker if too few are running.
1447 dl 1.105 */
1448 dl 1.253 final void signalWork() {
1449 dl 1.243 for (;;) {
1450 dl 1.300 long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1451 dl 1.243 if ((c = ctl) >= 0L) // enough workers
1452     break;
1453     else if ((sp = (int)c) == 0) { // no idle workers
1454     if ((c & ADD_WORKER) != 0L) // too few workers
1455 dl 1.200 tryAddWorker(c);
1456     break;
1457     }
1458 dl 1.243 else if ((ws = workQueues) == null)
1459     break; // unstarted/terminated
1460     else if (ws.length <= (i = sp & SMASK))
1461     break; // terminated
1462     else if ((v = ws[i]) == null)
1463     break; // terminating
1464     else {
1465 dl 1.300 int np = sp & ~UNSIGNALLED;
1466     int vp = v.phase;
1467     long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1468     Thread vt = v.owner;
1469     if (sp == vp && U.compareAndSwapLong(this, CTL, c, nc)) {
1470     v.phase = np;
1471     if (v.source < 0)
1472     LockSupport.unpark(vt);
1473 dl 1.243 break;
1474     }
1475 dl 1.174 }
1476 dl 1.52 }
1477 dl 1.14 }
1478    
1479 dl 1.200 /**
1480 dl 1.300 * Tries to decrement counts (sometimes implicitly) and possibly
1481     * arrange for a compensating worker in preparation for blocking:
1482     * If not all core workers yet exist, creates one, else if any are
1483     * unreleased (possibly including caller) releases one, else if
1484     * fewer than the minimum allowed number of workers running,
1485     * checks to see that they are all active, and if so creates an
1486     * extra worker unless over maximum limit and policy is to
1487     * saturate. Most of these steps can fail due to interference, in
1488     * which case 0 is returned so caller will retry. A negative
1489     * return value indicates that the caller doesn't need to
1490     * re-adjust counts when later unblocked.
1491 dl 1.243 *
1492 dl 1.300 * @return 1: block then adjust, -1: block without adjust, 0 : retry
1493 dl 1.243 */
1494 dl 1.300 private int tryCompensate(WorkQueue w) {
1495     int t, n, sp;
1496     long c = ctl;
1497     WorkQueue[] ws = workQueues;
1498     if ((t = (short)(c >> TC_SHIFT)) >= 0) {
1499     if (ws == null || (n = ws.length) <= 0 || w == null)
1500     return 0; // disabled
1501     else if ((sp = (int)c) != 0) { // replace or release
1502     WorkQueue v = ws[sp & (n - 1)];
1503     int wp = w.phase;
1504     long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
1505     int np = sp & ~UNSIGNALLED;
1506     if (v != null) {
1507     int vp = v.phase;
1508     Thread vt = v.owner;
1509     long nc = ((long)v.stackPred & SP_MASK) | uc;
1510     if (vp == sp && U.compareAndSwapLong(this, CTL, c, nc)) {
1511     v.phase = np;
1512     if (v.source < 0)
1513     LockSupport.unpark(vt);
1514     return (wp < 0) ? -1 : 1;
1515     }
1516     }
1517     return 0;
1518     }
1519     else if ((int)(c >> RC_SHIFT) - // reduce parallelism
1520     (short)(bounds & SMASK) > 0) {
1521     long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1522     return U.compareAndSwapLong(this, CTL, c, nc) ? 1 : 0;
1523     }
1524     else { // validate
1525     int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
1526     boolean unstable = false;
1527     for (int i = 1; i < n; i += 2) {
1528     WorkQueue q; Thread wt; Thread.State ts;
1529     if ((q = ws[i]) != null) {
1530     if (q.source == 0) {
1531     unstable = true;
1532     break;
1533     }
1534     else {
1535     --tc;
1536     if ((wt = q.owner) != null &&
1537     ((ts = wt.getState()) == Thread.State.BLOCKED ||
1538     ts == Thread.State.WAITING))
1539     ++bc; // worker is blocking
1540     }
1541 dl 1.243 }
1542     }
1543 dl 1.300 if (unstable || tc != 0 || ctl != c)
1544     return 0; // inconsistent
1545     else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
1546     if ((md & SATURATE) != 0)
1547     return -1;
1548     else if (bc < pc) { // lagging
1549     Thread.yield(); // for retry spins
1550     return 0;
1551 dl 1.243 }
1552     else
1553 dl 1.300 throw new RejectedExecutionException(
1554     "Thread limit exceeded replacing blocked worker");
1555 dl 1.200 }
1556 dl 1.177 }
1557 dl 1.243 }
1558 dl 1.300
1559     long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1560     return U.compareAndSwapLong(this, CTL, c, nc) && createWorker() ? 1 : 0;
1561 dl 1.243 }
1562    
1563     /**
1564     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1565 dl 1.300 * See above for explanation.
1566 dl 1.243 */
1567     final void runWorker(WorkQueue w) {
1568 dl 1.300 WorkQueue[] ws;
1569 dl 1.243 w.growArray(); // allocate queue
1570 dl 1.300 int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
1571     if (r == 0) // initial nonzero seed
1572     r = 1;
1573     int lastSignalId = 0; // avoid unneeded signals
1574     while ((ws = workQueues) != null) {
1575     boolean nonempty = false; // scan
1576     for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1577     WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1578     if ((i = r & m) >= 0 && i < n && // always true
1579     (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
1580 dl 1.243 (a = q.array) != null && (al = a.length) > 0) {
1581 dl 1.300 int qid = q.id; // (never zero)
1582 dl 1.256 int index = (al - 1) & b;
1583     long offset = ((long)index << ASHIFT) + ABASE;
1584 dl 1.243 ForkJoinTask<?> t = (ForkJoinTask<?>)
1585     U.getObjectVolatile(a, offset);
1586 dl 1.300 if (t != null && b++ == q.base &&
1587     U.compareAndSwapObject(a, offset, t, null)) {
1588     if ((q.base = b) - q.top < 0 && qid != lastSignalId)
1589     signalWork(); // propagate signal
1590     w.source = lastSignalId = qid;
1591     t.doExec();
1592     if ((w.id & FIFO) != 0) // run remaining locals
1593     w.localPollAndExec(POLL_LIMIT);
1594     else
1595     w.localPopAndExec(POLL_LIMIT);
1596     ForkJoinWorkerThread thread = w.owner;
1597     ++w.nsteals;
1598     w.source = 0; // now idle
1599     if (thread != null)
1600     thread.afterTopLevelExec();
1601 dl 1.243 }
1602 dl 1.300 nonempty = true;
1603 dl 1.178 }
1604 dl 1.300 else if (nonempty)
1605 dl 1.243 break;
1606 dl 1.300 else
1607     ++r;
1608 dl 1.120 }
1609 dl 1.178
1610 dl 1.300 if (nonempty) { // move (xorshift)
1611     r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
1612     }
1613     else {
1614     int phase;
1615     lastSignalId = 0; // clear for next scan
1616     if ((phase = w.phase) >= 0) { // enqueue
1617     int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
1618     long c, nc;
1619     do {
1620     w.stackPred = (int)(c = ctl);
1621     nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
1622     } while (!U.compareAndSwapLong(this, CTL, c, nc));
1623     }
1624     else { // already queued
1625     int pred = w.stackPred;
1626     w.source = DORMANT; // enable signal
1627     for (int steps = 0;;) {
1628     int md, rc; long c;
1629     if (w.phase >= 0) {
1630     w.source = 0;
1631     break;
1632     }
1633     else if ((md = mode) < 0) // shutting down
1634     return;
1635     else if ((rc = ((md & SMASK) + // possibly quiescent
1636     (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
1637     (md & SHUTDOWN) != 0 &&
1638     tryTerminate(false, false))
1639     return; // help terminate
1640     else if ((++steps & 1) == 0)
1641     Thread.interrupted(); // clear between parks
1642     else if (rc <= 0 && pred != 0 && phase == (int)c) {
1643     long d = keepAlive + System.currentTimeMillis();
1644     LockSupport.parkUntil(this, d);
1645     if (ctl == c &&
1646     d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
1647     long nc = ((UC_MASK & (c - TC_UNIT)) |
1648     (SP_MASK & pred));
1649     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1650     w.phase = QUIET;
1651     return; // drop on timeout
1652     }
1653     }
1654     }
1655     else
1656     LockSupport.park(this);
1657     }
1658     }
1659     }
1660     }
1661     }
1662 dl 1.200
1663 dl 1.300 final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1664     int s = 0;
1665     if (w != null && task != null &&
1666     (!(task instanceof CountedCompleter) ||
1667     (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
1668     w.tryRemoveAndExec(task);
1669     int src = w.source, id = w.id;
1670     s = task.status;
1671     while (s >= 0) {
1672     WorkQueue[] ws;
1673     boolean nonempty = false;
1674     int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
1675     if ((ws = workQueues) != null) { // scan for matching id
1676     for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
1677     WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1678     if ((i = (r + j) & m) >= 0 && i < n &&
1679     (q = ws[i]) != null && q.source == id &&
1680     (b = q.base) - q.top < 0 &&
1681     (a = q.array) != null && (al = a.length) > 0) {
1682     int qid = q.id;
1683     int index = (al - 1) & b;
1684     long offset = ((long)index << ASHIFT) + ABASE;
1685     ForkJoinTask<?> t = (ForkJoinTask<?>)
1686     U.getObjectVolatile(a, offset);
1687     if (t != null && b++ == q.base && id == q.source &&
1688     U.compareAndSwapObject(a, offset, t, null)) {
1689     q.base = b;
1690     w.source = qid;
1691     t.doExec();
1692     w.source = src;
1693     }
1694     nonempty = true;
1695 dl 1.200 break;
1696 dl 1.300 }
1697 dl 1.200 }
1698 dl 1.300 }
1699     if ((s = task.status) < 0)
1700     break;
1701     else if (!nonempty) {
1702     long ms, ns; int block;
1703     if (deadline == 0L)
1704     ms = 0L; // untimed
1705     else if ((ns = deadline - System.nanoTime()) <= 0L)
1706     break; // timeout
1707     else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1708     ms = 1L; // avoid 0 for timed wait
1709     if ((block = tryCompensate(w)) != 0) {
1710     task.internalWait(ms);
1711     U.getAndAddLong(this, CTL, (block > 0) ? RC_UNIT : 0L);
1712 dl 1.200 }
1713 dl 1.300 s = task.status;
1714 dl 1.200 }
1715 dl 1.178 }
1716     }
1717 dl 1.200 return s;
1718 dl 1.120 }
1719    
1720     /**
1721 dl 1.300 * Runs tasks until {@code isQuiescent()}. Rather than blocking
1722     * when tasks cannot be found, rescans until all others cannot
1723     * find tasks either.
1724 dl 1.78 */
1725 dl 1.300 final void helpQuiescePool(WorkQueue w) {
1726     int prevSrc = w.source, fifo = w.id & FIFO;
1727     for (int source = prevSrc, released = -1;;) { // -1 until known
1728     WorkQueue[] ws;
1729     if (fifo != 0)
1730     w.localPollAndExec(0);
1731     else
1732     w.localPopAndExec(0);
1733     if (released == -1 && w.phase >= 0)
1734     released = 1;
1735     boolean quiet = true, empty = true;
1736     int r = ThreadLocalRandom.nextSecondarySeed();
1737     if ((ws = workQueues) != null) {
1738     for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1739     WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1740     if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
1741     if ((b = q.base) - q.top < 0 &&
1742     (a = q.array) != null && (al = a.length) > 0) {
1743     int qid = q.id;
1744     if (released == 0) { // increment
1745     released = 1;
1746     U.getAndAddLong(this, CTL, RC_UNIT);
1747 dl 1.95 }
1748 dl 1.256 int index = (al - 1) & b;
1749     long offset = ((long)index << ASHIFT) + ABASE;
1750 dl 1.300 ForkJoinTask<?> t = (ForkJoinTask<?>)
1751 dl 1.262 U.getObjectVolatile(a, offset);
1752 dl 1.300 if (t != null && b++ == q.base &&
1753     U.compareAndSwapObject(a, offset, t, null)) {
1754     q.base = b;
1755     w.source = source = q.id;
1756     t.doExec();
1757     w.source = source = prevSrc;
1758 dl 1.243 }
1759 dl 1.300 quiet = empty = false;
1760 dl 1.200 break;
1761 dl 1.95 }
1762 dl 1.300 else if ((q.source & QUIET) == 0)
1763     quiet = false;
1764 dl 1.52 }
1765 dl 1.19 }
1766 dl 1.243 }
1767 dl 1.300 if (quiet) {
1768     if (released == 0)
1769     U.getAndAddLong(this, CTL, RC_UNIT);
1770     w.source = prevSrc;
1771     break;
1772     }
1773     else if (empty) {
1774     if (source != QUIET)
1775     w.source = source = QUIET;
1776     if (released == 1) { // decrement
1777     released = 0;
1778     U.getAndAddLong(this, CTL, RC_MASK & -RC_UNIT);
1779     }
1780     }
1781 dl 1.14 }
1782 dl 1.22 }
1783    
1784 dl 1.52 /**
1785 dl 1.300 * Scans for and returns a polled task, if available.
1786     * Used only for untracked polls.
1787 dl 1.105 *
1788 dl 1.300 * @param submissionsOnly if true, only scan submission queues
1789 dl 1.19 */
1790 dl 1.300 private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1791     WorkQueue[] ws; int n;
1792     rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
1793     (n = ws.length) > 0) {
1794     int m = n - 1;
1795     int r = ThreadLocalRandom.nextSecondarySeed();
1796     int h = r >>> 16;
1797     int origin, step;
1798     if (submissionsOnly) {
1799     origin = (r & ~1) & m; // even indices and steps
1800     step = (h & ~1) | 2;
1801     }
1802     else {
1803     origin = r & m;
1804     step = h | 1;
1805     }
1806     for (int k = origin, oldSum = 0, checkSum = 0;;) {
1807     WorkQueue q; int b, al; ForkJoinTask<?>[] a;
1808     if ((q = ws[k]) != null) {
1809     checkSum += b = q.base;
1810     if (b - q.top < 0 &&
1811     (a = q.array) != null && (al = a.length) > 0) {
1812     int index = (al - 1) & b;
1813     long offset = ((long)index << ASHIFT) + ABASE;
1814     ForkJoinTask<?> t = (ForkJoinTask<?>)
1815     U.getObjectVolatile(a, offset);
1816     if (t != null && b++ == q.base &&
1817     U.compareAndSwapObject(a, offset, t, null)) {
1818     q.base = b;
1819     return t;
1820     }
1821     else
1822     break; // restart
1823     }
1824     }
1825     if ((k = (k + step) & m) == origin) {
1826     if (oldSum == (oldSum = checkSum))
1827     break rescan;
1828     checkSum = 0;
1829 dl 1.178 }
1830 dl 1.52 }
1831 dl 1.90 }
1832 dl 1.300 return null;
1833     }
1834    
1835     /**
1836     * Gets and removes a local or stolen task for the given worker.
1837     *
1838     * @return a task, if available
1839     */
1840     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1841     ForkJoinTask<?> t;
1842     if (w != null &&
1843     (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1844     return t;
1845     else
1846     return pollScan(false);
1847 dl 1.90 }
1848    
1849 dl 1.300 // External operations
1850    
1851 dl 1.90 /**
1852 dl 1.300 * Adds the given task to a submission queue at submitter's
1853     * current queue, creating one if null or contended.
1854 dl 1.90 *
1855 dl 1.300 * @param task the task. Caller must ensure non-null.
1856 dl 1.90 */
1857 dl 1.300 final void externalPush(ForkJoinTask<?> task) {
1858     int r; // initialize caller's probe
1859     if ((r = ThreadLocalRandom.getProbe()) == 0) {
1860     ThreadLocalRandom.localInit();
1861     r = ThreadLocalRandom.getProbe();
1862     }
1863     for (;;) {
1864     int md = mode, n;
1865     WorkQueue[] ws = workQueues;
1866     if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
1867     throw new RejectedExecutionException();
1868     else {
1869     WorkQueue q;
1870     boolean push = false, grow = false;
1871     if ((q = ws[(n - 1) & r & SQMASK]) == null) {
1872     Object lock = workerNamePrefix;
1873     int qid = (r | QUIET) & ~(FIFO | OWNED);
1874     q = new WorkQueue(this, null);
1875     q.id = qid;
1876     q.source = QUIET;
1877     q.phase = QLOCK; // lock queue
1878     if (lock != null) {
1879 jsr166 1.301 synchronized (lock) { // lock pool to install
1880 dl 1.300 int i;
1881     if ((ws = workQueues) != null &&
1882     (n = ws.length) > 0 &&
1883     ws[i = qid & (n - 1) & SQMASK] == null) {
1884     ws[i] = q;
1885     push = grow = true;
1886     }
1887     }
1888     }
1889     }
1890     else if (q.tryLockSharedQueue()) {
1891     int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
1892     if ((a = q.array) != null && (al = a.length) > 0 &&
1893     al - 1 + (d = b - s) > 0) {
1894     a[(al - 1) & s] = task;
1895     q.top = s + 1; // relaxed writes OK here
1896     q.phase = 0;
1897     if (d < 0 && q.base - s < 0)
1898     break; // no signal needed
1899     }
1900 dl 1.243 else
1901 dl 1.300 grow = true;
1902     push = true;
1903     }
1904     if (push) {
1905     if (grow) {
1906     try {
1907     q.growArray();
1908     int s = q.top, al; ForkJoinTask<?>[] a;
1909     if ((a = q.array) != null && (al = a.length) > 0) {
1910     a[(al - 1) & s] = task;
1911     q.top = s + 1;
1912     }
1913     } finally {
1914     q.phase = 0;
1915     }
1916 dl 1.243 }
1917 dl 1.300 signalWork();
1918     break;
1919 dl 1.90 }
1920 dl 1.300 else // move if busy
1921     r = ThreadLocalRandom.advanceProbe(r);
1922 dl 1.90 }
1923     }
1924     }
1925    
1926 dl 1.300 /**
1927     * Pushes a possibly-external submission.
1928     */
1929     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1930     Thread t; ForkJoinWorkerThread w; WorkQueue q;
1931     if (task == null)
1932     throw new NullPointerException();
1933     if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1934     (w = (ForkJoinWorkerThread)t).pool == this &&
1935     (q = w.workQueue) != null)
1936     q.push(task);
1937     else
1938     externalPush(task);
1939     return task;
1940     }
1941    
1942     /**
1943     * Returns common pool queue for an external thread.
1944     */
1945     static WorkQueue commonSubmitterQueue() {
1946     ForkJoinPool p = common;
1947     int r = ThreadLocalRandom.getProbe();
1948     WorkQueue[] ws; int n;
1949     return (p != null && (ws = p.workQueues) != null &&
1950     (n = ws.length) > 0) ?
1951     ws[(n - 1) & r & SQMASK] : null;
1952     }
1953 dl 1.90
1954     /**
1955 dl 1.300 * Performs tryUnpush for an external submitter.
1956     */
1957     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
1958     int r = ThreadLocalRandom.getProbe();
1959     WorkQueue[] ws; WorkQueue w; int n;
1960     return ((ws = workQueues) != null &&
1961     (n = ws.length) > 0 &&
1962     (w = ws[(n - 1) & r & SQMASK]) != null &&
1963     w.trySharedUnpush(task));
1964 dl 1.22 }
1965    
1966     /**
1967 dl 1.300 * Performs helpComplete for an external submitter.
1968 dl 1.78 */
1969 dl 1.300 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
1970     int r = ThreadLocalRandom.getProbe();
1971     WorkQueue[] ws; WorkQueue w; int n;
1972     return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1973     (w = ws[(n - 1) & r & SQMASK]) != null) ?
1974     w.sharedHelpCC(task, maxTasks) : 0;
1975 dl 1.22 }
1976    
1977     /**
1978 dl 1.300 * Tries to steal and run tasks within the target's computation.
1979     * The maxTasks argument supports external usages; internal calls
1980     * use zero, allowing unbounded steps (external calls trap
1981     * non-positive values).
1982 dl 1.78 *
1983 dl 1.300 * @param w caller
1984     * @param maxTasks if non-zero, the maximum number of other tasks to run
1985     * @return task status on exit
1986 dl 1.22 */
1987 dl 1.300 final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1988     int maxTasks) {
1989     return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
1990 dl 1.14 }
1991    
1992     /**
1993 dl 1.105 * Returns a cheap heuristic guide for task partitioning when
1994     * programmers, frameworks, tools, or languages have little or no
1995 jsr166 1.222 * idea about task granularity. In essence, by offering this
1996 dl 1.105 * method, we ask users only about tradeoffs in overhead vs
1997     * expected throughput and its variance, rather than how finely to
1998     * partition tasks.
1999     *
2000     * In a steady state strict (tree-structured) computation, each
2001     * thread makes available for stealing enough tasks for other
2002     * threads to remain active. Inductively, if all threads play by
2003     * the same rules, each thread should make available only a
2004     * constant number of tasks.
2005     *
2006     * The minimum useful constant is just 1. But using a value of 1
2007     * would require immediate replenishment upon each steal to
2008     * maintain enough tasks, which is infeasible. Further,
2009     * partitionings/granularities of offered tasks should minimize
2010     * steal rates, which in general means that threads nearer the top
2011     * of computation tree should generate more than those nearer the
2012     * bottom. In perfect steady state, each thread is at
2013     * approximately the same level of computation tree. However,
2014     * producing extra tasks amortizes the uncertainty of progress and
2015     * diffusion assumptions.
2016     *
2017 jsr166 1.161 * So, users will want to use values larger (but not much larger)
2018 dl 1.105 * than 1 to both smooth over transient shortages and hedge
2019     * against uneven progress; as traded off against the cost of
2020     * extra task overhead. We leave the user to pick a threshold
2021     * value to compare with the results of this call to guide
2022     * decisions, but recommend values such as 3.
2023     *
2024     * When all threads are active, it is on average OK to estimate
2025     * surplus strictly locally. In steady-state, if one thread is
2026     * maintaining say 2 surplus tasks, then so are others. So we can
2027     * just use estimated queue length. However, this strategy alone
2028     * leads to serious mis-estimates in some non-steady-state
2029     * conditions (ramp-up, ramp-down, other stalls). We can detect
2030     * many of these by further considering the number of "idle"
2031     * threads, that are known to have zero queued tasks, so
2032     * compensate by a factor of (#idle/#active) threads.
2033     */
2034     static int getSurplusQueuedTaskCount() {
2035     Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2036 dl 1.300 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2037     (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2038     (q = wt.workQueue) != null) {
2039     int p = pool.mode & SMASK;
2040     int a = p + (int)(pool.ctl >> RC_SHIFT);
2041     int n = q.top - q.base;
2042 dl 1.112 return n - (a > (p >>>= 1) ? 0 :
2043     a > (p >>>= 1) ? 1 :
2044     a > (p >>>= 1) ? 2 :
2045     a > (p >>>= 1) ? 4 :
2046     8);
2047 dl 1.105 }
2048     return 0;
2049 dl 1.100 }
2050    
2051 dl 1.300 // Termination
2052 dl 1.14
2053     /**
2054 dl 1.210 * Possibly initiates and/or completes termination.
2055 dl 1.14 *
2056     * @param now if true, unconditionally terminate, else only
2057 dl 1.78 * if no work and no active workers
2058 dl 1.243 * @param enable if true, terminate when next possible
2059 dl 1.300 * @return true if terminating or terminated
2060 jsr166 1.1 */
2061 dl 1.300 private boolean tryTerminate(boolean now, boolean enable) {
2062     int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
2063 dl 1.289
2064 dl 1.300 while (((md = mode) & SHUTDOWN) == 0) {
2065 dl 1.294 if (!enable || this == common) // cannot shutdown
2066 dl 1.300 return false;
2067 dl 1.294 else
2068 dl 1.300 U.compareAndSwapInt(this, MODE, md, md | SHUTDOWN);
2069 dl 1.289 }
2070    
2071 dl 1.300 while (((md = mode) & STOP) == 0) { // try to initiate termination
2072     if (!now) { // check if quiescent & empty
2073 dl 1.211 for (long oldSum = 0L;;) { // repeat until stable
2074 dl 1.300 boolean running = false;
2075 dl 1.210 long checkSum = ctl;
2076 dl 1.300 WorkQueue[] ws = workQueues;
2077     if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
2078     running = true;
2079     else if (ws != null) {
2080     WorkQueue w; int b;
2081 dl 1.289 for (int i = 0; i < ws.length; ++i) {
2082     if ((w = ws[i]) != null) {
2083 dl 1.300 checkSum += (b = w.base) + w.id;
2084     if (b != w.top ||
2085     ((i & 1) == 1 && w.source >= 0)) {
2086     running = true;
2087     break;
2088     }
2089 dl 1.289 }
2090 dl 1.206 }
2091 dl 1.203 }
2092 dl 1.300 if (((md = mode) & STOP) != 0)
2093     break; // already triggered
2094     else if (running)
2095     return false;
2096     else if (workQueues == ws && oldSum == (oldSum = checkSum))
2097 dl 1.210 break;
2098 dl 1.203 }
2099     }
2100 dl 1.300 if ((md & STOP) == 0)
2101     U.compareAndSwapInt(this, MODE, md, md | STOP);
2102 dl 1.200 }
2103 dl 1.210
2104 dl 1.300 while (((md = mode) & TERMINATED) == 0) { // help terminate others
2105     for (long oldSum = 0L;;) { // repeat until stable
2106     WorkQueue[] ws; WorkQueue w;
2107     long checkSum = ctl;
2108     if ((ws = workQueues) != null) {
2109     for (int i = 0; i < ws.length; ++i) {
2110     if ((w = ws[i]) != null) {
2111     ForkJoinWorkerThread wt = w.owner;
2112     w.cancelAll(); // clear queues
2113     if (wt != null) {
2114 dl 1.289 try { // unblock join or park
2115 dl 1.210 wt.interrupt();
2116     } catch (Throwable ignore) {
2117 dl 1.200 }
2118     }
2119 dl 1.300 checkSum += w.base + w.id;
2120 dl 1.200 }
2121 dl 1.101 }
2122 dl 1.78 }
2123 dl 1.300 if (((md = mode) & TERMINATED) != 0 ||
2124     (workQueues == ws && oldSum == (oldSum = checkSum)))
2125     break;
2126 dl 1.78 }
2127 dl 1.300 if ((md & TERMINATED) != 0)
2128     break;
2129     else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
2130 dl 1.210 break;
2131 dl 1.300 else if (U.compareAndSwapInt(this, MODE, md, md | TERMINATED)) {
2132     synchronized (this) {
2133     notifyAll(); // for awaitTermination
2134 dl 1.243 }
2135 dl 1.256 break;
2136 dl 1.200 }
2137 dl 1.52 }
2138 dl 1.300 return true;
2139 dl 1.105 }
2140    
2141 dl 1.52 // Exported methods
2142 jsr166 1.1
2143     // Constructors
2144    
2145     /**
2146 jsr166 1.9 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2147 dl 1.300 * java.lang.Runtime#availableProcessors}, using defaults for all
2148     * other parameters.
2149 jsr166 1.1 *
2150     * @throws SecurityException if a security manager exists and
2151     * the caller is not permitted to modify threads
2152     * because it does not hold {@link
2153     * java.lang.RuntimePermission}{@code ("modifyThread")}
2154     */
2155     public ForkJoinPool() {
2156 jsr166 1.148 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2157 dl 1.300 defaultForkJoinWorkerThreadFactory, null, false,
2158     0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2159 jsr166 1.1 }
2160    
2161     /**
2162 jsr166 1.9 * Creates a {@code ForkJoinPool} with the indicated parallelism
2163 dl 1.300 * level, using defaults for all other parameters.
2164 jsr166 1.1 *
2165 jsr166 1.9 * @param parallelism the parallelism level
2166 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
2167 jsr166 1.11 * equal to zero, or greater than implementation limit
2168 jsr166 1.1 * @throws SecurityException if a security manager exists and
2169     * the caller is not permitted to modify threads
2170     * because it does not hold {@link
2171     * java.lang.RuntimePermission}{@code ("modifyThread")}
2172     */
2173     public ForkJoinPool(int parallelism) {
2174 dl 1.300 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
2175     0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2176 jsr166 1.1 }
2177    
    /**
     * Creates a {@code ForkJoinPool} with the given parameters (using
     * defaults for others).
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     * @param factory the factory for creating new threads. For default value,
     * use {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while executing
     * tasks. For default value, use {@code null}.
     * @param asyncMode if true,
     * establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined. This mode may be more appropriate
     * than default locally stack-based mode in applications in which
     * worker threads only process event-style asynchronous tasks.
     * For default value, use {@code false}.
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        // Delegate to the full constructor with default sizing/keep-alive.
        this(parallelism, factory, handler, asyncMode,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2210    
    /**
     * Creates a {@code ForkJoinPool} with the given parameters.
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     *
     * @param factory the factory for creating new threads. For
     * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
     *
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while
     * executing tasks. For default value, use {@code null}.
     *
     * @param asyncMode if true, establishes local first-in-first-out
     * scheduling mode for forked tasks that are never joined. This
     * mode may be more appropriate than default locally stack-based
     * mode in applications in which worker threads only process
     * event-style asynchronous tasks. For default value, use {@code
     * false}.
     *
     * @param corePoolSize the number of threads to keep in the pool
     * (unless timed out after an elapsed keep-alive). Normally (and
     * by default) this is the same value as the parallelism level,
     * but may be set to a larger value to reduce dynamic overhead if
     * tasks regularly block. Using a smaller value (for example
     * {@code 0}) has the same effect as the default.
     *
     * @param maximumPoolSize the maximum number of threads allowed.
     * When the maximum is reached, attempts to replace blocked
     * threads fail. (However, because creation and termination of
     * different threads may overlap, and may be managed by the given
     * thread factory, this value may be transiently exceeded.) The
     * default for the common pool is {@code 256} plus the parallelism
     * level. Using a value (for example {@code Integer.MAX_VALUE})
     * larger than the implementation's total thread limit has the
     * same effect as using this limit.
     *
     * @param minimumRunnable the minimum allowed number of core
     * threads not blocked by a join or {@link ManagedBlocker}. To
     * ensure progress, when too few unblocked threads exist and
     * unexecuted tasks may exist, new threads are constructed, up to
     * the given maximumPoolSize. For the default value, use {@code
     * 1}, that ensures liveness. A larger value might improve
     * throughput in the presence of blocked activities, but might
     * not, due to increased overhead. A value of zero may be
     * acceptable when submitted tasks cannot have dependencies
     * requiring additional threads.
     *
     * @param rejectOnSaturation if true, attempts to create more than
     * the maximum total allowed threads throw {@link
     * RejectedExecutionException}. Otherwise, the pool continues to
     * operate, but with fewer than the target number of runnable
     * threads, so might not ensure progress. For default value, use
     * {@code true}.
     *
     * @param keepAliveTime the elapsed time since last use before
     * a thread is terminated (and then later replaced if needed).
     * For the default value, use {@code 60, TimeUnit.SECONDS}.
     *
     * @param unit the time unit for the {@code keepAliveTime} argument
     *
     * @throws IllegalArgumentException if parallelism is less than or
     * equal to zero, or is greater than implementation limit,
     * or if maximumPoolSize is less than parallelism,
     * or if the keepAliveTime is less than or equal to zero.
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     * the caller is not permitted to modify threads
     * because it does not hold {@link
     * java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode,
                        int corePoolSize,
                        int maximumPoolSize,
                        int minimumRunnable,
                        boolean rejectOnSaturation,
                        long keepAliveTime,
                        TimeUnit unit) {
        // check, encode, pack parameters
        if (parallelism <= 0 || parallelism > MAX_CAP ||
            maximumPoolSize < parallelism || keepAliveTime <= 0L)
            throw new IllegalArgumentException();
        if (factory == null)
            throw new NullPointerException();
        // Never allow a keep-alive shorter than the timer slop used by
        // the idle-timeout machinery.
        long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);

        String prefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
        // Clamp core size into [parallelism, MAX_CAP].
        int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
        // ctl starts with negated counts in its TC/RC fields; workers
        // increment them toward zero as they are created/released.
        long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
        // mode packs the parallelism level with the FIFO/SATURATE flags.
        int m = (parallelism |
                 (asyncMode ? FIFO : 0) |
                 (rejectOnSaturation ? 0 : SATURATE));
        // bounds packs max spare threads (high half) with the signed
        // difference (minimumRunnable - parallelism) masked into SMASK.
        int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
        int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
        int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
        int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
        // Round up to a power of two via bit-smearing...
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1; // power of two, including space for submission queues

        this.workQueues = new WorkQueue[n];
        this.workerNamePrefix = prefix;
        this.factory = factory;
        this.ueh = handler;
        this.keepAlive = ms;
        this.bounds = b;
        this.mode = m;
        this.ctl = c;
        // Check permission last, after cheap validation, but before the
        // pool can be used.
        checkPermission();
    }
2324    
    /**
     * Constructor for common pool using parameters possibly
     * overridden by system properties.
     *
     * Property parsing failures are deliberately ignored so that a
     * malformed system property cannot prevent static initialization
     * of the common pool; defaults are used instead.
     */
    private ForkJoinPool(byte forCommonPoolOnly) {
        int parallelism = -1;               // -1 marks "not specified"
        ForkJoinWorkerThreadFactory fac = null;
        UncaughtExceptionHandler handler = null;
        try { // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            // NOTE(review): Class.newInstance is deprecated in later JDKs;
            // any instantiation failure here is swallowed by the catch below.
            if (fp != null)
                fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }

        if (fac == null) {
            if (System.getSecurityManager() == null)
                fac = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                fac = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

        // Pack initial ctl/bounds/mode the same way as the public
        // constructor, using COMMON_MAX_SPARES for the spare limit.
        long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
        int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
        int m = (parallelism < 1) ? 1 : parallelism;
        int n = (parallelism > 1) ? parallelism - 1 : 1;
        // Round table size up to a power of two, doubled to leave room
        // for external submission queues (even slots).
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1;

        this.workQueues = new WorkQueue[n];
        this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
        this.factory = fac;
        this.ueh = handler;
        this.keepAlive = DEFAULT_KEEPALIVE;
        this.bounds = b;
        this.mode = m;
        this.ctl = c;
    }
2380    
    /**
     * Returns the common pool instance. This pool is statically
     * constructed; its run state is unaffected by attempts to {@link
     * #shutdown} or {@link #shutdownNow}. However this pool and any
     * ongoing processing are automatically terminated upon program
     * {@link System#exit}. Any program that relies on asynchronous
     * task processing to complete before program termination should
     * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
     * before exit.
     *
     * @return the common pool instance
     * @since 1.8
     */
    public static ForkJoinPool commonPool() {
        // common is assigned during static initialization and is never
        // null afterwards.
        // assert common != null : "static init error";
        return common;
    }
2398    
2399 jsr166 1.1 // Execution methods
2400    
2401     /**
2402     * Performs the given task, returning its result upon completion.
2403 dl 1.52 * If the computation encounters an unchecked Exception or Error,
2404     * it is rethrown as the outcome of this invocation. Rethrown
2405     * exceptions behave in the same way as regular exceptions, but,
2406     * when possible, contain stack traces (as displayed for example
2407     * using {@code ex.printStackTrace()}) of both the current thread
2408     * as well as the thread actually encountering the exception;
2409     * minimally only the latter.
2410 jsr166 1.1 *
2411     * @param task the task
2412 jsr166 1.191 * @param <T> the type of the task's result
2413 jsr166 1.1 * @return the task's result
2414 jsr166 1.11 * @throws NullPointerException if the task is null
2415     * @throws RejectedExecutionException if the task cannot be
2416     * scheduled for execution
2417 jsr166 1.1 */
2418     public <T> T invoke(ForkJoinTask<T> task) {
2419 dl 1.90 if (task == null)
2420     throw new NullPointerException();
2421 dl 1.243 externalSubmit(task);
2422 dl 1.78 return task.join();
2423 jsr166 1.1 }
2424    
    /**
     * Arranges for (asynchronous) execution of the given task.
     *
     * @param task the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public void execute(ForkJoinTask<?> task) {
        // Fire-and-forget: submission only; the caller does not wait.
        externalSubmit(task);
    }
2436    
2437     // AbstractExecutorService methods
2438    
2439 jsr166 1.11 /**
2440     * @throws NullPointerException if the task is null
2441     * @throws RejectedExecutionException if the task cannot be
2442     * scheduled for execution
2443     */
2444 jsr166 1.1 public void execute(Runnable task) {
2445 dl 1.41 if (task == null)
2446     throw new NullPointerException();
2447 jsr166 1.2 ForkJoinTask<?> job;
2448 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2449     job = (ForkJoinTask<?>) task;
2450 jsr166 1.2 else
2451 dl 1.152 job = new ForkJoinTask.RunnableExecuteAction(task);
2452 dl 1.243 externalSubmit(job);
2453 jsr166 1.1 }
2454    
    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        // externalSubmit returns its argument, satisfying the contract
        // of returning the submitted task.
        return externalSubmit(task);
    }
2468    
2469     /**
2470 jsr166 1.11 * @throws NullPointerException if the task is null
2471     * @throws RejectedExecutionException if the task cannot be
2472     * scheduled for execution
2473     */
2474 jsr166 1.1 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2475 dl 1.243 return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
2476 jsr166 1.1 }
2477    
2478 jsr166 1.11 /**
2479     * @throws NullPointerException if the task is null
2480     * @throws RejectedExecutionException if the task cannot be
2481     * scheduled for execution
2482     */
2483 jsr166 1.1 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2484 dl 1.243 return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
2485 jsr166 1.1 }
2486    
2487 jsr166 1.11 /**
2488     * @throws NullPointerException if the task is null
2489     * @throws RejectedExecutionException if the task cannot be
2490     * scheduled for execution
2491     */
2492 jsr166 1.1 public ForkJoinTask<?> submit(Runnable task) {
2493 dl 1.41 if (task == null)
2494     throw new NullPointerException();
2495 jsr166 1.2 ForkJoinTask<?> job;
2496 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2497     job = (ForkJoinTask<?>) task;
2498 jsr166 1.2 else
2499 dl 1.90 job = new ForkJoinTask.AdaptedRunnableAction(task);
2500 dl 1.243 return externalSubmit(job);
2501 jsr166 1.1 }
2502    
2503     /**
2504 jsr166 1.11 * @throws NullPointerException {@inheritDoc}
2505     * @throws RejectedExecutionException {@inheritDoc}
2506     */
2507 jsr166 1.1 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2508 dl 1.86 // In previous versions of this class, this method constructed
2509     // a task to run ForkJoinTask.invokeAll, but now external
2510     // invocation of multiple tasks is at least as efficient.
2511 jsr166 1.199 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2512 jsr166 1.1
2513 dl 1.86 try {
2514     for (Callable<T> t : tasks) {
2515 dl 1.90 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2516 jsr166 1.144 futures.add(f);
2517 dl 1.243 externalSubmit(f);
2518 dl 1.86 }
2519 jsr166 1.143 for (int i = 0, size = futures.size(); i < size; i++)
2520     ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2521 dl 1.86 return futures;
2522 jsr166 1.226 } catch (Throwable t) {
2523     for (int i = 0, size = futures.size(); i < size; i++)
2524     futures.get(i).cancel(false);
2525     throw t;
2526 jsr166 1.1 }
2527     }
2528    
    /**
     * Returns the factory used for constructing new workers.
     *
     * @return the factory used for constructing new workers
     */
    public ForkJoinWorkerThreadFactory getFactory() {
        return factory;
    }
2537    
    /**
     * Returns the handler for internal worker threads that terminate
     * due to unrecoverable errors encountered while executing tasks.
     *
     * @return the handler, or {@code null} if none
     */
    public UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return ueh;
    }
2547    
    /**
     * Returns the targeted parallelism level of this pool.
     *
     * @return the targeted parallelism level of this pool
     */
    public int getParallelism() {
        // Parallelism level occupies the low SMASK bits of mode.
        return mode & SMASK;
    }
2556    
    /**
     * Returns the targeted parallelism level of the common pool.
     *
     * @return the targeted parallelism level of the common pool
     * @since 1.8
     */
    public static int getCommonPoolParallelism() {
        return COMMON_PARALLELISM;
    }
2566    
    /**
     * Returns the number of worker threads that have started but not
     * yet terminated. The result returned by this method may differ
     * from {@link #getParallelism} when threads are created to
     * maintain parallelism when others are cooperatively blocked.
     *
     * @return the number of worker threads
     */
    public int getPoolSize() {
        // ctl's TC field holds total count minus parallelism, so add
        // the parallelism level back to recover the pool size.
        return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
    }
2578    
    /**
     * Returns {@code true} if this pool uses local first-in-first-out
     * scheduling mode for forked tasks that are never joined.
     *
     * @return {@code true} if this pool uses async mode
     */
    public boolean getAsyncMode() {
        // FIFO bit is set in mode by the constructor when asyncMode=true.
        return (mode & FIFO) != 0;
    }
2588    
2589     /**
2590     * Returns an estimate of the number of worker threads that are
2591     * not blocked waiting to join tasks or for other managed
2592 dl 1.14 * synchronization. This method may overestimate the
2593     * number of running threads.
2594 jsr166 1.1 *
2595     * @return the number of worker threads
2596     */
2597     public int getRunningThreadCount() {
2598 dl 1.78 int rc = 0;
2599     WorkQueue[] ws; WorkQueue w;
2600     if ((ws = workQueues) != null) {
2601 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2602     if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2603 dl 1.78 ++rc;
2604     }
2605     }
2606     return rc;
2607 jsr166 1.1 }
2608    
    /**
     * Returns an estimate of the number of threads that are currently
     * stealing or executing tasks. This method may overestimate the
     * number of active threads.
     *
     * @return the number of active threads
     */
    public int getActiveThreadCount() {
        // RC field of ctl holds running count minus parallelism.
        int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
        return (r <= 0) ? 0 : r; // suppress momentarily negative values
    }
2620    
    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        // Retry until a consistent snapshot of ctl is observed.
        for (;;) {
            long c = ctl;
            int md = mode, pc = md & SMASK;
            int tc = pc + (short)(c >> TC_SHIFT); // total workers
            int rc = pc + (int)(c >> RC_SHIFT);   // running (unqueued) workers
            if ((md & (STOP | TERMINATED)) != 0)
                return true;         // terminating pools count as quiescent
            else if (rc > 0)
                return false;        // some worker is demonstrably active
            else {
                // No running workers; verify each worker queue is quiet.
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) != null) {
                    for (int i = 1; i < ws.length; i += 2) {
                        if ((v = ws[i]) != null) {
                            // QUIET bit in source presumably marks an idle
                            // worker -- defined elsewhere in this file.
                            if ((v.source & QUIET) == 0)
                                return false;
                            --tc;
                        }
                    }
                }
                // Quiescent only if every worker was accounted for and
                // ctl did not change while scanning (no races).
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }
2658    
2659     /**
2660     * Returns an estimate of the total number of tasks stolen from
2661     * one thread's work queue by another. The reported value
2662     * underestimates the actual total number of steals when the pool
2663     * is not quiescent. This value may be useful for monitoring and
2664     * tuning fork/join programs: in general, steal counts should be
2665     * high enough to keep threads busy, but low enough to avoid
2666     * overhead and contention across threads.
2667     *
2668     * @return the number of steals
2669     */
2670     public long getStealCount() {
2671 dl 1.300 long count = stealCount;
2672 dl 1.78 WorkQueue[] ws; WorkQueue w;
2673     if ((ws = workQueues) != null) {
2674 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2675 dl 1.78 if ((w = ws[i]) != null)
2676 dl 1.300 count += (long)w.nsteals & 0xffffffffL;
2677 dl 1.78 }
2678     }
2679     return count;
2680 jsr166 1.1 }
2681    
2682     /**
2683     * Returns an estimate of the total number of tasks currently held
2684     * in queues by worker threads (but not including tasks submitted
2685     * to the pool that have not begun executing). This value is only
2686     * an approximation, obtained by iterating across all threads in
2687     * the pool. This method may be useful for tuning task
2688     * granularities.
2689     *
2690     * @return the number of queued tasks
2691     */
2692     public long getQueuedTaskCount() {
2693     long count = 0;
2694 dl 1.78 WorkQueue[] ws; WorkQueue w;
2695     if ((ws = workQueues) != null) {
2696 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2697 dl 1.78 if ((w = ws[i]) != null)
2698     count += w.queueSize();
2699     }
2700 dl 1.52 }
2701 jsr166 1.1 return count;
2702     }
2703    
2704     /**
2705 jsr166 1.8 * Returns an estimate of the number of tasks submitted to this
2706 dl 1.55 * pool that have not yet begun executing. This method may take
2707 dl 1.52 * time proportional to the number of submissions.
2708 jsr166 1.1 *
2709     * @return the number of queued submissions
2710     */
2711     public int getQueuedSubmissionCount() {
2712 dl 1.78 int count = 0;
2713     WorkQueue[] ws; WorkQueue w;
2714     if ((ws = workQueues) != null) {
2715 dl 1.86 for (int i = 0; i < ws.length; i += 2) {
2716 dl 1.78 if ((w = ws[i]) != null)
2717     count += w.queueSize();
2718     }
2719     }
2720     return count;
2721 jsr166 1.1 }
2722    
2723     /**
2724 jsr166 1.4 * Returns {@code true} if there are any tasks submitted to this
2725     * pool that have not yet begun executing.
2726 jsr166 1.1 *
2727     * @return {@code true} if there are any queued submissions
2728     */
2729     public boolean hasQueuedSubmissions() {
2730 dl 1.78 WorkQueue[] ws; WorkQueue w;
2731     if ((ws = workQueues) != null) {
2732 dl 1.86 for (int i = 0; i < ws.length; i += 2) {
2733 dl 1.115 if ((w = ws[i]) != null && !w.isEmpty())
2734 dl 1.78 return true;
2735     }
2736     }
2737     return false;
2738 jsr166 1.1 }
2739    
    /**
     * Removes and returns the next unexecuted submission if one is
     * available. This method may be useful in extensions to this
     * class that re-assign work in systems with multiple pools.
     *
     * @return the next submission, or {@code null} if none
     */
    protected ForkJoinTask<?> pollSubmission() {
        // pollScan(true) restricts the scan to submission queues.
        return pollScan(true);
    }
2750    
2751     /**
2752     * Removes all available unexecuted submitted and forked tasks
2753     * from scheduling queues and adds them to the given collection,
2754     * without altering their execution status. These may include
2755 jsr166 1.8 * artificially generated or wrapped tasks. This method is
2756     * designed to be invoked only when the pool is known to be
2757 jsr166 1.1 * quiescent. Invocations at other times may not remove all
2758     * tasks. A failure encountered while attempting to add elements
2759     * to collection {@code c} may result in elements being in
2760     * neither, either or both collections when the associated
2761     * exception is thrown. The behavior of this operation is
2762     * undefined if the specified collection is modified while the
2763     * operation is in progress.
2764     *
2765     * @param c the collection to transfer elements into
2766     * @return the number of elements transferred
2767     */
2768 jsr166 1.5 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2769 dl 1.52 int count = 0;
2770 dl 1.78 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2771     if ((ws = workQueues) != null) {
2772 dl 1.86 for (int i = 0; i < ws.length; ++i) {
2773 dl 1.78 if ((w = ws[i]) != null) {
2774     while ((t = w.poll()) != null) {
2775     c.add(t);
2776     ++count;
2777     }
2778     }
2779 dl 1.52 }
2780     }
2781 dl 1.18 return count;
2782     }
2783    
    /**
     * Returns a string identifying this pool, as well as its state,
     * including indications of run state, parallelism level, and
     * worker and task counts.
     *
     * @return a string identifying this pool, as well as its state
     */
    public String toString() {
        // Use a single pass through workQueues to collect counts
        long qt = 0L, qs = 0L; int rc = 0;
        long st = stealCount;
        WorkQueue[] ws; WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 0; i < ws.length; ++i) {
                if ((w = ws[i]) != null) {
                    int size = w.queueSize();
                    if ((i & 1) == 0)
                        qs += size;       // even slot: submission queue
                    else {
                        qt += size;       // odd slot: worker queue
                        st += (long)w.nsteals & 0xffffffffL;
                        if (w.isApparentlyUnblocked())
                            ++rc;
                    }
                }
            }
        }

        // Decode counts from mode/ctl as in getPoolSize/getActiveThreadCount.
        int md = mode;
        int pc = (md & SMASK);
        long c = ctl;
        int tc = pc + (short)(c >>> TC_SHIFT);
        int ac = pc + (int)(c >> RC_SHIFT);
        if (ac < 0) // ignore transient negative
            ac = 0;
        String level = ((md & TERMINATED) != 0 ? "Terminated" :
                        (md & STOP)       != 0 ? "Terminating" :
                        (md & SHUTDOWN)   != 0 ? "Shutting down" :
                        "Running");
        return super.toString() +
            "[" + level +
            ", parallelism = " + pc +
            ", size = " + tc +
            ", active = " + ac +
            ", running = " + rc +
            ", steals = " + st +
            ", tasks = " + qt +
            ", submissions = " + qs +
            "]";
    }
2834    
    /**
     * Possibly initiates an orderly shutdown in which previously
     * submitted tasks are executed, but no new tasks will be
     * accepted. Invocation has no effect on execution state if this
     * is the {@link #commonPool()}, and no additional effect if
     * already shut down. Tasks that are in the process of being
     * submitted concurrently during the course of this method may or
     * may not be rejected.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public void shutdown() {
        checkPermission();
        // now=false: let queued tasks finish; enable=true: allow shutdown.
        tryTerminate(false, true);
    }
2853    
    /**
     * Possibly attempts to cancel and/or stop all tasks, and reject
     * all subsequently submitted tasks. Invocation has no effect on
     * execution state if this is the {@link #commonPool()}, and no
     * additional effect if already shut down. Otherwise, tasks that
     * are in the process of being submitted or executed concurrently
     * during the course of this method may or may not be
     * rejected. This method cancels both existing and unexecuted
     * tasks, in order to permit termination in the presence of task
     * dependencies. So the method always returns an empty list
     * (unlike the case for some other Executors).
     *
     * @return an empty list
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public List<Runnable> shutdownNow() {
        checkPermission();
        // now=true: cancel tasks rather than waiting for them.
        tryTerminate(true, true);
        return Collections.emptyList();
    }
2877    
    /**
     * Returns {@code true} if all tasks have completed following shut down.
     *
     * @return {@code true} if all tasks have completed following shut down
     */
    public boolean isTerminated() {
        return (mode & TERMINATED) != 0;
    }
2886    
    /**
     * Returns {@code true} if the process of termination has
     * commenced but not yet completed. This method may be useful for
     * debugging. A return of {@code true} reported a sufficient
     * period after shutdown may indicate that submitted tasks have
     * ignored or suppressed interruption, or are waiting for I/O,
     * causing this executor not to properly terminate. (See the
     * advisory notes for class {@link ForkJoinTask} stating that
     * tasks should not normally entail blocking operations. But if
     * they do, they must abort them on interrupt.)
     *
     * @return {@code true} if terminating but not yet terminated
     */
    public boolean isTerminating() {
        // STOP set but TERMINATED not yet: termination in progress.
        int md = mode;
        return (md & STOP) != 0 && (md & TERMINATED) == 0;
    }
2904    
    /**
     * Returns {@code true} if this pool has been shut down.
     *
     * @return {@code true} if this pool has been shut down
     */
    public boolean isShutdown() {
        return (mode & SHUTDOWN) != 0;
    }
2913    
    /**
     * Blocks until all tasks have completed execution after a
     * shutdown request, or the timeout occurs, or the current thread
     * is interrupted, whichever happens first. Because the {@link
     * #commonPool()} never terminates until program shutdown, when
     * applied to the common pool, this method is equivalent to {@link
     * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // The common pool never terminates; help quiesce instead.
        if (this == common) {
            awaitQuiescence(timeout, unit);
            return false;
        }
        long nanos = unit.toNanos(timeout);
        if (isTerminated())
            return true;
        if (nanos <= 0L)
            return false;
        long deadline = System.nanoTime() + nanos;
        // tryTerminate calls notifyAll() on this monitor when the pool
        // reaches TERMINATED, waking waiters here.
        synchronized (this) {
            for (;;) {
                if (isTerminated())
                    return true;
                if (nanos <= 0L)
                    return false;
                long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
                // Wait at least 1 ms to avoid wait(0) == wait-forever.
                wait(millis > 0L ? millis : 1L);
                nanos = deadline - System.nanoTime();
            }
        }
    }
2954    
2955     /**
2956 dl 1.134 * If called by a ForkJoinTask operating in this pool, equivalent
2957     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
2958     * waits and/or attempts to assist performing tasks until this
2959     * pool {@link #isQuiescent} or the indicated timeout elapses.
2960     *
2961     * @param timeout the maximum time to wait
2962     * @param unit the time unit of the timeout argument
2963     * @return {@code true} if quiescent; {@code false} if the
2964     * timeout elapsed.
2965     */
2966     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
2967     long nanos = unit.toNanos(timeout);
2968     ForkJoinWorkerThread wt;
2969     Thread thread = Thread.currentThread();
2970     if ((thread instanceof ForkJoinWorkerThread) &&
2971     (wt = (ForkJoinWorkerThread)thread).pool == this) {
2972     helpQuiescePool(wt.workQueue);
2973     return true;
2974     }
2975 dl 1.300 else {
2976     for (long startTime = System.nanoTime();;) {
2977     ForkJoinTask<?> t;
2978     if ((t = pollScan(false)) != null)
2979     t.doExec();
2980     else if (isQuiescent())
2981     return true;
2982     else if ((System.nanoTime() - startTime) > nanos)
2983 dl 1.134 return false;
2984 dl 1.300 else
2985     Thread.yield(); // cannot block
2986 dl 1.134 }
2987     }
2988     }
2989    
2990     /**
2991     * Waits and/or attempts to assist performing tasks indefinitely
2992 jsr166 1.141 * until the {@link #commonPool()} {@link #isQuiescent}.
2993 dl 1.134 */
2994 dl 1.136 static void quiesceCommonPool() {
2995 dl 1.134 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
2996     }
2997    
    /**
     * Interface for extending managed parallelism for tasks running
     * in {@link ForkJoinPool}s.
     *
     * <p>A {@code ManagedBlocker} provides two methods.  Method
     * {@link #isReleasable} must return {@code true} if blocking is
     * not necessary. Method {@link #block} blocks the current thread
     * if necessary (perhaps internally invoking {@code isReleasable}
     * before actually blocking). These actions are performed by any
     * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
     * The unusual methods in this API accommodate synchronizers that
     * may, but don't usually, block for long periods. Similarly, they
     * allow more efficient internal handling of cases in which
     * additional workers may be, but usually are not, needed to
     * ensure sufficient parallelism.  Toward this end,
     * implementations of method {@code isReleasable} must be amenable
     * to repeated invocation.
     *
     * <p>For example, here is a ManagedBlocker based on a
     * ReentrantLock:
     * <pre> {@code
     * class ManagedLocker implements ManagedBlocker {
     *   final ReentrantLock lock;
     *   boolean hasLock = false;
     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
     *   public boolean block() {
     *     if (!hasLock)
     *       lock.lock();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return hasLock || (hasLock = lock.tryLock());
     *   }
     * }}</pre>
     *
     * <p>Here is a class that possibly blocks waiting for an
     * item on a given queue:
     * <pre> {@code
     * class QueueTaker<E> implements ManagedBlocker {
     *   final BlockingQueue<E> queue;
     *   volatile E item = null;
     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *   public boolean block() throws InterruptedException {
     *     if (item == null)
     *       item = queue.take();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return item != null || (item = queue.poll()) != null;
     *   }
     *   public E getItem() { // call after pool.managedBlock completes
     *     return item;
     *   }
     * }}</pre>
     */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         * (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         * @return {@code true} if blocking is unnecessary
         */
        boolean isReleasable();
    }
3071    
3072     /**
3073 jsr166 1.217 * Runs the given possibly blocking task. When {@linkplain
3074     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
3075     * method possibly arranges for a spare thread to be activated if
3076     * necessary to ensure sufficient parallelism while the current
3077     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
3078 jsr166 1.1 *
3079 jsr166 1.217 * <p>This method repeatedly calls {@code blocker.isReleasable()} and
3080     * {@code blocker.block()} until either method returns {@code true}.
3081     * Every call to {@code blocker.block()} is preceded by a call to
3082     * {@code blocker.isReleasable()} that returned {@code false}.
3083     *
3084     * <p>If not running in a ForkJoinPool, this method is
3085 jsr166 1.8 * behaviorally equivalent to
3086 jsr166 1.239 * <pre> {@code
3087 jsr166 1.1 * while (!blocker.isReleasable())
3088     * if (blocker.block())
3089 jsr166 1.217 * break;}</pre>
3090 jsr166 1.8 *
3091 jsr166 1.217 * If running in a ForkJoinPool, the pool may first be expanded to
3092     * ensure sufficient parallelism available during the call to
3093     * {@code blocker.block()}.
3094 jsr166 1.1 *
3095 jsr166 1.217 * @param blocker the blocker task
3096     * @throws InterruptedException if {@code blocker.block()} did so
3097 jsr166 1.1 */
3098 dl 1.18 public static void managedBlock(ManagedBlocker blocker)
3099 jsr166 1.1 throws InterruptedException {
3100 dl 1.200 ForkJoinPool p;
3101     ForkJoinWorkerThread wt;
3102 dl 1.300 WorkQueue w;
3103 jsr166 1.1 Thread t = Thread.currentThread();
3104 dl 1.200 if ((t instanceof ForkJoinWorkerThread) &&
3105 dl 1.300 (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
3106     (w = wt.workQueue) != null) {
3107     int block;
3108 dl 1.172 while (!blocker.isReleasable()) {
3109 dl 1.300 if ((block = p.tryCompensate(w)) != 0) {
3110 dl 1.105 try {
3111     do {} while (!blocker.isReleasable() &&
3112     !blocker.block());
3113     } finally {
3114 dl 1.300 U.getAndAddLong(p, CTL, (block > 0) ? RC_UNIT : 0L);
3115 dl 1.105 }
3116     break;
3117 dl 1.78 }
3118     }
3119 dl 1.18 }
3120 dl 1.105 else {
3121     do {} while (!blocker.isReleasable() &&
3122     !blocker.block());
3123     }
3124 jsr166 1.1 }
3125    
    // AbstractExecutorService overrides.  These rely on the undocumented
    // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
    // implement RunnableFuture.
3129 jsr166 1.1
3130     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3131 dl 1.90 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3132 jsr166 1.1 }
3133    
3134     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3135 dl 1.90 return new ForkJoinTask.AdaptedCallable<T>(callable);
3136 jsr166 1.1 }
3137    
    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long CTL;   // offset of volatile field "ctl"
    private static final long MODE;  // offset of volatile field "mode"
    private static final int ABASE;  // base offset of ForkJoinTask[] elements
    private static final int ASHIFT; // log2 of ForkJoinTask[] element scale

    static {
        try {
            // Resolve field offsets and array geometry for Unsafe-based
            // CAS/ordered accesses used throughout this class.
            CTL = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("ctl"));
            MODE = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("mode"));
            ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
            int scale = U.arrayIndexScale(ForkJoinTask[].class);
            if ((scale & (scale - 1)) != 0)
                throw new Error("array index scale not a power of two");
            // Element offset is computed as ABASE + (i << ASHIFT).
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;

        // Allow override of the spare-thread cap via system property;
        // fall back to the default on any parse/security failure.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        try {
            String p = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (p != null)
                commonMaxSpares = Integer.parseInt(p);
        } catch (Exception ignore) {}
        COMMON_MAX_SPARES = commonMaxSpares;

        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // Construct the common pool inside doPrivileged so it is created
        // with full permissions regardless of the caller's context.
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() {
                    return new ForkJoinPool((byte)0); }});

        // Low SMASK bits of mode hold the pool's parallelism level.
        COMMON_PARALLELISM = common.mode & SMASK;
    }
3184 dl 1.52
3185 dl 1.197 /**
3186 jsr166 1.279 * Factory for innocuous worker threads.
3187 dl 1.197 */
3188 jsr166 1.278 private static final class InnocuousForkJoinWorkerThreadFactory
3189 dl 1.197 implements ForkJoinWorkerThreadFactory {
3190    
3191     /**
3192     * An ACC to restrict permissions for the factory itself.
3193     * The constructed workers have no permissions set.
3194     */
3195     private static final AccessControlContext innocuousAcc;
3196     static {
3197     Permissions innocuousPerms = new Permissions();
3198     innocuousPerms.add(modifyThreadPermission);
3199     innocuousPerms.add(new RuntimePermission(
3200     "enableContextClassLoaderOverride"));
3201     innocuousPerms.add(new RuntimePermission(
3202     "modifyThreadGroup"));
3203     innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
3204     new ProtectionDomain(null, innocuousPerms)
3205     });
3206     }
3207    
3208     public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
3209 jsr166 1.227 return java.security.AccessController.doPrivileged(
3210     new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
3211 dl 1.197 public ForkJoinWorkerThread run() {
3212     return new ForkJoinWorkerThread.
3213     InnocuousForkJoinWorkerThread(pool);
3214     }}, innocuousAcc);
3215     }
3216     }
3217    
3218 jsr166 1.1 }