ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.21
Committed: Fri Aug 13 16:21:45 2010 UTC (13 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.20: +0 -3 lines
Log Message:
Remove outdated javadoc sentence

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.locks.LockSupport;
15 import java.util.concurrent.locks.ReentrantLock;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.concurrent.CountDownLatch;
18
19 /**
20 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
21 * A {@code ForkJoinPool} provides the entry point for submissions
22 * from non-{@code ForkJoinTask} clients, as well as management and
23 * monitoring operations.
24 *
25 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
26 * ExecutorService} mainly by virtue of employing
27 * <em>work-stealing</em>: all threads in the pool attempt to find and
28 * execute subtasks created by other active tasks (eventually blocking
29 * waiting for work if none exist). This enables efficient processing
30 * when most tasks spawn other subtasks (as do most {@code
31 * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
32 * constructors, {@code ForkJoinPool}s may also be appropriate for use
33 * with event-style tasks that are never joined.
34 *
35 * <p>A {@code ForkJoinPool} is constructed with a given target
36 * parallelism level; by default, equal to the number of available
37 * processors. The pool attempts to maintain enough active (or
38 * available) threads by dynamically adding, suspending, or resuming
39 * internal worker threads, even if some tasks are stalled waiting to
40 * join others. However, no such adjustments are guaranteed in the
41 * face of blocked IO or other unmanaged synchronization. The nested
42 * {@link ManagedBlocker} interface enables extension of the kinds of
43 * synchronization accommodated.
44 *
45 * <p>In addition to execution and lifecycle control methods, this
46 * class provides status check methods (for example
47 * {@link #getStealCount}) that are intended to aid in developing,
48 * tuning, and monitoring fork/join applications. Also, method
49 * {@link #toString} returns indications of pool state in a
50 * convenient form for informal monitoring.
51 *
52 * <p> As is the case with other ExecutorServices, there are three
53 * main task execution methods summarized in the following
54 * table. These are designed to be used by clients not already engaged
55 * in fork/join computations in the current pool. The main forms of
56 * these methods accept instances of {@code ForkJoinTask}, but
57 * overloaded forms also allow mixed execution of plain {@code
58 * Runnable}- or {@code Callable}- based activities as well. However,
59 * tasks that are already executing in a pool should normally
60 * <em>NOT</em> use these pool execution methods, but instead use the
61 * within-computation forms listed in the table.
62 *
63 * <table BORDER CELLPADDING=3 CELLSPACING=1>
64 * <tr>
65 * <td></td>
66 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
67 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
68 * </tr>
69 * <tr>
70 * <td> <b>Arrange async execution</td>
71 * <td> {@link #execute(ForkJoinTask)}</td>
72 * <td> {@link ForkJoinTask#fork}</td>
73 * </tr>
74 * <tr>
75 * <td> <b>Await and obtain result</td>
76 * <td> {@link #invoke(ForkJoinTask)}</td>
77 * <td> {@link ForkJoinTask#invoke}</td>
78 * </tr>
79 * <tr>
80 * <td> <b>Arrange exec and obtain Future</td>
81 * <td> {@link #submit(ForkJoinTask)}</td>
82 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
83 * </tr>
84 * </table>
85 *
86 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
87 * used for all parallel task execution in a program or subsystem.
88 * Otherwise, use would not usually outweigh the construction and
89 * bookkeeping overhead of creating a large set of threads. For
90 * example, a common pool could be used for the {@code SortTasks}
91 * illustrated in {@link RecursiveAction}. Because {@code
92 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
93 * daemon} mode, there is typically no need to explicitly {@link
94 * #shutdown} such a pool upon program exit.
95 *
96 * <pre>
97 * static final ForkJoinPool mainPool = new ForkJoinPool();
98 * ...
99 * public void sort(long[] array) {
100 * mainPool.invoke(new SortTask(array, 0, array.length));
101 * }
102 * </pre>
103 *
104 * <p><b>Implementation notes</b>: This implementation restricts the
105 * maximum number of running threads to 32767. Attempts to create
106 * pools with greater than the maximum number result in
107 * {@code IllegalArgumentException}.
108 *
109 * <p>This implementation rejects submitted tasks (that is, by throwing
110 * {@link RejectedExecutionException}) only when the pool is shut down
111 * or internal resources have been exhausted.
112 *
113 * @since 1.7
114 * @author Doug Lea
115 */
116 public class ForkJoinPool extends AbstractExecutorService {
117
118 /*
119 * Implementation Overview
120 *
121 * This class provides the central bookkeeping and control for a
122 * set of worker threads: Submissions from non-FJ threads enter
123 * into a submission queue. Workers take these tasks and typically
124 * split them into subtasks that may be stolen by other workers.
125 * The main work-stealing mechanics implemented in class
126 * ForkJoinWorkerThread give first priority to processing tasks
127 * from their own queues (LIFO or FIFO, depending on mode), then
128 * to randomized FIFO steals of tasks in other worker queues, and
129 * lastly to new submissions. These mechanics do not consider
130 * affinities, loads, cache localities, etc, so rarely provide the
131 * best possible performance on a given machine, but portably
132 * provide good throughput by averaging over these factors.
133 * (Further, even if we did try to use such information, we do not
134 * usually have a basis for exploiting it. For example, some sets
135 * of tasks profit from cache affinities, but others are harmed by
136 * cache pollution effects.)
137 *
138 * Beyond work-stealing support and essential bookkeeping, the
139 * main responsibility of this framework is to take actions when
140 * one worker is waiting to join a task stolen (or always held by)
141 * another. Because we are multiplexing many tasks on to a pool
142 * of workers, we can't just let them block (as in Thread.join).
143 * We also cannot just reassign the joiner's run-time stack with
144 * another and replace it later, which would be a form of
145 * "continuation", that even if possible is not necessarily a good
146 * idea. Given that the creation costs of most threads on most
147 * systems mainly surrounds setting up runtime stacks, thread
148 * creation and switching is usually not much more expensive than
149 * stack creation and switching, and is more flexible). Instead we
150 * combine two tactics:
151 *
152 * Helping: Arranging for the joiner to execute some task that it
153 * would be running if the steal had not occurred. Method
154 * ForkJoinWorkerThread.helpJoinTask tracks joining->stealing
155 * links to try to find such a task.
156 *
157 * Compensating: Unless there are already enough live threads,
158 * method helpMaintainParallelism() may create or
159 * re-activate a spare thread to compensate for blocked
160 * joiners until they unblock.
161 *
162 * Because determining the existence of conservatively safe
163 * helping targets, the availability of already-created spares,
164 * and the apparent need to create new spares are all racy and
165 * require heuristic guidance, we rely on multiple retries of
166 * each. Further, because it is impossible to keep exactly the
167 * target (parallelism) number of threads running at any given
168 * time, we allow compensation during joins to fail, and enlist
169 * all other threads to help out whenever they are not otherwise
170 * occupied (i.e., mainly in method preStep).
171 *
172 * The ManagedBlocker extension API can't use helping so relies
173 * only on compensation in method awaitBlocker.
174 *
175 * The main throughput advantages of work-stealing stem from
176 * decentralized control -- workers mostly steal tasks from each
177 * other. We do not want to negate this by creating bottlenecks
178 * implementing other management responsibilities. So we use a
179 * collection of techniques that avoid, reduce, or cope well with
180 * contention. These entail several instances of bit-packing into
181 * CASable fields to maintain only the minimally required
182 * atomicity. To enable such packing, we restrict maximum
183 * parallelism to (1<<15)-1 (enabling twice this (to accommodate
184 * unbalanced increments and decrements) to fit into a 16 bit
185 * field, which is far in excess of normal operating range. Even
186 * though updates to some of these bookkeeping fields do sometimes
187 * contend with each other, they don't normally cache-contend with
188 * updates to others enough to warrant memory padding or
189 * isolation. So they are all held as fields of ForkJoinPool
190 * objects. The main capabilities are as follows:
191 *
192 * 1. Creating and removing workers. Workers are recorded in the
193 * "workers" array. This is an array as opposed to some other data
194 * structure to support index-based random steals by workers.
195 * Updates to the array recording new workers and unrecording
196 * terminated ones are protected from each other by a lock
197 * (workerLock) but the array is otherwise concurrently readable,
198 * and accessed directly by workers. To simplify index-based
199 * operations, the array size is always a power of two, and all
200 * readers must tolerate null slots. Currently, all worker thread
201 * creation is on-demand, triggered by task submissions,
202 * replacement of terminated workers, and/or compensation for
203 * blocked workers. However, all other support code is set up to
204 * work with other policies.
205 *
206 * To ensure that we do not hold on to worker references that
207 * would prevent GC, ALL accesses to workers are via indices into
208 * the workers array (which is one source of some of the unusual
209 * code constructions here). In essence, the workers array serves
210 * as a WeakReference mechanism. Thus for example the event queue
211 * stores worker indices, not worker references. Access to the
212 * workers in associated methods (for example releaseEventWaiters)
213 * must both index-check and null-check the IDs. All such accesses
214 * ignore bad IDs by returning out early from what they are doing,
215 * since this can only be associated with shutdown, in which case
216 * it is OK to give up. On termination, we just clobber these
217 * data structures without trying to use them.
218 *
219 * 2. Bookkeeping for dynamically adding and removing workers. We
220 * aim to approximately maintain the given level of parallelism.
221 * When some workers are known to be blocked (on joins or via
222 * ManagedBlocker), we may create or resume others to take their
223 * place until they unblock (see below). Implementing this
224 * requires counts of the number of "running" threads (i.e., those
225 * that are neither blocked nor artifically suspended) as well as
226 * the total number. These two values are packed into one field,
227 * "workerCounts" because we need accurate snapshots when deciding
228 * to create, resume or suspend. Note however that the
229 * correspondance of these counts to reality is not guaranteed. In
230 * particular updates for unblocked threads may lag until they
231 * actually wake up.
232 *
233 * 3. Maintaining global run state. The run state of the pool
234 * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to
235 * those in other Executor implementations, as well as a count of
236 * "active" workers -- those that are, or soon will be, or
237 * recently were executing tasks. The runLevel and active count
238 * are packed together in order to correctly trigger shutdown and
239 * termination. Without care, active counts can be subject to very
240 * high contention. We substantially reduce this contention by
241 * relaxing update rules. A worker must claim active status
242 * prospectively, by activating if it sees that a submitted or
243 * stealable task exists (it may find after activating that the
244 * task no longer exists). It stays active while processing this
245 * task (if it exists) and any other local subtasks it produces,
246 * until it cannot find any other tasks. It then tries
247 * inactivating (see method preStep), but upon update contention
248 * instead scans for more tasks, later retrying inactivation if it
249 * doesn't find any.
250 *
251 * 4. Managing idle workers waiting for tasks. We cannot let
252 * workers spin indefinitely scanning for tasks when none are
253 * available. On the other hand, we must quickly prod them into
254 * action when new tasks are submitted or generated. We
255 * park/unpark these idle workers using an event-count scheme.
256 * Field eventCount is incremented upon events that may enable
257 * workers that previously could not find a task to now find one:
258 * Submission of a new task to the pool, or another worker pushing
259 * a task onto a previously empty queue. (We also use this
260 * mechanism for termination actions that require wakeups of idle
261 * workers). Each worker maintains its last known event count,
262 * and blocks when a scan for work did not find a task AND its
263 * lastEventCount matches the current eventCount. Waiting idle
264 * workers are recorded in a variant of Treiber stack headed by
265 * field eventWaiters which, when nonzero, encodes the thread
266 * index and count awaited for by the worker thread most recently
267 * calling eventSync. This thread in turn has a record (field
268 * nextEventWaiter) for the next waiting worker. In addition to
269 * allowing simpler decisions about need for wakeup, the event
270 * count bits in eventWaiters serve the role of tags to avoid ABA
271 * errors in Treiber stacks. To reduce delays in task diffusion,
272 * workers not otherwise occupied may invoke method
273 * releaseEventWaiters, that removes and signals (unparks) workers
274 * not waiting on current count. To minimize
275 * task production stalls associated with signalling, any worker
276 * pushing a task on an empty queue invokes the weaker method
277 * signalWork, that only releases idle workers until it detects
278 * interference by other threads trying to release, and lets them
279 * take over. The net effect is a tree-like diffusion of signals,
280 * where released threads (and possibly others) help with unparks.
281 * To further reduce contention effects a bit, failed CASes to
282 * increment field eventCount are tolerated without retries.
283 * Conceptually they are merged into the same event, which is OK
284 * when their only purpose is to enable workers to scan for work.
285 *
286 * 5. Managing suspension of extra workers. When a worker is about
287 * to block waiting for a join (or via ManagedBlockers), we may
288 * create a new thread to maintain parallelism level, or at least
289 * avoid starvation. Usually, extra threads are needed for only
290 * very short periods, yet join dependencies are such that we
291 * sometimes need them in bursts. Rather than create new threads
292 * each time this happens, we suspend no-longer-needed extra ones
293 * as "spares". For most purposes, we don't distinguish "extra"
294 * spare threads from normal "core" threads: On each call to
295 * preStep (the only point at which we can do this) a worker
296 * checks to see if there are now too many running workers, and if
297 * so, suspends itself. Method helpMaintainParallelism looks for
298 * suspended threads to resume before considering creating a new
299 * replacement. The spares themselves are encoded on another
300 * variant of a Treiber Stack, headed at field "spareWaiters".
301 * Note that the use of spares is intrinsically racy. One thread
302 * may become a spare at about the same time as another is
303 * needlessly being created. We counteract this and related slop
304 * in part by requiring resumed spares to immediately recheck (in
305 * preStep) to see whether they should re-suspend. To avoid
306 * long-term build-up of spares, the oldest spare (see
307 * ForkJoinWorkerThread.suspendAsSpare) occasionally wakes up if
308 * not signalled and calls tryTrimSpare, which uses two different
309 * thresholds: Always killing if the number of spares is greater
310 * than 25% of total, and killing others only at a slower rate
311 * (UNUSED_SPARE_TRIM_RATE_NANOS).
312 *
313 * 6. Deciding when to create new workers. The main dynamic
314 * control in this class is deciding when to create extra threads
315 * in method helpMaintainParallelism. We would like to keep
316 * exactly #parallelism threads running, which is an impossible
317 * task. We always need to create one when the number of running
318 * threads would become zero and all workers are busy. Beyond
319 * this, we must rely on heuristics that work well in the
320 * presence of transient phenomena such as GC stalls, dynamic
321 * compilation, and wake-up lags. These transients are extremely
322 * common -- we are normally trying to fully saturate the CPUs on
323 * a machine, so almost any activity other than running tasks
324 * impedes accuracy. Our main defense is to allow some slack in
325 * creation thresholds, using rules that reflect the fact that the
326 * more threads we have running, the more likely that we are
327 * underestimating the number of running threads. The rules also
328 * better cope with the fact that some of the methods in this
329 * class tend to never become compiled (but are interpreted), so
330 * some components of the entire set of controls might execute 100
331 * times faster than others. And similarly for cases where the
332 * apparent lack of work is just due to GC stalls and other
333 * transient system activity.
334 *
335 * Beware that there is a lot of representation-level coupling
336 * among classes ForkJoinPool, ForkJoinWorkerThread, and
337 * ForkJoinTask. For example, direct access to "workers" array by
338 * workers, and direct access to ForkJoinTask.status by both
339 * ForkJoinPool and ForkJoinWorkerThread. There is little point
340 * trying to reduce this, since any associated future changes in
341 * representations will need to be accompanied by algorithmic
342 * changes anyway.
343 *
344 * Style notes: There are lots of inline assignments (of form
345 * "while ((local = field) != 0)") which are usually the simplest
346 * way to ensure the required read orderings (which are sometimes
347 * critical). Also several occurrences of the unusual "do {}
348 * while(!cas...)" which is the simplest way to force an update of
349 * a CAS'ed variable. There are also other coding oddities that
350 * help some methods perform reasonably even when interpreted (not
351 * compiled), at the expense of some messy constructions that
352 * reduce byte code counts.
353 *
354 * The order of declarations in this file is: (1) statics (2)
355 * fields (along with constants used when unpacking some of them)
356 * (3) internal control methods (4) callbacks and other support
357 * for ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
358 * methods (plus a few little helpers).
359 */
360
/**
 * Factory for creating new {@link ForkJoinWorkerThread}s.
 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
 * for {@code ForkJoinWorkerThread} subclasses that extend base
 * functionality or initialize threads with different contexts.
 */
public static interface ForkJoinWorkerThreadFactory {
    /**
     * Returns a new worker thread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @return the new worker thread
     * @throws NullPointerException if the pool is null
     */
    public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
376
/**
 * Default ForkJoinWorkerThreadFactory implementation; creates a
 * new ForkJoinWorkerThread with no added context or behavior.
 */
static class DefaultForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new ForkJoinWorkerThread(pool);
    }
}
387
/**
 * The default factory for worker threads, used unless overridden
 * in ForkJoinPool constructors; creates plain ForkJoinWorkerThreads.
 */
public static final ForkJoinWorkerThreadFactory
    defaultForkJoinWorkerThreadFactory =
    new DefaultForkJoinWorkerThreadFactory();
395
396 /**
397 * Permission required for callers of methods that may start or
398 * kill threads.
399 */
400 private static final RuntimePermission modifyThreadPermission =
401 new RuntimePermission("modifyThread");
402
403 /**
404 * If there is a security manager, makes sure caller has
405 * permission to modify threads.
406 */
407 private static void checkPermission() {
408 SecurityManager security = System.getSecurityManager();
409 if (security != null)
410 security.checkPermission(modifyThreadPermission);
411 }
412
413 /**
414 * Generator for assigning sequence numbers as pool names.
415 */
416 private static final AtomicInteger poolNumberGenerator =
417 new AtomicInteger();
418
419 /**
420 * Absolute bound for parallelism level. Twice this number plus
421 * one (i.e., 0xffff) must fit into a 16-bit field to enable
422 * word-packing for some counts and indices.
423 */
424 private static final int MAX_WORKERS = 0x7fff;
425
426 /**
427 * Array holding all worker threads in the pool. Array size must
428 * be a power of two. Updates and replacements are protected by
429 * workerLock, but the array is always kept in a consistent enough
430 * state to be randomly accessed without locking by workers
431 * performing work-stealing, as well as other traversal-based
432 * methods in this class. All readers must tolerate that some
433 * array slots may be null.
434 */
435 volatile ForkJoinWorkerThread[] workers;
436
437 /**
438 * Queue for external submissions.
439 */
440 private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
441
442 /**
443 * Lock protecting updates to workers array.
444 */
445 private final ReentrantLock workerLock;
446
447 /**
448 * Latch released upon termination.
449 */
450 private final Phaser termination;
451
452 /**
453 * Creation factory for worker threads.
454 */
455 private final ForkJoinWorkerThreadFactory factory;
456
457 /**
458 * Sum of per-thread steal counts, updated only when threads are
459 * idle or terminating.
460 */
461 private volatile long stealCount;
462
463 /**
464 * The last nanoTime that a spare thread was trimmed
465 */
466 private volatile long trimTime;
467
468 /**
469 * The rate at which to trim unused spares
470 */
471 static final long UNUSED_SPARE_TRIM_RATE_NANOS =
472 1000L * 1000L * 1000L; // 1 sec
473
474 /**
475 * Encoded record of top of treiber stack of threads waiting for
476 * events. The top 32 bits contain the count being waited for. The
477 * bottom 16 bits contains one plus the pool index of waiting
478 * worker thread. (Bits 16-31 are unused.)
479 */
480 private volatile long eventWaiters;
481
482 private static final int EVENT_COUNT_SHIFT = 32;
483 private static final long WAITER_ID_MASK = (1L << 16) - 1L;
484
485 /**
486 * A counter for events that may wake up worker threads:
487 * - Submission of a new task to the pool
488 * - A worker pushing a task on an empty queue
489 * - termination
490 */
491 private volatile int eventCount;
492
493 /**
494 * Encoded record of top of treiber stack of spare threads waiting
495 * for resumption. The top 16 bits contain an arbitrary count to
496 * avoid ABA effects. The bottom 16bits contains one plus the pool
497 * index of waiting worker thread.
498 */
499 private volatile int spareWaiters;
500
501 private static final int SPARE_COUNT_SHIFT = 16;
502 private static final int SPARE_ID_MASK = (1 << 16) - 1;
503
504 /**
505 * Lifecycle control. The low word contains the number of workers
506 * that are (probably) executing tasks. This value is atomically
507 * incremented before a worker gets a task to run, and decremented
508 * when worker has no tasks and cannot find any. Bits 16-18
509 * contain runLevel value. When all are zero, the pool is
510 * running. Level transitions are monotonic (running -> shutdown
511 * -> terminating -> terminated) so each transition adds a bit.
512 * These are bundled together to ensure consistent read for
513 * termination checks (i.e., that runLevel is at least SHUTDOWN
514 * and active threads is zero).
515 */
516 private volatile int runState;
517
518 // Note: The order among run level values matters.
519 private static final int RUNLEVEL_SHIFT = 16;
520 private static final int SHUTDOWN = 1 << RUNLEVEL_SHIFT;
521 private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1);
522 private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2);
523 private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;
524 private static final int ONE_ACTIVE = 1; // active update delta
525
526 /**
527 * Holds number of total (i.e., created and not yet terminated)
528 * and running (i.e., not blocked on joins or other managed sync)
529 * threads, packed together to ensure consistent snapshot when
530 * making decisions about creating and suspending spare
531 * threads. Updated only by CAS. Note that adding a new worker
532 * requires incrementing both counts, since workers start off in
533 * running state.
534 */
535 private volatile int workerCounts;
536
537 private static final int TOTAL_COUNT_SHIFT = 16;
538 private static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
539 private static final int ONE_RUNNING = 1;
540 private static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT;
541
542 /**
543 * The target parallelism level.
544 * Accessed directly by ForkJoinWorkerThreads.
545 */
546 final int parallelism;
547
548 /**
549 * True if use local fifo, not default lifo, for local polling
550 * Read by, and replicated by ForkJoinWorkerThreads
551 */
552 final boolean locallyFifo;
553
554 /**
555 * The uncaught exception handler used when any worker abruptly
556 * terminates.
557 */
558 private final Thread.UncaughtExceptionHandler ueh;
559
560 /**
561 * Pool number, just for assigning useful names to worker threads
562 */
563 private final int poolNumber;
564
565
566 // Utilities for CASing fields. Note that several of these
567 // are manually inlined by callers
568
569 /**
570 * Increments running count part of workerCounts
571 */
572 final void incrementRunningCount() {
573 int c;
574 do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
575 c = workerCounts,
576 c + ONE_RUNNING));
577 }
578
/**
 * Tries to decrement running count unless already zero.
 *
 * @return true if the count was decremented; false if it was
 * already zero or the CAS lost to a concurrent update
 */
final boolean tryDecrementRunningCount() {
    int wc = workerCounts;
    if ((wc & RUNNING_COUNT_MASK) == 0)
        return false; // already zero; nothing to decrement
    return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                    wc, wc - ONE_RUNNING);
}
589
/**
 * Forces decrement of encoded workerCounts, awaiting nonzero if
 * (rarely) necessary when other count updates lag.
 *
 * @param dr -- either zero or ONE_RUNNING
 * @param dt == either zero or ONE_TOTAL
 */
private void decrementWorkerCounts(int dr, int dt) {
    for (;;) {
        int wc = workerCounts;
        if (wc == 0 && (runState & TERMINATED) != 0)
            return; // lagging termination on a backout
        if ((wc & RUNNING_COUNT_MASK) - dr < 0 ||
            (wc >>> TOTAL_COUNT_SHIFT) - dt < 0)
            Thread.yield(); // count would underflow; let laggards catch up
        if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                     wc, wc - (dr + dt)))
            return;
    }
}
610
611 /**
612 * Increments event count
613 */
614 private void advanceEventCount() {
615 int c;
616 do {} while(!UNSAFE.compareAndSwapInt(this, eventCountOffset,
617 c = eventCount, c+1));
618 }
619
/**
 * Tries incrementing active count; fails on contention.
 * Called by workers before executing tasks.
 *
 * @return true on success
 */
final boolean tryIncrementActiveCount() {
    int c;
    // Single attempt only; on CAS failure callers rescan and retry.
    return UNSAFE.compareAndSwapInt(this, runStateOffset,
                                    c = runState, c + ONE_ACTIVE);
}
631
/**
 * Tries decrementing active count; fails on contention.
 * Called when workers cannot find tasks to run.
 *
 * @return true on success
 */
final boolean tryDecrementActiveCount() {
    int c;
    return UNSAFE.compareAndSwapInt(this, runStateOffset,
                                    c = runState, c - ONE_ACTIVE);
}
641
642 /**
643 * Advances to at least the given level. Returns true if not
644 * already in at least the given level.
645 */
646 private boolean advanceRunLevel(int level) {
647 for (;;) {
648 int s = runState;
649 if ((s & level) != 0)
650 return false;
651 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | level))
652 return true;
653 }
654 }
655
656 // workers array maintenance
657
/**
 * Records and returns a workers array index for new worker.
 *
 * @param w the new worker to record
 * @return the index assigned to w in the workers array
 */
private int recordWorker(ForkJoinWorkerThread w) {
    // Try using slot totalCount-1. If not available, scan and/or resize
    int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
    final ReentrantLock lock = this.workerLock;
    lock.lock();
    try {
        ForkJoinWorkerThread[] ws = workers;
        int n = ws.length;
        if (k < 0 || k >= n || ws[k] != null) {
            // Preferred slot unusable; linear-scan for the first free slot
            for (k = 0; k < n && ws[k] != null; ++k)
                ;
            if (k == n) // no free slot; double the array (keeps power of two)
                ws = Arrays.copyOf(ws, n << 1);
        }
        ws[k] = w;
        workers = ws; // volatile array write ensures slot visibility
    } finally {
        lock.unlock();
    }
    return k;
}
682
/**
 * Nulls out record of worker in workers array.
 *
 * @param w the terminated worker to unrecord
 */
private void forgetWorker(ForkJoinWorkerThread w) {
    int idx = w.poolIndex;
    // Locking helps method recordWorker avoid unnecessary expansion
    final ReentrantLock lock = this.workerLock;
    lock.lock();
    try {
        ForkJoinWorkerThread[] ws = workers;
        if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
            ws[idx] = null;
    } finally {
        lock.unlock();
    }
}
699
700 // adding and removing workers
701
/**
 * Tries to create and add new worker. Assumes that worker counts
 * are already updated to accommodate the worker, so adjusts on
 * failure.
 */
private void addWorker() {
    ForkJoinWorkerThread w = null;
    try {
        w = factory.newThread(this);
    } finally { // Adjust on either null or exceptional factory return
        if (w == null) {
            // Back out the counts optimistically added by the caller
            decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
            tryTerminate(false); // in case of failure during shutdown
        }
    }
    if (w != null)
        w.start(recordWorker(w), ueh); // record index before starting
}
720
/**
 * Final callback from terminating worker. Removes record of
 * worker from array, and adjusts counts. If pool is shutting
 * down, tries to complete termination.
 *
 * @param w the worker
 */
final void workerTerminated(ForkJoinWorkerThread w) {
    forgetWorker(w);
    // Trimmed spares already gave up their running count when suspended
    decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
    while (w.stealCount != 0) // collect final count
        tryAccumulateStealCount(w);
    tryTerminate(false);
}
735
736 // Waiting for and signalling events
737
/**
 * Releases workers blocked on a count not equal to current count.
 * Normally called after precheck that eventWaiters isn't zero to
 * avoid wasted array checks.
 *
 * @param signalling true if caller is a signalling worker so can
 * exit upon (conservatively) detected contention by other threads
 * who will continue to release
 */
private void releaseEventWaiters(boolean signalling) {
    ForkJoinWorkerThread[] ws = workers;
    int n = ws.length;
    long h; // head of stack
    ForkJoinWorkerThread w; int id, ec;
    // Pop waiters whose awaited count differs from the current eventCount.
    // Out-of-range or null ids can only occur during shutdown, so give up.
    while ((id = ((int)((h = eventWaiters) & WAITER_ID_MASK)) - 1) >= 0 &&
           (int)(h >>> EVENT_COUNT_SHIFT) != (ec = eventCount) &&
           id < n && (w = ws[id]) != null) {
        if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
                                      h, h = w.nextWaiter))
            LockSupport.unpark(w);
        if (signalling && (eventCount != ec || eventWaiters != h))
            break; // another releaser took over; let it continue
    }
}
762
    /**
     * Tries to advance eventCount and releases waiters. Called only
     * from workers.
     */
    final void signalWork() {
        int c; // try to increment event count -- CAS failure OK
        // a concurrent increment by another signaller serves the same purpose
        UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
        if (eventWaiters != 0L)
            releaseEventWaiters(true);
    }
773
    /**
     * Blocks worker until terminating or event count
     * advances from last value held by worker.
     *
     * @param w the calling worker thread
     */
    private void eventSync(ForkJoinWorkerThread w) {
        int wec = w.lastEventCount;
        // nh packs (stale event count, worker index + 1) as new stack head
        long nh = (((long)wec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
        long h;
        // retry push while not terminating, head is empty or equally stale,
        // and no event has arrived since wec was read
        while ((runState < SHUTDOWN || !tryTerminate(false)) &&
               ((h = eventWaiters) == 0L ||
                (int)(h >>> EVENT_COUNT_SHIFT) == wec) &&
               eventCount == wec) {
            if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
                                          w.nextWaiter = h, nh)) {
                while (runState < TERMINATING && eventCount == wec) {
                    if (!tryAccumulateStealCount(w))  // transfer while idle
                        continue;
                    Thread.interrupted();             // clear/ignore interrupt
                    if (eventCount != wec)            // recheck before parking
                        break;
                    LockSupport.park(w);
                }
                break;
            }
        }
        w.lastEventCount = eventCount;
    }
803
804 // Maintaining spares
805
    /**
     * Pushes worker onto the spare stack.
     * spareWaiters packs (spare count, worker index + 1); the
     * count stamp avoids ABA when the same worker is re-pushed.
     */
    final void pushSpare(ForkJoinWorkerThread w) {
        int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex+1);
        do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
                                               w.nextSpare = spareWaiters,ns));
    }
814
    /**
     * Tries (once) to resume a spare if running count is less than
     * target parallelism. Fails on contention or stale workers.
     */
    private void tryResumeSpare() {
        int sw, id;
        ForkJoinWorkerThread w;
        ForkJoinWorkerThread[] ws;
        // pop head of spare stack only if still needed and uncontended
        if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
            id < (ws = workers).length && (w = ws[id]) != null &&
            (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
            eventWaiters == 0L &&
            spareWaiters == sw &&
            UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
                                     sw, w.nextSpare) &&
            w.tryUnsuspend()) {
            int c; // try increment; if contended, finish after unpark
            boolean inc = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                                   c = workerCounts,
                                                   c + ONE_RUNNING);
            LockSupport.unpark(w);
            if (!inc) {
                // must not lose the increment once the worker is released
                do {} while(!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                                      c = workerCounts,
                                                      c + ONE_RUNNING));
            }
        }
    }
843
    /**
     * Callback from oldest spare occasionally waking up. Tries
     * (once) to shutdown a spare if more than 25% spare overage, or
     * if UNUSED_SPARE_TRIM_RATE_NANOS have elapsed and there are at
     * least #parallelism running threads. Note that we don't need CAS
     * or locks here because the method is called only from the oldest
     * suspended spare occasionally waking (and even misfires are OK).
     *
     * @param now the wake up nanoTime of caller
     */
    final void tryTrimSpare(long now) {
        long lastTrim = trimTime;
        trimTime = now;
        helpMaintainParallelism(); // first, help wake up any needed spares
        int sw, id;
        ForkJoinWorkerThread w;
        ForkJoinWorkerThread[] ws;
        int pc = parallelism;
        int wc = workerCounts;
        if ((wc & RUNNING_COUNT_MASK) >= pc &&
            (((wc >>> TOTAL_COUNT_SHIFT) - pc) > (pc >>> 2) + 1 ||// approx 25%
             now - lastTrim >= UNUSED_SPARE_TRIM_RATE_NANOS) &&
            (id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
            id < (ws = workers).length && (w = ws[id]) != null &&
            UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
                                     sw, w.nextSpare))
            w.shutdown(false);
    }
872
    /**
     * Does at most one of:
     *
     * 1. Help wake up existing workers waiting for work via
     *    releaseEventWaiters. (If any exist, then it probably doesn't
     *    matter right now if under target parallelism level.)
     *
     * 2. If below parallelism level and a spare exists, try (once)
     *    to resume it via tryResumeSpare.
     *
     * 3. If neither of the above, tries (once) to add a new
     *    worker if either there are not enough total, or if all
     *    existing workers are busy, there are either no running
     *    workers or the deficit is at least twice the surplus.
     */
    private void helpMaintainParallelism() {
        // uglified to work better when not compiled
        int pc, wc, rc, tc, rs; long h;
        if ((h = eventWaiters) != 0L) {
            if ((int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
                releaseEventWaiters(false); // avoid useless call
        }
        else if ((pc = parallelism) >
                 (rc = ((wc = workerCounts) & RUNNING_COUNT_MASK))) {
            if (spareWaiters != 0)
                tryResumeSpare();
            else if ((rs = runState) < TERMINATING &&
                     ((tc = wc >>> TOTAL_COUNT_SHIFT) < pc ||
                      (tc == (rs & ACTIVE_COUNT_MASK) && // all busy
                       (rc == 0 ||                       // must add
                        rc < pc - ((tc - pc) << 1)) &&   // within slack
                       tc < MAX_WORKERS && runState == rs)) && // recheck busy
                     workerCounts == wc &&
                     // CAS pre-reserves the counts; addWorker undoes on failure
                     UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
                                              wc + (ONE_RUNNING|ONE_TOTAL)))
                addWorker();
        }
    }
911
    /**
     * Callback from workers invoked upon each top-level action (i.e.,
     * stealing a task or taking a submission and running
     * it). Performs one or more of the following:
     *
     * 1. If the worker cannot find work (misses > 0), updates its
     *    active status to inactive and updates activeCount unless
     *    this is the first miss and there is contention, in which
     *    case it may try again (either in this or a subsequent
     *    call).
     *
     * 2. If there are at least 2 misses, awaits the next task event
     *    via eventSync
     *
     * 3. If there are too many running threads, suspends this worker
     *    (first forcing inactivation if necessary). If it is not
     *    needed, it may be killed while suspended via
     *    tryTrimSpare. Otherwise, upon resume it rechecks to make
     *    sure that it is still needed.
     *
     * 4. Helps release and/or reactivate other workers via
     *    helpMaintainParallelism
     *
     * @param w the worker
     * @param misses the number of scans by caller failing to find work
     * (saturating at 2 just to avoid wraparound)
     */
    final void preStep(ForkJoinWorkerThread w, int misses) {
        boolean active = w.active;
        int pc = parallelism;
        for (;;) {
            int wc = workerCounts;
            int rc = wc & RUNNING_COUNT_MASK;
            if (active && (misses > 0 || rc > pc)) {
                int rs; // try inactivate
                if (UNSAFE.compareAndSwapInt(this, runStateOffset,
                                             rs = runState, rs - ONE_ACTIVE))
                    active = w.active = false;
                else if (misses > 1 || rc > pc ||
                         (rs & ACTIVE_COUNT_MASK) >= pc)
                    continue; // force inactivate
            }
            if (misses > 1) {
                misses = 0; // don't re-sync
                eventSync(w); // continue loop to recheck rc
            }
            else if (rc > pc) {
                if (workerCounts == wc && // try to suspend as spare
                    UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                             wc, wc - ONE_RUNNING) &&
                    !w.suspendAsSpare()) // false if killed
                    break;
            }
            else {
                if (rc < pc || eventWaiters != 0L)
                    helpMaintainParallelism();
                break;
            }
        }
    }
972
    /**
     * Helps and/or blocks awaiting join of the given task.
     * Alternates between helpJoinTask() and helpMaintainParallelism()
     * as many times as there is a deficit in running count (or longer
     * if running count would become zero), then blocks if task still
     * not done.
     *
     * @param joinMe the task to join
     * @param worker the calling worker thread
     */
    final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
        int threshold = parallelism;         // descend blocking thresholds
        while (joinMe.status >= 0) {
            boolean block; int wc;
            worker.helpJoinTask(joinMe);
            if (joinMe.status < 0)
                break;
            if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
                if (threshold > 0)
                    --threshold;
                else
                    advanceEventCount(); // force release
                block = false;
            }
            else
                // give up running status before blocking; CAS failure
                // just means we loop and help again instead
                block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                                 wc, wc - ONE_RUNNING);
            helpMaintainParallelism();
            if (block) {
                int c;
                joinMe.internalAwaitDone();
                // restore running count relinquished above
                do {} while (!UNSAFE.compareAndSwapInt
                             (this, workerCountsOffset,
                              c = workerCounts, c + ONE_RUNNING));
                break;
            }
        }
    }
1010
    /**
     * Same idea as awaitJoin, but no helping: spins down the blocking
     * threshold, then gives up running status and blocks via the
     * ManagedBlocker protocol.
     *
     * @param blocker the blocker to wait on
     * @throws InterruptedException if interrupted while blocking
     */
    final void awaitBlocker(ManagedBlocker blocker)
        throws InterruptedException {
        int threshold = parallelism;
        while (!blocker.isReleasable()) {
            boolean block; int wc;
            if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
                if (threshold > 0)
                    --threshold;
                else
                    advanceEventCount();
                block = false;
            }
            else
                block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                                 wc, wc - ONE_RUNNING);
            helpMaintainParallelism();
            if (block) {
                try {
                    do {} while (!blocker.isReleasable() && !blocker.block());
                } finally {
                    // restore running count even if block() throws
                    int c;
                    do {} while (!UNSAFE.compareAndSwapInt
                                 (this, workerCountsOffset,
                                  c = workerCounts, c + ONE_RUNNING));
                }
                break;
            }
        }
    }
1043
    /**
     * Possibly initiates and/or completes termination.
     *
     * @param now if true, unconditionally terminate, else only
     * if shutdown and empty queue and no active workers
     * @return true if now terminating or terminated
     */
    private boolean tryTerminate(boolean now) {
        if (now)
            advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
        else if (runState < SHUTDOWN ||
                 !submissionQueue.isEmpty() ||
                 (runState & ACTIVE_COUNT_MASK) != 0)
            return false;

        if (advanceRunLevel(TERMINATING))
            startTerminating();

        // Finish now if all threads terminated; else in some subsequent call
        if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
            advanceRunLevel(TERMINATED);
            termination.arrive(); // release awaitTermination waiters
        }
        return true;
    }
1069
    /**
     * Actions on transition to TERMINATING
     *
     * Runs up to four passes through workers: (0) shutting down each
     * quietly (without waking up if parked) to quickly spread
     * notifications without unnecessary bouncing around event queues
     * etc (1) wake up and help cancel tasks (2) interrupt (3) mop up
     * races with interrupted workers
     */
    private void startTerminating() {
        cancelSubmissions();
        for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
            advanceEventCount();
            eventWaiters = 0L; // clobber lists
            spareWaiters = 0;
            ForkJoinWorkerThread[] ws = workers;
            int n = ws.length;
            for (int i = 0; i < n; ++i) {
                ForkJoinWorkerThread w = ws[i];
                if (w != null) {
                    w.shutdown(true);
                    if (passes > 0 && !w.isTerminated()) {
                        w.cancelTasks();
                        LockSupport.unpark(w);
                        if (passes > 1) {
                            try {
                                w.interrupt();
                            } catch (SecurityException ignore) {
                                // lacking permission to interrupt is harmless here
                            }
                        }
                    }
                }
            }
        }
    }
1105
1106 /**
1107 * Clear out and cancel submissions, ignoring exceptions
1108 */
1109 private void cancelSubmissions() {
1110 ForkJoinTask<?> task;
1111 while ((task = submissionQueue.poll()) != null) {
1112 try {
1113 task.cancel(false);
1114 } catch (Throwable ignore) {
1115 }
1116 }
1117 }
1118
1119 // misc support for ForkJoinWorkerThread
1120
    /**
     * Returns pool number, a sequence number assigned at construction
     * (used by workers for naming).
     */
    final int getPoolNumber() {
        return poolNumber;
    }
1127
    /**
     * Tries to accumulate steal count from a worker, clearing
     * the worker's value.
     *
     * @return true if worker steal count now zero
     */
    final boolean tryAccumulateStealCount(ForkJoinWorkerThread w) {
        int sc = w.stealCount;
        long c = stealCount;
        // CAS even if zero, for fence effects
        if (UNSAFE.compareAndSwapLong(this, stealCountOffset, c, c + sc)) {
            if (sc != 0)
                w.stealCount = 0;
            return true;
        }
        return sc == 0;
    }
1145
1146 /**
1147 * Returns the approximate (non-atomic) number of idle threads per
1148 * active thread.
1149 */
1150 final int idlePerActive() {
1151 int pc = parallelism; // use parallelism, not rc
1152 int ac = runState; // no mask -- artifically boosts during shutdown
1153 // Use exact results for small values, saturate past 4
1154 return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
1155 }
1156
1157 // Public and protected methods
1158
1159 // Constructors
1160
    /**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool() {
        this(Runtime.getRuntime().availableProcessors(),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
1176
    /**
     * Creates a {@code ForkJoinPool} with the indicated parallelism
     * level, the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
     *
     * @param parallelism the parallelism level
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }
1194
    /**
     * Creates a {@code ForkJoinPool} with the given parameters.
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     * @param factory the factory for creating new threads. For default value,
     * use {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while executing
     * tasks. For default value, use {@code null}.
     * @param asyncMode if true,
     * establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined. This mode may be more appropriate
     * than default locally stack-based mode in applications in which
     * worker threads only process event-style asynchronous tasks.
     * For default value, use {@code false}.
     * @throws IllegalArgumentException if parallelism less than or
     * equal to zero, or greater than implementation limit
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     * the caller is not permitted to modify threads
     * because it does not hold {@link
     * java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        Thread.UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        checkPermission();
        if (factory == null)
            throw new NullPointerException();
        if (parallelism <= 0 || parallelism > MAX_WORKERS)
            throw new IllegalArgumentException();
        this.parallelism = parallelism;
        this.factory = factory;
        this.ueh = handler;
        this.locallyFifo = asyncMode;
        int arraySize = initialArraySizeFor(parallelism);
        this.workers = new ForkJoinWorkerThread[arraySize];
        this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
        this.workerLock = new ReentrantLock();
        this.termination = new Phaser(1); // registered party is the pool itself
        this.poolNumber = poolNumberGenerator.incrementAndGet();
        this.trimTime = System.nanoTime();
    }
1240
1241 /**
1242 * Returns initial power of two size for workers array.
1243 * @param pc the initial parallelism level
1244 */
1245 private static int initialArraySizeFor(int pc) {
1246 // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 >>> 16)
1247 int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1248 size |= size >>> 1;
1249 size |= size >>> 2;
1250 size |= size >>> 4;
1251 size |= size >>> 8;
1252 return size + 1;
1253 }
1254
1255 // Execution methods
1256
    /**
     * Common code for execute, invoke and submit.
     * Enqueues the task, then bumps the event count and pokes
     * helpMaintainParallelism so an idle or new worker picks it up.
     * Note: the shutdown check is a best-effort precheck; a submission
     * racing with shutdown may or may not be rejected.
     */
    private <T> void doSubmit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        if (runState >= SHUTDOWN)
            throw new RejectedExecutionException();
        submissionQueue.offer(task);
        advanceEventCount();
        helpMaintainParallelism(); // start or wake up workers
    }
1269
    /**
     * Performs the given task, returning its result upon completion.
     *
     * @param task the task
     * @return the task's result
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> T invoke(ForkJoinTask<T> task) {
        doSubmit(task);
        return task.join(); // blocks caller until completion
    }
1283
    /**
     * Arranges for (asynchronous) execution of the given task.
     * If the caller is already engaged in a fork/join computation in
     * the current pool, this method is equivalent in effect to
     * {@link ForkJoinTask#fork}.
     *
     * @param task the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public void execute(ForkJoinTask<?> task) {
        doSubmit(task);
    }
1298
1299 // AbstractExecutorService methods
1300
1301 /**
1302 * @throws NullPointerException if the task is null
1303 * @throws RejectedExecutionException if the task cannot be
1304 * scheduled for execution
1305 */
1306 public void execute(Runnable task) {
1307 ForkJoinTask<?> job;
1308 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1309 job = (ForkJoinTask<?>) task;
1310 else
1311 job = ForkJoinTask.adapt(task, null);
1312 doSubmit(job);
1313 }
1314
    /**
     * Submits a ForkJoinTask for execution.
     * If the caller is already engaged in a fork/join computation in
     * the current pool, this method is equivalent in effect to
     * {@link ForkJoinTask#fork}.
     *
     * @param task the task to submit
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        doSubmit(task);
        return task;
    }
1331
1332 /**
1333 * @throws NullPointerException if the task is null
1334 * @throws RejectedExecutionException if the task cannot be
1335 * scheduled for execution
1336 */
1337 public <T> ForkJoinTask<T> submit(Callable<T> task) {
1338 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1339 doSubmit(job);
1340 return job;
1341 }
1342
1343 /**
1344 * @throws NullPointerException if the task is null
1345 * @throws RejectedExecutionException if the task cannot be
1346 * scheduled for execution
1347 */
1348 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1349 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1350 doSubmit(job);
1351 return job;
1352 }
1353
1354 /**
1355 * @throws NullPointerException if the task is null
1356 * @throws RejectedExecutionException if the task cannot be
1357 * scheduled for execution
1358 */
1359 public ForkJoinTask<?> submit(Runnable task) {
1360 ForkJoinTask<?> job;
1361 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1362 job = (ForkJoinTask<?>) task;
1363 else
1364 job = ForkJoinTask.adapt(task, null);
1365 doSubmit(job);
1366 return job;
1367 }
1368
    /**
     * @throws NullPointerException {@inheritDoc}
     * @throws RejectedExecutionException {@inheritDoc}
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
        ArrayList<ForkJoinTask<T>> forkJoinTasks =
            new ArrayList<ForkJoinTask<T>>(tasks.size());
        for (Callable<T> task : tasks)
            forkJoinTasks.add(ForkJoinTask.adapt(task));
        // run all as one action; per-task failures surface through
        // the returned futures, not here (see InvokeAll.compute)
        invoke(new InvokeAll<T>(forkJoinTasks));

        // safe: each adapted ForkJoinTask<T> is a Future<T>
        @SuppressWarnings({"unchecked", "rawtypes"})
        List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
        return futures;
    }
1384
    /**
     * Action used by invokeAll to run a batch of adapted tasks.
     * Exceptions are deliberately ignored here; callers observe
     * per-task outcomes via the task objects themselves.
     */
    static final class InvokeAll<T> extends RecursiveAction {
        final ArrayList<ForkJoinTask<T>> tasks;
        InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
        public void compute() {
            try { invokeAll(tasks); }
            catch (Exception ignore) {}
        }
        private static final long serialVersionUID = -7914297376763021607L;
    }
1394
    /**
     * Returns the factory used for constructing new workers.
     *
     * @return the factory used for constructing new workers
     */
    public ForkJoinWorkerThreadFactory getFactory() {
        return factory;
    }

    /**
     * Returns the handler for internal worker threads that terminate
     * due to unrecoverable errors encountered while executing tasks.
     *
     * @return the handler, or {@code null} if none
     */
    public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return ueh;
    }

    /**
     * Returns the targeted parallelism level of this pool.
     *
     * @return the targeted parallelism level of this pool
     */
    public int getParallelism() {
        return parallelism;
    }

    /**
     * Returns the number of worker threads that have started but not
     * yet terminated. The result returned by this method may differ
     * from {@link #getParallelism} when threads are created to
     * maintain parallelism when others are cooperatively blocked.
     *
     * @return the number of worker threads
     */
    public int getPoolSize() {
        // total count occupies the high bits of workerCounts
        return workerCounts >>> TOTAL_COUNT_SHIFT;
    }

    /**
     * Returns {@code true} if this pool uses local first-in-first-out
     * scheduling mode for forked tasks that are never joined.
     *
     * @return {@code true} if this pool uses async mode
     */
    public boolean getAsyncMode() {
        return locallyFifo;
    }
1444
    /**
     * Returns an estimate of the number of worker threads that are
     * not blocked waiting to join tasks or for other managed
     * synchronization. This method may overestimate the
     * number of running threads.
     *
     * @return the number of worker threads
     */
    public int getRunningThreadCount() {
        // running count occupies the low bits of workerCounts
        return workerCounts & RUNNING_COUNT_MASK;
    }

    /**
     * Returns an estimate of the number of threads that are currently
     * stealing or executing tasks. This method may overestimate the
     * number of active threads.
     *
     * @return the number of active threads
     */
    public int getActiveThreadCount() {
        // active count occupies the low bits of runState
        return runState & ACTIVE_COUNT_MASK;
    }

    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        return (runState & ACTIVE_COUNT_MASK) == 0;
    }

    /**
     * Returns an estimate of the total number of tasks stolen from
     * one thread's work queue by another. The reported value
     * underestimates the actual total number of steals when the pool
     * is not quiescent. This value may be useful for monitoring and
     * tuning fork/join programs: in general, steal counts should be
     * high enough to keep threads busy, but low enough to avoid
     * overhead and contention across threads.
     *
     * @return the number of steals
     */
    public long getStealCount() {
        return stealCount;
    }
1497
1498 /**
1499 * Returns an estimate of the total number of tasks currently held
1500 * in queues by worker threads (but not including tasks submitted
1501 * to the pool that have not begun executing). This value is only
1502 * an approximation, obtained by iterating across all threads in
1503 * the pool. This method may be useful for tuning task
1504 * granularities.
1505 *
1506 * @return the number of queued tasks
1507 */
1508 public long getQueuedTaskCount() {
1509 long count = 0;
1510 ForkJoinWorkerThread[] ws = workers;
1511 int n = ws.length;
1512 for (int i = 0; i < n; ++i) {
1513 ForkJoinWorkerThread w = ws[i];
1514 if (w != null)
1515 count += w.getQueueSize();
1516 }
1517 return count;
1518 }
1519
    /**
     * Returns an estimate of the number of tasks submitted to this
     * pool that have not yet begun executing. This method takes time
     * proportional to the number of submissions.
     *
     * @return the number of queued submissions
     */
    public int getQueuedSubmissionCount() {
        return submissionQueue.size();
    }

    /**
     * Returns {@code true} if there are any tasks submitted to this
     * pool that have not yet begun executing.
     *
     * @return {@code true} if there are any queued submissions
     */
    public boolean hasQueuedSubmissions() {
        return !submissionQueue.isEmpty();
    }

    /**
     * Removes and returns the next unexecuted submission if one is
     * available. This method may be useful in extensions to this
     * class that re-assign work in systems with multiple pools.
     *
     * @return the next submission, or {@code null} if none
     */
    protected ForkJoinTask<?> pollSubmission() {
        return submissionQueue.poll();
    }
1551
1552 /**
1553 * Removes all available unexecuted submitted and forked tasks
1554 * from scheduling queues and adds them to the given collection,
1555 * without altering their execution status. These may include
1556 * artificially generated or wrapped tasks. This method is
1557 * designed to be invoked only when the pool is known to be
1558 * quiescent. Invocations at other times may not remove all
1559 * tasks. A failure encountered while attempting to add elements
1560 * to collection {@code c} may result in elements being in
1561 * neither, either or both collections when the associated
1562 * exception is thrown. The behavior of this operation is
1563 * undefined if the specified collection is modified while the
1564 * operation is in progress.
1565 *
1566 * @param c the collection to transfer elements into
1567 * @return the number of elements transferred
1568 */
1569 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1570 int count = submissionQueue.drainTo(c);
1571 ForkJoinWorkerThread[] ws = workers;
1572 int n = ws.length;
1573 for (int i = 0; i < n; ++i) {
1574 ForkJoinWorkerThread w = ws[i];
1575 if (w != null)
1576 count += w.drainTasksTo(c);
1577 }
1578 return count;
1579 }
1580
1581 /**
1582 * Returns a string identifying this pool, as well as its state,
1583 * including indications of run state, parallelism level, and
1584 * worker and task counts.
1585 *
1586 * @return a string identifying this pool, as well as its state
1587 */
1588 public String toString() {
1589 long st = getStealCount();
1590 long qt = getQueuedTaskCount();
1591 long qs = getQueuedSubmissionCount();
1592 int wc = workerCounts;
1593 int tc = wc >>> TOTAL_COUNT_SHIFT;
1594 int rc = wc & RUNNING_COUNT_MASK;
1595 int pc = parallelism;
1596 int rs = runState;
1597 int ac = rs & ACTIVE_COUNT_MASK;
1598 return super.toString() +
1599 "[" + runLevelToString(rs) +
1600 ", parallelism = " + pc +
1601 ", size = " + tc +
1602 ", active = " + ac +
1603 ", running = " + rc +
1604 ", steals = " + st +
1605 ", tasks = " + qt +
1606 ", submissions = " + qs +
1607 "]";
1608 }
1609
1610 private static String runLevelToString(int s) {
1611 return ((s & TERMINATED) != 0 ? "Terminated" :
1612 ((s & TERMINATING) != 0 ? "Terminating" :
1613 ((s & SHUTDOWN) != 0 ? "Shutting down" :
1614 "Running")));
1615 }
1616
    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     * Tasks that are in the process of being submitted concurrently
     * during the course of this method may or may not be rejected.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public void shutdown() {
        checkPermission();
        advanceRunLevel(SHUTDOWN);
        tryTerminate(false); // complete termination now if already quiescent
    }
1634
    /**
     * Attempts to cancel and/or stop all tasks, and reject all
     * subsequently submitted tasks. Tasks that are in the process of
     * being submitted or executed concurrently during the course of
     * this method may or may not be rejected. This method cancels
     * both existing and unexecuted tasks, in order to permit
     * termination in the presence of task dependencies. So the method
     * always returns an empty list (unlike the case for some other
     * Executors).
     *
     * @return an empty list
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public List<Runnable> shutdownNow() {
        checkPermission();
        tryTerminate(true); // unconditional
        return Collections.emptyList();
    }
1656
    /**
     * Returns {@code true} if all tasks have completed following shut down.
     *
     * @return {@code true} if all tasks have completed following shut down
     */
    public boolean isTerminated() {
        return runState >= TERMINATED;
    }

    /**
     * Returns {@code true} if the process of termination has
     * commenced but not yet completed. This method may be useful for
     * debugging. A return of {@code true} reported a sufficient
     * period after shutdown may indicate that submitted tasks have
     * ignored or suppressed interruption, causing this executor not
     * to properly terminate.
     *
     * @return {@code true} if terminating but not yet terminated
     */
    public boolean isTerminating() {
        // TERMINATING set but TERMINATED not yet reached
        return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
    }

    /**
     * Returns {@code true} if this pool has been shut down.
     *
     * @return {@code true} if this pool has been shut down
     */
    public boolean isShutdown() {
        return runState >= SHUTDOWN;
    }
1688
    /**
     * Blocks until all tasks have completed execution after a shutdown
     * request, or the timeout occurs, or the current thread is
     * interrupted, whichever happens first.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        try {
            // termination phaser advances past phase 0 when the pool
            // reaches TERMINATED (see tryTerminate)
            return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
        } catch(TimeoutException ex) {
            return false;
        }
    }
1708
1709 /**
1710 * Interface for extending managed parallelism for tasks running
1711 * in {@link ForkJoinPool}s.
1712 *
1713 * <p>A {@code ManagedBlocker} provides two methods. Method
1714 * {@code isReleasable} must return {@code true} if blocking is
1715 * not necessary. Method {@code block} blocks the current thread
1716 * if necessary (perhaps internally invoking {@code isReleasable}
1717 * before actually blocking). The unusual methods in this API
1718 * accommodate synchronizers that may, but don't usually, block
1719 * for long periods. Similarly, they allow more efficient internal
1720 * handling of cases in which additional workers may be, but
1721 * usually are not, needed to ensure sufficient parallelism.
1722 * Toward this end, implementations of method {@code isReleasable}
1723 * must be amenable to repeated invocation.
1724 *
1725 * <p>For example, here is a ManagedBlocker based on a
1726 * ReentrantLock:
1727 * <pre> {@code
1728 * class ManagedLocker implements ManagedBlocker {
1729 * final ReentrantLock lock;
1730 * boolean hasLock = false;
1731 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
1732 * public boolean block() {
1733 * if (!hasLock)
1734 * lock.lock();
1735 * return true;
1736 * }
1737 * public boolean isReleasable() {
1738 * return hasLock || (hasLock = lock.tryLock());
1739 * }
1740 * }}</pre>
1741 *
1742 * <p>Here is a class that possibly blocks waiting for an
1743 * item on a given queue:
1744 * <pre> {@code
1745 * class QueueTaker<E> implements ManagedBlocker {
1746 * final BlockingQueue<E> queue;
1747 * volatile E item = null;
1748 * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
1749 * public boolean block() throws InterruptedException {
1750 * if (item == null)
     *       item = queue.take();
1752 * return true;
1753 * }
1754 * public boolean isReleasable() {
     *       return item != null || (item = queue.poll()) != null;
1756 * }
1757 * public E getItem() { // call after pool.managedBlock completes
1758 * return item;
1759 * }
1760 * }}</pre>
1761 */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         * (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         *
         * @return {@code true} if blocking is unnecessary
         */
        boolean isReleasable();
    }
1779
1780 /**
1781 * Blocks in accord with the given blocker. If the current thread
1782 * is a {@link ForkJoinWorkerThread}, this method possibly
1783 * arranges for a spare thread to be activated if necessary to
1784 * ensure sufficient parallelism while the current thread is blocked.
1785 *
1786 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1787 * behaviorally equivalent to
1788 * <pre> {@code
1789 * while (!blocker.isReleasable())
1790 * if (blocker.block())
1791 * return;
1792 * }</pre>
1793 *
1794 * If the caller is a {@code ForkJoinTask}, then the pool may
1795 * first be expanded to ensure parallelism, and later adjusted.
1796 *
1797 * @param blocker the blocker
1798 * @throws InterruptedException if blocker.block did so
1799 */
1800 public static void managedBlock(ManagedBlocker blocker)
1801 throws InterruptedException {
1802 Thread t = Thread.currentThread();
1803 if (t instanceof ForkJoinWorkerThread) {
1804 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
1805 w.pool.awaitBlocker(blocker);
1806 }
1807 else {
1808 do {} while (!blocker.isReleasable() && !blocker.block());
1809 }
1810 }
1811
1812 // AbstractExecutorService overrides. These rely on undocumented
1813 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
1814 // implement RunnableFuture.
1815
    // Wraps a Runnable (with preset result) as a ForkJoinTask. The cast
    // relies on the undocumented fact that ForkJoinTask.adapt returns
    // tasks that also implement RunnableFuture.
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
    }
1819
    // Wraps a Callable as a ForkJoinTask. The cast relies on the
    // undocumented fact that ForkJoinTask.adapt returns tasks that
    // also implement RunnableFuture.
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
    }
1823
1824 // Unsafe mechanics
1825
1826 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1827 private static final long workerCountsOffset =
1828 objectFieldOffset("workerCounts", ForkJoinPool.class);
1829 private static final long runStateOffset =
1830 objectFieldOffset("runState", ForkJoinPool.class);
1831 private static final long eventCountOffset =
1832 objectFieldOffset("eventCount", ForkJoinPool.class);
1833 private static final long eventWaitersOffset =
1834 objectFieldOffset("eventWaiters",ForkJoinPool.class);
1835 private static final long stealCountOffset =
1836 objectFieldOffset("stealCount",ForkJoinPool.class);
1837 private static final long spareWaitersOffset =
1838 objectFieldOffset("spareWaiters",ForkJoinPool.class);
1839
1840 private static long objectFieldOffset(String field, Class<?> klazz) {
1841 try {
1842 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1843 } catch (NoSuchFieldException e) {
1844 // Convert Exception to corresponding Error
1845 NoSuchFieldError error = new NoSuchFieldError(field);
1846 error.initCause(e);
1847 throw error;
1848 }
1849 }
1850 }