ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.201
Committed: Mon Jul 7 19:06:46 2014 UTC (9 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.200: +1 -1 lines
Log Message:
whitespace

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 jsr166 1.1 */
6    
7     package java.util.concurrent;
8    
9 jsr166 1.156 import java.lang.Thread.UncaughtExceptionHandler;
10 jsr166 1.1 import java.util.ArrayList;
11     import java.util.Arrays;
12     import java.util.Collection;
13     import java.util.Collections;
14     import java.util.List;
15 dl 1.36 import java.util.concurrent.AbstractExecutorService;
16     import java.util.concurrent.Callable;
17     import java.util.concurrent.ExecutorService;
18     import java.util.concurrent.Future;
19     import java.util.concurrent.RejectedExecutionException;
20     import java.util.concurrent.RunnableFuture;
21 dl 1.178 import java.util.concurrent.ThreadLocalRandom;
22 dl 1.36 import java.util.concurrent.TimeUnit;
23 dl 1.197 import java.security.AccessControlContext;
24     import java.security.ProtectionDomain;
25     import java.security.Permissions;
26 jsr166 1.1
27     /**
28 jsr166 1.4 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
29 jsr166 1.8 * A {@code ForkJoinPool} provides the entry point for submissions
30 dl 1.18 * from non-{@code ForkJoinTask} clients, as well as management and
31 jsr166 1.11 * monitoring operations.
32 jsr166 1.1 *
33 jsr166 1.9 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
34     * ExecutorService} mainly by virtue of employing
35     * <em>work-stealing</em>: all threads in the pool attempt to find and
36 dl 1.78 * execute tasks submitted to the pool and/or created by other active
37     * tasks (eventually blocking waiting for work if none exist). This
38     * enables efficient processing when most tasks spawn other subtasks
39     * (as do most {@code ForkJoinTask}s), as well as when many small
40     * tasks are submitted to the pool from external clients. Especially
41     * when setting <em>asyncMode</em> to true in constructors, {@code
42     * ForkJoinPool}s may also be appropriate for use with event-style
43     * tasks that are never joined.
44 jsr166 1.1 *
45 dl 1.112 * <p>A static {@link #commonPool()} is available and appropriate for
46 dl 1.101 * most applications. The common pool is used by any ForkJoinTask that
47     * is not explicitly submitted to a specified pool. Using the common
48     * pool normally reduces resource usage (its threads are slowly
49     * reclaimed during periods of non-use, and reinstated upon subsequent
50 dl 1.105 * use).
51 dl 1.100 *
52     * <p>For applications that require separate or custom pools, a {@code
53     * ForkJoinPool} may be constructed with a given target parallelism
54     * level; by default, equal to the number of available processors. The
55     * pool attempts to maintain enough active (or available) threads by
56     * dynamically adding, suspending, or resuming internal worker
57 jsr166 1.187 * threads, even if some tasks are stalled waiting to join others.
58     * However, no such adjustments are guaranteed in the face of blocked
59     * I/O or other unmanaged synchronization. The nested {@link
60 dl 1.100 * ManagedBlocker} interface enables extension of the kinds of
61 dl 1.18 * synchronization accommodated.
62 jsr166 1.1 *
63     * <p>In addition to execution and lifecycle control methods, this
64     * class provides status check methods (for example
65 jsr166 1.4 * {@link #getStealCount}) that are intended to aid in developing,
66 jsr166 1.1 * tuning, and monitoring fork/join applications. Also, method
67 jsr166 1.4 * {@link #toString} returns indications of pool state in a
68 jsr166 1.1 * convenient form for informal monitoring.
69     *
70 jsr166 1.109 * <p>As is the case with other ExecutorServices, there are three
71 jsr166 1.84 * main task execution methods summarized in the following table.
72     * These are designed to be used primarily by clients not already
73     * engaged in fork/join computations in the current pool. The main
74     * forms of these methods accept instances of {@code ForkJoinTask},
75     * but overloaded forms also allow mixed execution of plain {@code
76     * Runnable}- or {@code Callable}- based activities as well. However,
77     * tasks that are already executing in a pool should normally instead
78     * use the within-computation forms listed in the table unless using
79     * async event-style tasks that are not usually joined, in which case
80     * there is little difference among choice of methods.
81 dl 1.18 *
82     * <table BORDER CELLPADDING=3 CELLSPACING=1>
83 jsr166 1.159 * <caption>Summary of task execution methods</caption>
84 dl 1.18 * <tr>
85     * <td></td>
86     * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
87     * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
88     * </tr>
89     * <tr>
90 jsr166 1.153 * <td> <b>Arrange async execution</b></td>
91 dl 1.18 * <td> {@link #execute(ForkJoinTask)}</td>
92     * <td> {@link ForkJoinTask#fork}</td>
93     * </tr>
94     * <tr>
95 jsr166 1.153 * <td> <b>Await and obtain result</b></td>
96 dl 1.18 * <td> {@link #invoke(ForkJoinTask)}</td>
97     * <td> {@link ForkJoinTask#invoke}</td>
98     * </tr>
99     * <tr>
100 jsr166 1.153 * <td> <b>Arrange exec and obtain Future</b></td>
101 dl 1.18 * <td> {@link #submit(ForkJoinTask)}</td>
102     * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
103     * </tr>
104     * </table>
105 dl 1.19 *
106 dl 1.105 * <p>The common pool is by default constructed with default
107 jsr166 1.155 * parameters, but these may be controlled by setting three
108 jsr166 1.162 * {@linkplain System#getProperty system properties}:
109     * <ul>
110     * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
111     * - the parallelism level, a non-negative integer
112     * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
113     * - the class name of a {@link ForkJoinWorkerThreadFactory}
114     * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
115     * - the class name of a {@link UncaughtExceptionHandler}
116     * </ul>
117 dl 1.197 * If a {@link SecurityManager} is present and no factory is
118     * specified, then the default pool uses a factory supplying
119     * threads that have no {@link Permissions} enabled.
120 jsr166 1.165 * The system class loader is used to load these classes.
121 jsr166 1.156 * Upon any error in establishing these settings, default parameters
122 dl 1.160 * are used. It is possible to disable or limit the use of threads in
123     * the common pool by setting the parallelism property to zero, and/or
124 dl 1.193 * using a factory that may return {@code null}. However doing so may
125     * cause unjoined tasks to never be executed.
126 dl 1.105 *
127 jsr166 1.1 * <p><b>Implementation notes</b>: This implementation restricts the
128     * maximum number of running threads to 32767. Attempts to create
129 jsr166 1.11 * pools with greater than the maximum number result in
130 jsr166 1.8 * {@code IllegalArgumentException}.
131 jsr166 1.1 *
132 jsr166 1.11 * <p>This implementation rejects submitted tasks (that is, by throwing
133 dl 1.19 * {@link RejectedExecutionException}) only when the pool is shut down
134 dl 1.20 * or internal resources have been exhausted.
135 jsr166 1.11 *
136 jsr166 1.1 * @since 1.7
137     * @author Doug Lea
138     */
139 jsr166 1.171 @sun.misc.Contended
140 jsr166 1.1 public class ForkJoinPool extends AbstractExecutorService {
141    
142     /*
143 dl 1.14 * Implementation Overview
144     *
145 dl 1.78 * This class and its nested classes provide the main
146     * functionality and control for a set of worker threads:
147 jsr166 1.84 * Submissions from non-FJ threads enter into submission queues.
148     * Workers take these tasks and typically split them into subtasks
149     * that may be stolen by other workers. Preference rules give
150     * first priority to processing tasks from their own queues (LIFO
151     * or FIFO, depending on mode), then to randomized FIFO steals of
152 dl 1.200 * tasks in other queues. This framework began as a vehicle for
153     * supporting tree-structured parallelism using work-stealing.
154     * Over time, its scalability advantages led to extensions and
155     * changes to better support more diverse usage contexts.
156 dl 1.78 *
157 jsr166 1.84 * WorkQueues
158 dl 1.78 * ==========
159     *
160     * Most operations occur within work-stealing queues (in nested
161     * class WorkQueue). These are special forms of Deques that
162     * support only three of the four possible end-operations -- push,
163     * pop, and poll (aka steal), under the further constraints that
164     * push and pop are called only from the owning thread (or, as
165     * extended here, under a lock), while poll may be called from
166     * other threads. (If you are unfamiliar with them, you probably
167     * want to read Herlihy and Shavit's book "The Art of
168     * Multiprocessor programming", chapter 16 describing these in
169     * more detail before proceeding.) The main work-stealing queue
170     * design is roughly similar to those in the papers "Dynamic
171     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
172     * (http://research.sun.com/scalable/pubs/index.html) and
173     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
174     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
175 dl 1.200 * The main differences ultimately stem from GC requirements that
176     * we null out taken slots as soon as we can, to maintain as small
177     * a footprint as possible even in programs generating huge
178     * numbers of tasks. To accomplish this, we shift the CAS
179     * arbitrating pop vs poll (steal) from being on the indices
180     * ("base" and "top") to the slots themselves.
181     *
182     * Adding tasks then takes the form of a classic array push(task):
183     * q.array[q.top] = task; ++q.top;
184     *
185     * (The actual code needs to null-check and size-check the array,
186     * properly fence the accesses, and possibly signal waiting
187     * workers to start scanning -- see below.) Both a successful pop
188     * and poll mainly entail a CAS of a slot from non-null to null.
189     *
190     * The pop operation (always performed by owner) is:
191     * if ((base != top) and
192     * (the task at top slot is not null) and
193     * (CAS slot to null))
194     * decrement top and return task;
195     *
196     * And the poll operation (usually by a stealer) is
197     * if ((base != top) and
198     * (the task at base slot is not null) and
199     * (base has not changed) and
200     * (CAS slot to null))
201     * increment base and return task;
202     *
203     * Because we rely on CASes of references, we do not need tag bits
204     * on base or top. They are simple ints as used in any circular
205 dl 1.170 * array-based queue (see for example ArrayDeque). Updates to the
206 dl 1.200 * indices guarantee that top == base means the queue is empty,
207     * but otherwise may err on the side of possibly making the queue
208     * appear nonempty when a push, pop, or poll have not fully
209     * committed. Note that this means that the poll operation,
210     * considered individually, is not wait-free. One thief cannot
211     * successfully continue until another in-progress one (or, if
212     * previously empty, a push) completes. However, in the
213     * aggregate, we ensure at least probabilistic non-blockingness.
214     * If an attempted steal fails, a thief always chooses a different
215     * random victim target to try next. So, in order for one thief to
216     * progress, it suffices for any in-progress poll or new push on
217     * any empty queue to complete. (This is why we normally use
218     * method pollAt and its variants that try once at the apparent
219     * base index, else consider alternative actions, rather than
220     * method poll, which retries.)
221     *
222     * This approach also enables support of a user mode in which
223     * local task processing is in FIFO, not LIFO order, simply by
224     * using poll rather than pop. This can be useful in
225     * message-passing frameworks in which tasks are never joined.
226     * However neither mode considers affinities, loads, cache
227     * localities, etc, so rarely provide the best possible
228     * performance on a given machine, but portably provide good
229     * throughput by averaging over these factors. Further, even if
230     * we did try to use such information, we do not usually have a
231     * basis for exploiting it. For example, some sets of tasks
232     * profit from cache affinities, but others are harmed by cache
233     * pollution effects. Additionally, even though it requires
234     * scanning, long-term throughput is often best using random
235     * selection rather than directed selection policies, so cheap
236     * randomization of sufficient quality is used whenever
237     * applicable. Various Marsaglia XorShifts (some with different
238     * shift constants) are inlined at use points.
239 dl 1.78 *
240     * WorkQueues are also used in a similar way for tasks submitted
241     * to the pool. We cannot mix these tasks in the same queues used
242 dl 1.200 * by workers. Instead, we randomly associate submission queues
243 dl 1.83 * with submitting threads, using a form of hashing. The
244 dl 1.139 * ThreadLocalRandom probe value serves as a hash code for
245     * choosing existing queues, and may be randomly repositioned upon
246     * contention with other submitters. In essence, submitters act
247     * like workers except that they are restricted to executing local
248     * tasks that they submitted (or in the case of CountedCompleters,
249 dl 1.200 * others with the same root task). Insertion of tasks in shared
250 dl 1.139 * mode requires a lock (mainly to protect in the case of
251 dl 1.200 * resizing) but we use only a simple spinlock (using field
252     * qlock), because submitters encountering a busy queue move on to
253     * try or create other queues -- they block only when creating and
254     * registering new queues.
255 dl 1.78 *
256 jsr166 1.84 * Management
257 dl 1.78 * ==========
258 dl 1.52 *
259     * The main throughput advantages of work-stealing stem from
260     * decentralized control -- workers mostly take tasks from
261 dl 1.200 * themselves or each other, at rates that can exceed a billion
262     * per second. The pool itself creates, activates (enables
263     * scanning for and running tasks), deactivates, blocks, and
264     * terminates threads, all with minimal central information.
265     * There are only a few properties that we can globally track or
266     * maintain, so we pack them into a small number of variables,
267     * often maintaining atomicity without blocking or locking.
268     * Nearly all essentially atomic control state is held in two
269     * volatile variables that are by far most often read (not
270     * written) as status and consistency checks.
271 dl 1.78 *
272 dl 1.200 * Field "ctl" contains 64 bits holding information needed to
273     * atomically decide to add, inactivate, enqueue (on an event
274 dl 1.78 * queue), dequeue, and/or re-activate workers. To enable this
275     * packing, we restrict maximum parallelism to (1<<15)-1 (which is
276     * far in excess of normal operating range) to allow ids, counts,
277     * and their negations (used for thresholding) to fit into 16bit
278 dl 1.200 * subfields. Field "runState" holds lockable state bits
279     * (STARTED, STOP, etc) also protecting updates to the workQueues
280     * array. When used as a lock, it is normally held only for a few
281     * instructions (the only exceptions are one-time array
282     * initialization and uncommon resizing), so is nearly always
283     * available after at most a brief spin. But to be extra-cautious,
284     * we use a monitor-based backup strategy to block when needed
285     * (see awaitRunStateLock). Usages of "runState" vs "ctl"
286     * interact in only one case: deciding to add a worker thread (see
287     * tryAddWorker), in which case the ctl CAS is performed while the
288     * lock is held. Field "config" holds unchanging configuration
289     * state.
290 dl 1.78 *
291     * Recording WorkQueues. WorkQueues are recorded in the
292 dl 1.200 * "workQueues" array. The array is created upon first use (see
293     * externalSubmit) and expanded if necessary. Updates to the
294     * array while recording new workers and unrecording terminated
295     * ones are protected from each other by the runState lock, but
296     * the array is otherwise concurrently readable, and accessed
297     * directly. We also ensure that reads of the array reference
298     * itself never become too stale. To simplify index-based
299     * operations, the array size is always a power of two, and all
300     * readers must tolerate null slots. Worker queues are at odd
301     * indices. Shared (submission) queues are at even indices, up to
302     * a maximum of 64 slots, to limit growth even if array needs to
303     * expand to add more workers. Grouping them together in this way
304     * simplifies and speeds up task scanning.
305 dl 1.86 *
306     * All worker thread creation is on-demand, triggered by task
307     * submissions, replacement of terminated workers, and/or
308 dl 1.78 * compensation for blocked workers. However, all other support
309     * code is set up to work with other policies. To ensure that we
310 dl 1.200 * do not hold on to worker references that would prevent GC, all
311 dl 1.78 * accesses to workQueues are via indices into the workQueues
312     * array (which is one source of some of the messy code
313     * constructions here). In essence, the workQueues array serves as
314 dl 1.200 * a weak reference mechanism. Thus for example the stack top
315     * subfield of ctl stores indices, not references.
316     *
317     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
318     * cannot let workers spin indefinitely scanning for tasks when
319     * none can be found immediately, and we cannot start/resume
320     * workers unless there appear to be tasks available. On the
321     * other hand, we must quickly prod them into action when new
322     * tasks are submitted or generated. In many usages, ramp-up time
323     * to activate workers is the main limiting factor in overall
324     * performance, which is compounded at program start-up by JIT
325     * compilation and allocation. So we streamline this as much as
326     * possible.
327     *
328     * The "ctl" field atomically maintains active and total worker
329     * counts as well as a queue to place waiting threads so they can
330     * be located for signalling. Active counts also play the role of
331     * quiescence indicators, so are decremented when workers believe
332     * that there are no more tasks to execute. The "queue" is
333     * actually a form of Treiber stack. A stack is ideal for
334     * activating threads in most-recently used order. This improves
335     * performance and locality, outweighing the disadvantages of
336     * being prone to contention and inability to release a worker
337     * unless it is topmost on stack. We park/unpark workers after
338     * pushing on the idle worker stack (represented by the lower
339     * 32bit subfield of ctl) when they cannot find work. The top
340     * stack state holds the value of the "scanState" field of the
341     * worker: its index and status, plus a version counter that, in
342     * addition to the count subfields (also serving as version
343     * stamps) provides protection against Treiber stack ABA effects.
344     *
345     * Field scanState is used by both workers and the pool to manage
346     * and track whether a worker is INACTIVE (possibly blocked
347     * waiting for a signal), or SCANNING for tasks (when neither holds,
348     * it is busy running tasks). When a worker is inactivated, its
349     * scanState field is set, and is prevented from executing tasks,
350     * even though it must scan once for them to avoid queuing
351     * races. Note that scanState updates lag queue CAS releases so
352     * usage requires care. When queued, the lower 16 bits of
353     * scanState must hold its pool index. So we place the index there
354     * upon initialization (see registerWorker) and otherwise keep it
355     * there or restore it when necessary.
356     *
357     * Memory ordering. See "Correct and Efficient Work-Stealing for
358     * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
359     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
360     * analysis of memory ordering requirements in work-stealing
361     * algorithms similar to the one used here. We usually need
362     * stronger than minimal ordering because we must sometimes signal
363     * workers, requiring Dekker-like full-fences to avoid lost
364     * signals. Arranging for enough ordering without expensive
365     * over-fencing requires tradeoffs among the supported means of
366     * expressing access constraints. The most central operations,
367     * taking from queues and updating ctl state, require full-fence
368     * CAS. Array slots are read using the emulation of volatiles
369     * provided by Unsafe. Access from other threads to WorkQueue
370     * base, top, and array requires a volatile load of the first of
371     * any of these read. We use the convention of declaring the
372     * "base" index volatile, and always read it before other fields.
373     * The owner thread must ensure ordered updates, so writes use
374     * ordered intrinsics unless they can piggyback on those for other
375     * writes. Similar conventions and rationales hold for other
376     * WorkQueue fields (such as "currentSteal") that are only written
377     * by owners but observed by others.
378     *
379     * Creating workers. To create a worker, we pre-increment total
380     * count (serving as a reservation), and attempt to construct a
381     * ForkJoinWorkerThread via its factory. Upon construction, the
382     * new thread invokes registerWorker, where it constructs a
383     * WorkQueue and is assigned an index in the workQueues array
384     * (expanding the array if necessary). The thread is then
385     * started. Upon any exception across these steps, or null return
386     * from factory, deregisterWorker adjusts counts and records
387     * accordingly. If a null return, the pool continues running with
388     * fewer than the target number of workers. If exceptional, the
389     * exception is propagated, generally to some external caller.
390     * Worker index assignment avoids the bias in scanning that would
391     * occur if entries were sequentially packed starting at the front
392     * of the workQueues array. We treat the array as a simple
393     * power-of-two hash table, expanding as needed. The seedIndex
394     * increment ensures no collisions until a resize is needed or a
395     * worker is deregistered and replaced, and thereafter keeps
396     * probability of collision low. We cannot use
397     * ThreadLocalRandom.getProbe() for similar purposes here because
398     * the thread has not started yet, but do so for creating
399     * submission queues for existing external threads.
400     *
401     * Deactivation and waiting. Queuing encounters several intrinsic
402     * races; most notably that a task-producing thread can miss
403     * seeing (and signalling) another thread that gave up looking for
404     * work but has not yet entered the wait queue. When a worker
405     * cannot find a task to steal, it deactivates and enqueues. Very
406     * often, the lack of tasks is transient due to GC or OS
407     * scheduling. To reduce false-alarm deactivation, scanners
408     * compute checksums of queue states during sweeps. They give up
409     * and try to deactivate only after the sum is stable across
410     * scans. Further, to avoid missed signals, they repeat this
411     * scanning process after successful enqueuing until again stable.
412     * In this state, the worker cannot take/run a task it sees until
413     * it is released from the queue, so the worker itself eventually
414     * tries to release itself or any successor (see tryRelease).
415     * Otherwise, upon an empty scan, a deactivated worker uses an
416     * adaptive local spin construction (see awaitWork) before
417     * blocking (via park). Note the unusual conventions about
418     * Thread.interrupts surrounding parking and other blocking:
419     * Because interrupts are used solely to alert threads to check
420     * termination, which is checked anyway upon blocking, we clear
421     * status (using Thread.interrupted) before any call to park, so
422     * that park does not immediately return due to status being set
423     * via some other unrelated call to interrupt in user code.
424     *
425     * Signalling and activation. Workers are created or activated
426     * only when there appears to be at least one task they might be
427     * able to find and execute. Upon push (either by a worker or an
428     * external submission) to a previously empty queue, workers are
429     * signalled if idle, or created if fewer exist than the given
430     * parallelism level. These primary signals are buttressed by
431     * others whenever other threads remove a task from a queue and
432     * notice that there are other tasks there as well. On most
433 dl 1.174 * platforms, signalling (unpark) overhead time is noticeably
434     * long, and the time between signalling a thread and it actually
435     * making progress can be very noticeably long, so it is worth
436     * offloading these delays from critical paths as much as
437 dl 1.200 * possible. Also, because enqueued workers are often rescanning
438     * or spinning rather than blocking, we set and clear the "parker"
439     * field of WorkQueues to reduce unnecessary calls to unpark.
440     * (This requires a secondary recheck to avoid missed signals.)
441 dl 1.52 *
442     * Trimming workers. To release resources after periods of lack of
443     * use, a worker starting to wait when the pool is quiescent will
444 dl 1.100 * time out and terminate if the pool has remained quiescent for a
445     * given period -- a short period if there are more threads than
446     * parallelism, longer as the number of threads decreases. This
447 dl 1.200 * eventually terminates all workers after periods of non-use.
448 dl 1.52 *
449 dl 1.78 * Shutdown and Termination. A call to shutdownNow atomically sets
450 dl 1.200 * a runState bit and then (non-atomically) sets each worker's
451     * qlock status, cancels all unprocessed tasks, and wakes up all
452     * waiting workers. Detecting whether termination should commence
453     * after a non-abrupt shutdown() call relies on the active count
454     * bits of "ctl" maintaining consensus about quiescence.
455 dl 1.78 *
456 jsr166 1.84 * Joining Tasks
457     * =============
458 dl 1.78 *
459     * Any of several actions may be taken when one worker is waiting
460 jsr166 1.84 * to join a task stolen (or always held) by another. Because we
461 dl 1.78 * are multiplexing many tasks on to a pool of workers, we can't
462     * just let them block (as in Thread.join). We also cannot just
463     * reassign the joiner's run-time stack with another and replace
464     * it later, which would be a form of "continuation", that even if
465 dl 1.200 * possible is not necessarily a good idea since we may need both
466     * an unblocked task and its continuation to progress. Instead we
467     * combine two tactics:
468 dl 1.19 *
469     * Helping: Arranging for the joiner to execute some task that it
470 dl 1.78 * would be running if the steal had not occurred.
471 dl 1.19 *
472     * Compensating: Unless there are already enough live threads,
473 dl 1.78 * method tryCompensate() may create or re-activate a spare
474     * thread to compensate for blocked joiners until they unblock.
475     *
476 dl 1.105 * A third form (implemented in tryRemoveAndExec) amounts to
477     * helping a hypothetical compensator: If we can readily tell that
478     * a possible action of a compensator is to steal and execute the
479     * task being joined, the joining thread can do so directly,
480     * without the need for a compensation thread (although at the
481     * expense of larger run-time stacks, but the tradeoff is
482     * typically worthwhile).
483 dl 1.52 *
484     * The ManagedBlocker extension API can't use helping so relies
485     * only on compensation in method awaitBlocker.
486 dl 1.19 *
487 dl 1.200 * The algorithm in helpStealer entails a form of "linear
488     * helping". Each worker records (in field currentSteal) the most
489     * recent task it stole from some other worker (or a submission).
490     * It also records (in field currentJoin) the task it is currently
491     * actively joining. Method helpStealer uses these markers to try
492     * to find a worker to help (i.e., steal back a task from and
493     * execute it) that could hasten completion of the actively joined
494     * task. Thus, the joiner executes a task that would be on its
495     * own local deque had the to-be-joined task not been stolen. This
496     * is a conservative variant of the approach described in Wagner &
497 dl 1.78 * Calder "Leapfrogging: a portable technique for implementing
498     * efficient futures" SIGPLAN Notices, 1993
499     * (http://portal.acm.org/citation.cfm?id=155354). It differs in
500     * that: (1) We only maintain dependency links across workers upon
501     * steals, rather than use per-task bookkeeping. This sometimes
502 dl 1.90 * requires a linear scan of workQueues array to locate stealers,
503     * but often doesn't because stealers leave hints (that may become
504 dl 1.112 * stale/wrong) of where to locate them. It is only a hint
505     * because a worker might have had multiple steals and the hint
506     * records only one of them (usually the most current). Hinting
507     * isolates cost to when it is needed, rather than adding to
508     * per-task overhead. (2) It is "shallow", ignoring nesting and
509     * potentially cyclic mutual steals. (3) It is intentionally
510 dl 1.78 * racy: field currentJoin is updated only while actively joining,
511     * which means that we miss links in the chain during long-lived
512     * tasks, GC stalls etc (which is OK since blocking in such cases
513     * is usually a good idea). (4) We bound the number of attempts
514 dl 1.200 * to find work using checksums and fall back to suspending the
515 dl 1.90 * worker and if necessary replacing it with another.
516 dl 1.78 *
517 dl 1.200 * Helping actions for CountedCompleters do not require tracking
518     * currentJoins: Method helpComplete takes and executes any task
519     * with the same root as the task being waited on (preferring local
520     * pops to non-local polls). However, this still entails some
521 dl 1.105 * traversal of completer chains, so is less efficient than using
522     * CountedCompleters without explicit joins.
523     *
524 dl 1.200 * Compensation does not aim to keep exactly the target
525     * parallelism number of unblocked threads running at any given
526     * time. Some previous versions of this class employed immediate
527     * compensations for any blocked join. However, in practice, the
528     * vast majority of blockages are transient byproducts of GC and
529     * other JVM or OS activities that are made worse by replacement.
530     * Currently, compensation is attempted only after validating that
531     * all purportedly active threads are processing tasks by checking
532     * field WorkQueue.scanState, which eliminates most false
533     * positives. Also, compensation is bypassed (tolerating fewer
534     * threads) in the most common case in which it is rarely
535     * beneficial: when a worker with an empty queue (thus no
536     * continuation tasks) blocks on a join and there still remain
537     * enough threads to ensure liveness.
538     *
539     * Bounds. The compensation mechanism is bounded (see MAX_SPARES),
540     * to better enable JVMs to cope with programming errors and abuse
541     * before running out of resources to do so, and may be further
542     * bounded via factories that limit thread construction. The
543     * effects of bounding in this pool (like all others) are
544     * imprecise. Total worker counts are decremented when threads
545     * deregister, not when they exit and resources are reclaimed by
546     * the JVM and OS. So the number of simultaneously live threads
547     * may transiently exceed bounds.
548 dl 1.105 *
549     * Common Pool
550     * ===========
551     *
552 jsr166 1.175 * The static common pool always exists after static
553 dl 1.105 * initialization. Since it (or any other created pool) need
554     * never be used, we minimize initial construction overhead and
555     * footprint to the setup of about a dozen fields, with no nested
556     * allocation. Most bootstrapping occurs within method
557 dl 1.200 * externalSubmit during the first submission to the pool.
558 dl 1.105 *
559     * When external threads submit to the common pool, they can
560 dl 1.200 * perform subtask processing (see externalHelpComplete and
561     * related methods) upon joins. This caller-helps policy makes it
562     * sensible to set common pool parallelism level to one (or more)
563     * less than the total number of available cores, or even zero for
564     * pure caller-runs. We do not need to record whether external
565     * submissions are to the common pool -- if not, external help
566     * methods return quickly. These submitters would otherwise be
567     * blocked waiting for completion, so the extra effort (with
568     * liberally sprinkled task status checks) in inapplicable cases
569     * amounts to an odd form of limited spin-wait before blocking in
570     * ForkJoinTask.join.
571 dl 1.105 *
572 dl 1.197 * As a more appropriate default in managed environments, unless
573     * overridden by system properties, we use workers of subclass
574     * InnocuousForkJoinWorkerThread when there is a SecurityManager
575     * present. These workers have no permissions set, do not belong
576     * to any user-defined ThreadGroup, and erase all ThreadLocals
577 dl 1.200 * after executing any top-level task (see WorkQueue.runTask).
578     * The associated mechanics (mainly in ForkJoinWorkerThread) may
579     * be JVM-dependent and must access particular Thread class fields
580     * to achieve this effect.
581 jsr166 1.198 *
582 dl 1.105 * Style notes
583     * ===========
584     *
585 dl 1.200 * Memory ordering relies mainly on Unsafe intrinsics that carry
586     * the further responsibility of explicitly performing null- and
587     * bounds- checks otherwise carried out implicitly by JVMs. This
588     * can be awkward and ugly, but also reflects the need to control
589     * outcomes across the unusual cases that arise in very racy code
590     * with very few invariants. So these explicit checks would exist
591     * in some form anyway. All fields are read into locals before
592     * use, and null-checked if they are references. This is usually
593     * done in a "C"-like style of listing declarations at the heads
594     * of methods or blocks, and using inline assignments on first
595     * encounter. Array bounds-checks are usually performed by
596     * masking with array.length-1, which relies on the invariant that
597     * these arrays are created with positive lengths, which is itself
598     * paranoically checked. Nearly all explicit checks lead to
599     * bypass/return, not exception throws, because they may
600     * legitimately arise due to cancellation/revocation during
601     * shutdown.
602     *
603 dl 1.105 * There is a lot of representation-level coupling among classes
604     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
605     * fields of WorkQueue maintain data structures managed by
606     * ForkJoinPool, so are directly accessed. There is little point
607     * trying to reduce this, since any associated future changes in
608     * representations will need to be accompanied by algorithmic
609     * changes anyway. Several methods intrinsically sprawl because
610 dl 1.200 * they must accumulate sets of consistent reads of fields held in
611     * local variables. There are also other coding oddities
612     * (including several unnecessary-looking hoisted null checks)
613     * that help some methods perform reasonably even when interpreted
614     * (not compiled).
615 dl 1.52 *
616 jsr166 1.84 * The order of declarations in this file is:
617 dl 1.86 * (1) Static utility functions
618     * (2) Nested (static) classes
619     * (3) Static fields
620     * (4) Fields, along with constants used when unpacking some of them
621     * (5) Internal control methods
622     * (6) Callbacks and other support for ForkJoinTask methods
623     * (7) Exported methods
624     * (8) Static block initializing statics in minimally dependent order
625     */
626    
627     // Static utilities
628    
629     /**
630     * If there is a security manager, makes sure caller has
631     * permission to modify threads.
632 jsr166 1.1 */
633 dl 1.86 private static void checkPermission() {
634     SecurityManager security = System.getSecurityManager();
635     if (security != null)
636     security.checkPermission(modifyThreadPermission);
637     }
638    
639     // Nested classes
640 jsr166 1.1
641     /**
642 jsr166 1.8 * Factory for creating new {@link ForkJoinWorkerThread}s.
643     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
644     * for {@code ForkJoinWorkerThread} subclasses that extend base
645     * functionality or initialize threads with different contexts.
646 jsr166 1.1 */
647     public static interface ForkJoinWorkerThreadFactory {
648     /**
649     * Returns a new worker thread operating in the given pool.
650     *
651     * @param pool the pool this thread works in
652 jsr166 1.192 * @return the new worker thread
653 jsr166 1.11 * @throws NullPointerException if the pool is null
654 jsr166 1.1 */
655     public ForkJoinWorkerThread newThread(ForkJoinPool pool);
656     }
657    
658     /**
659     * Default ForkJoinWorkerThreadFactory implementation; creates a
660     * new ForkJoinWorkerThread.
661     */
662 dl 1.112 static final class DefaultForkJoinWorkerThreadFactory
663 jsr166 1.1 implements ForkJoinWorkerThreadFactory {
664 dl 1.112 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
665 dl 1.14 return new ForkJoinWorkerThread(pool);
666 jsr166 1.1 }
667     }
668    
669     /**
670 dl 1.86 * Class for artificial tasks that are used to replace the target
671     * of local joins if they are removed from an interior queue slot
672     * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
673     * actually do anything beyond having a unique identity.
674 jsr166 1.1 */
675 dl 1.86 static final class EmptyTask extends ForkJoinTask<Void> {
676 dl 1.105 private static final long serialVersionUID = -7721805057305804111L;
677 dl 1.86 EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
678     public final Void getRawResult() { return null; }
679     public final void setRawResult(Void x) {}
680     public final boolean exec() { return true; }
681 jsr166 1.1 }
682    
683 dl 1.200 // Constants shared across ForkJoinPool and WorkQueue
684    
685     // Bounds
686     static final int SMASK = 0xffff; // short bits == max index
687     static final int MAX_CAP = 0x7fff; // max #workers - 1
688     static final int EVENMASK = 0xfffe; // even short bits
689     static final int SQMASK = 0x007e; // max 64 (even) slots
690    
691     // Masks and units for WorkQueue.scanState and ctl sp subfield
692     static final int SCANNING = 1; // false when running tasks
693     static final int INACTIVE = 1 << 31; // must be negative
694     static final int SS_SHIFT = 16; // shift for version count
695     static final int SS_SEQ = 1 << SS_SHIFT; // version number
696     static final int SS_MASK = 0x7fffffff; // mask on update
697    
698     // Mode bits for ForkJoinPool.config and WorkQueue.config
699     static final int MODE_MASK = SMASK << 16;
700     static final int LIFO_QUEUE = 0;
701     static final int FIFO_QUEUE = 1 << 16;
702     static final int SHARED_QUEUE = 1 << 31; // must be negative
703    
704 jsr166 1.1 /**
705 dl 1.78 * Queues supporting work-stealing as well as external task
706 dl 1.200 * submission. See above for descriptions and algorithms.
707 dl 1.78 * Performance on most platforms is very sensitive to placement of
708     * instances of both WorkQueues and their arrays -- we absolutely
709     * do not want multiple WorkQueue instances or multiple queue
710 dl 1.200 * arrays sharing cache lines. The @Contended annotation alerts
711     * JVMs to try to keep instances apart.
712 dl 1.78 */
713 jsr166 1.171 @sun.misc.Contended
714 dl 1.78 static final class WorkQueue {
715 dl 1.200
716 dl 1.78 /**
717     * Capacity of work-stealing queue array upon initialization.
718 dl 1.90 * Must be a power of two; at least 4, but should be larger to
719     * reduce or eliminate cacheline sharing among queues.
720     * Currently, it is much larger, as a partial workaround for
721     * the fact that JVMs often place arrays in locations that
722     * share GC bookkeeping (especially cardmarks) such that
723     * per-write accesses encounter serious memory contention.
724 dl 1.78 */
725 dl 1.90 static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
726 dl 1.78
727     /**
728     * Maximum size for queue arrays. Must be a power of two less
729     * than or equal to 1 << (31 - width of array entry) to ensure
730     * lack of wraparound of index calculations, but defined to a
731     * value a bit less than this to help users trap runaway
732     * programs before saturating systems.
733     */
734     static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
735    
736 dl 1.200 // Instance fields
737     volatile int scanState; // versioned, <0: inactive; odd:scanning
738     int stackPred; // pool stack (ctl) predecessor
739 dl 1.178 int nsteals; // number of steals
740 dl 1.200 int hint; // randomization and stealer index hint
741     int config; // pool index and mode
742     volatile int qlock; // 1: locked, < 0: terminate; else 0
743 dl 1.78 volatile int base; // index of next slot for poll
744     int top; // index of next slot for push
745     ForkJoinTask<?>[] array; // the elements (initially unallocated)
746 dl 1.90 final ForkJoinPool pool; // the containing pool (may be null)
747 dl 1.78 final ForkJoinWorkerThread owner; // owning thread or null if shared
748     volatile Thread parker; // == owner during call to park; else null
749 dl 1.95 volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
750 dl 1.200 volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
751 dl 1.112
752 dl 1.200 WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
753 dl 1.90 this.pool = pool;
754 dl 1.78 this.owner = owner;
755 dl 1.115 // Place indices in the center of array (that is not yet allocated)
756 dl 1.78 base = top = INITIAL_QUEUE_CAPACITY >>> 1;
757     }
758    
759     /**
760 dl 1.200 * Returns an exportable index (used by ForkJoinWorkerThread)
761     */
762     final int getPoolIndex() {
763     return (config & 0xffff) >>> 1; // ignore odd/even tag bit
764     }
765    
766     /**
767 dl 1.115 * Returns the approximate number of tasks in the queue.
768     */
769     final int queueSize() {
770     int n = base - top; // non-owner callers must read base first
771     return (n >= 0) ? 0 : -n; // ignore transient negative
772     }
773    
774 jsr166 1.180 /**
775 dl 1.115 * Provides a more accurate estimate of whether this queue has
776     * any tasks than does queueSize, by checking whether a
777     * near-empty queue has at least one unclaimed task.
778     */
779     final boolean isEmpty() {
780 dl 1.200 ForkJoinTask<?>[] a; int n, m, s;
781     return ((n = base - (s = top)) >= 0 ||
782     (n == -1 && // possibly one task
783     ((a = array) == null || (m = a.length - 1) < 0 ||
784 dl 1.115 U.getObject
785     (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
786     }
787    
788     /**
789     * Pushes a task. Call only by owner in unshared queues. (The
790     * shared-queue version is embedded in method externalPush.)
     * Does nothing if the array is null. If the queue was empty
     * before the push, the pool (when non-null) is signalled; if
     * the push filled the array, growArray is called.
791 dl 1.78 *
792     * @param task the task. Caller must ensure non-null.
793 jsr166 1.146 * @throws RejectedExecutionException if array cannot be resized
794 dl 1.78 */
795 dl 1.90 final void push(ForkJoinTask<?> task) {
796 dl 1.200 int s; ForkJoinTask<?>[] a; ForkJoinPool p;
797     int n = base - (s = top); // negative of incoming size
798 dl 1.112 if ((a = array) != null) { // ignore if queue removed
799 dl 1.200 int m = a.length - 1; // fenced write for task visibility
800 dl 1.178 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
801 dl 1.200 U.putOrderedInt(this, QTOP, s + 1);
802     if (n == 0) { // queue was empty before this push
803     if ((p = pool) != null)
804     p.signalWork(p.workQueues, this);
805     }
806     else if (n + m == 0) // held length-1 tasks before this push: now full
807 dl 1.112 growArray();
808 dl 1.78 }
809     }
810    
811 dl 1.178 /**
812 dl 1.112 * Initializes or doubles the capacity of array. Call either
813     * by owner or with lock held -- it is OK for base, but not
814     * top, to move while resizings are in progress.
     *
     * @return the newly installed array
     * @throws RejectedExecutionException if the new size would
     * exceed MAXIMUM_QUEUE_CAPACITY
815     */
816     final ForkJoinTask<?>[] growArray() {
817     ForkJoinTask<?>[] oldA = array;
818     int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
819     if (size > MAXIMUM_QUEUE_CAPACITY)
820     throw new RejectedExecutionException("Queue capacity exceeded");
821     int oldMask, t, b;
822     ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; // install before copying
823     if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
824     (t = top) - (b = base) > 0) {
825     int mask = size - 1;
826 dl 1.200 do { // emulate poll from old array, push to new array
827 dl 1.112 ForkJoinTask<?> x;
828     int oldj = ((b & oldMask) << ASHIFT) + ABASE;
829     int j = ((b & mask) << ASHIFT) + ABASE;
830     x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
831     if (x != null &&
832     U.compareAndSwapObject(oldA, oldj, x, null)) // copy only if the old slot was claimed here
833     U.putObjectVolatile(a, j, x);
834     } while (++b != t);
835 dl 1.78 }
836 dl 1.112 return a;
837 dl 1.78 }
838    
839     /**
840 dl 1.90 * Takes next task, if one exists, in LIFO order. Call only
841 dl 1.102 * by owner in unshared queues.
     *
     * @return the task taken from the slot below top, or null if
     * the array is null or zero-length, the queue is empty, or the
     * slot below top is found to be null
842 dl 1.90 */
843     final ForkJoinTask<?> pop() {
844 dl 1.94 ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
845     if ((a = array) != null && (m = a.length - 1) >= 0) {
846 dl 1.90 for (int s; (s = top - 1) - base >= 0;) { // retry while non-empty
847 dl 1.94 long j = ((m & s) << ASHIFT) + ABASE;
848     if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
849 dl 1.90 break;
850     if (U.compareAndSwapObject(a, j, t, null)) { // claim the slot
851 dl 1.200 U.putOrderedInt(this, QTOP, s);
852 dl 1.90 return t;
853     }
854     }
855     }
856     return null;
857     }
858    
859     /**
860     * Takes a task in FIFO order if b is base of queue and a task
861     * can be claimed without contention. Specialized versions
862 dl 1.200 * appear in ForkJoinPool methods scan and helpStealer.
     *
     * @param b the index the caller expects base to have
     * @return the claimed task, or null if the array is null, the
     * slot is empty, base is no longer b, or the CAS fails
863 dl 1.78 */
864 dl 1.90 final ForkJoinTask<?> pollAt(int b) {
865 dl 1.178 ForkJoinTask<?> t; ForkJoinTask<?>[] a;
866 dl 1.90 if ((a = array) != null) {
867 dl 1.86 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
868     if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
869 dl 1.178 base == b && U.compareAndSwapObject(a, j, t, null)) {
870 dl 1.200 base = b + 1; // advance base only after the slot is claimed
871 dl 1.78 return t;
872     }
873     }
874     return null;
875     }
876    
877     /**
878 dl 1.90 * Takes next task, if one exists, in FIFO order.
     * Retries while the queue appears non-empty, until a task is
     * claimed or the queue is seen to be empty.
     *
     * @return the task claimed from the base slot, or null if none
879 dl 1.78 */
880 dl 1.90 final ForkJoinTask<?> poll() {
881 dl 1.178 ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
882 dl 1.90 while ((b = base) - top < 0 && (a = array) != null) {
883     int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
884     t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
885 dl 1.200 if (base == b) { // base unchanged since the slot was read
886     if (t != null) {
887     if (U.compareAndSwapObject(a, j, t, null)) {
888     base = b + 1;
889     return t;
890     }
891 dl 1.78 }
892 dl 1.200 else if (b + 1 == top) // now empty
893 dl 1.90 break;
894     }
895 dl 1.78 }
896     return null;
897     }
898    
899     /**
900     * Takes next task, if one exists, in order specified by mode.
901     */
902     final ForkJoinTask<?> nextLocalTask() {
903 dl 1.200 return (config & FIFO_QUEUE) == 0 ? pop() : poll();
904 dl 1.78 }
905    
906     /**
907     * Returns next task, if one exists, in order specified by mode.
908     */
909     final ForkJoinTask<?> peek() {
910     ForkJoinTask<?>[] a = array; int m;
911     if (a == null || (m = a.length - 1) < 0)
912     return null;
913 dl 1.200 int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;
914 dl 1.78 int j = ((i & m) << ASHIFT) + ABASE;
915     return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
916     }
917    
918     /**
919     * Pops the given task only if it is at the current top.
920 dl 1.105 * (A shared version is available only via FJP.tryExternalUnpush)
921 dl 1.200 */
922 dl 1.78 final boolean tryUnpush(ForkJoinTask<?> t) {
923     ForkJoinTask<?>[] a; int s;
924     if ((a = array) != null && (s = top) != base &&
925     U.compareAndSwapObject
926     (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
927 dl 1.200 U.putOrderedInt(this, QTOP, s);
928 dl 1.78 return true;
929     }
930     return false;
931     }
932    
933     /**
934 jsr166 1.84 * Removes and cancels all known tasks, ignoring any exceptions.
935 dl 1.78 */
936     final void cancelAll() {
937 dl 1.200 ForkJoinTask<?> t;
938     if ((t = currentJoin) != null) {
939     currentJoin = null;
940     ForkJoinTask.cancelIgnoringExceptions(t);
941     }
942     if ((t = currentSteal) != null) {
943     currentSteal = null;
944     ForkJoinTask.cancelIgnoringExceptions(t);
945     }
946     while ((t = poll()) != null)
947 dl 1.78 ForkJoinTask.cancelIgnoringExceptions(t);
948     }
949    
950 dl 1.104 // Specialized execution methods
951 dl 1.78
952     /**
953 dl 1.178 * Polls and runs tasks until empty.
954 dl 1.78 */
955 dl 1.178 final void pollAndExecAll() {
956     for (ForkJoinTask<?> t; (t = poll()) != null;)
957     t.doExec();
958 dl 1.94 }
959    
960     /**
961 dl 1.200 * Removes and executes all local tasks. If FIFO, invokes
962     * pollAndExecAll. Otherwise (LIFO, i.e. the FIFO_QUEUE bit of
963     * config is clear) implements a specialized pop loop to exec
     * until empty. Does nothing if the queue appears empty on entry
     * or the array is null or zero-length.
964     */
965     final void execLocalTasks() {
966     int b = base, m, s;
967     ForkJoinTask<?>[] a = array;
968     if (b - (s = top - 1) <= 0 && a != null &&
969     (m = a.length - 1) >= 0) {
970     if ((config & FIFO_QUEUE) == 0) { // LIFO mode: pop from top
971     for (ForkJoinTask<?> t;;) {
972     if ((t = (ForkJoinTask<?>)U.getAndSetObject
973     (a, ((m & s) << ASHIFT) + ABASE, null)) == null)
974     break;
975     U.putOrderedInt(this, QTOP, s);
976     t.doExec();
977     if (base - (s = top - 1) > 0) // re-read indices: stop when empty
978     break;
979     }
980     }
981     else // FIFO mode
982     pollAndExecAll();
983     }
984     }
985    
986     /**
987     * Executes the given task and any remaining local tasks.
     * While doing so the SCANNING bit of scanState is cleared and
     * currentSteal records the task; afterwards nsteals is
     * incremented, SCANNING is set again, and the owner thread (if
     * any) gets an afterTopLevelExec callback. Does nothing if the
     * task is null.
     *
     * @param task the task to run, or null
988 dl 1.94 */
989 dl 1.178 final void runTask(ForkJoinTask<?> task) {
990 dl 1.200 if (task != null) {
991     scanState &= ~SCANNING; // mark as busy
992     (currentSteal = task).doExec();
993     U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
994     execLocalTasks();
995     ForkJoinWorkerThread thread = owner;
996 dl 1.178 ++nsteals;
997 dl 1.200 scanState |= SCANNING; // back to scanning
998     if (thread != null)
999 dl 1.197 thread.afterTopLevelExec();
1000 dl 1.178 }
1001 dl 1.94 }
1002    
1003     /**
1004 dl 1.105 * If present, removes from queue and executes the given task.
1005 dl 1.200 * Also pops (without running it) a task found at the top whose
     * status is already negative. Used only by awaitJoin.
1006 dl 1.94 *
     * @param task the task to look for; a null task is ignored
1007 dl 1.200 * @return true if queue empty and task not known to be done;
     * false if the task's status is negative after an attempt, or
     * if every queued entry was examined without finding it
1008 dl 1.94 */
1009 dl 1.105 final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
1010 dl 1.94 ForkJoinTask<?>[] a; int m, s, b, n;
1011 dl 1.200 if ((a = array) != null && (m = a.length - 1) >= 0 &&
1012     task != null) {
1013     while ((n = (s = top) - (b = base)) > 0) {
1014     for (ForkJoinTask<?> t;;) { // traverse from s to b
1015     long j = ((--s & m) << ASHIFT) + ABASE;
1016     if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
1017     return s + 1 == top; // shorter than expected
1018     else if (t == task) {
1019     boolean removed = false;
1020     if (s + 1 == top) { // pop
1021     if (U.compareAndSwapObject(a, j, task, null)) {
1022     U.putOrderedInt(this, QTOP, s);
1023     removed = true;
1024     }
1025     }
1026     else if (base == b) // replace with proxy
1027     removed = U.compareAndSwapObject(
1028     a, j, task, new EmptyTask());
1029     if (removed)
1030     task.doExec();
1031     break;
1032 dl 1.90 }
1033 dl 1.200 else if (t.status < 0 && s + 1 == top) {
1034     if (U.compareAndSwapObject(a, j, t, null))
1035     U.putOrderedInt(this, QTOP, s);
1036     break; // was cancelled
1037 dl 1.104 }
1038 dl 1.200 if (--n == 0) // examined every entry between top and base
1039     return false;
1040 dl 1.104 }
1041 dl 1.200 if (task.status < 0) // task is done: nothing more to do here
1042     return false;
1043 dl 1.104 }
1044     }
1045 dl 1.200 return true;
1046 dl 1.104 }
1047    
1048     /**
1049 dl 1.200 * Pops task if in the same CC computation as the given task,
1050     * in either shared or owned mode. Used only by helpComplete.
     * The task in the slot below top qualifies if the given task
     * is that task itself or is reachable from it by following
     * completer links.
     *
     * @param task the completer identifying the computation
     * @param mode if negative, qlock is acquired (by CAS from 0
     * to 1) around the removal; otherwise the slot is claimed by
     * CAS alone
     * @return the popped task, or null if none was taken
1051 dl 1.78 */
1052 dl 1.200 final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
1053     int s; ForkJoinTask<?>[] a; Object o;
1054 dl 1.178 if (base - (s = top) < 0 && (a = array) != null) {
1055     long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
1056 dl 1.200 if ((o = U.getObjectVolatile(a, j)) != null &&
1057     (o instanceof CountedCompleter)) {
1058     CountedCompleter<?> t = (CountedCompleter<?>)o;
1059     for (CountedCompleter<?> r = t;;) {
1060     if (r == task) {
1061     if (mode < 0) { // must lock
1062     if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
1063     if (top == s && array == a && // recheck under lock
1064     U.compareAndSwapObject(a, j, t, null)) {
1065     U.putOrderedInt(this, QTOP, s - 1);
1066     U.putOrderedInt(this, QLOCK, 0);
1067     return t;
1068     }
1069 dl 1.178 qlock = 0; // unlock after failed removal
1070     }
1071     }
1072 dl 1.200 else if (U.compareAndSwapObject(a, j, t, null)) {
1073     U.putOrderedInt(this, QTOP, s - 1);
1074     return t;
1075     }
1076     break;
1077 dl 1.178 }
1078 dl 1.200 else if ((r = r.completer) == null) // try parent
1079 dl 1.178 break;
1080     }
1081 dl 1.94 }
1082 dl 1.78 }
1083 dl 1.200 return null;
1084 dl 1.78 }
1085    
1086     /**
1087 dl 1.200 * Steals and runs a task in the same CC computation as the
1088     * given task if one exists and can be taken without
1089     * contention. Otherwise returns a checksum/control value for
1090     * use by method helpComplete. Only the task in the base slot
     * is considered; it matches if the given task is that task or
     * is reachable from it by following completer links.
1091     *
     * @param task the completer identifying the computation
1092     * @return 1 if successful, 2 if retryable (lost to another
1093     * stealer), -1 if non-empty but no matching task found, else
1094     * the base index, forced negative.
1095     */
1096     final int pollAndExecCC(CountedCompleter<?> task) {
1097     int b, h; ForkJoinTask<?>[] a; Object o;
1098     if ((b = base) - top >= 0 || (a = array) == null)
1099     h = b | Integer.MIN_VALUE; // to sense movement on re-poll
1100     else {
1101     long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
1102     if ((o = U.getObjectVolatile(a, j)) == null)
1103     h = 2; // retryable
1104     else if (!(o instanceof CountedCompleter))
1105     h = -1; // unmatchable
1106     else {
1107     CountedCompleter<?> t = (CountedCompleter<?>)o;
1108     for (CountedCompleter<?> r = t;;) { // walk up the completer chain
1109     if (r == task) {
1110     if (base == b &&
1111     U.compareAndSwapObject(a, j, t, null)) {
1112     base = b + 1;
1113 dl 1.178 t.doExec();
1114 dl 1.200 h = 1; // success
1115 dl 1.178 }
1116 dl 1.200 else
1117     h = 2; // lost CAS
1118     break;
1119 dl 1.178 }
1120 dl 1.200 else if ((r = r.completer) == null) {
1121     h = -1; // unmatched
1122 dl 1.178 break;
1123 dl 1.200 }
1124 dl 1.178 }
1125     }
1126 dl 1.78 }
1127 dl 1.200 return h;
1128 dl 1.78 }
1129    
1130     /**
1131 dl 1.86 * Returns true if owned and not known to be blocked.
1132     */
1133     final boolean isApparentlyUnblocked() {
1134     Thread wt; Thread.State s;
1135 dl 1.200 return (scanState >= 0 &&
1136 dl 1.86 (wt = owner) != null &&
1137     (s = wt.getState()) != Thread.State.BLOCKED &&
1138     s != Thread.State.WAITING &&
1139     s != Thread.State.TIMED_WAITING);
1140     }
1141    
// Unsafe mechanics: field offsets and array layout, resolved once
// when the class is initialized.
private static final sun.misc.Unsafe U;
private static final int  ABASE;
private static final int  ASHIFT;
private static final long QBASE;
private static final long QTOP;
private static final long QLOCK;
private static final long QSCANSTATE;
private static final long QCURRENTSTEAL;
static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> queueClass = WorkQueue.class;
        Class<?> arrayClass = ForkJoinTask[].class;
        QBASE = U.objectFieldOffset(queueClass.getDeclaredField("base"));
        QTOP = U.objectFieldOffset(queueClass.getDeclaredField("top"));
        QLOCK = U.objectFieldOffset(queueClass.getDeclaredField("qlock"));
        QSCANSTATE = U.objectFieldOffset(
            queueClass.getDeclaredField("scanState"));
        QCURRENTSTEAL = U.objectFieldOffset(
            queueClass.getDeclaredField("currentSteal"));
        ABASE = U.arrayBaseOffset(arrayClass);
        int scale = U.arrayIndexScale(arrayClass);
        // ASHIFT is log2(scale), which only works for a power of two
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
}
1175     }
1176 dl 1.14
1177 dl 1.112 // static fields (initialized in static initializer below)
1178    
1179     /**
1180     * Creates a new ForkJoinWorkerThread. This factory is used unless
1181     * overridden in ForkJoinPool constructors.
1182     */
1183     public static final ForkJoinWorkerThreadFactory
1184     defaultForkJoinWorkerThreadFactory;
1185    
1186 jsr166 1.1 /**
1187 dl 1.115 * Permission required for callers of methods that may start or
1188     * kill threads.
1189     */
1190     private static final RuntimePermission modifyThreadPermission;
1191    
1192     /**
1193 dl 1.101 * Common (static) pool. Non-null for public use unless a static
1194 dl 1.105 * construction exception, but internal usages null-check on use
1195     * to paranoically avoid potential initialization circularities
1196     * as well as to simplify generated code.
1197 dl 1.101 */
1198 dl 1.134 static final ForkJoinPool common;
1199 dl 1.101
1200     /**
1201 dl 1.160 * Common pool parallelism. To allow simpler use and management
1202     * when common pool threads are disabled, we allow the underlying
1203 dl 1.185 * common.parallelism field to be zero, but in that case still report
1204 dl 1.160 * parallelism as 1 to reflect resulting caller-runs mechanics.
1205 dl 1.90 */
1206 dl 1.134 static final int commonParallelism;
1207 dl 1.90
1208     /**
1209 dl 1.105 * Sequence number for creating workerNamePrefix.
1210 dl 1.86 */
1211 dl 1.105 private static int poolNumberSequence;
1212 dl 1.86
1213 jsr166 1.1 /**
1214 jsr166 1.132 * Returns the next sequence number. We don't expect this to
1215     * ever contend, so use simple builtin sync.
1216 dl 1.83 */
1217 dl 1.105 private static final synchronized int nextPoolId() {
1218     return ++poolNumberSequence;
1219     }
1220 dl 1.86
1221 dl 1.200 // static configuration constants
1222 dl 1.86
1223     /**
1224 dl 1.105 * Initial timeout value (in nanoseconds) for the thread
1225     * triggering quiescence to park waiting for new work. On timeout,
1226     * the thread will instead try to shrink the number of
1227     * workers. The value should be large enough to avoid overly
1228     * aggressive shrinkage during most transient stalls (long GCs
1229     * etc).
1230 dl 1.86 */
1231 dl 1.105 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
1232 dl 1.86
1233     /**
1234 dl 1.100 * Timeout value when there are more threads than parallelism level
1235 dl 1.86 */
1236 dl 1.200 private static final long FAST_IDLE_TIMEOUT = 100L * 1000L * 1000L; // 100ms
1237 dl 1.86
1238     /**
1239 dl 1.120 * Tolerance for idle timeouts, to cope with timer undershoots
1240     */
1241 dl 1.200 private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms
1242    
1243     /**
1244     * Limit on spare thread construction in tryCompensate. The
1245     * current value is far in excess of normal requirements, but also
1246     * far short of MAX_CAP and typical OS thread limits, so allows
1247     * JVMs to catch misuse/abuse before running out of resources
1248     * needed to do so.
     *
     * NOTE(review): unlike the neighbouring configuration constants
     * (IDLE_TIMEOUT, FAST_IDLE_TIMEOUT, TIMEOUT_SLOP, SPINS), this
     * field is not declared final. Confirm whether it is assigned
     * anywhere else in the file; if not, it should be final.
1249     */
1250     private static int MAX_SPARES = 256;
1251 dl 1.120
1252     /**
1253 dl 1.200 * Number of times to spin-wait before blocking. The spins (in
1254     * awaitRunStateLock and awaitWork) currently use randomized
1255     * spins. If/when MWAIT-like intrinsics become available, they
1256     * may allow quieter spinning. The value of SPINS must be a power
1257     * of two, at least 4. The current value causes spinning for a
1258     * small fraction of context-switch times that is worthwhile given
1259     * the typical likelihoods that blocking is not necessary.
1260 dl 1.90 */
1261 dl 1.200 private static final int SPINS = 1 << 11;
1262 dl 1.90
1263     /**
1264     * Increment for seed generators. See class ThreadLocal for
1265     * explanation.
1266     */
1267 dl 1.193 private static final int SEED_INCREMENT = 0x9e3779b9;
1268 dl 1.83
1269 jsr166 1.163 /*
1270 dl 1.200 * Bits and masks for field ctl, packed with four 16-bit subfields:
1271     * AC: Number of active running workers minus target parallelism
1272     * TC: Number of total workers minus target parallelism
1273     * SS: version count and status of top waiting thread
1274     * ID: poolIndex of top of Treiber stack of waiters
1275     *
1276     * When convenient, we can extract the lower 32 stack top bits
1277     * (including version bits) as sp=(int)ctl. The offsets of counts
1278     * by the target parallelism and the positionings of fields make
1279     * it possible to perform the most common checks via sign tests of
1280     * fields: When ac is negative, there are not enough active
1281     * workers, when tc is negative, there are not enough total
1282     * workers. When sp is non-zero, there are waiting workers. To
1283     * deal with possibly negative fields, we use casts in and out of
1284     * "short" and/or signed shifts to maintain signedness.
1285     *
1286     * Because it occupies uppermost bits, we can add one active count
1287     * using getAndAddLong of AC_UNIT, rather than CAS, when returning
1288     * from a blocked join. Other updates entail multiple subfields
1289     * and masking, requiring CAS.
1290     */
1291    
1292     // Lower and upper word masks
1293     private static final long SP_MASK = 0xffffffffL; // low 32 bits of ctl: the SS and ID subfields (sp = (int)ctl)
1294     private static final long UC_MASK = ~SP_MASK; // high 32 bits of ctl: the AC and TC subfields
1295 dl 1.86
1296 dl 1.200 // Active counts (AC occupies ctl bits 48-63)
1297 dl 1.86 private static final int AC_SHIFT = 48;
1298 dl 1.200 private static final long AC_UNIT = 0x0001L << AC_SHIFT; // adding this to ctl increments AC by one
1299     private static final long AC_MASK = 0xffffL << AC_SHIFT; // selects the 16 AC bits
1300    
1301     // Total counts (TC occupies ctl bits 32-47)
1302 dl 1.86 private static final int TC_SHIFT = 32;
1303 dl 1.200 private static final long TC_UNIT = 0x0001L << TC_SHIFT; // adding this to ctl increments TC by one
1304     private static final long TC_MASK = 0xffffL << TC_SHIFT; // selects the 16 TC bits
1305     private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign bit of TC (bit 47): set while tc is negative, i.e. too few total workers
1306    
1307     // runState bits (arbitrary powers of two)
1308     private static final int RSLOCK = 1; // lock bit: set by lockRunState/awaitRunStateLock, cleared by unlockRunState callers
1309     private static final int RSIGNAL = 1 << 1; // set by a waiter in awaitRunStateLock before it blocks; makes the unlocking CAS fail so unlockRunState calls notifyAll
1310     private static final int STARTED = 1 << 2;
1311     private static final int SHUTDOWN = 1 << 3;
1312     private static final int STOP = 1 << 4; // when set, tryAddWorker and tryCompensate add no workers and awaitWork returns false
1313     private static final int TERMINATED = 1 << 5;
1314 dl 1.86
1315     // Instance fields
1316 dl 1.200 volatile long stealCount; // collects worker counts
1317     volatile long ctl; // main pool control
1318     volatile int runState; // lockable status
1319     final int config; // parallelism, mode
1320     int indexSeed; // to generate worker index
1321     volatile WorkQueue[] workQueues; // main registry
1322 dl 1.112 final ForkJoinWorkerThreadFactory factory; // used by createWorker to construct worker threads
1323 dl 1.200 final UncaughtExceptionHandler ueh; // per-worker UEH
1324     final String workerNamePrefix; // to create worker name string
1325 dl 1.101
1326 jsr166 1.145 /**
1327 dl 1.200 * Acquires the runState lock; returns current (locked) runState
1328 dl 1.105 */
1329 dl 1.200 private int lockRunState() {
1330     int rs;
1331     return ((((rs = runState) & RSLOCK) != 0 || // lock bit already set by someone else, or ...
1332     !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? // ... we lost the single CAS that sets it (rs now includes RSLOCK)
1333     awaitRunStateLock() : rs); // slow path spins/blocks; either way the returned value has RSLOCK set
1334     }
1335    
1336     /**
1337     * Spins and/or blocks until runstate lock is available. This
1338     * method is called only if an initial CAS fails. This acts as a
1339     * spinlock for normal cases, but falls back to builtin monitor to
1340     * block when (rarely) needed. This would be a terrible idea for a
1341     * highly contended lock, but most pools run without the lock ever
1342     * contending after the spin limit, so this works fine as a more
1343     * conservative alternative to a pure spinlock.
1344     */
1345     private int awaitRunStateLock() {
1346     for (int spins = SPINS, r = 0, rs, nrs;;) {
1347     if (((rs = runState) & RSLOCK) == 0) {
1348     if (U.compareAndSwapInt(this, RUNSTATE, rs, nrs = rs | RSLOCK))
1349     return nrs;
1350     }
1351     else if (r == 0)
1352     r = ThreadLocalRandom.nextSecondarySeed();
1353     else if (spins > 0) {
1354     r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
1355     if (r >= 0)
1356 dl 1.101 --spins;
1357     }
1358 dl 1.200 else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
1359 jsr166 1.106 synchronized (this) {
1360 dl 1.200 if ((runState & RSIGNAL) != 0) {
1361 dl 1.101 try {
1362     wait();
1363     } catch (InterruptedException ie) {
1364 dl 1.104 try {
1365     Thread.currentThread().interrupt();
1366     } catch (SecurityException ignore) {
1367     }
1368 dl 1.101 }
1369     }
1370     else
1371 dl 1.105 notifyAll();
1372 dl 1.101 }
1373     }
1374     }
1375     }
1376 dl 1.78
1377 jsr166 1.1 /**
1378 dl 1.200 * Unlocks and sets runState to newRunState.
1379     *
1380     * @param oldRunState a value returned from lockRunState
1381     * @param newRunState the next value (must have lock bit clear).
1382 jsr166 1.1 */
1383 dl 1.200 private void unlockRunState(int oldRunState, int newRunState) {
1384     if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { // fails if runState changed while locked, e.g. a waiter in awaitRunStateLock set RSIGNAL
1385     runState = newRunState; // clears RSIGNAL bit
1386     synchronized (this) { notifyAll(); } // wake threads blocked in awaitRunStateLock's wait()
1387     }
1388 dl 1.78 }
1389 jsr166 1.1
1390 dl 1.200 // Creating, registering and deregistering workers
1391    
1392 dl 1.112 /**
1393 dl 1.200 * Tries to construct and start one worker. Assumes that total
1394     * count has already been incremented as a reservation. Invokes
1395     * deregisterWorker on any failure.
1396     *
1397     * @return true if successful
1398 dl 1.115 */
1399 dl 1.200 private boolean createWorker() {
1400     ForkJoinWorkerThreadFactory fac = factory;
1401     Throwable ex = null; // failure cause, if the factory or start() throws
1402     ForkJoinWorkerThread wt = null; // stays null if the factory is null, returns null, or throws
1403     try {
1404     if (fac != null && (wt = fac.newThread(this)) != null) {
1405     wt.start();
1406     return true;
1407 dl 1.115 }
1408 dl 1.200 } catch (Throwable rex) {
1409     ex = rex;
1410 dl 1.112 }
1411 dl 1.200 deregisterWorker(wt, ex); // failure path: undoes the count reservation; rethrows ex when it is non-null
1412     return false; // reached only when deregisterWorker did not rethrow (ex == null)
1413 dl 1.112 }
1414    
1415 dl 1.200 /**
1416     * Tries to add one worker, incrementing ctl counts before doing
1417     * so, relying on createWorker to back out on failure.
1418     *
1419     * @param c incoming ctl value, with total count negative and no
1420     * idle workers. On CAS failure, c is refreshed and retried if
1421     * this holds (otherwise, a new worker is not needed).
1422     */
1423     private void tryAddWorker(long c) {
1424     boolean add = false;
1425     do {
1426     long nc = ((AC_MASK & (c + AC_UNIT)) | // AC+1 and TC+1; the low 32 bits of nc are zero,
1427     (TC_MASK & (c + TC_UNIT))); // matching the precondition that there are no idle workers
1428     if (ctl == c) { // cheap pre-check before taking the runState lock
1429     int rs, stop; // check if terminating
1430     if ((stop = (rs = lockRunState()) & STOP) == 0)
1431     add = U.compareAndSwapLong(this, CTL, c, nc); // reserve the counts while holding the lock
1432     unlockRunState(rs, rs & ~RSLOCK);
1433     if (stop != 0)
1434     break;
1435     if (add) {
1436     createWorker(); // backs out the reservation itself on failure
1437     break;
1438     }
1439     }
1440     } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); // retry only while TC is still negative and the idle stack is empty
1441     }
1442 dl 1.112
1443     /**
1444 dl 1.200 * Callback from ForkJoinWorkerThread constructor to establish and
1445     * record its WorkQueue.
1446 dl 1.112 *
1447     * @param wt the worker thread
1448 dl 1.115 * @return the worker's queue
1449 dl 1.112 */
1450 dl 1.115 final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1451 dl 1.200 UncaughtExceptionHandler handler;
1452     wt.setDaemon(true); // configure thread
1453 dl 1.115 if ((handler = ueh) != null)
1454     wt.setUncaughtExceptionHandler(handler);
1455 dl 1.200 WorkQueue w = new WorkQueue(this, wt);
1456     int i = 0; // assign a pool index
1457     int mode = config & MODE_MASK;
1458     int rs = lockRunState(); // indexSeed and the workQueues slots are updated under the runState lock
1459 dl 1.115 try {
1460 dl 1.200 WorkQueue[] ws; int n; // skip if no array
1461     if ((ws = workQueues) != null && (n = ws.length) > 0) {
1462     int s = indexSeed += SEED_INCREMENT; // unlikely to collide
1463     int m = n - 1;
1464     i = ((s << 1) | 1) & m; // odd-numbered indices
1465     if (ws[i] != null) { // collision
1466     int probes = 0; // step by approx half n
1467 dl 1.115 int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; // NOTE(review): presumably always even so probing stays on odd slots; EVENMASK is defined outside this chunk
1468 dl 1.200 while (ws[i = (i + step) & m] != null) {
1469 dl 1.115 if (++probes >= n) { // n probes without a free slot: double the table and keep probing
1470     workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1471     m = n - 1;
1472     probes = 0;
1473 dl 1.94 }
1474     }
1475     }
1476 dl 1.200 w.hint = s; // use as random seed
1477     w.config = i | mode;
1478     w.scanState = i; // publication fence
1479     ws[i] = w;
1480 dl 1.78 }
1481 dl 1.115 } finally {
1482 dl 1.200 unlockRunState(rs, rs & ~RSLOCK);
1483 dl 1.78 }
1484 dl 1.200 wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); // odd slot index i halves to the worker number used in the name
1485 dl 1.115 return w;
1486 dl 1.78 }
1487 dl 1.19
1488 jsr166 1.1 /**
1489 dl 1.86 * Final callback from terminating worker, as well as upon failure
1490 dl 1.105 * to construct or start a worker. Removes record of worker from
1491     * array, and adjusts counts. If pool is shutting down, tries to
1492     * complete termination.
1493 dl 1.78 *
1494 jsr166 1.151 * @param wt the worker thread, or null if construction failed
1495 dl 1.78 * @param ex the exception causing failure, or null if none
1496 dl 1.45 */
1497 dl 1.78 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1498     WorkQueue w = null;
1499     if (wt != null && (w = wt.workQueue) != null) {
1500 dl 1.200 WorkQueue[] ws; // remove index from array
1501     int idx = w.config & SMASK;
1502     int rs = lockRunState();
1503     if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) // clear the slot only if it still holds this queue
1504     ws[idx] = null;
1505     unlockRunState(rs, rs & ~RSLOCK);
1506     }
1507     long c; // decrement counts
1508     do {} while (!U.compareAndSwapLong // retry until AC-1 and TC-1 are installed; low 32 bits are carried over unchanged
1509     (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
1510     (TC_MASK & (c - TC_UNIT)) |
1511     (SP_MASK & c))));
1512     if (w != null) {
1513     w.qlock = -1; // ensure set
1514     w.cancelAll(); // cancel remaining tasks
1515 dl 1.178 U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
1516 dl 1.78 }
1517 dl 1.120 if (!tryTerminate(false, false) && w != null && w.array != null) { // pool continues and this worker had an allocated queue array: consider a replacement
1518 dl 1.200 WorkQueue[] ws = workQueues; int m, sp;
1519     if (ws != null && (m = ws.length - 1) >= 0 && (c = ctl) < 0L) { // ctl < 0: AC is negative, i.e. too few active workers
1520     if ((sp = (int)c) != 0) // wake up replacement
1521     tryRelease(c, ws[sp & m], AC_UNIT);
1522     else if (ex != null && (c & ADD_WORKER) != 0L)
1523     tryAddWorker(c); // create replacement
1524 dl 1.120 }
1525 dl 1.78 }
1526 dl 1.200 if (ex == null) // help clean on way out
1527 dl 1.120 ForkJoinTask.helpExpungeStaleExceptions();
1528 dl 1.200 else // rethrow
1529 dl 1.104 ForkJoinTask.rethrow(ex);
1530 dl 1.78 }
1531 dl 1.52
1532 dl 1.200 // Signalling
1533 dl 1.19
1534     /**
1535 dl 1.115 * Tries to create or activate a worker if too few are active.
1536     *
1537 dl 1.178 * @param ws the worker array to use to find signallees
1538 dl 1.200 * @param q a WorkQueue --if non-null, don't retry if now empty
1539 dl 1.105 */
1540 dl 1.178 final void signalWork(WorkQueue[] ws, WorkQueue q) {
1541 dl 1.200 long c; int sp, i; WorkQueue v; Thread p;
1542     while ((c = ctl) < 0L) { // loop only while AC is negative (too few active workers)
1543     if ((sp = (int)c) == 0) { // no idle workers
1544     if ((c & ADD_WORKER) != 0L) // too few workers
1545     tryAddWorker(c);
1546     break;
1547     }
1548     if (ws == null) // unstarted/terminated
1549 dl 1.174 break;
1550 dl 1.200 if (ws.length <= (i = sp & SMASK)) // terminated
1551 dl 1.115 break;
1552 dl 1.200 if ((v = ws[i]) == null) // terminating
1553 dl 1.174 break;
1554 dl 1.200 int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
1555     int d = sp - v.scanState; // screen CAS
1556     long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); // AC+1, and pop v by making its saved predecessor the new stack top
1557     if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { // d == 0: v's scanState still matches the stack top read from ctl
1558     v.scanState = vs; // activate v
1559     if ((p = v.parker) != null) // unpark only if v has published a parked thread
1560 dl 1.174 U.unpark(p);
1561     break;
1562     }
1563 dl 1.200 if (q != null && q.isEmpty()) // no more work
1564 dl 1.174 break;
1565 dl 1.52 }
1566 dl 1.14 }
1567    
1568 dl 1.200 /**
1569     * Signals and releases worker v if it is top of idle worker
1570     * stack. This performs a one-shot version of signalWork only if
1571     * there is (apparently) at least one idle worker.
1572     *
1573     * @param c incoming ctl value
1574     * @param v if non-null, a worker
1575     * @param inc the increment to active count (zero when compensating)
1576     * @return true if successful
1577     */
1578     private boolean tryRelease(long c, WorkQueue v, long inc) {
1579     int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p; // vs: v's next scanState, with the INACTIVE bit cleared
1580     if (v != null && v.scanState == sp) { // v is at top of stack
1581     long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred); // apply inc to the upper word; pop v by restoring its saved predecessor
1582     if (U.compareAndSwapLong(this, CTL, c, nc)) { // single attempt; callers decide whether to retry
1583     v.scanState = vs;
1584     if ((p = v.parker) != null)
1585     U.unpark(p);
1586     return true;
1587     }
1588     }
1589     return false;
1590     }
1591    
1592 dl 1.90 // Scanning for tasks
1593    
1594 dl 1.14 /**
1595 dl 1.90 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1596 dl 1.14 */
1597 dl 1.90 final void runWorker(WorkQueue w) {
1598 dl 1.200 w.growArray(); // allocate queue
1599     int seed = w.hint; // initially holds randomization hint
1600     int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
1601     for (ForkJoinTask<?> t;;) {
1602     if ((t = scan(w, r)) != null)
1603     w.runTask(t);
1604     else if (!awaitWork(w, r)) // awaitWork returns false when this worker should terminate
1605     break;
1606 dl 1.178 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1607     }
1608 dl 1.14 }
1609    
1610     /**
1611 dl 1.200 * Scans for and tries to steal a top-level task. Scans start at a
1612     * random location, randomly moving on apparent contention,
1613     * otherwise continuing linearly until reaching two consecutive
1614     * empty passes over all queues with the same checksum (summing
1615     * each base index of each queue, that moves on each steal), at
1616     * which point the worker tries to inactivate and then re-scans,
1617     * attempting to re-activate (itself or some other worker) if
1618     * finding a task; otherwise returning null to await work. Scans
1619     * otherwise touch as little memory as possible, to reduce
1620     * disruption on other scanning threads.
1621 dl 1.78 *
1622     * @param w the worker (via its WorkQueue)
1623 dl 1.178 * @param r a random seed
1624 dl 1.200 * @return a task, or null if none found
1625 dl 1.78 */
1626 dl 1.200 private ForkJoinTask<?> scan(WorkQueue w, int r) {
1627 dl 1.115 WorkQueue[] ws; int m;
1628 dl 1.200 if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
1629     int ss = w.scanState; // initially non-negative
1630     for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
1631     WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1632     int b, n; long c;
1633     if ((q = ws[k]) != null) {
1634     if ((n = (b = q.base) - q.top) < 0 &&
1635     (a = q.array) != null) { // non-empty
1636     long i = (((a.length - 1) & b) << ASHIFT) + ABASE; // raw offset of the slot for index b, for the Unsafe accesses below
1637     if ((t = ((ForkJoinTask<?>)
1638     U.getObjectVolatile(a, i))) != null &&
1639     q.base == b) { // re-read base so t still corresponds to index b
1640     if (ss >= 0) { // this worker is active: try to take the task
1641     if (U.compareAndSwapObject(a, i, t, null)) { // claim the slot; a failed CAS falls through to "move and rescan"
1642     q.base = b + 1;
1643     if (n < -1) // signal others
1644     signalWork(ws, q);
1645     return t;
1646     }
1647     }
1648     else if (oldSum == 0 && // try to activate
1649     w.scanState < 0)
1650     tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); // releases whichever worker is on top of the idle stack
1651 dl 1.178 }
1652 dl 1.200 if (ss < 0) // refresh
1653     ss = w.scanState;
1654     r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // scramble r to choose a new origin
1655     origin = k = r & m; // move and rescan
1656     oldSum = checkSum = 0;
1657     continue;
1658     }
1659     checkSum += b; // queue looked empty: fold its base into the stability checksum
1660     }
1661     if ((k = (k + 1) & m) == origin) { // continue until stable
1662     if ((ss >= 0 || (ss == (ss = w.scanState))) && // when inactive, also require scanState unchanged since last read
1663     oldSum == (oldSum = checkSum)) { // same checksum as the previous full pass
1664     if (ss < 0) // already inactive
1665     break;
1666     int ns = ss | INACTIVE; // try to inactivate
1667     long nc = ((SP_MASK & ns) | // new ctl: this worker's ns as stack top,
1668     (UC_MASK & ((c = ctl) - AC_UNIT))); // with AC decremented
1669     w.stackPred = (int)c; // hold prev stack top
1670     U.putInt(w, QSCANSTATE, ns); // plain write of the inactive scanState before the ctl CAS
1671     if (U.compareAndSwapLong(this, CTL, c, nc))
1672     ss = ns;
1673     else
1674     w.scanState = ss; // back out
1675 dl 1.174 }
1676 dl 1.200 checkSum = 0;
1677 dl 1.115 }
1678     }
1679 dl 1.52 }
1680 dl 1.200 return null;
1681 dl 1.14 }
1682    
1683     /**
1684 dl 1.200 * Possibly blocks worker w waiting for a task to steal, or
1685     * returns false if the worker should terminate. If inactivating
1686     * w has caused the pool to become quiescent, checks for pool
1687 dl 1.178 * termination, and, so long as this is not the only worker, waits
1688 dl 1.200 * for up to a given duration. On timeout, if ctl has not
1689     * changed, terminates the worker, which will in turn wake up
1690 dl 1.178 * another worker to possibly repeat this process.
1691 dl 1.52 *
1692 dl 1.78 * @param w the calling worker
1693 dl 1.200 * @param r a random seed (for spins)
1694     * @return false if the worker should terminate
1695 dl 1.14 */
1696 dl 1.200 private boolean awaitWork(WorkQueue w, int r) {
1697     if (w == null || w.qlock < 0) // w is terminating
1698     return false;
1699     for (int pred = w.stackPred, spins = SPINS, ss;;) {
1700     if ((ss = w.scanState) >= 0) // reactivated by a signaller: go back to scanning
1701     break;
1702     else if (spins > 0) {
1703     r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
1704     if (r >= 0 && --spins == 0) { // randomize spins
1705     WorkQueue v; WorkQueue[] ws; int s, j;
1706     if (pred != 0 && (ws = workQueues) != null &&
1707     (j = pred & SMASK) < ws.length &&
1708     (v = ws[j]) != null && // see if pred parking
1709     (v.parker == null || v.scanState >= 0))
1710     spins = SPINS; // continue spinning
1711     else if ((s = w.nsteals) != 0) {
1712     w.nsteals = 0; // collect steals
1713     U.getAndAddLong(this, STEALCOUNT, s);
1714     }
1715     }
1716 dl 1.177 }
1717 dl 1.200 else if (w.qlock < 0) // recheck after spins
1718     return false;
1719     else if (!Thread.interrupted()) { // clears any pending interrupt; if one was pending, skip parking this round
1720     long c, prevctl, parkTime, deadline;
1721     if ((runState & STOP) != 0) // pool terminating
1722     return false;
1723     int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK); // active count with the parallelism offset added back
1724     if (ac <= 0 && tryTerminate(false, false))
1725     return false;
1726     if (ac <= 0 && ss == (int)c) { // is last waiter
1727     int t = (short)(c >>> TC_SHIFT); // use timed wait
1728     prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); // ctl to install on timeout: AC+1 and w popped from the idle stack
1729     parkTime = (t > 0 ? FAST_IDLE_TIMEOUT:
1730     (1 - t) * IDLE_TIMEOUT);
1731 dl 1.178 deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1732     }
1733     else
1734 dl 1.200 prevctl = parkTime = deadline = 0L; // parkTime 0 is passed to U.park for the untimed case
1735     Thread wt = Thread.currentThread();
1736     U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
1737     w.parker = wt; // publish the thread so signallers can unpark it
1738     if (w.scanState < 0 && ctl == c) // recheck before park
1739     U.park(false, parkTime);
1740     U.putOrderedObject(w, QPARKER, null); // withdraw the parker reference
1741     U.putObject(wt, PARKBLOCKER, null);
1742     if (w.scanState >= 0)
1743     break;
1744     if (parkTime != 0L && ctl == c && // timed wait, nothing changed in ctl,
1745     deadline - System.nanoTime() <= 0L && // and the deadline has passed
1746     U.compareAndSwapLong(this, CTL, c, prevctl))
1747     return false; // shrink pool
1748 dl 1.120 }
1749     }
1750 dl 1.200 return true;
1751 dl 1.178 }
1752    
1753 dl 1.200 // Joining tasks
1754    
1755 dl 1.178 /**
1756 dl 1.200 * Tries to steal and run tasks within the target's computation.
1757     * Uses a variant of the top-level algorithm, restricted to tasks
1758     * with the given task as ancestor: It prefers taking and running
1759     * eligible tasks popped from the worker's own queue (via
1760     * popCC). Otherwise it scans others, randomly moving on
1761     * contention or execution, deciding to give up based on a
1762     * checksum (via return codes from pollAndExecCC). The maxTasks
1763     * argument supports external usages; internal calls use zero,
1764     * allowing unbounded steps (external calls trap non-positive
1765     * values).
1766     *
1767     * @param w caller
1768     * @param maxTasks if non-zero, the maximum number of other tasks to run
1769     * @return task status on exit
1770     */
1771     final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1772     int maxTasks) {
1773     WorkQueue[] ws; int s = 0, m;
1774     if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
1775     task != null && w != null) {
1776     int mode = w.config; // for popCC
1777     int r = w.hint ^ w.top; // arbitrary seed for origin
1778     int origin = r & m; // first queue to scan
1779     int h = 1; // 1:ran, >1:contended, <0:hash
1780     for (int k = origin, oldSum = 0, checkSum = 0;;) {
1781     CountedCompleter<?> p; WorkQueue q;
1782     if ((s = task.status) < 0) // task already done
1783     break;
1784     if (h == 1 && (p = w.popCC(task, mode)) != null) { // local queue is tried only right after a successful run (or on entry)
1785     p.doExec(); // run local task
1786     if (maxTasks != 0 && --maxTasks == 0) // task budget exhausted (0 means unbounded)
1787     break;
1788     origin = k; // reset
1789     oldSum = checkSum = 0;
1790     }
1791     else { // poll other queues
1792     if ((q = ws[k]) == null)
1793     h = 0; // empty slot: neither ran, contended, nor hashed
1794     else if ((h = q.pollAndExecCC(task)) < 0)
1795     checkSum += h; // negative result is folded into the stability checksum
1796     if (h > 0) {
1797     if (h == 1 && maxTasks != 0 && --maxTasks == 0) // only an actual run (h == 1) consumes budget
1798     break;
1799     r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1800     origin = k = r & m; // move and restart
1801     oldSum = checkSum = 0;
1802     }
1803     else if ((k = (k + 1) & m) == origin) { // completed a full pass without running anything
1804     if (oldSum == (oldSum = checkSum)) // checksum unchanged from the previous pass: give up
1805     break;
1806     checkSum = 0;
1807     }
1808     }
1809 dl 1.178 }
1810     }
1811 dl 1.200 return s;
1812 dl 1.120 }
1813    
1814     /**
1815 dl 1.78 * Tries to locate and execute tasks for a stealer of the given
1816     * task, or in turn one of its stealers. Traces currentSteal ->
1817     * currentJoin links looking for a thread working on a descendant
1818     * of the given task and with a non-empty queue to steal back and
1819     * execute tasks from. The first call to this method upon a
1820     * waiting join will often entail scanning/search, (which is OK
1821     * because the joiner has nothing better to do), but this method
1822 dl 1.200 * leaves hints in workers to speed up subsequent calls.
1823 dl 1.78 *
1824 dl 1.200 * @param w caller
1825 dl 1.78 * @param task the task to join
1826     */
1827 dl 1.200 private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
1828     WorkQueue[] ws = workQueues;
1829     int oldSum = 0, checkSum, m;
1830     if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
1831     task != null) {
1832     do { // restart point
1833     checkSum = 0; // for stability check
1834     ForkJoinTask<?> subtask;
1835     WorkQueue j = w, v; // v is subtask stealer
1836     descent: for (subtask = task; subtask.status >= 0; ) {
1837     for (int h = j.hint | 1, k = 0, i; ; k += 2) { // probe odd slots, starting from j's hint
1838     if (k > m) // can't find stealer
1839     break descent;
1840     if ((v = ws[i = (h + k) & m]) != null) {
1841     if (v.currentSteal == subtask) { // v is running subtask
1842     j.hint = i; // remember where the stealer was found
1843 dl 1.95 break;
1844     }
1845 dl 1.200 checkSum += v.base;
1846 dl 1.78 }
1847     }
1848 dl 1.200 for (;;) { // help v or descend
1849 jsr166 1.195 ForkJoinTask<?>[] a; int b;
1850 dl 1.200 checkSum += (b = v.base);
1851     ForkJoinTask<?> next = v.currentJoin; // read before the staleness checks below
1852     if (subtask.status < 0 || j.currentJoin != subtask ||
1853     v.currentSteal != subtask) // stale
1854     break descent;
1855     if (b - v.top >= 0 || (a = v.array) == null) { // v has nothing to steal
1856     if ((subtask = next) == null) // and v is not itself joining anything
1857     break descent;
1858     j = v; // descend: now look for the stealer of the task v is joining
1859     break;
1860 dl 1.95 }
1861 dl 1.200 int i = (((a.length - 1) & b) << ASHIFT) + ABASE; // raw offset of the slot for index b
1862     ForkJoinTask<?> t = ((ForkJoinTask<?>)
1863     U.getObjectVolatile(a, i));
1864     if (v.base == b) { // base unchanged since read, so t corresponds to index b
1865     if (t == null) // stale
1866     break descent;
1867     if (U.compareAndSwapObject(a, i, t, null)) { // claim the task from v
1868     v.base = b + 1;
1869     ForkJoinTask<?> ps = w.currentSteal; // save, then restore after running t
1870     U.putOrderedObject(w, QCURRENTSTEAL, t);
1871     t.doExec();
1872     U.putOrderedObject(w, QCURRENTSTEAL, ps);
1873     if (!w.isEmpty())
1874     return; // can't further help
1875 dl 1.95 }
1876 dl 1.78 }
1877 dl 1.52 }
1878 dl 1.19 }
1879 dl 1.200 } while (task.status >= 0 && oldSum != (oldSum = checkSum)); // repeat while the task is unfinished and the checksum differs from the previous pass
1880 dl 1.14 }
1881 dl 1.22 }
1882    
1883 dl 1.52 /**
1884 dl 1.200 * Tries to decrement active count (sometimes implicitly) and
1885     * possibly release or create a compensating worker in preparation
1886     * for blocking. Returns false (retryable by caller), on
1887     * contention, detected staleness, instability or termination.
1888 dl 1.105 *
1889 dl 1.200 * @param w caller
1890 dl 1.19 */
1891 dl 1.200 private boolean tryCompensate(WorkQueue w) {
1892     boolean canBlock;
1893     WorkQueue[] ws; long c; int m, pc, sp;
1894     if (w == null || w.qlock < 0 || // caller terminating
1895     (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
1896     (pc = config & SMASK) == 0) // parallelism disabled
1897     canBlock = false;
1898     else if ((sp = (int)(c = ctl)) != 0) // release idle worker
1899     canBlock = tryRelease(c, ws[sp & m], 0L); // inc of 0: the released worker takes over this caller's active slot
1900     else {
1901     int ac = (int)(c >> AC_SHIFT) + pc; // active count with the parallelism offset added back
1902     int tc = (short)(c >> TC_SHIFT) + pc; // total count with the parallelism offset added back
1903     int nbusy = 0; // validate saturation
1904     for (int i = 0; i <= m; ++i) { // two passes of odd indices
1905     WorkQueue v;
1906     if ((v = ws[((i << 1) | 1) & m]) != null) {
1907     if ((v.scanState & SCANNING) != 0) // some worker is still looking for work: not saturated
1908 dl 1.190 break;
1909 dl 1.200 ++nbusy;
1910 dl 1.178 }
1911 dl 1.52 }
1912 dl 1.200 if (nbusy != (tc << 1) || ctl != c) // each worker is visited twice, so a saturated pool gives nbusy == 2 * tc
1913     canBlock = false; // unstable or stale
1914     else if (tc >= pc && ac > 1 && w.isEmpty()) {
1915     long nc = ((AC_MASK & (c - AC_UNIT)) |
1916     (~AC_MASK & c)); // uncompensated
1917     canBlock = U.compareAndSwapLong(this, CTL, c, nc);
1918 dl 1.105 }
1919 dl 1.200 else if (tc >= MAX_CAP || tc >= pc + MAX_SPARES)
1920     throw new RejectedExecutionException(
1921     "Thread limit exceeded replacing blocked worker");
1922     else { // similar to tryAddWorker
1923     boolean add = false; int rs; // CAS within lock
1924     long nc = ((AC_MASK & c) | // AC unchanged, TC+1; the low 32 bits are zero,
1925     (TC_MASK & (c + TC_UNIT))); // which matches sp == 0 in this branch
1926     if (((rs = lockRunState()) & STOP) == 0)
1927     add = U.compareAndSwapLong(this, CTL, c, nc);
1928     unlockRunState(rs, rs & ~RSLOCK);
1929     canBlock = add && createWorker(); // throws on exception
1930 dl 1.90 }
1931     }
1932 dl 1.200 return canBlock;
1933 dl 1.90 }
1934    
1935     /**
1936 dl 1.200 * Helps and/or blocks until the given task is done or timeout.
1937 dl 1.90 *
1938 dl 1.200 * @param w caller
1939 dl 1.90 * @param task the task
1940 dl 1.200 * @param deadline if nonzero, the deadline for timed waits
1941 dl 1.90 * @return task status on exit
1942     */
1943 dl 1.200 final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1944 dl 1.105 int s = 0;
1945 dl 1.200 if (task != null && w != null) {
1946     ForkJoinTask<?> prevJoin = w.currentJoin; // saved so nested joins restore the outer one on exit
1947     U.putOrderedObject(w, QCURRENTJOIN, task);
1948     CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
1949     (CountedCompleter<?>)task : null;
1950     for (;;) {
1951     if ((s = task.status) < 0) // done
1952     break;
1953     if (cc != null)
1954     helpComplete(w, cc, 0); // maxTasks 0: unbounded
1955     else if (w.base == w.top || w.tryRemoveAndExec(task)) // own queue is empty, or tryRemoveAndExec returned true
1956     helpStealer(w, task);
1957     if ((s = task.status) < 0) // recheck after helping
1958     break;
1959     long ms, ns;
1960     if (deadline == 0L)
1961     ms = 0L; // untimed wait
1962     else if ((ns = deadline - System.nanoTime()) <= 0L)
1963     break; // timed out
1964     else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1965     ms = 1L; // round a sub-millisecond remainder up, since 0 is what the untimed path passes
1966     if (tryCompensate(w)) {
1967     task.internalWait(ms);
1968     U.getAndAddLong(this, CTL, AC_UNIT); // re-increment active count on return from the blocked join
1969 dl 1.90 }
1970     }
1971 dl 1.200 U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
1972 dl 1.90 }
1973 dl 1.94 return s;
1974 dl 1.90 }
1975    
1976 dl 1.200 // Specialized scanning
1977 dl 1.90
1978     /**
1979     * Returns a (probably) non-empty steal queue, if one is found
1980 dl 1.131 * during a scan, else null. This method must be retried by
1981     * caller if, by the time it tries to use the queue, it is empty.
1982 dl 1.78 */
1983 dl 1.178 private WorkQueue findNonEmptyStealQueue() {
1984 dl 1.200 int r = ThreadLocalRandom.nextSecondarySeed(), oldSum = 0, checkSum; // r picks a random starting slot
1985     do {
1986     checkSum = 0;
1987     WorkQueue[] ws; WorkQueue q; int m, k, b;
1988     if ((ws = workQueues) != null && (m = ws.length - 1) > 0) {
1989     for (int i = 0; i <= m; ++i) { // one full pass over all slots
1990     if ((k = (i + r + m) & m) <= m && k >= 0 && // masking already bounds k to [0, m]; the range tests are extra guards
1991     (q = ws[k]) != null) {
1992     if ((b = q.base) - q.top < 0) // base behind top: queue appears non-empty
1993     return q;
1994     checkSum += b;
1995     }
1996 dl 1.52 }
1997     }
1998 dl 1.200 } while (oldSum != (oldSum = checkSum)); // rescan while the sum of bases differs from the previous pass (initially 0)
1999     return null;
2000 dl 1.22 }
2001    
2002     /**
2003 dl 1.78 * Runs tasks until {@code isQuiescent()}. We piggyback on
2004     * active count ctl maintenance, but rather than blocking
2005     * when tasks cannot be found, we rescan until all others cannot
2006     * find tasks either.
2007     */
2008     final void helpQuiescePool(WorkQueue w) {
2009     for (boolean active = true;;) { // active: whether this worker is currently included in ctl's active count
2010 dl 1.131 long c; WorkQueue q; ForkJoinTask<?> t; int b;
2011 dl 1.172 while ((t = w.nextLocalTask()) != null) // drain own queue first
2012 dl 1.131 t.doExec();
2013 dl 1.178 if ((q = findNonEmptyStealQueue()) != null) {
2014 dl 1.78 if (!active) { // re-establish active count
2015     active = true;
2016 dl 1.200 U.getAndAddLong(this, CTL, AC_UNIT);
2017     }
2018     if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { // queue may have emptied since it was found; skip if so
2019     ForkJoinTask<?> ps = w.currentSteal; // save, then restore after running t
2020     U.putOrderedObject(w, QCURRENTSTEAL, t);
2021     t.doExec();
2022     U.putOrderedObject(w, QCURRENTSTEAL, ps);
2023     ++w.nsteals;
2024 dl 1.178 }
2025 dl 1.78 }
2026 jsr166 1.194 else if (active) { // decrement active count without queuing
2027 dl 1.200 long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c); // AC-1, all other bits unchanged
2028     if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0) // decrement would leave no active workers: pool is quiescent
2029 dl 1.185 break; // bypass decrement-then-increment
2030 dl 1.131 if (U.compareAndSwapLong(this, CTL, c, nc))
2031 dl 1.78 active = false;
2032 dl 1.22 }
2033 dl 1.200 else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 && // already inactive and nobody else is active:
2034     U.compareAndSwapLong(this, CTL, c, c + AC_UNIT)) // restore our active count and finish
2035 dl 1.185 break;
2036 dl 1.22 }
2037     }
2038    
2039     /**
2040 jsr166 1.84 * Gets and removes a local or stolen task for the given worker.
2041 dl 1.78 *
2042     * @return a task, if available
2043 dl 1.22 */
2044 dl 1.78 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2045     for (ForkJoinTask<?> t;;) {
2046 dl 1.90 WorkQueue q; int b;
2047 dl 1.78 if ((t = w.nextLocalTask()) != null) // prefer the worker's own queue
2048     return t;
2049 dl 1.178 if ((q = findNonEmptyStealQueue()) == null) // nothing to steal anywhere
2050 dl 1.78 return null;
2051 dl 1.172 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) // queue may have emptied since it was found; loop and retry if so
2052 dl 1.78 return t;
2053 dl 1.52 }
2054 dl 1.14 }
2055    
2056     /**
2057 dl 1.105 * Returns a cheap heuristic guide for task partitioning when
2058     * programmers, frameworks, tools, or languages have little or no
2059     * idea about task granularity. In essence by offering this
2060     * method, we ask users only about tradeoffs in overhead vs
2061     * expected throughput and its variance, rather than how finely to
2062     * partition tasks.
2063     *
2064     * In a steady state strict (tree-structured) computation, each
2065     * thread makes available for stealing enough tasks for other
2066     * threads to remain active. Inductively, if all threads play by
2067     * the same rules, each thread should make available only a
2068     * constant number of tasks.
2069     *
2070     * The minimum useful constant is just 1. But using a value of 1
2071     * would require immediate replenishment upon each steal to
2072     * maintain enough tasks, which is infeasible. Further,
2073     * partitionings/granularities of offered tasks should minimize
2074     * steal rates, which in general means that threads nearer the top
2075     * of computation tree should generate more than those nearer the
2076     * bottom. In perfect steady state, each thread is at
2077     * approximately the same level of computation tree. However,
2078     * producing extra tasks amortizes the uncertainty of progress and
2079     * diffusion assumptions.
2080     *
2081 jsr166 1.161 * So, users will want to use values larger (but not much larger)
2082 dl 1.105 * than 1 to both smooth over transient shortages and hedge
2083     * against uneven progress; as traded off against the cost of
2084     * extra task overhead. We leave the user to pick a threshold
2085     * value to compare with the results of this call to guide
2086     * decisions, but recommend values such as 3.
2087     *
2088     * When all threads are active, it is on average OK to estimate
2089     * surplus strictly locally. In steady-state, if one thread is
2090     * maintaining say 2 surplus tasks, then so are others. So we can
2091     * just use estimated queue length. However, this strategy alone
2092     * leads to serious mis-estimates in some non-steady-state
2093     * conditions (ramp-up, ramp-down, other stalls). We can detect
2094     * many of these by further considering the number of "idle"
2095     * threads, that are known to have zero queued tasks, so
2096     * compensate by a factor of (#idle/#active) threads.
2097     */
2098     static int getSurplusQueuedTaskCount() {
2099     Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2100     if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
2101 dl 1.200 int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).
2102     config & SMASK;
2103 dl 1.112 int n = (q = wt.workQueue).top - q.base;
2104 dl 1.105 int a = (int)(pool.ctl >> AC_SHIFT) + p;
2105 dl 1.112 return n - (a > (p >>>= 1) ? 0 :
2106     a > (p >>>= 1) ? 1 :
2107     a > (p >>>= 1) ? 2 :
2108     a > (p >>>= 1) ? 4 :
2109     8);
2110 dl 1.105 }
2111     return 0;
2112 dl 1.100 }
2113    
2114 dl 1.86 // Termination
2115 dl 1.14
2116     /**
2117 dl 1.200 * Possibly initiates and/or completes termination. When
2118     * terminating (STOP phase), runs three passes through workQueues:
2119     * (0) Setting termination status (which also stops external
2120     * submitters by locking queues), (1) cancelling all tasks; (2)
2121     * interrupting lagging threads (likely in external tasks, but
2122     * possibly also blocked in joins). Each pass repeats previous
2123     * steps because of potential lagging thread creation.
2124 dl 1.14 *
2125     * @param now if true, unconditionally terminate, else only
2126 dl 1.78 * if no work and no active workers
2127 jsr166 1.87 * @param enable if true, enable shutdown when next possible
2128 dl 1.14 * @return true if now terminating or terminated
2129 jsr166 1.1 */
2130 dl 1.86 private boolean tryTerminate(boolean now, boolean enable) {
// Three stages follow: (a) set SHUTDOWN if allowed, (b) set STOP if
// allowed, (c) sweep the queues three times, then set TERMINATED once
// the total count derived from ctl and config is no longer positive.
2131 dl 1.200 int rs;
2132     if (this == common) // cannot shut down
2133 dl 1.105 return false;
// SHUTDOWN bit not yet set: only proceed when the caller passed
// enable == true, and set the bit via lockRunState/unlockRunState.
2134 dl 1.200 if (((rs = runState) & SHUTDOWN) == 0) { // enable
2135 dl 1.131 if (!enable)
2136     return false;
2137 dl 1.200 rs = lockRunState();
2138     unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
2139     }
// STOP bit not set in rs: unless `now` is true, refuse while the
// active count (ctl's AC field plus the parallelism in config) is > 0.
2140     if ((rs & STOP) == 0) { // enter STOP phase
2141     if (!now && (int)(ctl >> AC_SHIFT) + (config & SMASK) > 0)
2142     return false;
2143     rs = lockRunState();
2144     unlockRunState(rs, (rs & ~RSLOCK) | STOP);
2145     }
// Every pass re-reads workQueues and writes qlock = -1 on each non-null
// queue; passes 1 and 2 also call cancelAll(); pass 2 additionally
// interrupts (if not already interrupted) and unparks each queue owner.
2146     for (int pass = 0; pass < 3; ++pass) { // clobber other workers
2147     WorkQueue[] ws; int n;
2148     if ((ws = workQueues) != null && (n = ws.length) > 0) {
2149     WorkQueue w; Thread wt;
2150     for (int i = 0; i < n; ++i) {
2151     if ((w = ws[i]) != null) {
2152     w.qlock = -1;
2153     if (pass > 0) {
2154     w.cancelAll(); // clear queue
2155     if (pass > 1 && (wt = w.owner) != null) {
2156     if (!wt.isInterrupted()) {
2157     try {
2158     wt.interrupt();
// Anything thrown by interrupt() is deliberately dropped so the sweep
// continues with the remaining queues.
2159     } catch (Throwable ignore) {
2160     }
2161     }
2162     U.unpark(wt); // wake up
2163     }
2164     }
2165 dl 1.101 }
2166 dl 1.78 }
2167     }
2168 dl 1.200 }
// Total count (ctl's TC field, as a short, plus parallelism) <= 0:
// set TERMINATED and notify threads waiting on this pool's monitor.
2169     if ((short)(ctl >>> TC_SHIFT) + (config & SMASK) <= 0) {
2170     rs = lockRunState(); // done -- no more workers
2171     unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
2172     synchronized (this) { // release awaitTermination
2173     notifyAll();
2174     }
2175     }
// Reaching this point means the STOP phase was entered (now or earlier).
2176     return true;
2177     }
2178    
2179     // External operations
2180    
2181     /**
2182     * Full version of externalPush, handling uncommon cases, as well
2183     * as performing secondary initialization upon the first
2184     * submission of the first task to the pool. It also detects
2185     * first submission by an external thread and creates a new shared
2186     * queue if the one at index is empty or contended.
2187     *
2188     * @param task the task. Caller must ensure non-null.
2189     */
2190     private void externalSubmit(ForkJoinTask<?> task) {
// Slow-path submission. Loops until the task is pushed onto a queue
// (then returns) or the pool is found to be shut down (then throws
// RejectedExecutionException).
2191     int r; // initialize caller's probe
// A probe of 0 is treated as "not yet initialized": initialize and re-read.
2192     if ((r = ThreadLocalRandom.getProbe()) == 0) {
2193     ThreadLocalRandom.localInit();
2194     r = ThreadLocalRandom.getProbe();
2195     }
2196     for (;;) {
2197     WorkQueue[] ws; WorkQueue q; int rs, m, k;
// move == true at the end of an iteration advances the probe, so the
// next iteration indexes a different slot.
2198     boolean move = false;
2199     if (((rs = runState) & SHUTDOWN) != 0)
2200     throw new RejectedExecutionException();
// Case 1: STARTED not set, or the queue array is missing/empty.
2201     else if ((rs & STARTED) == 0 || // initialize workQueues array
2202     ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
// ns holds the bit to add on unlock: STARTED only if this thread
// actually created the array, otherwise 0.
2203     int ns = 0;
2204     rs = lockRunState();
2205     try {
// Re-check STARTED using the value returned by lockRunState().
2206     if ((rs & STARTED) == 0) { // find power of two table size
2207     int p = config & SMASK; // ensure at least 2 slots
2208     int n = (p > 1) ? p - 1 : 1;
// Smear the highest set bit downward, then (n + 1) << 1 yields a
// power-of-two length.
2209     n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
2210     n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
2211     workQueues = new WorkQueue[n];
2212     ns = STARTED;
2213 dl 1.78 }
2214 dl 1.200 } finally {
2215     unlockRunState(rs, (rs & ~RSLOCK) | ns);
2216 dl 1.52 }
2217     }
// Case 2: a queue already exists at the slot selected by the probe
// (index masked with the array mask and SQMASK).
2218 dl 1.200 else if ((q = ws[k = r & m & SQMASK]) != null) {
// Take the queue's lock by CAS of QLOCK from 0 to 1; the plain read of
// qlock first skips the CAS when the lock is visibly held.
2219     if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2220     ForkJoinTask<?>[] a = q.array;
2221     int s = q.top;
2222     boolean submitted = false; // initial submission or resizing
2223     try { // locked version of push
// Use the current array if it has room for one more element, else
// call growArray(); a null result leaves submitted == false.
2224     if ((a != null && a.length > s + 1 - q.base) ||
2225     (a = q.growArray()) != null) {
// j is the raw offset of slot (s mod a.length) for the Unsafe calls.
2226     int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
2227     U.putOrderedObject(a, j, task);
2228     U.putOrderedInt(q, QTOP, s + 1);
2229     submitted = true;
2230 dl 1.86 }
2231 dl 1.200 } finally {
// qlock is reset even if the push above threw.
2232     q.qlock = 0;
2233     }
2234     if (submitted) {
2235     signalWork(ws, q);
2236     return;
2237 dl 1.78 }
2238 dl 1.52 }
2239 dl 1.200 move = true; // move on failure
2240 dl 1.52 }
// Case 3: the slot is empty and RSLOCK is not set in a fresh read of
// runState: build a new queue and try to install it at index k.
2241 dl 1.200 else if (((rs = runState) & RSLOCK) == 0) { // create new queue
2242     q = new WorkQueue(this, null);
2243     q.hint = r;
2244     q.config = k | SHARED_QUEUE;
2245     rs = lockRunState(); // publish index
// Install only if the array still exists, k is in range and the slot
// is still null; otherwise the new queue is simply dropped. The task
// itself is pushed on a later iteration via case 2.
2246     if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
2247     ws[k] = q; // else terminated
2248     unlockRunState(rs, rs & ~RSLOCK);
2249     }
2250     else
2251     move = true; // move if busy
2252     if (move)
2253     r = ThreadLocalRandom.advanceProbe(r);
2254 dl 1.52 }
2255     }
2256    
2257 dl 1.200 /**
2258     * Tries to add the given task to a submission queue at
2259     * submitter's current queue. Only the (vastly) most common path
2260     * is directly handled in this method, while screening for need
2261     * for externalSubmit.
2262     *
2263     * @param task the task. Caller must ensure non-null.
2264     */
2265     final void externalPush(ForkJoinTask<?> task) {
// Fast-path submission: handles only the case where the probe-selected
// queue exists, its lock is free and its array has room. Every other
// case falls through to externalSubmit(task).
2266     WorkQueue[] ws; WorkQueue q; int m;
2267     int r = ThreadLocalRandom.getProbe();
// Requires: queue array present and non-empty, a queue at the selected
// slot, a non-zero probe, and a successful CAS of QLOCK from 0 to 1.
2268     if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
2269     (q = ws[m & r & SQMASK]) != null && r != 0 &&
2270     U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2271     ForkJoinTask<?>[] a; int am, n, s;
// n = number of elements currently between base and top; the push is
// done only while n is below a.length - 1.
2272     if ((a = q.array) != null &&
2273     (am = a.length - 1) > (n = (s = q.top) - q.base)) {
// j is the raw offset of slot (s mod a.length) for the Unsafe calls.
2274     int j = ((am & s) << ASHIFT) + ABASE;
2275     U.putOrderedObject(a, j, task);
2276     U.putOrderedInt(q, QTOP, s + 1);
// Reset QLOCK to 0 with an ordered write.
2277     U.putOrderedInt(q, QLOCK, 0);
// signalWork is called only when the queue was observed empty (n == 0)
// before this push.
2278     if (n == 0)
2279     signalWork(ws, q);
2280     return;
2281     }
// No array or not enough room: reset qlock and take the slow path.
2282     q.qlock = 0;
2283     }
2284     externalSubmit(task);
2285     }
2286 dl 1.105
2287     /**
2288 dl 1.200 * Returns common pool queue for an external thread.
2289 dl 1.105 */
2290     static WorkQueue commonSubmitterQueue() {
2291 dl 1.200 ForkJoinPool p = common;
2292     int r = ThreadLocalRandom.getProbe();
2293     WorkQueue[] ws; int m;
2294     return (p != null && (ws = p.workQueues) != null &&
2295 dl 1.105 (m = ws.length - 1) >= 0) ?
2296 dl 1.200 ws[m & r & SQMASK] : null;
2297 dl 1.105 }
2298    
2299     /**
2300 dl 1.200 * Performs tryUnpush for an external submitter: Finds queue,
2301     * locks if apparently non-empty, validates upon locking, and
2302     * adjusts top. Each check can fail but rarely does.
2303 dl 1.105 */
2304 dl 1.178 final boolean tryExternalUnpush(ForkJoinTask<?> task) {
// Returns true only if `task` was the element at top - 1 of the caller's
// probe-selected queue and was removed from it; false in every other case.
2305 dl 1.200 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?>[] a; int m, s;
2306     int r = ThreadLocalRandom.getProbe();
// Unlocked pre-checks: queue array present, a queue at the selected
// slot, that queue has an array, and top != base.
2307     if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
2308     (w = ws[m & r & SQMASK]) != null &&
2309     (a = w.array) != null && (s = w.top) != w.base) {
// j is the raw offset of slot ((s - 1) mod a.length) for the Unsafe calls.
2310 dl 1.115 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
2311 dl 1.200 if (U.compareAndSwapInt(w, QLOCK, 0, 1)) {
// With QLOCK held, re-validate that top and array are unchanged and
// that the slot still holds `task` before nulling it by CAS.
2312     if (w.top == s && w.array == a &&
2313     U.getObject(a, j) == task &&
2314 dl 1.178 U.compareAndSwapObject(a, j, task, null)) {
2315 dl 1.200 U.putOrderedInt(w, QTOP, s - 1);
2316     U.putOrderedInt(w, QLOCK, 0);
2317     return true;
2318 dl 1.115 }
// Validation failed: reset qlock and report failure.
2319 dl 1.200 w.qlock = 0;
2320 dl 1.105 }
2321     }
2322 dl 1.200 return false;
2323 dl 1.105 }
2324    
2325 dl 1.200 /**
2326     * Performs helpComplete for an external submitter.
2327     */
2328 dl 1.190 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
2329 dl 1.200 WorkQueue[] ws; int n;
2330     int r = ThreadLocalRandom.getProbe();
2331     return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 :
2332     helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks);
2333 dl 1.105 }
2334    
2335 dl 1.52 // Exported methods
2336 jsr166 1.1
2337     // Constructors
2338    
2339     /**
2340 jsr166 1.9 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2341 dl 1.18 * java.lang.Runtime#availableProcessors}, using the {@linkplain
2342     * #defaultForkJoinWorkerThreadFactory default thread factory},
2343     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2344 jsr166 1.1 *
2345     * @throws SecurityException if a security manager exists and
2346     * the caller is not permitted to modify threads
2347     * because it does not hold {@link
2348     * java.lang.RuntimePermission}{@code ("modifyThread")}
2349     */
public ForkJoinPool() {
    // Delegates to the four-argument constructor with defaults.
    this(
        Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), // parallelism, capped at MAX_CAP
        defaultForkJoinWorkerThreadFactory,                            // factory
        null,                                                          // no UncaughtExceptionHandler
        false);                                                        // asyncMode off
}
2354    
2355     /**
2356 jsr166 1.9 * Creates a {@code ForkJoinPool} with the indicated parallelism
2357 dl 1.18 * level, the {@linkplain
2358     * #defaultForkJoinWorkerThreadFactory default thread factory},
2359     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2360 jsr166 1.1 *
2361 jsr166 1.9 * @param parallelism the parallelism level
2362 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
2363 jsr166 1.11 * equal to zero, or greater than implementation limit
2364 jsr166 1.1 * @throws SecurityException if a security manager exists and
2365     * the caller is not permitted to modify threads
2366     * because it does not hold {@link
2367     * java.lang.RuntimePermission}{@code ("modifyThread")}
2368     */
public ForkJoinPool(int parallelism) {
    // Same defaults as the no-argument constructor, except that the
    // caller supplies the parallelism level.
    this(
        parallelism,
        defaultForkJoinWorkerThreadFactory, // factory
        null,                               // no UncaughtExceptionHandler
        false);                             // asyncMode off
}
2372    
2373     /**
2374 dl 1.18 * Creates a {@code ForkJoinPool} with the given parameters.
2375 jsr166 1.1 *
2376 dl 1.18 * @param parallelism the parallelism level. For default value,
2377     * use {@link java.lang.Runtime#availableProcessors}.
2378     * @param factory the factory for creating new threads. For default value,
2379     * use {@link #defaultForkJoinWorkerThreadFactory}.
2380 dl 1.19 * @param handler the handler for internal worker threads that
2381     * terminate due to unrecoverable errors encountered while executing
2382 jsr166 1.31 * tasks. For default value, use {@code null}.
2383 dl 1.19 * @param asyncMode if true,
2384 dl 1.18 * establishes local first-in-first-out scheduling mode for forked
2385     * tasks that are never joined. This mode may be more appropriate
2386     * than default locally stack-based mode in applications in which
2387     * worker threads only process event-style asynchronous tasks.
2388 jsr166 1.31 * For default value, use {@code false}.
2389 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
2390 jsr166 1.11 * equal to zero, or greater than implementation limit
2391     * @throws NullPointerException if the factory is null
2392 jsr166 1.1 * @throws SecurityException if a security manager exists and
2393     * the caller is not permitted to modify threads
2394     * because it does not hold {@link
2395     * java.lang.RuntimePermission}{@code ("modifyThread")}
2396     */
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    // Arguments are validated while the delegated call's argument list is
    // evaluated (parallelism first, then factory); asyncMode selects the
    // queue-mode constant, and a fresh pool id goes into the worker name
    // prefix.
    this(
        checkParallelism(parallelism),
        checkFactory(factory),
        handler,
        asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
        "ForkJoinPool-" + nextPoolId() + "-worker-");
    // The permission check runs after the delegated constructor returns.
    checkPermission();
}
2408    
2409     private static int checkParallelism(int parallelism) {
2410     if (parallelism <= 0 || parallelism > MAX_CAP)
2411     throw new IllegalArgumentException();
2412     return parallelism;
2413     }
2414    
2415     private static ForkJoinWorkerThreadFactory checkFactory
2416     (ForkJoinWorkerThreadFactory factory) {
2417 dl 1.14 if (factory == null)
2418     throw new NullPointerException();
2419 dl 1.152 return factory;
2420     }
2421    
2422     /**
2423     * Creates a {@code ForkJoinPool} with the given parameters, without
2424     * any security checks or parameter validation. Invoked directly by
2425     * makeCommonPool.
2426     */
2427     private ForkJoinPool(int parallelism,
2428     ForkJoinWorkerThreadFactory factory,
2429 jsr166 1.156 UncaughtExceptionHandler handler,
2430 dl 1.185 int mode,
2431 dl 1.152 String workerNamePrefix) {
2432     this.workerNamePrefix = workerNamePrefix;
2433 jsr166 1.1 this.factory = factory;
2434 dl 1.18 this.ueh = handler;
2435 dl 1.200 this.config = (parallelism & SMASK) | mode;
2436 dl 1.52 long np = (long)(-parallelism); // offset ctl counts
2437     this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2438 dl 1.101 }
2439    
2440     /**
2441 dl 1.128 * Returns the common pool instance. This pool is statically
2442 dl 1.134 * constructed; its run state is unaffected by attempts to {@link
2443     * #shutdown} or {@link #shutdownNow}. However this pool and any
2444     * ongoing processing are automatically terminated upon program
2445     * {@link System#exit}. Any program that relies on asynchronous
2446     * task processing to complete before program termination should
2447 jsr166 1.158 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2448     * before exit.
2449 dl 1.100 *
2450     * @return the common pool instance
2451 jsr166 1.138 * @since 1.8
2452 dl 1.100 */
2453     public static ForkJoinPool commonPool() {
2454 dl 1.134 // assert common != null : "static init error";
2455     return common;
2456 dl 1.100 }
2457    
2458 jsr166 1.1 // Execution methods
2459    
2460     /**
2461     * Performs the given task, returning its result upon completion.
2462 dl 1.52 * If the computation encounters an unchecked Exception or Error,
2463     * it is rethrown as the outcome of this invocation. Rethrown
2464     * exceptions behave in the same way as regular exceptions, but,
2465     * when possible, contain stack traces (as displayed for example
2466     * using {@code ex.printStackTrace()}) of both the current thread
2467     * as well as the thread actually encountering the exception;
2468     * minimally only the latter.
2469 jsr166 1.1 *
2470     * @param task the task
2471 jsr166 1.191 * @param <T> the type of the task's result
2472 jsr166 1.1 * @return the task's result
2473 jsr166 1.11 * @throws NullPointerException if the task is null
2474     * @throws RejectedExecutionException if the task cannot be
2475     * scheduled for execution
2476 jsr166 1.1 */
2477     public <T> T invoke(ForkJoinTask<T> task) {
2478 dl 1.90 if (task == null)
2479     throw new NullPointerException();
2480 dl 1.105 externalPush(task);
2481 dl 1.78 return task.join();
2482 jsr166 1.1 }
2483    
2484     /**
2485     * Arranges for (asynchronous) execution of the given task.
2486     *
2487     * @param task the task
2488 jsr166 1.11 * @throws NullPointerException if the task is null
2489     * @throws RejectedExecutionException if the task cannot be
2490     * scheduled for execution
2491 jsr166 1.1 */
2492 jsr166 1.8 public void execute(ForkJoinTask<?> task) {
2493 dl 1.90 if (task == null)
2494     throw new NullPointerException();
2495 dl 1.105 externalPush(task);
2496 jsr166 1.1 }
2497    
2498     // AbstractExecutorService methods
2499    
2500 jsr166 1.11 /**
2501     * @throws NullPointerException if the task is null
2502     * @throws RejectedExecutionException if the task cannot be
2503     * scheduled for execution
2504     */
2505 jsr166 1.1 public void execute(Runnable task) {
2506 dl 1.41 if (task == null)
2507     throw new NullPointerException();
2508 jsr166 1.2 ForkJoinTask<?> job;
2509 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2510     job = (ForkJoinTask<?>) task;
2511 jsr166 1.2 else
2512 dl 1.152 job = new ForkJoinTask.RunnableExecuteAction(task);
2513 dl 1.105 externalPush(job);
2514 jsr166 1.1 }
2515    
2516 jsr166 1.11 /**
2517 dl 1.18 * Submits a ForkJoinTask for execution.
2518     *
2519     * @param task the task to submit
2520 jsr166 1.191 * @param <T> the type of the task's result
2521 dl 1.18 * @return the task
2522     * @throws NullPointerException if the task is null
2523     * @throws RejectedExecutionException if the task cannot be
2524     * scheduled for execution
2525     */
2526     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2527 dl 1.90 if (task == null)
2528     throw new NullPointerException();
2529 dl 1.105 externalPush(task);
2530 dl 1.18 return task;
2531     }
2532    
2533     /**
2534 jsr166 1.11 * @throws NullPointerException if the task is null
2535     * @throws RejectedExecutionException if the task cannot be
2536     * scheduled for execution
2537     */
2538 jsr166 1.1 public <T> ForkJoinTask<T> submit(Callable<T> task) {
2539 dl 1.90 ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
2540 dl 1.105 externalPush(job);
2541 jsr166 1.1 return job;
2542     }
2543    
2544 jsr166 1.11 /**
2545     * @throws NullPointerException if the task is null
2546     * @throws RejectedExecutionException if the task cannot be
2547     * scheduled for execution
2548     */
2549 jsr166 1.1 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2550 dl 1.90 ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
2551 dl 1.105 externalPush(job);
2552 jsr166 1.1 return job;
2553     }
2554    
2555 jsr166 1.11 /**
2556     * @throws NullPointerException if the task is null
2557     * @throws RejectedExecutionException if the task cannot be
2558     * scheduled for execution
2559     */
2560 jsr166 1.1 public ForkJoinTask<?> submit(Runnable task) {
2561 dl 1.41 if (task == null)
2562     throw new NullPointerException();
2563 jsr166 1.2 ForkJoinTask<?> job;
2564 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2565     job = (ForkJoinTask<?>) task;
2566 jsr166 1.2 else
2567 dl 1.90 job = new ForkJoinTask.AdaptedRunnableAction(task);
2568 dl 1.105 externalPush(job);
2569 jsr166 1.1 return job;
2570     }
2571    
2572     /**
2573 jsr166 1.11 * @throws NullPointerException {@inheritDoc}
2574     * @throws RejectedExecutionException {@inheritDoc}
2575     */
2576 jsr166 1.1 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2577 dl 1.86 // In previous versions of this class, this method constructed
2578     // a task to run ForkJoinTask.invokeAll, but now external
2579     // invocation of multiple tasks is at least as efficient.
2580 jsr166 1.199 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2581 jsr166 1.1
2582 dl 1.86 boolean done = false;
2583     try {
2584     for (Callable<T> t : tasks) {
2585 dl 1.90 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2586 jsr166 1.144 futures.add(f);
2587 dl 1.105 externalPush(f);
2588 dl 1.86 }
2589 jsr166 1.143 for (int i = 0, size = futures.size(); i < size; i++)
2590     ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2591 dl 1.86 done = true;
2592     return futures;
2593     } finally {
2594     if (!done)
2595 jsr166 1.143 for (int i = 0, size = futures.size(); i < size; i++)
2596     futures.get(i).cancel(false);
2597 jsr166 1.1 }
2598     }
2599    
2600     /**
2601     * Returns the factory used for constructing new workers.
2602     *
2603     * @return the factory used for constructing new workers
2604     */
2605     public ForkJoinWorkerThreadFactory getFactory() {
2606     return factory;
2607     }
2608    
2609     /**
2610     * Returns the handler for internal worker threads that terminate
2611     * due to unrecoverable errors encountered while executing tasks.
2612     *
2613 jsr166 1.4 * @return the handler, or {@code null} if none
2614 jsr166 1.1 */
2615 jsr166 1.156 public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2616 dl 1.14 return ueh;
2617 jsr166 1.1 }
2618    
2619     /**
2620 jsr166 1.9 * Returns the targeted parallelism level of this pool.
2621 jsr166 1.1 *
2622 jsr166 1.9 * @return the targeted parallelism level of this pool
2623 jsr166 1.1 */
2624     public int getParallelism() {
2625 dl 1.185 int par;
2626 dl 1.200 return ((par = config & SMASK) > 0) ? par : 1;
2627 jsr166 1.1 }
2628    
2629     /**
2630 dl 1.100 * Returns the targeted parallelism level of the common pool.
2631     *
2632     * @return the targeted parallelism level of the common pool
2633 jsr166 1.138 * @since 1.8
2634 dl 1.100 */
2635     public static int getCommonPoolParallelism() {
2636 dl 1.134 return commonParallelism;
2637 dl 1.100 }
2638    
2639     /**
2640 jsr166 1.1 * Returns the number of worker threads that have started but not
2641 jsr166 1.34 * yet terminated. The result returned by this method may differ
2642 jsr166 1.4 * from {@link #getParallelism} when threads are created to
2643 jsr166 1.1 * maintain parallelism when others are cooperatively blocked.
2644     *
2645     * @return the number of worker threads
2646     */
2647     public int getPoolSize() {
2648 dl 1.200 return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2649 jsr166 1.1 }
2650    
2651     /**
2652 jsr166 1.4 * Returns {@code true} if this pool uses local first-in-first-out
2653 jsr166 1.1 * scheduling mode for forked tasks that are never joined.
2654     *
2655 jsr166 1.4 * @return {@code true} if this pool uses async mode
2656 jsr166 1.1 */
2657     public boolean getAsyncMode() {
2658 dl 1.200 return (config & FIFO_QUEUE) != 0;
2659 jsr166 1.1 }
2660    
2661     /**
2662     * Returns an estimate of the number of worker threads that are
2663     * not blocked waiting to join tasks or for other managed
2664 dl 1.14 * synchronization. This method may overestimate the
2665     * number of running threads.
2666 jsr166 1.1 *
2667     * @return the number of worker threads
2668     */
2669     public int getRunningThreadCount() {
2670 dl 1.78 int rc = 0;
2671     WorkQueue[] ws; WorkQueue w;
2672     if ((ws = workQueues) != null) {
2673 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2674     if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2675 dl 1.78 ++rc;
2676     }
2677     }
2678     return rc;
2679 jsr166 1.1 }
2680    
2681     /**
2682     * Returns an estimate of the number of threads that are currently
2683     * stealing or executing tasks. This method may overestimate the
2684     * number of active threads.
2685     *
2686     * @return the number of active threads
2687     */
2688     public int getActiveThreadCount() {
2689 dl 1.200 int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
2690 jsr166 1.63 return (r <= 0) ? 0 : r; // suppress momentarily negative values
2691 jsr166 1.1 }
2692    
2693     /**
2694 jsr166 1.4 * Returns {@code true} if all worker threads are currently idle.
2695     * An idle worker is one that cannot obtain a task to execute
2696     * because none are available to steal from other threads, and
2697     * there are no pending submissions to the pool. This method is
2698     * conservative; it might not return {@code true} immediately upon
2699     * idleness of all threads, but will eventually become true if
2700     * threads remain inactive.
2701 jsr166 1.1 *
2702 jsr166 1.4 * @return {@code true} if all threads are currently idle
2703 jsr166 1.1 */
2704     public boolean isQuiescent() {
2705 dl 1.200 return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
2706 jsr166 1.1 }
2707    
2708     /**
2709     * Returns an estimate of the total number of tasks stolen from
2710     * one thread's work queue by another. The reported value
2711     * underestimates the actual total number of steals when the pool
2712     * is not quiescent. This value may be useful for monitoring and
2713     * tuning fork/join programs: in general, steal counts should be
2714     * high enough to keep threads busy, but low enough to avoid
2715     * overhead and contention across threads.
2716     *
2717     * @return the number of steals
2718     */
2719     public long getStealCount() {
2720 dl 1.101 long count = stealCount;
2721 dl 1.78 WorkQueue[] ws; WorkQueue w;
2722     if ((ws = workQueues) != null) {
2723 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2724 dl 1.78 if ((w = ws[i]) != null)
2725 dl 1.105 count += w.nsteals;
2726 dl 1.78 }
2727     }
2728     return count;
2729 jsr166 1.1 }
2730    
2731     /**
2732     * Returns an estimate of the total number of tasks currently held
2733     * in queues by worker threads (but not including tasks submitted
2734     * to the pool that have not begun executing). This value is only
2735     * an approximation, obtained by iterating across all threads in
2736     * the pool. This method may be useful for tuning task
2737     * granularities.
2738     *
2739     * @return the number of queued tasks
2740     */
2741     public long getQueuedTaskCount() {
2742     long count = 0;
2743 dl 1.78 WorkQueue[] ws; WorkQueue w;
2744     if ((ws = workQueues) != null) {
2745 dl 1.86 for (int i = 1; i < ws.length; i += 2) {
2746 dl 1.78 if ((w = ws[i]) != null)
2747     count += w.queueSize();
2748     }
2749 dl 1.52 }
2750 jsr166 1.1 return count;
2751     }
2752    
2753     /**
2754 jsr166 1.8 * Returns an estimate of the number of tasks submitted to this
2755 dl 1.55 * pool that have not yet begun executing. This method may take
2756 dl 1.52 * time proportional to the number of submissions.
2757 jsr166 1.1 *
2758     * @return the number of queued submissions
2759     */
2760     public int getQueuedSubmissionCount() {
2761 dl 1.78 int count = 0;
2762     WorkQueue[] ws; WorkQueue w;
2763     if ((ws = workQueues) != null) {
2764 dl 1.86 for (int i = 0; i < ws.length; i += 2) {
2765 dl 1.78 if ((w = ws[i]) != null)
2766     count += w.queueSize();
2767     }
2768     }
2769     return count;
2770 jsr166 1.1 }
2771    
2772     /**
2773 jsr166 1.4 * Returns {@code true} if there are any tasks submitted to this
2774     * pool that have not yet begun executing.
2775 jsr166 1.1 *
2776     * @return {@code true} if there are any queued submissions
2777     */
2778     public boolean hasQueuedSubmissions() {
2779 dl 1.78 WorkQueue[] ws; WorkQueue w;
2780     if ((ws = workQueues) != null) {
2781 dl 1.86 for (int i = 0; i < ws.length; i += 2) {
2782 dl 1.115 if ((w = ws[i]) != null && !w.isEmpty())
2783 dl 1.78 return true;
2784     }
2785     }
2786     return false;
2787 jsr166 1.1 }
2788    
2789     /**
2790     * Removes and returns the next unexecuted submission if one is
2791     * available. This method may be useful in extensions to this
2792     * class that re-assign work in systems with multiple pools.
2793     *
2794 jsr166 1.4 * @return the next submission, or {@code null} if none
2795 jsr166 1.1 */
2796     protected ForkJoinTask<?> pollSubmission() {
2797 dl 1.78 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2798     if ((ws = workQueues) != null) {
2799 dl 1.86 for (int i = 0; i < ws.length; i += 2) {
2800 dl 1.78 if ((w = ws[i]) != null && (t = w.poll()) != null)
2801     return t;
2802 dl 1.52 }
2803     }
2804     return null;
2805 jsr166 1.1 }
2806    
2807     /**
2808     * Removes all available unexecuted submitted and forked tasks
2809     * from scheduling queues and adds them to the given collection,
2810     * without altering their execution status. These may include
2811 jsr166 1.8 * artificially generated or wrapped tasks. This method is
2812     * designed to be invoked only when the pool is known to be
2813 jsr166 1.1 * quiescent. Invocations at other times may not remove all
2814     * tasks. A failure encountered while attempting to add elements
2815     * to collection {@code c} may result in elements being in
2816     * neither, either or both collections when the associated
2817     * exception is thrown. The behavior of this operation is
2818     * undefined if the specified collection is modified while the
2819     * operation is in progress.
2820     *
2821     * @param c the collection to transfer elements into
2822     * @return the number of elements transferred
2823     */
2824 jsr166 1.5 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2825 dl 1.52 int count = 0;
2826 dl 1.78 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2827     if ((ws = workQueues) != null) {
2828 dl 1.86 for (int i = 0; i < ws.length; ++i) {
2829 dl 1.78 if ((w = ws[i]) != null) {
2830     while ((t = w.poll()) != null) {
2831     c.add(t);
2832     ++count;
2833     }
2834     }
2835 dl 1.52 }
2836     }
2837 dl 1.18 return count;
2838     }
2839    
2840     /**
2841 jsr166 1.1 * Returns a string identifying this pool, as well as its state,
2842     * including indications of run state, parallelism level, and
2843     * worker and task counts.
2844     *
2845     * @return a string identifying this pool, as well as its state
2846     */
2847     public String toString() {
2848 dl 1.86 // Use a single pass through workQueues to collect counts
2849     long qt = 0L, qs = 0L; int rc = 0;
2850 dl 1.101 long st = stealCount;
2851 dl 1.86 long c = ctl;
2852     WorkQueue[] ws; WorkQueue w;
2853     if ((ws = workQueues) != null) {
2854     for (int i = 0; i < ws.length; ++i) {
2855     if ((w = ws[i]) != null) {
2856     int size = w.queueSize();
2857     if ((i & 1) == 0)
2858     qs += size;
2859     else {
2860     qt += size;
2861 dl 1.105 st += w.nsteals;
2862 dl 1.86 if (w.isApparentlyUnblocked())
2863     ++rc;
2864     }
2865     }
2866     }
2867     }
2868 dl 1.200 int pc = (config & SMASK);
2869 dl 1.52 int tc = pc + (short)(c >>> TC_SHIFT);
2870 dl 1.78 int ac = pc + (int)(c >> AC_SHIFT);
2871     if (ac < 0) // ignore transient negative
2872     ac = 0;
2873 dl 1.200 int rs = runState;
2874     String level = ((rs & TERMINATED) != 0 ? "Terminated" :
2875     (rs & STOP) != 0 ? "Terminating" :
2876     (rs & SHUTDOWN) != 0 ? "Shutting down" :
2877     "Running");
2878 jsr166 1.1 return super.toString() +
2879 dl 1.52 "[" + level +
2880 dl 1.14 ", parallelism = " + pc +
2881     ", size = " + tc +
2882     ", active = " + ac +
2883     ", running = " + rc +
2884 jsr166 1.1 ", steals = " + st +
2885     ", tasks = " + qt +
2886     ", submissions = " + qs +
2887     "]";
2888     }
2889    
2890     /**
2891 dl 1.100 * Possibly initiates an orderly shutdown in which previously
2892     * submitted tasks are executed, but no new tasks will be
2893     * accepted. Invocation has no effect on execution state if this
2894 jsr166 1.137 * is the {@link #commonPool()}, and no additional effect if
2895 dl 1.100 * already shut down. Tasks that are in the process of being
2896     * submitted concurrently during the course of this method may or
2897     * may not be rejected.
2898 jsr166 1.1 *
2899     * @throws SecurityException if a security manager exists and
2900     * the caller is not permitted to modify threads
2901     * because it does not hold {@link
2902     * java.lang.RuntimePermission}{@code ("modifyThread")}
2903     */
2904     public void shutdown() {
2905     checkPermission();
2906 dl 1.105 tryTerminate(false, true);
2907 jsr166 1.1 }
2908    
2909     /**
2910 dl 1.100 * Possibly attempts to cancel and/or stop all tasks, and reject
2911     * all subsequently submitted tasks. Invocation has no effect on
2912 jsr166 1.137 * execution state if this is the {@link #commonPool()}, and no
2913 dl 1.100 * additional effect if already shut down. Otherwise, tasks that
2914     * are in the process of being submitted or executed concurrently
2915     * during the course of this method may or may not be
2916     * rejected. This method cancels both existing and unexecuted
2917     * tasks, in order to permit termination in the presence of task
2918     * dependencies. So the method always returns an empty list
2919     * (unlike the case for some other Executors).
2920 jsr166 1.1 *
2921     * @return an empty list
2922     * @throws SecurityException if a security manager exists and
2923     * the caller is not permitted to modify threads
2924     * because it does not hold {@link
2925     * java.lang.RuntimePermission}{@code ("modifyThread")}
2926     */
2927     public List<Runnable> shutdownNow() {
2928     checkPermission();
2929 dl 1.105 tryTerminate(true, true);
2930 jsr166 1.1 return Collections.emptyList();
2931     }
2932    
2933     /**
2934     * Returns {@code true} if all tasks have completed following shut down.
2935     *
2936     * @return {@code true} if all tasks have completed following shut down
2937     */
2938     public boolean isTerminated() {
2939 dl 1.200 return (runState & TERMINATED) != 0;
2940 jsr166 1.1 }
2941    
2942     /**
2943     * Returns {@code true} if the process of termination has
2944 jsr166 1.9 * commenced but not yet completed. This method may be useful for
2945     * debugging. A return of {@code true} reported a sufficient
2946     * period after shutdown may indicate that submitted tasks have
2947 jsr166 1.119 * ignored or suppressed interruption, or are waiting for I/O,
2948 dl 1.49 * causing this executor not to properly terminate. (See the
2949     * advisory notes for class {@link ForkJoinTask} stating that
2950     * tasks should not normally entail blocking operations. But if
2951     * they do, they must abort them on interrupt.)
2952 jsr166 1.1 *
2953 jsr166 1.9 * @return {@code true} if terminating but not yet terminated
2954 jsr166 1.1 */
2955     public boolean isTerminating() {
2956 dl 1.200 int rs = runState;
2957     return (rs & STOP) != 0 && (rs & TERMINATED) == 0;
2958 jsr166 1.1 }
2959    
2960     /**
2961     * Returns {@code true} if this pool has been shut down.
2962     *
2963     * @return {@code true} if this pool has been shut down
2964     */
2965     public boolean isShutdown() {
2966 dl 1.200 return (runState & SHUTDOWN) != 0;
2967 jsr166 1.9 }
2968    
2969     /**
2970 dl 1.105 * Blocks until all tasks have completed execution after a
2971     * shutdown request, or the timeout occurs, or the current thread
2972 dl 1.134 * is interrupted, whichever happens first. Because the {@link
2973     * #commonPool()} never terminates until program shutdown, when
2974     * applied to the common pool, this method is equivalent to {@link
2975 jsr166 1.158 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
2976 jsr166 1.1 *
2977     * @param timeout the maximum time to wait
2978     * @param unit the time unit of the timeout argument
2979     * @return {@code true} if this executor terminated and
2980     * {@code false} if the timeout elapsed before termination
2981     * @throws InterruptedException if interrupted while waiting
2982     */
2983     public boolean awaitTermination(long timeout, TimeUnit unit)
2984     throws InterruptedException {
2985 dl 1.134 if (Thread.interrupted())
2986     throw new InterruptedException();
2987     if (this == common) {
2988     awaitQuiescence(timeout, unit);
2989     return false;
2990     }
2991 dl 1.52 long nanos = unit.toNanos(timeout);
2992 dl 1.101 if (isTerminated())
2993     return true;
2994 dl 1.183 if (nanos <= 0L)
2995     return false;
2996     long deadline = System.nanoTime() + nanos;
2997 jsr166 1.103 synchronized (this) {
2998 jsr166 1.184 for (;;) {
2999 dl 1.183 if (isTerminated())
3000     return true;
3001     if (nanos <= 0L)
3002     return false;
3003     long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
3004     wait(millis > 0L ? millis : 1L);
3005     nanos = deadline - System.nanoTime();
3006 dl 1.52 }
3007 dl 1.18 }
3008 jsr166 1.1 }
3009    
3010     /**
3011 dl 1.134 * If called by a ForkJoinTask operating in this pool, equivalent
3012     * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3013     * waits and/or attempts to assist performing tasks until this
3014     * pool {@link #isQuiescent} or the indicated timeout elapses.
3015     *
3016     * @param timeout the maximum time to wait
3017     * @param unit the time unit of the timeout argument
3018     * @return {@code true} if quiescent; {@code false} if the
3019     * timeout elapsed.
3020     */
3021     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3022     long nanos = unit.toNanos(timeout);
3023     ForkJoinWorkerThread wt;
3024     Thread thread = Thread.currentThread();
3025     if ((thread instanceof ForkJoinWorkerThread) &&
3026     (wt = (ForkJoinWorkerThread)thread).pool == this) {
3027     helpQuiescePool(wt.workQueue);
3028     return true;
3029     }
3030     long startTime = System.nanoTime();
3031     WorkQueue[] ws;
3032     int r = 0, m;
3033     boolean found = true;
3034     while (!isQuiescent() && (ws = workQueues) != null &&
3035     (m = ws.length - 1) >= 0) {
3036     if (!found) {
3037     if ((System.nanoTime() - startTime) > nanos)
3038     return false;
3039     Thread.yield(); // cannot block
3040     }
3041     found = false;
3042     for (int j = (m + 1) << 2; j >= 0; --j) {
3043 dl 1.200 ForkJoinTask<?> t; WorkQueue q; int b, k;
3044     if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
3045     (b = q.base) - q.top < 0) {
3046 dl 1.134 found = true;
3047 dl 1.172 if ((t = q.pollAt(b)) != null)
3048 dl 1.134 t.doExec();
3049     break;
3050     }
3051     }
3052     }
3053     return true;
3054     }
3055    
3056     /**
3057     * Waits and/or attempts to assist performing tasks indefinitely
3058 jsr166 1.141 * until the {@link #commonPool()} {@link #isQuiescent}.
3059 dl 1.134 */
3060 dl 1.136 static void quiesceCommonPool() {
3061 dl 1.134 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3062     }
3063    
3064     /**
3065 jsr166 1.1 * Interface for extending managed parallelism for tasks running
3066 jsr166 1.8 * in {@link ForkJoinPool}s.
3067     *
3068 dl 1.19 * <p>A {@code ManagedBlocker} provides two methods. Method
3069     * {@code isReleasable} must return {@code true} if blocking is
3070     * not necessary. Method {@code block} blocks the current thread
3071     * if necessary (perhaps internally invoking {@code isReleasable}
3072 dl 1.54 * before actually blocking). These actions are performed by any
3073 jsr166 1.157 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
3074     * The unusual methods in this API accommodate synchronizers that
3075     * may, but don't usually, block for long periods. Similarly, they
3076 dl 1.54 * allow more efficient internal handling of cases in which
3077     * additional workers may be, but usually are not, needed to
3078     * ensure sufficient parallelism. Toward this end,
3079     * implementations of method {@code isReleasable} must be amenable
3080     * to repeated invocation.
3081 jsr166 1.1 *
3082     * <p>For example, here is a ManagedBlocker based on a
3083     * ReentrantLock:
3084     * <pre> {@code
3085     * class ManagedLocker implements ManagedBlocker {
3086     * final ReentrantLock lock;
3087     * boolean hasLock = false;
3088     * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3089     * public boolean block() {
3090     * if (!hasLock)
3091     * lock.lock();
3092     * return true;
3093     * }
3094     * public boolean isReleasable() {
3095     * return hasLock || (hasLock = lock.tryLock());
3096     * }
3097     * }}</pre>
3098 dl 1.19 *
3099     * <p>Here is a class that possibly blocks waiting for an
3100     * item on a given queue:
3101     * <pre> {@code
3102     * class QueueTaker<E> implements ManagedBlocker {
3103     * final BlockingQueue<E> queue;
3104     * volatile E item = null;
3105     * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3106     * public boolean block() throws InterruptedException {
3107     * if (item == null)
3108 dl 1.23 * item = queue.take();
3109 dl 1.19 * return true;
3110     * }
3111     * public boolean isReleasable() {
3112 dl 1.23 * return item != null || (item = queue.poll()) != null;
3113 dl 1.19 * }
3114     * public E getItem() { // call after pool.managedBlock completes
3115     * return item;
3116     * }
3117     * }}</pre>
3118 jsr166 1.1 */
3119     public static interface ManagedBlocker {
3120     /**
3121     * Possibly blocks the current thread, for example waiting for
3122     * a lock or condition.
3123     *
3124 jsr166 1.4 * @return {@code true} if no additional blocking is necessary
3125     * (i.e., if isReleasable would return true)
3126 jsr166 1.1 * @throws InterruptedException if interrupted while waiting
3127     * (the method is not required to do so, but is allowed to)
3128     */
3129     boolean block() throws InterruptedException;
3130    
3131     /**
3132 jsr166 1.4 * Returns {@code true} if blocking is unnecessary.
3133 jsr166 1.154 * @return {@code true} if blocking is unnecessary
3134 jsr166 1.1 */
3135     boolean isReleasable();
3136     }
3137    
3138     /**
3139     * Blocks in accord with the given blocker. If the current thread
3140 jsr166 1.8 * is a {@link ForkJoinWorkerThread}, this method possibly
3141     * arranges for a spare thread to be activated if necessary to
3142 dl 1.18 * ensure sufficient parallelism while the current thread is blocked.
3143 jsr166 1.1 *
3144 jsr166 1.8 * <p>If the caller is not a {@link ForkJoinTask}, this method is
3145     * behaviorally equivalent to
3146 jsr166 1.82 * <pre> {@code
3147 jsr166 1.1 * while (!blocker.isReleasable())
3148     * if (blocker.block())
3149 jsr166 1.196 * return;}</pre>
3150 jsr166 1.8 *
3151     * If the caller is a {@code ForkJoinTask}, then the pool may
3152     * first be expanded to ensure parallelism, and later adjusted.
3153 jsr166 1.1 *
3154     * @param blocker the blocker
3155     * @throws InterruptedException if blocker.block did so
3156     */
3157 dl 1.18 public static void managedBlock(ManagedBlocker blocker)
3158 jsr166 1.1 throws InterruptedException {
3159 dl 1.200 ForkJoinPool p;
3160     ForkJoinWorkerThread wt;
3161 jsr166 1.1 Thread t = Thread.currentThread();
3162 dl 1.200 if ((t instanceof ForkJoinWorkerThread) &&
3163     (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
3164     WorkQueue w = wt.workQueue;
3165 dl 1.172 while (!blocker.isReleasable()) {
3166 dl 1.200 if (p.tryCompensate(w)) {
3167 dl 1.105 try {
3168     do {} while (!blocker.isReleasable() &&
3169     !blocker.block());
3170     } finally {
3171 dl 1.200 U.getAndAddLong(p, CTL, AC_UNIT);
3172 dl 1.105 }
3173     break;
3174 dl 1.78 }
3175     }
3176 dl 1.18 }
3177 dl 1.105 else {
3178     do {} while (!blocker.isReleasable() &&
3179     !blocker.block());
3180     }
3181 jsr166 1.1 }
3182    
3183 jsr166 1.7 // AbstractExecutorService overrides. These rely on undocumented
3184     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3185     // implement RunnableFuture.
3186 jsr166 1.1
3187     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3188 dl 1.90 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3189 jsr166 1.1 }
3190    
3191     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3192 dl 1.90 return new ForkJoinTask.AdaptedCallable<T>(callable);
3193 jsr166 1.1 }
3194    
// Unsafe mechanics: the Unsafe handle, array addressing constants for
// ForkJoinTask[] and the raw offsets of the fields named below.
private static final sun.misc.Unsafe U;
private static final int  ABASE;
private static final int  ASHIFT;
private static final long CTL;
private static final long PARKBLOCKER;
private static final long STEALCOUNT;
private static final long RUNSTATE;
private static final long QBASE;
private static final long QTOP;
private static final long QLOCK;
private static final long QSCANSTATE;
private static final long QPARKER;
private static final long QCURRENTSTEAL;
private static final long QCURRENTJOIN;

static {
    // initialize field offsets for CAS etc; any reflective failure
    // is rethrown as an Error
    try {
        U = sun.misc.Unsafe.getUnsafe();

        // ForkJoinPool fields
        Class<?> poolClass = ForkJoinPool.class;
        CTL = U.objectFieldOffset(poolClass.getDeclaredField("ctl"));
        STEALCOUNT = U.objectFieldOffset(poolClass.getDeclaredField("stealCount"));
        RUNSTATE = U.objectFieldOffset(poolClass.getDeclaredField("runState"));

        // Thread field
        Class<?> threadClass = Thread.class;
        PARKBLOCKER = U.objectFieldOffset(threadClass.getDeclaredField("parkBlocker"));

        // WorkQueue fields
        Class<?> queueClass = WorkQueue.class;
        QBASE = U.objectFieldOffset(queueClass.getDeclaredField("base"));
        QTOP = U.objectFieldOffset(queueClass.getDeclaredField("top"));
        QLOCK = U.objectFieldOffset(queueClass.getDeclaredField("qlock"));
        QSCANSTATE = U.objectFieldOffset(queueClass.getDeclaredField("scanState"));
        QPARKER = U.objectFieldOffset(queueClass.getDeclaredField("parker"));
        QCURRENTSTEAL = U.objectFieldOffset(queueClass.getDeclaredField("currentSteal"));
        QCURRENTJOIN = U.objectFieldOffset(queueClass.getDeclaredField("currentJoin"));

        // ForkJoinTask[] element addressing: base offset plus
        // log2(element scale), which requires a power-of-two scale
        Class<?> arrayClass = ForkJoinTask[].class;
        ABASE = U.arrayBaseOffset(arrayClass);
        int scale = U.arrayIndexScale(arrayClass);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }

    defaultForkJoinWorkerThreadFactory =
        new DefaultForkJoinWorkerThreadFactory();
    modifyThreadPermission = new RuntimePermission("modifyThread");

    // The common pool is built inside a privileged action.
    java.security.PrivilegedAction<ForkJoinPool> buildCommon =
        new java.security.PrivilegedAction<ForkJoinPool>() {
            public ForkJoinPool run() { return makeCommonPool(); }};
    common = java.security.AccessController.doPrivileged(buildCommon);

    int par = common.config & SMASK; // report 1 even if threads disabled
    commonParallelism = Math.max(par, 1);
}
3260 dl 1.112
3261 dl 1.152 /**
3262     * Creates and returns the common pool, respecting user settings
3263     * specified via system properties.
3264     */
3265     private static ForkJoinPool makeCommonPool() {
3266 dl 1.160 int parallelism = -1;
3267 dl 1.197 ForkJoinWorkerThreadFactory factory = null;
3268 jsr166 1.156 UncaughtExceptionHandler handler = null;
3269 jsr166 1.189 try { // ignore exceptions in accessing/parsing properties
3270 dl 1.112 String pp = System.getProperty
3271     ("java.util.concurrent.ForkJoinPool.common.parallelism");
3272 dl 1.152 String fp = System.getProperty
3273     ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3274 dl 1.112 String hp = System.getProperty
3275     ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3276 dl 1.152 if (pp != null)
3277     parallelism = Integer.parseInt(pp);
3278 dl 1.112 if (fp != null)
3279 dl 1.152 factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
3280     getSystemClassLoader().loadClass(fp).newInstance());
3281 dl 1.112 if (hp != null)
3282 jsr166 1.156 handler = ((UncaughtExceptionHandler)ClassLoader.
3283 dl 1.112 getSystemClassLoader().loadClass(hp).newInstance());
3284     } catch (Exception ignore) {
3285     }
3286 dl 1.197 if (factory == null) {
3287     if (System.getSecurityManager() == null)
3288     factory = defaultForkJoinWorkerThreadFactory;
3289     else // use security-managed default
3290     factory = new InnocuousForkJoinWorkerThreadFactory();
3291     }
3292 dl 1.167 if (parallelism < 0 && // default 1 less than #cores
3293 dl 1.193 (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
3294     parallelism = 1;
3295 dl 1.152 if (parallelism > MAX_CAP)
3296     parallelism = MAX_CAP;
3297 dl 1.185 return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
3298 dl 1.152 "ForkJoinPool.commonPool-worker-");
3299 jsr166 1.3 }
3300 dl 1.52
3301 dl 1.197 /**
3302     * Factory for innocuous worker threads
3303     */
3304     static final class InnocuousForkJoinWorkerThreadFactory
3305     implements ForkJoinWorkerThreadFactory {
3306    
3307     /**
3308     * An ACC to restrict permissions for the factory itself.
3309     * The constructed workers have no permissions set.
3310     */
3311     private static final AccessControlContext innocuousAcc;
3312     static {
3313     Permissions innocuousPerms = new Permissions();
3314     innocuousPerms.add(modifyThreadPermission);
3315     innocuousPerms.add(new RuntimePermission(
3316     "enableContextClassLoaderOverride"));
3317     innocuousPerms.add(new RuntimePermission(
3318     "modifyThreadGroup"));
3319     innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
3320     new ProtectionDomain(null, innocuousPerms)
3321     });
3322     }
3323    
3324     public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
3325     return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)
3326     java.security.AccessController.doPrivileged(
3327     new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
3328     public ForkJoinWorkerThread run() {
3329     return new ForkJoinWorkerThread.
3330     InnocuousForkJoinWorkerThread(pool);
3331     }}, innocuousAcc);
3332     }
3333     }
3334    
3335 jsr166 1.1 }