ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.33
Committed: Tue Sep 7 06:42:39 2010 UTC (13 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.32: +3 -3 lines
Log Message:
whitespace

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package java.util.concurrent;
8    
9     import java.util.ArrayList;
10     import java.util.Arrays;
11     import java.util.Collection;
12     import java.util.Collections;
13     import java.util.List;
14     import java.util.concurrent.locks.LockSupport;
15     import java.util.concurrent.locks.ReentrantLock;
16     import java.util.concurrent.atomic.AtomicInteger;
17 dl 1.14 import java.util.concurrent.CountDownLatch;
18 jsr166 1.1
19     /**
20 jsr166 1.4 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
21 jsr166 1.8 * A {@code ForkJoinPool} provides the entry point for submissions
22 dl 1.18 * from non-{@code ForkJoinTask} clients, as well as management and
23 jsr166 1.11 * monitoring operations.
24 jsr166 1.1 *
25 jsr166 1.9 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
26     * ExecutorService} mainly by virtue of employing
27     * <em>work-stealing</em>: all threads in the pool attempt to find and
28     * execute subtasks created by other active tasks (eventually blocking
29     * waiting for work if none exist). This enables efficient processing
30     * when most tasks spawn other subtasks (as do most {@code
31 dl 1.18 * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
32     * constructors, {@code ForkJoinPool}s may also be appropriate for use
33     * with event-style tasks that are never joined.
34 jsr166 1.1 *
35 jsr166 1.9 * <p>A {@code ForkJoinPool} is constructed with a given target
36     * parallelism level; by default, equal to the number of available
37 dl 1.18 * processors. The pool attempts to maintain enough active (or
38     * available) threads by dynamically adding, suspending, or resuming
39     * internal worker threads, even if some tasks are stalled waiting to
40     * join others. However, no such adjustments are guaranteed in the
41     * face of blocked IO or other unmanaged synchronization. The nested
42     * {@link ManagedBlocker} interface enables extension of the kinds of
43     * synchronization accommodated.
44 jsr166 1.1 *
45     * <p>In addition to execution and lifecycle control methods, this
46     * class provides status check methods (for example
47 jsr166 1.4 * {@link #getStealCount}) that are intended to aid in developing,
48 jsr166 1.1 * tuning, and monitoring fork/join applications. Also, method
49 jsr166 1.4 * {@link #toString} returns indications of pool state in a
50 jsr166 1.1 * convenient form for informal monitoring.
51     *
52 dl 1.18 * <p> As is the case with other ExecutorServices, there are three
53 dl 1.19 * main task execution methods summarized in the following
54 dl 1.18 * table. These are designed to be used by clients not already engaged
55     * in fork/join computations in the current pool. The main forms of
56     * these methods accept instances of {@code ForkJoinTask}, but
57     * overloaded forms also allow mixed execution of plain {@code
58     * Runnable}- or {@code Callable}- based activities as well. However,
59     * tasks that are already executing in a pool should normally
60     * <em>NOT</em> use these pool execution methods, but instead use the
61 dl 1.19 * within-computation forms listed in the table.
62 dl 1.18 *
63     * <table BORDER CELLPADDING=3 CELLSPACING=1>
64     * <tr>
65     * <td></td>
66     * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
67     * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
68     * </tr>
69     * <tr>
70 jsr166 1.25 * <td> <b>Arrange async execution</b></td>
71 dl 1.18 * <td> {@link #execute(ForkJoinTask)}</td>
72     * <td> {@link ForkJoinTask#fork}</td>
73     * </tr>
74     * <tr>
75     * <td> <b>Await and obtain result</b></td>
76     * <td> {@link #invoke(ForkJoinTask)}</td>
77     * <td> {@link ForkJoinTask#invoke}</td>
78     * </tr>
79     * <tr>
80     * <td> <b>Arrange exec and obtain Future</b></td>
81     * <td> {@link #submit(ForkJoinTask)}</td>
82     * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
83     * </tr>
84     * </table>
85 dl 1.19 *
86 jsr166 1.9 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
87     * used for all parallel task execution in a program or subsystem.
88     * Otherwise, use would not usually outweigh the construction and
89     * bookkeeping overhead of creating a large set of threads. For
90     * example, a common pool could be used for the {@code SortTasks}
91     * illustrated in {@link RecursiveAction}. Because {@code
92     * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
93     * daemon} mode, there is typically no need to explicitly {@link
94     * #shutdown} such a pool upon program exit.
95     *
96     * <pre>
97     * static final ForkJoinPool mainPool = new ForkJoinPool();
98     * ...
99     * public void sort(long[] array) {
100     * mainPool.invoke(new SortTask(array, 0, array.length));
101     * }
102     * </pre>
103     *
104 jsr166 1.1 * <p><b>Implementation notes</b>: This implementation restricts the
105     * maximum number of running threads to 32767. Attempts to create
106 jsr166 1.11 * pools with greater than the maximum number result in
107 jsr166 1.8 * {@code IllegalArgumentException}.
108 jsr166 1.1 *
109 jsr166 1.11 * <p>This implementation rejects submitted tasks (that is, by throwing
110 dl 1.19 * {@link RejectedExecutionException}) only when the pool is shut down
111 dl 1.20 * or internal resources have been exhausted.
112 jsr166 1.11 *
113 jsr166 1.1 * @since 1.7
114     * @author Doug Lea
115     */
116     public class ForkJoinPool extends AbstractExecutorService {
117    
118     /*
119 dl 1.14 * Implementation Overview
120     *
121     * This class provides the central bookkeeping and control for a
122     * set of worker threads: Submissions from non-FJ threads enter
123     * into a submission queue. Workers take these tasks and typically
124     * split them into subtasks that may be stolen by other workers.
125     * The main work-stealing mechanics implemented in class
126     * ForkJoinWorkerThread give first priority to processing tasks
127     * from their own queues (LIFO or FIFO, depending on mode), then
128     * to randomized FIFO steals of tasks in other worker queues, and
129     * lastly to new submissions. These mechanics do not consider
130     * affinities, loads, cache localities, etc, so rarely provide the
131     * best possible performance on a given machine, but portably
132     * provide good throughput by averaging over these factors.
133     * (Further, even if we did try to use such information, we do not
134     * usually have a basis for exploiting it. For example, some sets
135     * of tasks profit from cache affinities, but others are harmed by
136     * cache pollution effects.)
137     *
138 dl 1.19 * Beyond work-stealing support and essential bookkeeping, the
139     * main responsibility of this framework is to take actions when
140     * one worker is waiting to join a task stolen (or always held by)
141 jsr166 1.25 * another. Because we are multiplexing many tasks on to a pool
142 dl 1.19 * of workers, we can't just let them block (as in Thread.join).
143     * We also cannot just reassign the joiner's run-time stack with
144     * another and replace it later, which would be a form of
145     * "continuation", that even if possible is not necessarily a good
146     * idea. Given that the creation costs of most threads on most
147     * systems mainly surrounds setting up runtime stacks, thread
148     * creation and switching is usually not much more expensive than
149     * stack creation and switching, and is more flexible). Instead we
150     * combine two tactics:
151     *
152     * Helping: Arranging for the joiner to execute some task that it
153     * would be running if the steal had not occurred. Method
154     * ForkJoinWorkerThread.helpJoinTask tracks joining->stealing
155     * links to try to find such a task.
156     *
157     * Compensating: Unless there are already enough live threads,
158 jsr166 1.26 * method helpMaintainParallelism() may create or
159 dl 1.19 * re-activate a spare thread to compensate for blocked
160     * joiners until they unblock.
161     *
162 dl 1.24 * It is impossible to keep exactly the target (parallelism)
163     * number of threads running at any given time. Determining
164     * existence of conservatively safe helping targets, the
165     * availability of already-created spares, and the apparent need
166     * to create new spares are all racy and require heuristic
167     * guidance, so we rely on multiple retries of each. Compensation
168     * occurs in slow-motion. It is triggered only upon timeouts of
169     * Object.wait used for joins. This reduces poor decisions that
170     * would otherwise be made when threads are waiting for others
171     * that are stalled because of unrelated activities such as
172     * garbage collection.
173 dl 1.19 *
174     * The ManagedBlocker extension API can't use helping so relies
175     * only on compensation in method awaitBlocker.
176     *
177 dl 1.14 * The main throughput advantages of work-stealing stem from
178     * decentralized control -- workers mostly steal tasks from each
179     * other. We do not want to negate this by creating bottlenecks
180 dl 1.19 * implementing other management responsibilities. So we use a
181     * collection of techniques that avoid, reduce, or cope well with
182     * contention. These entail several instances of bit-packing into
183     * CASable fields to maintain only the minimally required
184     * atomicity. To enable such packing, we restrict maximum
185     * parallelism to (1<<15)-1 (enabling twice this (to accommodate
186     * unbalanced increments and decrements) to fit into a 16 bit
187     * field, which is far in excess of normal operating range. Even
188     * though updates to some of these bookkeeping fields do sometimes
189     * contend with each other, they don't normally cache-contend with
190     * updates to others enough to warrant memory padding or
191     * isolation. So they are all held as fields of ForkJoinPool
192     * objects. The main capabilities are as follows:
193 dl 1.14 *
194     * 1. Creating and removing workers. Workers are recorded in the
195     * "workers" array. This is an array as opposed to some other data
196     * structure to support index-based random steals by workers.
197     * Updates to the array recording new workers and unrecording
198     * terminated ones are protected from each other by a lock
199     * (workerLock) but the array is otherwise concurrently readable,
200     * and accessed directly by workers. To simplify index-based
201     * operations, the array size is always a power of two, and all
202 dl 1.17 * readers must tolerate null slots. Currently, all worker thread
203     * creation is on-demand, triggered by task submissions,
204     * replacement of terminated workers, and/or compensation for
205     * blocked workers. However, all other support code is set up to
206     * work with other policies.
207 dl 1.14 *
208 dl 1.19 * To ensure that we do not hold on to worker references that
209     * would prevent GC, ALL accesses to workers are via indices into
210     * the workers array (which is one source of some of the unusual
211     * code constructions here). In essence, the workers array serves
212     * as a WeakReference mechanism. Thus for example the event queue
213     * stores worker indices, not worker references. Access to the
214     * workers in associated methods (for example releaseEventWaiters)
215     * must both index-check and null-check the IDs. All such accesses
216     * ignore bad IDs by returning out early from what they are doing,
217     * since this can only be associated with shutdown, in which case
218     * it is OK to give up. On termination, we just clobber these
219     * data structures without trying to use them.
220     *
221 dl 1.14 * 2. Bookkeeping for dynamically adding and removing workers. We
222 dl 1.18 * aim to approximately maintain the given level of parallelism.
223     * When some workers are known to be blocked (on joins or via
224 dl 1.14 * ManagedBlocker), we may create or resume others to take their
225     * place until they unblock (see below). Implementing this
226     * requires counts of the number of "running" threads (i.e., those
227 jsr166 1.25 * that are neither blocked nor artificially suspended) as well as
228 dl 1.14 * the total number. These two values are packed into one field,
229     * "workerCounts" because we need accurate snapshots when deciding
230 dl 1.19 * to create, resume or suspend. Note however that the
231 jsr166 1.25 * correspondence of these counts to reality is not guaranteed. In
232 dl 1.19 * particular updates for unblocked threads may lag until they
233     * actually wake up.
234 dl 1.14 *
235     * 3. Maintaining global run state. The run state of the pool
236     * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to
237     * those in other Executor implementations, as well as a count of
238     * "active" workers -- those that are, or soon will be, or
239     * recently were executing tasks. The runLevel and active count
240     * are packed together in order to correctly trigger shutdown and
241     * termination. Without care, active counts can be subject to very
242     * high contention. We substantially reduce this contention by
243     * relaxing update rules. A worker must claim active status
244     * prospectively, by activating if it sees that a submitted or
245     * stealable task exists (it may find after activating that the
246     * task no longer exists). It stays active while processing this
247     * task (if it exists) and any other local subtasks it produces,
248     * until it cannot find any other tasks. It then tries
249     * inactivating (see method preStep), but upon update contention
250     * instead scans for more tasks, later retrying inactivation if it
251     * doesn't find any.
252     *
253     * 4. Managing idle workers waiting for tasks. We cannot let
254     * workers spin indefinitely scanning for tasks when none are
255     * available. On the other hand, we must quickly prod them into
256     * action when new tasks are submitted or generated. We
257     * park/unpark these idle workers using an event-count scheme.
258     * Field eventCount is incremented upon events that may enable
259     * workers that previously could not find a task to now find one:
260     * Submission of a new task to the pool, or another worker pushing
261     * a task onto a previously empty queue. (We also use this
262 dl 1.22 * mechanism for configuration and termination actions that
263     * require wakeups of idle workers). Each worker maintains its
264     * last known event count, and blocks when a scan for work did not
265     * find a task AND its lastEventCount matches the current
266     * eventCount. Waiting idle workers are recorded in a variant of
267     * Treiber stack headed by field eventWaiters which, when nonzero,
268     * encodes the thread index and count awaited for by the worker
269     * thread most recently calling eventSync. This thread in turn has
270     * a record (field nextEventWaiter) for the next waiting worker.
271     * In addition to allowing simpler decisions about need for
272     * wakeup, the event count bits in eventWaiters serve the role of
273     * tags to avoid ABA errors in Treiber stacks. Upon any wakeup,
274 dl 1.24 * released threads also try to release at most two others. The
275     * net effect is a tree-like diffusion of signals, where released
276     * threads (and possibly others) help with unparks. To further
277     * reduce contention effects a bit, failed CASes to increment
278     * field eventCount are tolerated without retries in signalWork.
279 dl 1.14 * Conceptually they are merged into the same event, which is OK
280     * when their only purpose is to enable workers to scan for work.
281     *
282 dl 1.24 * 5. Managing suspension of extra workers. When a worker notices
283     * (usually upon timeout of a wait()) that there are too few
284     * running threads, we may create a new thread to maintain
285     * parallelism level, or at least avoid starvation. Usually, extra
286     * threads are needed for only very short periods, yet join
287     * dependencies are such that we sometimes need them in
288     * bursts. Rather than create new threads each time this happens,
289     * we suspend no-longer-needed extra ones as "spares". For most
290     * purposes, we don't distinguish "extra" spare threads from
291     * normal "core" threads: On each call to preStep (the only point
292     * at which we can do this) a worker checks to see if there are
293     * now too many running workers, and if so, suspends itself.
294     * Method helpMaintainParallelism looks for suspended threads to
295     * resume before considering creating a new replacement. The
296     * spares themselves are encoded on another variant of a Treiber
297     * Stack, headed at field "spareWaiters". Note that the use of
298     * spares is intrinsically racy. One thread may become a spare at
299     * about the same time as another is needlessly being created. We
300     * counteract this and related slop in part by requiring resumed
301     * spares to immediately recheck (in preStep) to see whether they
302 jsr166 1.30 * should re-suspend.
303 dl 1.24 *
304     * 6. Killing off unneeded workers. A timeout mechanism is used to
305     * shed unused workers: The oldest (first) event queue waiter uses
306     * a timed rather than hard wait. When this wait times out without
307     * a normal wakeup, it tries to shutdown any one (for convenience
308     * the newest) other spare or event waiter via
309     * tryShutdownUnusedWorker. This eventually reduces the number of
310     * worker threads to a minimum of one after a long enough period
311     * without use.
312 dl 1.22 *
313     * 7. Deciding when to create new workers. The main dynamic
314 dl 1.19 * control in this class is deciding when to create extra threads
315     * in method helpMaintainParallelism. We would like to keep
316 jsr166 1.25 * exactly #parallelism threads running, which is an impossible
317 dl 1.19 * task. We always need to create one when the number of running
318     * threads would become zero and all workers are busy. Beyond
319 jsr166 1.26 * this, we must rely on heuristics that work well in the
320     * presence of transient phenomena such as GC stalls, dynamic
321 dl 1.19 * compilation, and wake-up lags. These transients are extremely
322     * common -- we are normally trying to fully saturate the CPUs on
323     * a machine, so almost any activity other than running tasks
324 dl 1.24 * impedes accuracy. Our main defense is to allow parallelism to
325     * lapse for a while during joins, and use a timeout to see if,
326     * after the resulting settling, there is still a need for
327     * additional workers. This also better copes with the fact that
328     * some of the methods in this class tend to never become compiled
329     * (but are interpreted), so some components of the entire set of
330     * controls might execute 100 times faster than others. And
331     * similarly for cases where the apparent lack of work is just due
332     * to GC stalls and other transient system activity.
333 dl 1.14 *
334     * Beware that there is a lot of representation-level coupling
335     * among classes ForkJoinPool, ForkJoinWorkerThread, and
336     * ForkJoinTask. For example, direct access to "workers" array by
337     * workers, and direct access to ForkJoinTask.status by both
338     * ForkJoinPool and ForkJoinWorkerThread. There is little point
339     * trying to reduce this, since any associated future changes in
340     * representations will need to be accompanied by algorithmic
341     * changes anyway.
342     *
343     * Style notes: There are lots of inline assignments (of form
344     * "while ((local = field) != 0)") which are usually the simplest
345 dl 1.19 * way to ensure the required read orderings (which are sometimes
346     * critical). Also several occurrences of the unusual "do {}
347 jsr166 1.28 * while (!cas...)" which is the simplest way to force an update of
348 dl 1.19 * a CAS'ed variable. There are also other coding oddities that
349     * help some methods perform reasonably even when interpreted (not
350     * compiled), at the expense of some messy constructions that
351     * reduce byte code counts.
352 dl 1.14 *
353     * The order of declarations in this file is: (1) statics (2)
354     * fields (along with constants used when unpacking some of them)
355     * (3) internal control methods (4) callbacks and other support
356     * for ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
357     * methods (plus a few little helpers).
358 jsr166 1.1 */
359    
360     /**
361 jsr166 1.8 * Factory for creating new {@link ForkJoinWorkerThread}s.
362     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
363     * for {@code ForkJoinWorkerThread} subclasses that extend base
364     * functionality or initialize threads with different contexts.
365 jsr166 1.1 */
366     public static interface ForkJoinWorkerThreadFactory {
367     /**
368     * Returns a new worker thread operating in the given pool.
369     *
370     * @param pool the pool this thread works in
371 jsr166 1.11 * @throws NullPointerException if the pool is null
372 jsr166 1.1 */
373     public ForkJoinWorkerThread newThread(ForkJoinPool pool);
374     }
375    
376     /**
377     * Default ForkJoinWorkerThreadFactory implementation; creates a
378     * new ForkJoinWorkerThread.
379     */
380 dl 1.18 static class DefaultForkJoinWorkerThreadFactory
381 jsr166 1.1 implements ForkJoinWorkerThreadFactory {
382     public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
383 dl 1.14 return new ForkJoinWorkerThread(pool);
384 jsr166 1.1 }
385     }
386    
387     /**
388     * Creates a new ForkJoinWorkerThread. This factory is used unless
389     * overridden in ForkJoinPool constructors.
390     */
391     public static final ForkJoinWorkerThreadFactory
392     defaultForkJoinWorkerThreadFactory =
393     new DefaultForkJoinWorkerThreadFactory();
394    
395     /**
396     * Permission required for callers of methods that may start or
397     * kill threads.
398     */
399     private static final RuntimePermission modifyThreadPermission =
400     new RuntimePermission("modifyThread");
401    
402     /**
403     * If there is a security manager, makes sure caller has
404     * permission to modify threads.
405     */
406     private static void checkPermission() {
407     SecurityManager security = System.getSecurityManager();
408     if (security != null)
409     security.checkPermission(modifyThreadPermission);
410     }
411    
412     /**
413     * Generator for assigning sequence numbers as pool names.
414     */
415     private static final AtomicInteger poolNumberGenerator =
416     new AtomicInteger();
417    
418     /**
419 dl 1.24 * The time to block in a join (see awaitJoin) before checking if
420     * a new worker should be (re)started to maintain parallelism
421 jsr166 1.25 * level. The value should be short enough to maintain global
422 dl 1.24 * responsiveness and progress but long enough to avoid
423     * counterproductive firings during GC stalls or unrelated system
424     * activity, and to not bog down systems with continual re-firings
425     * on GCs or legitimately long waits.
426     */
427     private static final long JOIN_TIMEOUT_MILLIS = 250L; // 4 per second
428    
429     /**
430 dl 1.22 * The wakeup interval (in nanoseconds) for the oldest worker
431 jsr166 1.30 * waiting for an event invokes tryShutdownUnusedWorker to shrink
432 dl 1.22 * the number of workers. The exact value does not matter too
433     * much, but should be long enough to slowly release resources
434     * during long periods without use without disrupting normal use.
435     */
436     private static final long SHRINK_RATE_NANOS =
437 dl 1.24 30L * 1000L * 1000L * 1000L; // 2 per minute
438 dl 1.22
439     /**
440 dl 1.19 * Absolute bound for parallelism level. Twice this number plus
441     * one (i.e., 0xfff) must fit into a 16bit field to enable
442     * word-packing for some counts and indices.
443 dl 1.14 */
444 dl 1.19 private static final int MAX_WORKERS = 0x7fff;
445 dl 1.14
446     /**
447     * Array holding all worker threads in the pool. Array size must
448     * be a power of two. Updates and replacements are protected by
449     * workerLock, but the array is always kept in a consistent enough
450     * state to be randomly accessed without locking by workers
451     * performing work-stealing, as well as other traversal-based
452     * methods in this class. All readers must tolerate that some
453     * array slots may be null.
454 jsr166 1.1 */
455     volatile ForkJoinWorkerThread[] workers;
456    
457     /**
458 dl 1.14 * Queue for external submissions.
459 jsr166 1.1 */
460 dl 1.14 private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
461 jsr166 1.1
462     /**
463 dl 1.14 * Lock protecting updates to workers array.
464 jsr166 1.1 */
465 dl 1.14 private final ReentrantLock workerLock;
466 jsr166 1.1
467     /**
468 dl 1.14 * Latch released upon termination.
469 jsr166 1.1 */
470 dl 1.18 private final Phaser termination;
471 jsr166 1.1
472     /**
473     * Creation factory for worker threads.
474     */
475     private final ForkJoinWorkerThreadFactory factory;
476    
477     /**
478 dl 1.14 * Sum of per-thread steal counts, updated only when threads are
479     * idle or terminating.
480 jsr166 1.1 */
481 dl 1.14 private volatile long stealCount;
482 jsr166 1.1
483     /**
484 jsr166 1.25 * Encoded record of top of Treiber stack of threads waiting for
485 dl 1.14 * events. The top 32 bits contain the count being waited for. The
486 dl 1.19 * bottom 16 bits contains one plus the pool index of waiting
487     * worker thread. (Bits 16-31 are unused.)
488 jsr166 1.1 */
489 dl 1.14 private volatile long eventWaiters;
490    
491     private static final int EVENT_COUNT_SHIFT = 32;
492 dl 1.19 private static final long WAITER_ID_MASK = (1L << 16) - 1L;
493 jsr166 1.1
494     /**
495 dl 1.14 * A counter for events that may wake up worker threads:
496     * - Submission of a new task to the pool
497     * - A worker pushing a task on an empty queue
498 dl 1.19 * - termination
499 jsr166 1.1 */
500 dl 1.14 private volatile int eventCount;
501    
502     /**
503 jsr166 1.25 * Encoded record of top of Treiber stack of spare threads waiting
504 dl 1.19 * for resumption. The top 16 bits contain an arbitrary count to
505     * avoid ABA effects. The bottom 16bits contains one plus the pool
506     * index of waiting worker thread.
507     */
508     private volatile int spareWaiters;
509    
510     private static final int SPARE_COUNT_SHIFT = 16;
511     private static final int SPARE_ID_MASK = (1 << 16) - 1;
512    
513     /**
514 dl 1.14 * Lifecycle control. The low word contains the number of workers
515     * that are (probably) executing tasks. This value is atomically
516     * incremented before a worker gets a task to run, and decremented
517     * when worker has no tasks and cannot find any. Bits 16-18
518     * contain runLevel value. When all are zero, the pool is
519     * running. Level transitions are monotonic (running -> shutdown
520     * -> terminating -> terminated) so each transition adds a bit.
521     * These are bundled together to ensure consistent read for
522     * termination checks (i.e., that runLevel is at least SHUTDOWN
523     * and active threads is zero).
524 dl 1.22 *
525     * Notes: Most direct CASes are dependent on these bitfield
526     * positions. Also, this field is non-private to enable direct
527     * performance-sensitive CASes in ForkJoinWorkerThread.
528 dl 1.14 */
529 dl 1.22 volatile int runState;
530 dl 1.14
531     // Note: The order among run level values matters.
532     private static final int RUNLEVEL_SHIFT = 16;
533     private static final int SHUTDOWN = 1 << RUNLEVEL_SHIFT;
534     private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1);
535     private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2);
536     private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;
537 jsr166 1.1
538     /**
539 dl 1.14 * Holds number of total (i.e., created and not yet terminated)
540     * and running (i.e., not blocked on joins or other managed sync)
541     * threads, packed together to ensure consistent snapshot when
542     * making decisions about creating and suspending spare
543     * threads. Updated only by CAS. Note that adding a new worker
544     * requires incrementing both counts, since workers start off in
545 dl 1.19 * running state.
546 dl 1.14 */
547     private volatile int workerCounts;
548    
549     private static final int TOTAL_COUNT_SHIFT = 16;
550     private static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
551     private static final int ONE_RUNNING = 1;
552     private static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT;
553    
554 jsr166 1.1 /**
555 dl 1.14 * The target parallelism level.
556 dl 1.18 * Accessed directly by ForkJoinWorkerThreads.
557 jsr166 1.1 */
558 dl 1.18 final int parallelism;
559 jsr166 1.1
560     /**
561 dl 1.14 * True if use local fifo, not default lifo, for local polling
562 dl 1.18 * Read by, and replicated by ForkJoinWorkerThreads
563 jsr166 1.1 */
564 dl 1.18 final boolean locallyFifo;
565 jsr166 1.1
566     /**
567 dl 1.18 * The uncaught exception handler used when any worker abruptly
568     * terminates.
569 jsr166 1.1 */
570 dl 1.18 private final Thread.UncaughtExceptionHandler ueh;
571 jsr166 1.1
572     /**
573 dl 1.14 * Pool number, just for assigning useful names to worker threads
574 jsr166 1.1 */
575 dl 1.14 private final int poolNumber;
576 jsr166 1.1
577 dl 1.22 // Utilities for CASing fields. Note that most of these
578     // are usually manually inlined by callers
579 jsr166 1.1
580     /**
581 dl 1.19 * Increments running count part of workerCounts
582 jsr166 1.1 */
583 dl 1.18 final void incrementRunningCount() {
584     int c;
585 dl 1.14 do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
586 dl 1.19 c = workerCounts,
587 dl 1.18 c + ONE_RUNNING));
588 jsr166 1.1 }
589 dl 1.19
590 jsr166 1.1 /**
591 dl 1.18 * Tries to decrement running count unless already zero
592 dl 1.17 */
593     final boolean tryDecrementRunningCount() {
594     int wc = workerCounts;
595     if ((wc & RUNNING_COUNT_MASK) == 0)
596     return false;
597     return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
598     wc, wc - ONE_RUNNING);
599     }
600    
601     /**
602 dl 1.19 * Forces decrement of encoded workerCounts, awaiting nonzero if
603     * (rarely) necessary when other count updates lag.
604     *
605     * @param dr -- either zero or ONE_RUNNING
606     * @param dt == either zero or ONE_TOTAL
607     */
608     private void decrementWorkerCounts(int dr, int dt) {
609     for (;;) {
610     int wc = workerCounts;
611     if ((wc & RUNNING_COUNT_MASK) - dr < 0 ||
612 dl 1.22 (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) {
613     if ((runState & TERMINATED) != 0)
614     return; // lagging termination on a backout
615 dl 1.19 Thread.yield();
616 dl 1.22 }
617 dl 1.19 if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
618     wc, wc - (dr + dt)))
619     return;
620     }
621     }
622    
623     /**
624 jsr166 1.1 * Tries decrementing active count; fails on contention.
625 dl 1.14 * Called when workers cannot find tasks to run.
626     */
627     final boolean tryDecrementActiveCount() {
628     int c;
629     return UNSAFE.compareAndSwapInt(this, runStateOffset,
630 dl 1.22 c = runState, c - 1);
631 dl 1.14 }
632    
633     /**
634     * Advances to at least the given level. Returns true if not
635     * already in at least the given level.
636     */
637     private boolean advanceRunLevel(int level) {
638     for (;;) {
639     int s = runState;
640     if ((s & level) != 0)
641     return false;
642     if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | level))
643     return true;
644     }
645     }
646    
647     // workers array maintenance
648    
649     /**
650     * Records and returns a workers array index for new worker.
651     */
652     private int recordWorker(ForkJoinWorkerThread w) {
653     // Try using slot totalCount-1. If not available, scan and/or resize
654     int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
655     final ReentrantLock lock = this.workerLock;
656     lock.lock();
657     try {
658     ForkJoinWorkerThread[] ws = workers;
659 dl 1.19 int n = ws.length;
660     if (k < 0 || k >= n || ws[k] != null) {
661     for (k = 0; k < n && ws[k] != null; ++k)
662 dl 1.14 ;
663 dl 1.19 if (k == n)
664     ws = Arrays.copyOf(ws, n << 1);
665 dl 1.14 }
666     ws[k] = w;
667     workers = ws; // volatile array write ensures slot visibility
668     } finally {
669     lock.unlock();
670     }
671     return k;
672     }
673    
674     /**
675 jsr166 1.29 * Nulls out record of worker in workers array.
676 dl 1.14 */
677     private void forgetWorker(ForkJoinWorkerThread w) {
678     int idx = w.poolIndex;
679 jsr166 1.25 // Locking helps method recordWorker avoid unnecessary expansion
680 dl 1.14 final ReentrantLock lock = this.workerLock;
681     lock.lock();
682     try {
683     ForkJoinWorkerThread[] ws = workers;
684     if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
685     ws[idx] = null;
686     } finally {
687     lock.unlock();
688     }
689     }
690    
691     /**
692     * Final callback from terminating worker. Removes record of
693     * worker from array, and adjusts counts. If pool is shutting
694 jsr166 1.25 * down, tries to complete termination.
695 dl 1.14 *
696     * @param w the worker
697     */
698     final void workerTerminated(ForkJoinWorkerThread w) {
699     forgetWorker(w);
700 dl 1.19 decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
701     while (w.stealCount != 0) // collect final count
702     tryAccumulateStealCount(w);
703     tryTerminate(false);
704 dl 1.14 }
705    
706     // Waiting for and signalling events
707    
708     /**
709     * Releases workers blocked on a count not equal to current count.
710 dl 1.19 * Normally called after precheck that eventWaiters isn't zero to
711 dl 1.22 * avoid wasted array checks. Gives up upon a change in count or
712 dl 1.24 * upon releasing two workers, letting others take over.
713 dl 1.14 */
714 dl 1.22 private void releaseEventWaiters() {
715 dl 1.19 ForkJoinWorkerThread[] ws = workers;
716     int n = ws.length;
717 dl 1.22 long h = eventWaiters;
718     int ec = eventCount;
719 dl 1.24 boolean releasedOne = false;
720 dl 1.22 ForkJoinWorkerThread w; int id;
721 dl 1.24 while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
722     (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
723     id < n && (w = ws[id]) != null) {
724     if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
725     h, w.nextWaiter)) {
726     LockSupport.unpark(w);
727     if (releasedOne) // exit on second release
728     break;
729     releasedOne = true;
730     }
731     if (eventCount != ec)
732 dl 1.19 break;
733 dl 1.24 h = eventWaiters;
734 dl 1.14 }
735     }
736    
737     /**
738 dl 1.19 * Tries to advance eventCount and releases waiters. Called only
739     * from workers.
740 dl 1.18 */
741 dl 1.19 final void signalWork() {
742     int c; // try to increment event count -- CAS failure OK
743     UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
744     if (eventWaiters != 0L)
745 dl 1.22 releaseEventWaiters();
746 dl 1.18 }
747    
748     /**
749 dl 1.22 * Adds the given worker to event queue and blocks until
750 dl 1.24 * terminating or event count advances from the given value
751 dl 1.19 *
752     * @param w the calling worker thread
753 dl 1.24 * @param ec the count
754 dl 1.14 */
755 dl 1.24 private void eventSync(ForkJoinWorkerThread w, int ec) {
756 dl 1.22 long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
757 dl 1.19 long h;
758     while ((runState < SHUTDOWN || !tryTerminate(false)) &&
759 dl 1.22 (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
760     (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
761     eventCount == ec) {
762 dl 1.19 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
763     w.nextWaiter = h, nh)) {
764 dl 1.22 awaitEvent(w, ec);
765     break;
766     }
767     }
768     }
769    
770     /**
771     * Blocks the given worker (that has already been entered as an
772     * event waiter) until terminating or event count advances from
773     * the given value. The oldest (first) waiter uses a timed wait to
774     * occasionally one-by-one shrink the number of workers (to a
775 dl 1.24 * minimum of one) if the pool has not been used for extended
776 dl 1.22 * periods.
777     *
778     * @param w the calling worker thread
779     * @param ec the count
780     */
781     private void awaitEvent(ForkJoinWorkerThread w, int ec) {
782     while (eventCount == ec) {
783     if (tryAccumulateStealCount(w)) { // transfer while idle
784     boolean untimed = (w.nextWaiter != 0L ||
785     (workerCounts & RUNNING_COUNT_MASK) <= 1);
786     long startTime = untimed? 0 : System.nanoTime();
787     Thread.interrupted(); // clear/ignore interrupt
788 dl 1.24 if (eventCount != ec || w.runState != 0 ||
789 dl 1.22 runState >= TERMINATING) // recheck after clear
790     break;
791     if (untimed)
792     LockSupport.park(w);
793     else {
794     LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
795 dl 1.24 if (eventCount != ec || w.runState != 0 ||
796 dl 1.22 runState >= TERMINATING)
797 dl 1.19 break;
798 dl 1.22 if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
799 dl 1.24 tryShutdownUnusedWorker(ec);
800 dl 1.19 }
801 dl 1.14 }
802     }
803 dl 1.22 }
804    
805 dl 1.24 // Maintaining parallelism
806 dl 1.19
807     /**
808 jsr166 1.32 * Pushes worker onto the spare stack.
809 dl 1.19 */
810     final void pushSpare(ForkJoinWorkerThread w) {
811 dl 1.22 int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
812 dl 1.19 do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
813     w.nextSpare = spareWaiters,ns));
814 dl 1.14 }
815    
816     /**
817 dl 1.24 * Tries (once) to resume a spare if the number of running
818     * threads is less than target.
819 dl 1.14 */
820 dl 1.24 private void tryResumeSpare() {
821 dl 1.19 int sw, id;
822 dl 1.24 ForkJoinWorkerThread[] ws = workers;
823     int n = ws.length;
824 dl 1.19 ForkJoinWorkerThread w;
825 dl 1.24 if ((sw = spareWaiters) != 0 &&
826     (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
827     id < n && (w = ws[id]) != null &&
828     (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
829     spareWaiters == sw &&
830 dl 1.19 UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
831 dl 1.22 sw, w.nextSpare)) {
832 dl 1.24 int c; // increment running count before resume
833 jsr166 1.28 do {} while (!UNSAFE.compareAndSwapInt
834     (this, workerCountsOffset,
835     c = workerCounts, c + ONE_RUNNING));
836 dl 1.24 if (w.tryUnsuspend())
837     LockSupport.unpark(w);
838     else // back out if w was shutdown
839     decrementWorkerCounts(ONE_RUNNING, 0);
840 dl 1.22 }
841     }
842    
843     /**
844 dl 1.24 * Tries to increase the number of running workers if below target
845     * parallelism: If a spare exists tries to resume it via
846     * tryResumeSpare. Otherwise, if not enough total workers or all
847 jsr166 1.25 * existing workers are busy, adds a new worker. In all cases also
848 dl 1.24 * helps wake up releasable workers waiting for work.
849 dl 1.22 */
850 dl 1.24 private void helpMaintainParallelism() {
851 dl 1.22 int pc = parallelism;
852 dl 1.24 int wc, rs, tc;
853     while (((wc = workerCounts) & RUNNING_COUNT_MASK) < pc &&
854     (rs = runState) < TERMINATING) {
855     if (spareWaiters != 0)
856     tryResumeSpare();
857     else if ((tc = wc >>> TOTAL_COUNT_SHIFT) >= MAX_WORKERS ||
858     (tc >= pc && (rs & ACTIVE_COUNT_MASK) != tc))
859     break; // enough total
860     else if (runState == rs && workerCounts == wc &&
861     UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
862     wc + (ONE_RUNNING|ONE_TOTAL))) {
863     ForkJoinWorkerThread w = null;
864     try {
865     w = factory.newThread(this);
866     } finally { // adjust on null or exceptional factory return
867     if (w == null) {
868     decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
869     tryTerminate(false); // handle failure during shutdown
870     }
871     }
872     if (w == null)
873 dl 1.22 break;
874 dl 1.24 w.start(recordWorker(w), ueh);
875     if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
876     int c; // advance event count
877     UNSAFE.compareAndSwapInt(this, eventCountOffset,
878     c = eventCount, c+1);
879     break; // add at most one unless total below target
880     }
881 dl 1.22 }
882     }
883 dl 1.24 if (eventWaiters != 0L)
884     releaseEventWaiters();
885 dl 1.22 }
886    
887     /**
888 dl 1.24 * Callback from the oldest waiter in awaitEvent waking up after a
889     * period of non-use. If all workers are idle, tries (once) to
890     * shutdown an event waiter or a spare, if one exists. Note that
891     * we don't need CAS or locks here because the method is called
892     * only from one thread occasionally waking (and even misfires are
893     * OK). Note that until the shutdown worker fully terminates,
894     * workerCounts will overestimate total count, which is tolerable.
895 dl 1.22 *
896 dl 1.24 * @param ec the event count waited on by caller (to abort
897     * attempt if count has since changed).
898 dl 1.22 */
899 dl 1.24 private void tryShutdownUnusedWorker(int ec) {
900     if (runState == 0 && eventCount == ec) { // only trigger if all idle
901     ForkJoinWorkerThread[] ws = workers;
902     int n = ws.length;
903     ForkJoinWorkerThread w = null;
904     boolean shutdown = false;
905     int sw;
906     long h;
907     if ((sw = spareWaiters) != 0) { // prefer killing spares
908     int id = (sw & SPARE_ID_MASK) - 1;
909     if (id >= 0 && id < n && (w = ws[id]) != null &&
910     UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
911     sw, w.nextSpare))
912     shutdown = true;
913     }
914     else if ((h = eventWaiters) != 0L) {
915     long nh;
916     int id = ((int)(h & WAITER_ID_MASK)) - 1;
917     if (id >= 0 && id < n && (w = ws[id]) != null &&
918     (nh = w.nextWaiter) != 0L && // keep at least one worker
919     UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
920     shutdown = true;
921     }
922     if (w != null && shutdown) {
923     w.shutdown();
924     LockSupport.unpark(w);
925     }
926 dl 1.19 }
927 dl 1.24 releaseEventWaiters(); // in case of interference
928 dl 1.14 }
929    
930     /**
931     * Callback from workers invoked upon each top-level action (i.e.,
932 dl 1.22 * stealing a task or taking a submission and running it).
933     * Performs one or more of the following:
934 dl 1.19 *
935 dl 1.24 * 1. If the worker is active and either did not run a task
936     * or there are too many workers, try to set its active status
937     * to inactive and update activeCount. On contention, we may
938     * try again in this or a subsequent call.
939     *
940     * 2. If not enough total workers, help create some.
941     *
942     * 3. If there are too many running workers, suspend this worker
943     * (first forcing inactive if necessary). If it is not needed,
944     * it may be shutdown while suspended (via
945     * tryShutdownUnusedWorker). Otherwise, upon resume it
946     * rechecks running thread count and need for event sync.
947     *
948     * 4. If worker did not run a task, await the next task event via
949     * eventSync if necessary (first forcing inactivation), upon
950     * which the worker may be shutdown via
951     * tryShutdownUnusedWorker. Otherwise, help release any
952     * existing event waiters that are now releasable,
953 dl 1.14 *
954     * @param w the worker
955 dl 1.24 * @param ran true if worker ran a task since last call to this method
956 dl 1.14 */
957 dl 1.24 final void preStep(ForkJoinWorkerThread w, boolean ran) {
958     int wec = w.lastEventCount;
959 dl 1.14 boolean active = w.active;
960 dl 1.24 boolean inactivate = false;
961 dl 1.19 int pc = parallelism;
962 dl 1.24 int rs;
963     while (w.runState == 0 && (rs = runState) < TERMINATING) {
964     if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
965     UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
966     inactivate = active = w.active = false;
967     int wc = workerCounts;
968     if ((wc & RUNNING_COUNT_MASK) > pc) {
969     if (!(inactivate |= active) && // must inactivate to suspend
970 dl 1.22 workerCounts == wc && // try to suspend as spare
971 dl 1.19 UNSAFE.compareAndSwapInt(this, workerCountsOffset,
972 dl 1.24 wc, wc - ONE_RUNNING))
973 dl 1.22 w.suspendAsSpare();
974 dl 1.19 }
975 dl 1.24 else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
976     helpMaintainParallelism(); // not enough workers
977     else if (!ran) {
978     long h = eventWaiters;
979     int ec = eventCount;
980     if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
981     releaseEventWaiters(); // release others before waiting
982     else if (ec != wec) {
983     w.lastEventCount = ec; // no need to wait
984     break;
985 dl 1.22 }
986 jsr166 1.27 else if (!(inactivate |= active))
987 dl 1.24 eventSync(w, wec); // must inactivate before sync
988 dl 1.14 }
989 dl 1.24 else
990     break;
991 dl 1.14 }
992     }
993    
994     /**
995 dl 1.19 * Helps and/or blocks awaiting join of the given task.
996 dl 1.24 * See above for explanation.
997 dl 1.17 *
998     * @param joinMe the task to join
999 dl 1.24 * @param worker the current worker thread
1000 dl 1.14 */
1001 dl 1.19 final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
1002 dl 1.24 int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
1003 dl 1.19 while (joinMe.status >= 0) {
1004 dl 1.24 int wc;
1005 dl 1.19 worker.helpJoinTask(joinMe);
1006     if (joinMe.status < 0)
1007     break;
1008 dl 1.24 else if (retries > 0)
1009     --retries;
1010     else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
1011     UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1012     wc, wc - ONE_RUNNING)) {
1013     int stat, c; long h;
1014     while ((stat = joinMe.status) >= 0 &&
1015     (h = eventWaiters) != 0L && // help release others
1016     (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1017     releaseEventWaiters();
1018     if (stat >= 0 &&
1019     ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
1020     (stat =
1021     joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
1022     helpMaintainParallelism(); // timeout or no running workers
1023 dl 1.19 do {} while (!UNSAFE.compareAndSwapInt
1024     (this, workerCountsOffset,
1025     c = workerCounts, c + ONE_RUNNING));
1026 dl 1.24 if (stat < 0)
1027     break; // else restart
1028 dl 1.14 }
1029     }
1030     }
1031    
1032     /**
1033 dl 1.24 * Same idea as awaitJoin, but no helping, retries, or timeouts.
1034 dl 1.14 */
1035 dl 1.18 final void awaitBlocker(ManagedBlocker blocker)
1036 dl 1.14 throws InterruptedException {
1037 dl 1.19 while (!blocker.isReleasable()) {
1038 dl 1.24 int wc = workerCounts;
1039     if ((wc & RUNNING_COUNT_MASK) != 0 &&
1040     UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1041     wc, wc - ONE_RUNNING)) {
1042 dl 1.19 try {
1043 dl 1.24 while (!blocker.isReleasable()) {
1044     long h = eventWaiters;
1045     if (h != 0L &&
1046     (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1047     releaseEventWaiters();
1048     else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
1049     runState < TERMINATING)
1050     helpMaintainParallelism();
1051     else if (blocker.block())
1052     break;
1053     }
1054 dl 1.19 } finally {
1055 dl 1.14 int c;
1056 dl 1.17 do {} while (!UNSAFE.compareAndSwapInt
1057     (this, workerCountsOffset,
1058     c = workerCounts, c + ONE_RUNNING));
1059 dl 1.14 }
1060 dl 1.18 break;
1061     }
1062 dl 1.14 }
1063 dl 1.19 }
1064 dl 1.15
1065     /**
1066 dl 1.14 * Possibly initiates and/or completes termination.
1067     *
1068     * @param now if true, unconditionally terminate, else only
1069     * if shutdown and empty queue and no active workers
1070     * @return true if now terminating or terminated
1071 jsr166 1.1 */
1072 dl 1.14 private boolean tryTerminate(boolean now) {
1073     if (now)
1074     advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
1075     else if (runState < SHUTDOWN ||
1076     !submissionQueue.isEmpty() ||
1077     (runState & ACTIVE_COUNT_MASK) != 0)
1078 jsr166 1.1 return false;
1079 dl 1.14
1080     if (advanceRunLevel(TERMINATING))
1081     startTerminating();
1082    
1083     // Finish now if all threads terminated; else in some subsequent call
1084     if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
1085     advanceRunLevel(TERMINATED);
1086 dl 1.18 termination.arrive();
1087 dl 1.14 }
1088 jsr166 1.1 return true;
1089     }
1090    
1091     /**
1092 dl 1.14 * Actions on transition to TERMINATING
1093 dl 1.19 *
1094     * Runs up to four passes through workers: (0) shutting down each
1095 dl 1.22 * (without waking up if parked) to quickly spread notifications
1096     * without unnecessary bouncing around event queues etc (1) wake
1097     * up and help cancel tasks (2) interrupt (3) mop up races with
1098     * interrupted workers
1099 dl 1.14 */
1100     private void startTerminating() {
1101 dl 1.19 cancelSubmissions();
1102     for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
1103 dl 1.24 int c; // advance event count
1104     UNSAFE.compareAndSwapInt(this, eventCountOffset,
1105     c = eventCount, c+1);
1106 dl 1.19 eventWaiters = 0L; // clobber lists
1107     spareWaiters = 0;
1108 jsr166 1.29 for (ForkJoinWorkerThread w : workers) {
1109 dl 1.19 if (w != null) {
1110 dl 1.22 w.shutdown();
1111 dl 1.19 if (passes > 0 && !w.isTerminated()) {
1112     w.cancelTasks();
1113     LockSupport.unpark(w);
1114     if (passes > 1) {
1115     try {
1116     w.interrupt();
1117     } catch (SecurityException ignore) {
1118     }
1119     }
1120     }
1121     }
1122     }
1123 dl 1.17 }
1124     }
1125    
1126     /**
1127 jsr166 1.30 * Clears out and cancels submissions, ignoring exceptions.
1128 dl 1.17 */
1129     private void cancelSubmissions() {
1130 dl 1.14 ForkJoinTask<?> task;
1131     while ((task = submissionQueue.poll()) != null) {
1132     try {
1133     task.cancel(false);
1134     } catch (Throwable ignore) {
1135     }
1136     }
1137 dl 1.17 }
1138    
1139 dl 1.14 // misc support for ForkJoinWorkerThread
1140    
1141     /**
1142 jsr166 1.30 * Returns pool number.
1143 jsr166 1.1 */
1144 dl 1.14 final int getPoolNumber() {
1145     return poolNumber;
1146 jsr166 1.1 }
1147    
1148     /**
1149 jsr166 1.30 * Tries to accumulate steal count from a worker, clearing
1150     * the worker's value if successful.
1151 dl 1.19 *
1152     * @return true if worker steal count now zero
1153 jsr166 1.1 */
1154 dl 1.19 final boolean tryAccumulateStealCount(ForkJoinWorkerThread w) {
1155 dl 1.14 int sc = w.stealCount;
1156 dl 1.19 long c = stealCount;
1157     // CAS even if zero, for fence effects
1158     if (UNSAFE.compareAndSwapLong(this, stealCountOffset, c, c + sc)) {
1159     if (sc != 0)
1160     w.stealCount = 0;
1161     return true;
1162 jsr166 1.1 }
1163 dl 1.19 return sc == 0;
1164 jsr166 1.1 }
1165    
1166     /**
1167 dl 1.14 * Returns the approximate (non-atomic) number of idle threads per
1168     * active thread.
1169     */
1170     final int idlePerActive() {
1171 dl 1.19 int pc = parallelism; // use parallelism, not rc
1172 jsr166 1.25 int ac = runState; // no mask -- artificially boosts during shutdown
1173 dl 1.14 // Use exact results for small values, saturate past 4
1174 jsr166 1.30 return ((pc <= ac) ? 0 :
1175     (pc >>> 1 <= ac) ? 1 :
1176     (pc >>> 2 <= ac) ? 3 :
1177     pc >>> 3);
1178 dl 1.14 }
1179    
1180     // Public and protected methods
1181 jsr166 1.1
1182     // Constructors
1183    
1184     /**
1185 jsr166 1.9 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1186 dl 1.18 * java.lang.Runtime#availableProcessors}, using the {@linkplain
1187     * #defaultForkJoinWorkerThreadFactory default thread factory},
1188     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1189 jsr166 1.1 *
1190     * @throws SecurityException if a security manager exists and
1191     * the caller is not permitted to modify threads
1192     * because it does not hold {@link
1193     * java.lang.RuntimePermission}{@code ("modifyThread")}
1194     */
1195     public ForkJoinPool() {
1196     this(Runtime.getRuntime().availableProcessors(),
1197 dl 1.18 defaultForkJoinWorkerThreadFactory, null, false);
1198 jsr166 1.1 }
1199    
1200     /**
1201 jsr166 1.9 * Creates a {@code ForkJoinPool} with the indicated parallelism
1202 dl 1.18 * level, the {@linkplain
1203     * #defaultForkJoinWorkerThreadFactory default thread factory},
1204     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1205 jsr166 1.1 *
1206 jsr166 1.9 * @param parallelism the parallelism level
1207 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
1208 jsr166 1.11 * equal to zero, or greater than implementation limit
1209 jsr166 1.1 * @throws SecurityException if a security manager exists and
1210     * the caller is not permitted to modify threads
1211     * because it does not hold {@link
1212     * java.lang.RuntimePermission}{@code ("modifyThread")}
1213     */
1214     public ForkJoinPool(int parallelism) {
1215 dl 1.18 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
1216 jsr166 1.1 }
1217    
1218     /**
1219 dl 1.18 * Creates a {@code ForkJoinPool} with the given parameters.
1220 jsr166 1.1 *
1221 dl 1.18 * @param parallelism the parallelism level. For default value,
1222     * use {@link java.lang.Runtime#availableProcessors}.
1223     * @param factory the factory for creating new threads. For default value,
1224     * use {@link #defaultForkJoinWorkerThreadFactory}.
1225 dl 1.19 * @param handler the handler for internal worker threads that
1226     * terminate due to unrecoverable errors encountered while executing
1227 jsr166 1.31 * tasks. For default value, use {@code null}.
1228 dl 1.19 * @param asyncMode if true,
1229 dl 1.18 * establishes local first-in-first-out scheduling mode for forked
1230     * tasks that are never joined. This mode may be more appropriate
1231     * than default locally stack-based mode in applications in which
1232     * worker threads only process event-style asynchronous tasks.
1233 jsr166 1.31 * For default value, use {@code false}.
1234 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
1235 jsr166 1.11 * equal to zero, or greater than implementation limit
1236     * @throws NullPointerException if the factory is null
1237 jsr166 1.1 * @throws SecurityException if a security manager exists and
1238     * the caller is not permitted to modify threads
1239     * because it does not hold {@link
1240     * java.lang.RuntimePermission}{@code ("modifyThread")}
1241     */
1242 dl 1.19 public ForkJoinPool(int parallelism,
1243 dl 1.18 ForkJoinWorkerThreadFactory factory,
1244     Thread.UncaughtExceptionHandler handler,
1245     boolean asyncMode) {
1246 dl 1.14 checkPermission();
1247     if (factory == null)
1248     throw new NullPointerException();
1249 dl 1.19 if (parallelism <= 0 || parallelism > MAX_WORKERS)
1250 jsr166 1.1 throw new IllegalArgumentException();
1251 dl 1.14 this.parallelism = parallelism;
1252 jsr166 1.1 this.factory = factory;
1253 dl 1.18 this.ueh = handler;
1254     this.locallyFifo = asyncMode;
1255     int arraySize = initialArraySizeFor(parallelism);
1256 dl 1.14 this.workers = new ForkJoinWorkerThread[arraySize];
1257     this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
1258 jsr166 1.1 this.workerLock = new ReentrantLock();
1259 dl 1.18 this.termination = new Phaser(1);
1260     this.poolNumber = poolNumberGenerator.incrementAndGet();
1261 jsr166 1.1 }
1262    
1263     /**
1264 dl 1.14 * Returns initial power of two size for workers array.
1265     * @param pc the initial parallelism level
1266     */
1267     private static int initialArraySizeFor(int pc) {
1268 dl 1.24 // If possible, initially allocate enough space for one spare
1269     int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1270 dl 1.19 // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 >>> 16)
1271 dl 1.14 size |= size >>> 1;
1272     size |= size >>> 2;
1273     size |= size >>> 4;
1274     size |= size >>> 8;
1275     return size + 1;
1276 jsr166 1.1 }
1277    
1278     // Execution methods
1279    
1280     /**
1281     * Common code for execute, invoke and submit
1282     */
1283     private <T> void doSubmit(ForkJoinTask<T> task) {
1284 jsr166 1.2 if (task == null)
1285     throw new NullPointerException();
1286 dl 1.14 if (runState >= SHUTDOWN)
1287 jsr166 1.1 throw new RejectedExecutionException();
1288 dl 1.19 submissionQueue.offer(task);
1289 dl 1.24 int c; // try to increment event count -- CAS failure OK
1290     UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1291     helpMaintainParallelism(); // create, start, or resume some workers
1292 jsr166 1.1 }
1293    
1294     /**
1295     * Performs the given task, returning its result upon completion.
1296     *
1297     * @param task the task
1298     * @return the task's result
1299 jsr166 1.11 * @throws NullPointerException if the task is null
1300     * @throws RejectedExecutionException if the task cannot be
1301     * scheduled for execution
1302 jsr166 1.1 */
1303     public <T> T invoke(ForkJoinTask<T> task) {
1304     doSubmit(task);
1305     return task.join();
1306     }
1307    
1308     /**
1309     * Arranges for (asynchronous) execution of the given task.
1310     *
1311     * @param task the task
1312 jsr166 1.11 * @throws NullPointerException if the task is null
1313     * @throws RejectedExecutionException if the task cannot be
1314     * scheduled for execution
1315 jsr166 1.1 */
1316 jsr166 1.8 public void execute(ForkJoinTask<?> task) {
1317 jsr166 1.1 doSubmit(task);
1318     }
1319    
1320     // AbstractExecutorService methods
1321    
1322 jsr166 1.11 /**
1323     * @throws NullPointerException if the task is null
1324     * @throws RejectedExecutionException if the task cannot be
1325     * scheduled for execution
1326     */
1327 jsr166 1.1 public void execute(Runnable task) {
1328 jsr166 1.2 ForkJoinTask<?> job;
1329 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1330     job = (ForkJoinTask<?>) task;
1331 jsr166 1.2 else
1332 jsr166 1.7 job = ForkJoinTask.adapt(task, null);
1333 jsr166 1.2 doSubmit(job);
1334 jsr166 1.1 }
1335    
1336 jsr166 1.11 /**
1337 dl 1.18 * Submits a ForkJoinTask for execution.
1338     *
1339     * @param task the task to submit
1340     * @return the task
1341     * @throws NullPointerException if the task is null
1342     * @throws RejectedExecutionException if the task cannot be
1343     * scheduled for execution
1344     */
1345     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1346     doSubmit(task);
1347     return task;
1348     }
1349    
1350     /**
1351 jsr166 1.11 * @throws NullPointerException if the task is null
1352     * @throws RejectedExecutionException if the task cannot be
1353     * scheduled for execution
1354     */
1355 jsr166 1.1 public <T> ForkJoinTask<T> submit(Callable<T> task) {
1356 jsr166 1.7 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1357 jsr166 1.1 doSubmit(job);
1358     return job;
1359     }
1360    
1361 jsr166 1.11 /**
1362     * @throws NullPointerException if the task is null
1363     * @throws RejectedExecutionException if the task cannot be
1364     * scheduled for execution
1365     */
1366 jsr166 1.1 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1367 jsr166 1.7 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1368 jsr166 1.1 doSubmit(job);
1369     return job;
1370     }
1371    
1372 jsr166 1.11 /**
1373     * @throws NullPointerException if the task is null
1374     * @throws RejectedExecutionException if the task cannot be
1375     * scheduled for execution
1376     */
1377 jsr166 1.1 public ForkJoinTask<?> submit(Runnable task) {
1378 jsr166 1.2 ForkJoinTask<?> job;
1379 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1380     job = (ForkJoinTask<?>) task;
1381 jsr166 1.2 else
1382 jsr166 1.7 job = ForkJoinTask.adapt(task, null);
1383 jsr166 1.1 doSubmit(job);
1384     return job;
1385     }
1386    
1387     /**
1388 jsr166 1.11 * @throws NullPointerException {@inheritDoc}
1389     * @throws RejectedExecutionException {@inheritDoc}
1390     */
1391 jsr166 1.1 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1392     ArrayList<ForkJoinTask<T>> forkJoinTasks =
1393     new ArrayList<ForkJoinTask<T>>(tasks.size());
1394     for (Callable<T> task : tasks)
1395 jsr166 1.7 forkJoinTasks.add(ForkJoinTask.adapt(task));
1396 jsr166 1.1 invoke(new InvokeAll<T>(forkJoinTasks));
1397    
1398     @SuppressWarnings({"unchecked", "rawtypes"})
1399 dl 1.15 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1400 jsr166 1.1 return futures;
1401     }
1402    
    /**
     * Adaptor action used by invokeAll to fork/join a batch of tasks.
     * Exceptions are ignored here; they surface to callers via each
     * task's own Future methods.
     */
    static final class InvokeAll<T> extends RecursiveAction {
        final ArrayList<ForkJoinTask<T>> tasks;
        InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
        public void compute() {
            try { invokeAll(tasks); }
            catch (Exception ignore) {}
        }
        private static final long serialVersionUID = -7914297376763021607L;
    }
1412    
1413     /**
1414     * Returns the factory used for constructing new workers.
1415     *
1416     * @return the factory used for constructing new workers
1417     */
1418     public ForkJoinWorkerThreadFactory getFactory() {
1419     return factory;
1420     }
1421    
1422     /**
1423     * Returns the handler for internal worker threads that terminate
1424     * due to unrecoverable errors encountered while executing tasks.
1425     *
1426 jsr166 1.4 * @return the handler, or {@code null} if none
1427 jsr166 1.1 */
1428     public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
1429 dl 1.14 return ueh;
1430 jsr166 1.1 }
1431    
1432     /**
1433 jsr166 1.9 * Returns the targeted parallelism level of this pool.
1434 jsr166 1.1 *
1435 jsr166 1.9 * @return the targeted parallelism level of this pool
1436 jsr166 1.1 */
1437     public int getParallelism() {
1438     return parallelism;
1439     }
1440    
1441     /**
1442     * Returns the number of worker threads that have started but not
1443     * yet terminated. This result returned by this method may differ
1444 jsr166 1.4 * from {@link #getParallelism} when threads are created to
1445 jsr166 1.1 * maintain parallelism when others are cooperatively blocked.
1446     *
1447     * @return the number of worker threads
1448     */
1449     public int getPoolSize() {
1450 dl 1.14 return workerCounts >>> TOTAL_COUNT_SHIFT;
1451 jsr166 1.1 }
1452    
1453     /**
1454 jsr166 1.4 * Returns {@code true} if this pool uses local first-in-first-out
1455 jsr166 1.1 * scheduling mode for forked tasks that are never joined.
1456     *
1457 jsr166 1.4 * @return {@code true} if this pool uses async mode
1458 jsr166 1.1 */
1459     public boolean getAsyncMode() {
1460     return locallyFifo;
1461     }
1462    
1463     /**
1464     * Returns an estimate of the number of worker threads that are
1465     * not blocked waiting to join tasks or for other managed
1466 dl 1.14 * synchronization. This method may overestimate the
1467     * number of running threads.
1468 jsr166 1.1 *
1469     * @return the number of worker threads
1470     */
1471     public int getRunningThreadCount() {
1472 dl 1.14 return workerCounts & RUNNING_COUNT_MASK;
1473 jsr166 1.1 }
1474    
1475     /**
1476     * Returns an estimate of the number of threads that are currently
1477     * stealing or executing tasks. This method may overestimate the
1478     * number of active threads.
1479     *
1480     * @return the number of active threads
1481     */
1482     public int getActiveThreadCount() {
1483 dl 1.14 return runState & ACTIVE_COUNT_MASK;
1484 jsr166 1.1 }
1485    
1486     /**
1487 jsr166 1.4 * Returns {@code true} if all worker threads are currently idle.
1488     * An idle worker is one that cannot obtain a task to execute
1489     * because none are available to steal from other threads, and
1490     * there are no pending submissions to the pool. This method is
1491     * conservative; it might not return {@code true} immediately upon
1492     * idleness of all threads, but will eventually become true if
1493     * threads remain inactive.
1494 jsr166 1.1 *
1495 jsr166 1.4 * @return {@code true} if all threads are currently idle
1496 jsr166 1.1 */
1497     public boolean isQuiescent() {
1498 dl 1.14 return (runState & ACTIVE_COUNT_MASK) == 0;
1499 jsr166 1.1 }
1500    
1501     /**
1502     * Returns an estimate of the total number of tasks stolen from
1503     * one thread's work queue by another. The reported value
1504     * underestimates the actual total number of steals when the pool
1505     * is not quiescent. This value may be useful for monitoring and
1506     * tuning fork/join programs: in general, steal counts should be
1507     * high enough to keep threads busy, but low enough to avoid
1508     * overhead and contention across threads.
1509     *
1510     * @return the number of steals
1511     */
1512     public long getStealCount() {
1513 dl 1.14 return stealCount;
1514 jsr166 1.1 }
1515    
1516     /**
1517     * Returns an estimate of the total number of tasks currently held
1518     * in queues by worker threads (but not including tasks submitted
1519     * to the pool that have not begun executing). This value is only
1520     * an approximation, obtained by iterating across all threads in
1521     * the pool. This method may be useful for tuning task
1522     * granularities.
1523     *
1524     * @return the number of queued tasks
1525     */
1526     public long getQueuedTaskCount() {
1527     long count = 0;
1528 jsr166 1.29 for (ForkJoinWorkerThread w : workers)
1529 dl 1.14 if (w != null)
1530     count += w.getQueueSize();
1531 jsr166 1.1 return count;
1532     }
1533    
1534     /**
1535 jsr166 1.8 * Returns an estimate of the number of tasks submitted to this
1536     * pool that have not yet begun executing. This method takes time
1537 jsr166 1.1 * proportional to the number of submissions.
1538     *
1539     * @return the number of queued submissions
1540     */
1541     public int getQueuedSubmissionCount() {
1542     return submissionQueue.size();
1543     }
1544    
1545     /**
1546 jsr166 1.4 * Returns {@code true} if there are any tasks submitted to this
1547     * pool that have not yet begun executing.
1548 jsr166 1.1 *
1549     * @return {@code true} if there are any queued submissions
1550     */
1551     public boolean hasQueuedSubmissions() {
1552     return !submissionQueue.isEmpty();
1553     }
1554    
1555     /**
1556     * Removes and returns the next unexecuted submission if one is
1557     * available. This method may be useful in extensions to this
1558     * class that re-assign work in systems with multiple pools.
1559     *
1560 jsr166 1.4 * @return the next submission, or {@code null} if none
1561 jsr166 1.1 */
1562     protected ForkJoinTask<?> pollSubmission() {
1563     return submissionQueue.poll();
1564     }
1565    
1566     /**
1567     * Removes all available unexecuted submitted and forked tasks
1568     * from scheduling queues and adds them to the given collection,
1569     * without altering their execution status. These may include
1570 jsr166 1.8 * artificially generated or wrapped tasks. This method is
1571     * designed to be invoked only when the pool is known to be
1572 jsr166 1.1 * quiescent. Invocations at other times may not remove all
1573     * tasks. A failure encountered while attempting to add elements
1574     * to collection {@code c} may result in elements being in
1575     * neither, either or both collections when the associated
1576     * exception is thrown. The behavior of this operation is
1577     * undefined if the specified collection is modified while the
1578     * operation is in progress.
1579     *
1580     * @param c the collection to transfer elements into
1581     * @return the number of elements transferred
1582     */
1583 jsr166 1.5 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1584 dl 1.19 int count = submissionQueue.drainTo(c);
1585 jsr166 1.29 for (ForkJoinWorkerThread w : workers)
1586 dl 1.18 if (w != null)
1587 dl 1.19 count += w.drainTasksTo(c);
1588 dl 1.18 return count;
1589     }
1590    
1591     /**
1592 jsr166 1.1 * Returns a string identifying this pool, as well as its state,
1593     * including indications of run state, parallelism level, and
1594     * worker and task counts.
1595     *
1596     * @return a string identifying this pool, as well as its state
1597     */
1598     public String toString() {
1599     long st = getStealCount();
1600     long qt = getQueuedTaskCount();
1601     long qs = getQueuedSubmissionCount();
1602 dl 1.14 int wc = workerCounts;
1603     int tc = wc >>> TOTAL_COUNT_SHIFT;
1604     int rc = wc & RUNNING_COUNT_MASK;
1605     int pc = parallelism;
1606     int rs = runState;
1607     int ac = rs & ACTIVE_COUNT_MASK;
1608 jsr166 1.1 return super.toString() +
1609 dl 1.14 "[" + runLevelToString(rs) +
1610     ", parallelism = " + pc +
1611     ", size = " + tc +
1612     ", active = " + ac +
1613     ", running = " + rc +
1614 jsr166 1.1 ", steals = " + st +
1615     ", tasks = " + qt +
1616     ", submissions = " + qs +
1617     "]";
1618     }
1619    
1620 dl 1.14 private static String runLevelToString(int s) {
1621     return ((s & TERMINATED) != 0 ? "Terminated" :
1622     ((s & TERMINATING) != 0 ? "Terminating" :
1623     ((s & SHUTDOWN) != 0 ? "Shutting down" :
1624     "Running")));
1625 jsr166 1.1 }
1626    
    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     * Tasks that are in the process of being submitted concurrently
     * during the course of this method may or may not be rejected.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public void shutdown() {
        checkPermission();
        // Advance the lifecycle to SHUTDOWN before attempting
        // (non-forceful, hence the false argument) termination.
        advanceRunLevel(SHUTDOWN);
        tryTerminate(false);
    }
1644    
    /**
     * Attempts to cancel and/or stop all tasks, and reject all
     * subsequently submitted tasks.  Tasks that are in the process of
     * being submitted or executed concurrently during the course of
     * this method may or may not be rejected.  This method cancels
     * both existing and unexecuted tasks, in order to permit
     * termination in the presence of task dependencies.  So the method
     * always returns an empty list (unlike the case for some other
     * Executors).
     *
     * @return an empty list
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public List<Runnable> shutdownNow() {
        checkPermission();
        // true: forceful termination, cancelling rather than draining
        // tasks, which is why no "unexecuted task" list can be returned.
        tryTerminate(true);
        return Collections.emptyList();
    }
1666    
1667     /**
1668     * Returns {@code true} if all tasks have completed following shut down.
1669     *
1670     * @return {@code true} if all tasks have completed following shut down
1671     */
1672     public boolean isTerminated() {
1673 dl 1.14 return runState >= TERMINATED;
1674 jsr166 1.1 }
1675    
1676     /**
1677     * Returns {@code true} if the process of termination has
1678 jsr166 1.9 * commenced but not yet completed. This method may be useful for
1679     * debugging. A return of {@code true} reported a sufficient
1680     * period after shutdown may indicate that submitted tasks have
1681     * ignored or suppressed interruption, causing this executor not
1682     * to properly terminate.
1683 jsr166 1.1 *
1684 jsr166 1.9 * @return {@code true} if terminating but not yet terminated
1685 jsr166 1.1 */
1686     public boolean isTerminating() {
1687 dl 1.14 return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
1688 jsr166 1.1 }
1689    
1690     /**
1691     * Returns {@code true} if this pool has been shut down.
1692     *
1693     * @return {@code true} if this pool has been shut down
1694     */
1695     public boolean isShutdown() {
1696 dl 1.14 return runState >= SHUTDOWN;
1697 jsr166 1.9 }
1698    
1699     /**
1700 jsr166 1.1 * Blocks until all tasks have completed execution after a shutdown
1701     * request, or the timeout occurs, or the current thread is
1702     * interrupted, whichever happens first.
1703     *
1704     * @param timeout the maximum time to wait
1705     * @param unit the time unit of the timeout argument
1706     * @return {@code true} if this executor terminated and
1707     * {@code false} if the timeout elapsed before termination
1708     * @throws InterruptedException if interrupted while waiting
1709     */
1710     public boolean awaitTermination(long timeout, TimeUnit unit)
1711     throws InterruptedException {
1712 dl 1.18 try {
1713     return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
1714 jsr166 1.28 } catch (TimeoutException ex) {
1715 dl 1.18 return false;
1716     }
1717 jsr166 1.1 }
1718    
1719     /**
1720     * Interface for extending managed parallelism for tasks running
1721 jsr166 1.8 * in {@link ForkJoinPool}s.
1722     *
1723 dl 1.19 * <p>A {@code ManagedBlocker} provides two methods. Method
1724     * {@code isReleasable} must return {@code true} if blocking is
1725     * not necessary. Method {@code block} blocks the current thread
1726     * if necessary (perhaps internally invoking {@code isReleasable}
1727     * before actually blocking). The unusual methods in this API
1728     * accommodate synchronizers that may, but don't usually, block
1729     * for long periods. Similarly, they allow more efficient internal
1730     * handling of cases in which additional workers may be, but
1731     * usually are not, needed to ensure sufficient parallelism.
1732     * Toward this end, implementations of method {@code isReleasable}
1733     * must be amenable to repeated invocation.
1734 jsr166 1.1 *
1735     * <p>For example, here is a ManagedBlocker based on a
1736     * ReentrantLock:
1737     * <pre> {@code
1738     * class ManagedLocker implements ManagedBlocker {
1739     * final ReentrantLock lock;
1740     * boolean hasLock = false;
1741     * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
1742     * public boolean block() {
1743     * if (!hasLock)
1744     * lock.lock();
1745     * return true;
1746     * }
1747     * public boolean isReleasable() {
1748     * return hasLock || (hasLock = lock.tryLock());
1749     * }
1750     * }}</pre>
1751 dl 1.19 *
1752     * <p>Here is a class that possibly blocks waiting for an
1753     * item on a given queue:
1754     * <pre> {@code
1755     * class QueueTaker<E> implements ManagedBlocker {
1756     * final BlockingQueue<E> queue;
1757     * volatile E item = null;
1758     * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
1759     * public boolean block() throws InterruptedException {
1760     * if (item == null)
1761 dl 1.23 * item = queue.take();
1762 dl 1.19 * return true;
1763     * }
1764     * public boolean isReleasable() {
1765 dl 1.23 * return item != null || (item = queue.poll()) != null;
1766 dl 1.19 * }
1767     * public E getItem() { // call after pool.managedBlock completes
1768     * return item;
1769     * }
1770     * }}</pre>
1771 jsr166 1.1 */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         *         (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         *         (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         * Implementations must be amenable to repeated invocation;
         * see the interface documentation above.
         */
        boolean isReleasable();
    }
1789    
1790     /**
1791     * Blocks in accord with the given blocker. If the current thread
1792 jsr166 1.8 * is a {@link ForkJoinWorkerThread}, this method possibly
1793     * arranges for a spare thread to be activated if necessary to
1794 dl 1.18 * ensure sufficient parallelism while the current thread is blocked.
1795 jsr166 1.1 *
1796 jsr166 1.8 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1797     * behaviorally equivalent to
1798 jsr166 1.1 * <pre> {@code
1799     * while (!blocker.isReleasable())
1800     * if (blocker.block())
1801     * return;
1802     * }</pre>
1803 jsr166 1.8 *
1804     * If the caller is a {@code ForkJoinTask}, then the pool may
1805     * first be expanded to ensure parallelism, and later adjusted.
1806 jsr166 1.1 *
1807     * @param blocker the blocker
1808     * @throws InterruptedException if blocker.block did so
1809     */
1810 dl 1.18 public static void managedBlock(ManagedBlocker blocker)
1811 jsr166 1.1 throws InterruptedException {
1812     Thread t = Thread.currentThread();
1813 dl 1.19 if (t instanceof ForkJoinWorkerThread) {
1814     ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
1815     w.pool.awaitBlocker(blocker);
1816     }
1817 dl 1.18 else {
1818     do {} while (!blocker.isReleasable() && !blocker.block());
1819     }
1820 jsr166 1.1 }
1821    
1822 jsr166 1.7 // AbstractExecutorService overrides. These rely on undocumented
1823     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
1824     // implement RunnableFuture.
1825 jsr166 1.1
1826     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1827 jsr166 1.7 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
1828 jsr166 1.1 }
1829    
1830     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1831 jsr166 1.7 return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
1832 jsr166 1.1 }
1833    
    // Unsafe mechanics

    // Unsafe instance plus field offsets, computed once at class load,
    // used for CAS-style updates of the pool's bookkeeping fields
    // (declared elsewhere in this class).
    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long workerCountsOffset =
        objectFieldOffset("workerCounts", ForkJoinPool.class);
    private static final long runStateOffset =
        objectFieldOffset("runState", ForkJoinPool.class);
    private static final long eventCountOffset =
        objectFieldOffset("eventCount", ForkJoinPool.class);
    private static final long eventWaitersOffset =
        objectFieldOffset("eventWaiters", ForkJoinPool.class);
    private static final long stealCountOffset =
        objectFieldOffset("stealCount", ForkJoinPool.class);
    private static final long spareWaitersOffset =
        objectFieldOffset("spareWaiters", ForkJoinPool.class);
1849 jsr166 1.3
1850     private static long objectFieldOffset(String field, Class<?> klazz) {
1851     try {
1852     return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1853     } catch (NoSuchFieldException e) {
1854     // Convert Exception to corresponding Error
1855     NoSuchFieldError error = new NoSuchFieldError(field);
1856     error.initCause(e);
1857     throw error;
1858     }
1859     }
1860 jsr166 1.1 }