root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.29
Committed: Mon Sep 6 21:36:44 2010 UTC by jsr166
Branch: MAIN
Changes since 1.28: +4 -15 lines
Log Message:
Use enhanced for loop.

File Contents

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;

/**
 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
 * A {@code ForkJoinPool} provides the entry point for submissions
 * from non-{@code ForkJoinTask} clients, as well as management and
 * monitoring operations.
 *
 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
 * ExecutorService} mainly by virtue of employing
 * <em>work-stealing</em>: all threads in the pool attempt to find and
 * execute subtasks created by other active tasks (eventually blocking
 * waiting for work if none exist). This enables efficient processing
 * when most tasks spawn other subtasks (as do most {@code
 * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
 * constructors, {@code ForkJoinPool}s may also be appropriate for use
 * with event-style tasks that are never joined.
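 *
 * <p>For instance, a pool intended for such event-style tasks might
 * be constructed along these lines (a sketch only; the handler and
 * factory choices are application-specific):
 *
 * <pre>
 * ForkJoinPool asyncPool = new ForkJoinPool(
 *     Runtime.getRuntime().availableProcessors(),
 *     ForkJoinPool.defaultForkJoinWorkerThreadFactory,
 *     null,   // no UncaughtExceptionHandler
 *     true);  // asyncMode: local FIFO scheduling
 * </pre>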
 *
 * <p>A {@code ForkJoinPool} is constructed with a given target
 * parallelism level; by default, equal to the number of available
 * processors. The pool attempts to maintain enough active (or
 * available) threads by dynamically adding, suspending, or resuming
 * internal worker threads, even if some tasks are stalled waiting to
 * join others. However, no such adjustments are guaranteed in the
 * face of blocked IO or other unmanaged synchronization. The nested
 * {@link ManagedBlocker} interface enables extension of the kinds of
 * synchronization accommodated.
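 *
 * <p>As an illustrative sketch, a blocker wrapping a lock might be
 * declared as follows (the class name and lock choice are just for
 * example; it relies only on the two {@code ManagedBlocker} methods
 * used by this pool):
 *
 * <pre>
 * class ManagedLocker implements ForkJoinPool.ManagedBlocker {
 *   final ReentrantLock lock;
 *   boolean hasLock = false;
 *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
 *   public boolean block() {
 *     if (!hasLock)
 *       lock.lock();
 *     return true;
 *   }
 *   public boolean isReleasable() {
 *     return hasLock || (hasLock = lock.tryLock());
 *   }
 * }
 * </pre>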
 *
 * <p>In addition to execution and lifecycle control methods, this
 * class provides status check methods (for example
 * {@link #getStealCount}) that are intended to aid in developing,
 * tuning, and monitoring fork/join applications. Also, method
 * {@link #toString} returns indications of pool state in a
 * convenient form for informal monitoring.
 *
 * <p>As is the case with other ExecutorServices, there are three
 * main task execution methods, summarized in the following table.
 * These are designed to be used by clients not already engaged in
 * fork/join computations in the current pool. The main forms of
 * these methods accept instances of {@code ForkJoinTask}, but
 * overloaded forms also allow mixed execution of plain {@code
 * Runnable}- or {@code Callable}- based activities. However, tasks
 * that are already executing in a pool should normally
 * <em>NOT</em> use these pool execution methods, but instead use the
 * within-computation forms listed in the table.
 *
 * <table BORDER CELLPADDING=3 CELLSPACING=1>
 * <tr>
 * <td></td>
 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
 * </tr>
 * <tr>
 * <td> <b>Arrange async execution</b></td>
 * <td> {@link #execute(ForkJoinTask)}</td>
 * <td> {@link ForkJoinTask#fork}</td>
 * </tr>
 * <tr>
 * <td> <b>Await and obtain result</b></td>
 * <td> {@link #invoke(ForkJoinTask)}</td>
 * <td> {@link ForkJoinTask#invoke}</td>
 * </tr>
 * <tr>
 * <td> <b>Arrange exec and obtain Future</b></td>
 * <td> {@link #submit(ForkJoinTask)}</td>
 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 * </tr>
 * </table>
 *
 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
 * used for all parallel task execution in a program or subsystem.
 * Otherwise, use would not usually outweigh the construction and
 * bookkeeping overhead of creating a large set of threads. For
 * example, a common pool could be used for the {@code SortTasks}
 * illustrated in {@link RecursiveAction}. Because {@code
 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
 * daemon} mode, there is typically no need to explicitly {@link
 * #shutdown} such a pool upon program exit.
 *
 * <pre>
 * static final ForkJoinPool mainPool = new ForkJoinPool();
 * ...
 * public void sort(long[] array) {
 *   mainPool.invoke(new SortTask(array, 0, array.length));
 * }
 * </pre>
 *
 * <p><b>Implementation notes</b>: This implementation restricts the
 * maximum number of running threads to 32767. Attempts to create
 * pools with greater than the maximum number result in
 * {@code IllegalArgumentException}.
 *
 * <p>This implementation rejects submitted tasks (that is, by throwing
 * {@link RejectedExecutionException}) only when the pool is shut down
 * or internal resources have been exhausted.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinPool extends AbstractExecutorService {

    /*
     * Implementation Overview
     *
     * This class provides the central bookkeeping and control for a
     * set of worker threads: Submissions from non-FJ threads enter
     * into a submission queue. Workers take these tasks and typically
     * split them into subtasks that may be stolen by other workers.
     * The main work-stealing mechanics implemented in class
     * ForkJoinWorkerThread give first priority to processing tasks
     * from their own queues (LIFO or FIFO, depending on mode), then
     * to randomized FIFO steals of tasks in other worker queues, and
     * lastly to new submissions. These mechanics do not consider
     * affinities, loads, cache localities, etc., so rarely provide the
     * best possible performance on a given machine, but portably
     * provide good throughput by averaging over these factors.
     * (Further, even if we did try to use such information, we do not
     * usually have a basis for exploiting it. For example, some sets
     * of tasks profit from cache affinities, but others are harmed by
     * cache pollution effects.)
     *
     * Beyond work-stealing support and essential bookkeeping, the
     * main responsibility of this framework is to take actions when
     * one worker is waiting to join a task stolen (or always held)
     * by another. Because we are multiplexing many tasks onto a pool
     * of workers, we can't just let them block (as in Thread.join).
     * We also cannot just reassign the joiner's run-time stack with
     * another and replace it later, which would be a form of
     * "continuation" that, even if possible, is not necessarily a
     * good idea. (Given that the creation costs of most threads on
     * most systems mainly surround setting up runtime stacks, thread
     * creation and switching is usually not much more expensive than
     * stack creation and switching, and is more flexible.) Instead we
     * combine two tactics:
     *
     * Helping: Arranging for the joiner to execute some task that it
     * would be running if the steal had not occurred. Method
     * ForkJoinWorkerThread.helpJoinTask tracks joining->stealing
     * links to try to find such a task.
     *
     * Compensating: Unless there are already enough live threads,
     * method helpMaintainParallelism() may create or
     * re-activate a spare thread to compensate for blocked
     * joiners until they unblock.
     *
     * It is impossible to keep exactly the target (parallelism)
     * number of threads running at any given time. Determining
     * existence of conservatively safe helping targets, the
     * availability of already-created spares, and the apparent need
     * to create new spares are all racy and require heuristic
     * guidance, so we rely on multiple retries of each. Compensation
     * occurs in slow-motion. It is triggered only upon timeouts of
     * Object.wait used for joins. This reduces poor decisions that
     * would otherwise be made when threads are waiting for others
     * that are stalled because of unrelated activities such as
     * garbage collection.
     *
     * The ManagedBlocker extension API can't use helping so relies
     * only on compensation in method awaitBlocker.
     *
     * The main throughput advantages of work-stealing stem from
     * decentralized control -- workers mostly steal tasks from each
     * other. We do not want to negate this by creating bottlenecks
     * implementing other management responsibilities. So we use a
     * collection of techniques that avoid, reduce, or cope well with
     * contention. These entail several instances of bit-packing into
     * CASable fields to maintain only the minimally required
     * atomicity. To enable such packing, we restrict maximum
     * parallelism to (1<<15)-1, enabling twice this (to accommodate
     * unbalanced increments and decrements) to fit into a 16 bit
     * field, which is far in excess of normal operating range. Even
     * though updates to some of these bookkeeping fields do sometimes
     * contend with each other, they don't normally cache-contend with
     * updates to others enough to warrant memory padding or
     * isolation. So they are all held as fields of ForkJoinPool
     * objects. The main capabilities are as follows:
     *
     * 1. Creating and removing workers. Workers are recorded in the
     * "workers" array. This is an array as opposed to some other data
     * structure to support index-based random steals by workers.
     * Updates to the array recording new workers and unrecording
     * terminated ones are protected from each other by a lock
     * (workerLock) but the array is otherwise concurrently readable,
     * and accessed directly by workers. To simplify index-based
     * operations, the array size is always a power of two, and all
     * readers must tolerate null slots. Currently, all worker thread
     * creation is on-demand, triggered by task submissions,
     * replacement of terminated workers, and/or compensation for
     * blocked workers. However, all other support code is set up to
     * work with other policies.
     *
     * To ensure that we do not hold on to worker references that
     * would prevent GC, ALL accesses to workers are via indices into
     * the workers array (which is one source of some of the unusual
     * code constructions here). In essence, the workers array serves
     * as a WeakReference mechanism. Thus for example the event queue
     * stores worker indices, not worker references. Access to the
     * workers in associated methods (for example releaseEventWaiters)
     * must both index-check and null-check the IDs. All such accesses
     * ignore bad IDs by returning out early from what they are doing,
     * since this can only be associated with shutdown, in which case
     * it is OK to give up. On termination, we just clobber these
     * data structures without trying to use them.
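     *
     * As an illustrative sketch, the resulting access pattern (used
     * for instance in releaseEventWaiters) looks like:
     *
     *   ForkJoinWorkerThread[] ws = workers;
     *   ForkJoinWorkerThread w;
     *   if (id >= 0 && id < ws.length && (w = ws[id]) != null)
     *       ... // use w; otherwise return early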
     *
     * 2. Bookkeeping for dynamically adding and removing workers. We
     * aim to approximately maintain the given level of parallelism.
     * When some workers are known to be blocked (on joins or via
     * ManagedBlocker), we may create or resume others to take their
     * place until they unblock (see below). Implementing this
     * requires counts of the number of "running" threads (i.e., those
     * that are neither blocked nor artificially suspended) as well as
     * the total number. These two values are packed into one field,
     * "workerCounts", because we need accurate snapshots when deciding
     * to create, resume or suspend. Note however that the
     * correspondence of these counts to reality is not guaranteed. In
     * particular updates for unblocked threads may lag until they
     * actually wake up.
     *
     * 3. Maintaining global run state. The run state of the pool
     * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to
     * those in other Executor implementations, as well as a count of
     * "active" workers -- those that are, or soon will be, or
     * recently were executing tasks. The runLevel and active count
     * are packed together in order to correctly trigger shutdown and
     * termination. Without care, active counts can be subject to very
     * high contention. We substantially reduce this contention by
     * relaxing update rules. A worker must claim active status
     * prospectively, by activating if it sees that a submitted or
     * stealable task exists (it may find after activating that the
     * task no longer exists). It stays active while processing this
     * task (if it exists) and any other local subtasks it produces,
     * until it cannot find any other tasks. It then tries
     * inactivating (see method preStep), but upon update contention
     * instead scans for more tasks, later retrying inactivation if it
     * doesn't find any.
     *
     * 4. Managing idle workers waiting for tasks. We cannot let
     * workers spin indefinitely scanning for tasks when none are
     * available. On the other hand, we must quickly prod them into
     * action when new tasks are submitted or generated. We
     * park/unpark these idle workers using an event-count scheme.
     * Field eventCount is incremented upon events that may enable
     * workers that previously could not find a task to now find one:
     * Submission of a new task to the pool, or another worker pushing
     * a task onto a previously empty queue. (We also use this
     * mechanism for configuration and termination actions that
     * require wakeups of idle workers). Each worker maintains its
     * last known event count, and blocks when a scan for work did not
     * find a task AND its lastEventCount matches the current
     * eventCount. Waiting idle workers are recorded in a variant of
     * a Treiber stack headed by field eventWaiters which, when nonzero,
     * encodes the thread index and count awaited for by the worker
     * thread most recently calling eventSync. This thread in turn has
     * a record (field nextEventWaiter) for the next waiting worker.
     * In addition to allowing simpler decisions about need for
     * wakeup, the event count bits in eventWaiters serve the role of
     * tags to avoid ABA errors in Treiber stacks. Upon any wakeup,
     * released threads also try to release at most two others. The
     * net effect is a tree-like diffusion of signals, where released
     * threads (and possibly others) help with unparks. To further
     * reduce contention effects a bit, failed CASes to increment
     * field eventCount are tolerated without retries in signalWork.
     * Conceptually they are merged into the same event, which is OK
     * when their only purpose is to enable workers to scan for work.
     *
     * 5. Managing suspension of extra workers. When a worker notices
     * (usually upon timeout of a wait()) that there are too few
     * running threads, we may create a new thread to maintain
     * parallelism level, or at least avoid starvation. Usually, extra
     * threads are needed for only very short periods, yet join
     * dependencies are such that we sometimes need them in
     * bursts. Rather than create new threads each time this happens,
     * we suspend no-longer-needed extra ones as "spares". For most
     * purposes, we don't distinguish "extra" spare threads from
     * normal "core" threads: On each call to preStep (the only point
     * at which we can do this) a worker checks to see if there are
     * now too many running workers, and if so, suspends itself.
     * Method helpMaintainParallelism looks for suspended threads to
     * resume before considering creating a new replacement. The
     * spares themselves are encoded on another variant of a Treiber
     * stack, headed at field "spareWaiters". Note that the use of
     * spares is intrinsically racy. One thread may become a spare at
     * about the same time as another is needlessly being created. We
     * counteract this and related slop in part by requiring resumed
     * spares to immediately recheck (in preStep) to see whether they
     * should re-suspend.
     *
     * 6. Killing off unneeded workers. A timeout mechanism is used to
     * shed unused workers: The oldest (first) event queue waiter uses
     * a timed rather than hard wait. When this wait times out without
     * a normal wakeup, it tries to shut down any one (for convenience
     * the newest) other spare or event waiter via
     * tryShutdownUnusedWorker. This eventually reduces the number of
     * worker threads to a minimum of one after a long enough period
     * without use.
     *
     * 7. Deciding when to create new workers. The main dynamic
     * control in this class is deciding when to create extra threads
     * in method helpMaintainParallelism. We would like to keep
     * exactly #parallelism threads running, which is an impossible
     * task. We always need to create one when the number of running
     * threads would become zero and all workers are busy. Beyond
     * this, we must rely on heuristics that work well in the
     * presence of transient phenomena such as GC stalls, dynamic
     * compilation, and wake-up lags. These transients are extremely
     * common -- we are normally trying to fully saturate the CPUs on
     * a machine, so almost any activity other than running tasks
     * impedes accuracy. Our main defense is to allow parallelism to
     * lapse for a while during joins, and use a timeout to see if,
     * after the resulting settling, there is still a need for
     * additional workers. This also better copes with the fact that
     * some of the methods in this class tend to never become compiled
     * (but are interpreted), so some components of the entire set of
     * controls might execute 100 times faster than others. And
     * similarly for cases where the apparent lack of work is just due
     * to GC stalls and other transient system activity.
     *
     * Beware that there is a lot of representation-level coupling
     * among classes ForkJoinPool, ForkJoinWorkerThread, and
     * ForkJoinTask. For example, direct access to "workers" array by
     * workers, and direct access to ForkJoinTask.status by both
     * ForkJoinPool and ForkJoinWorkerThread. There is little point
     * trying to reduce this, since any associated future changes in
     * representations will need to be accompanied by algorithmic
     * changes anyway.
     *
     * Style notes: There are lots of inline assignments (of form
     * "while ((local = field) != 0)") which are usually the simplest
     * way to ensure the required read orderings (which are sometimes
     * critical). Also several occurrences of the unusual "do {}
     * while (!cas...)" which is the simplest way to force an update of
     * a CAS'ed variable. There are also other coding oddities that
     * help some methods perform reasonably even when interpreted (not
     * compiled), at the expense of some messy constructions that
     * reduce byte code counts.
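     *
     * As a sketch, that CAS-retry idiom instantiates as (with
     * fieldOffset, field, and delta standing in for whichever CASed
     * field is involved):
     *
     *   int c;
     *   do {} while (!UNSAFE.compareAndSwapInt(this, fieldOffset,
     *                                          c = field, c + delta));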
     *
     * The order of declarations in this file is: (1) statics (2)
     * fields (along with constants used when unpacking some of them)
     * (3) internal control methods (4) callbacks and other support
     * for ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
     * methods (plus a few little helpers).
     */

    /**
     * Factory for creating new {@link ForkJoinWorkerThread}s.
     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
     * for {@code ForkJoinWorkerThread} subclasses that extend base
     * functionality or initialize threads with different contexts.
     */
    public static interface ForkJoinWorkerThreadFactory {
        /**
         * Returns a new worker thread operating in the given pool.
         *
         * @param pool the pool this thread works in
         * @throws NullPointerException if the pool is null
         */
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }

    /**
     * Default ForkJoinWorkerThreadFactory implementation; creates a
     * new ForkJoinWorkerThread.
     */
    static class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory =
        new DefaultForkJoinWorkerThreadFactory();

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    private static final RuntimePermission modifyThreadPermission =
        new RuntimePermission("modifyThread");

    /**
     * If there is a security manager, makes sure caller has
     * permission to modify threads.
     */
    private static void checkPermission() {
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            security.checkPermission(modifyThreadPermission);
    }

    /**
     * Generator for assigning sequence numbers as pool names.
     */
    private static final AtomicInteger poolNumberGenerator =
        new AtomicInteger();

    /**
     * The time to block in a join (see awaitJoin) before checking if
     * a new worker should be (re)started to maintain parallelism
     * level. The value should be short enough to maintain global
     * responsiveness and progress but long enough to avoid
     * counterproductive firings during GC stalls or unrelated system
     * activity, and to not bog down systems with continual re-firings
     * on GCs or legitimately long waits.
     */
    private static final long JOIN_TIMEOUT_MILLIS = 250L; // 4 per second

    /**
     * The wakeup interval (in nanoseconds) for the oldest worker
     * waiting for an event. On timeout it invokes
     * tryShutdownUnusedWorker to shrink the number of workers. The
     * exact value does not matter too much, but should be long enough
     * to slowly release resources during long periods without use
     * without disrupting normal use.
     */
    private static final long SHRINK_RATE_NANOS =
        30L * 1000L * 1000L * 1000L; // 2 per minute

    /**
     * Absolute bound for parallelism level. Twice this number plus
     * one (i.e., 0xffff) must fit into a 16-bit field to enable
     * word-packing for some counts and indices.
     */
    private static final int MAX_WORKERS = 0x7fff;

    /**
     * Array holding all worker threads in the pool. Array size must
     * be a power of two. Updates and replacements are protected by
     * workerLock, but the array is always kept in a consistent enough
     * state to be randomly accessed without locking by workers
     * performing work-stealing, as well as other traversal-based
     * methods in this class. All readers must tolerate that some
     * array slots may be null.
     */
    volatile ForkJoinWorkerThread[] workers;

    /**
     * Queue for external submissions.
     */
    private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;

    /**
     * Lock protecting updates to workers array.
     */
    private final ReentrantLock workerLock;

    /**
     * Latch released upon termination.
     */
    private final Phaser termination;

    /**
     * Creation factory for worker threads.
     */
    private final ForkJoinWorkerThreadFactory factory;

    /**
     * Sum of per-thread steal counts, updated only when threads are
     * idle or terminating.
     */
    private volatile long stealCount;

    /**
     * Encoded record of top of Treiber stack of threads waiting for
     * events. The top 32 bits contain the count being waited for. The
     * bottom 16 bits contain one plus the pool index of the waiting
     * worker thread. (Bits 16-31 are unused.)
     */
    private volatile long eventWaiters;

    private static final int EVENT_COUNT_SHIFT = 32;
    private static final long WAITER_ID_MASK = (1L << 16) - 1L;
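
    /*
     * Illustrative decoding of eventWaiters, matching its uses in
     * eventSync and releaseEventWaiters:
     *
     *   int id = ((int)(eventWaiters & WAITER_ID_MASK)) - 1; // pool index; -1 if empty
     *   int ec = (int)(eventWaiters >>> EVENT_COUNT_SHIFT);  // event count awaited
     */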

    /**
     * A counter for events that may wake up worker threads:
     * - Submission of a new task to the pool
     * - A worker pushing a task on an empty queue
     * - termination
     */
    private volatile int eventCount;

    /**
     * Encoded record of top of Treiber stack of spare threads waiting
     * for resumption. The top 16 bits contain an arbitrary count to
     * avoid ABA effects. The bottom 16 bits contain one plus the pool
     * index of the waiting worker thread.
     */
    private volatile int spareWaiters;

    private static final int SPARE_COUNT_SHIFT = 16;
    private static final int SPARE_ID_MASK = (1 << 16) - 1;
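
    /*
     * Illustrative encoding of spareWaiters, exactly as pushed in
     * pushSpare below:
     *
     *   int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
     */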

    /**
     * Lifecycle control. The low word contains the number of workers
     * that are (probably) executing tasks. This value is atomically
     * incremented before a worker gets a task to run, and decremented
     * when a worker has no tasks and cannot find any. Bits 16-18
     * contain the runLevel value. When all are zero, the pool is
     * running. Level transitions are monotonic (running -> shutdown
     * -> terminating -> terminated) so each transition adds a bit.
     * These are bundled together to ensure consistent read for
     * termination checks (i.e., that runLevel is at least SHUTDOWN
     * and the active thread count is zero).
     *
     * Notes: Most direct CASes are dependent on these bitfield
     * positions. Also, this field is non-private to enable direct
     * performance-sensitive CASes in ForkJoinWorkerThread.
     */
    volatile int runState;

    // Note: The order among run level values matters.
    private static final int RUNLEVEL_SHIFT = 16;
    private static final int SHUTDOWN = 1 << RUNLEVEL_SHIFT;
    private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1);
    private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2);
    private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;
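
    /*
     * For illustration: a runState of (SHUTDOWN | 3) encodes a pool
     * that has been shut down while 3 workers remain active. The code
     * below reads these as, e.g.:
     *
     *   int active   = runState & ACTIVE_COUNT_MASK; // low 16 bits
     *   boolean shut = runState >= SHUTDOWN;         // level reached
     */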

    /**
     * Holds number of total (i.e., created and not yet terminated)
     * and running (i.e., not blocked on joins or other managed sync)
     * threads, packed together to ensure consistent snapshot when
     * making decisions about creating and suspending spare
     * threads. Updated only by CAS. Note that adding a new worker
     * requires incrementing both counts, since workers start off in
     * running state.
     */
    private volatile int workerCounts;

    private static final int TOTAL_COUNT_SHIFT = 16;
    private static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
    private static final int ONE_RUNNING = 1;
    private static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT;
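
    /*
     * For illustration: workerCounts == 0x00050003 encodes 5 total
     * and 3 running workers; unpacking follows the masks above:
     *
     *   int running = workerCounts & RUNNING_COUNT_MASK;
     *   int total   = workerCounts >>> TOTAL_COUNT_SHIFT;
     *
     * Adding a worker CASes in (ONE_RUNNING | ONE_TOTAL) at once.
     */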

    /**
     * The target parallelism level.
     * Accessed directly by ForkJoinWorkerThreads.
     */
    final int parallelism;

    /**
     * True if using local FIFO, rather than the default LIFO, for
     * local polling. Read by, and replicated by, ForkJoinWorkerThreads.
     */
    final boolean locallyFifo;

    /**
     * The uncaught exception handler used when any worker abruptly
     * terminates.
     */
    private final Thread.UncaughtExceptionHandler ueh;

    /**
     * Pool number, just for assigning useful names to worker threads.
     */
    private final int poolNumber;

    // Utilities for CASing fields. Note that most of these
    // are usually manually inlined by callers.

    /**
     * Increments running count part of workerCounts.
     */
    final void incrementRunningCount() {
        int c;
        do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                               c = workerCounts,
                                               c + ONE_RUNNING));
    }

    /**
     * Tries to decrement running count unless already zero.
     */
    final boolean tryDecrementRunningCount() {
        int wc = workerCounts;
        if ((wc & RUNNING_COUNT_MASK) == 0)
            return false;
        return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                        wc, wc - ONE_RUNNING);
    }

    /**
     * Forces decrement of encoded workerCounts, awaiting nonzero if
     * (rarely) necessary when other count updates lag.
     *
     * @param dr either zero or ONE_RUNNING
     * @param dt either zero or ONE_TOTAL
     */
    private void decrementWorkerCounts(int dr, int dt) {
        for (;;) {
            int wc = workerCounts;
            if ((wc & RUNNING_COUNT_MASK) - dr < 0 ||
                (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) {
                if ((runState & TERMINATED) != 0)
                    return; // lagging termination on a backout
                Thread.yield();
            }
            if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                         wc, wc - (dr + dt)))
                return;
        }
    }

    /**
     * Tries decrementing active count; fails on contention.
     * Called when workers cannot find tasks to run.
     */
    final boolean tryDecrementActiveCount() {
        int c;
        return UNSAFE.compareAndSwapInt(this, runStateOffset,
                                        c = runState, c - 1);
    }

    /**
     * Advances to at least the given level. Returns true if not
     * already in at least the given level.
     */
    private boolean advanceRunLevel(int level) {
        for (;;) {
            int s = runState;
            if ((s & level) != 0)
                return false;
            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | level))
                return true;
        }
    }

    // workers array maintenance

    /**
     * Records and returns a workers array index for new worker.
     */
    private int recordWorker(ForkJoinWorkerThread w) {
        // Try using slot totalCount-1. If not available, scan and/or resize
        int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
        final ReentrantLock lock = this.workerLock;
        lock.lock();
        try {
            ForkJoinWorkerThread[] ws = workers;
            int n = ws.length;
            if (k < 0 || k >= n || ws[k] != null) {
                for (k = 0; k < n && ws[k] != null; ++k)
                    ;
                if (k == n)
                    ws = Arrays.copyOf(ws, n << 1);
            }
            ws[k] = w;
            workers = ws; // volatile array write ensures slot visibility
        } finally {
            lock.unlock();
        }
        return k;
    }

    /**
     * Nulls out record of worker in workers array.
     */
    private void forgetWorker(ForkJoinWorkerThread w) {
        int idx = w.poolIndex;
        // Locking helps method recordWorker avoid unnecessary expansion
        final ReentrantLock lock = this.workerLock;
        lock.lock();
        try {
            ForkJoinWorkerThread[] ws = workers;
            if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
                ws[idx] = null;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Final callback from terminating worker. Removes record of
     * worker from array, and adjusts counts. If pool is shutting
     * down, tries to complete termination.
     *
     * @param w the worker
     */
    final void workerTerminated(ForkJoinWorkerThread w) {
        forgetWorker(w);
        decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
        while (w.stealCount != 0) // collect final count
            tryAccumulateStealCount(w);
        tryTerminate(false);
    }

    // Waiting for and signalling events

    /**
     * Releases workers blocked on a count not equal to current count.
     * Normally called after precheck that eventWaiters isn't zero to
     * avoid wasted array checks. Gives up upon a change in count or
     * upon releasing two workers, letting others take over.
     */
    private void releaseEventWaiters() {
        ForkJoinWorkerThread[] ws = workers;
        int n = ws.length;
        long h = eventWaiters;
        int ec = eventCount;
        boolean releasedOne = false;
        ForkJoinWorkerThread w; int id;
        while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
               (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
               id < n && (w = ws[id]) != null) {
            if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
                                          h, w.nextWaiter)) {
                LockSupport.unpark(w);
                if (releasedOne) // exit on second release
                    break;
                releasedOne = true;
            }
            if (eventCount != ec)
                break;
            h = eventWaiters;
        }
    }

    /**
     * Tries to advance eventCount and releases waiters. Called only
     * from workers.
     */
    final void signalWork() {
        int c; // try to increment event count -- CAS failure OK
        UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
        if (eventWaiters != 0L)
            releaseEventWaiters();
    }

    /**
     * Adds the given worker to event queue and blocks until
     * terminating or event count advances from the given value.
     *
     * @param w the calling worker thread
     * @param ec the count
     */
    private void eventSync(ForkJoinWorkerThread w, int ec) {
        long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
        long h;
        while ((runState < SHUTDOWN || !tryTerminate(false)) &&
               (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
                (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
               eventCount == ec) {
            if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
                                          w.nextWaiter = h, nh)) {
                awaitEvent(w, ec);
                break;
            }
        }
    }

    /**
     * Blocks the given worker (that has already been entered as an
     * event waiter) until terminating or event count advances from
     * the given value. The oldest (first) waiter uses a timed wait to
     * occasionally one-by-one shrink the number of workers (to a
     * minimum of one) if the pool has not been used for extended
     * periods.
     *
     * @param w the calling worker thread
     * @param ec the count
     */
    private void awaitEvent(ForkJoinWorkerThread w, int ec) {
        while (eventCount == ec) {
            if (tryAccumulateStealCount(w)) { // transfer while idle
                boolean untimed = (w.nextWaiter != 0L ||
                                   (workerCounts & RUNNING_COUNT_MASK) <= 1);
                long startTime = untimed ? 0 : System.nanoTime();
                Thread.interrupted(); // clear/ignore interrupt
                if (eventCount != ec || w.runState != 0 ||
                    runState >= TERMINATING) // recheck after clear
                    break;
                if (untimed)
                    LockSupport.park(w);
                else {
                    LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
                    if (eventCount != ec || w.runState != 0 ||
                        runState >= TERMINATING)
                        break;
                    if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
                        tryShutdownUnusedWorker(ec);
                }
            }
        }
    }

    // Maintaining parallelism

    /**
     * Pushes worker onto the spare stack.
     */
    final void pushSpare(ForkJoinWorkerThread w) {
        int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
        do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
                                               w.nextSpare = spareWaiters, ns));
    }

    /**
     * Tries (once) to resume a spare if the number of running
     * threads is less than target.
     */
    private void tryResumeSpare() {
        int sw, id;
        ForkJoinWorkerThread[] ws = workers;
        int n = ws.length;
        ForkJoinWorkerThread w;
        if ((sw = spareWaiters) != 0 &&
            (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
            id < n && (w = ws[id]) != null &&
            (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
            spareWaiters == sw &&
            UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
                                     sw, w.nextSpare)) {
            int c; // increment running count before resume
            do {} while (!UNSAFE.compareAndSwapInt
                         (this, workerCountsOffset,
                          c = workerCounts, c + ONE_RUNNING));
            if (w.tryUnsuspend())
                LockSupport.unpark(w);
            else // back out if w was shutdown
                decrementWorkerCounts(ONE_RUNNING, 0);
        }
    }

    /**
     * Tries to increase the number of running workers if below target
     * parallelism: If a spare exists, tries to resume it via
     * tryResumeSpare. Otherwise, if not enough total workers or all
     * existing workers are busy, adds a new worker. In all cases also
     * helps wake up releasable workers waiting for work.
     */
    private void helpMaintainParallelism() {
        int pc = parallelism;
        int wc, rs, tc;
        while (((wc = workerCounts) & RUNNING_COUNT_MASK) < pc &&
               (rs = runState) < TERMINATING) {
            if (spareWaiters != 0)
                tryResumeSpare();
            else if ((tc = wc >>> TOTAL_COUNT_SHIFT) >= MAX_WORKERS ||
                     (tc >= pc && (rs & ACTIVE_COUNT_MASK) != tc))
                break; // enough total
            else if (runState == rs && workerCounts == wc &&
                     UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
                                              wc + (ONE_RUNNING|ONE_TOTAL))) {
                ForkJoinWorkerThread w = null;
                try {
                    w = factory.newThread(this);
                } finally { // adjust on null or exceptional factory return
                    if (w == null) {
                        decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
                        tryTerminate(false); // handle failure during shutdown
                    }
                }
                if (w == null)
                    break;
                w.start(recordWorker(w), ueh);
                if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
                    int c; // advance event count
                    UNSAFE.compareAndSwapInt(this, eventCountOffset,
                                             c = eventCount, c+1);
                    break; // add at most one unless total below target
                }
            }
        }
        if (eventWaiters != 0L)
            releaseEventWaiters();
    }

    /**
     * Callback from the oldest waiter in awaitEvent waking up after a
     * period of non-use. If all workers are idle, tries (once) to
     * shutdown an event waiter or a spare, if one exists. Note that
     * we don't need CAS or locks here because the method is called
     * only from one thread occasionally waking (and even misfires are
     * OK). Note that until the shutdown worker fully terminates,
     * workerCounts will overestimate total count, which is tolerable.
     *
     * @param ec the event count waited on by caller (to abort
     * attempt if count has since changed).
     */
    private void tryShutdownUnusedWorker(int ec) {
        if (runState == 0 && eventCount == ec) { // only trigger if all idle
            ForkJoinWorkerThread[] ws = workers;
            int n = ws.length;
            ForkJoinWorkerThread w = null;
            boolean shutdown = false;
            int sw;
            long h;
            if ((sw = spareWaiters) != 0) { // prefer killing spares
                int id = (sw & SPARE_ID_MASK) - 1;
                if (id >= 0 && id < n && (w = ws[id]) != null &&
                    UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
                                             sw, w.nextSpare))
                    shutdown = true;
            }
            else if ((h = eventWaiters) != 0L) {
                long nh;
                int id = ((int)(h & WAITER_ID_MASK)) - 1;
                if (id >= 0 && id < n && (w = ws[id]) != null &&
                    (nh = w.nextWaiter) != 0L && // keep at least one worker
                    UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
                    shutdown = true;
            }
            if (w != null && shutdown) {
                w.shutdown();
                LockSupport.unpark(w);
            }
        }
        releaseEventWaiters(); // in case of interference
    }

    /**
     * Callback from workers invoked upon each top-level action (i.e.,
     * stealing a task or taking a submission and running it).
     * Performs one or more of the following:
     *
     * 1. If the worker is active and either did not run a task
     *    or there are too many workers, try to set its active status
     *    to inactive and update activeCount. On contention, we may
     *    try again in this or a subsequent call.
     *
     * 2. If not enough total workers, help create some.
     *
     * 3. If there are too many running workers, suspend this worker
     *    (first forcing inactive if necessary). If it is not needed,
     *    it may be shut down while suspended (via
     *    tryShutdownUnusedWorker). Otherwise, upon resume it
     *    rechecks running thread count and need for event sync.
     *
     * 4. If worker did not run a task, await the next task event via
     *    eventSync if necessary (first forcing inactivation), upon
     *    which the worker may be shut down via
     *    tryShutdownUnusedWorker. Otherwise, help release any
     *    existing event waiters that are now releasable.
     *
     * @param w the worker
     * @param ran true if worker ran a task since last call to this method
     */
    final void preStep(ForkJoinWorkerThread w, boolean ran) {
        int wec = w.lastEventCount;
        boolean active = w.active;
        boolean inactivate = false;
        int pc = parallelism;
        int rs;
        while (w.runState == 0 && (rs = runState) < TERMINATING) {
            if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
                UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
                inactivate = active = w.active = false;
            int wc = workerCounts;
            if ((wc & RUNNING_COUNT_MASK) > pc) {
                if (!(inactivate |= active) && // must inactivate to suspend
                    workerCounts == wc &&      // try to suspend as spare
                    UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                             wc, wc - ONE_RUNNING))
                    w.suspendAsSpare();
            }
            else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
                helpMaintainParallelism(); // not enough workers
            else if (!ran) {
                long h = eventWaiters;
                int ec = eventCount;
                if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
                    releaseEventWaiters(); // release others before waiting
                else if (ec != wec) {
                    w.lastEventCount = ec; // no need to wait
                    break;
                }
                else if (!(inactivate |= active))
                    eventSync(w, wec); // must inactivate before sync
            }
            else
                break;
        }
    }

    /**
     * Helps and/or blocks awaiting join of the given task.
     * See above for explanation.
     *
     * @param joinMe the task to join
     * @param worker the current worker thread
     */
    final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
        int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
        while (joinMe.status >= 0) {
            int wc;
            worker.helpJoinTask(joinMe);
            if (joinMe.status < 0)
                break;
            else if (retries > 0)
                --retries;
            else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
                     UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                              wc, wc - ONE_RUNNING)) {
                int stat, c; long h;
                while ((stat = joinMe.status) >= 0 &&
                       (h = eventWaiters) != 0L && // help release others
                       (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
                    releaseEventWaiters();
                if (stat >= 0 &&
                    ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
                     (stat =
                      joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
                    helpMaintainParallelism(); // timeout or no running workers
                do {} while (!UNSAFE.compareAndSwapInt
                             (this, workerCountsOffset,
                              c = workerCounts, c + ONE_RUNNING));
                if (stat < 0)
                    break; // else restart
            }
        }
    }

    /**
     * Same idea as awaitJoin, but no helping, retries, or timeouts.
     */
    final void awaitBlocker(ManagedBlocker blocker)
        throws InterruptedException {
        while (!blocker.isReleasable()) {
            int wc = workerCounts;
            if ((wc & RUNNING_COUNT_MASK) != 0 &&
                UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                         wc, wc - ONE_RUNNING)) {
                try {
                    while (!blocker.isReleasable()) {
                        long h = eventWaiters;
                        if (h != 0L &&
                            (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
                            releaseEventWaiters();
                        else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
                                 runState < TERMINATING)
                            helpMaintainParallelism();
                        else if (blocker.block())
                            break;
                    }
                } finally {
                    int c;
                    do {} while (!UNSAFE.compareAndSwapInt
                                 (this, workerCountsOffset,
                                  c = workerCounts, c + ONE_RUNNING));
                }
                break;
            }
        }
    }

    /**
     * Possibly initiates and/or completes termination.
     *
     * @param now if true, unconditionally terminate, else only
     * if shutdown and empty queue and no active workers
     * @return true if now terminating or terminated
     */
    private boolean tryTerminate(boolean now) {
        if (now)
            advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
        else if (runState < SHUTDOWN ||
                 !submissionQueue.isEmpty() ||
                 (runState & ACTIVE_COUNT_MASK) != 0)
            return false;

        if (advanceRunLevel(TERMINATING))
            startTerminating();

        // Finish now if all threads terminated; else in some subsequent call
        if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
            advanceRunLevel(TERMINATED);
            termination.arrive();
        }
        return true;
    }

    /**
     * Actions on transition to TERMINATING.
     *
     * Runs up to four passes through workers: (0) shutting down each
     * (without waking up if parked) to quickly spread notifications
     * without unnecessary bouncing around event queues etc; (1) wake
     * up and help cancel tasks; (2) interrupt; (3) mop up races with
     * interrupted workers.
     */
    private void startTerminating() {
        cancelSubmissions();
        for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
            int c; // advance event count
            UNSAFE.compareAndSwapInt(this, eventCountOffset,
                                     c = eventCount, c+1);
            eventWaiters = 0L; // clobber lists
            spareWaiters = 0;
            for (ForkJoinWorkerThread w : workers) {
                if (w != null) {
                    w.shutdown();
                    if (passes > 0 && !w.isTerminated()) {
                        w.cancelTasks();
                        LockSupport.unpark(w);
                        if (passes > 1) {
                            try {
                                w.interrupt();
                            } catch (SecurityException ignore) {
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     * Clears out and cancels submissions, ignoring exceptions.
     */
    private void cancelSubmissions() {
        ForkJoinTask<?> task;
        while ((task = submissionQueue.poll()) != null) {
            try {
                task.cancel(false);
            } catch (Throwable ignore) {
            }
        }
    }

    // misc support for ForkJoinWorkerThread

    /**
     * Returns pool number.
     */
    final int getPoolNumber() {
        return poolNumber;
    }

    /**
     * Tries to accumulate steal count from a worker, clearing
     * the worker's value.
     *
     * @return true if worker steal count now zero
     */
    final boolean tryAccumulateStealCount(ForkJoinWorkerThread w) {
        int sc = w.stealCount;
        long c = stealCount;
        // CAS even if zero, for fence effects
        if (UNSAFE.compareAndSwapLong(this, stealCountOffset, c, c + sc)) {
            if (sc != 0)
                w.stealCount = 0;
            return true;
        }
        return sc == 0;
    }

    /**
     * Returns the approximate (non-atomic) number of idle threads per
     * active thread.
     */
    final int idlePerActive() {
        int pc = parallelism; // use parallelism, not rc
        int ac = runState;    // no mask -- artificially boosts during shutdown
        // Use exact results for small values, saturate past 4
        return ((pc <= ac) ? 0 :
                (pc >>> 1 <= ac) ? 1 :
                (pc >>> 2 <= ac) ? 3 :
                pc >>> 3);
    }
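
    /*
     * For illustration: with parallelism 16, this saturating ladder
     * yields 0 when ac >= 16, 1 when ac >= 8, 3 when ac >= 4, and
     * otherwise pc >>> 3 == 2.
     */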
1176    
1177     // Public and protected methods
1178 jsr166 1.1
1179     // Constructors
1180    
1181     /**
1182 jsr166 1.9 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1183 dl 1.18 * java.lang.Runtime#availableProcessors}, using the {@linkplain
1184     * #defaultForkJoinWorkerThreadFactory default thread factory},
1185     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1186 jsr166 1.1 *
1187     * @throws SecurityException if a security manager exists and
1188     * the caller is not permitted to modify threads
1189     * because it does not hold {@link
1190     * java.lang.RuntimePermission}{@code ("modifyThread")}
1191     */
1192     public ForkJoinPool() {
1193     this(Runtime.getRuntime().availableProcessors(),
1194 dl 1.18 defaultForkJoinWorkerThreadFactory, null, false);
1195 jsr166 1.1 }
1196    
1197     /**
1198 jsr166 1.9 * Creates a {@code ForkJoinPool} with the indicated parallelism
1199 dl 1.18 * level, the {@linkplain
1200     * #defaultForkJoinWorkerThreadFactory default thread factory},
1201     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1202 jsr166 1.1 *
1203 jsr166 1.9 * @param parallelism the parallelism level
1204 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
1205 jsr166 1.11 * equal to zero, or greater than implementation limit
1206 jsr166 1.1 * @throws SecurityException if a security manager exists and
1207     * the caller is not permitted to modify threads
1208     * because it does not hold {@link
1209     * java.lang.RuntimePermission}{@code ("modifyThread")}
1210     */
1211     public ForkJoinPool(int parallelism) {
1212 dl 1.18 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
1213 jsr166 1.1 }
1214    
1215     /**
1216 dl 1.18 * Creates a {@code ForkJoinPool} with the given parameters.
1217 jsr166 1.1 *
1218 dl 1.18 * @param parallelism the parallelism level. For default value,
1219     * use {@link java.lang.Runtime#availableProcessors}.
1220     * @param factory the factory for creating new threads. For default value,
1221     * use {@link #defaultForkJoinWorkerThreadFactory}.
1222 dl 1.19 * @param handler the handler for internal worker threads that
1223     * terminate due to unrecoverable errors encountered while executing
1224 dl 1.18 * tasks. For default value, use <code>null</code>.
1225 dl 1.19 * @param asyncMode if true,
1226 dl 1.18 * establishes local first-in-first-out scheduling mode for forked
1227     * tasks that are never joined. This mode may be more appropriate
1228     * than default locally stack-based mode in applications in which
1229     * worker threads only process event-style asynchronous tasks.
1230     * For default value, use <code>false</code>.
1231 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
1232 jsr166 1.11 * equal to zero, or greater than implementation limit
1233     * @throws NullPointerException if the factory is null
1234 jsr166 1.1 * @throws SecurityException if a security manager exists and
1235     * the caller is not permitted to modify threads
1236     * because it does not hold {@link
1237     * java.lang.RuntimePermission}{@code ("modifyThread")}
1238     */
1239 dl 1.19 public ForkJoinPool(int parallelism,
1240 dl 1.18 ForkJoinWorkerThreadFactory factory,
1241     Thread.UncaughtExceptionHandler handler,
1242     boolean asyncMode) {
1243 dl 1.14 checkPermission();
1244     if (factory == null)
1245     throw new NullPointerException();
1246 dl 1.19 if (parallelism <= 0 || parallelism > MAX_WORKERS)
1247 jsr166 1.1 throw new IllegalArgumentException();
1248 dl 1.14 this.parallelism = parallelism;
1249 jsr166 1.1 this.factory = factory;
1250 dl 1.18 this.ueh = handler;
1251     this.locallyFifo = asyncMode;
1252     int arraySize = initialArraySizeFor(parallelism);
1253 dl 1.14 this.workers = new ForkJoinWorkerThread[arraySize];
1254     this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
1255 jsr166 1.1 this.workerLock = new ReentrantLock();
1256 dl 1.18 this.termination = new Phaser(1);
1257     this.poolNumber = poolNumberGenerator.incrementAndGet();
1258 jsr166 1.1 }
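    /*
     * Usage sketch (illustrative): the no-arg and one-arg
     * constructors delegate here with the default factory, no
     * handler, and LIFO mode; an event-style setup might instead
     * enable asyncMode:
     * <pre> {@code
     * ForkJoinPool defaultPool = new ForkJoinPool();   // all processors
     * ForkJoinPool sizedPool   = new ForkJoinPool(4);  // parallelism 4
     * ForkJoinPool asyncPool   = new ForkJoinPool(
     *     Runtime.getRuntime().availableProcessors(),
     *     ForkJoinPool.defaultForkJoinWorkerThreadFactory,
     *     null,   // no UncaughtExceptionHandler
     *     true);  // async FIFO mode for event-style tasks
     * }</pre>
     */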
1259    
1260     /**
1261 dl 1.14 * Returns initial power of two size for workers array.
1262     * @param pc the initial parallelism level
1263     */
1264     private static int initialArraySizeFor(int pc) {
1265 dl 1.24 // If possible, initially allocate enough space for one spare
1266     int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1267 dl 1.19         // See Hacker's Delight, sec 3.2. We know MAX_WORKERS < (1 << 16)
1268 dl 1.14 size |= size >>> 1;
1269     size |= size >>> 2;
1270     size |= size >>> 4;
1271     size |= size >>> 8;
1272     return size + 1;
1273 jsr166 1.1 }
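    /*
     * The shift-or cascade above smears the highest set bit of
     * {@code size} into all lower positions, yielding 2^k - 1, so
     * {@code size + 1} is a power of two. A minimal sketch of the
     * same trick (hypothetical helper, assuming 0 < n < (1 << 16)):
     * <pre> {@code
     * static int nextPowerOfTwo(int n) {
     *     int size = n;
     *     size |= size >>> 1;
     *     size |= size >>> 2;
     *     size |= size >>> 4;
     *     size |= size >>> 8;
     *     return size + 1; // smallest power of two strictly greater than n
     * }
     * // nextPowerOfTwo(7) == 8; nextPowerOfTwo(8) == 16
     * }</pre>
     */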
1274    
1275     // Execution methods
1276    
1277     /**
1278     * Common code for execute, invoke and submit
1279     */
1280     private <T> void doSubmit(ForkJoinTask<T> task) {
1281 jsr166 1.2 if (task == null)
1282     throw new NullPointerException();
1283 dl 1.14 if (runState >= SHUTDOWN)
1284 jsr166 1.1 throw new RejectedExecutionException();
1285 dl 1.19 submissionQueue.offer(task);
1286 dl 1.24 int c; // try to increment event count -- CAS failure OK
1287     UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1288     helpMaintainParallelism(); // create, start, or resume some workers
1289 jsr166 1.1 }
1290    
1291     /**
1292     * Performs the given task, returning its result upon completion.
1293     *
1294     * @param task the task
1295     * @return the task's result
1296 jsr166 1.11 * @throws NullPointerException if the task is null
1297     * @throws RejectedExecutionException if the task cannot be
1298     * scheduled for execution
1299 jsr166 1.1 */
1300     public <T> T invoke(ForkJoinTask<T> task) {
1301     doSubmit(task);
1302     return task.join();
1303     }
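    /*
     * Usage sketch (hypothetical task class): {@code invoke} submits
     * the task and then joins it, so the caller blocks until the
     * result is available:
     * <pre> {@code
     * class Fib extends RecursiveTask<Integer> {
     *     final int n;
     *     Fib(int n) { this.n = n; }
     *     protected Integer compute() {
     *         if (n <= 1) return n;
     *         Fib f1 = new Fib(n - 1);
     *         f1.fork();                                 // run asynchronously
     *         return new Fib(n - 2).compute() + f1.join();
     *     }
     * }
     * int result = new ForkJoinPool().invoke(new Fib(10)); // 55
     * }</pre>
     */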
1304    
1305     /**
1306     * Arranges for (asynchronous) execution of the given task.
1307     *
1308     * @param task the task
1309 jsr166 1.11 * @throws NullPointerException if the task is null
1310     * @throws RejectedExecutionException if the task cannot be
1311     * scheduled for execution
1312 jsr166 1.1 */
1313 jsr166 1.8 public void execute(ForkJoinTask<?> task) {
1314 jsr166 1.1 doSubmit(task);
1315     }
1316    
1317     // AbstractExecutorService methods
1318    
1319 jsr166 1.11 /**
1320     * @throws NullPointerException if the task is null
1321     * @throws RejectedExecutionException if the task cannot be
1322     * scheduled for execution
1323     */
1324 jsr166 1.1 public void execute(Runnable task) {
1325 jsr166 1.2 ForkJoinTask<?> job;
1326 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1327     job = (ForkJoinTask<?>) task;
1328 jsr166 1.2 else
1329 jsr166 1.7 job = ForkJoinTask.adapt(task, null);
1330 jsr166 1.2 doSubmit(job);
1331 jsr166 1.1 }
1332    
1333 jsr166 1.11 /**
1334 dl 1.18 * Submits a ForkJoinTask for execution.
1335     *
1336     * @param task the task to submit
1337     * @return the task
1338     * @throws NullPointerException if the task is null
1339     * @throws RejectedExecutionException if the task cannot be
1340     * scheduled for execution
1341     */
1342     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1343     doSubmit(task);
1344     return task;
1345     }
1346    
1347     /**
1348 jsr166 1.11 * @throws NullPointerException if the task is null
1349     * @throws RejectedExecutionException if the task cannot be
1350     * scheduled for execution
1351     */
1352 jsr166 1.1 public <T> ForkJoinTask<T> submit(Callable<T> task) {
1353 jsr166 1.7 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1354 jsr166 1.1 doSubmit(job);
1355     return job;
1356     }
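    /*
     * Usage sketch: a submitted {@code Callable} is adapted to a
     * {@code ForkJoinTask} that also serves as the returned
     * {@code Future} (hypothetical example):
     * <pre> {@code
     * ForkJoinPool pool = new ForkJoinPool();
     * Future<String> f = pool.submit(new Callable<String>() {
     *     public String call() { return "done"; }
     * });
     * String s = f.get(); // blocks; may throw InterruptedException
     *                     // or ExecutionException
     * }</pre>
     */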
1357    
1358 jsr166 1.11 /**
1359     * @throws NullPointerException if the task is null
1360     * @throws RejectedExecutionException if the task cannot be
1361     * scheduled for execution
1362     */
1363 jsr166 1.1 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1364 jsr166 1.7 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1365 jsr166 1.1 doSubmit(job);
1366     return job;
1367     }
1368    
1369 jsr166 1.11 /**
1370     * @throws NullPointerException if the task is null
1371     * @throws RejectedExecutionException if the task cannot be
1372     * scheduled for execution
1373     */
1374 jsr166 1.1 public ForkJoinTask<?> submit(Runnable task) {
1375 jsr166 1.2 ForkJoinTask<?> job;
1376 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1377     job = (ForkJoinTask<?>) task;
1378 jsr166 1.2 else
1379 jsr166 1.7 job = ForkJoinTask.adapt(task, null);
1380 jsr166 1.1 doSubmit(job);
1381     return job;
1382     }
1383    
1384     /**
1385 jsr166 1.11 * @throws NullPointerException {@inheritDoc}
1386     * @throws RejectedExecutionException {@inheritDoc}
1387     */
1388 jsr166 1.1 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1389     ArrayList<ForkJoinTask<T>> forkJoinTasks =
1390     new ArrayList<ForkJoinTask<T>>(tasks.size());
1391     for (Callable<T> task : tasks)
1392 jsr166 1.7 forkJoinTasks.add(ForkJoinTask.adapt(task));
1393 jsr166 1.1 invoke(new InvokeAll<T>(forkJoinTasks));
1394    
1395     @SuppressWarnings({"unchecked", "rawtypes"})
1396 dl 1.15 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1397 jsr166 1.1 return futures;
1398     }
1399    
1400     static final class InvokeAll<T> extends RecursiveAction {
1401     final ArrayList<ForkJoinTask<T>> tasks;
1402     InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
1403     public void compute() {
1404     try { invokeAll(tasks); }
1405     catch (Exception ignore) {}
1406     }
1407     private static final long serialVersionUID = -7914297376763021607L;
1408     }
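    /*
     * Usage sketch for {@code invokeAll} (hypothetical tasks): each
     * {@code Callable} is adapted and run as a ForkJoinTask, and all
     * returned futures are complete when the call returns:
     * <pre> {@code
     * ForkJoinPool pool = new ForkJoinPool();
     * List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
     * for (int i = 0; i < 4; ++i) {
     *     final int k = i;
     *     tasks.add(new Callable<Integer>() {
     *         public Integer call() { return k * k; }
     *     });
     * }
     * List<Future<Integer>> results = pool.invokeAll(tasks); // all done
     * }</pre>
     */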
1409    
1410     /**
1411     * Returns the factory used for constructing new workers.
1412     *
1413     * @return the factory used for constructing new workers
1414     */
1415     public ForkJoinWorkerThreadFactory getFactory() {
1416     return factory;
1417     }
1418    
1419     /**
1420     * Returns the handler for internal worker threads that terminate
1421     * due to unrecoverable errors encountered while executing tasks.
1422     *
1423 jsr166 1.4 * @return the handler, or {@code null} if none
1424 jsr166 1.1 */
1425     public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
1426 dl 1.14 return ueh;
1427 jsr166 1.1 }
1428    
1429     /**
1430 jsr166 1.9 * Returns the targeted parallelism level of this pool.
1431 jsr166 1.1 *
1432 jsr166 1.9 * @return the targeted parallelism level of this pool
1433 jsr166 1.1 */
1434     public int getParallelism() {
1435     return parallelism;
1436     }
1437    
1438     /**
1439     * Returns the number of worker threads that have started but not
1440     * yet terminated. The result returned by this method may differ
1441 jsr166 1.4 * from {@link #getParallelism} when threads are created to
1442 jsr166 1.1 * maintain parallelism when others are cooperatively blocked.
1443     *
1444     * @return the number of worker threads
1445     */
1446     public int getPoolSize() {
1447 dl 1.14 return workerCounts >>> TOTAL_COUNT_SHIFT;
1448 jsr166 1.1 }
1449    
1450     /**
1451 jsr166 1.4 * Returns {@code true} if this pool uses local first-in-first-out
1452 jsr166 1.1 * scheduling mode for forked tasks that are never joined.
1453     *
1454 jsr166 1.4 * @return {@code true} if this pool uses async mode
1455 jsr166 1.1 */
1456     public boolean getAsyncMode() {
1457     return locallyFifo;
1458     }
1459    
1460     /**
1461     * Returns an estimate of the number of worker threads that are
1462     * not blocked waiting to join tasks or for other managed
1463 dl 1.14 * synchronization. This method may overestimate the
1464     * number of running threads.
1465 jsr166 1.1 *
1466     * @return the number of worker threads
1467     */
1468     public int getRunningThreadCount() {
1469 dl 1.14 return workerCounts & RUNNING_COUNT_MASK;
1470 jsr166 1.1 }
1471    
1472     /**
1473     * Returns an estimate of the number of threads that are currently
1474     * stealing or executing tasks. This method may overestimate the
1475     * number of active threads.
1476     *
1477     * @return the number of active threads
1478     */
1479     public int getActiveThreadCount() {
1480 dl 1.14 return runState & ACTIVE_COUNT_MASK;
1481 jsr166 1.1 }
1482    
1483     /**
1484 jsr166 1.4 * Returns {@code true} if all worker threads are currently idle.
1485     * An idle worker is one that cannot obtain a task to execute
1486     * because none are available to steal from other threads, and
1487     * there are no pending submissions to the pool. This method is
1488     * conservative; it might not return {@code true} immediately upon
1489     * idleness of all threads, but will eventually become true if
1490     * threads remain inactive.
1491 jsr166 1.1 *
1492 jsr166 1.4 * @return {@code true} if all threads are currently idle
1493 jsr166 1.1 */
1494     public boolean isQuiescent() {
1495 dl 1.14 return (runState & ACTIVE_COUNT_MASK) == 0;
1496 jsr166 1.1 }
1497    
1498     /**
1499     * Returns an estimate of the total number of tasks stolen from
1500     * one thread's work queue by another. The reported value
1501     * underestimates the actual total number of steals when the pool
1502     * is not quiescent. This value may be useful for monitoring and
1503     * tuning fork/join programs: in general, steal counts should be
1504     * high enough to keep threads busy, but low enough to avoid
1505     * overhead and contention across threads.
1506     *
1507     * @return the number of steals
1508     */
1509     public long getStealCount() {
1510 dl 1.14 return stealCount;
1511 jsr166 1.1 }
1512    
1513     /**
1514     * Returns an estimate of the total number of tasks currently held
1515     * in queues by worker threads (but not including tasks submitted
1516     * to the pool that have not begun executing). This value is only
1517     * an approximation, obtained by iterating across all threads in
1518     * the pool. This method may be useful for tuning task
1519     * granularities.
1520     *
1521     * @return the number of queued tasks
1522     */
1523     public long getQueuedTaskCount() {
1524     long count = 0;
1525 jsr166 1.29 for (ForkJoinWorkerThread w : workers)
1526 dl 1.14 if (w != null)
1527     count += w.getQueueSize();
1528 jsr166 1.1 return count;
1529     }
1530    
1531     /**
1532 jsr166 1.8 * Returns an estimate of the number of tasks submitted to this
1533     * pool that have not yet begun executing. This method takes time
1534 jsr166 1.1 * proportional to the number of submissions.
1535     *
1536     * @return the number of queued submissions
1537     */
1538     public int getQueuedSubmissionCount() {
1539     return submissionQueue.size();
1540     }
1541    
1542     /**
1543 jsr166 1.4 * Returns {@code true} if there are any tasks submitted to this
1544     * pool that have not yet begun executing.
1545 jsr166 1.1 *
1546     * @return {@code true} if there are any queued submissions
1547     */
1548     public boolean hasQueuedSubmissions() {
1549     return !submissionQueue.isEmpty();
1550     }
1551    
1552     /**
1553     * Removes and returns the next unexecuted submission if one is
1554     * available. This method may be useful in extensions to this
1555     * class that re-assign work in systems with multiple pools.
1556     *
1557 jsr166 1.4 * @return the next submission, or {@code null} if none
1558 jsr166 1.1 */
1559     protected ForkJoinTask<?> pollSubmission() {
1560     return submissionQueue.poll();
1561     }
1562    
1563     /**
1564     * Removes all available unexecuted submitted and forked tasks
1565     * from scheduling queues and adds them to the given collection,
1566     * without altering their execution status. These may include
1567 jsr166 1.8 * artificially generated or wrapped tasks. This method is
1568     * designed to be invoked only when the pool is known to be
1569 jsr166 1.1 * quiescent. Invocations at other times may not remove all
1570     * tasks. A failure encountered while attempting to add elements
1571     * to collection {@code c} may result in elements being in
1572     * neither, either or both collections when the associated
1573     * exception is thrown. The behavior of this operation is
1574     * undefined if the specified collection is modified while the
1575     * operation is in progress.
1576     *
1577     * @param c the collection to transfer elements into
1578     * @return the number of elements transferred
1579     */
1580 jsr166 1.5 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1581 dl 1.19 int count = submissionQueue.drainTo(c);
1582 jsr166 1.29 for (ForkJoinWorkerThread w : workers)
1583 dl 1.18 if (w != null)
1584 dl 1.19 count += w.drainTasksTo(c);
1585 dl 1.18 return count;
1586     }
1587    
1588     /**
1589 jsr166 1.1 * Returns a string identifying this pool, as well as its state,
1590     * including indications of run state, parallelism level, and
1591     * worker and task counts.
1592     *
1593     * @return a string identifying this pool, as well as its state
1594     */
1595     public String toString() {
1596     long st = getStealCount();
1597     long qt = getQueuedTaskCount();
1598     long qs = getQueuedSubmissionCount();
1599 dl 1.14 int wc = workerCounts;
1600     int tc = wc >>> TOTAL_COUNT_SHIFT;
1601     int rc = wc & RUNNING_COUNT_MASK;
1602     int pc = parallelism;
1603     int rs = runState;
1604     int ac = rs & ACTIVE_COUNT_MASK;
1605 jsr166 1.1 return super.toString() +
1606 dl 1.14 "[" + runLevelToString(rs) +
1607     ", parallelism = " + pc +
1608     ", size = " + tc +
1609     ", active = " + ac +
1610     ", running = " + rc +
1611 jsr166 1.1 ", steals = " + st +
1612     ", tasks = " + qt +
1613     ", submissions = " + qs +
1614     "]";
1615     }
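    /*
     * Monitoring sketch (illustrative): the status methods above can
     * be polled informally; all counts are estimates:
     * <pre> {@code
     * ForkJoinPool pool = new ForkJoinPool();
     * System.out.println(pool);                // summary via toString
     * long steals = pool.getStealCount();      // work-stealing activity
     * long queued = pool.getQueuedTaskCount(); // tasks held by workers
     * int active  = pool.getActiveThreadCount();
     * boolean idle = pool.isQuiescent();
     * }</pre>
     */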
1616    
1617 dl 1.14 private static String runLevelToString(int s) {
1618     return ((s & TERMINATED) != 0 ? "Terminated" :
1619     ((s & TERMINATING) != 0 ? "Terminating" :
1620     ((s & SHUTDOWN) != 0 ? "Shutting down" :
1621     "Running")));
1622 jsr166 1.1 }
1623    
1624     /**
1625     * Initiates an orderly shutdown in which previously submitted
1626     * tasks are executed, but no new tasks will be accepted.
1627     * Invocation has no additional effect if already shut down.
1628     * Tasks that are in the process of being submitted concurrently
1629     * during the course of this method may or may not be rejected.
1630     *
1631     * @throws SecurityException if a security manager exists and
1632     * the caller is not permitted to modify threads
1633     * because it does not hold {@link
1634     * java.lang.RuntimePermission}{@code ("modifyThread")}
1635     */
1636     public void shutdown() {
1637     checkPermission();
1638 dl 1.14 advanceRunLevel(SHUTDOWN);
1639     tryTerminate(false);
1640 jsr166 1.1 }
1641    
1642     /**
1643 jsr166 1.9 * Attempts to cancel and/or stop all tasks, and reject all
1644     * subsequently submitted tasks. Tasks that are in the process of
1645     * being submitted or executed concurrently during the course of
1646     * this method may or may not be rejected. This method cancels
1647     * both existing and unexecuted tasks, in order to permit
1648     * termination in the presence of task dependencies. So the method
1649     * always returns an empty list (unlike the case for some other
1650     * Executors).
1651 jsr166 1.1 *
1652     * @return an empty list
1653     * @throws SecurityException if a security manager exists and
1654     * the caller is not permitted to modify threads
1655     * because it does not hold {@link
1656     * java.lang.RuntimePermission}{@code ("modifyThread")}
1657     */
1658     public List<Runnable> shutdownNow() {
1659     checkPermission();
1660 dl 1.14 tryTerminate(true);
1661 jsr166 1.1 return Collections.emptyList();
1662     }
1663    
1664     /**
1665     * Returns {@code true} if all tasks have completed following shut down.
1666     *
1667     * @return {@code true} if all tasks have completed following shut down
1668     */
1669     public boolean isTerminated() {
1670 dl 1.14 return runState >= TERMINATED;
1671 jsr166 1.1 }
1672    
1673     /**
1674     * Returns {@code true} if the process of termination has
1675 jsr166 1.9 * commenced but not yet completed. This method may be useful for
1676     * debugging. A return of {@code true} reported a sufficient
1677     * period after shutdown may indicate that submitted tasks have
1678     * ignored or suppressed interruption, causing this executor not
1679     * to properly terminate.
1680 jsr166 1.1 *
1681 jsr166 1.9 * @return {@code true} if terminating but not yet terminated
1682 jsr166 1.1 */
1683     public boolean isTerminating() {
1684 dl 1.14 return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
1685 jsr166 1.1 }
1686    
1687     /**
1688     * Returns {@code true} if this pool has been shut down.
1689     *
1690     * @return {@code true} if this pool has been shut down
1691     */
1692     public boolean isShutdown() {
1693 dl 1.14 return runState >= SHUTDOWN;
1694 jsr166 1.9 }
1695    
1696     /**
1697 jsr166 1.1 * Blocks until all tasks have completed execution after a shutdown
1698     * request, or the timeout occurs, or the current thread is
1699     * interrupted, whichever happens first.
1700     *
1701     * @param timeout the maximum time to wait
1702     * @param unit the time unit of the timeout argument
1703     * @return {@code true} if this executor terminated and
1704     * {@code false} if the timeout elapsed before termination
1705     * @throws InterruptedException if interrupted while waiting
1706     */
1707     public boolean awaitTermination(long timeout, TimeUnit unit)
1708     throws InterruptedException {
1709 dl 1.18 try {
1710     return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
1711 jsr166 1.28 } catch (TimeoutException ex) {
1712 dl 1.18 return false;
1713     }
1714 jsr166 1.1 }
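    /*
     * Shutdown sketch: the usual two-phase ExecutorService
     * termination idiom applies to ForkJoinPool as well:
     * <pre> {@code
     * pool.shutdown();                 // stop accepting new tasks
     * try {
     *     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     *         pool.shutdownNow();      // cancel lingering tasks
     *         pool.awaitTermination(60, TimeUnit.SECONDS);
     *     }
     * } catch (InterruptedException ex) {
     *     pool.shutdownNow();
     *     Thread.currentThread().interrupt();
     * }
     * }</pre>
     */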
1715    
1716     /**
1717     * Interface for extending managed parallelism for tasks running
1718 jsr166 1.8 * in {@link ForkJoinPool}s.
1719     *
1720 dl 1.19 * <p>A {@code ManagedBlocker} provides two methods. Method
1721     * {@code isReleasable} must return {@code true} if blocking is
1722     * not necessary. Method {@code block} blocks the current thread
1723     * if necessary (perhaps internally invoking {@code isReleasable}
1724     * before actually blocking). The unusual methods in this API
1725     * accommodate synchronizers that may, but don't usually, block
1726     * for long periods. Similarly, they allow more efficient internal
1727     * handling of cases in which additional workers may be, but
1728     * usually are not, needed to ensure sufficient parallelism.
1729     * Toward this end, implementations of method {@code isReleasable}
1730     * must be amenable to repeated invocation.
1731 jsr166 1.1 *
1732     * <p>For example, here is a ManagedBlocker based on a
1733     * ReentrantLock:
1734     * <pre> {@code
1735     * class ManagedLocker implements ManagedBlocker {
1736     * final ReentrantLock lock;
1737     * boolean hasLock = false;
1738     * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
1739     * public boolean block() {
1740     * if (!hasLock)
1741     * lock.lock();
1742     * return true;
1743     * }
1744     * public boolean isReleasable() {
1745     * return hasLock || (hasLock = lock.tryLock());
1746     * }
1747     * }}</pre>
1748 dl 1.19 *
1749     * <p>Here is a class that possibly blocks waiting for an
1750     * item on a given queue:
1751     * <pre> {@code
1752     * class QueueTaker<E> implements ManagedBlocker {
1753     * final BlockingQueue<E> queue;
1754     * volatile E item = null;
1755     * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
1756     * public boolean block() throws InterruptedException {
1757     * if (item == null)
1758 dl 1.23 * item = queue.take();
1759 dl 1.19 * return true;
1760     * }
1761     * public boolean isReleasable() {
1762 dl 1.23 * return item != null || (item = queue.poll()) != null;
1763 dl 1.19 * }
1764     * public E getItem() { // call after pool.managedBlock completes
1765     * return item;
1766     * }
1767     * }}</pre>
1768 jsr166 1.1 */
1769     public static interface ManagedBlocker {
1770     /**
1771     * Possibly blocks the current thread, for example waiting for
1772     * a lock or condition.
1773     *
1774 jsr166 1.4 * @return {@code true} if no additional blocking is necessary
1775     * (i.e., if isReleasable would return true)
1776 jsr166 1.1 * @throws InterruptedException if interrupted while waiting
1777     * (the method is not required to do so, but is allowed to)
1778     */
1779     boolean block() throws InterruptedException;
1780    
1781     /**
1782 jsr166 1.4 * Returns {@code true} if blocking is unnecessary.
1783 jsr166 1.1 */
1784     boolean isReleasable();
1785     }
1786    
1787     /**
1788     * Blocks in accord with the given blocker. If the current thread
1789 jsr166 1.8 * is a {@link ForkJoinWorkerThread}, this method possibly
1790     * arranges for a spare thread to be activated if necessary to
1791 dl 1.18 * ensure sufficient parallelism while the current thread is blocked.
1792 jsr166 1.1 *
1793 jsr166 1.8 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1794     * behaviorally equivalent to
1795 jsr166 1.1 * <pre> {@code
1796     * while (!blocker.isReleasable())
1797     * if (blocker.block())
1798     * return;
1799     * }</pre>
1800 jsr166 1.8 *
1801     * If the caller is a {@code ForkJoinTask}, then the pool may
1802     * first be expanded to ensure parallelism, and later adjusted.
1803 jsr166 1.1 *
1804     * @param blocker the blocker
1805     * @throws InterruptedException if blocker.block did so
1806     */
1807 dl 1.18 public static void managedBlock(ManagedBlocker blocker)
1808 jsr166 1.1 throws InterruptedException {
1809     Thread t = Thread.currentThread();
1810 dl 1.19 if (t instanceof ForkJoinWorkerThread) {
1811     ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
1812     w.pool.awaitBlocker(blocker);
1813     }
1814 dl 1.18 else {
1815     do {} while (!blocker.isReleasable() && !blocker.block());
1816     }
1817 jsr166 1.1 }
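    /*
     * Usage sketch: pairing {@code managedBlock} with the QueueTaker
     * example from the ManagedBlocker documentation above
     * (hypothetical queue contents):
     * <pre> {@code
     * BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
     * QueueTaker<String> taker = new QueueTaker<String>(queue);
     * ForkJoinPool.managedBlock(taker); // may activate a spare worker;
     *                                   // throws InterruptedException
     * String item = taker.getItem();    // non-null after block() returns
     * }</pre>
     */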
1818    
1819 jsr166 1.7     // AbstractExecutorService overrides. These rely on the undocumented
1820     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
1821     // implement RunnableFuture.
1822 jsr166 1.1
1823     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1824 jsr166 1.7 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
1825 jsr166 1.1 }
1826    
1827     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1828 jsr166 1.7 return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
1829 jsr166 1.1 }
1830    
1831     // Unsafe mechanics
1832    
1833     private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1834 dl 1.14 private static final long workerCountsOffset =
1835     objectFieldOffset("workerCounts", ForkJoinPool.class);
1836     private static final long runStateOffset =
1837     objectFieldOffset("runState", ForkJoinPool.class);
1838 jsr166 1.2 private static final long eventCountOffset =
1839 jsr166 1.3 objectFieldOffset("eventCount", ForkJoinPool.class);
1840 dl 1.14     private static final long eventWaitersOffset =
1841     objectFieldOffset("eventWaiters", ForkJoinPool.class);
1842     private static final long stealCountOffset =
1843     objectFieldOffset("stealCount", ForkJoinPool.class);
1844 dl 1.19     private static final long spareWaitersOffset =
1845     objectFieldOffset("spareWaiters", ForkJoinPool.class);
1846 jsr166 1.3
1847     private static long objectFieldOffset(String field, Class<?> klazz) {
1848     try {
1849     return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1850     } catch (NoSuchFieldException e) {
1851     // Convert Exception to corresponding Error
1852     NoSuchFieldError error = new NoSuchFieldError(field);
1853     error.initCause(e);
1854     throw error;
1855     }
1856     }
1857 jsr166 1.1 }