ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.302
Committed: Mon Mar 14 17:54:54 2016 UTC (8 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.301: +3 -3 lines
Log Message:
lint

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 jsr166 1.1 */
6 jsr166 1.301
7 jsr166 1.1 package java.util.concurrent;
8    
9 jsr166 1.156 import java.lang.Thread.UncaughtExceptionHandler;
10 jsr166 1.228 import java.security.AccessControlContext;
11     import java.security.Permissions;
12     import java.security.ProtectionDomain;
13 jsr166 1.1 import java.util.ArrayList;
14     import java.util.Arrays;
15     import java.util.Collection;
16     import java.util.Collections;
17     import java.util.List;
18 dl 1.300 import java.util.concurrent.TimeUnit;
19     import java.util.concurrent.CountedCompleter;
20     import java.util.concurrent.ForkJoinTask;
21     import java.util.concurrent.ForkJoinWorkerThread;
22 dl 1.243 import java.util.concurrent.locks.LockSupport;
23 jsr166 1.1
24     /**
25 jsr166 1.4 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
26 jsr166 1.8 * A {@code ForkJoinPool} provides the entry point for submissions
27 dl 1.18 * from non-{@code ForkJoinTask} clients, as well as management and
28 jsr166 1.11 * monitoring operations.
29 jsr166 1.1 *
30 jsr166 1.9 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
31     * ExecutorService} mainly by virtue of employing
32     * <em>work-stealing</em>: all threads in the pool attempt to find and
33 dl 1.78 * execute tasks submitted to the pool and/or created by other active
34     * tasks (eventually blocking waiting for work if none exist). This
35     * enables efficient processing when most tasks spawn other subtasks
36     * (as do most {@code ForkJoinTask}s), as well as when many small
37     * tasks are submitted to the pool from external clients. Especially
38     * when setting <em>asyncMode</em> to true in constructors, {@code
39     * ForkJoinPool}s may also be appropriate for use with event-style
40     * tasks that are never joined.
41 jsr166 1.1 *
42 dl 1.112 * <p>A static {@link #commonPool()} is available and appropriate for
43 dl 1.101 * most applications. The common pool is used by any ForkJoinTask that
44     * is not explicitly submitted to a specified pool. Using the common
45     * pool normally reduces resource usage (its threads are slowly
46     * reclaimed during periods of non-use, and reinstated upon subsequent
47 dl 1.105 * use).
48 dl 1.100 *
49     * <p>For applications that require separate or custom pools, a {@code
50     * ForkJoinPool} may be constructed with a given target parallelism
51 jsr166 1.214 * level; by default, equal to the number of available processors.
52     * The pool attempts to maintain enough active (or available) threads
53     * by dynamically adding, suspending, or resuming internal worker
54 jsr166 1.187 * threads, even if some tasks are stalled waiting to join others.
55     * However, no such adjustments are guaranteed in the face of blocked
56     * I/O or other unmanaged synchronization. The nested {@link
57 dl 1.100 * ManagedBlocker} interface enables extension of the kinds of
58 dl 1.300 * synchronization accommodated. The default policies may be
59     * overridden using a constructor with parameters corresponding to
60     * those documented in class {@link ThreadPoolExecutor}.
61 jsr166 1.1 *
62     * <p>In addition to execution and lifecycle control methods, this
63     * class provides status check methods (for example
64 jsr166 1.4 * {@link #getStealCount}) that are intended to aid in developing,
65 jsr166 1.1 * tuning, and monitoring fork/join applications. Also, method
66 jsr166 1.4 * {@link #toString} returns indications of pool state in a
67 jsr166 1.1 * convenient form for informal monitoring.
68     *
69 jsr166 1.109 * <p>As is the case with other ExecutorServices, there are three
70 jsr166 1.84 * main task execution methods summarized in the following table.
71     * These are designed to be used primarily by clients not already
72     * engaged in fork/join computations in the current pool. The main
73     * forms of these methods accept instances of {@code ForkJoinTask},
74     * but overloaded forms also allow mixed execution of plain {@code
75     * Runnable}- or {@code Callable}- based activities as well. However,
76     * tasks that are already executing in a pool should normally instead
77     * use the within-computation forms listed in the table unless using
78     * async event-style tasks that are not usually joined, in which case
79     * there is little difference among choice of methods.
80 dl 1.18 *
81     * <table BORDER CELLPADDING=3 CELLSPACING=1>
82 jsr166 1.159 * <caption>Summary of task execution methods</caption>
83 dl 1.18 * <tr>
84     * <td></td>
85     * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
86     * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
87     * </tr>
88     * <tr>
89 jsr166 1.153 * <td> <b>Arrange async execution</b></td>
90 dl 1.18 * <td> {@link #execute(ForkJoinTask)}</td>
91     * <td> {@link ForkJoinTask#fork}</td>
92     * </tr>
93     * <tr>
94 jsr166 1.153 * <td> <b>Await and obtain result</b></td>
95 dl 1.18 * <td> {@link #invoke(ForkJoinTask)}</td>
96     * <td> {@link ForkJoinTask#invoke}</td>
97     * </tr>
98     * <tr>
99 jsr166 1.153 * <td> <b>Arrange exec and obtain Future</b></td>
100 dl 1.18 * <td> {@link #submit(ForkJoinTask)}</td>
101     * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
102     * </tr>
103     * </table>
104 dl 1.19 *
105 dl 1.105 * <p>The common pool is by default constructed with default
106 jsr166 1.155 * parameters, but these may be controlled by setting three
107 jsr166 1.162 * {@linkplain System#getProperty system properties}:
108     * <ul>
109     * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
110     * - the parallelism level, a non-negative integer
111     * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
112     * - the class name of a {@link ForkJoinWorkerThreadFactory}
113     * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
114     * - the class name of a {@link UncaughtExceptionHandler}
115 dl 1.208 * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
116 dl 1.223 * - the maximum number of allowed extra threads to maintain target
117 dl 1.208 * parallelism (default 256).
118 jsr166 1.162 * </ul>
119 dl 1.197 * If a {@link SecurityManager} is present and no factory is
120     * specified, then the default pool uses a factory supplying
121     * threads that have no {@link Permissions} enabled.
122 jsr166 1.165 * The system class loader is used to load these classes.
123 jsr166 1.156 * Upon any error in establishing these settings, default parameters
124 dl 1.160 * are used. It is possible to disable or limit the use of threads in
125     * the common pool by setting the parallelism property to zero, and/or
126 dl 1.193 * using a factory that may return {@code null}. However doing so may
127     * cause unjoined tasks to never be executed.
128 dl 1.105 *
129 jsr166 1.1 * <p><b>Implementation notes</b>: This implementation restricts the
130     * maximum number of running threads to 32767. Attempts to create
131 jsr166 1.11 * pools with greater than the maximum number result in
132 jsr166 1.8 * {@code IllegalArgumentException}.
133 jsr166 1.1 *
134 jsr166 1.11 * <p>This implementation rejects submitted tasks (that is, by throwing
135 dl 1.19 * {@link RejectedExecutionException}) only when the pool is shut down
136 dl 1.20 * or internal resources have been exhausted.
137 jsr166 1.11 *
138 jsr166 1.1 * @since 1.7
139     * @author Doug Lea
140     */
141     public class ForkJoinPool extends AbstractExecutorService {
142    
143     /*
144 dl 1.14 * Implementation Overview
145     *
146 dl 1.78 * This class and its nested classes provide the main
147     * functionality and control for a set of worker threads:
148 jsr166 1.84 * Submissions from non-FJ threads enter into submission queues.
149     * Workers take these tasks and typically split them into subtasks
150     * that may be stolen by other workers. Preference rules give
151     * first priority to processing tasks from their own queues (LIFO
152     * or FIFO, depending on mode), then to randomized FIFO steals of
153 dl 1.200 * tasks in other queues. This framework began as a vehicle for
154     * supporting tree-structured parallelism using work-stealing.
155     * Over time, its scalability advantages led to extensions and
156 dl 1.208 * changes to better support more diverse usage contexts. Because
157     * most internal methods and nested classes are interrelated,
158     * their main rationale and descriptions are presented here;
159     * individual methods and nested classes contain only brief
160     * comments about details.
161 dl 1.78 *
162 jsr166 1.84 * WorkQueues
163 dl 1.78 * ==========
164     *
165     * Most operations occur within work-stealing queues (in nested
166     * class WorkQueue). These are special forms of Deques that
167     * support only three of the four possible end-operations -- push,
168     * pop, and poll (aka steal), under the further constraints that
169     * push and pop are called only from the owning thread (or, as
170     * extended here, under a lock), while poll may be called from
171     * other threads. (If you are unfamiliar with them, you probably
172     * want to read Herlihy and Shavit's book "The Art of
173     * Multiprocessor Programming", chapter 16 describing these in
174     * more detail before proceeding.) The main work-stealing queue
175     * design is roughly similar to those in the papers "Dynamic
176     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
177     * (http://research.sun.com/scalable/pubs/index.html) and
178     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
179     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
180 dl 1.200 * The main differences ultimately stem from GC requirements that
181     * we null out taken slots as soon as we can, to maintain as small
182     * a footprint as possible even in programs generating huge
183     * numbers of tasks. To accomplish this, we shift the CAS
184     * arbitrating pop vs poll (steal) from being on the indices
185     * ("base" and "top") to the slots themselves.
186     *
187 dl 1.243 * Adding tasks then takes the form of a classic array push(task)
188     * in a circular buffer:
189     * q.array[q.top++ % length] = task;
190 dl 1.200 *
191     * (The actual code needs to null-check and size-check the array,
192 jsr166 1.247 * uses masking, not mod, for indexing a power-of-two-sized array,
193 dl 1.243 * properly fences accesses, and possibly signals waiting workers
194     * to start scanning -- see below.) Both a successful pop and
195     * poll mainly entail a CAS of a slot from non-null to null.
196 dl 1.200 *
197 jsr166 1.202 * The pop operation (always performed by owner) is:
198 dl 1.243 * if ((the task at top slot is not null) and
199 dl 1.200 * (CAS slot to null))
200     * decrement top and return task;
201     *
202     * And the poll operation (usually by a stealer) is
203 dl 1.243 * if ((the task at base slot is not null) and
204 dl 1.200 * (CAS slot to null))
205     * increment base and return task;
206     *
207 dl 1.300 * There are several variants of each of these. In particular,
208     * almost all uses of poll occur within scan operations that also
209     * interleave contention tracking (with associated code sprawl.)
210 dl 1.243 *
211     * Memory ordering. See "Correct and Efficient Work-Stealing for
212     * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
213     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
214     * analysis of memory ordering requirements in work-stealing
215     * algorithms similar to (but different than) the one used here.
216     * Extracting tasks in array slots via (fully fenced) CAS provides
217     * primary synchronization. The base and top indices imprecisely
218     * guide where to extract from. We do not always require strict
219     * orderings of array and index updates, so sometimes let them be
220     * subject to compiler and processor reorderings. However, the
221     * volatile "base" index also serves as a basis for memory
222     * ordering: Slot accesses are preceded by a read of base,
223 jsr166 1.247 * ensuring happens-before ordering with respect to stealers (so
224 dl 1.243 * the slots themselves can be read via plain array reads.) The
225     * only other memory orderings relied on are maintained in the
226     * course of signalling and activation (see below). A check that
227     * base == top indicates (momentary) emptiness, but otherwise may
228     * err on the side of possibly making the queue appear nonempty
229     * when a push, pop, or poll have not fully committed, or making
230     * it appear empty when an update of top has not yet been visibly
231     * written. (Method isEmpty() checks the case of a partially
232 dl 1.211 * completed removal of the last element.) Because of this, the
233     * poll operation, considered individually, is not wait-free. One
234     * thief cannot successfully continue until another in-progress
235 dl 1.243 * one (or, if previously empty, a push) visibly completes.
236     * However, in the aggregate, we ensure at least probabilistic
237     * non-blockingness. If an attempted steal fails, a scanning
238     * thief chooses a different random victim target to try next. So,
239     * in order for one thief to progress, it suffices for any
240 dl 1.205 * in-progress poll or new push on any empty queue to
241 dl 1.300 * complete.
242 dl 1.200 *
243     * This approach also enables support of a user mode in which
244     * local task processing is in FIFO, not LIFO order, simply by
245     * using poll rather than pop. This can be useful in
246     * message-passing frameworks in which tasks are never joined.
247 dl 1.78 *
248     * WorkQueues are also used in a similar way for tasks submitted
249     * to the pool. We cannot mix these tasks in the same queues used
250 dl 1.200 * by workers. Instead, we randomly associate submission queues
251 dl 1.83 * with submitting threads, using a form of hashing. The
252 dl 1.139 * ThreadLocalRandom probe value serves as a hash code for
253     * choosing existing queues, and may be randomly repositioned upon
254     * contention with other submitters. In essence, submitters act
255     * like workers except that they are restricted to executing local
256 dl 1.300 * tasks that they submitted. Insertion of tasks in shared mode
257     * requires a lock but we use only a simple spinlock (using field
258     * phase), because submitters encountering a busy queue move on to
259     * try or create other queues -- they block only when creating and
260     * registering new queues. Because it is used only as a spinlock,
261     * unlocking requires only a "releasing" store (using
262     * putOrderedInt).
263 dl 1.78 *
264 jsr166 1.84 * Management
265 dl 1.78 * ==========
266 dl 1.52 *
267     * The main throughput advantages of work-stealing stem from
268     * decentralized control -- workers mostly take tasks from
269 dl 1.200 * themselves or each other, at rates that can exceed a billion
270     * per second. The pool itself creates, activates (enables
271     * scanning for and running tasks), deactivates, blocks, and
272     * terminates threads, all with minimal central information.
273     * There are only a few properties that we can globally track or
274     * maintain, so we pack them into a small number of variables,
275     * often maintaining atomicity without blocking or locking.
276 dl 1.300 * Nearly all essentially atomic control state is held in a few
277 dl 1.200 * volatile variables that are by far most often read (not
278 dl 1.300 * written) as status and consistency checks. We pack as much
279     * information into them as we can.
280 dl 1.78 *
281 dl 1.200 * Field "ctl" contains 64 bits holding information needed to
282 dl 1.300 * atomically decide to add, enqueue (on an event queue), and
283     * dequeue (and release)-activate workers. To enable this
284 dl 1.78 * packing, we restrict maximum parallelism to (1<<15)-1 (which is
285     * far in excess of normal operating range) to allow ids, counts,
286     * and their negations (used for thresholding) to fit into 16bit
287 dl 1.215 * subfields.
288     *
289 dl 1.300 * Field "mode" holds configuration parameters as well as lifetime
290     * status, atomically and monotonically setting SHUTDOWN, STOP,
291     * and finally TERMINATED bits.
292 dl 1.258 *
293     * Field "workQueues" holds references to WorkQueues. It is
294 dl 1.300 * updated (only during worker creation and termination) under
295     * lock (using field workerNamePrefix as lock), but is otherwise
296     * concurrently readable, and accessed directly. We also ensure
297     * that uses of the array reference itself never become too stale
298     * in case of resizing. To simplify index-based operations, the
299     * array size is always a power of two, and all readers must
300     * tolerate null slots. Worker queues are at odd indices. Shared
301     * (submission) queues are at even indices, up to a maximum of 64
302     * slots, to limit growth even if array needs to expand to add
303     * more workers. Grouping them together in this way simplifies and
304 dl 1.262 * speeds up task scanning.
305 dl 1.86 *
306     * All worker thread creation is on-demand, triggered by task
307     * submissions, replacement of terminated workers, and/or
308 dl 1.78 * compensation for blocked workers. However, all other support
309     * code is set up to work with other policies. To ensure that we
310 jsr166 1.264 * do not hold on to worker references that would prevent GC, all
311 dl 1.78 * accesses to workQueues are via indices into the workQueues
312     * array (which is one source of some of the messy code
313     * constructions here). In essence, the workQueues array serves as
314 dl 1.200 * a weak reference mechanism. Thus for example the stack top
315     * subfield of ctl stores indices, not references.
316     *
317     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
318     * cannot let workers spin indefinitely scanning for tasks when
319     * none can be found immediately, and we cannot start/resume
320     * workers unless there appear to be tasks available. On the
321     * other hand, we must quickly prod them into action when new
322     * tasks are submitted or generated. In many usages, ramp-up time
323 dl 1.300 * is the main limiting factor in overall performance, which is
324     * compounded at program start-up by JIT compilation and
325     * allocation. So we streamline this as much as possible.
326     *
327     * The "ctl" field atomically maintains total worker and
328     * "released" worker counts, plus the head of the available worker
329     * queue (actually stack, represented by the lower 32bit subfield
330     * of ctl). Released workers are those known to be scanning for
331     * and/or running tasks. Unreleased ("available") workers are
332     * recorded in the ctl stack. These workers are made available for
333     * signalling by enqueuing in ctl (see method runWorker). The
334     * "queue" is a form of Treiber stack. This is ideal for
335     * activating threads in most-recently used order, and improves
336 dl 1.200 * performance and locality, outweighing the disadvantages of
337     * being prone to contention and inability to release a worker
338 dl 1.300 * unless it is topmost on stack. To avoid missed signal problems
339     * inherent in any wait/signal design, available workers rescan
340     * for (and if found run) tasks after enqueuing. Normally their
341     * release status will be updated while doing so, but the released
342     * worker ctl count may underestimate the number of active
343     * threads. (However, it is still possible to determine quiescence
344     * via a validation traversal -- see isQuiescent). After an
345     * unsuccessful rescan, available workers are blocked until
346     * signalled (see signalWork). The top stack state holds the
347     * value of the "phase" field of the worker: its index and status,
348     * plus a version counter that, in addition to the count subfields
349     * (also serving as version stamps) provide protection against
350     * Treiber stack ABA effects.
351 dl 1.200 *
352 dl 1.300 * Creating workers. To create a worker, we pre-increment counts
353     * (serving as a reservation), and attempt to construct a
354 dl 1.200 * ForkJoinWorkerThread via its factory. Upon construction, the
355     * new thread invokes registerWorker, where it constructs a
356     * WorkQueue and is assigned an index in the workQueues array
357 jsr166 1.266 * (expanding the array if necessary). The thread is then started.
358     * Upon any exception across these steps, or null return from
359     * factory, deregisterWorker adjusts counts and records
360 dl 1.200 * accordingly. If a null return, the pool continues running with
361     * fewer than the target number of workers. If exceptional, the
362     * exception is propagated, generally to some external caller.
363     * Worker index assignment avoids the bias in scanning that would
364     * occur if entries were sequentially packed starting at the front
365     * of the workQueues array. We treat the array as a simple
366     * power-of-two hash table, expanding as needed. The seedIndex
367     * increment ensures no collisions until a resize is needed or a
368     * worker is deregistered and replaced, and thereafter keeps
369 jsr166 1.202 * probability of collision low. We cannot use
370 dl 1.200 * ThreadLocalRandom.getProbe() for similar purposes here because
371     * the thread has not started yet, but do so for creating
372 dl 1.243 * submission queues for existing external threads (see
373     * externalPush).
374     *
375 dl 1.300 * WorkQueue field "phase" is used by both workers and the pool to
376     * manage and track whether a worker is UNSIGNALLED (possibly
377     * blocked waiting for a signal). When a worker is enqueued its
378     * phase field is set. Note that phase field updates lag queue CAS
379     * releases so usage requires care -- seeing a negative phase does
380     * not guarantee that the worker is available. When queued, the
381     * lower 16 bits of scanState must hold its pool index. So we
382     * place the index there upon initialization (see registerWorker)
383     * and otherwise keep it there or restore it when necessary.
384 dl 1.243 *
385     * The ctl field also serves as the basis for memory
386     * synchronization surrounding activation. This uses a more
387     * efficient version of a Dekker-like rule that task producers and
388     * consumers sync with each other by both writing/CASing ctl (even
389 dl 1.253 * if to its current value). This would be extremely costly. So
390     * we relax it in several ways: (1) Producers only signal when
391     * their queue is empty. Other workers propagate this signal (in
392 dl 1.300 * method scan) when they find tasks; to further reduce flailing,
393     * each worker signals only one other per activation. (2) Workers
394     * only enqueue after scanning (see below) and not finding any
395     * tasks. (3) Rather than CASing ctl to its current value in the
396     * common case where no action is required, we reduce write
397     * contention by equivalently prefacing signalWork when called by
398     * an external task producer using a memory access with
399     * full-volatile semantics or a "fullFence".
400 dl 1.243 *
401 jsr166 1.249 * Almost always, too many signals are issued. A task producer
402 dl 1.243 * cannot in general tell if some existing worker is in the midst
403     * of finishing one task (or already scanning) and ready to take
404     * another without being signalled. So the producer might instead
405     * activate a different worker that does not find any work, and
406     * then inactivates. This scarcely matters in steady-state
407     * computations involving all workers, but can create contention
408 jsr166 1.247 * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
409 dl 1.243 * computations involving only a few workers.
410     *
411 dl 1.300 * Scanning. Method runWorker performs top-level scanning for
412     * tasks. Each scan traverses and tries to poll from each queue
413     * starting at a random index and circularly stepping. Scans are
414     * not performed in ideal random permutation order, to reduce
415     * cacheline contention. The pseudorandom generator need not have
416     * high-quality statistical properties in the long term, but just
417     * within computations; We use Marsaglia XorShifts (often via
418     * ThreadLocalRandom.nextSecondarySeed), which are cheap and
419     * suffice. Scanning also employs contention reduction: When
420     * scanning workers fail to extract an apparently existing task,
421     * they soon restart at a different pseudorandom index. This
422     * improves throughput when many threads are trying to take tasks
423     * from few queues, which can be common in some usages. Scans do
424     * not otherwise explicitly take into account core affinities,
425     * loads, cache localities, etc. However, they do exploit temporal
426     * locality (which usually approximates these) by preferring to
427     * re-poll (at most #workers times) from the same queue after a
428     * successful poll before trying others.
429 dl 1.52 *
430     * Trimming workers. To release resources after periods of lack of
431     * use, a worker starting to wait when the pool is quiescent will
432 dl 1.300 * time out and terminate (see method scan) if the pool has
433     * remained quiescent for period given by field keepAlive.
434 dl 1.52 *
435 dl 1.210 * Shutdown and Termination. A call to shutdownNow invokes
436     * tryTerminate to atomically set a runState bit. The calling
437     * thread, as well as every other worker thereafter terminating,
438 dl 1.300 * helps terminate others by cancelling their unprocessed tasks,
439     * and waking them up, doing so repeatedly until stable. Calls to
440     * non-abrupt shutdown() preface this by checking whether
441     * termination should commence by sweeping through queues (until
442     * stable) to ensure lack of in-flight submissions and workers
443     * about to process them before triggering the "STOP" phase of
444     * termination.
445 dl 1.211 *
446 jsr166 1.84 * Joining Tasks
447     * =============
448 dl 1.78 *
449     * Any of several actions may be taken when one worker is waiting
450 jsr166 1.84 * to join a task stolen (or always held) by another. Because we
451 dl 1.78 * are multiplexing many tasks on to a pool of workers, we can't
452 dl 1.300 * always just let them block (as in Thread.join). We also cannot
453     * just reassign the joiner's run-time stack with another and
454     * replace it later, which would be a form of "continuation", that
455     * even if possible is not necessarily a good idea since we may
456     * need both an unblocked task and its continuation to progress.
457     * Instead we combine two tactics:
458 dl 1.19 *
459     * Helping: Arranging for the joiner to execute some task that it
460 dl 1.78 * would be running if the steal had not occurred.
461 dl 1.19 *
462     * Compensating: Unless there are already enough live threads,
463 dl 1.78 * method tryCompensate() may create or re-activate a spare
464     * thread to compensate for blocked joiners until they unblock.
465     *
466 dl 1.105 * A third form (implemented in tryRemoveAndExec) amounts to
467     * helping a hypothetical compensator: If we can readily tell that
468     * a possible action of a compensator is to steal and execute the
469     * task being joined, the joining thread can do so directly,
470 dl 1.300 * without the need for a compensation thread.
471 dl 1.52 *
472     * The ManagedBlocker extension API can't use helping so relies
473     * only on compensation in method awaitBlocker.
474 dl 1.19 *
475 dl 1.300 * The algorithm in awaitJoin entails a form of "linear helping".
476     * Each worker records (in field source) the id of the queue from
477     * which it last stole a task. The scan in method awaitJoin uses
478     * these markers to try to find a worker to help (i.e., steal back
479     * a task from and execute it) that could hasten completion of the
480     * actively joined task. Thus, the joiner executes a task that
481     * would be on its own local deque if the to-be-joined task had
482     * not been stolen. This is a conservative variant of the approach
483     * described in Wagner & Calder "Leapfrogging: a portable
484     * technique for implementing efficient futures" SIGPLAN Notices,
485     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
486     * mainly in that we only record queue ids, not full dependency
487     * links. This requires a linear scan of the workQueues array to
488     * locate stealers, but isolates cost to when it is needed, rather
489     * than adding to per-task overhead. Searches can fail to locate
490     * stealers GC stalls and the like delay recording sources.
491     * Further, even when accurately identified, stealers might not
492     * ever produce a task that the joiner can in turn help with. So,
493     * compensation is tried upon failure to find tasks to run.
494 dl 1.105 *
495 dl 1.300 * Compensation does not by default aim to keep exactly the target
496 dl 1.200 * parallelism number of unblocked threads running at any given
497     * time. Some previous versions of this class employed immediate
498     * compensations for any blocked join. However, in practice, the
499     * vast majority of blockages are transient byproducts of GC and
500     * other JVM or OS activities that are made worse by replacement.
501 dl 1.300 * Rather than impose arbitrary policies, we allow users to
502     * override the default of only adding threads upon apparent
503     * starvation. The compensation mechanism may also be bounded.
504     * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
505     * JVMs to cope with programming errors and abuse before running
506     * out of resources to do so.
507 jsr166 1.301 *
508 dl 1.105 * Common Pool
509     * ===========
510     *
511 jsr166 1.175 * The static common pool always exists after static
512 dl 1.105 * initialization. Since it (or any other created pool) need
513     * never be used, we minimize initial construction overhead and
514 dl 1.300 * footprint to the setup of about a dozen fields.
515 dl 1.105 *
516     * When external threads submit to the common pool, they can
517 dl 1.200 * perform subtask processing (see externalHelpComplete and
518     * related methods) upon joins. This caller-helps policy makes it
519     * sensible to set common pool parallelism level to one (or more)
520     * less than the total number of available cores, or even zero for
521     * pure caller-runs. We do not need to record whether external
522     * submissions are to the common pool -- if not, external help
523     * methods return quickly. These submitters would otherwise be
524     * blocked waiting for completion, so the extra effort (with
525     * liberally sprinkled task status checks) in inapplicable cases
526     * amounts to an odd form of limited spin-wait before blocking in
527     * ForkJoinTask.join.
528 dl 1.105 *
529 dl 1.197 * As a more appropriate default in managed environments, unless
530     * overridden by system properties, we use workers of subclass
531     * InnocuousForkJoinWorkerThread when there is a SecurityManager
532     * present. These workers have no permissions set, do not belong
533     * to any user-defined ThreadGroup, and erase all ThreadLocals
534 dl 1.300 * after executing any top-level task (see
535     * WorkQueue.afterTopLevelExec). The associated mechanics (mainly
536     * in ForkJoinWorkerThread) may be JVM-dependent and must access
537     * particular Thread class fields to achieve this effect.
538 jsr166 1.198 *
539 dl 1.105 * Style notes
540     * ===========
541     *
542 dl 1.200 * Memory ordering relies mainly on Unsafe intrinsics that carry
543     * the further responsibility of explicitly performing null- and
544     * bounds- checks otherwise carried out implicitly by JVMs. This
545     * can be awkward and ugly, but also reflects the need to control
546     * outcomes across the unusual cases that arise in very racy code
547     * with very few invariants. So these explicit checks would exist
548     * in some form anyway. All fields are read into locals before
549     * use, and null-checked if they are references. This is usually
550     * done in a "C"-like style of listing declarations at the heads
551     * of methods or blocks, and using inline assignments on first
552     * encounter. Array bounds-checks are usually performed by
553     * masking with array.length-1, which relies on the invariant that
554     * these arrays are created with positive lengths, which is itself
555     * paranoically checked. Nearly all explicit checks lead to
556     * bypass/return, not exception throws, because they may
557     * legitimately arise due to cancellation/revocation during
558     * shutdown.
559     *
560 dl 1.105 * There is a lot of representation-level coupling among classes
561     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
562     * fields of WorkQueue maintain data structures managed by
563     * ForkJoinPool, so are directly accessed. There is little point
564     * trying to reduce this, since any associated future changes in
565     * representations will need to be accompanied by algorithmic
566     * changes anyway. Several methods intrinsically sprawl because
567 dl 1.200 * they must accumulate sets of consistent reads of fields held in
568     * local variables. There are also other coding oddities
569     * (including several unnecessary-looking hoisted null checks)
570     * that help some methods perform reasonably even when interpreted
571     * (not compiled).
572 dl 1.52 *
573 dl 1.208 * The order of declarations in this file is (with a few exceptions):
574 dl 1.86 * (1) Static utility functions
575     * (2) Nested (static) classes
576     * (3) Static fields
577     * (4) Fields, along with constants used when unpacking some of them
578     * (5) Internal control methods
579     * (6) Callbacks and other support for ForkJoinTask methods
580     * (7) Exported methods
581     * (8) Static block initializing statics in minimally dependent order
582     */
583    
584     // Static utilities
585    
586     /**
587     * If there is a security manager, makes sure caller has
588     * permission to modify threads.
589 jsr166 1.1 */
590 dl 1.86 private static void checkPermission() {
591     SecurityManager security = System.getSecurityManager();
592     if (security != null)
593     security.checkPermission(modifyThreadPermission);
594     }
595    
    // Nested classes

    /**
     * Factory for creating new {@link ForkJoinWorkerThread}s.
     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
     * for {@code ForkJoinWorkerThread} subclasses that extend base
     * functionality or initialize threads with different contexts.
     */
    public static interface ForkJoinWorkerThreadFactory {
        /**
         * Returns a new worker thread operating in the given pool.
         * Returning null or throwing an exception may result in tasks
         * never being executed.  If this method throws an exception,
         * it is relayed to the caller of the method (for example
         * {@code execute}) that caused the attempted thread creation.
         * If this method returns null or throws an exception, it is
         * not retried until the next attempted creation (for example
         * another call to {@code execute}).
         *
         * @param pool the pool this thread works in
         * @return the new worker thread, or {@code null} if the request
         *         to create a thread is rejected
         * @throws NullPointerException if the pool is null
         */
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }
622    
    /**
     * Default ForkJoinWorkerThreadFactory implementation; creates a
     * new ForkJoinWorkerThread using the pool's default settings (no
     * name, group, or permission customization).
     */
    private static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }
633    
    // Constants shared across ForkJoinPool and WorkQueue

    // Bounds
    static final int SWIDTH       = 16;            // width of short
    static final int SMASK        = 0xffff;        // short bits == max index
    static final int MAX_CAP      = 0x7fff;        // max #workers - 1
    static final int SQMASK       = 0x007e;        // max 64 (even) slots

    // Masks and units for WorkQueue.phase and ctl sp subfield
    static final int UNSIGNALLED  = 1 << 31;       // must be negative
    static final int SS_SEQ       = 1 << 16;       // version count
    static final int QLOCK        = 1;             // must be 1

    // Mode bits and sentinels, some also used in WorkQueue id and source fields
    static final int OWNED        = 1;             // queue has owner thread
    static final int FIFO         = 1 << 16;       // fifo queue or access mode
    static final int SATURATE     = 1 << 17;       // for tryCompensate
    static final int SHUTDOWN     = 1 << 18;
    static final int TERMINATED   = 1 << 19;
    static final int STOP         = 1 << 31;       // must be negative
    static final int QUIET        = 1 << 30;       // not scanning or working
    static final int DORMANT      = QUIET | UNSIGNALLED;

    /**
     * The maximum number of local polls from the same queue before
     * checking others. This is a safeguard against infinitely unfair
     * looping under unbounded user task recursion, and must be larger
     * than plausible cases of intentional bounded task recursion.
     */
    static final int POLL_LIMIT = 1 << 10;
664 dl 1.253
    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. The @Contended annotation alerts
     * JVMs to try to keep instances apart.
     */
    // For now, using manual padding.
    // @jdk.internal.vm.annotation.Contended
    // @sun.misc.Contended
    static final class WorkQueue {

        /**
         * Capacity of work-stealing queue array upon initialization.
         * Must be a power of two; at least 4, but should be larger to
         * reduce or eliminate cacheline sharing among queues.
         * Currently, it is much larger, as a partial workaround for
         * the fact that JVMs often place arrays in locations that
         * share GC bookkeeping (especially cardmarks) such that
         * per-write accesses encounter serious memory contention.
         */
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

        /**
         * Maximum size for queue arrays. Must be a power of two less
         * than or equal to 1 << (31 - width of array entry) to ensure
         * lack of wraparound of index calculations, but defined to a
         * value a bit less than this to help users trap runaway
         * programs before saturating systems.
         */
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

        // Instance fields

        // pad00..pad0f and pad10..pad1f are cache-line padding around
        // the frequently-written fields below (see class javadoc).
        volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
        volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
        volatile int phase;        // versioned, negative: queued, 1: locked
        int stackPred;             // pool stack (ctl) predecessor link
        int nsteals;               // number of steals
        int id;                    // index, mode, tag
        volatile int source;       // source queue id, or sentinel
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
        volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e, pad1f;

        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

        /**
         * Returns an exportable index (used by ForkJoinWorkerThread).
         */
        final int getPoolIndex() {
            return (id & 0xffff) >>> 1; // ignore odd/even tag bit
        }

        /**
         * Returns the approximate number of tasks in the queue.
         */
        final int queueSize() {
            int n = base - top;       // read base first
            return (n >= 0) ? 0 : -n; // ignore transient negative
        }

        /**
         * Provides a more accurate estimate of whether this queue has
         * any tasks than does queueSize, by checking whether a
         * near-empty queue has at least one unclaimed task.
         */
        final boolean isEmpty() {
            ForkJoinTask<?>[] a; int n, al, b;
            return ((n = (b = base) - top) >= 0 || // possibly one task
                    (n == -1 && ((a = array) == null ||
                                 (al = a.length) == 0 ||
                                 a[(al - 1) & b] == null)));
        }

        /**
         * Pushes a task. Call only by owner in unshared queues.
         *
         * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task) {
            int s = top; ForkJoinTask<?>[] a; int al, d;
            if ((a = array) != null && (al = a.length) > 0) {
                int index = (al - 1) & s;
                long offset = ((long)index << ASHIFT) + ABASE;
                ForkJoinPool p = pool;
                top = s + 1;
                U.putOrderedObject(a, offset, task);
                if ((d = base - s) == 0 && p != null) {
                    // queue was empty: fence to publish, then wake a worker
                    U.fullFence();
                    p.signalWork();
                }
                else if (d + al == 1)
                    growArray();  // now full
            }
        }

        /**
         * Initializes or doubles the capacity of array. Call either
         * by owner or with lock held -- it is OK for base, but not
         * top, to move while resizings are in progress.
         */
        final ForkJoinTask<?>[] growArray() {
            ForkJoinTask<?>[] oldA = array;
            int oldSize = oldA != null ? oldA.length : 0;
            int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
            if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
                throw new RejectedExecutionException("Queue capacity exceeded");
            int oldMask, t, b;
            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
            if (oldA != null && (oldMask = oldSize - 1) > 0 &&
                (t = top) - (b = base) > 0) {
                int mask = size - 1;
                do { // emulate poll from old array, push to new array
                    int index = b & oldMask;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> x = (ForkJoinTask<?>)
                        U.getObjectVolatile(oldA, offset);
                    if (x != null &&
                        U.compareAndSwapObject(oldA, offset, x, null))
                        a[b & mask] = x;
                } while (++b != t);
                U.storeFence();
            }
            return a;
        }

        /**
         * Takes next task, if one exists, in LIFO order. Call only
         * by owner in unshared queues.
         */
        final ForkJoinTask<?> pop() {
            int b = base, s = top, al, i; ForkJoinTask<?>[] a;
            if ((a = array) != null && b != s && (al = a.length) > 0) {
                int index = (al - 1) & --s;
                long offset = ((long)index << ASHIFT) + ABASE;
                ForkJoinTask<?> t = (ForkJoinTask<?>)
                    U.getObject(a, offset);
                if (t != null &&
                    U.compareAndSwapObject(a, offset, t, null)) {
                    top = s;
                    U.storeFence();
                    return t;
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in FIFO order.
         */
        final ForkJoinTask<?> poll() {
            for (;;) {
                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && (d = b - s) < 0 &&
                    (al = a.length) > 0) {
                    int index = (al - 1) & b;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getObjectVolatile(a, offset);
                    if (b++ == base) {        // base unmoved; read was valid
                        if (t != null) {
                            if (U.compareAndSwapObject(a, offset, t, null)) {
                                base = b;
                                return t;
                            }
                        }
                        else if (d == -1)
                            break; // now empty
                    }
                }
                else
                    break;
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> nextLocalTask() {
            return ((id & FIFO) != 0) ? poll() : pop();
        }

        /**
         * Returns next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> peek() {
            int al; ForkJoinTask<?>[] a;
            return ((a = array) != null && (al = a.length) > 0) ?
                a[(al - 1) &
                  ((id & FIFO) != 0 ? base : top - 1)] : null;
        }

        /**
         * Pops the given task only if it is at the current top.
         */
        final boolean tryUnpush(ForkJoinTask<?> task) {
            int b = base, s = top, al; ForkJoinTask<?>[] a;
            if ((a = array) != null && b != s && (al = a.length) > 0) {
                int index = (al - 1) & --s;
                long offset = ((long)index << ASHIFT) + ABASE;
                if (U.compareAndSwapObject(a, offset, task, null)) {
                    top = s;
                    U.storeFence();
                    return true;
                }
            }
            return false;
        }

        /**
         * Removes and cancels all known tasks, ignoring any exceptions.
         */
        final void cancelAll() {
            for (ForkJoinTask<?> t; (t = poll()) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t);
        }

        // Specialized execution methods

        /**
         * Pops and executes up to limit consecutive tasks or until empty.
         *
         * @param limit max runs, or zero for no limit
         */
        final void localPopAndExec(int limit) {
            for (;;) {
                int b = base, s = top, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && b != s && (al = a.length) > 0) {
                    int index = (al - 1) & --s;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getAndSetObject(a, offset, null);
                    if (t != null) {
                        top = s;
                        U.storeFence();
                        t.doExec();
                        if (limit != 0 && --limit == 0)
                            break;
                    }
                    else
                        break;
                }
                else
                    break;
            }
        }

        /**
         * Polls and executes up to limit consecutive tasks or until empty.
         *
         * @param limit max runs, or zero for no limit
         */
        final void localPollAndExec(int limit) {
            for (int polls = 0;;) {
                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && (d = b - s) < 0 &&
                    (al = a.length) > 0) {
                    int index = (al - 1) & b++;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getAndSetObject(a, offset, null);
                    if (t != null) {
                        base = b;
                        t.doExec();
                        if (limit != 0 && ++polls == limit)
                            break;
                    }
                    else if (d == -1)
                        break; // now empty
                    else
                        polls = 0; // stolen; reset
                }
                else
                    break;
            }
        }

        /**
         * If present, removes task from queue and executes it.
         */
        final void tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] wa; int s, wal;
            if (base - (s = top) < 0 && // traverse from top
                (wa = array) != null && (wal = wa.length) > 0) {
                for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
                    int index = i & m;
                    long offset = (index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getObject(wa, offset);
                    if (t == null)
                        break;
                    else if (t == task) {
                        if (U.compareAndSwapObject(wa, offset, t, null)) {
                            top = ns;   // safely shift down
                            for (int j = i; j != ns; ++j) {
                                ForkJoinTask<?> f;
                                int pindex = (j + 1) & m;
                                long pOffset = (pindex << ASHIFT) + ABASE;
                                f = (ForkJoinTask<?>)U.getObject(wa, pOffset);
                                U.putObjectVolatile(wa, pOffset, null);

                                int jindex = j & m;
                                long jOffset = (jindex << ASHIFT) + ABASE;
                                U.putOrderedObject(wa, jOffset, f);
                            }
                            U.storeFence();
                            t.doExec();
                        }
                        break;
                    }
                }
            }
        }

        /**
         * Tries to steal and run tasks within the target's
         * computation until done, not found, or limit exceeded.
         *
         * @param task root of CountedCompleter computation
         * @param limit max runs, or zero for no limit
         * @return task status on exit
         */
        final int localHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false;
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        long offset = ((long)index << ASHIFT) + ABASE;
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            U.getObject(a, offset);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null) // try parent
                                        break;
                                }
                                else {
                                    if (U.compareAndSwapObject(a, offset,
                                                               t, null)) {
                                        top = s - 1;
                                        U.storeFence();
                                        t.doExec();
                                        help = true;
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }

        // Operations on shared queues

        /**
         * Tries to lock shared queue by CASing phase field.
         */
        final boolean tryLockSharedQueue() {
            return U.compareAndSwapInt(this, PHASE, 0, QLOCK);
        }

        /**
         * Shared version of tryUnpush.
         */
        final boolean trySharedUnpush(ForkJoinTask<?> task) {
            boolean popped = false;
            int s = top - 1, al; ForkJoinTask<?>[] a;
            if ((a = array) != null && (al = a.length) > 0) {
                int index = (al - 1) & s;
                long offset = ((long)index << ASHIFT) + ABASE;
                ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
                if (t == task &&
                    U.compareAndSwapInt(this, PHASE, 0, QLOCK)) {
                    // recheck under lock before removing
                    if (top == s + 1 && array == a &&
                        U.compareAndSwapObject(a, offset, task, null)) {
                        popped = true;
                        top = s;
                    }
                    U.putOrderedInt(this, PHASE, 0);  // release lock
                }
            }
            return popped;
        }

        /**
         * Shared version of localHelpCC.
         */
        final int sharedHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false;
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        long offset = ((long)index << ASHIFT) + ABASE;
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            U.getObject(a, offset);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null)
                                        break;
                                }
                                else {
                                    if (U.compareAndSwapInt(this, PHASE,
                                                            0, QLOCK)) {
                                        if (top == s && array == a &&
                                            U.compareAndSwapObject(a, offset,
                                                                   t, null)) {
                                            help = true;
                                            top = s - 1;
                                        }
                                        U.putOrderedInt(this, PHASE, 0);
                                        if (help)
                                            t.doExec();
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }

        /**
         * Returns true if owned and not known to be blocked.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            return ((wt = owner) != null &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // Unsafe mechanics. Note that some are (and must be) the same as in FJP
        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        private static final long PHASE;
        private static final int ABASE;
        private static final int ASHIFT;
        static {
            try {
                PHASE = U.objectFieldOffset
                    (WorkQueue.class.getDeclaredField("phase"));
                ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
                int scale = U.arrayIndexScale(ForkJoinTask[].class);
                if ((scale & (scale - 1)) != 0)
                    throw new Error("array index scale not a power of two");
                ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }
1149 dl 1.14
    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     */
    private static final int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.
     */
    private static int poolNumberSequence;

    /**
     * Returns the next sequence number. We don't expect this to
     * ever contend, so use simple builtin sync.
     */
    private static final synchronized int nextPoolId() {
        return ++poolNumberSequence;
    }
1198 dl 1.86
    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work
     */
    private static final long DEFAULT_KEEPALIVE = 60000L;

    /**
     * Undershoot tolerance for idle timeouts
     */
    private static final long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES.  Overridable using the
     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
     * property.  The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private static final int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private static final int SEED_INCREMENT = 0x9e3779b9;

    /*
     * Bits and masks for field ctl, packed with 4 16 bit subfields:
     * RC: Number of released (unqueued) workers minus target parallelism
     * TC: Number of total workers minus target parallelism
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl.  The offsets of counts
     * by the target parallelism and the positionings of fields makes
     * it possible to perform the most common checks via sign tests of
     * fields: When rc is negative, there are not enough unqueued
     * workers, when tc is negative, there are not enough total
     * workers.  When sp is non-zero, there are waiting workers.  To
     * deal with possibly negative fields, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     *
     * Because it occupies uppermost bits, we can add one release count
     * using getAndAddLong of RC_UNIT, rather than CAS, when returning
     * from a blocked join.  Other updates entail multiple subfields
     * and masking, requiring CAS.
     *
     * The limits packed in field "bounds" are also offset by the
     * parallelism level to make them comparable to the ctl rc and tc
     * fields.
     */

    // Lower and upper word masks
    private static final long SP_MASK    = 0xffffffffL;
    private static final long UC_MASK    = ~SP_MASK;

    // Release counts
    private static final int  RC_SHIFT   = 48;
    private static final long RC_UNIT    = 0x0001L << RC_SHIFT;
    private static final long RC_MASK    = 0xffffL << RC_SHIFT;

    // Total counts
    private static final int  TC_SHIFT   = 32;
    private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
    private static final long TC_MASK    = 0xffffL << TC_SHIFT;
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1269    
    // Instance fields

    // Segregate ctl field; for now using manual padding instead of @Contended
    // @jdk.internal.vm.annotation.Contended("fjpctl")
    // @sun.misc.Contended("fjpctl")
    volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
    volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
    volatile long ctl;                   // main pool control
    volatile long pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
    volatile long pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e;

    volatile long stealCount;            // collects worker nsteals
    final long keepAlive;                // milliseconds before dropping if idle
    int indexSeed;                       // next worker index
    final int bounds;                    // min, max threads packed as shorts
    volatile int mode;                   // parallelism, runstate, queue mode
    WorkQueue[] workQueues;              // main registry
    final String workerNamePrefix;       // for worker thread string; sync lock
    final ForkJoinWorkerThreadFactory factory;  // factory for new workers
    final UncaughtExceptionHandler ueh;  // per-worker UEH
1290 dl 1.101
1291 dl 1.200 // Creating, registering and deregistering workers
1292    
1293 dl 1.112 /**
1294 dl 1.200 * Tries to construct and start one worker. Assumes that total
1295     * count has already been incremented as a reservation. Invokes
1296     * deregisterWorker on any failure.
1297     *
1298     * @return true if successful
1299 dl 1.115 */
1300 dl 1.300 private boolean createWorker() {
1301 dl 1.200 ForkJoinWorkerThreadFactory fac = factory;
1302     Throwable ex = null;
1303     ForkJoinWorkerThread wt = null;
1304     try {
1305     if (fac != null && (wt = fac.newThread(this)) != null) {
1306     wt.start();
1307     return true;
1308 dl 1.115 }
1309 dl 1.200 } catch (Throwable rex) {
1310     ex = rex;
1311 dl 1.112 }
1312 dl 1.200 deregisterWorker(wt, ex);
1313     return false;
1314 dl 1.112 }
1315    
    /**
     * Tries to add one worker, incrementing ctl counts before doing
     * so, relying on createWorker to back out on failure.
     *
     * @param c incoming ctl value, with total count negative and no
     * idle workers.  On CAS failure, c is refreshed and retried if
     * this holds (otherwise, a new worker is not needed).
     */
    private void tryAddWorker(long c) {
        do {
            // bump both release (RC) and total (TC) counts together
            long nc = ((RC_MASK & (c + RC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
                createWorker();         // backs out counts itself on failure
                break;
            }
            // retry only while total count still negative and no idle workers
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }
1334 dl 1.112
    /**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.  Probes for an empty odd-numbered slot in
     * workQueues (odd indices hold owned queues), doubling the array
     * if none is found within n/2 probes.  All registry updates occur
     * while synchronized on workerNamePrefix, which doubles as the
     * pool's registration lock.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.setDaemon(true);                             // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int tid = 0;                                    // for thread name
        int fifo = mode & FIFO;
        String prefix = workerNamePrefix;
        if (prefix != null) {
            synchronized (prefix) {
                WorkQueue[] ws = workQueues; int n;
                int s = indexSeed += SEED_INCREMENT;    // under lock, so safe
                if (ws != null && (n = ws.length) > 1) {
                    int m = n - 1;
                    tid = s & m;
                    int i = m & ((s << 1) | 1);         // odd-numbered indices
                    for (int probes = n >>> 1;;) {      // find empty slot
                        WorkQueue q;
                        if ((q = ws[i]) == null || q.phase == QUIET)
                            break;
                        else if (--probes == 0) {
                            i = n | 1;                  // resize below
                            break;
                        }
                        else
                            i = (i + 2) & m;            // next odd slot
                    }

                    int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
                    w.phase = w.id = id;                // now publishable

                    if (i < n)
                        ws[i] = w;
                    else {                              // expand array
                        int an = n << 1;
                        WorkQueue[] as = new WorkQueue[an];
                        as[i] = w;
                        int am = an - 1;
                        // processes indices in pairs: j advances twice per
                        // iteration (here and in the loop update), so even j
                        // (external queue) is rehashed, odd j (worker) is
                        // copied to the same index
                        for (int j = 0; j < n; ++j) {
                            WorkQueue v;                // copy external queue
                            if ((v = ws[j]) != null)    // position may change
                                as[v.id & am & SQMASK] = v;
                            if (++j >= n)
                                break;
                            as[j] = ws[j];              // copy worker
                        }
                        workQueues = as;
                    }
                }
            }
            wt.setName(prefix.concat(Integer.toString(tid)));
        }
        return w;
    }
1397 dl 1.19
    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker.  Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
     *
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        int phase = 0;
        if (wt != null && (w = wt.workQueue) != null) {
            Object lock = workerNamePrefix;             // registration lock
            long ns = (long)w.nsteals & 0xffffffffL;    // unsigned widen
            int idx = w.id & SMASK;
            if (lock != null) {
                WorkQueue[] ws;                       // remove index from array
                synchronized (lock) {
                    if ((ws = workQueues) != null && ws.length > idx &&
                        ws[idx] == w)
                        ws[idx] = null;
                    stealCount += ns;                 // fold into pool total
                }
            }
            phase = w.phase;
        }
        if (phase != QUIET) {                         // else pre-adjusted
            long c;                                   // decrement counts
            do {} while (!U.compareAndSwapLong
                         (this, CTL, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
                                               (TC_MASK & (c - TC_UNIT)) |
                                               (SP_MASK & c))));
        }
        if (w != null)
            w.cancelAll();                            // cancel remaining tasks

        if (!tryTerminate(false, false) &&            // possibly replace worker
            w != null && w.array != null)             // avoid repeated failures
            signalWork();

        if (ex == null)                               // help clean on way out
            ForkJoinTask.helpExpungeStaleExceptions();
        else                                          // rethrow
            ForkJoinTask.rethrow(ex);
    }
1444 dl 1.52
    /**
     * Tries to create or release a worker if too few are running.
     * If there are no idle workers but the total count allows,
     * delegates to tryAddWorker; otherwise pops the top of the idle
     * stack encoded in the low bits of ctl and unparks it.
     */
    final void signalWork() {
        for (;;) {
            long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
            if ((c = ctl) >= 0L)                      // enough workers
                break;
            else if ((sp = (int)c) == 0) {            // no idle workers
                if ((c & ADD_WORKER) != 0L)           // too few workers
                    tryAddWorker(c);
                break;
            }
            else if ((ws = workQueues) == null)
                break;                                // unstarted/terminated
            else if (ws.length <= (i = sp & SMASK))
                break;                                // terminated
            else if ((v = ws[i]) == null)
                break;                                // terminating
            else {
                int np = sp & ~UNSIGNALLED;
                int vp = v.phase;
                // new ctl: predecessor becomes stack top; bump release count
                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
                Thread vt = v.owner;
                if (sp == vp && U.compareAndSwapLong(this, CTL, c, nc)) {
                    v.phase = np;                     // mark released
                    if (v.source < 0)                 // parked (or about to)
                        LockSupport.unpark(vt);
                    break;
                }
            }
        }
    }
1478    
    /**
     * Tries to decrement counts (sometimes implicitly) and possibly
     * arrange for a compensating worker in preparation for blocking:
     * If not all core workers yet exist, creates one, else if any are
     * unreleased (possibly including caller) releases one, else if
     * fewer than the minimum allowed number of workers running,
     * checks to see that they are all active, and if so creates an
     * extra worker unless over maximum limit and policy is to
     * saturate.  Most of these steps can fail due to interference, in
     * which case 0 is returned so caller will retry. A negative
     * return value indicates that the caller doesn't need to
     * re-adjust counts when later unblocked.
     *
     * @return 1: block then adjust, -1: block without adjust, 0 : retry
     */
    private int tryCompensate(WorkQueue w) {
        int t, n, sp;
        long c = ctl;
        WorkQueue[] ws = workQueues;
        if ((t = (short)(c >> TC_SHIFT)) >= 0) {      // all core workers exist
            if (ws == null || (n = ws.length) <= 0 || w == null)
                return 0;                             // disabled
            else if ((sp = (int)c) != 0) {            // replace or release
                WorkQueue v = ws[sp & (n - 1)];
                int wp = w.phase;
                // if caller itself is unreleased (wp < 0), releasing the
                // stack top replaces it: restore RC so net count is unchanged
                long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
                int np = sp & ~UNSIGNALLED;
                if (v != null) {
                    int vp = v.phase;
                    Thread vt = v.owner;
                    long nc = ((long)v.stackPred & SP_MASK) | uc;
                    if (vp == sp && U.compareAndSwapLong(this, CTL, c, nc)) {
                        v.phase = np;
                        if (v.source < 0)
                            LockSupport.unpark(vt);
                        return (wp < 0) ? -1 : 1;
                    }
                }
                return 0;
            }
            else if ((int)(c >> RC_SHIFT) -           // reduce parallelism
                     (short)(bounds & SMASK) > 0) {   // above min runnable
                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
                return U.compareAndSwapLong(this, CTL, c, nc) ? 1 : 0;
            }
            else {                                    // validate quiescence
                int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
                boolean unstable = false;
                for (int i = 1; i < n; i += 2) {      // odd slots: workers
                    WorkQueue q; Thread wt; Thread.State ts;
                    if ((q = ws[i]) != null) {
                        if (q.source == 0) {          // idle: snapshot invalid
                            unstable = true;
                            break;
                        }
                        else {
                            --tc;                     // account for this worker
                            if ((wt = q.owner) != null &&
                                ((ts = wt.getState()) == Thread.State.BLOCKED ||
                                 ts == Thread.State.WAITING))
                                ++bc;                 // worker is blocking
                        }
                    }
                }
                if (unstable || tc != 0 || ctl != c)
                    return 0;                         // inconsistent
                else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
                    if ((md & SATURATE) != 0)         // policy: don't expand
                        return -1;
                    else if (bc < pc) {               // lagging
                        Thread.yield();               // for retry spins
                        return 0;
                    }
                    else
                        throw new RejectedExecutionException(
                            "Thread limit exceeded replacing blocked worker");
                }
            }
        }

        long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
        return U.compareAndSwapLong(this, CTL, c, nc) && createWorker() ? 1 : 0;
    }
1562    
    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     * See above for explanation.  Repeatedly scans the queue array for
     * tasks; after a fully empty scan, pushes itself onto the idle
     * (Treiber) stack encoded in ctl and parks, possibly dropping out
     * after keepAlive if it is the stack top of an otherwise quiescent
     * pool.
     */
    final void runWorker(WorkQueue w) {
        WorkQueue[] ws;
        w.growArray();                                   // allocate queue
        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
        if (r == 0)                                      // initial nonzero seed
            r = 1;
        int lastSignalId = 0;                            // avoid unneeded signals
        while ((ws = workQueues) != null) {
            boolean nonempty = false;                    // scan
            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                if ((i = r & m) >= 0 && i < n &&         // always true
                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
                    (a = q.array) != null && (al = a.length) > 0) {
                    int qid = q.id;                      // (never zero)
                    int index = (al - 1) & b;
                    long offset = ((long)index << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        U.getObjectVolatile(a, offset);
                    // re-check base and CAS slot to null to claim the task
                    if (t != null && b++ == q.base &&
                        U.compareAndSwapObject(a, offset, t, null)) {
                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
                            signalWork();                // propagate signal
                        w.source = lastSignalId = qid;   // record steal source
                        t.doExec();
                        if ((w.id & FIFO) != 0)          // run remaining locals
                            w.localPollAndExec(POLL_LIMIT);
                        else
                            w.localPopAndExec(POLL_LIMIT);
                        ForkJoinWorkerThread thread = w.owner;
                        ++w.nsteals;
                        w.source = 0;                    // now idle
                        if (thread != null)
                            thread.afterTopLevelExec();
                    }
                    nonempty = true;
                }
                else if (nonempty)                       // found work earlier;
                    break;                               // restart scan
                else
                    ++r;                                 // try next index
            }

            if (nonempty) {                              // move (xorshift)
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
            }
            else {
                int phase;
                lastSignalId = 0;                        // clear for next scan
                if ((phase = w.phase) >= 0) {            // enqueue on idle stack
                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
                    long c, nc;
                    do {
                        w.stackPred = (int)(c = ctl);    // link to old top
                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
                    } while (!U.compareAndSwapLong(this, CTL, c, nc));
                }
                else {                                   // already queued
                    int pred = w.stackPred;
                    w.source = DORMANT;                  // enable signal
                    for (int steps = 0;;) {
                        int md, rc; long c;
                        if (w.phase >= 0) {              // released by signal
                            w.source = 0;
                            break;
                        }
                        else if ((md = mode) < 0)        // shutting down
                            return;
                        else if ((rc = ((md & SMASK) +   // possibly quiescent
                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
                                 (md & SHUTDOWN) != 0 &&
                                 tryTerminate(false, false))
                            return;                      // help terminate
                        else if ((++steps & 1) == 0)
                            Thread.interrupted();        // clear between parks
                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
                            // quiescent and this worker is top of idle stack:
                            // use timed park so thread can be dropped
                            long d = keepAlive + System.currentTimeMillis();
                            LockSupport.parkUntil(this, d);
                            if (ctl == c &&
                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
                                long nc = ((UC_MASK & (c - TC_UNIT)) |
                                           (SP_MASK & pred));
                                if (U.compareAndSwapLong(this, CTL, c, nc)) {
                                    w.phase = QUIET;
                                    return;              // drop on timeout
                                }
                            }
                        }
                        else
                            LockSupport.park(this);
                    }
                }
            }
        }
    }
1662 dl 1.200
    /**
     * Helps and/or blocks until the given task is done or timeout.
     * First tries locally helping, then scans other queues for a task
     * produced by one of w's stealers; compensating and blocking if
     * none are found (rescanning if tryCompensate fails).
     *
     * NOTE(review): apparently an alternate/superseded variant of
     * awaitJoin below (note the 'x' prefix); no callers are visible
     * in this part of the file -- confirm it is unused and consider
     * removing it.
     *
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero
     * @return task status on exit
     */
    final int xawaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (w != null && task != null &&
            (!(task instanceof CountedCompleter) ||
             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
            w.tryRemoveAndExec(task);
            int src = w.source, id = w.id, block = 0;
            s = task.status;
            while (s >= 0) {
                WorkQueue[] ws;
                boolean nonempty = false;
                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
                if ((ws = workQueues) != null) {          // scan for matching id
                    for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                        if ((i = r & m) >= 0 && i < n &&
                            (q = ws[i]) != null && q.source == id &&
                            (b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            if (block > 0)                // undo compensation
                                U.getAndAddLong(this, CTL, RC_UNIT);
                            block = 0;
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            if (t != null && b++ == q.base && id == q.source &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = qid;
                                t.doExec();
                                w.source = src;
                            }
                            nonempty = true;
                            break;
                        }
                        else
                            r += 2;                       // next odd index
                    }
                }
                if ((s = task.status) < 0)
                    break;
                else if (!nonempty) {
                    long ms, ns;
                    if (deadline == 0L)
                        ms = 0L;                          // untimed
                    else if ((ns = deadline - System.nanoTime()) <= 0L)
                        break;                            // timeout
                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                        ms = 1L;                          // avoid 0 for timed wait
                    if (block == 0)
                        block = tryCompensate(w);
                    else
                        task.internalWait(ms);
                    s = task.status;
                }
            }
            if (block > 0)                                // restore count
                U.getAndAddLong(this, CTL, RC_UNIT);
        }
        return s;
    }
1737    
    /**
     * Helps and/or blocks until the given task is done or timeout.
     * First tries locally helping (including removal and execution of
     * the task itself), then scans queues for a task stolen from w
     * (matching w's id as its source); if none is found, compensates
     * via tryCompensate and blocks on the task, rechecking status
     * each pass.
     *
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero
     * @return task status on exit
     */
    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (w != null && task != null &&
            (!(task instanceof CountedCompleter) ||
             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
            w.tryRemoveAndExec(task);
            int src = w.source, id = w.id;
            s = task.status;
            while (s >= 0) {
                WorkQueue[] ws;
                boolean nonempty = false;
                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
                if ((ws = workQueues) != null) {          // scan for matching id
                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                        if ((i = (r + j) & m) >= 0 && i < n &&
                            (q = ws[i]) != null && q.source == id &&
                            (b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            // re-validate source and base before claiming
                            if (t != null && b++ == q.base && id == q.source &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = qid;
                                t.doExec();
                                w.source = src;
                            }
                            nonempty = true;
                            break;
                        }
                    }
                }
                if ((s = task.status) < 0)
                    break;
                else if (!nonempty) {
                    long ms, ns; int block;
                    if (deadline == 0L)
                        ms = 0L;                          // untimed
                    else if ((ns = deadline - System.nanoTime()) <= 0L)
                        break;                            // timeout
                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                        ms = 1L;                          // avoid 0 for timed wait
                    if ((block = tryCompensate(w)) != 0) {
                        task.internalWait(ms);
                        // re-adjust count only if compensation said to (1)
                        U.getAndAddLong(this, CTL, (block > 0) ? RC_UNIT : 0L);
                    }
                    s = task.status;
                }
            }
        }
        return s;
    }
1794    
    /**
     * Runs tasks until {@code isQuiescent()}. Rather than blocking
     * when tasks cannot be found, rescans until all others cannot
     * find tasks either.  Tracks its own contribution to the release
     * count via {@code released} (-1 until known, then 0/1) and marks
     * itself QUIET while idle so other scanners can observe it.
     */
    final void helpQuiescePool(WorkQueue w) {
        int prevSrc = w.source, fifo = w.id & FIFO;
        for (int source = prevSrc, released = -1;;) { // -1 until known
            WorkQueue[] ws;
            if (fifo != 0)                            // drain local tasks first
                w.localPollAndExec(0);
            else
                w.localPopAndExec(0);
            if (released == -1 && w.phase >= 0)
                released = 1;
            boolean quiet = true, empty = true;
            int r = ThreadLocalRandom.nextSecondarySeed();
            if ((ws = workQueues) != null) {
                for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                    WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                    if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
                        if ((b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            if (released == 0) {    // increment
                                released = 1;
                                U.getAndAddLong(this, CTL, RC_UNIT);
                            }
                            int index = (al - 1) & b;
                            long offset = ((long)index << ASHIFT) + ABASE;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                U.getObjectVolatile(a, offset);
                            if (t != null && b++ == q.base &&
                                U.compareAndSwapObject(a, offset, t, null)) {
                                q.base = b;
                                w.source = source = q.id;
                                t.doExec();
                                w.source = source = prevSrc;
                            }
                            quiet = empty = false;
                            break;
                        }
                        else if ((q.source & QUIET) == 0)
                            quiet = false;          // someone still active
                    }
                }
            }
            if (quiet) {                            // all queues quiescent
                if (released == 0)
                    U.getAndAddLong(this, CTL, RC_UNIT);
                w.source = prevSrc;
                break;
            }
            else if (empty) {                       // no tasks but not quiet
                if (source != QUIET)
                    w.source = source = QUIET;      // advertise idleness
                if (released == 1) {                // decrement
                    released = 0;
                    U.getAndAddLong(this, CTL, RC_MASK & -RC_UNIT);
                }
            }
        }
    }
1858    
    /**
     * Scans for and returns a polled task, if available.
     * Used only for untracked polls.  Traverses the queue array from
     * a random origin with a (co-prime) random step, repeating until
     * two consecutive full sweeps observe the same base checksum
     * (i.e. the pool appears stable and empty).
     *
     * @param submissionsOnly if true, only scan submission queues
     */
    private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
        WorkQueue[] ws; int n;
        rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
                       (n = ws.length) > 0) {
            int m = n - 1;
            int r = ThreadLocalRandom.nextSecondarySeed();
            int h = r >>> 16;
            int origin, step;
            if (submissionsOnly) {
                origin = (r & ~1) & m;          // even indices and steps
                step = (h & ~1) | 2;
            }
            else {
                origin = r & m;
                step = h | 1;                   // odd step visits all slots
            }
            for (int k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; int b, al; ForkJoinTask<?>[] a;
                if ((q = ws[k]) != null) {
                    checkSum += b = q.base;
                    if (b - q.top < 0 &&
                        (a = q.array) != null && (al = a.length) > 0) {
                        int index = (al - 1) & b;
                        long offset = ((long)index << ASHIFT) + ABASE;
                        ForkJoinTask<?> t = (ForkJoinTask<?>)
                            U.getObjectVolatile(a, offset);
                        if (t != null && b++ == q.base &&
                            U.compareAndSwapObject(a, offset, t, null)) {
                            q.base = b;
                            return t;
                        }
                        else
                            break;              // restart
                    }
                }
                if ((k = (k + step) & m) == origin) { // completed a sweep
                    if (oldSum == (oldSum = checkSum))
                        break rescan;           // stable: give up
                    checkSum = 0;
                }
            }
        }
        return null;
    }
1909    
1910     /**
1911     * Gets and removes a local or stolen task for the given worker.
1912     *
1913     * @return a task, if available
1914     */
1915     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1916     ForkJoinTask<?> t;
1917     if (w != null &&
1918     (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1919     return t;
1920     else
1921     return pollScan(false);
1922 dl 1.90 }
1923    
1924 dl 1.300 // External operations
1925    
    /**
     * Adds the given task to a submission queue at submitter's
     * current queue, creating one if null or contended.  Submission
     * queues occupy even-numbered slots (index masked by SQMASK);
     * new queues are installed under the pool lock
     * (workerNamePrefix), and pushes to existing queues use the
     * queue's own phase-based lock.
     *
     * @param task the task. Caller must ensure non-null.
     */
    final void externalPush(ForkJoinTask<?> task) {
        int r;                                    // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            int md = mode, n;
            WorkQueue[] ws = workQueues;
            if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
                throw new RejectedExecutionException();
            else {
                WorkQueue q;
                boolean push = false, grow = false;
                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
                    Object lock = workerNamePrefix;
                    int qid = (r | QUIET) & ~(FIFO | OWNED);
                    q = new WorkQueue(this, null);
                    q.id = qid;
                    q.source = QUIET;
                    q.phase = QLOCK;              // lock queue
                    if (lock != null) {
                        synchronized (lock) {     // lock pool to install
                            int i;
                            if ((ws = workQueues) != null &&
                                (n = ws.length) > 0 &&
                                ws[i = qid & (n - 1) & SQMASK] == null) {
                                ws[i] = q;
                                push = grow = true;
                            }
                        }
                    }
                }
                else if (q.tryLockSharedQueue()) {
                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
                    if ((a = q.array) != null && (al = a.length) > 0 &&
                        al - 1 + (d = b - s) > 0) {   // room for one more
                        a[(al - 1) & s] = task;
                        q.top = s + 1;            // relaxed writes OK here
                        q.phase = 0;              // unlock
                        if (d < 0 && q.base - s < 0)
                            break;                // no signal needed
                    }
                    else
                        grow = true;              // full: expand below
                    push = true;
                }
                if (push) {
                    if (grow) {                   // still holding queue lock
                        try {
                            q.growArray();
                            int s = q.top, al; ForkJoinTask<?>[] a;
                            if ((a = q.array) != null && (al = a.length) > 0) {
                                a[(al - 1) & s] = task;
                                q.top = s + 1;
                            }
                        } finally {
                            q.phase = 0;          // unlock even on failure
                        }
                    }
                    signalWork();
                    break;
                }
                else                              // move if busy
                    r = ThreadLocalRandom.advanceProbe(r);
            }
        }
    }
2000    
2001 dl 1.300 /**
2002     * Pushes a possibly-external submission.
2003     */
2004     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2005     Thread t; ForkJoinWorkerThread w; WorkQueue q;
2006     if (task == null)
2007     throw new NullPointerException();
2008     if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2009     (w = (ForkJoinWorkerThread)t).pool == this &&
2010     (q = w.workQueue) != null)
2011     q.push(task);
2012     else
2013     externalPush(task);
2014     return task;
2015     }
2016    
2017     /**
2018     * Returns common pool queue for an external thread.
2019     */
2020     static WorkQueue commonSubmitterQueue() {
2021     ForkJoinPool p = common;
2022     int r = ThreadLocalRandom.getProbe();
2023     WorkQueue[] ws; int n;
2024     return (p != null && (ws = p.workQueues) != null &&
2025     (n = ws.length) > 0) ?
2026     ws[(n - 1) & r & SQMASK] : null;
2027     }
2028 dl 1.90
2029     /**
2030 dl 1.300 * Performs tryUnpush for an external submitter.
2031     */
2032     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
2033     int r = ThreadLocalRandom.getProbe();
2034     WorkQueue[] ws; WorkQueue w; int n;
2035     return ((ws = workQueues) != null &&
2036     (n = ws.length) > 0 &&
2037     (w = ws[(n - 1) & r & SQMASK]) != null &&
2038     w.trySharedUnpush(task));
2039 dl 1.22 }
2040    
2041     /**
2042 dl 1.300 * Performs helpComplete for an external submitter.
2043 dl 1.78 */
2044 dl 1.300 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
2045     int r = ThreadLocalRandom.getProbe();
2046     WorkQueue[] ws; WorkQueue w; int n;
2047     return ((ws = workQueues) != null && (n = ws.length) > 0 &&
2048     (w = ws[(n - 1) & r & SQMASK]) != null) ?
2049     w.sharedHelpCC(task, maxTasks) : 0;
2050 dl 1.22 }
2051    
2052     /**
2053 dl 1.300 * Tries to steal and run tasks within the target's computation.
2054     * The maxTasks argument supports external usages; internal calls
2055     * use zero, allowing unbounded steps (external calls trap
2056     * non-positive values).
2057 dl 1.78 *
2058 dl 1.300 * @param w caller
2059     * @param maxTasks if non-zero, the maximum number of other tasks to run
2060     * @return task status on exit
2061 dl 1.22 */
2062 dl 1.300 final int helpComplete(WorkQueue w, CountedCompleter<?> task,
2063     int maxTasks) {
2064     return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
2065 dl 1.14 }
2066    
    /**
     * Returns a cheap heuristic guide for task partitioning when
     * programmers, frameworks, tools, or languages have little or no
     * idea about task granularity.  In essence, by offering this
     * method, we ask users only about tradeoffs in overhead vs
     * expected throughput and its variance, rather than how finely to
     * partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible.  Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of computation tree should generate more than those nearer the
     * bottom. In perfect steady state, each thread is at
     * approximately the same level of computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger (but not much larger)
     * than 1 to both smooth over transient shortages and hedge
     * against uneven progress; as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length. However, this strategy alone
     * leads to serious mis-estimates in some non-steady-state
     * conditions (ramp-up, ramp-down, other stalls). We can detect
     * many of these by further considering the number of "idle"
     * threads, that are known to have zero queued tasks, so
     * compensate by a factor of (#idle/#active) threads.
     */
    static int getSurplusQueuedTaskCount() {
        Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
            (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (q = wt.workQueue) != null) {
            int p = pool.mode & SMASK;                // parallelism
            int a = p + (int)(pool.ctl >> RC_SHIFT);  // active count estimate
            int n = q.top - q.base;                   // local queue length
            // subtract an allowance that shrinks as the fraction of
            // active threads drops (each >>>= halves the threshold)
            return n - (a > (p >>>= 1) ? 0 :
                        a > (p >>>= 1) ? 1 :
                        a > (p >>>= 1) ? 2 :
                        a > (p >>>= 1) ? 4 :
                        8);
        }
        return 0;                                     // not a worker thread
    }
2125    
2126 dl 1.300 // Termination
2127 dl 1.14
    /**
     * Possibly initiates and/or completes termination.
     *
     * @param now if true, unconditionally terminate, else only
     * if no work and no active workers
     * @param enable if true, terminate when next possible
     * @return true if terminating or terminated
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED

        // Phase 1: set SHUTDOWN (refused for the common pool or if !enable)
        while (((md = mode) & SHUTDOWN) == 0) {
            if (!enable || this == common)        // cannot shutdown
                return false;
            else
                U.compareAndSwapInt(this, MODE, md, md | SHUTDOWN);
        }

        // Phase 2: set STOP, unless (!now) still running or nonempty
        while (((md = mode) & STOP) == 0) {       // try to initiate termination
            if (!now) {                           // check if quiescent & empty
                for (long oldSum = 0L;;) {        // repeat until stable
                    boolean running = false;
                    long checkSum = ctl;
                    WorkQueue[] ws = workQueues;
                    if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
                        running = true;           // active workers remain
                    else if (ws != null) {
                        WorkQueue w; int b;
                        for (int i = 0; i < ws.length; ++i) {
                            if ((w = ws[i]) != null) {
                                checkSum += (b = w.base) + w.id;
                                if (b != w.top ||
                                    ((i & 1) == 1 && w.source >= 0)) {
                                    running = true; // queued task or busy worker
                                    break;
                                }
                            }
                        }
                    }
                    if (((md = mode) & STOP) != 0)
                        break;                    // already triggered
                    else if (running)
                        return false;
                    else if (workQueues == ws && oldSum == (oldSum = checkSum))
                        break;                    // two identical sweeps: stable
                }
            }
            if ((md & STOP) == 0)
                U.compareAndSwapInt(this, MODE, md, md | STOP);
        }

        // Phase 3: cancel queues, interrupt workers, then set TERMINATED
        while (((md = mode) & TERMINATED) == 0) { // help terminate others
            for (long oldSum = 0L;;) {            // repeat until stable
                WorkQueue[] ws; WorkQueue w;
                long checkSum = ctl;
                if ((ws = workQueues) != null) {
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) != null) {
                            ForkJoinWorkerThread wt = w.owner;
                            w.cancelAll();        // clear queues
                            if (wt != null) {
                                try {             // unblock join or park
                                    wt.interrupt();
                                } catch (Throwable ignore) {
                                }
                            }
                            checkSum += w.base + w.id;
                        }
                    }
                }
                if (((md = mode) & TERMINATED) != 0 ||
                    (workQueues == ws && oldSum == (oldSum = checkSum)))
                    break;
            }
            if ((md & TERMINATED) != 0)
                break;
            else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
                break;                            // workers still exist
            else if (U.compareAndSwapInt(this, MODE, md, md | TERMINATED)) {
                synchronized (this) {
                    notifyAll();                  // for awaitTermination
                }
                break;
            }
        }
        return true;
    }
2215    
2216 dl 1.52 // Exported methods
2217 jsr166 1.1
2218     // Constructors
2219    
    /**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using defaults for all
     * other parameters.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool() {
        // Delegate to the full constructor: default factory, no handler,
        // LIFO mode, default core size, max threads, keep-alive.
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2235    
    /**
     * Creates a {@code ForkJoinPool} with the indicated parallelism
     * level, using defaults for all other parameters.
     *
     * @param parallelism the parallelism level
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism) {
        // Delegate to the full constructor with all-default settings.
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2252    
    /**
     * Creates a {@code ForkJoinPool} with the given parameters (using
     * defaults for others).
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     * @param factory the factory for creating new threads. For default value,
     * use {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while executing
     * tasks. For default value, use {@code null}.
     * @param asyncMode if true,
     * establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined. This mode may be more appropriate
     * than default locally stack-based mode in applications in which
     * worker threads only process event-style asynchronous tasks.
     * For default value, use {@code false}.
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        // Delegate to the full constructor, defaulting the sizing and
        // keep-alive parameters.
        this(parallelism, factory, handler, asyncMode,
             0, MAX_CAP, 1, false, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2285    
2286 dl 1.300 /**
2287     * Creates a {@code ForkJoinPool} with the given parameters.
2288     *
2289     * @param parallelism the parallelism level. For default value,
2290     * use {@link java.lang.Runtime#availableProcessors}.
2291     *
2292     * @param factory the factory for creating new threads. For
2293     * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2294     *
2295     * @param handler the handler for internal worker threads that
2296     * terminate due to unrecoverable errors encountered while
2297     * executing tasks. For default value, use {@code null}.
2298     *
2299     * @param asyncMode if true, establishes local first-in-first-out
2300     * scheduling mode for forked tasks that are never joined. This
2301     * mode may be more appropriate than default locally stack-based
2302     * mode in applications in which worker threads only process
2303     * event-style asynchronous tasks. For default value, use {@code
2304     * false}.
2305     *
2306     * @param corePoolSize the number of threads to keep in the pool
2307     * (unless timed out after an elapsed keep-alive). Normally (and
2308     * by default) this is the same value as the parallelism level,
2309     * but may be set to a larger value to reduce dynamic overhead if
2310     * tasks regularly block. Using a smaller value (for example
2311     * {@code 0}) has the same effect as the default.
2312     *
2313     * @param maximumPoolSize the maximum number of threads allowed.
2314     * When the maximum is reached, attempts to replace blocked
2315     * threads fail. (However, because creation and termination of
2316     * different threads may overlap, and may be managed by the given
2317     * thread factory, this value may be transiently exceeded.) The
2318     * default for the common pool is {@code 256} plus the parallelism
2319     * level. Using a value (for example {@code Integer.MAX_VALUE})
2320     * larger than the implementation's total thread limit has the
2321     * same effect as using this limit.
2322     *
2323     * @param minimumRunnable the minimum allowed number of core
2324     * threads not blocked by a join or {@link ManagedBlocker}. To
2325     * ensure progress, when too few unblocked threads exist and
2326     * unexecuted tasks may exist, new threads are constructed, up to
2327     * the given maximumPoolSize. For the default value, use {@code
2328     * 1}, that ensures liveness. A larger value might improve
2329     * throughput in the presence of blocked activities, but might
2330     * not, due to increased overhead. A value of zero may be
2331     * acceptable when submitted tasks cannot have dependencies
2332     * requiring additional threads.
2333     *
2334     * @param rejectOnSaturation if true, attempts to create more than
2335     * the maximum total allowed threads throw {@link
2336     * RejectedExecutionException}. Otherwise, the pool continues to
2337     * operate, but with fewer than the target number of runnable
2338     * threads, so might not ensure progress. For default value, use
2339     * {@code true}.
2340     *
2341     * @param keepAliveTime the elapsed time since last use before
2342     * a thread is terminated (and then later replaced if needed).
2343     * For the default value, use {@code 60, TimeUnit.SECONDS}.
2344     *
2345     * @param unit the time unit for the {@code keepAliveTime} argument
2346     *
2347     * @throws IllegalArgumentException if parallelism is less than or
2348     * equal to zero, or is greater than implementation limit,
2349     * or if maximumPoolSize is less than parallelism,
2350     * of if the keepAliveTime is less than or equal to zero.
2351     * @throws NullPointerException if the factory is null
2352     * @throws SecurityException if a security manager exists and
2353     * the caller is not permitted to modify threads
2354     * because it does not hold {@link
2355     * java.lang.RuntimePermission}{@code ("modifyThread")}
2356     */
2357     public ForkJoinPool(int parallelism,
2358     ForkJoinWorkerThreadFactory factory,
2359     UncaughtExceptionHandler handler,
2360     boolean asyncMode,
2361     int corePoolSize,
2362     int maximumPoolSize,
2363     int minimumRunnable,
2364     boolean rejectOnSaturation,
2365     long keepAliveTime,
2366     TimeUnit unit) {
2367     // check, encode, pack parameters
2368     if (parallelism <= 0 || parallelism > MAX_CAP ||
2369     maximumPoolSize < parallelism || keepAliveTime <= 0L)
2370 dl 1.152 throw new IllegalArgumentException();
2371 dl 1.14 if (factory == null)
2372     throw new NullPointerException();
2373 dl 1.300 long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2374    
2375     String prefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
2376     int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
2377     long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
2378     (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2379     int m = (parallelism |
2380     (asyncMode ? FIFO : 0) |
2381     (rejectOnSaturation ? 0 : SATURATE));
2382     int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
2383     int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
2384     int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
2385     int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
2386     n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2387     n = (n + 1) << 1; // power of two, including space for submission queues
2388    
2389     this.workQueues = new WorkQueue[n];
2390     this.workerNamePrefix = prefix;
2391     this.factory = factory;
2392     this.ueh = handler;
2393     this.keepAlive = ms;
2394     this.bounds = b;
2395     this.mode = m;
2396     this.ctl = c;
2397     checkPermission();
2398 dl 1.152 }
2399    
    /**
     * Constructor for common pool using parameters possibly
     * overridden by system properties.  Unlike the public
     * constructors, exceptions while reading or parsing the
     * properties are ignored and defaults used instead, and no
     * permission check is performed.
     */
    private ForkJoinPool(byte forCommonPoolOnly) {
        int parallelism = -1;                     // -1 => not configured
        ForkJoinWorkerThreadFactory fac = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }

        if (fac == null) {
            if (System.getSecurityManager() == null)
                fac = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                fac = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

        // pack ctl/bounds/mode as in the public constructor, but with
        // common-pool defaults (COMMON_MAX_SPARES, DEFAULT_KEEPALIVE)
        long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
        int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
        int m = (parallelism < 1) ? 1 : parallelism;
        int n = (parallelism > 1) ? parallelism - 1 : 1;
        // round queue-array length up to a power of two, doubled to
        // leave room for submission queues
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1;

        this.workQueues = new WorkQueue[n];
        this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
        this.factory = fac;
        this.ueh = handler;
        this.keepAlive = DEFAULT_KEEPALIVE;
        this.bounds = b;
        this.mode = m;
        this.ctl = c;
    }
2455    
    /**
     * Returns the common pool instance. This pool is statically
     * constructed; its run state is unaffected by attempts to {@link
     * #shutdown} or {@link #shutdownNow}. However this pool and any
     * ongoing processing are automatically terminated upon program
     * {@link System#exit}. Any program that relies on asynchronous
     * task processing to complete before program termination should
     * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
     * before exit.
     *
     * @return the common pool instance
     * @since 1.8
     */
    public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        return common;
    }
2473    
2474 jsr166 1.1 // Execution methods
2475    
    /**
     * Performs the given task, returning its result upon completion.
     * If the computation encounters an unchecked Exception or Error,
     * it is rethrown as the outcome of this invocation. Rethrown
     * exceptions behave in the same way as regular exceptions, but,
     * when possible, contain stack traces (as displayed for example
     * using {@code ex.printStackTrace()}) of both the current thread
     * as well as the thread actually encountering the exception;
     * minimally only the latter.
     *
     * @param task the task
     * @param <T> the type of the task's result
     * @return the task's result
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalSubmit(task);
        return task.join();  // caller helps run the task while joining
    }
2499    
    /**
     * Arranges for (asynchronous) execution of the given task.
     *
     * @param task the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public void execute(ForkJoinTask<?> task) {
        // externalSubmit performs the null check and rejection handling
        externalSubmit(task);
    }
2511    
2512     // AbstractExecutorService methods
2513    
2514 jsr166 1.11 /**
2515     * @throws NullPointerException if the task is null
2516     * @throws RejectedExecutionException if the task cannot be
2517     * scheduled for execution
2518     */
2519 jsr166 1.1 public void execute(Runnable task) {
2520 dl 1.41 if (task == null)
2521     throw new NullPointerException();
2522 jsr166 1.2 ForkJoinTask<?> job;
2523 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2524     job = (ForkJoinTask<?>) task;
2525 jsr166 1.2 else
2526 dl 1.152 job = new ForkJoinTask.RunnableExecuteAction(task);
2527 dl 1.243 externalSubmit(job);
2528 jsr166 1.1 }
2529    
    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        return externalSubmit(task);
    }
2543    
    /**
     * Submits a value-returning task, wrapping the {@code Callable}
     * in a {@link ForkJoinTask} adapter.
     *
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(Callable<T> task) {
        // AdaptedCallable's constructor performs the null check
        return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
    }
2552    
    /**
     * Submits a {@code Runnable} that reports the given result upon
     * completion, wrapping it in a {@link ForkJoinTask} adapter.
     *
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     * scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
        // AdaptedRunnable's constructor performs the null check
        return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
    }
2561    
2562 jsr166 1.11 /**
2563     * @throws NullPointerException if the task is null
2564     * @throws RejectedExecutionException if the task cannot be
2565     * scheduled for execution
2566     */
2567 jsr166 1.1 public ForkJoinTask<?> submit(Runnable task) {
2568 dl 1.41 if (task == null)
2569     throw new NullPointerException();
2570 jsr166 1.2 ForkJoinTask<?> job;
2571 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2572     job = (ForkJoinTask<?>) task;
2573 jsr166 1.2 else
2574 dl 1.90 job = new ForkJoinTask.AdaptedRunnableAction(task);
2575 dl 1.243 return externalSubmit(job);
2576 jsr166 1.1 }
2577    
2578     /**
2579 jsr166 1.11 * @throws NullPointerException {@inheritDoc}
2580     * @throws RejectedExecutionException {@inheritDoc}
2581     */
2582 jsr166 1.1 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2583 dl 1.86 // In previous versions of this class, this method constructed
2584     // a task to run ForkJoinTask.invokeAll, but now external
2585     // invocation of multiple tasks is at least as efficient.
2586 jsr166 1.199 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2587 jsr166 1.1
2588 dl 1.86 try {
2589     for (Callable<T> t : tasks) {
2590 dl 1.90 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2591 jsr166 1.144 futures.add(f);
2592 dl 1.243 externalSubmit(f);
2593 dl 1.86 }
2594 jsr166 1.143 for (int i = 0, size = futures.size(); i < size; i++)
2595     ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2596 dl 1.86 return futures;
2597 jsr166 1.226 } catch (Throwable t) {
2598     for (int i = 0, size = futures.size(); i < size; i++)
2599     futures.get(i).cancel(false);
2600     throw t;
2601 jsr166 1.1 }
2602     }
2603    
    /**
     * Returns the factory used for constructing new workers.
     *
     * @return the factory used for constructing new workers
     */
    public ForkJoinWorkerThreadFactory getFactory() {
        return factory;
    }
2612    
    /**
     * Returns the handler for internal worker threads that terminate
     * due to unrecoverable errors encountered while executing tasks.
     *
     * @return the handler, or {@code null} if none
     */
    public UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return ueh;
    }
2622    
    /**
     * Returns the targeted parallelism level of this pool.
     *
     * @return the targeted parallelism level of this pool
     */
    public int getParallelism() {
        // parallelism is stored in the low SMASK bits of mode
        return mode & SMASK;
    }
2631    
    /**
     * Returns the targeted parallelism level of the common pool.
     *
     * @return the targeted parallelism level of the common pool
     * @since 1.8
     */
    public static int getCommonPoolParallelism() {
        return COMMON_PARALLELISM;
    }
2641    
    /**
     * Returns the number of worker threads that have started but not
     * yet terminated. The result returned by this method may differ
     * from {@link #getParallelism} when threads are created to
     * maintain parallelism when others are cooperatively blocked.
     *
     * @return the number of worker threads
     */
    public int getPoolSize() {
        // total count field of ctl holds (threads - parallelism)
        return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
    }
2653    
    /**
     * Returns {@code true} if this pool uses local first-in-first-out
     * scheduling mode for forked tasks that are never joined.
     *
     * @return {@code true} if this pool uses async mode
     */
    public boolean getAsyncMode() {
        return (mode & FIFO) != 0;
    }
2663    
2664     /**
2665     * Returns an estimate of the number of worker threads that are
2666     * not blocked waiting to join tasks or for other managed
2667 dl 1.14 * synchronization. This method may overestimate the
2668     * number of running threads.
2669 jsr166 1.1 *
2670     * @return the number of worker threads
2671     */
2672     public int getRunningThreadCount() {
2673 dl 1.78 int rc = 0;
2674     WorkQueue[] ws; WorkQueue w;
2675     if ((ws = workQueues) != null) {
2676 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2677     if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2678 dl 1.78 ++rc;
2679     }
2680     }
2681     return rc;
2682 jsr166 1.1 }
2683    
2684     /**
2685     * Returns an estimate of the number of threads that are currently
2686     * stealing or executing tasks. This method may overestimate the
2687     * number of active threads.
2688     *
2689     * @return the number of active threads
2690     */
2691     public int getActiveThreadCount() {
2692 dl 1.300 int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2693 jsr166 1.63 return (r <= 0) ? 0 : r; // suppress momentarily negative values
2694 jsr166 1.1 }
2695    
    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        for (;;) {
            long c = ctl;
            int md = mode, pc = md & SMASK;
            int tc = pc + (short)(c >> TC_SHIFT);  // total workers
            int rc = pc + (int)(c >> RC_SHIFT);    // released (active) workers
            if ((md & (STOP | TERMINATED)) != 0)
                return true;                       // terminating pool is quiescent
            else if (rc > 0)
                return false;                      // some worker is active
            else {
                // No released workers; confirm each registered worker
                // queue has reported itself QUIET.
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) != null) {
                    for (int i = 1; i < ws.length; i += 2) {
                        if ((v = ws[i]) != null) {
                            if ((v.source & QUIET) == 0)
                                return false;
                            --tc;
                        }
                    }
                }
                // quiescent only if every worker was seen QUIET and ctl
                // did not change during the scan; otherwise retry
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }
2733    
2734     /**
2735     * Returns an estimate of the total number of tasks stolen from
2736     * one thread's work queue by another. The reported value
2737     * underestimates the actual total number of steals when the pool
2738     * is not quiescent. This value may be useful for monitoring and
2739     * tuning fork/join programs: in general, steal counts should be
2740     * high enough to keep threads busy, but low enough to avoid
2741     * overhead and contention across threads.
2742     *
2743     * @return the number of steals
2744     */
2745     public long getStealCount() {
2746 dl 1.300 long count = stealCount;
2747 dl 1.78 WorkQueue[] ws; WorkQueue w;
2748     if ((ws = workQueues) != null) {
2749 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2750 dl 1.78 if ((w = ws[i]) != null)
2751 dl 1.300 count += (long)w.nsteals & 0xffffffffL;
2752 dl 1.78 }
2753     }
2754     return count;
2755 jsr166 1.1 }
2756    
2757     /**
2758     * Returns an estimate of the total number of tasks currently held
2759     * in queues by worker threads (but not including tasks submitted
2760     * to the pool that have not begun executing). This value is only
2761     * an approximation, obtained by iterating across all threads in
2762     * the pool. This method may be useful for tuning task
2763     * granularities.
2764     *
2765     * @return the number of queued tasks
2766     */
2767     public long getQueuedTaskCount() {
2768     long count = 0;
2769 dl 1.78 WorkQueue[] ws; WorkQueue w;
2770     if ((ws = workQueues) != null) {
2771 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2772 dl 1.78 if ((w = ws[i]) != null)
2773     count += w.queueSize();
2774     }
2775 dl 1.52 }
2776 jsr166 1.1 return count;
2777     }
2778    
2779     /**
2780 jsr166 1.8 * Returns an estimate of the number of tasks submitted to this
2781 dl 1.55 * pool that have not yet begun executing. This method may take
2782 dl 1.52 * time proportional to the number of submissions.
2783 jsr166 1.1 *
2784     * @return the number of queued submissions
2785     */
2786     public int getQueuedSubmissionCount() {
2787 dl 1.78 int count = 0;
2788     WorkQueue[] ws; WorkQueue w;
2789     if ((ws = workQueues) != null) {
2790 dl 1.86 for (int i = 0; i < ws.length; i += 2) {
2791 dl 1.78 if ((w = ws[i]) != null)
2792     count += w.queueSize();
2793     }
2794     }
2795     return count;
2796 jsr166 1.1 }
2797    
2798     /**
2799 jsr166 1.4 * Returns {@code true} if there are any tasks submitted to this
2800     * pool that have not yet begun executing.
2801 jsr166 1.1 *
2802     * @return {@code true} if there are any queued submissions
2803     */
2804     public boolean hasQueuedSubmissions() {
2805 dl 1.78 WorkQueue[] ws; WorkQueue w;
2806     if ((ws = workQueues) != null) {
2807 dl 1.86 for (int i = 0; i < ws.length; i += 2) {
2808 dl 1.115 if ((w = ws[i]) != null && !w.isEmpty())
2809 dl 1.78 return true;
2810     }
2811     }
2812     return false;
2813 jsr166 1.1 }
2814    
    /**
     * Removes and returns the next unexecuted submission if one is
     * available. This method may be useful in extensions to this
     * class that re-assign work in systems with multiple pools.
     *
     * @return the next submission, or {@code null} if none
     */
    protected ForkJoinTask<?> pollSubmission() {
        // true => restrict the scan to submission (shared) queues
        return pollScan(true);
    }
2825    
2826     /**
2827     * Removes all available unexecuted submitted and forked tasks
2828     * from scheduling queues and adds them to the given collection,
2829     * without altering their execution status. These may include
2830 jsr166 1.8 * artificially generated or wrapped tasks. This method is
2831     * designed to be invoked only when the pool is known to be
2832 jsr166 1.1 * quiescent. Invocations at other times may not remove all
2833     * tasks. A failure encountered while attempting to add elements
2834     * to collection {@code c} may result in elements being in
2835     * neither, either or both collections when the associated
2836     * exception is thrown. The behavior of this operation is
2837     * undefined if the specified collection is modified while the
2838     * operation is in progress.
2839     *
2840     * @param c the collection to transfer elements into
2841     * @return the number of elements transferred
2842     */
2843 jsr166 1.5 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2844 dl 1.52 int count = 0;
2845 dl 1.78 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2846     if ((ws = workQueues) != null) {
2847 dl 1.86 for (int i = 0; i < ws.length; ++i) {
2848 dl 1.78 if ((w = ws[i]) != null) {
2849     while ((t = w.poll()) != null) {
2850     c.add(t);
2851     ++count;
2852     }
2853     }
2854 dl 1.52 }
2855     }
2856 dl 1.18 return count;
2857     }
2858    
    /**
     * Returns a string identifying this pool, as well as its state,
     * including indications of run state, parallelism level, and
     * worker and task counts.
     *
     * @return a string identifying this pool, as well as its state
     */
    public String toString() {
        // Use a single pass through workQueues to collect counts
        long qt = 0L, qs = 0L; int rc = 0;     // task/submission/running tallies
        long st = stealCount;
        WorkQueue[] ws; WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 0; i < ws.length; ++i) {
                if ((w = ws[i]) != null) {
                    int size = w.queueSize();
                    if ((i & 1) == 0)          // even index: submission queue
                        qs += size;
                    else {                     // odd index: worker queue
                        qt += size;
                        st += (long)w.nsteals & 0xffffffffL;
                        if (w.isApparentlyUnblocked())
                            ++rc;
                    }
                }
            }
        }

        int md = mode;
        int pc = (md & SMASK);
        long c = ctl;
        int tc = pc + (short)(c >>> TC_SHIFT);
        int ac = pc + (int)(c >> RC_SHIFT);
        if (ac < 0) // ignore transient negative
            ac = 0;
        String level = ((md & TERMINATED) != 0 ? "Terminated" :
                        (md & STOP)       != 0 ? "Terminating" :
                        (md & SHUTDOWN)   != 0 ? "Shutting down" :
                        "Running");
        return super.toString() +
            "[" + level +
            ", parallelism = " + pc +
            ", size = " + tc +
            ", active = " + ac +
            ", running = " + rc +
            ", steals = " + st +
            ", tasks = " + qt +
            ", submissions = " + qs +
            "]";
    }
2909    
    /**
     * Possibly initiates an orderly shutdown in which previously
     * submitted tasks are executed, but no new tasks will be
     * accepted. Invocation has no effect on execution state if this
     * is the {@link #commonPool()}, and no additional effect if
     * already shut down. Tasks that are in the process of being
     * submitted concurrently during the course of this method may or
     * may not be rejected.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public void shutdown() {
        checkPermission();
        // now=false: terminate only once quiescent; enable=true
        tryTerminate(false, true);
    }
2928    
    /**
     * Possibly attempts to cancel and/or stop all tasks, and reject
     * all subsequently submitted tasks. Invocation has no effect on
     * execution state if this is the {@link #commonPool()}, and no
     * additional effect if already shut down. Otherwise, tasks that
     * are in the process of being submitted or executed concurrently
     * during the course of this method may or may not be
     * rejected. This method cancels both existing and unexecuted
     * tasks, in order to permit termination in the presence of task
     * dependencies. So the method always returns an empty list
     * (unlike the case for some other Executors).
     *
     * @return an empty list
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public List<Runnable> shutdownNow() {
        checkPermission();
        // now=true: terminate unconditionally; enable=true
        tryTerminate(true, true);
        return Collections.emptyList();
    }
2952    
2953     /**
2954     * Returns {@code true} if all tasks have completed following shut down.
2955     *
2956     * @return {@code true} if all tasks have completed following shut down
2957     */
2958     public boolean isTerminated() {
2959 dl 1.300 return (mode & TERMINATED) != 0;
2960 jsr166 1.1 }
2961    
2962     /**
2963     * Returns {@code true} if the process of termination has
2964 jsr166 1.9 * commenced but not yet completed. This method may be useful for
2965     * debugging. A return of {@code true} reported a sufficient
2966     * period after shutdown may indicate that submitted tasks have
2967 jsr166 1.119 * ignored or suppressed interruption, or are waiting for I/O,
2968 dl 1.49 * causing this executor not to properly terminate. (See the
2969     * advisory notes for class {@link ForkJoinTask} stating that
2970     * tasks should not normally entail blocking operations. But if
2971     * they do, they must abort them on interrupt.)
2972 jsr166 1.1 *
2973 jsr166 1.9 * @return {@code true} if terminating but not yet terminated
2974 jsr166 1.1 */
2975     public boolean isTerminating() {
2976 dl 1.300 int md = mode;
2977     return (md & STOP) != 0 && (md & TERMINATED) == 0;
2978 jsr166 1.1 }
2979    
2980     /**
2981     * Returns {@code true} if this pool has been shut down.
2982     *
2983     * @return {@code true} if this pool has been shut down
2984     */
2985     public boolean isShutdown() {
2986 dl 1.300 return (mode & SHUTDOWN) != 0;
2987 jsr166 1.9 }
2988    
    /**
     * Blocks until all tasks have completed execution after a
     * shutdown request, or the timeout occurs, or the current thread
     * is interrupted, whichever happens first. Because the {@link
     * #commonPool()} never terminates until program shutdown, when
     * applied to the common pool, this method is equivalent to {@link
     * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (this == common) {
            // The common pool never terminates; the best we can do is help
            // it quiesce, and the contract says to then report false.
            awaitQuiescence(timeout, unit);
            return false;
        }
        long nanos = unit.toNanos(timeout);
        if (isTerminated())
            return true;
        if (nanos <= 0L)
            return false;
        long deadline = System.nanoTime() + nanos;
        // Monitor-wait on this pool object; termination presumably performs
        // a notifyAll on the pool (happens outside this view -- verify in
        // tryTerminate). The loop guards against spurious wakeups by
        // re-checking state and recomputing the remaining time each pass.
        synchronized (this) {
            for (;;) {
                if (isTerminated())
                    return true;
                if (nanos <= 0L)
                    return false;
                long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
                // Never call wait(0): that would mean "wait forever".
                // Sub-millisecond remainders round up to a 1 ms wait.
                wait(millis > 0L ? millis : 1L);
                nanos = deadline - System.nanoTime();
            }
        }
    }
3029    
    /**
     * If called by a ForkJoinTask operating in this pool, equivalent
     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
     * waits and/or attempts to assist performing tasks until this
     * pool {@link #isQuiescent} or the indicated timeout elapses.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if quiescent; {@code false} if the
     *         timeout elapsed.
     */
    public boolean awaitQuiescence(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        ForkJoinWorkerThread wt;
        Thread thread = Thread.currentThread();
        if ((thread instanceof ForkJoinWorkerThread) &&
            (wt = (ForkJoinWorkerThread)thread).pool == this) {
            // Caller is one of this pool's own workers: help run tasks to
            // quiescence. Note that this path ignores the timeout and
            // always reports true once helpQuiescePool returns.
            helpQuiescePool(wt.workQueue);
            return true;
        }
        else {
            // External caller: repeatedly poll and execute available tasks,
            // yielding (never blocking) while waiting for quiescence.
            for (long startTime = System.nanoTime();;) {
                ForkJoinTask<?> t;
                if ((t = pollScan(false)) != null)
                    t.doExec();
                else if (isQuiescent())
                    return true;
                else if ((System.nanoTime() - startTime) > nanos)
                    return false;                // timeout elapsed
                else
                    Thread.yield(); // cannot block
            }
        }
    }
3064    
    /**
     * Waits and/or attempts to assist performing tasks indefinitely
     * until the {@link #commonPool()} {@link #isQuiescent}.
     */
    static void quiesceCommonPool() {
        // Effectively-unbounded timeout: Long.MAX_VALUE nanoseconds.
        common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }
3072    
    /**
     * Interface for extending managed parallelism for tasks running
     * in {@link ForkJoinPool}s.
     *
     * <p>A {@code ManagedBlocker} provides two methods. Method
     * {@link #isReleasable} must return {@code true} if blocking is
     * not necessary. Method {@link #block} blocks the current thread
     * if necessary (perhaps internally invoking {@code isReleasable}
     * before actually blocking). These actions are performed by any
     * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
     * The unusual methods in this API accommodate synchronizers that
     * may, but don't usually, block for long periods. Similarly, they
     * allow more efficient internal handling of cases in which
     * additional workers may be, but usually are not, needed to
     * ensure sufficient parallelism. Toward this end,
     * implementations of method {@code isReleasable} must be amenable
     * to repeated invocation.
     *
     * <p>For example, here is a ManagedBlocker based on a
     * ReentrantLock:
     * <pre> {@code
     * class ManagedLocker implements ManagedBlocker {
     *   final ReentrantLock lock;
     *   boolean hasLock = false;
     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
     *   public boolean block() {
     *     if (!hasLock)
     *       lock.lock();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return hasLock || (hasLock = lock.tryLock());
     *   }
     * }}</pre>
     *
     * <p>Here is a class that possibly blocks waiting for an
     * item on a given queue:
     * <pre> {@code
     * class QueueTaker<E> implements ManagedBlocker {
     *   final BlockingQueue<E> queue;
     *   volatile E item = null;
     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *   public boolean block() throws InterruptedException {
     *     if (item == null)
     *       item = queue.take();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return item != null || (item = queue.poll()) != null;
     *   }
     *   public E getItem() { // call after pool.managedBlock completes
     *     return item;
     *   }
     * }}</pre>
     */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         *         (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         *         (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         * @return {@code true} if blocking is unnecessary
         */
        boolean isReleasable();
    }
3146    
    /**
     * Runs the given possibly blocking task. When {@linkplain
     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
     * method possibly arranges for a spare thread to be activated if
     * necessary to ensure sufficient parallelism while the current
     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
     *
     * <p>This method repeatedly calls {@code blocker.isReleasable()} and
     * {@code blocker.block()} until either method returns {@code true}.
     * Every call to {@code blocker.block()} is preceded by a call to
     * {@code blocker.isReleasable()} that returned {@code false}.
     *
     * <p>If not running in a ForkJoinPool, this method is
     * behaviorally equivalent to
     * <pre> {@code
     * while (!blocker.isReleasable())
     *   if (blocker.block())
     *     break;}</pre>
     *
     * If running in a ForkJoinPool, the pool may first be expanded to
     * ensure sufficient parallelism available during the call to
     * {@code blocker.block()}.
     *
     * @param blocker the blocker task
     * @throws InterruptedException if {@code blocker.block()} did so
     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        WorkQueue w;
        Thread t = Thread.currentThread();
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (w = wt.workQueue) != null) {
            // Worker path: before blocking, ask the pool to compensate
            // (e.g. activate/create a spare) so parallelism is preserved.
            int block;
            while (!blocker.isReleasable()) {
                // tryCompensate's return codes are defined elsewhere in this
                // file; nonzero here evidently means "compensation arranged,
                // safe to block", and the sign selects the ctl adjustment
                // below -- TODO(review) confirm against tryCompensate.
                if ((block = p.tryCompensate(w)) != 0) {
                    try {
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        // Undo a positive compensation by atomically adding
                        // RC_UNIT back into ctl; for non-positive codes this
                        // is a no-op add of 0.
                        U.getAndAddLong(p, CTL, (block > 0) ? RC_UNIT : 0L);
                    }
                    break;
                }
            }
        }
        else {
            // Non-worker thread: plain isReleasable/block loop, no
            // compensation needed.
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
3200    
3201 jsr166 1.7 // AbstractExecutorService overrides. These rely on undocumented
3202     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3203     // implement RunnableFuture.
3204 jsr166 1.1
3205     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3206 dl 1.90 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3207 jsr166 1.1 }
3208    
3209     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3210 dl 1.90 return new ForkJoinTask.AdaptedCallable<T>(callable);
3211 jsr166 1.1 }
3212    
    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long CTL;    // field offset of "ctl", for atomic updates
    private static final long MODE;   // field offset of "mode"
    private static final int ABASE;   // base offset of ForkJoinTask[] array data
    private static final int ASHIFT;  // shift for scaling array indices to byte offsets
3219 dl 1.52
    static {
        // Resolve Unsafe field offsets and array-addressing constants.
        try {
            CTL = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("ctl"));
            MODE = U.objectFieldOffset
                (ForkJoinPool.class.getDeclaredField("mode"));
            ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
            int scale = U.arrayIndexScale(ForkJoinTask[].class);
            // Index-to-offset conversion uses a shift, so the element
            // scale must be a power of two.
            if ((scale & (scale - 1)) != 0)
                throw new Error("array index scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;

        // Cap on spare-thread construction, overridable by system property;
        // malformed values silently fall back to the default.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        try {
            String p = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (p != null)
                commonMaxSpares = Integer.parseInt(p);
        } catch (Exception ignore) {}
        COMMON_MAX_SPARES = commonMaxSpares;

        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // Construct the common pool inside doPrivileged so its creation
        // does not depend on the caller's access-control context.
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() {
                    return new ForkJoinPool((byte)0); }});

        // Low SMASK bits of mode hold the pool's parallelism level.
        COMMON_PARALLELISM = common.mode & SMASK;
    }
3259 dl 1.52
3260 dl 1.197 /**
3261 jsr166 1.279 * Factory for innocuous worker threads.
3262 dl 1.197 */
3263 jsr166 1.278 private static final class InnocuousForkJoinWorkerThreadFactory
3264 dl 1.197 implements ForkJoinWorkerThreadFactory {
3265    
3266     /**
3267     * An ACC to restrict permissions for the factory itself.
3268     * The constructed workers have no permissions set.
3269     */
3270     private static final AccessControlContext innocuousAcc;
3271     static {
3272     Permissions innocuousPerms = new Permissions();
3273     innocuousPerms.add(modifyThreadPermission);
3274     innocuousPerms.add(new RuntimePermission(
3275     "enableContextClassLoaderOverride"));
3276     innocuousPerms.add(new RuntimePermission(
3277     "modifyThreadGroup"));
3278     innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
3279     new ProtectionDomain(null, innocuousPerms)
3280     });
3281     }
3282    
3283     public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
3284 jsr166 1.227 return java.security.AccessController.doPrivileged(
3285     new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
3286 dl 1.197 public ForkJoinWorkerThread run() {
3287     return new ForkJoinWorkerThread.
3288     InnocuousForkJoinWorkerThread(pool);
3289     }}, innocuousAcc);
3290     }
3291     }
3292    
3293 jsr166 1.1 }