ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.305
Committed: Mon Mar 14 18:22:57 2016 UTC (8 years, 2 months ago) by dl
Branch: MAIN
Changes since 1.304: +11 -0 lines
Log Message:
Restore comment

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 jsr166 1.1 */
6 jsr166 1.301
7 jsr166 1.1 package java.util.concurrent;
8    
9 jsr166 1.156 import java.lang.Thread.UncaughtExceptionHandler;
10 jsr166 1.228 import java.security.AccessControlContext;
11     import java.security.Permissions;
12     import java.security.ProtectionDomain;
13 jsr166 1.1 import java.util.ArrayList;
14     import java.util.Arrays;
15     import java.util.Collection;
16     import java.util.Collections;
17     import java.util.List;
18 dl 1.300 import java.util.concurrent.TimeUnit;
19     import java.util.concurrent.CountedCompleter;
20     import java.util.concurrent.ForkJoinTask;
21     import java.util.concurrent.ForkJoinWorkerThread;
22 dl 1.243 import java.util.concurrent.locks.LockSupport;
23 jsr166 1.1
24     /**
25 jsr166 1.4 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
26 jsr166 1.8 * A {@code ForkJoinPool} provides the entry point for submissions
27 dl 1.18 * from non-{@code ForkJoinTask} clients, as well as management and
28 jsr166 1.11 * monitoring operations.
29 jsr166 1.1 *
30 jsr166 1.9 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
31     * ExecutorService} mainly by virtue of employing
32     * <em>work-stealing</em>: all threads in the pool attempt to find and
33 dl 1.78 * execute tasks submitted to the pool and/or created by other active
34     * tasks (eventually blocking waiting for work if none exist). This
35     * enables efficient processing when most tasks spawn other subtasks
36     * (as do most {@code ForkJoinTask}s), as well as when many small
37     * tasks are submitted to the pool from external clients. Especially
38     * when setting <em>asyncMode</em> to true in constructors, {@code
39     * ForkJoinPool}s may also be appropriate for use with event-style
40     * tasks that are never joined.
41 jsr166 1.1 *
42 dl 1.112 * <p>A static {@link #commonPool()} is available and appropriate for
43 dl 1.101 * most applications. The common pool is used by any ForkJoinTask that
44     * is not explicitly submitted to a specified pool. Using the common
45     * pool normally reduces resource usage (its threads are slowly
46     * reclaimed during periods of non-use, and reinstated upon subsequent
47 dl 1.105 * use).
48 dl 1.100 *
49     * <p>For applications that require separate or custom pools, a {@code
50     * ForkJoinPool} may be constructed with a given target parallelism
51 jsr166 1.214 * level; by default, equal to the number of available processors.
52     * The pool attempts to maintain enough active (or available) threads
53     * by dynamically adding, suspending, or resuming internal worker
54 jsr166 1.187 * threads, even if some tasks are stalled waiting to join others.
55     * However, no such adjustments are guaranteed in the face of blocked
56     * I/O or other unmanaged synchronization. The nested {@link
57 dl 1.100 * ManagedBlocker} interface enables extension of the kinds of
58 dl 1.300 * synchronization accommodated. The default policies may be
59     * overridden using a constructor with parameters corresponding to
60     * those documented in class {@link ThreadPoolExecutor}.
61 jsr166 1.1 *
62     * <p>In addition to execution and lifecycle control methods, this
63     * class provides status check methods (for example
64 jsr166 1.4 * {@link #getStealCount}) that are intended to aid in developing,
65 jsr166 1.1 * tuning, and monitoring fork/join applications. Also, method
66 jsr166 1.4 * {@link #toString} returns indications of pool state in a
67 jsr166 1.1 * convenient form for informal monitoring.
68     *
69 jsr166 1.109 * <p>As is the case with other ExecutorServices, there are three
70 jsr166 1.84 * main task execution methods summarized in the following table.
71     * These are designed to be used primarily by clients not already
72     * engaged in fork/join computations in the current pool. The main
73     * forms of these methods accept instances of {@code ForkJoinTask},
74     * but overloaded forms also allow mixed execution of plain {@code
75     * Runnable}- or {@code Callable}- based activities as well. However,
76     * tasks that are already executing in a pool should normally instead
77     * use the within-computation forms listed in the table unless using
78     * async event-style tasks that are not usually joined, in which case
79     * there is little difference among choice of methods.
80 dl 1.18 *
81     * <table BORDER CELLPADDING=3 CELLSPACING=1>
82 jsr166 1.159 * <caption>Summary of task execution methods</caption>
83 dl 1.18 * <tr>
84     * <td></td>
85     * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
86     * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
87     * </tr>
88     * <tr>
89 jsr166 1.153 * <td> <b>Arrange async execution</b></td>
90 dl 1.18 * <td> {@link #execute(ForkJoinTask)}</td>
91     * <td> {@link ForkJoinTask#fork}</td>
92     * </tr>
93     * <tr>
94 jsr166 1.153 * <td> <b>Await and obtain result</b></td>
95 dl 1.18 * <td> {@link #invoke(ForkJoinTask)}</td>
96     * <td> {@link ForkJoinTask#invoke}</td>
97     * </tr>
98     * <tr>
99 jsr166 1.153 * <td> <b>Arrange exec and obtain Future</b></td>
100 dl 1.18 * <td> {@link #submit(ForkJoinTask)}</td>
101     * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
102     * </tr>
103     * </table>
104 dl 1.19 *
105 dl 1.105 * <p>The common pool is by default constructed with default
106 jsr166 1.155 * parameters, but these may be controlled by setting the following
107 jsr166 1.162 * {@linkplain System#getProperty system properties}:
108     * <ul>
109     * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
110     * - the parallelism level, a non-negative integer
111     * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
112     * - the class name of a {@link ForkJoinWorkerThreadFactory}
113     * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
114     * - the class name of a {@link UncaughtExceptionHandler}
115 dl 1.208 * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
116 dl 1.223 * - the maximum number of allowed extra threads to maintain target
117 dl 1.208 * parallelism (default 256).
118 jsr166 1.162 * </ul>
119 dl 1.197 * If a {@link SecurityManager} is present and no factory is
120     * specified, then the default pool uses a factory supplying
121     * threads that have no {@link Permissions} enabled.
122 jsr166 1.165 * The system class loader is used to load these classes.
123 jsr166 1.156 * Upon any error in establishing these settings, default parameters
124 dl 1.160 * are used. It is possible to disable or limit the use of threads in
125     * the common pool by setting the parallelism property to zero, and/or
126 dl 1.193 * using a factory that may return {@code null}. However, doing so may
127     * cause unjoined tasks to never be executed.
128 dl 1.105 *
129 jsr166 1.1 * <p><b>Implementation notes</b>: This implementation restricts the
130     * maximum number of running threads to 32767. Attempts to create
131 jsr166 1.11 * pools with greater than the maximum number result in
132 jsr166 1.8 * {@code IllegalArgumentException}.
133 jsr166 1.1 *
134 jsr166 1.11 * <p>This implementation rejects submitted tasks (that is, by throwing
135 dl 1.19 * {@link RejectedExecutionException}) only when the pool is shut down
136 dl 1.20 * or internal resources have been exhausted.
137 jsr166 1.11 *
138 jsr166 1.1 * @since 1.7
139     * @author Doug Lea
140     */
141     public class ForkJoinPool extends AbstractExecutorService {
142    
143     /*
144 dl 1.14 * Implementation Overview
145     *
146 dl 1.78 * This class and its nested classes provide the main
147     * functionality and control for a set of worker threads:
148 jsr166 1.84 * Submissions from non-FJ threads enter into submission queues.
149     * Workers take these tasks and typically split them into subtasks
150     * that may be stolen by other workers. Preference rules give
151     * first priority to processing tasks from their own queues (LIFO
152     * or FIFO, depending on mode), then to randomized FIFO steals of
153 dl 1.200 * tasks in other queues. This framework began as a vehicle for
154     * supporting tree-structured parallelism using work-stealing.
155     * Over time, its scalability advantages led to extensions and
156 dl 1.208 * changes to better support more diverse usage contexts. Because
157     * most internal methods and nested classes are interrelated,
158     * their main rationale and descriptions are presented here;
159     * individual methods and nested classes contain only brief
160     * comments about details.
161 dl 1.78 *
162 jsr166 1.84 * WorkQueues
163 dl 1.78 * ==========
164     *
165     * Most operations occur within work-stealing queues (in nested
166     * class WorkQueue). These are special forms of Deques that
167     * support only three of the four possible end-operations -- push,
168     * pop, and poll (aka steal), under the further constraints that
169     * push and pop are called only from the owning thread (or, as
170     * extended here, under a lock), while poll may be called from
171     * other threads. (If you are unfamiliar with them, you probably
172     * want to read Herlihy and Shavit's book "The Art of
173     * Multiprocessor Programming", chapter 16 describing these in
174     * more detail before proceeding.) The main work-stealing queue
175     * design is roughly similar to those in the papers "Dynamic
176     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
177     * (http://research.sun.com/scalable/pubs/index.html) and
178     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
179     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
180 dl 1.200 * The main differences ultimately stem from GC requirements that
181     * we null out taken slots as soon as we can, to maintain as small
182     * a footprint as possible even in programs generating huge
183     * numbers of tasks. To accomplish this, we shift the CAS
184     * arbitrating pop vs poll (steal) from being on the indices
185     * ("base" and "top") to the slots themselves.
186     *
187 dl 1.243 * Adding tasks then takes the form of a classic array push(task)
188     * in a circular buffer:
189     * q.array[q.top++ % length] = task;
190 dl 1.200 *
191     * (The actual code needs to null-check and size-check the array,
192 jsr166 1.247 * uses masking, not mod, for indexing a power-of-two-sized array,
193 dl 1.243 * properly fences accesses, and possibly signals waiting workers
194     * to start scanning -- see below.) Both a successful pop and
195     * poll mainly entail a CAS of a slot from non-null to null.
196 dl 1.200 *
197 jsr166 1.202 * The pop operation (always performed by owner) is:
198 dl 1.243 * if ((the task at top slot is not null) and
199 dl 1.200 * (CAS slot to null))
200     * decrement top and return task;
201     *
202     * And the poll operation (usually by a stealer) is
203 dl 1.243 * if ((the task at base slot is not null) and
204 dl 1.200 * (CAS slot to null))
205     * increment base and return task;
206     *
207 dl 1.300 * There are several variants of each of these. In particular,
208     * almost all uses of poll occur within scan operations that also
209     * interleave contention tracking (with associated code sprawl.)
210 dl 1.243 *
211     * Memory ordering. See "Correct and Efficient Work-Stealing for
212     * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
213     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
214     * analysis of memory ordering requirements in work-stealing
215     * algorithms similar to (but different than) the one used here.
216     * Extracting tasks in array slots via (fully fenced) CAS provides
217     * primary synchronization. The base and top indices imprecisely
218     * guide where to extract from. We do not always require strict
219     * orderings of array and index updates, so sometimes let them be
220     * subject to compiler and processor reorderings. However, the
221     * volatile "base" index also serves as a basis for memory
222     * ordering: Slot accesses are preceded by a read of base,
223 jsr166 1.247 * ensuring happens-before ordering with respect to stealers (so
224 dl 1.243 * the slots themselves can be read via plain array reads.) The
225     * only other memory orderings relied on are maintained in the
226     * course of signalling and activation (see below). A check that
227     * base == top indicates (momentary) emptiness, but otherwise may
228     * err on the side of possibly making the queue appear nonempty
229     * when a push, pop, or poll have not fully committed, or making
230     * it appear empty when an update of top has not yet been visibly
231     * written. (Method isEmpty() checks the case of a partially
232 dl 1.211 * completed removal of the last element.) Because of this, the
233     * poll operation, considered individually, is not wait-free. One
234     * thief cannot successfully continue until another in-progress
235 dl 1.243 * one (or, if previously empty, a push) visibly completes.
236     * However, in the aggregate, we ensure at least probabilistic
237     * non-blockingness. If an attempted steal fails, a scanning
238     * thief chooses a different random victim target to try next. So,
239     * in order for one thief to progress, it suffices for any
240 dl 1.205 * in-progress poll or new push on any empty queue to
241 dl 1.300 * complete.
242 dl 1.200 *
243     * This approach also enables support of a user mode in which
244     * local task processing is in FIFO, not LIFO order, simply by
245     * using poll rather than pop. This can be useful in
246     * message-passing frameworks in which tasks are never joined.
247 dl 1.78 *
248     * WorkQueues are also used in a similar way for tasks submitted
249     * to the pool. We cannot mix these tasks in the same queues used
250 dl 1.200 * by workers. Instead, we randomly associate submission queues
251 dl 1.83 * with submitting threads, using a form of hashing. The
252 dl 1.139 * ThreadLocalRandom probe value serves as a hash code for
253     * choosing existing queues, and may be randomly repositioned upon
254     * contention with other submitters. In essence, submitters act
255     * like workers except that they are restricted to executing local
256 dl 1.300 * tasks that they submitted. Insertion of tasks in shared mode
257     * requires a lock but we use only a simple spinlock (using field
258 jsr166 1.304 * phase), because submitters encountering a busy queue move to a
259     * different position to use or create other queues -- they block
260     * only when creating and registering new queues. Because it is
261     * used only as a spinlock, unlocking requires only a "releasing"
262     * store (using putOrderedInt).
263 dl 1.78 *
264 jsr166 1.84 * Management
265 dl 1.78 * ==========
266 dl 1.52 *
267     * The main throughput advantages of work-stealing stem from
268     * decentralized control -- workers mostly take tasks from
269 dl 1.200 * themselves or each other, at rates that can exceed a billion
270     * per second. The pool itself creates, activates (enables
271     * scanning for and running tasks), deactivates, blocks, and
272     * terminates threads, all with minimal central information.
273     * There are only a few properties that we can globally track or
274     * maintain, so we pack them into a small number of variables,
275     * often maintaining atomicity without blocking or locking.
276 dl 1.300 * Nearly all essentially atomic control state is held in a few
277 dl 1.200 * volatile variables that are by far most often read (not
278 dl 1.300 * written) as status and consistency checks. We pack as much
279     * information into them as we can.
280 dl 1.78 *
281 dl 1.200 * Field "ctl" contains 64 bits holding information needed to
282 dl 1.300 * atomically decide to add, enqueue (on an event queue), and
283     * dequeue (and release)-activate workers. To enable this
284 dl 1.78 * packing, we restrict maximum parallelism to (1<<15)-1 (which is
285     * far in excess of normal operating range) to allow ids, counts,
286     * and their negations (used for thresholding) to fit into 16bit
287 dl 1.215 * subfields.
288     *
289 dl 1.300 * Field "mode" holds configuration parameters as well as lifetime
290     * status, atomically and monotonically setting SHUTDOWN, STOP,
291     * and finally TERMINATED bits.
292 dl 1.258 *
293     * Field "workQueues" holds references to WorkQueues. It is
294 dl 1.300 * updated (only during worker creation and termination) under
295     * lock (using field workerNamePrefix as lock), but is otherwise
296     * concurrently readable, and accessed directly. We also ensure
297     * that uses of the array reference itself never become too stale
298     * in case of resizing. To simplify index-based operations, the
299     * array size is always a power of two, and all readers must
300     * tolerate null slots. Worker queues are at odd indices. Shared
301     * (submission) queues are at even indices, up to a maximum of 64
302     * slots, to limit growth even if array needs to expand to add
303     * more workers. Grouping them together in this way simplifies and
304 dl 1.262 * speeds up task scanning.
305 dl 1.86 *
306     * All worker thread creation is on-demand, triggered by task
307     * submissions, replacement of terminated workers, and/or
308 dl 1.78 * compensation for blocked workers. However, all other support
309     * code is set up to work with other policies. To ensure that we
310 jsr166 1.264 * do not hold on to worker references that would prevent GC, all
311 dl 1.78 * accesses to workQueues are via indices into the workQueues
312     * array (which is one source of some of the messy code
313     * constructions here). In essence, the workQueues array serves as
314 dl 1.200 * a weak reference mechanism. Thus for example the stack top
315     * subfield of ctl stores indices, not references.
316     *
317     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
318     * cannot let workers spin indefinitely scanning for tasks when
319     * none can be found immediately, and we cannot start/resume
320     * workers unless there appear to be tasks available. On the
321     * other hand, we must quickly prod them into action when new
322     * tasks are submitted or generated. In many usages, ramp-up time
323 dl 1.300 * is the main limiting factor in overall performance, which is
324     * compounded at program start-up by JIT compilation and
325     * allocation. So we streamline this as much as possible.
326     *
327     * The "ctl" field atomically maintains total worker and
328     * "released" worker counts, plus the head of the available worker
329     * queue (actually stack, represented by the lower 32bit subfield
330     * of ctl). Released workers are those known to be scanning for
331     * and/or running tasks. Unreleased ("available") workers are
332     * recorded in the ctl stack. These workers are made available for
333     * signalling by enqueuing in ctl (see method runWorker). The
334     * "queue" is a form of Treiber stack. This is ideal for
335     * activating threads in most-recently used order, and improves
336 dl 1.200 * performance and locality, outweighing the disadvantages of
337     * being prone to contention and inability to release a worker
338 dl 1.300 * unless it is topmost on stack. To avoid missed signal problems
339     * inherent in any wait/signal design, available workers rescan
340     * for (and if found run) tasks after enqueuing. Normally their
341     * release status will be updated while doing so, but the released
342     * worker ctl count may underestimate the number of active
343     * threads. (However, it is still possible to determine quiescence
344     * via a validation traversal -- see isQuiescent). After an
345     * unsuccessful rescan, available workers are blocked until
346     * signalled (see signalWork). The top stack state holds the
347     * value of the "phase" field of the worker: its index and status,
348     * plus a version counter that, in addition to the count subfields
349     * (also serving as version stamps) provide protection against
350     * Treiber stack ABA effects.
351 dl 1.200 *
352 dl 1.300 * Creating workers. To create a worker, we pre-increment counts
353     * (serving as a reservation), and attempt to construct a
354 dl 1.200 * ForkJoinWorkerThread via its factory. Upon construction, the
355     * new thread invokes registerWorker, where it constructs a
356     * WorkQueue and is assigned an index in the workQueues array
357 jsr166 1.266 * (expanding the array if necessary). The thread is then started.
358     * Upon any exception across these steps, or null return from
359     * factory, deregisterWorker adjusts counts and records
360 dl 1.200 * accordingly. If a null return, the pool continues running with
361     * fewer than the target number of workers. If exceptional, the
362     * exception is propagated, generally to some external caller.
363     * Worker index assignment avoids the bias in scanning that would
364     * occur if entries were sequentially packed starting at the front
365     * of the workQueues array. We treat the array as a simple
366     * power-of-two hash table, expanding as needed. The seedIndex
367     * increment ensures no collisions until a resize is needed or a
368     * worker is deregistered and replaced, and thereafter keeps
369 jsr166 1.202 * probability of collision low. We cannot use
370 dl 1.200 * ThreadLocalRandom.getProbe() for similar purposes here because
371     * the thread has not started yet, but do so for creating
372 dl 1.243 * submission queues for existing external threads (see
373     * externalPush).
374     *
375 dl 1.300 * WorkQueue field "phase" is used by both workers and the pool to
376     * manage and track whether a worker is UNSIGNALLED (possibly
377     * blocked waiting for a signal). When a worker is enqueued its
378     * phase field is set. Note that phase field updates lag queue CAS
379     * releases so usage requires care -- seeing a negative phase does
380     * not guarantee that the worker is available. When queued, the
381     * lower 16 bits of phase must hold its pool index. So we
382     * place the index there upon initialization (see registerWorker)
383     * and otherwise keep it there or restore it when necessary.
384 dl 1.243 *
385     * The ctl field also serves as the basis for memory
386     * synchronization surrounding activation. This uses a more
387     * efficient version of a Dekker-like rule that task producers and
388     * consumers sync with each other by both writing/CASing ctl (even
389 dl 1.253 * if to its current value). This would be extremely costly. So
390     * we relax it in several ways: (1) Producers only signal when
391     * their queue is empty. Other workers propagate this signal (in
392 dl 1.300 * method scan) when they find tasks; to further reduce flailing,
393     * each worker signals only one other per activation. (2) Workers
394     * only enqueue after scanning (see below) and not finding any
395     * tasks. (3) Rather than CASing ctl to its current value in the
396     * common case where no action is required, we reduce write
397     * contention by equivalently prefacing signalWork when called by
398     * an external task producer using a memory access with
399     * full-volatile semantics or a "fullFence".
400 dl 1.243 *
401 jsr166 1.249 * Almost always, too many signals are issued. A task producer
402 dl 1.243 * cannot in general tell if some existing worker is in the midst
403     * of finishing one task (or already scanning) and ready to take
404     * another without being signalled. So the producer might instead
405     * activate a different worker that does not find any work, and
406     * then inactivates. This scarcely matters in steady-state
407     * computations involving all workers, but can create contention
408 jsr166 1.247 * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
409 dl 1.243 * computations involving only a few workers.
410     *
411 dl 1.300 * Scanning. Method runWorker performs top-level scanning for
412     * tasks. Each scan traverses and tries to poll from each queue
413     * starting at a random index and circularly stepping. Scans are
414     * not performed in ideal random permutation order, to reduce
415     * cacheline contention. The pseudorandom generator need not have
416     * high-quality statistical properties in the long term, but just
417     * within computations; we use Marsaglia XorShifts (often via
418     * ThreadLocalRandom.nextSecondarySeed), which are cheap and
419     * suffice. Scanning also employs contention reduction: When
420     * scanning workers fail to extract an apparently existing task,
421     * they soon restart at a different pseudorandom index. This
422     * improves throughput when many threads are trying to take tasks
423     * from few queues, which can be common in some usages. Scans do
424     * not otherwise explicitly take into account core affinities,
425     * loads, cache localities, etc. However, they do exploit temporal
426     * locality (which usually approximates these) by preferring to
427     * re-poll (at most #workers times) from the same queue after a
428     * successful poll before trying others.
429 dl 1.52 *
430     * Trimming workers. To release resources after periods of lack of
431     * use, a worker starting to wait when the pool is quiescent will
432 dl 1.300 * time out and terminate (see method scan) if the pool has
433     * remained quiescent for the period given by field keepAlive.
434 dl 1.52 *
435 dl 1.210 * Shutdown and Termination. A call to shutdownNow invokes
436     * tryTerminate to atomically set a runState bit. The calling
437     * thread, as well as every other worker thereafter terminating,
438 dl 1.300 * helps terminate others by cancelling their unprocessed tasks,
439     * and waking them up, doing so repeatedly until stable. Calls to
440     * non-abrupt shutdown() preface this by checking whether
441     * termination should commence by sweeping through queues (until
442     * stable) to ensure lack of in-flight submissions and workers
443     * about to process them before triggering the "STOP" phase of
444     * termination.
445 dl 1.211 *
446 jsr166 1.84 * Joining Tasks
447     * =============
448 dl 1.78 *
449     * Any of several actions may be taken when one worker is waiting
450 jsr166 1.84 * to join a task stolen (or always held) by another. Because we
451 dl 1.78 * are multiplexing many tasks on to a pool of workers, we can't
452 dl 1.300 * always just let them block (as in Thread.join). We also cannot
453     * just reassign the joiner's run-time stack with another and
454     * replace it later, which would be a form of "continuation", that
455     * even if possible is not necessarily a good idea since we may
456     * need both an unblocked task and its continuation to progress.
457     * Instead we combine two tactics:
458 dl 1.19 *
459     * Helping: Arranging for the joiner to execute some task that it
460 dl 1.78 * would be running if the steal had not occurred.
461 dl 1.19 *
462     * Compensating: Unless there are already enough live threads,
463 dl 1.78 * method tryCompensate() may create or re-activate a spare
464     * thread to compensate for blocked joiners until they unblock.
465     *
466 dl 1.105 * A third form (implemented in tryRemoveAndExec) amounts to
467     * helping a hypothetical compensator: If we can readily tell that
468     * a possible action of a compensator is to steal and execute the
469     * task being joined, the joining thread can do so directly,
470 dl 1.300 * without the need for a compensation thread.
471 dl 1.52 *
472     * The ManagedBlocker extension API can't use helping so relies
473     * only on compensation in method awaitBlocker.
474 dl 1.19 *
475 dl 1.300 * The algorithm in awaitJoin entails a form of "linear helping".
476     * Each worker records (in field source) the id of the queue from
477     * which it last stole a task. The scan in method awaitJoin uses
478     * these markers to try to find a worker to help (i.e., steal back
479     * a task from and execute it) that could hasten completion of the
480     * actively joined task. Thus, the joiner executes a task that
481     * would be on its own local deque if the to-be-joined task had
482     * not been stolen. This is a conservative variant of the approach
483     * described in Wagner & Calder "Leapfrogging: a portable
484     * technique for implementing efficient futures" SIGPLAN Notices,
485     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
486     * mainly in that we only record queue ids, not full dependency
487     * links. This requires a linear scan of the workQueues array to
488     * locate stealers, but isolates cost to when it is needed, rather
489     * than adding to per-task overhead. Searches can fail to locate
490     * stealers when GC stalls and the like delay recording sources.
491     * Further, even when accurately identified, stealers might not
492     * ever produce a task that the joiner can in turn help with. So,
493     * compensation is tried upon failure to find tasks to run.
494 dl 1.105 *
495 dl 1.300 * Compensation does not by default aim to keep exactly the target
496 dl 1.200 * parallelism number of unblocked threads running at any given
497     * time. Some previous versions of this class employed immediate
498     * compensations for any blocked join. However, in practice, the
499     * vast majority of blockages are transient byproducts of GC and
500     * other JVM or OS activities that are made worse by replacement.
501 dl 1.300 * Rather than impose arbitrary policies, we allow users to
502     * override the default of only adding threads upon apparent
503     * starvation. The compensation mechanism may also be bounded.
504     * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
505     * JVMs to cope with programming errors and abuse before running
506     * out of resources to do so.
507 jsr166 1.301 *
508 dl 1.105 * Common Pool
509     * ===========
510     *
511 jsr166 1.175 * The static common pool always exists after static
512 dl 1.105 * initialization. Since it (or any other created pool) need
513     * never be used, we minimize initial construction overhead and
514 dl 1.300 * footprint to the setup of about a dozen fields.
515 dl 1.105 *
516     * When external threads submit to the common pool, they can
517 dl 1.200 * perform subtask processing (see externalHelpComplete and
518     * related methods) upon joins. This caller-helps policy makes it
519     * sensible to set common pool parallelism level to one (or more)
520     * less than the total number of available cores, or even zero for
521     * pure caller-runs. We do not need to record whether external
522     * submissions are to the common pool -- if not, external help
523     * methods return quickly. These submitters would otherwise be
524     * blocked waiting for completion, so the extra effort (with
525     * liberally sprinkled task status checks) in inapplicable cases
526     * amounts to an odd form of limited spin-wait before blocking in
527     * ForkJoinTask.join.
528 dl 1.105 *
529 dl 1.197 * As a more appropriate default in managed environments, unless
530     * overridden by system properties, we use workers of subclass
531     * InnocuousForkJoinWorkerThread when there is a SecurityManager
532     * present. These workers have no permissions set, do not belong
533     * to any user-defined ThreadGroup, and erase all ThreadLocals
534 dl 1.300 * after executing any top-level task (see
535     * WorkQueue.afterTopLevelExec). The associated mechanics (mainly
536     * in ForkJoinWorkerThread) may be JVM-dependent and must access
537     * particular Thread class fields to achieve this effect.
538 jsr166 1.198 *
539 dl 1.105 * Style notes
540     * ===========
541     *
542 dl 1.200 * Memory ordering relies mainly on Unsafe intrinsics that carry
543     * the further responsibility of explicitly performing null- and
544     * bounds- checks otherwise carried out implicitly by JVMs. This
545     * can be awkward and ugly, but also reflects the need to control
546     * outcomes across the unusual cases that arise in very racy code
547     * with very few invariants. So these explicit checks would exist
548     * in some form anyway. All fields are read into locals before
549     * use, and null-checked if they are references. This is usually
550     * done in a "C"-like style of listing declarations at the heads
551     * of methods or blocks, and using inline assignments on first
552     * encounter. Array bounds-checks are usually performed by
553     * masking with array.length-1, which relies on the invariant that
554     * these arrays are created with positive lengths, which is itself
555     * paranoically checked. Nearly all explicit checks lead to
556     * bypass/return, not exception throws, because they may
557     * legitimately arise due to cancellation/revocation during
558     * shutdown.
559     *
560 dl 1.105 * There is a lot of representation-level coupling among classes
561     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
562     * fields of WorkQueue maintain data structures managed by
563     * ForkJoinPool, so are directly accessed. There is little point
564     * trying to reduce this, since any associated future changes in
565     * representations will need to be accompanied by algorithmic
566     * changes anyway. Several methods intrinsically sprawl because
567 dl 1.200 * they must accumulate sets of consistent reads of fields held in
568     * local variables. There are also other coding oddities
569     * (including several unnecessary-looking hoisted null checks)
570     * that help some methods perform reasonably even when interpreted
571     * (not compiled).
572 dl 1.52 *
573 dl 1.208 * The order of declarations in this file is (with a few exceptions):
574 dl 1.86 * (1) Static utility functions
575     * (2) Nested (static) classes
576     * (3) Static fields
577     * (4) Fields, along with constants used when unpacking some of them
578     * (5) Internal control methods
579     * (6) Callbacks and other support for ForkJoinTask methods
580     * (7) Exported methods
581     * (8) Static block initializing statics in minimally dependent order
582     */
583    
584     // Static utilities
585    
586     /**
587     * If there is a security manager, makes sure caller has
588     * permission to modify threads.
589 jsr166 1.1 */
590 dl 1.86 private static void checkPermission() {
591     SecurityManager security = System.getSecurityManager();
592     if (security != null)
593     security.checkPermission(modifyThreadPermission);
594     }
595    
596     // Nested classes
597 jsr166 1.1
598     /**
599 jsr166 1.8 * Factory for creating new {@link ForkJoinWorkerThread}s.
600     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
601     * for {@code ForkJoinWorkerThread} subclasses that extend base
602     * functionality or initialize threads with different contexts.
603 jsr166 1.1 */
604     public static interface ForkJoinWorkerThreadFactory {
605     /**
606     * Returns a new worker thread operating in the given pool.
607 dl 1.300 * Returning null or throwing an exception may result in tasks
608     * never being executed. If this method throws an exception,
609     * it is relayed to the caller of the method (for example
610     * {@code execute}) causing attempted thread creation. If this
611     * method returns null or throws an exception, it is not
612     * retried until the next attempted creation (for example
613     * another call to {@code execute}).
614 jsr166 1.1 *
615     * @param pool the pool this thread works in
616 jsr166 1.296 * @return the new worker thread, or {@code null} if the request
617 dl 1.300 * to create a thread is rejected.
618 jsr166 1.11 * @throws NullPointerException if the pool is null
619 jsr166 1.1 */
620     public ForkJoinWorkerThread newThread(ForkJoinPool pool);
621     }
622    
623     /**
624     * Default ForkJoinWorkerThreadFactory implementation; creates a
625     * new ForkJoinWorkerThread.
626     */
627 jsr166 1.278 private static final class DefaultForkJoinWorkerThreadFactory
628 jsr166 1.1 implements ForkJoinWorkerThreadFactory {
629 dl 1.112 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
630 dl 1.14 return new ForkJoinWorkerThread(pool);
631 jsr166 1.1 }
632     }
633    
// Constants shared across ForkJoinPool and WorkQueue

// Bounds
static final int SWIDTH = 16; // width of short
static final int SMASK = 0xffff; // short bits == max index
static final int MAX_CAP = 0x7fff; // max #workers - 1
static final int SQMASK = 0x007e; // max 64 (even) slots

// Masks and units for WorkQueue.phase and ctl sp subfield
static final int UNSIGNALLED = 1 << 31; // must be negative
static final int SS_SEQ = 1 << 16; // version count
static final int QLOCK = 1; // must be 1

// Mode bits and sentinels, some also used in WorkQueue id and source fields
static final int OWNED = 1; // queue has owner thread
static final int FIFO = 1 << 16; // fifo queue or access mode
static final int SATURATE = 1 << 17; // for tryCompensate
static final int SHUTDOWN = 1 << 18;
static final int TERMINATED = 1 << 19;
static final int STOP = 1 << 31; // must be negative
static final int QUIET = 1 << 30; // not scanning or working
static final int DORMANT = QUIET | UNSIGNALLED; // both bits set

/**
 * The maximum number of local polls from the same queue before
 * checking others (currently 1024). This is a safeguard against
 * infinitely unfair looping under unbounded user task recursion,
 * and must be larger than plausible cases of intentional bounded
 * task recursion.
 */
static final int POLL_LIMIT = 1 << 10;
664 dl 1.253
665     /**
666 dl 1.78 * Queues supporting work-stealing as well as external task
667 jsr166 1.202 * submission. See above for descriptions and algorithms.
668 dl 1.78 * Performance on most platforms is very sensitive to placement of
669     * instances of both WorkQueues and their arrays -- we absolutely
670     * do not want multiple WorkQueue instances or multiple queue
671 dl 1.200 * arrays sharing cache lines. The @Contended annotation alerts
672     * JVMs to try to keep instances apart.
673 dl 1.78 */
674 dl 1.300 // For now, using manual padding.
675     // @jdk.internal.vm.annotation.Contended
676     // @sun.misc.Contended
677 dl 1.78 static final class WorkQueue {
678 dl 1.200
/**
 * Capacity of work-stealing queue array upon initialization.
 * Must be a power of two; at least 4, but should be larger to
 * reduce or eliminate cacheline sharing among queues.
 * Currently, it is much larger, as a partial workaround for
 * the fact that JVMs often place arrays in locations that
 * share GC bookkeeping (especially cardmarks) such that
 * per-write accesses encounter serious memory contention.
 */
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

/**
 * Maximum size for queue arrays. Must be a power of two less
 * than or equal to 1 << (31 - width of array entry) to ensure
 * lack of wraparound of index calculations, but defined to a
 * value a bit less than this to help users trap runaway
 * programs before saturating systems.
 */
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

// Instance fields
// The padNN fields before and after the working fields are the
// manual padding used for now in place of @Contended.
volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
volatile int phase; // versioned, negative: queued, 1: locked
int stackPred; // pool stack (ctl) predecessor link
int nsteals; // number of steals
int id; // index, mode, tag
volatile int source; // source queue id, or sentinel
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated)
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e, pad1f;
714 dl 1.112
715 dl 1.200 WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
716 dl 1.90 this.pool = pool;
717 dl 1.78 this.owner = owner;
718 dl 1.115 // Place indices in the center of array (that is not yet allocated)
719 dl 1.78 base = top = INITIAL_QUEUE_CAPACITY >>> 1;
720     }
721    
722     /**
723 jsr166 1.220 * Returns an exportable index (used by ForkJoinWorkerThread).
724 dl 1.200 */
725     final int getPoolIndex() {
726 dl 1.300 return (id & 0xffff) >>> 1; // ignore odd/even tag bit
727 dl 1.200 }
728    
729     /**
730 dl 1.115 * Returns the approximate number of tasks in the queue.
731     */
732     final int queueSize() {
733 dl 1.243 int n = base - top; // read base first
734 dl 1.115 return (n >= 0) ? 0 : -n; // ignore transient negative
735     }
736    
737 jsr166 1.180 /**
738 dl 1.115 * Provides a more accurate estimate of whether this queue has
739     * any tasks than does queueSize, by checking whether a
740     * near-empty queue has at least one unclaimed task.
741     */
742     final boolean isEmpty() {
743 dl 1.300 ForkJoinTask<?>[] a; int n, al, b;
744     return ((n = (b = base) - top) >= 0 || // possibly one task
745 dl 1.243 (n == -1 && ((a = array) == null ||
746     (al = a.length) == 0 ||
747 dl 1.300 a[(al - 1) & b] == null)));
748 dl 1.115 }
749    
750 dl 1.300
751 dl 1.115 /**
752 dl 1.256 * Pushes a task. Call only by owner in unshared queues.
753 dl 1.78 *
754     * @param task the task. Caller must ensure non-null.
755 jsr166 1.146 * @throws RejectedExecutionException if array cannot be resized
756 dl 1.78 */
757 dl 1.90 final void push(ForkJoinTask<?> task) {
758 dl 1.300 int s = top; ForkJoinTask<?>[] a; int al, d;
759 dl 1.243 if ((a = array) != null && (al = a.length) > 0) {
760 dl 1.300 int index = (al - 1) & s;
761     long offset = ((long)index << ASHIFT) + ABASE;
762     ForkJoinPool p = pool;
763 dl 1.243 top = s + 1;
764 dl 1.300 U.putOrderedObject(a, offset, task);
765 dl 1.292 if ((d = base - s) == 0 && p != null) {
766 dl 1.284 U.fullFence();
767 dl 1.253 p.signalWork();
768 dl 1.284 }
769 dl 1.300 else if (d + al == 1)
770 dl 1.243 growArray();
771 dl 1.78 }
772     }
773    
774 dl 1.178 /**
775 dl 1.112 * Initializes or doubles the capacity of array. Call either
776     * by owner or with lock held -- it is OK for base, but not
777     * top, to move while resizings are in progress.
778     */
779     final ForkJoinTask<?>[] growArray() {
780     ForkJoinTask<?>[] oldA = array;
781 dl 1.300 int oldSize = oldA != null ? oldA.length : 0;
782     int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
783 dl 1.225 if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
784 dl 1.112 throw new RejectedExecutionException("Queue capacity exceeded");
785     int oldMask, t, b;
786     ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
787 dl 1.300 if (oldA != null && (oldMask = oldSize - 1) > 0 &&
788 dl 1.112 (t = top) - (b = base) > 0) {
789     int mask = size - 1;
790 dl 1.200 do { // emulate poll from old array, push to new array
791 dl 1.256 int index = b & oldMask;
792     long offset = ((long)index << ASHIFT) + ABASE;
793 dl 1.243 ForkJoinTask<?> x = (ForkJoinTask<?>)
794     U.getObjectVolatile(oldA, offset);
795     if (x != null &&
796     U.compareAndSwapObject(oldA, offset, x, null))
797     a[b & mask] = x;
798 dl 1.112 } while (++b != t);
799 dl 1.243 U.storeFence();
800 dl 1.78 }
801 dl 1.112 return a;
802 dl 1.78 }
803    
804     /**
805 dl 1.90 * Takes next task, if one exists, in LIFO order. Call only
806 dl 1.102 * by owner in unshared queues.
807 dl 1.90 */
808     final ForkJoinTask<?> pop() {
809 dl 1.243 int b = base, s = top, al, i; ForkJoinTask<?>[] a;
810 dl 1.256 if ((a = array) != null && b != s && (al = a.length) > 0) {
811     int index = (al - 1) & --s;
812     long offset = ((long)index << ASHIFT) + ABASE;
813 dl 1.262 ForkJoinTask<?> t = (ForkJoinTask<?>)
814     U.getObject(a, offset);
815     if (t != null &&
816     U.compareAndSwapObject(a, offset, t, null)) {
817 dl 1.256 top = s;
818 dl 1.300 U.storeFence();
819 dl 1.78 return t;
820     }
821     }
822     return null;
823     }
824    
825     /**
826 dl 1.90 * Takes next task, if one exists, in FIFO order.
827 dl 1.78 */
828 dl 1.90 final ForkJoinTask<?> poll() {
829 dl 1.243 for (;;) {
830     int b = base, s = top, d, al; ForkJoinTask<?>[] a;
831     if ((a = array) != null && (d = b - s) < 0 &&
832     (al = a.length) > 0) {
833 dl 1.256 int index = (al - 1) & b;
834     long offset = ((long)index << ASHIFT) + ABASE;
835 dl 1.243 ForkJoinTask<?> t = (ForkJoinTask<?>)
836     U.getObjectVolatile(a, offset);
837     if (b++ == base) {
838     if (t != null) {
839     if (U.compareAndSwapObject(a, offset, t, null)) {
840     base = b;
841     return t;
842     }
843 dl 1.200 }
844 dl 1.243 else if (d == -1)
845     break; // now empty
846 dl 1.78 }
847 dl 1.90 }
848 dl 1.243 else
849     break;
850 dl 1.78 }
851     return null;
852     }
853    
854     /**
855     * Takes next task, if one exists, in order specified by mode.
856     */
857     final ForkJoinTask<?> nextLocalTask() {
858 dl 1.300 return ((id & FIFO) != 0) ? poll() : pop();
859 dl 1.78 }
860    
861     /**
862     * Returns next task, if one exists, in order specified by mode.
863     */
864     final ForkJoinTask<?> peek() {
865 dl 1.292 int al; ForkJoinTask<?>[] a;
866 dl 1.243 return ((a = array) != null && (al = a.length) > 0) ?
867 dl 1.300 a[(al - 1) &
868     ((id & FIFO) != 0 ? base : top - 1)] : null;
869 dl 1.78 }
870    
871     /**
872     * Pops the given task only if it is at the current top.
873 jsr166 1.251 */
874 dl 1.243 final boolean tryUnpush(ForkJoinTask<?> task) {
875 dl 1.256 int b = base, s = top, al; ForkJoinTask<?>[] a;
876     if ((a = array) != null && b != s && (al = a.length) > 0) {
877     int index = (al - 1) & --s;
878     long offset = ((long)index << ASHIFT) + ABASE;
879     if (U.compareAndSwapObject(a, offset, task, null)) {
880 dl 1.243 top = s;
881 dl 1.300 U.storeFence();
882 dl 1.224 return true;
883     }
884 dl 1.78 }
885     return false;
886     }
887    
888     /**
889 jsr166 1.84 * Removes and cancels all known tasks, ignoring any exceptions.
890 dl 1.78 */
891     final void cancelAll() {
892 dl 1.300 for (ForkJoinTask<?> t; (t = poll()) != null; )
893 dl 1.78 ForkJoinTask.cancelIgnoringExceptions(t);
894     }
895    
896 dl 1.104 // Specialized execution methods
897 dl 1.78
898     /**
899 dl 1.300 * Pops and executes up to limit consecutive tasks or until empty.
900     *
901     * @param limit max runs, or zero for no limit
902 dl 1.253 */
903 dl 1.300 final void localPopAndExec(int limit) {
904     for (;;) {
905 dl 1.253 int b = base, s = top, al; ForkJoinTask<?>[] a;
906     if ((a = array) != null && b != s && (al = a.length) > 0) {
907 dl 1.256 int index = (al - 1) & --s;
908     long offset = ((long)index << ASHIFT) + ABASE;
909 dl 1.253 ForkJoinTask<?> t = (ForkJoinTask<?>)
910     U.getAndSetObject(a, offset, null);
911     if (t != null) {
912     top = s;
913 dl 1.300 U.storeFence();
914     t.doExec();
915     if (limit != 0 && --limit == 0)
916 dl 1.253 break;
917     }
918     else
919     break;
920     }
921     else
922     break;
923     }
924     }
925    
926     /**
927 dl 1.300 * Polls and executes up to limit consecutive tasks or until empty.
928     *
929     * @param limit, or zero for no limit
930 dl 1.253 */
931 dl 1.300 final void localPollAndExec(int limit) {
932     for (int polls = 0;;) {
933     int b = base, s = top, d, al; ForkJoinTask<?>[] a;
934     if ((a = array) != null && (d = b - s) < 0 &&
935     (al = a.length) > 0) {
936 dl 1.256 int index = (al - 1) & b++;
937     long offset = ((long)index << ASHIFT) + ABASE;
938 dl 1.253 ForkJoinTask<?> t = (ForkJoinTask<?>)
939     U.getAndSetObject(a, offset, null);
940     if (t != null) {
941     base = b;
942 dl 1.255 t.doExec();
943 dl 1.300 if (limit != 0 && ++polls == limit)
944 dl 1.253 break;
945     }
946 dl 1.300 else if (d == -1)
947     break; // now empty
948     else
949     polls = 0; // stolen; reset
950 dl 1.253 }
951     else
952     break;
953     }
954     }
955    
956     /**
957 jsr166 1.302 * If present, removes task from queue and executes it.
958 dl 1.94 */
959 dl 1.300 final void tryRemoveAndExec(ForkJoinTask<?> task) {
960     ForkJoinTask<?>[] wa; int s, wal;
961     if (base - (s = top) < 0 && // traverse from top
962     (wa = array) != null && (wal = wa.length) > 0) {
963     for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
964     int index = i & m;
965     long offset = (index << ASHIFT) + ABASE;
966     ForkJoinTask<?> t = (ForkJoinTask<?>)
967     U.getObject(wa, offset);
968     if (t == null)
969     break;
970     else if (t == task) {
971     if (U.compareAndSwapObject(wa, offset, t, null)) {
972     top = ns; // safely shift down
973     for (int j = i; j != ns; ++j) {
974     ForkJoinTask<?> f;
975     int pindex = (j + 1) & m;
976     long pOffset = (pindex << ASHIFT) + ABASE;
977     f = (ForkJoinTask<?>)U.getObject(wa, pOffset);
978     U.putObjectVolatile(wa, pOffset, null);
979    
980     int jindex = j & m;
981     long jOffset = (jindex << ASHIFT) + ABASE;
982     U.putOrderedObject(wa, jOffset, f);
983     }
984     U.storeFence();
985     t.doExec();
986     }
987     break;
988     }
989 dl 1.262 }
990 dl 1.215 }
991     }
992    
993     /**
994 dl 1.300 * Tries to steal and run tasks within the target's
995 jsr166 1.302 * computation until done, not found, or limit exceeded.
996 dl 1.94 *
997 dl 1.300 * @param task root of CountedCompleter computation
998     * @param limit max runs, or zero for no limit
999     * @return task status on exit
1000     */
1001     final int localHelpCC(CountedCompleter<?> task, int limit) {
1002     int status = 0;
1003     if (task != null && (status = task.status) >= 0) {
1004     for (;;) {
1005     boolean help = false;
1006     int b = base, s = top, al; ForkJoinTask<?>[] a;
1007     if ((a = array) != null && b != s && (al = a.length) > 0) {
1008     int index = (al - 1) & (s - 1);
1009     long offset = ((long)index << ASHIFT) + ABASE;
1010     ForkJoinTask<?> o = (ForkJoinTask<?>)
1011     U.getObject(a, offset);
1012     if (o instanceof CountedCompleter) {
1013     CountedCompleter<?> t = (CountedCompleter<?>)o;
1014     for (CountedCompleter<?> f = t;;) {
1015     if (f != task) {
1016     if ((f = f.completer) == null) // try parent
1017     break;
1018     }
1019     else {
1020     if (U.compareAndSwapObject(a, offset,
1021     t, null)) {
1022     top = s - 1;
1023     U.storeFence();
1024     t.doExec();
1025     help = true;
1026     }
1027     break;
1028 dl 1.200 }
1029     }
1030 dl 1.243 }
1031 dl 1.104 }
1032 dl 1.300 if ((status = task.status) < 0 || !help ||
1033     (limit != 0 && --limit == 0))
1034     break;
1035 dl 1.104 }
1036     }
1037 dl 1.300 return status;
1038     }
1039    
1040     // Operations on shared queues
1041    
1042     /**
1043 jsr166 1.302 * Tries to lock shared queue by CASing phase field.
1044 dl 1.300 */
1045     final boolean tryLockSharedQueue() {
1046     return U.compareAndSwapInt(this, PHASE, 0, QLOCK);
1047 dl 1.104 }
1048    
1049     /**
1050 dl 1.300 * Shared version of tryUnpush.
1051 dl 1.78 */
1052 dl 1.300 final boolean trySharedUnpush(ForkJoinTask<?> task) {
1053     boolean popped = false;
1054     int s = top - 1, al; ForkJoinTask<?>[] a;
1055     if ((a = array) != null && (al = a.length) > 0) {
1056     int index = (al - 1) & s;
1057 dl 1.256 long offset = ((long)index << ASHIFT) + ABASE;
1058 dl 1.300 ForkJoinTask<?> t = (ForkJoinTask<?>) U.getObject(a, offset);
1059     if (t == task &&
1060     U.compareAndSwapInt(this, PHASE, 0, QLOCK)) {
1061     if (top == s + 1 && array == a &&
1062     U.compareAndSwapObject(a, offset, task, null)) {
1063     popped = true;
1064     top = s;
1065 dl 1.178 }
1066 dl 1.300 U.putOrderedInt(this, PHASE, 0);
1067 dl 1.94 }
1068 dl 1.78 }
1069 dl 1.300 return popped;
1070 dl 1.78 }
1071    
1072     /**
1073 dl 1.300 * Shared version of localHelpCC.
1074     */
1075     final int sharedHelpCC(CountedCompleter<?> task, int limit) {
1076     int status = 0;
1077     if (task != null && (status = task.status) >= 0) {
1078     for (;;) {
1079     boolean help = false;
1080     int b = base, s = top, al; ForkJoinTask<?>[] a;
1081     if ((a = array) != null && b != s && (al = a.length) > 0) {
1082     int index = (al - 1) & (s - 1);
1083     long offset = ((long)index << ASHIFT) + ABASE;
1084     ForkJoinTask<?> o = (ForkJoinTask<?>)
1085     U.getObject(a, offset);
1086     if (o instanceof CountedCompleter) {
1087     CountedCompleter<?> t = (CountedCompleter<?>)o;
1088     for (CountedCompleter<?> f = t;;) {
1089     if (f != task) {
1090     if ((f = f.completer) == null)
1091     break;
1092     }
1093     else {
1094     if (U.compareAndSwapInt(this, PHASE,
1095     0, QLOCK)) {
1096     if (top == s && array == a &&
1097     U.compareAndSwapObject(a, offset,
1098     t, null)) {
1099     help = true;
1100     top = s - 1;
1101     }
1102     U.putOrderedInt(this, PHASE, 0);
1103     if (help)
1104     t.doExec();
1105     }
1106     break;
1107     }
1108 dl 1.243 }
1109 dl 1.200 }
1110 dl 1.178 }
1111 dl 1.300 if ((status = task.status) < 0 || !help ||
1112     (limit != 0 && --limit == 0))
1113     break;
1114 dl 1.178 }
1115 dl 1.78 }
1116 dl 1.300 return status;
1117 dl 1.78 }
1118    
1119     /**
1120 dl 1.86 * Returns true if owned and not known to be blocked.
1121     */
1122     final boolean isApparentlyUnblocked() {
1123     Thread wt; Thread.State s;
1124 dl 1.300 return ((wt = owner) != null &&
1125 dl 1.86 (s = wt.getState()) != Thread.State.BLOCKED &&
1126     s != Thread.State.WAITING &&
1127     s != Thread.State.TIMED_WAITING);
1128     }
1129    
// Unsafe mechanics. Note that some are (and must be) the same as in FJP
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long PHASE;  // offset of the phase field
private static final int ABASE;   // base offset of ForkJoinTask[] elements
private static final int ASHIFT;  // log2 of ForkJoinTask[] index scale
static {
    try {
        PHASE = U.objectFieldOffset
            (WorkQueue.class.getDeclaredField("phase"));
        ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
        int scale = U.arrayIndexScale(ForkJoinTask[].class);
        // offsets are computed as (index << ASHIFT) + ABASE, which
        // requires the element scale to be a power of two
        if ((scale & (scale - 1)) != 0)
            throw new Error("array index scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}
1148     }
1149 dl 1.14
// static fields (initialized in static initializer below)

/**
 * Creates a new ForkJoinWorkerThread. This factory is used unless
 * overridden in ForkJoinPool constructors.
 */
public static final ForkJoinWorkerThreadFactory
    defaultForkJoinWorkerThreadFactory;

/**
 * Permission required for callers of methods that may start or
 * kill threads. Checked by checkPermission when a security
 * manager is present.
 */
static final RuntimePermission modifyThreadPermission;

/**
 * Common (static) pool. Non-null for public use unless a static
 * construction exception, but internal usages null-check on use
 * to paranoically avoid potential initialization circularities
 * as well as to simplify generated code.
 */
static final ForkJoinPool common;

/**
 * Common pool parallelism. To allow simpler use and management
 * when common pool threads are disabled, we allow the underlying
 * common.parallelism field to be zero, but in that case still report
 * parallelism as 1 to reflect resulting caller-runs mechanics.
 */
static final int COMMON_PARALLELISM;

/**
 * Limit on spare thread construction in tryCompensate.
 */
private static final int COMMON_MAX_SPARES;

/**
 * Sequence number for creating workerNamePrefix. Incremented by
 * the synchronized method nextPoolId.
 */
private static int poolNumberSequence;
1190 dl 1.86
1191 jsr166 1.1 /**
1192 jsr166 1.132 * Returns the next sequence number. We don't expect this to
1193     * ever contend, so use simple builtin sync.
1194 dl 1.83 */
1195 dl 1.105 private static final synchronized int nextPoolId() {
1196     return ++poolNumberSequence;
1197     }
1198 dl 1.86
// static configuration constants

/**
 * Default idle timeout value (in milliseconds) for the thread
 * triggering quiescence to park waiting for new work.
 */
private static final long DEFAULT_KEEPALIVE = 60000L;

/**
 * Undershoot tolerance for idle timeouts.
 * NOTE(review): presumably in milliseconds, like DEFAULT_KEEPALIVE
 * -- confirm against its uses.
 */
private static final long TIMEOUT_SLOP = 20L;

/**
 * The default value for COMMON_MAX_SPARES. Overridable using the
 * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
 * property. The default value is far in excess of normal
 * requirements, but also far short of MAX_CAP and typical OS
 * thread limits, so allows JVMs to catch misuse/abuse before
 * running out of resources needed to do so.
 */
private static final int DEFAULT_COMMON_MAX_SPARES = 256;

/**
 * Increment for seed generators. See class ThreadLocal for
 * explanation.
 */
private static final int SEED_INCREMENT = 0x9e3779b9;
1227 dl 1.83
/*
 * Bits and masks for field ctl, packed with 4 16 bit subfields:
 * RC: Number of released (unqueued) workers minus target parallelism
 * TC: Number of total workers minus target parallelism
 * SS: version count and status of top waiting thread
 * ID: poolIndex of top of Treiber stack of waiters
 *
 * When convenient, we can extract the lower 32 stack top bits
 * (including version bits) as sp=(int)ctl. The offsets of counts
 * by the target parallelism and the positionings of fields makes
 * it possible to perform the most common checks via sign tests of
 * fields: When rc is negative, there are not enough unqueued
 * workers, when tc is negative, there are not enough total
 * workers. When sp is non-zero, there are waiting workers. To
 * deal with possibly negative fields, we use casts in and out of
 * "short" and/or signed shifts to maintain signedness.
 *
 * Because it occupies uppermost bits, we can add one release count
 * using getAndAddLong of RC_UNIT, rather than CAS, when returning
 * from a blocked join. Other updates entail multiple subfields
 * and masking, requiring CAS.
 *
 * The limits packed in field "bounds" are also offset by the
 * parallelism level to make them comparable to the ctl rc and tc
 * fields.
 */

// Lower and upper word masks
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;

// Release counts (bits 48-63 of ctl)
private static final int RC_SHIFT = 48;
private static final long RC_UNIT = 0x0001L << RC_SHIFT;
private static final long RC_MASK = 0xffffL << RC_SHIFT;

// Total counts (bits 32-47 of ctl)
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT;
private static final long TC_MASK = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign bit of TC
1269    
// Instance fields

// Segregate ctl field. For now using padding vs @Contended:
// the padNN fields on either side of ctl exist only as padding.
// @jdk.internal.vm.annotation.Contended("fjpctl")
// @sun.misc.Contended("fjpctl")
volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06, pad07;
volatile long pad08, pad09, pad0a, pad0b, pad0c, pad0d, pad0e, pad0f;
volatile long ctl; // main pool control
volatile long pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
volatile long pad18, pad19, pad1a, pad1b, pad1c, pad1d, pad1e;

volatile long stealCount; // collects worker nsteals
final long keepAlive; // milliseconds before dropping if idle
int indexSeed; // next worker index
final int bounds; // min, max threads packed as shorts
volatile int mode; // parallelism, runstate, queue mode
WorkQueue[] workQueues; // main registry
final String workerNamePrefix; // for worker thread string; sync lock
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh; // per-worker UEH
1290 dl 1.101
1291 dl 1.200 // Creating, registering and deregistering workers
1292    
1293 dl 1.112 /**
1294 dl 1.200 * Tries to construct and start one worker. Assumes that total
1295     * count has already been incremented as a reservation. Invokes
1296     * deregisterWorker on any failure.
1297     *
1298     * @return true if successful
1299 dl 1.115 */
1300 dl 1.300 private boolean createWorker() {
1301 dl 1.200 ForkJoinWorkerThreadFactory fac = factory;
1302     Throwable ex = null;
1303     ForkJoinWorkerThread wt = null;
1304     try {
1305     if (fac != null && (wt = fac.newThread(this)) != null) {
1306     wt.start();
1307     return true;
1308 dl 1.115 }
1309 dl 1.200 } catch (Throwable rex) {
1310     ex = rex;
1311 dl 1.112 }
1312 dl 1.200 deregisterWorker(wt, ex);
1313     return false;
1314 dl 1.112 }
1315    
1316 dl 1.200 /**
1317     * Tries to add one worker, incrementing ctl counts before doing
1318     * so, relying on createWorker to back out on failure.
1319     *
1320     * @param c incoming ctl value, with total count negative and no
1321     * idle workers. On CAS failure, c is refreshed and retried if
1322 jsr166 1.202 * this holds (otherwise, a new worker is not needed).
1323 dl 1.200 */
1324     private void tryAddWorker(long c) {
1325     do {
1326 dl 1.300 long nc = ((RC_MASK & (c + RC_UNIT)) |
1327 dl 1.200 (TC_MASK & (c + TC_UNIT)));
1328 dl 1.243 if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
1329 dl 1.300 createWorker();
1330 dl 1.243 break;
1331 dl 1.200 }
1332     } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
1333     }
1334 dl 1.112
1335     /**
1336 dl 1.200 * Callback from ForkJoinWorkerThread constructor to establish and
1337     * record its WorkQueue.
     *
     * <p>The thread is made a daemon and given the pool's uncaught
     * exception handler (if any). Under the pool lock (the
     * workerNamePrefix object) an odd-numbered slot of workQueues is
     * chosen for the new queue, doubling the array when no free slot
     * is found, and the thread is then named prefix + index.  If
     * workerNamePrefix is null, the queue is created and returned
     * without being installed or named by this method.
1338 dl 1.112 *
1339     * @param wt the worker thread
1340 dl 1.115 * @return the worker's queue
1341 dl 1.112 */
1342 dl 1.115 final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1343 dl 1.200 UncaughtExceptionHandler handler;
1344 dl 1.300 wt.setDaemon(true); // configure thread
1345 dl 1.115 if ((handler = ueh) != null)
1346     wt.setUncaughtExceptionHandler(handler);
1347 dl 1.200 WorkQueue w = new WorkQueue(this, wt);
1348 dl 1.300 int tid = 0; // for thread name
1349     int fifo = mode & FIFO; // FIFO bit of mode, folded into the queue id below
1350     String prefix = workerNamePrefix;
1351     if (prefix != null) {
1352 jsr166 1.301 synchronized (prefix) { // prefix doubles as the pool-wide registry lock
1353 dl 1.300 WorkQueue[] ws = workQueues; int n;
1354     int s = indexSeed += SEED_INCREMENT; // advance seed even if no array exists
1355     if (ws != null && (n = ws.length) > 1) {
1356     int m = n - 1; // index mask (assumes n is a power of two -- TODO confirm)
1357     tid = s & m;
1358     int i = m & ((s << 1) | 1); // odd-numbered indices
1359     for (int probes = n >>> 1;;) { // find empty slot
1360     WorkQueue q;
1361     if ((q = ws[i]) == null || q.phase == QUIET) // free, or owner dropped on timeout
1362     break;
1363     else if (--probes == 0) { // every odd slot probed
1364     i = n | 1; // resize below
1365     break;
1366     }
1367     else
1368     i = (i + 2) & m; // next odd slot, wrapping
1369     }
1370    
1371     int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT)); // slot index + mode bit + seed-derived upper bits
1372     w.phase = w.id = id; // now publishable
1373    
1374     if (i < n)
1375     ws[i] = w;
1376     else { // expand array
1377     int an = n << 1;
1378     WorkQueue[] as = new WorkQueue[an];
1379     as[i] = w;
1380     int am = an - 1;
1381     for (int j = 0; j < n; ++j) { // each pass handles one even and one odd slot
1382     WorkQueue v; // copy external queue
1383     if ((v = ws[j]) != null) // position may change
1384     as[v.id & am & SQMASK] = v;
1385     if (++j >= n)
1386     break;
1387     as[j] = ws[j]; // copy worker
1388 dl 1.94 }
1389 dl 1.300 workQueues = as;
1390 dl 1.94 }
1391     }
1392 dl 1.78 }
1393 dl 1.300 wt.setName(prefix.concat(Integer.toString(tid))); // outside the lock; tid stays 0 if no array was usable
1394 dl 1.78 }
1395 dl 1.115 return w;
1396 dl 1.78 }
1397 dl 1.19
1398 jsr166 1.1 /**
1399 dl 1.86 * Final callback from terminating worker, as well as upon failure
1400 dl 1.105 * to construct or start a worker. Removes record of worker from
1401     * array, and adjusts counts. If pool is shutting down, tries to
1402     * complete termination.
     *
     * <p>After cleanup, a non-null {@code ex} is rethrown through
     * ForkJoinTask.rethrow; otherwise stale exception records are
     * expunged on the way out.
1403 dl 1.78 *
1404 jsr166 1.151 * @param wt the worker thread, or null if construction failed
1405 dl 1.78 * @param ex the exception causing failure, or null if none
1406 dl 1.45 */
1407 dl 1.78 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1408     WorkQueue w = null;
1409 dl 1.300 int phase = 0; // stays 0 (so counts are decremented) when there is no queue
1410 dl 1.78 if (wt != null && (w = wt.workQueue) != null) {
1411 dl 1.300 Object lock = workerNamePrefix;
1412     long ns = (long)w.nsteals & 0xffffffffL; // treat the int steal counter as unsigned
1413     int idx = w.id & SMASK; // slot index is held in the low bits of id
1414     if (lock != null) {
1415     WorkQueue[] ws; // remove index from array
1416 jsr166 1.301 synchronized (lock) {
1417 dl 1.243 if ((ws = workQueues) != null && ws.length > idx &&
1418     ws[idx] == w) // only clear the slot if it still holds this queue
1419     ws[idx] = null;
1420 dl 1.300 stealCount += ns; // fold this worker's steals into the pool total
1421 dl 1.243 }
1422     }
1423 dl 1.300 phase = w.phase;
1424 dl 1.243 }
1425 dl 1.300 if (phase != QUIET) { // else pre-adjusted
1426 dl 1.243 long c; // decrement counts
1427     do {} while (!U.compareAndSwapLong // retry until both counts drop by one; SP bits kept
1428 dl 1.300 (this, CTL, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
1429 dl 1.243 (TC_MASK & (c - TC_UNIT)) |
1430     (SP_MASK & c))));
1431     }
1432 dl 1.300 if (w != null)
1433 dl 1.200 w.cancelAll(); // cancel remaining tasks
1434 dl 1.300
1435     if (!tryTerminate(false, false) && // possibly replace worker
1436     w != null && w.array != null) // avoid repeated failures
1437     signalWork();
1438    
1439 dl 1.200 if (ex == null) // help clean on way out
1440 dl 1.120 ForkJoinTask.helpExpungeStaleExceptions();
1441 dl 1.200 else // rethrow
1442 dl 1.104 ForkJoinTask.rethrow(ex);
1443 dl 1.78 }
1444 dl 1.52
1445 dl 1.19 /**
1446 dl 1.300 * Tries to create or release a worker if too few are running.
     *
     * <p>Loops on the current ctl value: stops at once if ctl is
     * non-negative; if the low 32 bits (idle-worker stack top) are
     * zero, tries to add a worker when the ADD_WORKER bit is set;
     * otherwise tries to pop the top idle worker with a CAS on ctl,
     * advance its phase and unpark it. The loop repeats only when
     * the observed stack top is stale or the CAS is lost.
1447 dl 1.105 */
1448 dl 1.253 final void signalWork() {
1449 dl 1.243 for (;;) {
1450 dl 1.300 long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1451 dl 1.243 if ((c = ctl) >= 0L) // enough workers
1452     break;
1453     else if ((sp = (int)c) == 0) { // no idle workers
1454     if ((c & ADD_WORKER) != 0L) // too few workers
1455 dl 1.200 tryAddWorker(c);
1456     break;
1457     }
1458 dl 1.243 else if ((ws = workQueues) == null)
1459     break; // unstarted/terminated
1460     else if (ws.length <= (i = sp & SMASK))
1461     break; // terminated
1462     else if ((v = ws[i]) == null)
1463     break; // terminating
1464     else {
1465 dl 1.300 int np = sp & ~UNSIGNALLED; // phase v will have once released
1466     int vp = v.phase;
1467     long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT)); // pop v: its predecessor becomes top, release count + 1
1468     Thread vt = v.owner;
1469     if (sp == vp && U.compareAndSwapLong(this, CTL, c, nc)) { // v must still be the top of the stack
1470     v.phase = np;
1471     if (v.source < 0) // negative source: worker has enabled signalling (see runWorker)
1472     LockSupport.unpark(vt);
1473 dl 1.243 break;
1474     }
1475 dl 1.174 }
1476 dl 1.52 }
1477 dl 1.14 }
1478    
1479 dl 1.200 /**
1480 dl 1.300 * Tries to decrement counts (sometimes implicitly) and possibly
1481     * arrange for a compensating worker in preparation for blocking:
1482     * If not all core workers yet exist, creates one, else if any are
1483     * unreleased (possibly including caller) releases one, else if
1484     * fewer than the minimum allowed number of workers running,
1485     * checks to see that they are all active, and if so creates an
1486     * extra worker unless over maximum limit and policy is to
1487     * saturate. Most of these steps can fail due to interference, in
1488     * which case 0 is returned so caller will retry. A negative
1489     * return value indicates that the caller doesn't need to
1490     * re-adjust counts when later unblocked.
1491 dl 1.243 *
     * @param w the caller's queue; 0 is returned if it is null while
     * the total-count field of ctl is non-negative
1492 dl 1.300 * @return 1: block then adjust, -1: block without adjust, 0 : retry
     * @throws RejectedExecutionException if the thread limit is
     * reached, the SATURATE mode bit is not set, and at least as many
     * workers as the parallelism level are found blocked or waiting
1493 dl 1.243 */
1494 dl 1.300 private int tryCompensate(WorkQueue w) {
1495     int t, n, sp;
1496     long c = ctl;
1497     WorkQueue[] ws = workQueues;
1498     if ((t = (short)(c >> TC_SHIFT)) >= 0) { // negative total-count field falls through to "expand pool"
1499     if (ws == null || (n = ws.length) <= 0 || w == null)
1500     return 0; // disabled
1501     else if ((sp = (int)c) != 0) { // replace or release
1502     WorkQueue v = ws[sp & (n - 1)]; // queue at the top of the idle stack
1503     int wp = w.phase;
1504     long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c); // caller itself unreleased: count the release
1505     int np = sp & ~UNSIGNALLED;
1506     if (v != null) {
1507     int vp = v.phase;
1508     Thread vt = v.owner;
1509     long nc = ((long)v.stackPred & SP_MASK) | uc;
1510     if (vp == sp && U.compareAndSwapLong(this, CTL, c, nc)) { // same release protocol as signalWork
1511     v.phase = np;
1512     if (v.source < 0)
1513     LockSupport.unpark(vt);
1514     return (wp < 0) ? -1 : 1;
1515     }
1516     }
1517     return 0; // stale view or lost CAS: caller retries
1518     }
1519     else if ((int)(c >> RC_SHIFT) - // reduce parallelism
1520     (short)(bounds & SMASK) > 0) {
1521     long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c)); // only the release count changes
1522     return U.compareAndSwapLong(this, CTL, c, nc) ? 1 : 0;
1523     }
1524     else { // validate
1525     int md = mode, pc = md & SMASK, tc = pc + t, bc = 0; // tc: expected worker count, bc: blocked count
1526     boolean unstable = false;
1527     for (int i = 1; i < n; i += 2) { // odd slots hold worker queues
1528     WorkQueue q; Thread wt; Thread.State ts;
1529     if ((q = ws[i]) != null) {
1530     if (q.source == 0) { // worker currently between tasks (runWorker resets source to 0)
1531     unstable = true;
1532     break;
1533     }
1534     else {
1535     --tc;
1536     if ((wt = q.owner) != null &&
1537     ((ts = wt.getState()) == Thread.State.BLOCKED ||
1538     ts == Thread.State.WAITING))
1539     ++bc; // worker is blocking
1540     }
1541 dl 1.243 }
1542     }
1543 dl 1.300 if (unstable || tc != 0 || ctl != c)
1544     return 0; // inconsistent
1545     else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) { // at implementation cap or configured maximum
1546     if ((md & SATURATE) != 0)
1547     return -1; // saturate policy: block without compensation
1548     else if (bc < pc) { // lagging
1549     Thread.yield(); // for retry spins
1550     return 0;
1551 dl 1.243 }
1552     else
1553 dl 1.300 throw new RejectedExecutionException(
1554     "Thread limit exceeded replacing blocked worker");
1555 dl 1.200 }
1556 dl 1.177 }
1557 dl 1.243 }
1558 dl 1.300
1559     long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1560     return U.compareAndSwapLong(this, CTL, c, nc) && createWorker() ? 1 : 0; // createWorker backs out via deregisterWorker on failure
1561 dl 1.243 }
1562    
1563     /**
1564     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1565 dl 1.300 * See above for explanation.
     *
     * <p>Each pass scans workQueues starting from a pseudo-random
     * index, taking tasks from the base of any non-empty queue and
     * running them followed by the worker's own local tasks. When a
     * scan finds nothing, the worker first pushes itself onto the
     * idle stack encoded in ctl and, on a later empty scan, parks
     * until released. The method returns when workQueues becomes
     * null, the pool's mode is negative, termination is helped
     * along, or a timed park expires while this worker is at the top
     * of the idle stack.
     *
     * @param w the calling worker's queue
1566 dl 1.243 */
1567     final void runWorker(WorkQueue w) {
1568 dl 1.300 WorkQueue[] ws;
1569 dl 1.243 w.growArray(); // allocate queue
1570 dl 1.300 int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
1571     if (r == 0) // initial nonzero seed
1572     r = 1;
1573     int lastSignalId = 0; // avoid unneeded signals
1574     while ((ws = workQueues) != null) {
1575     boolean nonempty = false; // scan
1576     for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1577     WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1578     if ((i = r & m) >= 0 && i < n && // always true
1579     (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
1580 dl 1.243 (a = q.array) != null && (al = a.length) > 0) {
1581 dl 1.300 int qid = q.id; // (never zero)
1582 dl 1.256 int index = (al - 1) & b;
1583     long offset = ((long)index << ASHIFT) + ABASE;
1584 dl 1.243 ForkJoinTask<?> t = (ForkJoinTask<?>)
1585     U.getObjectVolatile(a, offset);
1586 dl 1.300 if (t != null && b++ == q.base && // base unchanged since read, then claim slot by CAS
1587     U.compareAndSwapObject(a, offset, t, null)) {
1588     if ((q.base = b) - q.top < 0 && qid != lastSignalId)
1589     signalWork(); // propagate signal
1590     w.source = lastSignalId = qid; // record which queue the task came from
1591     t.doExec();
1592     if ((w.id & FIFO) != 0) // run remaining locals
1593     w.localPollAndExec(POLL_LIMIT);
1594     else
1595     w.localPopAndExec(POLL_LIMIT);
1596     ForkJoinWorkerThread thread = w.owner;
1597     ++w.nsteals;
1598     w.source = 0; // now idle
1599     if (thread != null)
1600     thread.afterTopLevelExec();
1601 dl 1.243 }
1602 dl 1.300 nonempty = true; // set even when the claim was lost; r is unchanged so the same slot is retried
1603 dl 1.178 }
1604 dl 1.300 else if (nonempty) // found work earlier in this scan and the slot is now empty
1605 dl 1.243 break;
1606 dl 1.300 else
1607     ++r; // nothing seen yet: probe the next slot
1608 dl 1.120 }
1609 dl 1.178
1610 dl 1.300 if (nonempty) { // move (xorshift)
1611     r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
1612     }
1613     else {
1614     int phase;
1615     lastSignalId = 0; // clear for next scan
1616     if ((phase = w.phase) >= 0) { // enqueue
1617     int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED; // new sequence number, marked unsignalled
1618     long c, nc;
1619     do {
1620     w.stackPred = (int)(c = ctl); // remember previous stack top
1621     nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np); // release count - 1, this worker becomes top
1622     } while (!U.compareAndSwapLong(this, CTL, c, nc));
1623     }
1624     else { // already queued
1625     int pred = w.stackPred;
1626     w.source = DORMANT; // enable signal
1627     for (int steps = 0;;) {
1628     int md, rc; long c;
1629     if (w.phase >= 0) { // released by a signaller
1630     w.source = 0;
1631     break;
1632     }
1633     else if ((md = mode) < 0) // shutting down
1634     return;
1635     else if ((rc = ((md & SMASK) + // possibly quiescent
1636     (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
1637     (md & SHUTDOWN) != 0 &&
1638     tryTerminate(false, false))
1639     return; // help terminate
1640     else if ((++steps & 1) == 0)
1641     Thread.interrupted(); // clear between parks
1642     else if (rc <= 0 && pred != 0 && phase == (int)c) { // no released workers and this worker is the stack top
1643     long d = keepAlive + System.currentTimeMillis(); // absolute deadline in ms
1644     LockSupport.parkUntil(this, d);
1645     if (ctl == c && // nothing changed while parked
1646     d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
1647     long nc = ((UC_MASK & (c - TC_UNIT)) | // total count - 1, predecessor becomes stack top
1648     (SP_MASK & pred));
1649     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1650     w.phase = QUIET; // tells deregisterWorker that counts are already adjusted
1651     return; // drop on timeout
1652     }
1653     }
1654     }
1655     else
1656     LockSupport.park(this);
1657     }
1658     }
1659     }
1660     }
1661     }
1662 dl 1.200
1663 dl 1.305 /**
1664     * Helps and/or blocks until the given task is done or timeout.
1665     * First tries locally helping, then scans other queues for a task
1666     * produced by one of w's stealers; compensating and blocking if
1667     * none are found (rescanning if tryCompensate fails).
1668     *
     * <p>If {@code w} or {@code task} is null, no helping or waiting
     * is attempted and 0 is returned.
     *
1669     * @param w caller
1670     * @param task the task
1671     * @param deadline for timed waits, if nonzero
     * (compared against System.nanoTime())
1672     * @return task status on exit
1673     */
1674 dl 1.300 final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1675     int s = 0;
1676     if (w != null && task != null &&
1677     (!(task instanceof CountedCompleter) ||
1678     (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) { // negative status here is returned as-is
1679     w.tryRemoveAndExec(task);
1680     int src = w.source, id = w.id; // src is restored after each helped task
1681     s = task.status;
1682     while (s >= 0) {
1683     WorkQueue[] ws;
1684     boolean nonempty = false;
1685     int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
1686     if ((ws = workQueues) != null) { // scan for matching id
1687     for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
1688     WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1689     if ((i = (r + j) & m) >= 0 && i < n &&
1690     (q = ws[i]) != null && q.source == id && // q's owner last took a task from w
1691     (b = q.base) - q.top < 0 &&
1692     (a = q.array) != null && (al = a.length) > 0) {
1693     int qid = q.id;
1694     int index = (al - 1) & b;
1695     long offset = ((long)index << ASHIFT) + ABASE;
1696     ForkJoinTask<?> t = (ForkJoinTask<?>)
1697     U.getObjectVolatile(a, offset);
1698     if (t != null && b++ == q.base && id == q.source && // recheck base and source before claiming
1699     U.compareAndSwapObject(a, offset, t, null)) {
1700     q.base = b;
1701     w.source = qid;
1702     t.doExec();
1703     w.source = src;
1704     }
1705     nonempty = true; // found a candidate (even if the claim was lost): rescan rather than block
1706 dl 1.200 break;
1707 dl 1.300 }
1708 dl 1.200 }
1709 dl 1.300 }
1710     if ((s = task.status) < 0)
1711     break;
1712     else if (!nonempty) {
1713     long ms, ns; int block;
1714     if (deadline == 0L)
1715     ms = 0L; // untimed
1716     else if ((ns = deadline - System.nanoTime()) <= 0L)
1717     break; // timeout
1718     else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1719     ms = 1L; // avoid 0 for timed wait
1720     if ((block = tryCompensate(w)) != 0) { // 0 means retry the scan
1721     task.internalWait(ms);
1722     U.getAndAddLong(this, CTL, (block > 0) ? RC_UNIT : 0L); // re-adjust count only if tryCompensate returned 1
1723 dl 1.200 }
1724 dl 1.300 s = task.status;
1725 dl 1.200 }
1726 dl 1.178 }
1727     }
1728 dl 1.200 return s;
1729 dl 1.120 }
1730    
1731     /**
1732 dl 1.300 * Runs tasks until {@code isQuiescent()}. Rather than blocking
1733     * when tasks cannot be found, rescans until all others cannot
1734     * find tasks either.
     *
     * <p>While no tasks are found, the caller marks its queue's
     * source as QUIET and removes itself from the release count in
     * ctl; the count is restored as soon as work is seen again or
     * before returning.
     *
     * @param w the calling worker's queue (dereferenced without a
     * null check)
1735 dl 1.78 */
1736 dl 1.300 final void helpQuiescePool(WorkQueue w) {
1737     int prevSrc = w.source, fifo = w.id & FIFO;
1738     for (int source = prevSrc, released = -1;;) { // -1 until known
1739     WorkQueue[] ws;
1740     if (fifo != 0) // drain local tasks first, in the queue's mode
1741     w.localPollAndExec(0);
1742     else
1743     w.localPopAndExec(0);
1744     if (released == -1 && w.phase >= 0)
1745     released = 1; // caller currently counts as released
1746     boolean quiet = true, empty = true;
1747     int r = ThreadLocalRandom.nextSecondarySeed();
1748     if ((ws = workQueues) != null) {
1749     for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1750     WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1751     if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
1752     if ((b = q.base) - q.top < 0 &&
1753     (a = q.array) != null && (al = a.length) > 0) {
1754     int qid = q.id;
1755     if (released == 0) { // increment
1756     released = 1;
1757     U.getAndAddLong(this, CTL, RC_UNIT);
1758 dl 1.95 }
1759 dl 1.256 int index = (al - 1) & b;
1760     long offset = ((long)index << ASHIFT) + ABASE;
1761 dl 1.300 ForkJoinTask<?> t = (ForkJoinTask<?>)
1762 dl 1.262 U.getObjectVolatile(a, offset);
1763 dl 1.300 if (t != null && b++ == q.base &&
1764     U.compareAndSwapObject(a, offset, t, null)) {
1765     q.base = b;
1766     w.source = source = q.id;
1767     t.doExec();
1768     w.source = source = prevSrc;
1769 dl 1.243 }
1770 dl 1.300 quiet = empty = false; // work was visible, whether or not the claim succeeded
1771 dl 1.200 break;
1772 dl 1.95 }
1773 dl 1.300 else if ((q.source & QUIET) == 0) // some queue is not marked quiet
1774     quiet = false;
1775 dl 1.52 }
1776 dl 1.19 }
1777 dl 1.243 }
1778 dl 1.300 if (quiet) { // no tasks seen and every scanned queue marked quiet
1779     if (released == 0) // restore the count removed below
1780     U.getAndAddLong(this, CTL, RC_UNIT);
1781     w.source = prevSrc;
1782     break;
1783     }
1784     else if (empty) {
1785     if (source != QUIET)
1786     w.source = source = QUIET; // advertise that this worker found nothing
1787     if (released == 1) { // decrement
1788     released = 0;
1789     U.getAndAddLong(this, CTL, RC_MASK & -RC_UNIT);
1790     }
1791     }
1792 dl 1.14 }
1793 dl 1.22 }
1794    
1795 dl 1.52 /**
1796 dl 1.300 * Scans for and returns a polled task, if available.
1797     * Used only for untracked polls.
     *
     * <p>Starting from a random origin and stepping by a random
     * stride, tries to take the task at the base of the first
     * non-empty queue encountered. A sweep that finds a non-empty
     * queue but fails to claim its task restarts the scan; the
     * method gives up once a full sweep yields the same sum of queue
     * bases as the previous sweep, or when the STOP bit of mode is
     * set or no queue array exists.
1798 dl 1.105 *
1799 dl 1.300 * @param submissionsOnly if true, only scan submission queues
     * @return a task, or null if none was found
1800 dl 1.19 */
1801 dl 1.300 private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1802     WorkQueue[] ws; int n;
1803     rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
1804     (n = ws.length) > 0) {
1805     int m = n - 1;
1806     int r = ThreadLocalRandom.nextSecondarySeed();
1807     int h = r >>> 16;
1808     int origin, step;
1809     if (submissionsOnly) {
1810     origin = (r & ~1) & m; // even indices and steps
1811     step = (h & ~1) | 2;
1812     }
1813     else {
1814     origin = r & m;
1815     step = h | 1; // odd step
1816     }
1817     for (int k = origin, oldSum = 0, checkSum = 0;;) {
1818     WorkQueue q; int b, al; ForkJoinTask<?>[] a;
1819     if ((q = ws[k]) != null) {
1820     checkSum += b = q.base; // bases feed the stability check below
1821     if (b - q.top < 0 &&
1822     (a = q.array) != null && (al = a.length) > 0) {
1823     int index = (al - 1) & b;
1824     long offset = ((long)index << ASHIFT) + ABASE;
1825     ForkJoinTask<?> t = (ForkJoinTask<?>)
1826     U.getObjectVolatile(a, offset);
1827     if (t != null && b++ == q.base &&
1828     U.compareAndSwapObject(a, offset, t, null)) {
1829     q.base = b;
1830     return t;
1831     }
1832     else
1833     break; // restart
1834     }
1835     }
1836     if ((k = (k + step) & m) == origin) { // completed one sweep
1837     if (oldSum == (oldSum = checkSum)) // same sum as the previous sweep
1838     break rescan;
1839     checkSum = 0;
1840 dl 1.178 }
1841 dl 1.52 }
1842 dl 1.90 }
1843 dl 1.300 return null;
1844     }
1845    
1846     /**
1847     * Gets and removes a local or stolen task for the given worker.
1848     *
1849     * @return a task, if available
1850     */
1851     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1852     ForkJoinTask<?> t;
1853     if (w != null &&
1854     (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1855     return t;
1856     else
1857     return pollScan(false);
1858 dl 1.90 }
1859    
1860 dl 1.300 // External operations
1861    
1862 dl 1.90 /**
1863 dl 1.300 * Adds the given task to a submission queue at submitter's
1864     * current queue, creating one if null or contended.
     *
     * <p>The queue slot is selected from the caller's
     * ThreadLocalRandom probe. An empty slot gets a new queue,
     * installed under the pool lock; an existing queue is used only
     * if its shared lock can be taken, otherwise the probe is
     * advanced and another slot is tried.
1865 dl 1.90 *
1866 dl 1.300 * @param task the task. Caller must ensure non-null.
     * @throws RejectedExecutionException if the SHUTDOWN bit of mode
     * is set or the pool has no queue array
1867 dl 1.90 */
1868 dl 1.300 final void externalPush(ForkJoinTask<?> task) {
1869     int r; // initialize caller's probe
1870     if ((r = ThreadLocalRandom.getProbe()) == 0) {
1871     ThreadLocalRandom.localInit();
1872     r = ThreadLocalRandom.getProbe();
1873     }
1874     for (;;) {
1875     int md = mode, n;
1876     WorkQueue[] ws = workQueues;
1877     if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
1878     throw new RejectedExecutionException();
1879     else {
1880     WorkQueue q;
1881     boolean push = false, grow = false; // push: task handed to q; grow: array must be (re)allocated first
1882     if ((q = ws[(n - 1) & r & SQMASK]) == null) {
1883     Object lock = workerNamePrefix;
1884     int qid = (r | QUIET) & ~(FIFO | OWNED);
1885     q = new WorkQueue(this, null); // no owner thread: shared submission queue
1886     q.id = qid;
1887     q.source = QUIET;
1888     q.phase = QLOCK; // lock queue
1889     if (lock != null) {
1890 jsr166 1.301 synchronized (lock) { // lock pool to install
1891 dl 1.300 int i;
1892     if ((ws = workQueues) != null &&
1893     (n = ws.length) > 0 &&
1894     ws[i = qid & (n - 1) & SQMASK] == null) { // slot still free under the lock
1895     ws[i] = q;
1896     push = grow = true;
1897     }
1898     }
1899     }
1900     }
1901     else if (q.tryLockSharedQueue()) {
1902     int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
1903     if ((a = q.array) != null && (al = a.length) > 0 &&
1904     al - 1 + (d = b - s) > 0) { // array exists and has room
1905     a[(al - 1) & s] = task;
1906     q.top = s + 1; // relaxed writes OK here
1907     q.phase = 0; // unlock queue
1908     if (d < 0 && q.base - s < 0)
1909     break; // no signal needed
1910     }
1911 dl 1.243 else
1912 dl 1.300 grow = true; // queue stays locked until the finally block below
1913     push = true;
1914     }
1915     if (push) {
1916     if (grow) {
1917     try {
1918     q.growArray();
1919     int s = q.top, al; ForkJoinTask<?>[] a;
1920     if ((a = q.array) != null && (al = a.length) > 0) {
1921     a[(al - 1) & s] = task;
1922     q.top = s + 1;
1923     }
1924     } finally {
1925     q.phase = 0; // always unlock, even if growArray throws
1926     }
1927 dl 1.243 }
1928 dl 1.300 signalWork();
1929     break;
1930 dl 1.90 }
1931 dl 1.300 else // move if busy
1932     r = ThreadLocalRandom.advanceProbe(r);
1933 dl 1.90 }
1934     }
1935     }
1936    
1937 dl 1.300 /**
1938     * Pushes a possibly-external submission.
1939     */
1940     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1941     Thread t; ForkJoinWorkerThread w; WorkQueue q;
1942     if (task == null)
1943     throw new NullPointerException();
1944     if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1945     (w = (ForkJoinWorkerThread)t).pool == this &&
1946     (q = w.workQueue) != null)
1947     q.push(task);
1948     else
1949     externalPush(task);
1950     return task;
1951     }
1952    
1953     /**
1954     * Returns common pool queue for an external thread.
1955     */
1956     static WorkQueue commonSubmitterQueue() {
1957     ForkJoinPool p = common;
1958     int r = ThreadLocalRandom.getProbe();
1959     WorkQueue[] ws; int n;
1960     return (p != null && (ws = p.workQueues) != null &&
1961     (n = ws.length) > 0) ?
1962     ws[(n - 1) & r & SQMASK] : null;
1963     }
1964 dl 1.90
1965     /**
1966 dl 1.300 * Performs tryUnpush for an external submitter.
1967     */
1968     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
1969     int r = ThreadLocalRandom.getProbe();
1970     WorkQueue[] ws; WorkQueue w; int n;
1971     return ((ws = workQueues) != null &&
1972     (n = ws.length) > 0 &&
1973     (w = ws[(n - 1) & r & SQMASK]) != null &&
1974     w.trySharedUnpush(task));
1975 dl 1.22 }
1976    
1977     /**
1978 dl 1.300 * Performs helpComplete for an external submitter.
1979 dl 1.78 */
1980 dl 1.300 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
1981     int r = ThreadLocalRandom.getProbe();
1982     WorkQueue[] ws; WorkQueue w; int n;
1983     return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1984     (w = ws[(n - 1) & r & SQMASK]) != null) ?
1985     w.sharedHelpCC(task, maxTasks) : 0;
1986 dl 1.22 }
1987    
1988     /**
1989 dl 1.300 * Tries to steal and run tasks within the target's computation.
1990     * The maxTasks argument supports external usages; internal calls
1991     * use zero, allowing unbounded steps (external calls trap
1992     * non-positive values).
1993 dl 1.78 *
1994 dl 1.300 * @param w caller
1995     * @param maxTasks if non-zero, the maximum number of other tasks to run
1996     * @return task status on exit
1997 dl 1.22 */
1998 dl 1.300 final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1999     int maxTasks) {
2000     return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
2001 dl 1.14 }
2002    
2003     /**
2004 dl 1.105 * Returns a cheap heuristic guide for task partitioning when
2005     * programmers, frameworks, tools, or languages have little or no
2006 jsr166 1.222 * idea about task granularity. In essence, by offering this
2007 dl 1.105 * method, we ask users only about tradeoffs in overhead vs
2008     * expected throughput and its variance, rather than how finely to
2009     * partition tasks.
2010     *
2011     * In a steady state strict (tree-structured) computation, each
2012     * thread makes available for stealing enough tasks for other
2013     * threads to remain active. Inductively, if all threads play by
2014     * the same rules, each thread should make available only a
2015     * constant number of tasks.
2016     *
2017     * The minimum useful constant is just 1. But using a value of 1
2018     * would require immediate replenishment upon each steal to
2019     * maintain enough tasks, which is infeasible. Further,
2020     * partitionings/granularities of offered tasks should minimize
2021     * steal rates, which in general means that threads nearer the top
2022     * of computation tree should generate more than those nearer the
2023     * bottom. In perfect steady state, each thread is at
2024     * approximately the same level of computation tree. However,
2025     * producing extra tasks amortizes the uncertainty of progress and
2026     * diffusion assumptions.
2027     *
2028 jsr166 1.161 * So, users will want to use values larger (but not much larger)
2029 dl 1.105 * than 1 to both smooth over transient shortages and hedge
2030     * against uneven progress; as traded off against the cost of
2031     * extra task overhead. We leave the user to pick a threshold
2032     * value to compare with the results of this call to guide
2033     * decisions, but recommend values such as 3.
2034     *
2035     * When all threads are active, it is on average OK to estimate
2036     * surplus strictly locally. In steady-state, if one thread is
2037     * maintaining say 2 surplus tasks, then so are others. So we can
2038     * just use estimated queue length. However, this strategy alone
2039     * leads to serious mis-estimates in some non-steady-state
2040     * conditions (ramp-up, ramp-down, other stalls). We can detect
2041     * many of these by further considering the number of "idle"
2042     * threads, that are known to have zero queued tasks, so
2043     * compensate by a factor of (#idle/#active) threads.
2044     */
2045     static int getSurplusQueuedTaskCount() {
2046     Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2047 dl 1.300 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2048     (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2049     (q = wt.workQueue) != null) {
2050     int p = pool.mode & SMASK;
2051     int a = p + (int)(pool.ctl >> RC_SHIFT);
2052     int n = q.top - q.base;
2053 dl 1.112 return n - (a > (p >>>= 1) ? 0 :
2054     a > (p >>>= 1) ? 1 :
2055     a > (p >>>= 1) ? 2 :
2056     a > (p >>>= 1) ? 4 :
2057     8);
2058 dl 1.105 }
2059     return 0;
2060 dl 1.100 }
2061    
2062 dl 1.300 // Termination
2063 dl 1.14
2064     /**
2065 dl 1.210 * Possibly initiates and/or completes termination.
     *
     * <p>Works in three steps, each advancing a bit in mode by CAS:
     * SHUTDOWN (only when {@code enable} is true and this is not the
     * common pool), then STOP (immediately if {@code now}, otherwise
     * only once two consecutive sweeps show no running workers and
     * no queued tasks), then TERMINATED (after cancelling queued
     * tasks and interrupting owners, and only once the total count
     * in ctl shows no remaining workers).
2066 dl 1.14 *
2067     * @param now if true, unconditionally terminate, else only
2068 dl 1.78 * if no work and no active workers
2069 dl 1.243 * @param enable if true, terminate when next possible
2070 dl 1.300 * @return true if terminating or terminated
2071 jsr166 1.1 */
2072 dl 1.300 private boolean tryTerminate(boolean now, boolean enable) {
2073     int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
2074 dl 1.289
2075 dl 1.300 while (((md = mode) & SHUTDOWN) == 0) {
2076 dl 1.294 if (!enable || this == common) // cannot shutdown
2077 dl 1.300 return false;
2078 dl 1.294 else
2079 dl 1.300 U.compareAndSwapInt(this, MODE, md, md | SHUTDOWN); // loop re-reads mode, so a lost CAS is retried
2080 dl 1.289 }
2081    
2082 dl 1.300 while (((md = mode) & STOP) == 0) { // try to initiate termination
2083     if (!now) { // check if quiescent & empty
2084 dl 1.211 for (long oldSum = 0L;;) { // repeat until stable
2085 dl 1.300 boolean running = false;
2086 dl 1.210 long checkSum = ctl;
2087 dl 1.300 WorkQueue[] ws = workQueues;
2088     if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0) // some workers still counted as released
2089     running = true;
2090     else if (ws != null) {
2091     WorkQueue w; int b;
2092 dl 1.289 for (int i = 0; i < ws.length; ++i) {
2093     if ((w = ws[i]) != null) {
2094 dl 1.300 checkSum += (b = w.base) + w.id;
2095     if (b != w.top || // queue holds tasks
2096     ((i & 1) == 1 && w.source >= 0)) { // or a worker (odd slot) has a non-negative source
2097     running = true;
2098     break;
2099     }
2100 dl 1.289 }
2101 dl 1.206 }
2102 dl 1.203 }
2103 dl 1.300 if (((md = mode) & STOP) != 0)
2104     break; // already triggered
2105     else if (running)
2106     return false;
2107     else if (workQueues == ws && oldSum == (oldSum = checkSum)) // same array and same sum as the last sweep
2108 dl 1.210 break;
2109 dl 1.203 }
2110     }
2111 dl 1.300 if ((md & STOP) == 0)
2112     U.compareAndSwapInt(this, MODE, md, md | STOP);
2113 dl 1.200 }
2114 dl 1.210
2115 dl 1.300 while (((md = mode) & TERMINATED) == 0) { // help terminate others
2116     for (long oldSum = 0L;;) { // repeat until stable
2117     WorkQueue[] ws; WorkQueue w;
2118     long checkSum = ctl;
2119     if ((ws = workQueues) != null) {
2120     for (int i = 0; i < ws.length; ++i) {
2121     if ((w = ws[i]) != null) {
2122     ForkJoinWorkerThread wt = w.owner;
2123     w.cancelAll(); // clear queues
2124     if (wt != null) {
2125 dl 1.289 try { // unblock join or park
2126 dl 1.210 wt.interrupt();
2127     } catch (Throwable ignore) { // deliberately best-effort: a failed interrupt must not stop termination
2128 dl 1.200 }
2129     }
2130 dl 1.300 checkSum += w.base + w.id;
2131 dl 1.200 }
2132 dl 1.101 }
2133 dl 1.78 }
2134 dl 1.300 if (((md = mode) & TERMINATED) != 0 ||
2135     (workQueues == ws && oldSum == (oldSum = checkSum)))
2136     break;
2137 dl 1.78 }
2138 dl 1.300 if ((md & TERMINATED) != 0)
2139     break;
2140     else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0) // workers still exist; termination is not completed by this call
2141 dl 1.210 break;
2142 dl 1.300 else if (U.compareAndSwapInt(this, MODE, md, md | TERMINATED)) {
2143     synchronized (this) {
2144     notifyAll(); // for awaitTermination
2145 dl 1.243 }
2146 dl 1.256 break;
2147 dl 1.200 }
2148 dl 1.52 }
2149 dl 1.300 return true;
2150 dl 1.105 }
2151    
2152 dl 1.52 // Exported methods
2153 jsr166 1.1
2154     // Constructors
2155    
/**
 * Constructs a {@code ForkJoinPool} whose parallelism equals {@link
 * java.lang.Runtime#availableProcessors}, with defaults used for
 * every other parameter.
 *
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool() {
    this(Math.min(Runtime.getRuntime().availableProcessors(), MAX_CAP),
         defaultForkJoinWorkerThreadFactory,
         null,
         false,
         0,
         MAX_CAP,
         1,
         false,
         DEFAULT_KEEPALIVE,
         TimeUnit.MILLISECONDS);
}
2171    
/**
 * Constructs a {@code ForkJoinPool} with the requested parallelism
 * level, with defaults used for every other parameter.
 *
 * @param parallelism the parallelism level
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism) {
    this(parallelism,
         defaultForkJoinWorkerThreadFactory,
         null,
         false,
         0,
         MAX_CAP,
         1,
         false,
         DEFAULT_KEEPALIVE,
         TimeUnit.MILLISECONDS);
}
2188    
/**
 * Creates a {@code ForkJoinPool} from the four given parameters,
 * applying defaults to the remaining ones.
 *
 * @param parallelism the parallelism level. For default value,
 *        use {@link java.lang.Runtime#availableProcessors}.
 * @param factory the factory for creating new threads. For default
 *        value, use {@link #defaultForkJoinWorkerThreadFactory}.
 * @param handler the handler for internal worker threads that
 *        terminate due to unrecoverable errors encountered while
 *        executing tasks. For default value, use {@code null}.
 * @param asyncMode if true, establishes local first-in-first-out
 *        scheduling mode for forked tasks that are never joined.
 *        This mode may be more appropriate than the default locally
 *        stack-based mode in applications in which worker threads
 *        only process event-style asynchronous tasks.
 *        For default value, use {@code false}.
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws NullPointerException if the factory is null
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(parallelism,
         factory,
         handler,
         asyncMode,
         0,         // corePoolSize
         MAX_CAP,   // maximumPoolSize
         1,         // minimumRunnable
         false,     // rejectOnSaturation
         DEFAULT_KEEPALIVE,
         TimeUnit.MILLISECONDS);
}
2221    
2222 dl 1.300 /**
2223     * Creates a {@code ForkJoinPool} with the given parameters.
2224     *
2225     * @param parallelism the parallelism level. For default value,
2226     * use {@link java.lang.Runtime#availableProcessors}.
2227     *
2228     * @param factory the factory for creating new threads. For
2229     * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2230     *
2231     * @param handler the handler for internal worker threads that
2232     * terminate due to unrecoverable errors encountered while
2233     * executing tasks. For default value, use {@code null}.
2234     *
2235     * @param asyncMode if true, establishes local first-in-first-out
2236     * scheduling mode for forked tasks that are never joined. This
2237     * mode may be more appropriate than default locally stack-based
2238     * mode in applications in which worker threads only process
2239     * event-style asynchronous tasks. For default value, use {@code
2240     * false}.
2241     *
2242     * @param corePoolSize the number of threads to keep in the pool
2243     * (unless timed out after an elapsed keep-alive). Normally (and
2244     * by default) this is the same value as the parallelism level,
2245     * but may be set to a larger value to reduce dynamic overhead if
2246     * tasks regularly block. Using a smaller value (for example
2247     * {@code 0}) has the same effect as the default.
2248     *
2249     * @param maximumPoolSize the maximum number of threads allowed.
2250     * When the maximum is reached, attempts to replace blocked
2251     * threads fail. (However, because creation and termination of
2252     * different threads may overlap, and may be managed by the given
2253     * thread factory, this value may be transiently exceeded.) The
2254     * default for the common pool is {@code 256} plus the parallelism
2255     * level. Using a value (for example {@code Integer.MAX_VALUE})
2256     * larger than the implementation's total thread limit has the
2257     * same effect as using this limit.
2258     *
2259     * @param minimumRunnable the minimum allowed number of core
2260     * threads not blocked by a join or {@link ManagedBlocker}. To
2261     * ensure progress, when too few unblocked threads exist and
2262     * unexecuted tasks may exist, new threads are constructed, up to
2263     * the given maximumPoolSize. For the default value, use {@code
2264     * 1}, that ensures liveness. A larger value might improve
2265     * throughput in the presence of blocked activities, but might
2266     * not, due to increased overhead. A value of zero may be
2267     * acceptable when submitted tasks cannot have dependencies
2268     * requiring additional threads.
2269     *
2270     * @param rejectOnSaturation if true, attempts to create more than
2271     * the maximum total allowed threads throw {@link
2272     * RejectedExecutionException}. Otherwise, the pool continues to
2273     * operate, but with fewer than the target number of runnable
2274     * threads, so might not ensure progress. For default value, use
2275     * {@code true}.
2276     *
2277     * @param keepAliveTime the elapsed time since last use before
2278     * a thread is terminated (and then later replaced if needed).
2279     * For the default value, use {@code 60, TimeUnit.SECONDS}.
2280     *
2281     * @param unit the time unit for the {@code keepAliveTime} argument
2282     *
2283     * @throws IllegalArgumentException if parallelism is less than or
2284     * equal to zero, or is greater than implementation limit,
2285     * or if maximumPoolSize is less than parallelism,
2286     * of if the keepAliveTime is less than or equal to zero.
2287     * @throws NullPointerException if the factory is null
2288     * @throws SecurityException if a security manager exists and
2289     * the caller is not permitted to modify threads
2290     * because it does not hold {@link
2291     * java.lang.RuntimePermission}{@code ("modifyThread")}
2292     */
2293     public ForkJoinPool(int parallelism,
2294     ForkJoinWorkerThreadFactory factory,
2295     UncaughtExceptionHandler handler,
2296     boolean asyncMode,
2297     int corePoolSize,
2298     int maximumPoolSize,
2299     int minimumRunnable,
2300     boolean rejectOnSaturation,
2301     long keepAliveTime,
2302     TimeUnit unit) {
2303     // check, encode, pack parameters
2304     if (parallelism <= 0 || parallelism > MAX_CAP ||
2305     maximumPoolSize < parallelism || keepAliveTime <= 0L)
2306 dl 1.152 throw new IllegalArgumentException();
2307 dl 1.14 if (factory == null)
2308     throw new NullPointerException();
2309 dl 1.300 long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2310    
2311     String prefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
2312     int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
2313     long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
2314     (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2315     int m = (parallelism |
2316     (asyncMode ? FIFO : 0) |
2317     (rejectOnSaturation ? 0 : SATURATE));
2318     int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
2319     int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
2320     int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
2321     int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
2322     n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2323     n = (n + 1) << 1; // power of two, including space for submission queues
2324    
2325     this.workQueues = new WorkQueue[n];
2326     this.workerNamePrefix = prefix;
2327     this.factory = factory;
2328     this.ueh = handler;
2329     this.keepAlive = ms;
2330     this.bounds = b;
2331     this.mode = m;
2332     this.ctl = c;
2333     checkPermission();
2334 dl 1.152 }
2335    
2336     /**
2337 dl 1.300 * Constructor for common pool using parameters possibly
2338     * overridden by system properties
2339     */
2340     private ForkJoinPool(byte forCommonPoolOnly) {
2341     int parallelism = -1;
2342     ForkJoinWorkerThreadFactory fac = null;
2343     UncaughtExceptionHandler handler = null;
2344     try { // ignore exceptions in accessing/parsing properties
2345     String pp = System.getProperty
2346     ("java.util.concurrent.ForkJoinPool.common.parallelism");
2347     String fp = System.getProperty
2348     ("java.util.concurrent.ForkJoinPool.common.threadFactory");
2349     String hp = System.getProperty
2350     ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2351     if (pp != null)
2352     parallelism = Integer.parseInt(pp);
2353     if (fp != null)
2354     fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
2355     getSystemClassLoader().loadClass(fp).newInstance());
2356     if (hp != null)
2357     handler = ((UncaughtExceptionHandler)ClassLoader.
2358     getSystemClassLoader().loadClass(hp).newInstance());
2359     } catch (Exception ignore) {
2360     }
2361    
2362     if (fac == null) {
2363     if (System.getSecurityManager() == null)
2364     fac = defaultForkJoinWorkerThreadFactory;
2365     else // use security-managed default
2366     fac = new InnocuousForkJoinWorkerThreadFactory();
2367     }
2368     if (parallelism < 0 && // default 1 less than #cores
2369     (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
2370     parallelism = 1;
2371     if (parallelism > MAX_CAP)
2372     parallelism = MAX_CAP;
2373    
2374     long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
2375     (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2376     int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2377     int m = (parallelism < 1) ? 1 : parallelism;
2378     int n = (parallelism > 1) ? parallelism - 1 : 1;
2379     n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2380     n = (n + 1) << 1;
2381    
2382     this.workQueues = new WorkQueue[n];
2383     this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2384     this.factory = fac;
2385 dl 1.18 this.ueh = handler;
2386 dl 1.300 this.keepAlive = DEFAULT_KEEPALIVE;
2387     this.bounds = b;
2388     this.mode = m;
2389     this.ctl = c;
2390 dl 1.101 }
2391    
2392     /**
2393 dl 1.128 * Returns the common pool instance. This pool is statically
2394 dl 1.134 * constructed; its run state is unaffected by attempts to {@link
2395     * #shutdown} or {@link #shutdownNow}. However this pool and any
2396     * ongoing processing are automatically terminated upon program
2397     * {@link System#exit}. Any program that relies on asynchronous
2398     * task processing to complete before program termination should
2399 jsr166 1.158 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2400     * before exit.
2401 dl 1.100 *
2402     * @return the common pool instance
2403 jsr166 1.138 * @since 1.8
2404 dl 1.100 */
2405     public static ForkJoinPool commonPool() {
2406 dl 1.134 // assert common != null : "static init error";
2407     return common;
2408 dl 1.100 }
2409    
2410 jsr166 1.1 // Execution methods
2411    
2412     /**
2413     * Performs the given task, returning its result upon completion.
2414 dl 1.52 * If the computation encounters an unchecked Exception or Error,
2415     * it is rethrown as the outcome of this invocation. Rethrown
2416     * exceptions behave in the same way as regular exceptions, but,
2417     * when possible, contain stack traces (as displayed for example
2418     * using {@code ex.printStackTrace()}) of both the current thread
2419     * as well as the thread actually encountering the exception;
2420     * minimally only the latter.
2421 jsr166 1.1 *
2422     * @param task the task
2423 jsr166 1.191 * @param <T> the type of the task's result
2424 jsr166 1.1 * @return the task's result
2425 jsr166 1.11 * @throws NullPointerException if the task is null
2426     * @throws RejectedExecutionException if the task cannot be
2427     * scheduled for execution
2428 jsr166 1.1 */
2429     public <T> T invoke(ForkJoinTask<T> task) {
2430 dl 1.90 if (task == null)
2431     throw new NullPointerException();
2432 dl 1.243 externalSubmit(task);
2433 dl 1.78 return task.join();
2434 jsr166 1.1 }
2435    
2436     /**
2437     * Arranges for (asynchronous) execution of the given task.
2438     *
2439     * @param task the task
2440 jsr166 1.11 * @throws NullPointerException if the task is null
2441     * @throws RejectedExecutionException if the task cannot be
2442     * scheduled for execution
2443 jsr166 1.1 */
2444 jsr166 1.8 public void execute(ForkJoinTask<?> task) {
2445 dl 1.243 externalSubmit(task);
2446 jsr166 1.1 }
2447    
2448     // AbstractExecutorService methods
2449    
2450 jsr166 1.11 /**
2451     * @throws NullPointerException if the task is null
2452     * @throws RejectedExecutionException if the task cannot be
2453     * scheduled for execution
2454     */
2455 jsr166 1.1 public void execute(Runnable task) {
2456 dl 1.41 if (task == null)
2457     throw new NullPointerException();
2458 jsr166 1.2 ForkJoinTask<?> job;
2459 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2460     job = (ForkJoinTask<?>) task;
2461 jsr166 1.2 else
2462 dl 1.152 job = new ForkJoinTask.RunnableExecuteAction(task);
2463 dl 1.243 externalSubmit(job);
2464 jsr166 1.1 }
2465    
2466 jsr166 1.11 /**
2467 dl 1.18 * Submits a ForkJoinTask for execution.
2468     *
2469     * @param task the task to submit
2470 jsr166 1.191 * @param <T> the type of the task's result
2471 dl 1.18 * @return the task
2472     * @throws NullPointerException if the task is null
2473     * @throws RejectedExecutionException if the task cannot be
2474     * scheduled for execution
2475     */
2476     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2477 dl 1.243 return externalSubmit(task);
2478 dl 1.18 }
2479    
2480     /**
2481 jsr166 1.11 * @throws NullPointerException if the task is null
2482     * @throws RejectedExecutionException if the task cannot be
2483     * scheduled for execution
2484     */
2485 jsr166 1.1 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2486 dl 1.243 return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
2487 jsr166 1.1 }
2488    
2489 jsr166 1.11 /**
2490     * @throws NullPointerException if the task is null
2491     * @throws RejectedExecutionException if the task cannot be
2492     * scheduled for execution
2493     */
2494 jsr166 1.1 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2495 dl 1.243 return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
2496 jsr166 1.1 }
2497    
2498 jsr166 1.11 /**
2499     * @throws NullPointerException if the task is null
2500     * @throws RejectedExecutionException if the task cannot be
2501     * scheduled for execution
2502     */
2503 jsr166 1.1 public ForkJoinTask<?> submit(Runnable task) {
2504 dl 1.41 if (task == null)
2505     throw new NullPointerException();
2506 jsr166 1.2 ForkJoinTask<?> job;
2507 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2508     job = (ForkJoinTask<?>) task;
2509 jsr166 1.2 else
2510 dl 1.90 job = new ForkJoinTask.AdaptedRunnableAction(task);
2511 dl 1.243 return externalSubmit(job);
2512 jsr166 1.1 }
2513    
2514     /**
2515 jsr166 1.11 * @throws NullPointerException {@inheritDoc}
2516     * @throws RejectedExecutionException {@inheritDoc}
2517     */
2518 jsr166 1.1 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2519 dl 1.86 // In previous versions of this class, this method constructed
2520     // a task to run ForkJoinTask.invokeAll, but now external
2521     // invocation of multiple tasks is at least as efficient.
2522 jsr166 1.199 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2523 jsr166 1.1
2524 dl 1.86 try {
2525     for (Callable<T> t : tasks) {
2526 dl 1.90 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2527 jsr166 1.144 futures.add(f);
2528 dl 1.243 externalSubmit(f);
2529 dl 1.86 }
2530 jsr166 1.143 for (int i = 0, size = futures.size(); i < size; i++)
2531     ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2532 dl 1.86 return futures;
2533 jsr166 1.226 } catch (Throwable t) {
2534     for (int i = 0, size = futures.size(); i < size; i++)
2535     futures.get(i).cancel(false);
2536     throw t;
2537 jsr166 1.1 }
2538     }
2539    
2540     /**
2541     * Returns the factory used for constructing new workers.
2542     *
2543     * @return the factory used for constructing new workers
2544     */
2545     public ForkJoinWorkerThreadFactory getFactory() {
2546     return factory;
2547     }
2548    
2549     /**
2550     * Returns the handler for internal worker threads that terminate
2551     * due to unrecoverable errors encountered while executing tasks.
2552     *
2553 jsr166 1.4 * @return the handler, or {@code null} if none
2554 jsr166 1.1 */
2555 jsr166 1.156 public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2556 dl 1.14 return ueh;
2557 jsr166 1.1 }
2558    
2559     /**
2560 jsr166 1.9 * Returns the targeted parallelism level of this pool.
2561 jsr166 1.1 *
2562 jsr166 1.9 * @return the targeted parallelism level of this pool
2563 jsr166 1.1 */
2564     public int getParallelism() {
2565 dl 1.300 return mode & SMASK;
2566 jsr166 1.1 }
2567    
2568     /**
2569 dl 1.100 * Returns the targeted parallelism level of the common pool.
2570     *
2571     * @return the targeted parallelism level of the common pool
2572 jsr166 1.138 * @since 1.8
2573 dl 1.100 */
2574     public static int getCommonPoolParallelism() {
2575 jsr166 1.274 return COMMON_PARALLELISM;
2576 dl 1.100 }
2577    
2578     /**
2579 jsr166 1.1 * Returns the number of worker threads that have started but not
2580 jsr166 1.34 * yet terminated. The result returned by this method may differ
2581 jsr166 1.4 * from {@link #getParallelism} when threads are created to
2582 jsr166 1.1 * maintain parallelism when others are cooperatively blocked.
2583     *
2584     * @return the number of worker threads
2585     */
2586     public int getPoolSize() {
2587 dl 1.300 return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
2588 jsr166 1.1 }
2589    
2590     /**
2591 jsr166 1.4 * Returns {@code true} if this pool uses local first-in-first-out
2592 jsr166 1.1 * scheduling mode for forked tasks that are never joined.
2593     *
2594 jsr166 1.4 * @return {@code true} if this pool uses async mode
2595 jsr166 1.1 */
2596     public boolean getAsyncMode() {
2597 dl 1.300 return (mode & FIFO) != 0;
2598 jsr166 1.1 }
2599    
2600     /**
2601     * Returns an estimate of the number of worker threads that are
2602     * not blocked waiting to join tasks or for other managed
2603 dl 1.14 * synchronization. This method may overestimate the
2604     * number of running threads.
2605 jsr166 1.1 *
2606     * @return the number of worker threads
2607     */
2608     public int getRunningThreadCount() {
2609 dl 1.78 int rc = 0;
2610     WorkQueue[] ws; WorkQueue w;
2611     if ((ws = workQueues) != null) {
2612 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2613     if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2614 dl 1.78 ++rc;
2615     }
2616     }
2617     return rc;
2618 jsr166 1.1 }
2619    
2620     /**
2621     * Returns an estimate of the number of threads that are currently
2622     * stealing or executing tasks. This method may overestimate the
2623     * number of active threads.
2624     *
2625     * @return the number of active threads
2626     */
2627     public int getActiveThreadCount() {
2628 dl 1.300 int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2629 jsr166 1.63 return (r <= 0) ? 0 : r; // suppress momentarily negative values
2630 jsr166 1.1 }
2631    
2632     /**
2633 jsr166 1.4 * Returns {@code true} if all worker threads are currently idle.
2634     * An idle worker is one that cannot obtain a task to execute
2635     * because none are available to steal from other threads, and
2636     * there are no pending submissions to the pool. This method is
2637     * conservative; it might not return {@code true} immediately upon
2638     * idleness of all threads, but will eventually become true if
2639     * threads remain inactive.
2640 jsr166 1.1 *
2641 jsr166 1.4 * @return {@code true} if all threads are currently idle
2642 jsr166 1.1 */
2643     public boolean isQuiescent() {
2644 dl 1.300 for (;;) {
2645     long c = ctl;
2646     int md = mode, pc = md & SMASK;
2647     int tc = pc + (short)(c >> TC_SHIFT);
2648     int rc = pc + (int)(c >> RC_SHIFT);
2649     if ((md & (STOP | TERMINATED)) != 0)
2650     return true;
2651     else if (rc > 0)
2652     return false;
2653     else {
2654     WorkQueue[] ws; WorkQueue v;
2655     if ((ws = workQueues) != null) {
2656     for (int i = 1; i < ws.length; i += 2) {
2657     if ((v = ws[i]) != null) {
2658     if ((v.source & QUIET) == 0)
2659     return false;
2660     --tc;
2661     }
2662     }
2663     }
2664     if (tc == 0 && ctl == c)
2665     return true;
2666     }
2667     }
2668 jsr166 1.1 }
2669    
2670     /**
2671     * Returns an estimate of the total number of tasks stolen from
2672     * one thread's work queue by another. The reported value
2673     * underestimates the actual total number of steals when the pool
2674     * is not quiescent. This value may be useful for monitoring and
2675     * tuning fork/join programs: in general, steal counts should be
2676     * high enough to keep threads busy, but low enough to avoid
2677     * overhead and contention across threads.
2678     *
2679     * @return the number of steals
2680     */
2681     public long getStealCount() {
2682 dl 1.300 long count = stealCount;
2683 dl 1.78 WorkQueue[] ws; WorkQueue w;
2684     if ((ws = workQueues) != null) {
2685 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2686 dl 1.78 if ((w = ws[i]) != null)
2687 dl 1.300 count += (long)w.nsteals & 0xffffffffL;
2688 dl 1.78 }
2689     }
2690     return count;
2691 jsr166 1.1 }
2692    
2693     /**
2694     * Returns an estimate of the total number of tasks currently held
2695     * in queues by worker threads (but not including tasks submitted
2696     * to the pool that have not begun executing). This value is only
2697     * an approximation, obtained by iterating across all threads in
2698     * the pool. This method may be useful for tuning task
2699     * granularities.
2700     *
2701     * @return the number of queued tasks
2702     */
2703     public long getQueuedTaskCount() {
2704     long count = 0;
2705 dl 1.78 WorkQueue[] ws; WorkQueue w;
2706     if ((ws = workQueues) != null) {
2707 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2708 dl 1.78 if ((w = ws[i]) != null)
2709     count += w.queueSize();
2710     }
2711 dl 1.52 }
2712 jsr166 1.1 return count;
2713     }
2714    
2715     /**
2716 jsr166 1.8 * Returns an estimate of the number of tasks submitted to this
2717 dl 1.55 * pool that have not yet begun executing. This method may take
2718 dl 1.52 * time proportional to the number of submissions.
2719 jsr166 1.1 *
2720     * @return the number of queued submissions
2721     */
2722     public int getQueuedSubmissionCount() {
2723 dl 1.78 int count = 0;
2724     WorkQueue[] ws; WorkQueue w;
2725     if ((ws = workQueues) != null) {
2726 dl 1.86 for (int i = 0; i < ws.length; i += 2) {
2727 dl 1.78 if ((w = ws[i]) != null)
2728     count += w.queueSize();
2729     }
2730     }
2731     return count;
2732 jsr166 1.1 }
2733    
2734     /**
2735 jsr166 1.4 * Returns {@code true} if there are any tasks submitted to this
2736     * pool that have not yet begun executing.
2737 jsr166 1.1 *
2738     * @return {@code true} if there are any queued submissions
2739     */
2740     public boolean hasQueuedSubmissions() {
2741 dl 1.78 WorkQueue[] ws; WorkQueue w;
2742     if ((ws = workQueues) != null) {
2743 dl 1.86 for (int i = 0; i < ws.length; i += 2) {
2744 dl 1.115 if ((w = ws[i]) != null && !w.isEmpty())
2745 dl 1.78 return true;
2746     }
2747     }
2748     return false;
2749 jsr166 1.1 }
2750    
2751     /**
2752     * Removes and returns the next unexecuted submission if one is
2753     * available. This method may be useful in extensions to this
2754     * class that re-assign work in systems with multiple pools.
2755     *
2756 jsr166 1.4 * @return the next submission, or {@code null} if none
2757 jsr166 1.1 */
2758     protected ForkJoinTask<?> pollSubmission() {
2759 dl 1.300 return pollScan(true);
2760 jsr166 1.1 }
2761    
2762     /**
2763     * Removes all available unexecuted submitted and forked tasks
2764     * from scheduling queues and adds them to the given collection,
2765     * without altering their execution status. These may include
2766 jsr166 1.8 * artificially generated or wrapped tasks. This method is
2767     * designed to be invoked only when the pool is known to be
2768 jsr166 1.1 * quiescent. Invocations at other times may not remove all
2769     * tasks. A failure encountered while attempting to add elements
2770     * to collection {@code c} may result in elements being in
2771     * neither, either or both collections when the associated
2772     * exception is thrown. The behavior of this operation is
2773     * undefined if the specified collection is modified while the
2774     * operation is in progress.
2775     *
2776     * @param c the collection to transfer elements into
2777     * @return the number of elements transferred
2778     */
2779 jsr166 1.5 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2780 dl 1.52 int count = 0;
2781 dl 1.78 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2782     if ((ws = workQueues) != null) {
2783 dl 1.86 for (int i = 0; i < ws.length; ++i) {
2784 dl 1.78 if ((w = ws[i]) != null) {
2785     while ((t = w.poll()) != null) {
2786     c.add(t);
2787     ++count;
2788     }
2789     }
2790 dl 1.52 }
2791     }
2792 dl 1.18 return count;
2793     }
2794    
2795     /**
2796 jsr166 1.1 * Returns a string identifying this pool, as well as its state,
2797     * including indications of run state, parallelism level, and
2798     * worker and task counts.
2799     *
2800     * @return a string identifying this pool, as well as its state
2801     */
2802     public String toString() {
2803 dl 1.86 // Use a single pass through workQueues to collect counts
2804     long qt = 0L, qs = 0L; int rc = 0;
2805 dl 1.300 long st = stealCount;
2806 dl 1.86 WorkQueue[] ws; WorkQueue w;
2807     if ((ws = workQueues) != null) {
2808     for (int i = 0; i < ws.length; ++i) {
2809     if ((w = ws[i]) != null) {
2810     int size = w.queueSize();
2811     if ((i & 1) == 0)
2812     qs += size;
2813     else {
2814     qt += size;
2815 dl 1.300 st += (long)w.nsteals & 0xffffffffL;
2816 dl 1.86 if (w.isApparentlyUnblocked())
2817     ++rc;
2818     }
2819     }
2820     }
2821     }
2822 dl 1.300
2823     int md = mode;
2824     int pc = (md & SMASK);
2825     long c = ctl;
2826 dl 1.52 int tc = pc + (short)(c >>> TC_SHIFT);
2827 dl 1.300 int ac = pc + (int)(c >> RC_SHIFT);
2828 dl 1.78 if (ac < 0) // ignore transient negative
2829     ac = 0;
2830 dl 1.300 String level = ((md & TERMINATED) != 0 ? "Terminated" :
2831     (md & STOP) != 0 ? "Terminating" :
2832     (md & SHUTDOWN) != 0 ? "Shutting down" :
2833 dl 1.200 "Running");
2834 jsr166 1.1 return super.toString() +
2835 dl 1.52 "[" + level +
2836 dl 1.14 ", parallelism = " + pc +
2837     ", size = " + tc +
2838     ", active = " + ac +
2839     ", running = " + rc +
2840 jsr166 1.1 ", steals = " + st +
2841     ", tasks = " + qt +
2842     ", submissions = " + qs +
2843     "]";
2844     }
2845    
2846     /**
2847 dl 1.100 * Possibly initiates an orderly shutdown in which previously
2848     * submitted tasks are executed, but no new tasks will be
2849     * accepted. Invocation has no effect on execution state if this
2850 jsr166 1.137 * is the {@link #commonPool()}, and no additional effect if
2851 dl 1.100 * already shut down. Tasks that are in the process of being
2852     * submitted concurrently during the course of this method may or
2853     * may not be rejected.
2854 jsr166 1.1 *
2855     * @throws SecurityException if a security manager exists and
2856     * the caller is not permitted to modify threads
2857     * because it does not hold {@link
2858     * java.lang.RuntimePermission}{@code ("modifyThread")}
2859     */
2860     public void shutdown() {
2861     checkPermission();
2862 dl 1.105 tryTerminate(false, true);
2863 jsr166 1.1 }
2864    
2865     /**
2866 dl 1.100 * Possibly attempts to cancel and/or stop all tasks, and reject
2867     * all subsequently submitted tasks. Invocation has no effect on
2868 jsr166 1.137 * execution state if this is the {@link #commonPool()}, and no
2869 dl 1.100 * additional effect if already shut down. Otherwise, tasks that
2870     * are in the process of being submitted or executed concurrently
2871     * during the course of this method may or may not be
2872     * rejected. This method cancels both existing and unexecuted
2873     * tasks, in order to permit termination in the presence of task
2874     * dependencies. So the method always returns an empty list
2875     * (unlike the case for some other Executors).
2876 jsr166 1.1 *
2877     * @return an empty list
2878     * @throws SecurityException if a security manager exists and
2879     * the caller is not permitted to modify threads
2880     * because it does not hold {@link
2881     * java.lang.RuntimePermission}{@code ("modifyThread")}
2882     */
2883     public List<Runnable> shutdownNow() {
2884     checkPermission();
2885 dl 1.105 tryTerminate(true, true);
2886 jsr166 1.1 return Collections.emptyList();
2887     }
2888    
2889     /**
2890     * Returns {@code true} if all tasks have completed following shut down.
2891     *
2892     * @return {@code true} if all tasks have completed following shut down
2893     */
2894     public boolean isTerminated() {
2895 dl 1.300 return (mode & TERMINATED) != 0;
2896 jsr166 1.1 }
2897    
2898     /**
2899     * Returns {@code true} if the process of termination has
2900 jsr166 1.9 * commenced but not yet completed. This method may be useful for
2901     * debugging. A return of {@code true} reported a sufficient
2902     * period after shutdown may indicate that submitted tasks have
2903 jsr166 1.119 * ignored or suppressed interruption, or are waiting for I/O,
2904 dl 1.49 * causing this executor not to properly terminate. (See the
2905     * advisory notes for class {@link ForkJoinTask} stating that
2906     * tasks should not normally entail blocking operations. But if
2907     * they do, they must abort them on interrupt.)
2908 jsr166 1.1 *
2909 jsr166 1.9 * @return {@code true} if terminating but not yet terminated
2910 jsr166 1.1 */
2911     public boolean isTerminating() {
2912 dl 1.300 int md = mode;
2913     return (md & STOP) != 0 && (md & TERMINATED) == 0;
2914 jsr166 1.1 }
2915    
2916     /**
2917     * Returns {@code true} if this pool has been shut down.
2918     *
2919     * @return {@code true} if this pool has been shut down
2920     */
2921     public boolean isShutdown() {
2922 dl 1.300 return (mode & SHUTDOWN) != 0;
2923 jsr166 1.9 }
2924    
2925     /**
2926 dl 1.105 * Blocks until all tasks have completed execution after a
2927     * shutdown request, or the timeout occurs, or the current thread
2928 dl 1.134 * is interrupted, whichever happens first. Because the {@link
2929     * #commonPool()} never terminates until program shutdown, when
2930     * applied to the common pool, this method is equivalent to {@link
2931 jsr166 1.158 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
2932 jsr166 1.1 *
2933     * @param timeout the maximum time to wait
2934     * @param unit the time unit of the timeout argument
2935     * @return {@code true} if this executor terminated and
2936     * {@code false} if the timeout elapsed before termination
2937     * @throws InterruptedException if interrupted while waiting
2938     */
2939     public boolean awaitTermination(long timeout, TimeUnit unit)
2940     throws InterruptedException {
2941 dl 1.134 if (Thread.interrupted())
2942     throw new InterruptedException();
2943     if (this == common) {
2944     awaitQuiescence(timeout, unit);
2945     return false;
2946     }
2947 dl 1.52 long nanos = unit.toNanos(timeout);
2948 dl 1.101 if (isTerminated())
2949     return true;
2950 dl 1.183 if (nanos <= 0L)
2951     return false;
2952     long deadline = System.nanoTime() + nanos;
2953 jsr166 1.103 synchronized (this) {
2954 jsr166 1.184 for (;;) {
2955 dl 1.183 if (isTerminated())
2956     return true;
2957     if (nanos <= 0L)
2958     return false;
2959     long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
2960     wait(millis > 0L ? millis : 1L);
2961     nanos = deadline - System.nanoTime();
2962 dl 1.52 }
2963 dl 1.18 }
2964 jsr166 1.1 }
2965    
2966     /**
2967 dl 1.134 * If called by a ForkJoinTask operating in this pool, equivalent
2968     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
2969     * waits and/or attempts to assist performing tasks until this
2970     * pool {@link #isQuiescent} or the indicated timeout elapses.
2971     *
2972     * @param timeout the maximum time to wait
2973     * @param unit the time unit of the timeout argument
2974     * @return {@code true} if quiescent; {@code false} if the
2975     * timeout elapsed.
2976     */
2977     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
2978     long nanos = unit.toNanos(timeout);
2979     ForkJoinWorkerThread wt;
2980     Thread thread = Thread.currentThread();
2981     if ((thread instanceof ForkJoinWorkerThread) &&
2982     (wt = (ForkJoinWorkerThread)thread).pool == this) {
2983     helpQuiescePool(wt.workQueue);
2984     return true;
2985     }
2986 dl 1.300 else {
2987     for (long startTime = System.nanoTime();;) {
2988     ForkJoinTask<?> t;
2989     if ((t = pollScan(false)) != null)
2990     t.doExec();
2991     else if (isQuiescent())
2992     return true;
2993     else if ((System.nanoTime() - startTime) > nanos)
2994 dl 1.134 return false;
2995 dl 1.300 else
2996     Thread.yield(); // cannot block
2997 dl 1.134 }
2998     }
2999     }
3000    
3001     /**
3002     * Waits and/or attempts to assist performing tasks indefinitely
3003 jsr166 1.141 * until the {@link #commonPool()} {@link #isQuiescent}.
3004 dl 1.134 */
3005 dl 1.136 static void quiesceCommonPool() {
3006 dl 1.134 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3007     }
3008    
3009     /**
3010 jsr166 1.1 * Interface for extending managed parallelism for tasks running
3011 jsr166 1.8 * in {@link ForkJoinPool}s.
3012     *
3013 dl 1.19 * <p>A {@code ManagedBlocker} provides two methods. Method
3014 jsr166 1.218 * {@link #isReleasable} must return {@code true} if blocking is
3015     * not necessary. Method {@link #block} blocks the current thread
3016 dl 1.19 * if necessary (perhaps internally invoking {@code isReleasable}
3017 dl 1.54 * before actually blocking). These actions are performed by any
3018 jsr166 1.157 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
3019     * The unusual methods in this API accommodate synchronizers that
3020     * may, but don't usually, block for long periods. Similarly, they
3021 dl 1.54 * allow more efficient internal handling of cases in which
3022     * additional workers may be, but usually are not, needed to
3023     * ensure sufficient parallelism. Toward this end,
3024     * implementations of method {@code isReleasable} must be amenable
3025     * to repeated invocation.
3026 jsr166 1.1 *
3027     * <p>For example, here is a ManagedBlocker based on a
3028     * ReentrantLock:
3029 jsr166 1.239 * <pre> {@code
3030 jsr166 1.1 * class ManagedLocker implements ManagedBlocker {
3031     * final ReentrantLock lock;
3032     * boolean hasLock = false;
3033     * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3034     * public boolean block() {
3035     * if (!hasLock)
3036     * lock.lock();
3037     * return true;
3038     * }
3039     * public boolean isReleasable() {
3040     * return hasLock || (hasLock = lock.tryLock());
3041     * }
3042     * }}</pre>
3043 dl 1.19 *
3044     * <p>Here is a class that possibly blocks waiting for an
3045     * item on a given queue:
3046 jsr166 1.239 * <pre> {@code
3047 dl 1.19 * class QueueTaker<E> implements ManagedBlocker {
3048     * final BlockingQueue<E> queue;
3049     * volatile E item = null;
3050     * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3051     * public boolean block() throws InterruptedException {
3052     * if (item == null)
3053 dl 1.23 * item = queue.take();
3054 dl 1.19 * return true;
3055     * }
3056     * public boolean isReleasable() {
3057 dl 1.23 * return item != null || (item = queue.poll()) != null;
3058 dl 1.19 * }
3059     * public E getItem() { // call after pool.managedBlock completes
3060     * return item;
3061     * }
3062     * }}</pre>
3063 jsr166 1.1 */
3064     public static interface ManagedBlocker {
3065     /**
3066     * Possibly blocks the current thread, for example waiting for
3067     * a lock or condition.
3068     *
3069 jsr166 1.4 * @return {@code true} if no additional blocking is necessary
3070     * (i.e., if isReleasable would return true)
3071 jsr166 1.1 * @throws InterruptedException if interrupted while waiting
3072     * (the method is not required to do so, but is allowed to)
3073     */
3074     boolean block() throws InterruptedException;
3075    
3076     /**
3077 jsr166 1.4 * Returns {@code true} if blocking is unnecessary.
3078 jsr166 1.154 * @return {@code true} if blocking is unnecessary
3079 jsr166 1.1 */
3080     boolean isReleasable();
3081     }
3082    
3083     /**
3084 jsr166 1.217 * Runs the given possibly blocking task. When {@linkplain
3085     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
3086     * method possibly arranges for a spare thread to be activated if
3087     * necessary to ensure sufficient parallelism while the current
3088     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
3089 jsr166 1.1 *
3090 jsr166 1.217 * <p>This method repeatedly calls {@code blocker.isReleasable()} and
3091     * {@code blocker.block()} until either method returns {@code true}.
3092     * Every call to {@code blocker.block()} is preceded by a call to
3093     * {@code blocker.isReleasable()} that returned {@code false}.
3094     *
3095     * <p>If not running in a ForkJoinPool, this method is
3096 jsr166 1.8 * behaviorally equivalent to
3097 jsr166 1.239 * <pre> {@code
3098 jsr166 1.1 * while (!blocker.isReleasable())
3099     * if (blocker.block())
3100 jsr166 1.217 * break;}</pre>
3101 jsr166 1.8 *
3102 jsr166 1.217 * If running in a ForkJoinPool, the pool may first be expanded to
3103     * ensure sufficient parallelism available during the call to
3104     * {@code blocker.block()}.
3105 jsr166 1.1 *
3106 jsr166 1.217 * @param blocker the blocker task
3107     * @throws InterruptedException if {@code blocker.block()} did so
3108 jsr166 1.1 */
3109 dl 1.18 public static void managedBlock(ManagedBlocker blocker)
3110 jsr166 1.1 throws InterruptedException {
3111 dl 1.200 ForkJoinPool p;
3112     ForkJoinWorkerThread wt;
3113 dl 1.300 WorkQueue w;
3114 jsr166 1.1 Thread t = Thread.currentThread();
3115 dl 1.200 if ((t instanceof ForkJoinWorkerThread) &&
3116 dl 1.300 (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
3117     (w = wt.workQueue) != null) {
3118     int block;
3119 dl 1.172 while (!blocker.isReleasable()) {
3120 dl 1.300 if ((block = p.tryCompensate(w)) != 0) {
3121 dl 1.105 try {
3122     do {} while (!blocker.isReleasable() &&
3123     !blocker.block());
3124     } finally {
3125 dl 1.300 U.getAndAddLong(p, CTL, (block > 0) ? RC_UNIT : 0L);
3126 dl 1.105 }
3127     break;
3128 dl 1.78 }
3129     }
3130 dl 1.18 }
3131 dl 1.105 else {
3132     do {} while (!blocker.isReleasable() &&
3133     !blocker.block());
3134     }
3135 jsr166 1.1 }
3136    
// AbstractExecutorService overrides. These rely on the undocumented
// fact that ForkJoinTask.adapt returns ForkJoinTasks that also
// implement RunnableFuture.
3140 jsr166 1.1
3141     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3142 dl 1.90 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3143 jsr166 1.1 }
3144    
3145     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3146 dl 1.90 return new ForkJoinTask.AdaptedCallable<T>(callable);
3147 jsr166 1.1 }
3148    
// Unsafe mechanics: field offsets and array addressing constants, all
// computed once in the static initializer below.
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long CTL;
private static final long MODE;
private static final int ABASE;
private static final int ASHIFT;

static {
    // Step 1: offsets of the "ctl" and "mode" fields, plus base offset
    // and shift for indexing into ForkJoinTask[] arrays.
    try {
        final Class<?> poolClass = ForkJoinPool.class;
        final Class<?> taskArrayClass = ForkJoinTask[].class;
        CTL = U.objectFieldOffset(poolClass.getDeclaredField("ctl"));
        MODE = U.objectFieldOffset(poolClass.getDeclaredField("mode"));
        ABASE = U.arrayBaseOffset(taskArrayClass);
        final int indexScale = U.arrayIndexScale(taskArrayClass);
        final boolean scaleIsPowerOfTwo =
            (indexScale & (indexScale - 1)) == 0;
        if (!scaleIsPowerOfTwo)
            throw new Error("array index scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(indexScale);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }

    // Step 2: reduce the risk of rare disastrous classloading in first
    // call to LockSupport.park:
    // https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;

    // Step 3: spare-thread limit for the common pool. A missing,
    // unreadable or unparsable property silently keeps the default.
    int spares = DEFAULT_COMMON_MAX_SPARES;
    try {
        final String configured = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
        if (configured != null)
            spares = Integer.parseInt(configured);
    } catch (Exception ignore) {}
    COMMON_MAX_SPARES = spares;

    // Step 4: the default factory and permission object are assigned
    // before the common pool is constructed.
    defaultForkJoinWorkerThreadFactory =
        new DefaultForkJoinWorkerThreadFactory();
    modifyThreadPermission = new RuntimePermission("modifyThread");

    // Step 5: construct the common pool inside a privileged action.
    final java.security.PrivilegedAction<ForkJoinPool> commonPoolMaker =
        new java.security.PrivilegedAction<ForkJoinPool>() {
            public ForkJoinPool run() {
                return new ForkJoinPool((byte)0);
            }
        };
    common = java.security.AccessController.doPrivileged(commonPoolMaker);

    // Step 6: derive the reported parallelism from the SMASK bits of
    // the common pool's mode.
    COMMON_PARALLELISM = common.mode & SMASK;
}
3195 dl 1.52
3196 dl 1.197 /**
3197 jsr166 1.279 * Factory for innocuous worker threads.
3198 dl 1.197 */
3199 jsr166 1.278 private static final class InnocuousForkJoinWorkerThreadFactory
3200 dl 1.197 implements ForkJoinWorkerThreadFactory {
3201    
3202     /**
3203     * An ACC to restrict permissions for the factory itself.
3204     * The constructed workers have no permissions set.
3205     */
3206     private static final AccessControlContext innocuousAcc;
3207     static {
3208     Permissions innocuousPerms = new Permissions();
3209     innocuousPerms.add(modifyThreadPermission);
3210     innocuousPerms.add(new RuntimePermission(
3211     "enableContextClassLoaderOverride"));
3212     innocuousPerms.add(new RuntimePermission(
3213     "modifyThreadGroup"));
3214     innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
3215     new ProtectionDomain(null, innocuousPerms)
3216     });
3217     }
3218    
3219     public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
3220 jsr166 1.227 return java.security.AccessController.doPrivileged(
3221     new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
3222 dl 1.197 public ForkJoinWorkerThread run() {
3223     return new ForkJoinWorkerThread.
3224     InnocuousForkJoinWorkerThread(pool);
3225     }}, innocuousAcc);
3226     }
3227     }
3228    
3229 jsr166 1.1 }