ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.20
Committed: Wed Aug 11 20:28:31 2010 UTC (13 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.19: +1 -1 lines
Log Message:
Typo

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.locks.LockSupport;
15 import java.util.concurrent.locks.ReentrantLock;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.concurrent.CountDownLatch;
18
19 /**
20 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
21 * A {@code ForkJoinPool} provides the entry point for submissions
22 * from non-{@code ForkJoinTask} clients, as well as management and
23 * monitoring operations.
24 *
25 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
26 * ExecutorService} mainly by virtue of employing
27 * <em>work-stealing</em>: all threads in the pool attempt to find and
28 * execute subtasks created by other active tasks (eventually blocking
29 * waiting for work if none exist). This enables efficient processing
30 * when most tasks spawn other subtasks (as do most {@code
31 * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
32 * constructors, {@code ForkJoinPool}s may also be appropriate for use
33 * with event-style tasks that are never joined.
34 *
35 * <p>A {@code ForkJoinPool} is constructed with a given target
36 * parallelism level; by default, equal to the number of available
37 * processors. The pool attempts to maintain enough active (or
38 * available) threads by dynamically adding, suspending, or resuming
39 * internal worker threads, even if some tasks are stalled waiting to
40 * join others. However, no such adjustments are guaranteed in the
41 * face of blocked IO or other unmanaged synchronization. The nested
42 * {@link ManagedBlocker} interface enables extension of the kinds of
43 * synchronization accommodated.
44 *
45 * <p>In addition to execution and lifecycle control methods, this
46 * class provides status check methods (for example
47 * {@link #getStealCount}) that are intended to aid in developing,
48 * tuning, and monitoring fork/join applications. Also, method
49 * {@link #toString} returns indications of pool state in a
50 * convenient form for informal monitoring.
51 *
52 * <p> As is the case with other ExecutorServices, there are three
53 * main task execution methods summarized in the following
54 * table. These are designed to be used by clients not already engaged
55 * in fork/join computations in the current pool. The main forms of
56 * these methods accept instances of {@code ForkJoinTask}, but
57 * overloaded forms also allow mixed execution of plain {@code
58 * Runnable}- or {@code Callable}- based activities as well. However,
59 * tasks that are already executing in a pool should normally
60 * <em>NOT</em> use these pool execution methods, but instead use the
61 * within-computation forms listed in the table.
62 *
63 * <table BORDER CELLPADDING=3 CELLSPACING=1>
64 * <tr>
65 * <td></td>
66 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
67 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
68 * </tr>
69 * <tr>
 * <td> <b>Arrange async execution</td>
71 * <td> {@link #execute(ForkJoinTask)}</td>
72 * <td> {@link ForkJoinTask#fork}</td>
73 * </tr>
74 * <tr>
75 * <td> <b>Await and obtain result</td>
76 * <td> {@link #invoke(ForkJoinTask)}</td>
77 * <td> {@link ForkJoinTask#invoke}</td>
78 * </tr>
79 * <tr>
80 * <td> <b>Arrange exec and obtain Future</td>
81 * <td> {@link #submit(ForkJoinTask)}</td>
82 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
83 * </tr>
84 * </table>
85 *
86 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
87 * used for all parallel task execution in a program or subsystem.
88 * Otherwise, use would not usually outweigh the construction and
89 * bookkeeping overhead of creating a large set of threads. For
90 * example, a common pool could be used for the {@code SortTasks}
91 * illustrated in {@link RecursiveAction}. Because {@code
92 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
93 * daemon} mode, there is typically no need to explicitly {@link
94 * #shutdown} such a pool upon program exit.
95 *
96 * <pre>
97 * static final ForkJoinPool mainPool = new ForkJoinPool();
98 * ...
99 * public void sort(long[] array) {
100 * mainPool.invoke(new SortTask(array, 0, array.length));
101 * }
102 * </pre>
103 *
104 * <p><b>Implementation notes</b>: This implementation restricts the
105 * maximum number of running threads to 32767. Attempts to create
106 * pools with greater than the maximum number result in
107 * {@code IllegalArgumentException}.
108 *
109 * <p>This implementation rejects submitted tasks (that is, by throwing
110 * {@link RejectedExecutionException}) only when the pool is shut down
111 * or internal resources have been exhausted.
112 *
113 * @since 1.7
114 * @author Doug Lea
115 */
116 public class ForkJoinPool extends AbstractExecutorService {
117
118 /*
119 * Implementation Overview
120 *
121 * This class provides the central bookkeeping and control for a
122 * set of worker threads: Submissions from non-FJ threads enter
123 * into a submission queue. Workers take these tasks and typically
124 * split them into subtasks that may be stolen by other workers.
125 * The main work-stealing mechanics implemented in class
126 * ForkJoinWorkerThread give first priority to processing tasks
127 * from their own queues (LIFO or FIFO, depending on mode), then
128 * to randomized FIFO steals of tasks in other worker queues, and
129 * lastly to new submissions. These mechanics do not consider
130 * affinities, loads, cache localities, etc, so rarely provide the
131 * best possible performance on a given machine, but portably
132 * provide good throughput by averaging over these factors.
133 * (Further, even if we did try to use such information, we do not
134 * usually have a basis for exploiting it. For example, some sets
135 * of tasks profit from cache affinities, but others are harmed by
136 * cache pollution effects.)
137 *
138 * Beyond work-stealing support and essential bookkeeping, the
139 * main responsibility of this framework is to take actions when
140 * one worker is waiting to join a task stolen (or always held by)
 * another. Because we are multiplexing many tasks on to a pool
142 * of workers, we can't just let them block (as in Thread.join).
143 * We also cannot just reassign the joiner's run-time stack with
144 * another and replace it later, which would be a form of
145 * "continuation", that even if possible is not necessarily a good
146 * idea. Given that the creation costs of most threads on most
147 * systems mainly surrounds setting up runtime stacks, thread
148 * creation and switching is usually not much more expensive than
 * stack creation and switching, and is more flexible. Instead we
150 * combine two tactics:
151 *
152 * Helping: Arranging for the joiner to execute some task that it
153 * would be running if the steal had not occurred. Method
154 * ForkJoinWorkerThread.helpJoinTask tracks joining->stealing
155 * links to try to find such a task.
156 *
157 * Compensating: Unless there are already enough live threads,
 * method helpMaintainParallelism() may create or
159 * re-activate a spare thread to compensate for blocked
160 * joiners until they unblock.
161 *
 * Because determining the existence of conservatively safe
163 * helping targets, the availability of already-created spares,
164 * and the apparent need to create new spares are all racy and
165 * require heuristic guidance, we rely on multiple retries of
166 * each. Further, because it is impossible to keep exactly the
167 * target (parallelism) number of threads running at any given
168 * time, we allow compensation during joins to fail, and enlist
169 * all other threads to help out whenever they are not otherwise
170 * occupied (i.e., mainly in method preStep).
171 *
172 * The ManagedBlocker extension API can't use helping so relies
173 * only on compensation in method awaitBlocker.
174 *
175 * The main throughput advantages of work-stealing stem from
176 * decentralized control -- workers mostly steal tasks from each
177 * other. We do not want to negate this by creating bottlenecks
178 * implementing other management responsibilities. So we use a
179 * collection of techniques that avoid, reduce, or cope well with
180 * contention. These entail several instances of bit-packing into
181 * CASable fields to maintain only the minimally required
182 * atomicity. To enable such packing, we restrict maximum
183 * parallelism to (1<<15)-1 (enabling twice this (to accommodate
184 * unbalanced increments and decrements) to fit into a 16 bit
185 * field, which is far in excess of normal operating range. Even
186 * though updates to some of these bookkeeping fields do sometimes
187 * contend with each other, they don't normally cache-contend with
188 * updates to others enough to warrant memory padding or
189 * isolation. So they are all held as fields of ForkJoinPool
190 * objects. The main capabilities are as follows:
191 *
192 * 1. Creating and removing workers. Workers are recorded in the
193 * "workers" array. This is an array as opposed to some other data
194 * structure to support index-based random steals by workers.
195 * Updates to the array recording new workers and unrecording
196 * terminated ones are protected from each other by a lock
197 * (workerLock) but the array is otherwise concurrently readable,
198 * and accessed directly by workers. To simplify index-based
199 * operations, the array size is always a power of two, and all
200 * readers must tolerate null slots. Currently, all worker thread
201 * creation is on-demand, triggered by task submissions,
202 * replacement of terminated workers, and/or compensation for
203 * blocked workers. However, all other support code is set up to
204 * work with other policies.
205 *
206 * To ensure that we do not hold on to worker references that
207 * would prevent GC, ALL accesses to workers are via indices into
208 * the workers array (which is one source of some of the unusual
209 * code constructions here). In essence, the workers array serves
210 * as a WeakReference mechanism. Thus for example the event queue
211 * stores worker indices, not worker references. Access to the
212 * workers in associated methods (for example releaseEventWaiters)
213 * must both index-check and null-check the IDs. All such accesses
214 * ignore bad IDs by returning out early from what they are doing,
215 * since this can only be associated with shutdown, in which case
216 * it is OK to give up. On termination, we just clobber these
217 * data structures without trying to use them.
218 *
219 * 2. Bookkeeping for dynamically adding and removing workers. We
220 * aim to approximately maintain the given level of parallelism.
221 * When some workers are known to be blocked (on joins or via
222 * ManagedBlocker), we may create or resume others to take their
223 * place until they unblock (see below). Implementing this
224 * requires counts of the number of "running" threads (i.e., those
 * that are neither blocked nor artificially suspended) as well as
226 * the total number. These two values are packed into one field,
227 * "workerCounts" because we need accurate snapshots when deciding
228 * to create, resume or suspend. Note however that the
 * correspondence of these counts to reality is not guaranteed. In
230 * particular updates for unblocked threads may lag until they
231 * actually wake up.
232 *
233 * 3. Maintaining global run state. The run state of the pool
234 * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to
235 * those in other Executor implementations, as well as a count of
236 * "active" workers -- those that are, or soon will be, or
237 * recently were executing tasks. The runLevel and active count
238 * are packed together in order to correctly trigger shutdown and
239 * termination. Without care, active counts can be subject to very
240 * high contention. We substantially reduce this contention by
241 * relaxing update rules. A worker must claim active status
242 * prospectively, by activating if it sees that a submitted or
243 * stealable task exists (it may find after activating that the
244 * task no longer exists). It stays active while processing this
245 * task (if it exists) and any other local subtasks it produces,
246 * until it cannot find any other tasks. It then tries
247 * inactivating (see method preStep), but upon update contention
248 * instead scans for more tasks, later retrying inactivation if it
249 * doesn't find any.
250 *
251 * 4. Managing idle workers waiting for tasks. We cannot let
252 * workers spin indefinitely scanning for tasks when none are
253 * available. On the other hand, we must quickly prod them into
254 * action when new tasks are submitted or generated. We
255 * park/unpark these idle workers using an event-count scheme.
256 * Field eventCount is incremented upon events that may enable
257 * workers that previously could not find a task to now find one:
258 * Submission of a new task to the pool, or another worker pushing
259 * a task onto a previously empty queue. (We also use this
260 * mechanism for termination actions that require wakeups of idle
261 * workers). Each worker maintains its last known event count,
262 * and blocks when a scan for work did not find a task AND its
263 * lastEventCount matches the current eventCount. Waiting idle
264 * workers are recorded in a variant of Treiber stack headed by
265 * field eventWaiters which, when nonzero, encodes the thread
266 * index and count awaited for by the worker thread most recently
267 * calling eventSync. This thread in turn has a record (field
268 * nextEventWaiter) for the next waiting worker. In addition to
269 * allowing simpler decisions about need for wakeup, the event
270 * count bits in eventWaiters serve the role of tags to avoid ABA
271 * errors in Treiber stacks. To reduce delays in task diffusion,
272 * workers not otherwise occupied may invoke method
273 * releaseEventWaiters, that removes and signals (unparks) workers
 * not waiting on current count. To minimize task production
 * stalls associated with signalling, any worker
276 * pushing a task on an empty queue invokes the weaker method
277 * signalWork, that only releases idle workers until it detects
278 * interference by other threads trying to release, and lets them
279 * take over. The net effect is a tree-like diffusion of signals,
280 * where released threads (and possibly others) help with unparks.
281 * To further reduce contention effects a bit, failed CASes to
282 * increment field eventCount are tolerated without retries.
283 * Conceptually they are merged into the same event, which is OK
284 * when their only purpose is to enable workers to scan for work.
285 *
286 * 5. Managing suspension of extra workers. When a worker is about
287 * to block waiting for a join (or via ManagedBlockers), we may
288 * create a new thread to maintain parallelism level, or at least
289 * avoid starvation. Usually, extra threads are needed for only
290 * very short periods, yet join dependencies are such that we
291 * sometimes need them in bursts. Rather than create new threads
292 * each time this happens, we suspend no-longer-needed extra ones
293 * as "spares". For most purposes, we don't distinguish "extra"
294 * spare threads from normal "core" threads: On each call to
295 * preStep (the only point at which we can do this) a worker
296 * checks to see if there are now too many running workers, and if
297 * so, suspends itself. Method helpMaintainParallelism looks for
298 * suspended threads to resume before considering creating a new
299 * replacement. The spares themselves are encoded on another
300 * variant of a Treiber Stack, headed at field "spareWaiters".
301 * Note that the use of spares is intrinsically racy. One thread
302 * may become a spare at about the same time as another is
303 * needlessly being created. We counteract this and related slop
304 * in part by requiring resumed spares to immediately recheck (in
 * preStep) to see whether they should re-suspend. To avoid
306 * long-term build-up of spares, the oldest spare (see
307 * ForkJoinWorkerThread.suspendAsSpare) occasionally wakes up if
308 * not signalled and calls tryTrimSpare, which uses two different
309 * thresholds: Always killing if the number of spares is greater
 * than 25% of total, and killing others only at a slower rate
311 * (UNUSED_SPARE_TRIM_RATE_NANOS).
312 *
313 * 6. Deciding when to create new workers. The main dynamic
314 * control in this class is deciding when to create extra threads
315 * in method helpMaintainParallelism. We would like to keep
 * exactly #parallelism threads running, which is an impossible
317 * task. We always need to create one when the number of running
318 * threads would become zero and all workers are busy. Beyond
 * this, we must rely on heuristics that work well in the
 * presence of transient phenomena such as GC stalls, dynamic
321 * compilation, and wake-up lags. These transients are extremely
322 * common -- we are normally trying to fully saturate the CPUs on
323 * a machine, so almost any activity other than running tasks
324 * impedes accuracy. Our main defense is to allow some slack in
325 * creation thresholds, using rules that reflect the fact that the
326 * more threads we have running, the more likely that we are
 * underestimating the number of running threads. The rules also
328 * better cope with the fact that some of the methods in this
329 * class tend to never become compiled (but are interpreted), so
330 * some components of the entire set of controls might execute 100
331 * times faster than others. And similarly for cases where the
332 * apparent lack of work is just due to GC stalls and other
333 * transient system activity.
334 *
335 * Beware that there is a lot of representation-level coupling
336 * among classes ForkJoinPool, ForkJoinWorkerThread, and
337 * ForkJoinTask. For example, direct access to "workers" array by
338 * workers, and direct access to ForkJoinTask.status by both
339 * ForkJoinPool and ForkJoinWorkerThread. There is little point
340 * trying to reduce this, since any associated future changes in
341 * representations will need to be accompanied by algorithmic
342 * changes anyway.
343 *
344 * Style notes: There are lots of inline assignments (of form
345 * "while ((local = field) != 0)") which are usually the simplest
346 * way to ensure the required read orderings (which are sometimes
347 * critical). Also several occurrences of the unusual "do {}
348 * while(!cas...)" which is the simplest way to force an update of
349 * a CAS'ed variable. There are also other coding oddities that
350 * help some methods perform reasonably even when interpreted (not
351 * compiled), at the expense of some messy constructions that
352 * reduce byte code counts.
353 *
354 * The order of declarations in this file is: (1) statics (2)
355 * fields (along with constants used when unpacking some of them)
356 * (3) internal control methods (4) callbacks and other support
357 * for ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
358 * methods (plus a few little helpers).
359 */
360
/**
 * Factory for creating new {@link ForkJoinWorkerThread}s.
 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
 * for {@code ForkJoinWorkerThread} subclasses that extend base
 * functionality or initialize threads with different contexts.
 */
public static interface ForkJoinWorkerThreadFactory {
    /**
     * Returns a new worker thread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @return the new worker thread
     * @throws NullPointerException if the pool is null
     */
    public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
376
377 /**
378 * Default ForkJoinWorkerThreadFactory implementation; creates a
379 * new ForkJoinWorkerThread.
380 */
381 static class DefaultForkJoinWorkerThreadFactory
382 implements ForkJoinWorkerThreadFactory {
383 public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
384 return new ForkJoinWorkerThread(pool);
385 }
386 }
387
/**
 * The default factory for creating new ForkJoinWorkerThreads.
 * This factory is used unless overridden in ForkJoinPool
 * constructors.
 */
public static final ForkJoinWorkerThreadFactory
    defaultForkJoinWorkerThreadFactory =
    new DefaultForkJoinWorkerThreadFactory();

/**
 * Permission required for callers of methods that may start or
 * kill threads.
 */
private static final RuntimePermission modifyThreadPermission =
    new RuntimePermission("modifyThread");
402
403 /**
404 * If there is a security manager, makes sure caller has
405 * permission to modify threads.
406 */
407 private static void checkPermission() {
408 SecurityManager security = System.getSecurityManager();
409 if (security != null)
410 security.checkPermission(modifyThreadPermission);
411 }
412
/**
 * Generator for assigning sequence numbers as pool names.
 */
private static final AtomicInteger poolNumberGenerator =
    new AtomicInteger();

/**
 * Absolute bound for parallelism level. Twice this number plus
 * one (i.e., 0xffff) must fit into a 16bit field to enable
 * word-packing for some counts and indices.
 */
private static final int MAX_WORKERS = 0x7fff;
425
/**
 * Array holding all worker threads in the pool. Array size must
 * be a power of two. Updates and replacements are protected by
 * workerLock, but the array is always kept in a consistent enough
 * state to be randomly accessed without locking by workers
 * performing work-stealing, as well as other traversal-based
 * methods in this class. All readers must tolerate that some
 * array slots may be null.
 */
volatile ForkJoinWorkerThread[] workers;

/**
 * Queue for external (non-ForkJoinTask-client) submissions.
 */
private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;

/**
 * Lock protecting updates to workers array.
 */
private final ReentrantLock workerLock;

/**
 * Latch released upon termination.
 */
private final Phaser termination;

/**
 * Creation factory for worker threads.
 */
private final ForkJoinWorkerThreadFactory factory;

/**
 * Sum of per-thread steal counts, updated only when threads are
 * idle or terminating.
 */
private volatile long stealCount;

/**
 * The last nanoTime that a spare thread was trimmed.
 */
private volatile long trimTime;

/**
 * The rate at which to trim unused spares.
 */
static final long UNUSED_SPARE_TRIM_RATE_NANOS =
    1000L * 1000L * 1000L; // 1 sec
473
/**
 * Encoded record of top of Treiber stack of threads waiting for
 * events. The top 32 bits contain the count being waited for. The
 * bottom 16 bits contains one plus the pool index of waiting
 * worker thread. (Bits 16-31 are unused.)
 */
private volatile long eventWaiters;

private static final int EVENT_COUNT_SHIFT = 32;
private static final long WAITER_ID_MASK = (1L << 16) - 1L;

/**
 * A counter for events that may wake up worker threads:
 * - Submission of a new task to the pool
 * - A worker pushing a task on an empty queue
 * - termination
 */
private volatile int eventCount;

/**
 * Encoded record of top of Treiber stack of spare threads waiting
 * for resumption. The top 16 bits contain an arbitrary count to
 * avoid ABA effects. The bottom 16 bits contains one plus the
 * pool index of waiting worker thread.
 */
private volatile int spareWaiters;

private static final int SPARE_COUNT_SHIFT = 16;
private static final int SPARE_ID_MASK = (1 << 16) - 1;
503
/**
 * Lifecycle control. The low word contains the number of workers
 * that are (probably) executing tasks. This value is atomically
 * incremented before a worker gets a task to run, and decremented
 * when worker has no tasks and cannot find any. Bits 16-18
 * contain runLevel value. When all are zero, the pool is
 * running. Level transitions are monotonic (running -> shutdown
 * -> terminating -> terminated) so each transition adds a bit.
 * These are bundled together to ensure consistent read for
 * termination checks (i.e., that runLevel is at least SHUTDOWN
 * and active threads is zero).
 */
private volatile int runState;

// Note: The order among run level values matters.
private static final int RUNLEVEL_SHIFT = 16;
private static final int SHUTDOWN = 1 << RUNLEVEL_SHIFT;
private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1);
private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2);
private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;
private static final int ONE_ACTIVE = 1; // active update delta

/**
 * Holds number of total (i.e., created and not yet terminated)
 * and running (i.e., not blocked on joins or other managed sync)
 * threads, packed together to ensure consistent snapshot when
 * making decisions about creating and suspending spare
 * threads. Updated only by CAS. Note that adding a new worker
 * requires incrementing both counts, since workers start off in
 * running state.
 */
private volatile int workerCounts;

private static final int TOTAL_COUNT_SHIFT = 16;
private static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
private static final int ONE_RUNNING = 1;
private static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT;
541
/**
 * The target parallelism level.
 * Accessed directly by ForkJoinWorkerThreads.
 */
final int parallelism;

/**
 * True if use local fifo, not default lifo, for local polling.
 * Read by, and replicated by ForkJoinWorkerThreads.
 */
final boolean locallyFifo;

/**
 * The uncaught exception handler used when any worker abruptly
 * terminates.
 */
private final Thread.UncaughtExceptionHandler ueh;

/**
 * Pool number, just for assigning useful names to worker threads.
 */
private final int poolNumber;
564
565
566 // Utilities for CASing fields. Note that several of these
567 // are manually inlined by callers
568
569 /**
570 * Increments running count part of workerCounts
571 */
572 final void incrementRunningCount() {
573 int c;
574 do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
575 c = workerCounts,
576 c + ONE_RUNNING));
577 }
578
579 /**
580 * Tries to decrement running count unless already zero
581 */
582 final boolean tryDecrementRunningCount() {
583 int wc = workerCounts;
584 if ((wc & RUNNING_COUNT_MASK) == 0)
585 return false;
586 return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
587 wc, wc - ONE_RUNNING);
588 }
589
/**
 * Forces decrement of encoded workerCounts, awaiting nonzero if
 * (rarely) necessary when other count updates lag.
 *
 * @param dr either zero or ONE_RUNNING
 * @param dt either zero or ONE_TOTAL
 */
private void decrementWorkerCounts(int dr, int dt) {
    for (;;) {
        int wc = workerCounts;
        if (wc == 0 && (runState & TERMINATED) != 0)
            return; // lagging termination on a backout
        // If a decrement would underflow a sub-count, the matching
        // increment has not yet been seen; yield to let it land
        // before attempting the CAS below.
        if ((wc & RUNNING_COUNT_MASK) - dr < 0 ||
            (wc >>> TOTAL_COUNT_SHIFT) - dt < 0)
            Thread.yield();
        if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                     wc, wc - (dr + dt)))
            return;
    }
}
610
611 /**
612 * Increments event count
613 */
614 private void advanceEventCount() {
615 int c;
616 do {} while(!UNSAFE.compareAndSwapInt(this, eventCountOffset,
617 c = eventCount, c+1));
618 }
619
620 /**
621 * Tries incrementing active count; fails on contention.
622 * Called by workers before executing tasks.
623 *
624 * @return true on success
625 */
626 final boolean tryIncrementActiveCount() {
627 int c;
628 return UNSAFE.compareAndSwapInt(this, runStateOffset,
629 c = runState, c + ONE_ACTIVE);
630 }
631
632 /**
633 * Tries decrementing active count; fails on contention.
634 * Called when workers cannot find tasks to run.
635 */
636 final boolean tryDecrementActiveCount() {
637 int c;
638 return UNSAFE.compareAndSwapInt(this, runStateOffset,
639 c = runState, c - ONE_ACTIVE);
640 }
641
642 /**
643 * Advances to at least the given level. Returns true if not
644 * already in at least the given level.
645 */
646 private boolean advanceRunLevel(int level) {
647 for (;;) {
648 int s = runState;
649 if ((s & level) != 0)
650 return false;
651 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | level))
652 return true;
653 }
654 }
655
656 // workers array maintenance
657
/**
 * Records and returns a workers array index for new worker.
 *
 * @param w the new worker
 * @return the slot index assigned to w
 */
private int recordWorker(ForkJoinWorkerThread w) {
    // Try using slot totalCount-1. If not available, scan and/or resize
    int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
    final ReentrantLock lock = this.workerLock;
    lock.lock();
    try {
        ForkJoinWorkerThread[] ws = workers;
        int n = ws.length;
        if (k < 0 || k >= n || ws[k] != null) {
            // preferred slot unusable: take first null slot,
            // doubling the array if it is completely full
            for (k = 0; k < n && ws[k] != null; ++k)
                ;
            if (k == n)
                ws = Arrays.copyOf(ws, n << 1);
        }
        ws[k] = w;
        workers = ws; // volatile array write ensures slot visibility
    } finally {
        lock.unlock();
    }
    return k;
}
682
683 /**
684 * Nulls out record of worker in workers array
685 */
686 private void forgetWorker(ForkJoinWorkerThread w) {
687 int idx = w.poolIndex;
688 // Locking helps method recordWorker avoid unecessary expansion
689 final ReentrantLock lock = this.workerLock;
690 lock.lock();
691 try {
692 ForkJoinWorkerThread[] ws = workers;
693 if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
694 ws[idx] = null;
695 } finally {
696 lock.unlock();
697 }
698 }
699
700 // adding and removing workers
701
/**
 * Tries to create and add new worker. Assumes that worker counts
 * are already updated to accommodate the worker, so adjusts on
 * failure.
 */
private void addWorker() {
    ForkJoinWorkerThread w = null;
    try {
        w = factory.newThread(this);
    } finally { // Adjust on either null or exceptional factory return
        // The finally (with no catch) lets a factory exception
        // propagate after the counts have been backed out.
        if (w == null) {
            decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
            tryTerminate(false); // in case of failure during shutdown
        }
    }
    if (w != null)
        w.start(recordWorker(w), ueh);
}
720
/**
 * Final callback from terminating worker. Removes record of
 * worker from array, and adjusts counts. If pool is shutting
 * down, tries to complete termination.
 *
 * @param w the worker
 */
final void workerTerminated(ForkJoinWorkerThread w) {
    forgetWorker(w);
    // NOTE(review): trimmed spares appear to have already shed their
    // running count (see ForkJoinWorkerThread trim path) -- confirm.
    decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
    while (w.stealCount != 0) // collect final count
        tryAccumulateStealCount(w);
    tryTerminate(false);
}
735
736 // Waiting for and signalling events
737
/**
 * Releases workers blocked on a count not equal to current count.
 * Normally called after precheck that eventWaiters isn't zero to
 * avoid wasted array checks.
 *
 * @param signalling true if caller is a signalling worker so can
 * exit upon (conservatively) detected contention by other threads
 * who will continue to release
 */
private void releaseEventWaiters(boolean signalling) {
    ForkJoinWorkerThread[] ws = workers;
    int n = ws.length;
    long h; // head of stack
    ForkJoinWorkerThread w; int id, ec;
    // Pop entries off the Treiber stack, unparking each popped
    // worker; stops when the stack is empty, the head awaits the
    // current count, or the encoded index is stale/out of range
    // (possible during shutdown -- give up early).
    while ((id = ((int)((h = eventWaiters) & WAITER_ID_MASK)) - 1) >= 0 &&
           (int)(h >>> EVENT_COUNT_SHIFT) != (ec = eventCount) &&
           id < n && (w = ws[id]) != null) {
        if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
                                      h, h = w.nextWaiter))
            LockSupport.unpark(w);
        // a signalling worker hands off once another thread is
        // observed to be releasing concurrently
        if (signalling && (eventCount != ec || eventWaiters != h))
            break;
    }
}
762
763 /**
764 * Tries to advance eventCount and releases waiters. Called only
765 * from workers.
766 */
767 final void signalWork() {
768 int c; // try to increment event count -- CAS failure OK
769 UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
770 if (eventWaiters != 0L)
771 releaseEventWaiters(true);
772 }
773
774 /**
775 * Blocks worker until terminating or event count
776 * advances from last value held by worker
777 *
778 * @param w the calling worker thread
779 */
780 private void eventSync(ForkJoinWorkerThread w) {
781 int wec = w.lastEventCount;
782 long nh = (((long)wec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
783 long h;
784 while ((runState < SHUTDOWN || !tryTerminate(false)) &&
785 ((h = eventWaiters) == 0L ||
786 (int)(h >>> EVENT_COUNT_SHIFT) == wec) &&
787 eventCount == wec) {
788 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
789 w.nextWaiter = h, nh)) {
790 while (runState < TERMINATING && eventCount == wec) {
791 if (!tryAccumulateStealCount(w)) // transfer while idle
792 continue;
793 Thread.interrupted(); // clear/ignore interrupt
794 if (eventCount != wec)
795 break;
796 LockSupport.park(w);
797 }
798 break;
799 }
800 }
801 w.lastEventCount = eventCount;
802 }
803
804 // Maintaining spares
805
806 /**
807 * Pushes worker onto the spare stack
808 */
809 final void pushSpare(ForkJoinWorkerThread w) {
810 int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex+1);
811 do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
812 w.nextSpare = spareWaiters,ns));
813 }
814
815 /**
816 * Tries (once) to resume a spare if running count is less than
817 * target parallelism. Fails on contention or stale workers.
818 */
819 private void tryResumeSpare() {
820 int sw, id;
821 ForkJoinWorkerThread w;
822 ForkJoinWorkerThread[] ws;
823 if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
824 id < (ws = workers).length && (w = ws[id]) != null &&
825 (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
826 eventWaiters == 0L &&
827 spareWaiters == sw &&
828 UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
829 sw, w.nextSpare) &&
830 w.tryUnsuspend()) {
831 int c; // try increment; if contended, finish after unpark
832 boolean inc = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
833 c = workerCounts,
834 c + ONE_RUNNING);
835 LockSupport.unpark(w);
836 if (!inc) {
837 do {} while(!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
838 c = workerCounts,
839 c + ONE_RUNNING));
840 }
841 }
842 }
843
844 /**
845 * Callback from oldest spare occasionally waking up. Tries
846 * (once) to shutdown a spare if more than 25% spare overage, or
847 * if UNUSED_SPARE_TRIM_RATE_NANOS have elapsed and there are at
848 * least #parallelism running threads. Note that we don't need CAS
849 * or locks here because the method is called only from the oldest
850 * suspended spare occasionally waking (and even misfires are OK).
851 *
852 * @param now the wake up nanoTime of caller
853 */
854 final void tryTrimSpare(long now) {
855 long lastTrim = trimTime;
856 trimTime = now;
857 helpMaintainParallelism(); // first, help wake up any needed spares
858 int sw, id;
859 ForkJoinWorkerThread w;
860 ForkJoinWorkerThread[] ws;
861 int pc = parallelism;
862 int wc = workerCounts;
863 if ((wc & RUNNING_COUNT_MASK) >= pc &&
864 (((wc >>> TOTAL_COUNT_SHIFT) - pc) > (pc >>> 2) + 1 ||// approx 25%
865 now - lastTrim >= UNUSED_SPARE_TRIM_RATE_NANOS) &&
866 (id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
867 id < (ws = workers).length && (w = ws[id]) != null &&
868 UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
869 sw, w.nextSpare))
870 w.shutdown(false);
871 }
872
873 /**
874 * Does at most one of:
875 *
876 * 1. Help wake up existing workers waiting for work via
877 * releaseEventWaiters. (If any exist, then it probably doesn't
878 * matter right now if under target parallelism level.)
879 *
880 * 2. If below parallelism level and a spare exists, try (once)
881 * to resume it via tryResumeSpare.
882 *
883 * 3. If neither of the above, tries (once) to add a new
884 * worker if either there are not enough total, or if all
885 * existing workers are busy, there are either no running
886 * workers or the deficit is at least twice the surplus.
887 */
888 private void helpMaintainParallelism() {
889 // uglified to work better when not compiled
890 int pc, wc, rc, tc, rs; long h;
891 if ((h = eventWaiters) != 0L) {
892 if ((int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
893 releaseEventWaiters(false); // avoid useless call
894 }
895 else if ((pc = parallelism) >
896 (rc = ((wc = workerCounts) & RUNNING_COUNT_MASK))) {
897 if (spareWaiters != 0)
898 tryResumeSpare();
899 else if ((rs = runState) < TERMINATING &&
900 ((tc = wc >>> TOTAL_COUNT_SHIFT) < pc ||
901 (tc == (rs & ACTIVE_COUNT_MASK) && // all busy
902 (rc == 0 || // must add
903 rc < pc - ((tc - pc) << 1)) && // within slack
904 tc < MAX_WORKERS && runState == rs)) && // recheck busy
905 workerCounts == wc &&
906 UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
907 wc + (ONE_RUNNING|ONE_TOTAL)))
908 addWorker();
909 }
910 }
911
912 /**
913 * Callback from workers invoked upon each top-level action (i.e.,
914 * stealing a task or taking a submission and running
915 * it). Performs one or more of the following:
916 *
917 * 1. If the worker cannot find work (misses > 0), updates its
918 * active status to inactive and updates activeCount unless
919 * this is the first miss and there is contention, in which
920 * case it may try again (either in this or a subsequent
921 * call).
922 *
923 * 2. If there are at least 2 misses, awaits the next task event
924 * via eventSync
925 *
926 * 3. If there are too many running threads, suspends this worker
927 * (first forcing inactivation if necessary). If it is not
928 * needed, it may be killed while suspended via
929 * tryTrimSpare. Otherwise, upon resume it rechecks to make
930 * sure that it is still needed.
931 *
932 * 4. Helps release and/or reactivate other workers via
933 * helpMaintainParallelism
934 *
935 * @param w the worker
936 * @param misses the number of scans by caller failing to find work
937 * (saturating at 2 just to avoid wraparound)
938 */
939 final void preStep(ForkJoinWorkerThread w, int misses) {
940 boolean active = w.active;
941 int pc = parallelism;
942 for (;;) {
943 int wc = workerCounts;
944 int rc = wc & RUNNING_COUNT_MASK;
945 if (active && (misses > 0 || rc > pc)) {
946 int rs; // try inactivate
947 if (UNSAFE.compareAndSwapInt(this, runStateOffset,
948 rs = runState, rs - ONE_ACTIVE))
949 active = w.active = false;
950 else if (misses > 1 || rc > pc ||
951 (rs & ACTIVE_COUNT_MASK) >= pc)
952 continue; // force inactivate
953 }
954 if (misses > 1) {
955 misses = 0; // don't re-sync
956 eventSync(w); // continue loop to recheck rc
957 }
958 else if (rc > pc) {
959 if (workerCounts == wc && // try to suspend as spare
960 UNSAFE.compareAndSwapInt(this, workerCountsOffset,
961 wc, wc - ONE_RUNNING) &&
962 !w.suspendAsSpare()) // false if killed
963 break;
964 }
965 else {
966 if (rc < pc || eventWaiters != 0L)
967 helpMaintainParallelism();
968 break;
969 }
970 }
971 }
972
973 /**
974 * Helps and/or blocks awaiting join of the given task.
975 * Alternates between helpJoinTask() and helpMaintainParallelism()
976 * as many times as there is a deficit in running count (or longer
977 * if running count would become zero), then blocks if task still
978 * not done.
979 *
980 * @param joinMe the task to join
981 */
982 final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
983 int threshold = parallelism; // descend blocking thresholds
984 while (joinMe.status >= 0) {
985 boolean block; int wc;
986 worker.helpJoinTask(joinMe);
987 if (joinMe.status < 0)
988 break;
989 if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
990 if (threshold > 0)
991 --threshold;
992 else
993 advanceEventCount(); // force release
994 block = false;
995 }
996 else
997 block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
998 wc, wc - ONE_RUNNING);
999 helpMaintainParallelism();
1000 if (block) {
1001 int c;
1002 joinMe.internalAwaitDone();
1003 do {} while (!UNSAFE.compareAndSwapInt
1004 (this, workerCountsOffset,
1005 c = workerCounts, c + ONE_RUNNING));
1006 break;
1007 }
1008 }
1009 }
1010
1011 /**
1012 * Same idea as awaitJoin, but no helping
1013 */
1014 final void awaitBlocker(ManagedBlocker blocker)
1015 throws InterruptedException {
1016 int threshold = parallelism;
1017 while (!blocker.isReleasable()) {
1018 boolean block; int wc;
1019 if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
1020 if (threshold > 0)
1021 --threshold;
1022 else
1023 advanceEventCount();
1024 block = false;
1025 }
1026 else
1027 block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1028 wc, wc - ONE_RUNNING);
1029 helpMaintainParallelism();
1030 if (block) {
1031 try {
1032 do {} while (!blocker.isReleasable() && !blocker.block());
1033 } finally {
1034 int c;
1035 do {} while (!UNSAFE.compareAndSwapInt
1036 (this, workerCountsOffset,
1037 c = workerCounts, c + ONE_RUNNING));
1038 }
1039 break;
1040 }
1041 }
1042 }
1043
1044 /**
1045 * Possibly initiates and/or completes termination.
1046 *
1047 * @param now if true, unconditionally terminate, else only
1048 * if shutdown and empty queue and no active workers
1049 * @return true if now terminating or terminated
1050 */
1051 private boolean tryTerminate(boolean now) {
1052 if (now)
1053 advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
1054 else if (runState < SHUTDOWN ||
1055 !submissionQueue.isEmpty() ||
1056 (runState & ACTIVE_COUNT_MASK) != 0)
1057 return false;
1058
1059 if (advanceRunLevel(TERMINATING))
1060 startTerminating();
1061
1062 // Finish now if all threads terminated; else in some subsequent call
1063 if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
1064 advanceRunLevel(TERMINATED);
1065 termination.arrive();
1066 }
1067 return true;
1068 }
1069
1070 /**
1071 * Actions on transition to TERMINATING
1072 *
1073 * Runs up to four passes through workers: (0) shutting down each
1074 * quietly (without waking up if parked) to quickly spread
1075 * notifications without unnecessary bouncing around event queues
1076 * etc (1) wake up and help cancel tasks (2) interrupt (3) mop up
1077 * races with interrupted workers
1078 */
1079 private void startTerminating() {
1080 cancelSubmissions();
1081 for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
1082 advanceEventCount();
1083 eventWaiters = 0L; // clobber lists
1084 spareWaiters = 0;
1085 ForkJoinWorkerThread[] ws = workers;
1086 int n = ws.length;
1087 for (int i = 0; i < n; ++i) {
1088 ForkJoinWorkerThread w = ws[i];
1089 if (w != null) {
1090 w.shutdown(true);
1091 if (passes > 0 && !w.isTerminated()) {
1092 w.cancelTasks();
1093 LockSupport.unpark(w);
1094 if (passes > 1) {
1095 try {
1096 w.interrupt();
1097 } catch (SecurityException ignore) {
1098 }
1099 }
1100 }
1101 }
1102 }
1103 }
1104 }
1105
1106 /**
1107 * Clear out and cancel submissions, ignoring exceptions
1108 */
1109 private void cancelSubmissions() {
1110 ForkJoinTask<?> task;
1111 while ((task = submissionQueue.poll()) != null) {
1112 try {
1113 task.cancel(false);
1114 } catch (Throwable ignore) {
1115 }
1116 }
1117 }
1118
1119 // misc support for ForkJoinWorkerThread
1120
1121 /**
1122 * Returns pool number
1123 */
1124 final int getPoolNumber() {
1125 return poolNumber;
1126 }
1127
1128 /**
1129 * Tries to accumulates steal count from a worker, clearing
1130 * the worker's value.
1131 *
1132 * @return true if worker steal count now zero
1133 */
1134 final boolean tryAccumulateStealCount(ForkJoinWorkerThread w) {
1135 int sc = w.stealCount;
1136 long c = stealCount;
1137 // CAS even if zero, for fence effects
1138 if (UNSAFE.compareAndSwapLong(this, stealCountOffset, c, c + sc)) {
1139 if (sc != 0)
1140 w.stealCount = 0;
1141 return true;
1142 }
1143 return sc == 0;
1144 }
1145
1146 /**
1147 * Returns the approximate (non-atomic) number of idle threads per
1148 * active thread.
1149 */
1150 final int idlePerActive() {
1151 int pc = parallelism; // use parallelism, not rc
1152 int ac = runState; // no mask -- artifically boosts during shutdown
1153 // Use exact results for small values, saturate past 4
1154 return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
1155 }
1156
1157 // Public and protected methods
1158
1159 // Constructors
1160
1161 /**
1162 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1163 * java.lang.Runtime#availableProcessors}, using the {@linkplain
1164 * #defaultForkJoinWorkerThreadFactory default thread factory},
1165 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1166 *
1167 * @throws SecurityException if a security manager exists and
1168 * the caller is not permitted to modify threads
1169 * because it does not hold {@link
1170 * java.lang.RuntimePermission}{@code ("modifyThread")}
1171 */
1172 public ForkJoinPool() {
1173 this(Runtime.getRuntime().availableProcessors(),
1174 defaultForkJoinWorkerThreadFactory, null, false);
1175 }
1176
1177 /**
1178 * Creates a {@code ForkJoinPool} with the indicated parallelism
1179 * level, the {@linkplain
1180 * #defaultForkJoinWorkerThreadFactory default thread factory},
1181 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1182 *
1183 * @param parallelism the parallelism level
1184 * @throws IllegalArgumentException if parallelism less than or
1185 * equal to zero, or greater than implementation limit
1186 * @throws SecurityException if a security manager exists and
1187 * the caller is not permitted to modify threads
1188 * because it does not hold {@link
1189 * java.lang.RuntimePermission}{@code ("modifyThread")}
1190 */
1191 public ForkJoinPool(int parallelism) {
1192 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
1193 }
1194
1195 /**
1196 * Creates a {@code ForkJoinPool} with the given parameters.
1197 *
1198 * @param parallelism the parallelism level. For default value,
1199 * use {@link java.lang.Runtime#availableProcessors}.
1200 * @param factory the factory for creating new threads. For default value,
1201 * use {@link #defaultForkJoinWorkerThreadFactory}.
1202 * @param handler the handler for internal worker threads that
1203 * terminate due to unrecoverable errors encountered while executing
1204 * tasks. For default value, use <code>null</code>.
1205 * @param asyncMode if true,
1206 * establishes local first-in-first-out scheduling mode for forked
1207 * tasks that are never joined. This mode may be more appropriate
1208 * than default locally stack-based mode in applications in which
1209 * worker threads only process event-style asynchronous tasks.
1210 * For default value, use <code>false</code>.
1211 * @throws IllegalArgumentException if parallelism less than or
1212 * equal to zero, or greater than implementation limit
1213 * @throws NullPointerException if the factory is null
1214 * @throws SecurityException if a security manager exists and
1215 * the caller is not permitted to modify threads
1216 * because it does not hold {@link
1217 * java.lang.RuntimePermission}{@code ("modifyThread")}
1218 */
1219 public ForkJoinPool(int parallelism,
1220 ForkJoinWorkerThreadFactory factory,
1221 Thread.UncaughtExceptionHandler handler,
1222 boolean asyncMode) {
1223 checkPermission();
1224 if (factory == null)
1225 throw new NullPointerException();
1226 if (parallelism <= 0 || parallelism > MAX_WORKERS)
1227 throw new IllegalArgumentException();
1228 this.parallelism = parallelism;
1229 this.factory = factory;
1230 this.ueh = handler;
1231 this.locallyFifo = asyncMode;
1232 int arraySize = initialArraySizeFor(parallelism);
1233 this.workers = new ForkJoinWorkerThread[arraySize];
1234 this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
1235 this.workerLock = new ReentrantLock();
1236 this.termination = new Phaser(1);
1237 this.poolNumber = poolNumberGenerator.incrementAndGet();
1238 this.trimTime = System.nanoTime();
1239 }
1240
1241 /**
1242 * Returns initial power of two size for workers array.
1243 * @param pc the initial parallelism level
1244 */
1245 private static int initialArraySizeFor(int pc) {
1246 // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 >>> 16)
1247 int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1248 size |= size >>> 1;
1249 size |= size >>> 2;
1250 size |= size >>> 4;
1251 size |= size >>> 8;
1252 return size + 1;
1253 }
1254
1255 // Execution methods
1256
1257 /**
1258 * Common code for execute, invoke and submit
1259 */
1260 private <T> void doSubmit(ForkJoinTask<T> task) {
1261 if (task == null)
1262 throw new NullPointerException();
1263 if (runState >= SHUTDOWN)
1264 throw new RejectedExecutionException();
1265 submissionQueue.offer(task);
1266 advanceEventCount();
1267 helpMaintainParallelism(); // start or wake up workers
1268 }
1269
1270 /**
1271 * Performs the given task, returning its result upon completion.
1272 * If the caller is already engaged in a fork/join computation in
1273 * the current pool, this method is equivalent in effect to
1274 * {@link ForkJoinTask#invoke}.
1275 *
1276 * @param task the task
1277 * @return the task's result
1278 * @throws NullPointerException if the task is null
1279 * @throws RejectedExecutionException if the task cannot be
1280 * scheduled for execution
1281 */
1282 public <T> T invoke(ForkJoinTask<T> task) {
1283 doSubmit(task);
1284 return task.join();
1285 }
1286
1287 /**
1288 * Arranges for (asynchronous) execution of the given task.
1289 * If the caller is already engaged in a fork/join computation in
1290 * the current pool, this method is equivalent in effect to
1291 * {@link ForkJoinTask#fork}.
1292 *
1293 * @param task the task
1294 * @throws NullPointerException if the task is null
1295 * @throws RejectedExecutionException if the task cannot be
1296 * scheduled for execution
1297 */
1298 public void execute(ForkJoinTask<?> task) {
1299 doSubmit(task);
1300 }
1301
1302 // AbstractExecutorService methods
1303
1304 /**
1305 * @throws NullPointerException if the task is null
1306 * @throws RejectedExecutionException if the task cannot be
1307 * scheduled for execution
1308 */
1309 public void execute(Runnable task) {
1310 ForkJoinTask<?> job;
1311 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1312 job = (ForkJoinTask<?>) task;
1313 else
1314 job = ForkJoinTask.adapt(task, null);
1315 doSubmit(job);
1316 }
1317
1318 /**
1319 * Submits a ForkJoinTask for execution.
1320 * If the caller is already engaged in a fork/join computation in
1321 * the current pool, this method is equivalent in effect to
1322 * {@link ForkJoinTask#fork}.
1323 *
1324 * @param task the task to submit
1325 * @return the task
1326 * @throws NullPointerException if the task is null
1327 * @throws RejectedExecutionException if the task cannot be
1328 * scheduled for execution
1329 */
1330 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1331 doSubmit(task);
1332 return task;
1333 }
1334
1335 /**
1336 * @throws NullPointerException if the task is null
1337 * @throws RejectedExecutionException if the task cannot be
1338 * scheduled for execution
1339 */
1340 public <T> ForkJoinTask<T> submit(Callable<T> task) {
1341 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1342 doSubmit(job);
1343 return job;
1344 }
1345
1346 /**
1347 * @throws NullPointerException if the task is null
1348 * @throws RejectedExecutionException if the task cannot be
1349 * scheduled for execution
1350 */
1351 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1352 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1353 doSubmit(job);
1354 return job;
1355 }
1356
1357 /**
1358 * @throws NullPointerException if the task is null
1359 * @throws RejectedExecutionException if the task cannot be
1360 * scheduled for execution
1361 */
1362 public ForkJoinTask<?> submit(Runnable task) {
1363 ForkJoinTask<?> job;
1364 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1365 job = (ForkJoinTask<?>) task;
1366 else
1367 job = ForkJoinTask.adapt(task, null);
1368 doSubmit(job);
1369 return job;
1370 }
1371
1372 /**
1373 * @throws NullPointerException {@inheritDoc}
1374 * @throws RejectedExecutionException {@inheritDoc}
1375 */
1376 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1377 ArrayList<ForkJoinTask<T>> forkJoinTasks =
1378 new ArrayList<ForkJoinTask<T>>(tasks.size());
1379 for (Callable<T> task : tasks)
1380 forkJoinTasks.add(ForkJoinTask.adapt(task));
1381 invoke(new InvokeAll<T>(forkJoinTasks));
1382
1383 @SuppressWarnings({"unchecked", "rawtypes"})
1384 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1385 return futures;
1386 }
1387
    // Helper action for invokeAll: forks/joins all adapted tasks in
    // parallel, ignoring individual failures (callers inspect each
    // returned Future for its outcome).
    static final class InvokeAll<T> extends RecursiveAction {
        final ArrayList<ForkJoinTask<T>> tasks;
        InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
        public void compute() {
            try { invokeAll(tasks); }
            catch (Exception ignore) {} // failures recorded in each task
        }
        private static final long serialVersionUID = -7914297376763021607L;
    }
1397
1398 /**
1399 * Returns the factory used for constructing new workers.
1400 *
1401 * @return the factory used for constructing new workers
1402 */
1403 public ForkJoinWorkerThreadFactory getFactory() {
1404 return factory;
1405 }
1406
1407 /**
1408 * Returns the handler for internal worker threads that terminate
1409 * due to unrecoverable errors encountered while executing tasks.
1410 *
1411 * @return the handler, or {@code null} if none
1412 */
1413 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
1414 return ueh;
1415 }
1416
1417 /**
1418 * Returns the targeted parallelism level of this pool.
1419 *
1420 * @return the targeted parallelism level of this pool
1421 */
1422 public int getParallelism() {
1423 return parallelism;
1424 }
1425
1426 /**
1427 * Returns the number of worker threads that have started but not
1428 * yet terminated. This result returned by this method may differ
1429 * from {@link #getParallelism} when threads are created to
1430 * maintain parallelism when others are cooperatively blocked.
1431 *
1432 * @return the number of worker threads
1433 */
1434 public int getPoolSize() {
1435 return workerCounts >>> TOTAL_COUNT_SHIFT;
1436 }
1437
1438 /**
1439 * Returns {@code true} if this pool uses local first-in-first-out
1440 * scheduling mode for forked tasks that are never joined.
1441 *
1442 * @return {@code true} if this pool uses async mode
1443 */
1444 public boolean getAsyncMode() {
1445 return locallyFifo;
1446 }
1447
1448 /**
1449 * Returns an estimate of the number of worker threads that are
1450 * not blocked waiting to join tasks or for other managed
1451 * synchronization. This method may overestimate the
1452 * number of running threads.
1453 *
1454 * @return the number of worker threads
1455 */
1456 public int getRunningThreadCount() {
1457 return workerCounts & RUNNING_COUNT_MASK;
1458 }
1459
1460 /**
1461 * Returns an estimate of the number of threads that are currently
1462 * stealing or executing tasks. This method may overestimate the
1463 * number of active threads.
1464 *
1465 * @return the number of active threads
1466 */
1467 public int getActiveThreadCount() {
1468 return runState & ACTIVE_COUNT_MASK;
1469 }
1470
1471 /**
1472 * Returns {@code true} if all worker threads are currently idle.
1473 * An idle worker is one that cannot obtain a task to execute
1474 * because none are available to steal from other threads, and
1475 * there are no pending submissions to the pool. This method is
1476 * conservative; it might not return {@code true} immediately upon
1477 * idleness of all threads, but will eventually become true if
1478 * threads remain inactive.
1479 *
1480 * @return {@code true} if all threads are currently idle
1481 */
1482 public boolean isQuiescent() {
1483 return (runState & ACTIVE_COUNT_MASK) == 0;
1484 }
1485
1486 /**
1487 * Returns an estimate of the total number of tasks stolen from
1488 * one thread's work queue by another. The reported value
1489 * underestimates the actual total number of steals when the pool
1490 * is not quiescent. This value may be useful for monitoring and
1491 * tuning fork/join programs: in general, steal counts should be
1492 * high enough to keep threads busy, but low enough to avoid
1493 * overhead and contention across threads.
1494 *
1495 * @return the number of steals
1496 */
1497 public long getStealCount() {
1498 return stealCount;
1499 }
1500
1501 /**
1502 * Returns an estimate of the total number of tasks currently held
1503 * in queues by worker threads (but not including tasks submitted
1504 * to the pool that have not begun executing). This value is only
1505 * an approximation, obtained by iterating across all threads in
1506 * the pool. This method may be useful for tuning task
1507 * granularities.
1508 *
1509 * @return the number of queued tasks
1510 */
1511 public long getQueuedTaskCount() {
1512 long count = 0;
1513 ForkJoinWorkerThread[] ws = workers;
1514 int n = ws.length;
1515 for (int i = 0; i < n; ++i) {
1516 ForkJoinWorkerThread w = ws[i];
1517 if (w != null)
1518 count += w.getQueueSize();
1519 }
1520 return count;
1521 }
1522
1523 /**
1524 * Returns an estimate of the number of tasks submitted to this
1525 * pool that have not yet begun executing. This method takes time
1526 * proportional to the number of submissions.
1527 *
1528 * @return the number of queued submissions
1529 */
1530 public int getQueuedSubmissionCount() {
1531 return submissionQueue.size();
1532 }
1533
1534 /**
1535 * Returns {@code true} if there are any tasks submitted to this
1536 * pool that have not yet begun executing.
1537 *
1538 * @return {@code true} if there are any queued submissions
1539 */
1540 public boolean hasQueuedSubmissions() {
1541 return !submissionQueue.isEmpty();
1542 }
1543
1544 /**
1545 * Removes and returns the next unexecuted submission if one is
1546 * available. This method may be useful in extensions to this
1547 * class that re-assign work in systems with multiple pools.
1548 *
1549 * @return the next submission, or {@code null} if none
1550 */
1551 protected ForkJoinTask<?> pollSubmission() {
1552 return submissionQueue.poll();
1553 }
1554
1555 /**
1556 * Removes all available unexecuted submitted and forked tasks
1557 * from scheduling queues and adds them to the given collection,
1558 * without altering their execution status. These may include
1559 * artificially generated or wrapped tasks. This method is
1560 * designed to be invoked only when the pool is known to be
1561 * quiescent. Invocations at other times may not remove all
1562 * tasks. A failure encountered while attempting to add elements
1563 * to collection {@code c} may result in elements being in
1564 * neither, either or both collections when the associated
1565 * exception is thrown. The behavior of this operation is
1566 * undefined if the specified collection is modified while the
1567 * operation is in progress.
1568 *
1569 * @param c the collection to transfer elements into
1570 * @return the number of elements transferred
1571 */
1572 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1573 int count = submissionQueue.drainTo(c);
1574 ForkJoinWorkerThread[] ws = workers;
1575 int n = ws.length;
1576 for (int i = 0; i < n; ++i) {
1577 ForkJoinWorkerThread w = ws[i];
1578 if (w != null)
1579 count += w.drainTasksTo(c);
1580 }
1581 return count;
1582 }
1583
1584 /**
1585 * Returns a string identifying this pool, as well as its state,
1586 * including indications of run state, parallelism level, and
1587 * worker and task counts.
1588 *
1589 * @return a string identifying this pool, as well as its state
1590 */
1591 public String toString() {
1592 long st = getStealCount();
1593 long qt = getQueuedTaskCount();
1594 long qs = getQueuedSubmissionCount();
1595 int wc = workerCounts;
1596 int tc = wc >>> TOTAL_COUNT_SHIFT;
1597 int rc = wc & RUNNING_COUNT_MASK;
1598 int pc = parallelism;
1599 int rs = runState;
1600 int ac = rs & ACTIVE_COUNT_MASK;
1601 return super.toString() +
1602 "[" + runLevelToString(rs) +
1603 ", parallelism = " + pc +
1604 ", size = " + tc +
1605 ", active = " + ac +
1606 ", running = " + rc +
1607 ", steals = " + st +
1608 ", tasks = " + qt +
1609 ", submissions = " + qs +
1610 "]";
1611 }
1612
1613 private static String runLevelToString(int s) {
1614 return ((s & TERMINATED) != 0 ? "Terminated" :
1615 ((s & TERMINATING) != 0 ? "Terminating" :
1616 ((s & SHUTDOWN) != 0 ? "Shutting down" :
1617 "Running")));
1618 }
1619
1620 /**
1621 * Initiates an orderly shutdown in which previously submitted
1622 * tasks are executed, but no new tasks will be accepted.
1623 * Invocation has no additional effect if already shut down.
1624 * Tasks that are in the process of being submitted concurrently
1625 * during the course of this method may or may not be rejected.
1626 *
1627 * @throws SecurityException if a security manager exists and
1628 * the caller is not permitted to modify threads
1629 * because it does not hold {@link
1630 * java.lang.RuntimePermission}{@code ("modifyThread")}
1631 */
1632 public void shutdown() {
1633 checkPermission();
1634 advanceRunLevel(SHUTDOWN);
1635 tryTerminate(false);
1636 }
1637
1638 /**
1639 * Attempts to cancel and/or stop all tasks, and reject all
1640 * subsequently submitted tasks. Tasks that are in the process of
1641 * being submitted or executed concurrently during the course of
1642 * this method may or may not be rejected. This method cancels
1643 * both existing and unexecuted tasks, in order to permit
1644 * termination in the presence of task dependencies. So the method
1645 * always returns an empty list (unlike the case for some other
1646 * Executors).
1647 *
1648 * @return an empty list
1649 * @throws SecurityException if a security manager exists and
1650 * the caller is not permitted to modify threads
1651 * because it does not hold {@link
1652 * java.lang.RuntimePermission}{@code ("modifyThread")}
1653 */
1654 public List<Runnable> shutdownNow() {
1655 checkPermission();
1656 tryTerminate(true);
1657 return Collections.emptyList();
1658 }
1659
1660 /**
1661 * Returns {@code true} if all tasks have completed following shut down.
1662 *
1663 * @return {@code true} if all tasks have completed following shut down
1664 */
1665 public boolean isTerminated() {
1666 return runState >= TERMINATED;
1667 }
1668
1669 /**
1670 * Returns {@code true} if the process of termination has
1671 * commenced but not yet completed. This method may be useful for
1672 * debugging. A return of {@code true} reported a sufficient
1673 * period after shutdown may indicate that submitted tasks have
1674 * ignored or suppressed interruption, causing this executor not
1675 * to properly terminate.
1676 *
1677 * @return {@code true} if terminating but not yet terminated
1678 */
1679 public boolean isTerminating() {
1680 return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
1681 }
1682
1683 /**
1684 * Returns {@code true} if this pool has been shut down.
1685 *
1686 * @return {@code true} if this pool has been shut down
1687 */
1688 public boolean isShutdown() {
1689 return runState >= SHUTDOWN;
1690 }
1691
1692 /**
1693 * Blocks until all tasks have completed execution after a shutdown
1694 * request, or the timeout occurs, or the current thread is
1695 * interrupted, whichever happens first.
1696 *
1697 * @param timeout the maximum time to wait
1698 * @param unit the time unit of the timeout argument
1699 * @return {@code true} if this executor terminated and
1700 * {@code false} if the timeout elapsed before termination
1701 * @throws InterruptedException if interrupted while waiting
1702 */
1703 public boolean awaitTermination(long timeout, TimeUnit unit)
1704 throws InterruptedException {
1705 try {
1706 return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
1707 } catch(TimeoutException ex) {
1708 return false;
1709 }
1710 }
1711
    /**
     * Interface for extending managed parallelism for tasks running
     * in {@link ForkJoinPool}s.
     *
     * <p>A {@code ManagedBlocker} provides two methods.  Method
     * {@code isReleasable} must return {@code true} if blocking is
     * not necessary. Method {@code block} blocks the current thread
     * if necessary (perhaps internally invoking {@code isReleasable}
     * before actually blocking). The unusual methods in this API
     * accommodate synchronizers that may, but don't usually, block
     * for long periods. Similarly, they allow more efficient internal
     * handling of cases in which additional workers may be, but
     * usually are not, needed to ensure sufficient parallelism.
     * Toward this end, implementations of method {@code isReleasable}
     * must be amenable to repeated invocation.
     *
     * <p>For example, here is a ManagedBlocker based on a
     * ReentrantLock:
     *  <pre> {@code
     * class ManagedLocker implements ManagedBlocker {
     *   final ReentrantLock lock;
     *   boolean hasLock = false;
     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
     *   public boolean block() {
     *     if (!hasLock)
     *       lock.lock();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return hasLock || (hasLock = lock.tryLock());
     *   }
     * }}</pre>
     *
     * <p>Here is a class that possibly blocks waiting for an
     * item on a given queue:
     *  <pre> {@code
     * class QueueTaker<E> implements ManagedBlocker {
     *   final BlockingQueue<E> queue;
     *   volatile E item = null;
     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *   public boolean block() throws InterruptedException {
     *     if (item == null)
     *       item = queue.take();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return item != null || (item = queue.poll()) != null;
     *   }
     *   public E getItem() { // call after pool.managedBlock completes
     *     return item;
     *   }
     * }}</pre>
     */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         * (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         */
        boolean isReleasable();
    }
1782
1783 /**
1784 * Blocks in accord with the given blocker. If the current thread
1785 * is a {@link ForkJoinWorkerThread}, this method possibly
1786 * arranges for a spare thread to be activated if necessary to
1787 * ensure sufficient parallelism while the current thread is blocked.
1788 *
1789 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1790 * behaviorally equivalent to
1791 * <pre> {@code
1792 * while (!blocker.isReleasable())
1793 * if (blocker.block())
1794 * return;
1795 * }</pre>
1796 *
1797 * If the caller is a {@code ForkJoinTask}, then the pool may
1798 * first be expanded to ensure parallelism, and later adjusted.
1799 *
1800 * @param blocker the blocker
1801 * @throws InterruptedException if blocker.block did so
1802 */
1803 public static void managedBlock(ManagedBlocker blocker)
1804 throws InterruptedException {
1805 Thread t = Thread.currentThread();
1806 if (t instanceof ForkJoinWorkerThread) {
1807 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
1808 w.pool.awaitBlocker(blocker);
1809 }
1810 else {
1811 do {} while (!blocker.isReleasable() && !blocker.block());
1812 }
1813 }
1814
1815 // AbstractExecutorService overrides. These rely on undocumented
1816 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
1817 // implement RunnableFuture.
1818
1819 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1820 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
1821 }
1822
1823 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1824 return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
1825 }
1826
1827 // Unsafe mechanics
1828
    // The Unsafe instance must be initialized before any of the offset
    // fields below, since each initializer reads it via objectFieldOffset.
    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    // Raw field offsets used for CAS/ordered writes on the pool's
    // volatile coordination fields.
    private static final long workerCountsOffset =
        objectFieldOffset("workerCounts", ForkJoinPool.class);
    private static final long runStateOffset =
        objectFieldOffset("runState", ForkJoinPool.class);
    private static final long eventCountOffset =
        objectFieldOffset("eventCount", ForkJoinPool.class);
    private static final long eventWaitersOffset =
        objectFieldOffset("eventWaiters",ForkJoinPool.class);
    private static final long stealCountOffset =
        objectFieldOffset("stealCount",ForkJoinPool.class);
    private static final long spareWaitersOffset =
        objectFieldOffset("spareWaiters",ForkJoinPool.class);
1842
1843 private static long objectFieldOffset(String field, Class<?> klazz) {
1844 try {
1845 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1846 } catch (NoSuchFieldException e) {
1847 // Convert Exception to corresponding Error
1848 NoSuchFieldError error = new NoSuchFieldError(field);
1849 error.initCause(e);
1850 throw error;
1851 }
1852 }
1853 }