root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
Revision: 1.62
Committed: Wed Aug 11 20:28:22 2010 UTC by dl
Branch: MAIN
Changes since 1.61: +1 -1 lines
Log Message:
Typo

File Contents

1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8
9 import java.util.concurrent.*;
10
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.concurrent.locks.LockSupport;
17 import java.util.concurrent.locks.ReentrantLock;
18 import java.util.concurrent.atomic.AtomicInteger;
19 import java.util.concurrent.CountDownLatch;
20
21 /**
22 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
23 * A {@code ForkJoinPool} provides the entry point for submissions
24 * from non-{@code ForkJoinTask} clients, as well as management and
25 * monitoring operations.
26 *
27 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
28 * ExecutorService} mainly by virtue of employing
29 * <em>work-stealing</em>: all threads in the pool attempt to find and
30 * execute subtasks created by other active tasks (eventually blocking
31 * waiting for work if none exist). This enables efficient processing
32 * when most tasks spawn other subtasks (as do most {@code
33 * ForkJoinTask}s). When <em>asyncMode</em> is set to true in
34 * constructors, {@code ForkJoinPool}s may also be appropriate for use
35 * with event-style tasks that are never joined.
36 *
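 * <p>For example, an async-mode pool for event-style tasks can be
 * constructed with this class's four-argument constructor (a sketch;
 * all values other than {@code asyncMode} are the defaults):
 *
 * <pre>
 * ForkJoinPool asyncPool = new ForkJoinPool(
 *     Runtime.getRuntime().availableProcessors(),
 *     ForkJoinPool.defaultForkJoinWorkerThreadFactory,
 *     null,    // no UncaughtExceptionHandler
 *     true);   // asyncMode
 * </pre>
 *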
37 * <p>A {@code ForkJoinPool} is constructed with a given target
38 * parallelism level; by default, equal to the number of available
39 * processors. The pool attempts to maintain enough active (or
40 * available) threads by dynamically adding, suspending, or resuming
41 * internal worker threads, even if some tasks are stalled waiting to
42 * join others. However, no such adjustments are guaranteed in the
43 * face of blocked IO or other unmanaged synchronization. The nested
44 * {@link ManagedBlocker} interface enables extension of the kinds of
45 * synchronization accommodated.
46 *
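 * <p>For example, here is a minimal sketch of a {@code ManagedBlocker}
 * that manages lock acquisition ({@code ManagedLocker} is a
 * hypothetical illustration, not part of this API):
 *
 * <pre>
 * class ManagedLocker implements ForkJoinPool.ManagedBlocker {
 *   final ReentrantLock lock;
 *   boolean hasLock = false;
 *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
 *   public boolean block() {
 *     if (!hasLock)
 *       lock.lock();
 *     return true;
 *   }
 *   public boolean isReleasable() {
 *     return hasLock || (hasLock = lock.tryLock());
 *   }
 * }
 * </pre>
 *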
47 * <p>In addition to execution and lifecycle control methods, this
48 * class provides status check methods (for example
49 * {@link #getStealCount}) that are intended to aid in developing,
50 * tuning, and monitoring fork/join applications. Also, method
51 * {@link #toString} returns indications of pool state in a
52 * convenient form for informal monitoring.
53 *
54 * <p> As is the case with other ExecutorServices, there are three
55 * main task execution methods summarized in the following
56 * table. These are designed to be used by clients not already engaged
57 * in fork/join computations in the current pool. The main forms of
58 * these methods accept instances of {@code ForkJoinTask}, but
59 * overloaded forms also allow mixed execution of plain {@code
60 * Runnable}- or {@code Callable}- based activities as well. However,
61 * tasks that are already executing in a pool should normally
62 * <em>NOT</em> use these pool execution methods, but instead use the
63 * within-computation forms listed in the table.
64 *
65 * <table BORDER CELLPADDING=3 CELLSPACING=1>
66 * <tr>
67 * <td></td>
68 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
69 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
70 * </tr>
71 * <tr>
72 * <td> <b>Arrange async execution</td>
73 * <td> {@link #execute(ForkJoinTask)}</td>
74 * <td> {@link ForkJoinTask#fork}</td>
75 * </tr>
76 * <tr>
77 * <td> <b>Await and obtain result</td>
78 * <td> {@link #invoke(ForkJoinTask)}</td>
79 * <td> {@link ForkJoinTask#invoke}</td>
80 * </tr>
81 * <tr>
82 * <td> <b>Arrange exec and obtain Future</td>
83 * <td> {@link #submit(ForkJoinTask)}</td>
84 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
85 * </tr>
86 * </table>
87 *
88 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
89 * used for all parallel task execution in a program or subsystem.
90 * Otherwise, its use would not usually outweigh the construction and
91 * bookkeeping overhead of creating a large set of threads. For
92 * example, a common pool could be used for the {@code SortTasks}
93 * illustrated in {@link RecursiveAction}. Because {@code
94 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
95 * daemon} mode, there is typically no need to explicitly {@link
96 * #shutdown} such a pool upon program exit.
97 *
98 * <pre>
99 * static final ForkJoinPool mainPool = new ForkJoinPool();
100 * ...
101 * public void sort(long[] array) {
102 * mainPool.invoke(new SortTask(array, 0, array.length));
103 * }
104 * </pre>
105 *
106 * <p><b>Implementation notes</b>: This implementation restricts the
107 * maximum number of running threads to 32767. Attempts to create
108 * pools with greater than the maximum number result in
109 * {@code IllegalArgumentException}.
110 *
111 * <p>This implementation rejects submitted tasks (that is, by throwing
112 * {@link RejectedExecutionException}) only when the pool is shut down
113 * or internal resources have been exhausted.
114 *
115 * @since 1.7
116 * @author Doug Lea
117 */
118 public class ForkJoinPool extends AbstractExecutorService {
119
120 /*
121 * Implementation Overview
122 *
123 * This class provides the central bookkeeping and control for a
124 * set of worker threads: Submissions from non-FJ threads enter
125 * into a submission queue. Workers take these tasks and typically
126 * split them into subtasks that may be stolen by other workers.
127 * The main work-stealing mechanics implemented in class
128 * ForkJoinWorkerThread give first priority to processing tasks
129 * from their own queues (LIFO or FIFO, depending on mode), then
130 * to randomized FIFO steals of tasks in other worker queues, and
131 * lastly to new submissions. These mechanics do not consider
132 * affinities, loads, cache localities, etc, so rarely provide the
133 * best possible performance on a given machine, but portably
134 * provide good throughput by averaging over these factors.
135 * (Further, even if we did try to use such information, we do not
136 * usually have a basis for exploiting it. For example, some sets
137 * of tasks profit from cache affinities, but others are harmed by
138 * cache pollution effects.)
139 *
140 * Beyond work-stealing support and essential bookkeeping, the
141 * main responsibility of this framework is to take actions when
142 * one worker is waiting to join a task stolen (or always held by)
143 * another. Because we are multiplexing many tasks onto a pool
144 * of workers, we can't just let them block (as in Thread.join).
145 * We also cannot just reassign the joiner's run-time stack with
146 * another and replace it later, which would be a form of
147 * "continuation", that even if possible is not necessarily a good
148 * idea. Given that the creation costs of most threads on most
149 * systems mainly surrounds setting up runtime stacks, thread
150 * creation and switching is usually not much more expensive than
151 * stack creation and switching, and is more flexible). Instead we
152 * combine two tactics:
153 *
154 * Helping: Arranging for the joiner to execute some task that it
155 * would be running if the steal had not occurred. Method
156 * ForkJoinWorkerThread.helpJoinTask tracks joining->stealing
157 * links to try to find such a task.
158 *
159 * Compensating: Unless there are already enough live threads,
160 * method helpMaintainParallelism() may create or
161 * re-activate a spare thread to compensate for blocked
162 * joiners until they unblock (see the schematic below).
163 *
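 * A rough schematic of the combined tactics (illustrative
 * pseudocode only; the actual control flow lives in awaitJoin and
 * ForkJoinWorkerThread.helpJoinTask):
 *
 *   while (task not done) {
 *     try helping: run a task the joiner would have run
 *       absent the steal;
 *     else try compensating: resume or create a spare, then
 *       block until the task is done;
 *   }
 *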
164 * Because determining the existence of conservatively safe
165 * helping targets, the availability of already-created spares,
166 * and the apparent need to create new spares are all racy and
167 * require heuristic guidance, we rely on multiple retries of
168 * each. Further, because it is impossible to keep exactly the
169 * target (parallelism) number of threads running at any given
170 * time, we allow compensation during joins to fail, and enlist
171 * all other threads to help out whenever they are not otherwise
172 * occupied (i.e., mainly in method preStep).
173 *
174 * The ManagedBlocker extension API can't use helping so relies
175 * only on compensation in method awaitBlocker.
176 *
177 * The main throughput advantages of work-stealing stem from
178 * decentralized control -- workers mostly steal tasks from each
179 * other. We do not want to negate this by creating bottlenecks
180 * implementing other management responsibilities. So we use a
181 * collection of techniques that avoid, reduce, or cope well with
182 * contention. These entail several instances of bit-packing into
183 * CASable fields to maintain only the minimally required
184 * atomicity. To enable such packing, we restrict maximum
185 * parallelism to (1<<15)-1, enabling twice this (to accommodate
186 * unbalanced increments and decrements) to fit into a 16 bit
187 * field, which is far in excess of normal operating range. Even
188 * though updates to some of these bookkeeping fields do sometimes
189 * contend with each other, they don't normally cache-contend with
190 * updates to others enough to warrant memory padding or
191 * isolation. So they are all held as fields of ForkJoinPool
192 * objects. The main capabilities are as follows:
193 *
194 * 1. Creating and removing workers. Workers are recorded in the
195 * "workers" array. This is an array as opposed to some other data
196 * structure to support index-based random steals by workers.
197 * Updates to the array recording new workers and unrecording
198 * terminated ones are protected from each other by a lock
199 * (workerLock) but the array is otherwise concurrently readable,
200 * and accessed directly by workers. To simplify index-based
201 * operations, the array size is always a power of two, and all
202 * readers must tolerate null slots. Currently, all worker thread
203 * creation is on-demand, triggered by task submissions,
204 * replacement of terminated workers, and/or compensation for
205 * blocked workers. However, all other support code is set up to
206 * work with other policies.
207 *
208 * To ensure that we do not hold on to worker references that
209 * would prevent GC, ALL accesses to workers are via indices into
210 * the workers array (which is one source of some of the unusual
211 * code constructions here). In essence, the workers array serves
212 * as a WeakReference mechanism. Thus for example the event queue
213 * stores worker indices, not worker references. Access to the
214 * workers in associated methods (for example releaseEventWaiters)
215 * must both index-check and null-check the IDs. All such accesses
216 * ignore bad IDs by returning out early from what they are doing,
217 * since this can only be associated with shutdown, in which case
218 * it is OK to give up. On termination, we just clobber these
219 * data structures without trying to use them.
220 *
221 * 2. Bookkeeping for dynamically adding and removing workers. We
222 * aim to approximately maintain the given level of parallelism.
223 * When some workers are known to be blocked (on joins or via
224 * ManagedBlocker), we may create or resume others to take their
225 * place until they unblock (see below). Implementing this
226 * requires counts of the number of "running" threads (i.e., those
227 * that are neither blocked nor artificially suspended) as well as
228 * the total number. These two values are packed into one field,
229 * "workerCounts" because we need accurate snapshots when deciding
230 * to create, resume or suspend. Note however that the
231 * correspondence of these counts to reality is not guaranteed. In
232 * particular updates for unblocked threads may lag until they
233 * actually wake up.
234 *
235 * 3. Maintaining global run state. The run state of the pool
236 * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to
237 * those in other Executor implementations, as well as a count of
238 * "active" workers -- those that are, or soon will be, or
239 * recently were executing tasks. The runLevel and active count
240 * are packed together in order to correctly trigger shutdown and
241 * termination. Without care, active counts can be subject to very
242 * high contention. We substantially reduce this contention by
243 * relaxing update rules. A worker must claim active status
244 * prospectively, by activating if it sees that a submitted or
245 * stealable task exists (it may find after activating that the
246 * task no longer exists). It stays active while processing this
247 * task (if it exists) and any other local subtasks it produces,
248 * until it cannot find any other tasks. It then tries
249 * inactivating (see method preStep), but upon update contention
250 * instead scans for more tasks, later retrying inactivation if it
251 * doesn't find any.
252 *
253 * 4. Managing idle workers waiting for tasks. We cannot let
254 * workers spin indefinitely scanning for tasks when none are
255 * available. On the other hand, we must quickly prod them into
256 * action when new tasks are submitted or generated. We
257 * park/unpark these idle workers using an event-count scheme.
258 * Field eventCount is incremented upon events that may enable
259 * workers that previously could not find a task to now find one:
260 * Submission of a new task to the pool, or another worker pushing
261 * a task onto a previously empty queue. (We also use this
262 * mechanism for termination actions that require wakeups of idle
263 * workers). Each worker maintains its last known event count,
264 * and blocks when a scan for work did not find a task AND its
265 * lastEventCount matches the current eventCount. Waiting idle
266 * workers are recorded in a variant of Treiber stack headed by
267 * field eventWaiters which, when nonzero, encodes the thread
268 * index and count awaited for by the worker thread most recently
269 * calling eventSync. This thread in turn has a record (field
270 * nextEventWaiter) for the next waiting worker. In addition to
271 * allowing simpler decisions about need for wakeup, the event
272 * count bits in eventWaiters serve the role of tags to avoid ABA
273 * errors in Treiber stacks. To reduce delays in task diffusion,
274 * workers not otherwise occupied may invoke method
275 * releaseEventWaiters, which removes and signals (unparks) workers
276 * not waiting on the current count. To minimize task production
277 * stalls associated with signalling, any worker
278 * pushing a task on an empty queue invokes the weaker method
279 * signalWork, that only releases idle workers until it detects
280 * interference by other threads trying to release, and lets them
281 * take over. The net effect is a tree-like diffusion of signals,
282 * where released threads (and possibly others) help with unparks.
283 * To further reduce contention effects a bit, failed CASes to
284 * increment field eventCount are tolerated without retries.
285 * Conceptually they are merged into the same event, which is OK
286 * when their only purpose is to enable workers to scan for work (see the push sketch following this list).
287 *
288 * 5. Managing suspension of extra workers. When a worker is about
289 * to block waiting for a join (or via ManagedBlockers), we may
290 * create a new thread to maintain parallelism level, or at least
291 * avoid starvation. Usually, extra threads are needed for only
292 * very short periods, yet join dependencies are such that we
293 * sometimes need them in bursts. Rather than create new threads
294 * each time this happens, we suspend no-longer-needed extra ones
295 * as "spares". For most purposes, we don't distinguish "extra"
296 * spare threads from normal "core" threads: On each call to
297 * preStep (the only point at which we can do this) a worker
298 * checks to see if there are now too many running workers, and if
299 * so, suspends itself. Method helpMaintainParallelism looks for
300 * suspended threads to resume before considering creating a new
301 * replacement. The spares themselves are encoded on another
302 * variant of a Treiber Stack, headed at field "spareWaiters".
303 * Note that the use of spares is intrinsically racy. One thread
304 * may become a spare at about the same time as another is
305 * needlessly being created. We counteract this and related slop
306 * in part by requiring resumed spares to immediately recheck (in
307 * preStep) to see whether they should re-suspend. To avoid
308 * long-term build-up of spares, the oldest spare (see
309 * ForkJoinWorkerThread.suspendAsSpare) occasionally wakes up if
310 * not signalled and calls tryTrimSpare, which uses two different
311 * thresholds: Always killing if the number of spares is greater
312 * than 25% of total, and killing others only at a slower rate
313 * (UNUSED_SPARE_TRIM_RATE_NANOS).
314 *
315 * 6. Deciding when to create new workers. The main dynamic
316 * control in this class is deciding when to create extra threads
317 * in method helpMaintainParallelism. We would like to keep
318 * exactly #parallelism threads running, which is an impossible
319 * task. We always need to create one when the number of running
320 * threads would become zero and all workers are busy. Beyond
321 * this, we must rely on heuristics that work well in the
322 * presence of transient phenomena such as GC stalls, dynamic
323 * compilation, and wake-up lags. These transients are extremely
324 * common -- we are normally trying to fully saturate the CPUs on
325 * a machine, so almost any activity other than running tasks
326 * impedes accuracy. Our main defense is to allow some slack in
327 * creation thresholds, using rules that reflect the fact that the
328 * more threads we have running, the more likely that we are
329 * underestimating the number of running threads. The rules also
330 * better cope with the fact that some of the methods in this
331 * class tend to never become compiled (but are interpreted), so
332 * some components of the entire set of controls might execute 100
333 * times faster than others. And similarly for cases where the
334 * apparent lack of work is just due to GC stalls and other
335 * transient system activity.
336 *
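 * As an orientation aid for the waiter stack of capability 4 above,
 * a push in eventSync encodes roughly as follows (illustrative
 * pseudocode only):
 *
 *   long h  = eventWaiters;                        // current head
 *   long nh = ((long)count << EVENT_COUNT_SHIFT) | (index + 1);
 *   w.nextWaiter = h;                              // link old head
 *   CAS(eventWaiters, h, nh);                      // install new head
 *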
337 * Beware that there is a lot of representation-level coupling
338 * among classes ForkJoinPool, ForkJoinWorkerThread, and
339 * ForkJoinTask. For example, direct access to "workers" array by
340 * workers, and direct access to ForkJoinTask.status by both
341 * ForkJoinPool and ForkJoinWorkerThread. There is little point
342 * trying to reduce this, since any associated future changes in
343 * representations will need to be accompanied by algorithmic
344 * changes anyway.
345 *
346 * Style notes: There are lots of inline assignments (of form
347 * "while ((local = field) != 0)") which are usually the simplest
348 * way to ensure the required read orderings (which are sometimes
349 * critical). Also several occurrences of the unusual "do {}
350 * while(!cas...)" which is the simplest way to force an update of
351 * a CAS'ed variable. There are also other coding oddities that
352 * help some methods perform reasonably even when interpreted (not
353 * compiled), at the expense of some messy constructions that
354 * reduce byte code counts.
355 *
356 * The order of declarations in this file is: (1) statics (2)
357 * fields (along with constants used when unpacking some of them)
358 * (3) internal control methods (4) callbacks and other support
359 * for ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
360 * methods (plus a few little helpers).
361 */
362
363 /**
364 * Factory for creating new {@link ForkJoinWorkerThread}s.
365 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
366 * for {@code ForkJoinWorkerThread} subclasses that extend base
367 * functionality or initialize threads with different contexts.
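 *
 * <p>A minimal sketch (hypothetical subclass, for illustration only):
 *
 * <pre>
 * class MyWorkerFactory implements ForkJoinWorkerThreadFactory {
 *   public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 *     return new ForkJoinWorkerThread(pool) {
 *       // override hooks such as onStart/onTermination as needed
 *     };
 *   }
 * }
 * </pre>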
368 */
369 public static interface ForkJoinWorkerThreadFactory {
370 /**
371 * Returns a new worker thread operating in the given pool.
372 *
373 * @param pool the pool this thread works in
374 * @throws NullPointerException if the pool is null
375 */
376 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
377 }
378
379 /**
380 * Default ForkJoinWorkerThreadFactory implementation; creates a
381 * new ForkJoinWorkerThread.
382 */
383 static class DefaultForkJoinWorkerThreadFactory
384 implements ForkJoinWorkerThreadFactory {
385 public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
386 return new ForkJoinWorkerThread(pool);
387 }
388 }
389
390 /**
391 * Creates a new ForkJoinWorkerThread. This factory is used unless
392 * overridden in ForkJoinPool constructors.
393 */
394 public static final ForkJoinWorkerThreadFactory
395 defaultForkJoinWorkerThreadFactory =
396 new DefaultForkJoinWorkerThreadFactory();
397
398 /**
399 * Permission required for callers of methods that may start or
400 * kill threads.
401 */
402 private static final RuntimePermission modifyThreadPermission =
403 new RuntimePermission("modifyThread");
404
405 /**
406 * If there is a security manager, makes sure caller has
407 * permission to modify threads.
408 */
409 private static void checkPermission() {
410 SecurityManager security = System.getSecurityManager();
411 if (security != null)
412 security.checkPermission(modifyThreadPermission);
413 }
414
415 /**
416 * Generator for assigning sequence numbers as pool names.
417 */
418 private static final AtomicInteger poolNumberGenerator =
419 new AtomicInteger();
420
421 /**
422 * Absolute bound for parallelism level. Twice this number plus
423 * one (i.e., 0xffff) must fit into a 16-bit field to enable
424 * word-packing for some counts and indices.
425 */
426 private static final int MAX_WORKERS = 0x7fff;
427
428 /**
429 * Array holding all worker threads in the pool. Array size must
430 * be a power of two. Updates and replacements are protected by
431 * workerLock, but the array is always kept in a consistent enough
432 * state to be randomly accessed without locking by workers
433 * performing work-stealing, as well as other traversal-based
434 * methods in this class. All readers must tolerate that some
435 * array slots may be null.
436 */
437 volatile ForkJoinWorkerThread[] workers;
438
439 /**
440 * Queue for external submissions.
441 */
442 private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
443
444 /**
445 * Lock protecting updates to workers array.
446 */
447 private final ReentrantLock workerLock;
448
449 /**
450 * Latch released upon termination.
451 */
452 private final Phaser termination;
453
454 /**
455 * Creation factory for worker threads.
456 */
457 private final ForkJoinWorkerThreadFactory factory;
458
459 /**
460 * Sum of per-thread steal counts, updated only when threads are
461 * idle or terminating.
462 */
463 private volatile long stealCount;
464
465 /**
466 * The last nanoTime that a spare thread was trimmed
467 */
468 private volatile long trimTime;
469
470 /**
471 * The rate at which to trim unused spares
472 */
473 static final long UNUSED_SPARE_TRIM_RATE_NANOS =
474 1000L * 1000L * 1000L; // 1 sec
475
476 /**
477 * Encoded record of top of Treiber stack of threads waiting for
478 * events. The top 32 bits contain the count being waited for. The
479 * bottom 16 bits contain one plus the pool index of the waiting
480 * worker thread. (Bits 16-31 are unused.)
481 */
482 private volatile long eventWaiters;
483
484 private static final int EVENT_COUNT_SHIFT = 32;
485 private static final long WAITER_ID_MASK = (1L << 16) - 1L;
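    // Decoding sketch for a snapshot h of eventWaiters (illustrative
    // comment only, using the constants above):
    //   int id  = (int)(h & WAITER_ID_MASK);      // pool index + 1; 0 if empty
    //   int cnt = (int)(h >>> EVENT_COUNT_SHIFT); // event count awaited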
486
487 /**
488 * A counter for events that may wake up worker threads:
489 * - Submission of a new task to the pool
490 * - A worker pushing a task on an empty queue
491 * - termination
492 */
493 private volatile int eventCount;
494
495 /**
496 * Encoded record of top of Treiber stack of spare threads waiting
497 * for resumption. The top 16 bits contain an arbitrary count to
498 * avoid ABA effects. The bottom 16 bits contain one plus the pool
499 * index of the waiting worker thread.
500 */
501 private volatile int spareWaiters;
502
503 private static final int SPARE_COUNT_SHIFT = 16;
504 private static final int SPARE_ID_MASK = (1 << 16) - 1;
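    // Decoding sketch for a snapshot sw of spareWaiters (illustrative):
    //   int id  = sw & SPARE_ID_MASK;        // pool index + 1; 0 if empty
    //   int cnt = sw >>> SPARE_COUNT_SHIFT;  // ABA-avoidance tag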
505
506 /**
507 * Lifecycle control. The low word contains the number of workers
508 * that are (probably) executing tasks. This value is atomically
509 * incremented before a worker gets a task to run, and decremented
510 * when a worker has no tasks and cannot find any. Bits 16-18
511 * contain runLevel value. When all are zero, the pool is
512 * running. Level transitions are monotonic (running -> shutdown
513 * -> terminating -> terminated) so each transition adds a bit.
514 * These are bundled together to ensure consistent read for
515 * termination checks (i.e., that runLevel is at least SHUTDOWN
516 * and active threads is zero).
517 */
518 private volatile int runState;
519
520 // Note: The order among run level values matters.
521 private static final int RUNLEVEL_SHIFT = 16;
522 private static final int SHUTDOWN = 1 << RUNLEVEL_SHIFT;
523 private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1);
524 private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2);
525 private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;
526 private static final int ONE_ACTIVE = 1; // active update delta
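    // Illustrative uses of this encoding (comment only):
    //   runState >= SHUTDOWN           // at least shutdown level reached
    //   runState & ACTIVE_COUNT_MASK   // current number of active workers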
527
528 /**
529 * Holds number of total (i.e., created and not yet terminated)
530 * and running (i.e., not blocked on joins or other managed sync)
531 * threads, packed together to ensure consistent snapshot when
532 * making decisions about creating and suspending spare
533 * threads. Updated only by CAS. Note that adding a new worker
534 * requires incrementing both counts, since workers start off in
535 * running state.
536 */
537 private volatile int workerCounts;
538
539 private static final int TOTAL_COUNT_SHIFT = 16;
540 private static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
541 private static final int ONE_RUNNING = 1;
542 private static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT;
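    // Decoding sketch for a snapshot wc of workerCounts (illustrative):
    //   int running = wc & RUNNING_COUNT_MASK;   // low 16 bits
    //   int total   = wc >>> TOTAL_COUNT_SHIFT;  // high 16 bits
    // Adding a worker CASes wc to wc + (ONE_RUNNING | ONE_TOTAL).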
543
544 /**
545 * The target parallelism level.
546 * Accessed directly by ForkJoinWorkerThreads.
547 */
548 final int parallelism;
549
550 /**
551 * True if using local FIFO, not the default LIFO, for local polling.
552 * Read by, and replicated by, ForkJoinWorkerThreads.
553 */
554 final boolean locallyFifo;
555
556 /**
557 * The uncaught exception handler used when any worker abruptly
558 * terminates.
559 */
560 private final Thread.UncaughtExceptionHandler ueh;
561
562 /**
563 * Pool number, just for assigning useful names to worker threads
564 */
565 private final int poolNumber;
566
567
568 // Utilities for CASing fields. Note that several of these
569 // are manually inlined by callers
570
571 /**
572 * Increments running count part of workerCounts
573 */
574 final void incrementRunningCount() {
575 int c;
576 do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
577 c = workerCounts,
578 c + ONE_RUNNING));
579 }
580
581 /**
582 * Tries to decrement running count unless already zero
583 */
584 final boolean tryDecrementRunningCount() {
585 int wc = workerCounts;
586 if ((wc & RUNNING_COUNT_MASK) == 0)
587 return false;
588 return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
589 wc, wc - ONE_RUNNING);
590 }
591
592 /**
593 * Forces decrement of encoded workerCounts, awaiting nonzero if
594 * (rarely) necessary when other count updates lag.
595 *
596 * @param dr either zero or ONE_RUNNING
597 * @param dt either zero or ONE_TOTAL
598 */
599 private void decrementWorkerCounts(int dr, int dt) {
600 for (;;) {
601 int wc = workerCounts;
602 if (wc == 0 && (runState & TERMINATED) != 0)
603 return; // lagging termination on a backout
604 if ((wc & RUNNING_COUNT_MASK) - dr < 0 ||
605 (wc >>> TOTAL_COUNT_SHIFT) - dt < 0)
606 Thread.yield();
607 if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
608 wc, wc - (dr + dt)))
609 return;
610 }
611 }
612
613 /**
614 * Increments event count
615 */
616 private void advanceEventCount() {
617 int c;
618 do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset,
619 c = eventCount, c+1));
620 }
621
622 /**
623 * Tries incrementing active count; fails on contention.
624 * Called by workers before executing tasks.
625 *
626 * @return true on success
627 */
628 final boolean tryIncrementActiveCount() {
629 int c;
630 return UNSAFE.compareAndSwapInt(this, runStateOffset,
631 c = runState, c + ONE_ACTIVE);
632 }
633
634 /**
635 * Tries decrementing active count; fails on contention.
636 * Called when workers cannot find tasks to run.
637 */
638 final boolean tryDecrementActiveCount() {
639 int c;
640 return UNSAFE.compareAndSwapInt(this, runStateOffset,
641 c = runState, c - ONE_ACTIVE);
642 }
643
644 /**
645 * Advances to at least the given level. Returns true if not
646 * already in at least the given level.
647 */
648 private boolean advanceRunLevel(int level) {
649 for (;;) {
650 int s = runState;
651 if ((s & level) != 0)
652 return false;
653 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | level))
654 return true;
655 }
656 }
657
658 // workers array maintenance
659
660 /**
661 * Records and returns a workers array index for new worker.
662 */
663 private int recordWorker(ForkJoinWorkerThread w) {
664 // Try using slot totalCount-1. If not available, scan and/or resize
665 int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
666 final ReentrantLock lock = this.workerLock;
667 lock.lock();
668 try {
669 ForkJoinWorkerThread[] ws = workers;
670 int n = ws.length;
671 if (k < 0 || k >= n || ws[k] != null) {
672 for (k = 0; k < n && ws[k] != null; ++k)
673 ;
674 if (k == n)
675 ws = Arrays.copyOf(ws, n << 1);
676 }
677 ws[k] = w;
678 workers = ws; // volatile array write ensures slot visibility
679 } finally {
680 lock.unlock();
681 }
682 return k;
683 }
684
685 /**
686 * Nulls out record of worker in workers array
687 */
688 private void forgetWorker(ForkJoinWorkerThread w) {
689 int idx = w.poolIndex;
690 // Locking helps method recordWorker avoid unnecessary expansion
691 final ReentrantLock lock = this.workerLock;
692 lock.lock();
693 try {
694 ForkJoinWorkerThread[] ws = workers;
695 if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
696 ws[idx] = null;
697 } finally {
698 lock.unlock();
699 }
700 }
701
702 // adding and removing workers
703
704 /**
705 * Tries to create and add new worker. Assumes that worker counts
706 * are already updated to accommodate the worker, so adjusts on
707 * failure.
708 */
709 private void addWorker() {
710 ForkJoinWorkerThread w = null;
711 try {
712 w = factory.newThread(this);
713 } finally { // Adjust on either null or exceptional factory return
714 if (w == null) {
715 decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
716 tryTerminate(false); // in case of failure during shutdown
717 }
718 }
719 if (w != null)
720 w.start(recordWorker(w), ueh);
721 }
722
723 /**
724 * Final callback from terminating worker. Removes record of
725 * worker from array, and adjusts counts. If pool is shutting
726 * down, tries to complete termination.
727 *
728 * @param w the worker
729 */
730 final void workerTerminated(ForkJoinWorkerThread w) {
731 forgetWorker(w);
732 decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
733 while (w.stealCount != 0) // collect final count
734 tryAccumulateStealCount(w);
735 tryTerminate(false);
736 }
737
738 // Waiting for and signalling events
739
740 /**
741 * Releases workers blocked on a count not equal to current count.
742 * Normally called after precheck that eventWaiters isn't zero to
743 * avoid wasted array checks.
744 *
745 * @param signalling true if caller is a signalling worker so it can
746 * exit upon (conservatively) detected contention by other threads
747 * that will continue to release
748 */
749 private void releaseEventWaiters(boolean signalling) {
750 ForkJoinWorkerThread[] ws = workers;
751 int n = ws.length;
752 long h; // head of stack
753 ForkJoinWorkerThread w; int id, ec;
754 while ((id = ((int)((h = eventWaiters) & WAITER_ID_MASK)) - 1) >= 0 &&
755 (int)(h >>> EVENT_COUNT_SHIFT) != (ec = eventCount) &&
756 id < n && (w = ws[id]) != null) {
757 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
758 h, h = w.nextWaiter))
759 LockSupport.unpark(w);
760 if (signalling && (eventCount != ec || eventWaiters != h))
761 break;
762 }
763 }
764
765 /**
766 * Tries to advance eventCount and releases waiters. Called only
767 * from workers.
768 */
769 final void signalWork() {
770 int c; // try to increment event count -- CAS failure OK
771 UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
772 if (eventWaiters != 0L)
773 releaseEventWaiters(true);
774 }
775
776 /**
777 * Blocks worker until terminating or event count
778 * advances from last value held by worker
779 *
780 * @param w the calling worker thread
781 */
782 private void eventSync(ForkJoinWorkerThread w) {
783 int wec = w.lastEventCount;
784 long nh = (((long)wec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
785 long h;
786 while ((runState < SHUTDOWN || !tryTerminate(false)) &&
787 ((h = eventWaiters) == 0L ||
788 (int)(h >>> EVENT_COUNT_SHIFT) == wec) &&
789 eventCount == wec) {
790 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
791 w.nextWaiter = h, nh)) {
792 while (runState < TERMINATING && eventCount == wec) {
793 if (!tryAccumulateStealCount(w)) // transfer while idle
794 continue;
795 Thread.interrupted(); // clear/ignore interrupt
796 if (eventCount != wec)
797 break;
798 LockSupport.park(w);
799 }
800 break;
801 }
802 }
803 w.lastEventCount = eventCount;
804 }
805
806 // Maintaining spares
807
808 /**
809 * Pushes worker onto the spare stack
810 */
811 final void pushSpare(ForkJoinWorkerThread w) {
812 int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex+1);
813 do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
814 w.nextSpare = spareWaiters, ns));
815 }
816
817 /**
818 * Tries (once) to resume a spare if running count is less than
819 * target parallelism. Fails on contention or stale workers.
820 */
821 private void tryResumeSpare() {
822 int sw, id;
823 ForkJoinWorkerThread w;
824 ForkJoinWorkerThread[] ws;
825 if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
826 id < (ws = workers).length && (w = ws[id]) != null &&
827 (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
828 eventWaiters == 0L &&
829 spareWaiters == sw &&
830 UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
831 sw, w.nextSpare) &&
832 w.tryUnsuspend()) {
833 int c; // try increment; if contended, finish after unpark
834 boolean inc = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
835 c = workerCounts,
836 c + ONE_RUNNING);
837 LockSupport.unpark(w);
838 if (!inc) {
839 do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
840 c = workerCounts,
841 c + ONE_RUNNING));
842 }
843 }
844 }
845
846 /**
847 * Callback from oldest spare occasionally waking up. Tries
848 * (once) to shut down a spare if more than 25% spare overage, or
849 * if UNUSED_SPARE_TRIM_RATE_NANOS have elapsed and there are at
850 * least #parallelism running threads. Note that we don't need CAS
851 * or locks here because the method is called only from the oldest
852 * suspended spare occasionally waking (and even misfires are OK).
853 *
854 * @param now the wake up nanoTime of caller
855 */
856 final void tryTrimSpare(long now) {
857 long lastTrim = trimTime;
858 trimTime = now;
859 helpMaintainParallelism(); // first, help wake up any needed spares
860 int sw, id;
861 ForkJoinWorkerThread w;
862 ForkJoinWorkerThread[] ws;
863 int pc = parallelism;
864 int wc = workerCounts;
865 if ((wc & RUNNING_COUNT_MASK) >= pc &&
866 (((wc >>> TOTAL_COUNT_SHIFT) - pc) > (pc >>> 2) + 1 || // approx 25%
867 now - lastTrim >= UNUSED_SPARE_TRIM_RATE_NANOS) &&
868 (id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
869 id < (ws = workers).length && (w = ws[id]) != null &&
870 UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
871 sw, w.nextSpare))
872 w.shutdown(false);
873 }
874
875 /**
876 * Does at most one of:
877 *
878 * 1. Help wake up existing workers waiting for work via
879 * releaseEventWaiters. (If any exist, then it probably doesn't
880 * matter right now if under target parallelism level.)
881 *
882 * 2. If below parallelism level and a spare exists, try (once)
883 * to resume it via tryResumeSpare.
884 *
885 * 3. If neither of the above, tries (once) to add a new
886 * worker if either there are not enough total, or if all
887 * existing workers are busy, there are either no running
888 * workers or the deficit is at least twice the surplus.
889 */
890 private void helpMaintainParallelism() {
891 // uglified to work better when not compiled
892 int pc, wc, rc, tc, rs; long h;
893 if ((h = eventWaiters) != 0L) {
894 if ((int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
895 releaseEventWaiters(false); // avoid useless call
896 }
897 else if ((pc = parallelism) >
898 (rc = ((wc = workerCounts) & RUNNING_COUNT_MASK))) {
899 if (spareWaiters != 0)
900 tryResumeSpare();
901 else if ((rs = runState) < TERMINATING &&
902 ((tc = wc >>> TOTAL_COUNT_SHIFT) < pc ||
903 (tc == (rs & ACTIVE_COUNT_MASK) && // all busy
904 (rc == 0 || // must add
905 rc < pc - ((tc - pc) << 1)) && // within slack
906 tc < MAX_WORKERS && runState == rs)) && // recheck busy
907 workerCounts == wc &&
908 UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
909 wc + (ONE_RUNNING|ONE_TOTAL)))
910 addWorker();
911 }
912 }
913
914 /**
915 * Callback from workers invoked upon each top-level action (i.e.,
916 * stealing a task or taking a submission and running
917 * it). Performs one or more of the following:
918 *
919 * 1. If the worker cannot find work (misses > 0), updates its
920 * active status to inactive and updates activeCount unless
921 * this is the first miss and there is contention, in which
922 * case it may try again (either in this or a subsequent
923 * call).
924 *
925 * 2. If there are at least 2 misses, awaits the next task event
926 * via eventSync
927 *
928 * 3. If there are too many running threads, suspends this worker
929 * (first forcing inactivation if necessary). If it is not
930 * needed, it may be killed while suspended via
931 * tryTrimSpare. Otherwise, upon resume it rechecks to make
932 * sure that it is still needed.
933 *
934 * 4. Helps release and/or reactivate other workers via
935 * helpMaintainParallelism
936 *
937 * @param w the worker
938 * @param misses the number of scans by caller failing to find work
939 * (saturating at 2 just to avoid wraparound)
940 */
941 final void preStep(ForkJoinWorkerThread w, int misses) {
942 boolean active = w.active;
943 int pc = parallelism;
944 for (;;) {
945 int wc = workerCounts;
946 int rc = wc & RUNNING_COUNT_MASK;
947 if (active && (misses > 0 || rc > pc)) {
948 int rs; // try inactivate
949 if (UNSAFE.compareAndSwapInt(this, runStateOffset,
950 rs = runState, rs - ONE_ACTIVE))
951 active = w.active = false;
952 else if (misses > 1 || rc > pc ||
953 (rs & ACTIVE_COUNT_MASK) >= pc)
954 continue; // force inactivate
955 }
956 if (misses > 1) {
957 misses = 0; // don't re-sync
958 eventSync(w); // continue loop to recheck rc
959 }
960 else if (rc > pc) {
961 if (workerCounts == wc && // try to suspend as spare
962 UNSAFE.compareAndSwapInt(this, workerCountsOffset,
963 wc, wc - ONE_RUNNING) &&
964 !w.suspendAsSpare()) // false if killed
965 break;
966 }
967 else {
968 if (rc < pc || eventWaiters != 0L)
969 helpMaintainParallelism();
970 break;
971 }
972 }
973 }
974
975 /**
976 * Helps and/or blocks awaiting join of the given task.
977 * Alternates between helpJoinTask() and helpMaintainParallelism()
978 * as many times as there is a deficit in running count (or longer
979 * if running count would become zero), then blocks if task still
980 * not done.
981 *
982 * @param joinMe the task to join
983 */
984 final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
985 int threshold = parallelism; // descend blocking thresholds
986 while (joinMe.status >= 0) {
987 boolean block; int wc;
988 worker.helpJoinTask(joinMe);
989 if (joinMe.status < 0)
990 break;
991 if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
992 if (threshold > 0)
993 --threshold;
994 else
995 advanceEventCount(); // force release
996 block = false;
997 }
998 else
999 block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1000 wc, wc - ONE_RUNNING);
1001 helpMaintainParallelism();
1002 if (block) {
1003 int c;
1004 joinMe.internalAwaitDone();
1005 do {} while (!UNSAFE.compareAndSwapInt
1006 (this, workerCountsOffset,
1007 c = workerCounts, c + ONE_RUNNING));
1008 break;
1009 }
1010 }
1011 }
1012
1013 /**
1014 * Same idea as awaitJoin, but no helping
1015 */
1016 final void awaitBlocker(ManagedBlocker blocker)
1017 throws InterruptedException {
1018 int threshold = parallelism;
1019 while (!blocker.isReleasable()) {
1020 boolean block; int wc;
1021 if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
1022 if (threshold > 0)
1023 --threshold;
1024 else
1025 advanceEventCount();
1026 block = false;
1027 }
1028 else
1029 block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1030 wc, wc - ONE_RUNNING);
1031 helpMaintainParallelism();
1032 if (block) {
1033 try {
1034 do {} while (!blocker.isReleasable() && !blocker.block());
1035 } finally {
1036 int c;
1037 do {} while (!UNSAFE.compareAndSwapInt
1038 (this, workerCountsOffset,
1039 c = workerCounts, c + ONE_RUNNING));
1040 }
1041 break;
1042 }
1043 }
1044 }
1045
1046 /**
1047 * Possibly initiates and/or completes termination.
1048 *
1049 * @param now if true, unconditionally terminate, else only
1050 * if shutdown and empty queue and no active workers
1051 * @return true if now terminating or terminated
1052 */
1053 private boolean tryTerminate(boolean now) {
1054 if (now)
1055 advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
1056 else if (runState < SHUTDOWN ||
1057 !submissionQueue.isEmpty() ||
1058 (runState & ACTIVE_COUNT_MASK) != 0)
1059 return false;
1060
1061 if (advanceRunLevel(TERMINATING))
1062 startTerminating();
1063
1064 // Finish now if all threads terminated; else in some subsequent call
1065 if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
1066 advanceRunLevel(TERMINATED);
1067 termination.arrive();
1068 }
1069 return true;
1070 }
1071
1072 /**
1073 * Actions on transition to TERMINATING
1074 *
1075 * Runs up to four passes through workers: (0) shutting down each
1076 * quietly (without waking up if parked) to quickly spread
1077 * notifications without unnecessary bouncing around event queues
1078 * etc.; (1) wake up and help cancel tasks; (2) interrupt; (3) mop up
1079 * races with interrupted workers
1080 */
1081 private void startTerminating() {
1082 cancelSubmissions();
1083 for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
1084 advanceEventCount();
1085 eventWaiters = 0L; // clobber lists
1086 spareWaiters = 0;
1087 ForkJoinWorkerThread[] ws = workers;
1088 int n = ws.length;
1089 for (int i = 0; i < n; ++i) {
1090 ForkJoinWorkerThread w = ws[i];
1091 if (w != null) {
1092 w.shutdown(true);
1093 if (passes > 0 && !w.isTerminated()) {
1094 w.cancelTasks();
1095 LockSupport.unpark(w);
1096 if (passes > 1) {
1097 try {
1098 w.interrupt();
1099 } catch (SecurityException ignore) {
1100 }
1101 }
1102 }
1103 }
1104 }
1105 }
1106 }
1107
1108 /**
1109 * Clears out and cancels submissions, ignoring exceptions
1110 */
1111 private void cancelSubmissions() {
1112 ForkJoinTask<?> task;
1113 while ((task = submissionQueue.poll()) != null) {
1114 try {
1115 task.cancel(false);
1116 } catch (Throwable ignore) {
1117 }
1118 }
1119 }
1120
1121 // misc support for ForkJoinWorkerThread
1122
1123 /**
1124 * Returns pool number
1125 */
1126 final int getPoolNumber() {
1127 return poolNumber;
1128 }
1129
1130 /**
1131 * Tries to accumulate steal count from a worker, clearing
1132 * the worker's value.
1133 *
1134 * @return true if worker steal count now zero
1135 */
1136 final boolean tryAccumulateStealCount(ForkJoinWorkerThread w) {
1137 int sc = w.stealCount;
1138 long c = stealCount;
1139 // CAS even if zero, for fence effects
1140 if (UNSAFE.compareAndSwapLong(this, stealCountOffset, c, c + sc)) {
1141 if (sc != 0)
1142 w.stealCount = 0;
1143 return true;
1144 }
1145 return sc == 0;
1146 }
1147
1148 /**
1149 * Returns the approximate (non-atomic) number of idle threads per
1150 * active thread.
1151 */
1152 final int idlePerActive() {
1153 int pc = parallelism; // use parallelism, not rc
1154 int ac = runState; // no mask -- artificially boosts during shutdown
1155 // Use exact results for small values, saturate past 4
1156 return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
1157 }
1158
1159 // Public and protected methods
1160
1161 // Constructors
1162
1163 /**
1164 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1165 * java.lang.Runtime#availableProcessors}, using the {@linkplain
1166 * #defaultForkJoinWorkerThreadFactory default thread factory},
1167 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1168 *
1169 * @throws SecurityException if a security manager exists and
1170 * the caller is not permitted to modify threads
1171 * because it does not hold {@link
1172 * java.lang.RuntimePermission}{@code ("modifyThread")}
1173 */
1174 public ForkJoinPool() {
1175 this(Runtime.getRuntime().availableProcessors(),
1176 defaultForkJoinWorkerThreadFactory, null, false);
1177 }
1178
1179 /**
1180 * Creates a {@code ForkJoinPool} with the indicated parallelism
1181 * level, the {@linkplain
1182 * #defaultForkJoinWorkerThreadFactory default thread factory},
1183 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1184 *
1185 * @param parallelism the parallelism level
1186 * @throws IllegalArgumentException if parallelism less than or
1187 * equal to zero, or greater than implementation limit
1188 * @throws SecurityException if a security manager exists and
1189 * the caller is not permitted to modify threads
1190 * because it does not hold {@link
1191 * java.lang.RuntimePermission}{@code ("modifyThread")}
1192 */
1193 public ForkJoinPool(int parallelism) {
1194 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
1195 }
1196
1197 /**
1198 * Creates a {@code ForkJoinPool} with the given parameters.
1199 *
1200 * @param parallelism the parallelism level. For default value,
1201 * use {@link java.lang.Runtime#availableProcessors}.
1202 * @param factory the factory for creating new threads. For default value,
1203 * use {@link #defaultForkJoinWorkerThreadFactory}.
1204 * @param handler the handler for internal worker threads that
1205 * terminate due to unrecoverable errors encountered while executing
1206 * tasks. For default value, use {@code null}.
1207 * @param asyncMode if true,
1208 * establishes local first-in-first-out scheduling mode for forked
1209 * tasks that are never joined. This mode may be more appropriate
1210 * than default locally stack-based mode in applications in which
1211 * worker threads only process event-style asynchronous tasks.
1212 * For default value, use {@code false}.
1213 * @throws IllegalArgumentException if parallelism less than or
1214 * equal to zero, or greater than implementation limit
1215 * @throws NullPointerException if the factory is null
1216 * @throws SecurityException if a security manager exists and
1217 * the caller is not permitted to modify threads
1218 * because it does not hold {@link
1219 * java.lang.RuntimePermission}{@code ("modifyThread")}
1220 */
1221 public ForkJoinPool(int parallelism,
1222 ForkJoinWorkerThreadFactory factory,
1223 Thread.UncaughtExceptionHandler handler,
1224 boolean asyncMode) {
1225 checkPermission();
1226 if (factory == null)
1227 throw new NullPointerException();
1228 if (parallelism <= 0 || parallelism > MAX_WORKERS)
1229 throw new IllegalArgumentException();
1230 this.parallelism = parallelism;
1231 this.factory = factory;
1232 this.ueh = handler;
1233 this.locallyFifo = asyncMode;
1234 int arraySize = initialArraySizeFor(parallelism);
1235 this.workers = new ForkJoinWorkerThread[arraySize];
1236 this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
1237 this.workerLock = new ReentrantLock();
1238 this.termination = new Phaser(1);
1239 this.poolNumber = poolNumberGenerator.incrementAndGet();
1240 this.trimTime = System.nanoTime();
1241 }
1242
1243 /**
1244 * Returns initial power of two size for workers array.
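 * For example (per the bit-smearing arithmetic below), pc values
 * of 3 and 4 both yield 8, and 7 yields 16.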
1245 * @param pc the initial parallelism level
1246 */
1247 private static int initialArraySizeFor(int pc) {
1248 // See Hacker's Delight, sec 3.2. We know MAX_WORKERS < (1 << 16)
1249 int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1250 size |= size >>> 1;
1251 size |= size >>> 2;
1252 size |= size >>> 4;
1253 size |= size >>> 8;
1254 return size + 1;
1255 }
1256
1257 // Execution methods
1258
1259 /**
1260 * Common code for execute, invoke and submit
1261 */
1262 private <T> void doSubmit(ForkJoinTask<T> task) {
1263 if (task == null)
1264 throw new NullPointerException();
1265 if (runState >= SHUTDOWN)
1266 throw new RejectedExecutionException();
1267 submissionQueue.offer(task);
1268 advanceEventCount();
1269 helpMaintainParallelism(); // start or wake up workers
1270 }
1271
1272 /**
1273 * Performs the given task, returning its result upon completion.
1274 * If the caller is already engaged in a fork/join computation in
1275 * the current pool, this method is equivalent in effect to
1276 * {@link ForkJoinTask#invoke}.
1277 *
1278 * @param task the task
1279 * @return the task's result
1280 * @throws NullPointerException if the task is null
1281 * @throws RejectedExecutionException if the task cannot be
1282 * scheduled for execution
1283 */
1284 public <T> T invoke(ForkJoinTask<T> task) {
1285 doSubmit(task);
1286 return task.join();
1287 }
1288
1289 /**
1290 * Arranges for (asynchronous) execution of the given task.
1291 * If the caller is already engaged in a fork/join computation in
1292 * the current pool, this method is equivalent in effect to
1293 * {@link ForkJoinTask#fork}.
1294 *
1295 * @param task the task
1296 * @throws NullPointerException if the task is null
1297 * @throws RejectedExecutionException if the task cannot be
1298 * scheduled for execution
1299 */
1300 public void execute(ForkJoinTask<?> task) {
1301 doSubmit(task);
1302 }
1303
1304 // AbstractExecutorService methods
1305
1306 /**
1307 * @throws NullPointerException if the task is null
1308 * @throws RejectedExecutionException if the task cannot be
1309 * scheduled for execution
1310 */
1311 public void execute(Runnable task) {
1312 ForkJoinTask<?> job;
1313 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1314 job = (ForkJoinTask<?>) task;
1315 else
1316 job = ForkJoinTask.adapt(task, null);
1317 doSubmit(job);
1318 }
1319
1320 /**
1321 * Submits a ForkJoinTask for execution.
1322 * If the caller is already engaged in a fork/join computation in
1323 * the current pool, this method is equivalent in effect to
1324 * {@link ForkJoinTask#fork}.
1325 *
1326 * @param task the task to submit
1327 * @return the task
1328 * @throws NullPointerException if the task is null
1329 * @throws RejectedExecutionException if the task cannot be
1330 * scheduled for execution
1331 */
1332 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1333 doSubmit(task);
1334 return task;
1335 }
1336
1337 /**
1338 * @throws NullPointerException if the task is null
1339 * @throws RejectedExecutionException if the task cannot be
1340 * scheduled for execution
1341 */
1342 public <T> ForkJoinTask<T> submit(Callable<T> task) {
1343 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1344 doSubmit(job);
1345 return job;
1346 }
1347
1348 /**
1349 * @throws NullPointerException if the task is null
1350 * @throws RejectedExecutionException if the task cannot be
1351 * scheduled for execution
1352 */
1353 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1354 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1355 doSubmit(job);
1356 return job;
1357 }
1358
1359 /**
1360 * @throws NullPointerException if the task is null
1361 * @throws RejectedExecutionException if the task cannot be
1362 * scheduled for execution
1363 */
1364 public ForkJoinTask<?> submit(Runnable task) {
1365 ForkJoinTask<?> job;
1366 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1367 job = (ForkJoinTask<?>) task;
1368 else
1369 job = ForkJoinTask.adapt(task, null);
1370 doSubmit(job);
1371 return job;
1372 }
1373
1374 /**
1375 * @throws NullPointerException {@inheritDoc}
1376 * @throws RejectedExecutionException {@inheritDoc}
1377 */
1378 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1379 ArrayList<ForkJoinTask<T>> forkJoinTasks =
1380 new ArrayList<ForkJoinTask<T>>(tasks.size());
1381 for (Callable<T> task : tasks)
1382 forkJoinTasks.add(ForkJoinTask.adapt(task));
1383 invoke(new InvokeAll<T>(forkJoinTasks));
1384
1385 @SuppressWarnings({"unchecked", "rawtypes"})
1386 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1387 return futures;
1388 }
1389
1390 static final class InvokeAll<T> extends RecursiveAction {
1391 final ArrayList<ForkJoinTask<T>> tasks;
1392 InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
1393 public void compute() {
1394 try { invokeAll(tasks); }
1395 catch (Exception ignore) {}
1396 }
1397 private static final long serialVersionUID = -7914297376763021607L;
1398 }
1399
1400 /**
1401 * Returns the factory used for constructing new workers.
1402 *
1403 * @return the factory used for constructing new workers
1404 */
1405 public ForkJoinWorkerThreadFactory getFactory() {
1406 return factory;
1407 }
1408
1409 /**
1410 * Returns the handler for internal worker threads that terminate
1411 * due to unrecoverable errors encountered while executing tasks.
1412 *
1413 * @return the handler, or {@code null} if none
1414 */
1415 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
1416 return ueh;
1417 }
1418
1419 /**
1420 * Returns the targeted parallelism level of this pool.
1421 *
1422 * @return the targeted parallelism level of this pool
1423 */
1424 public int getParallelism() {
1425 return parallelism;
1426 }
1427
1428 /**
1429 * Returns the number of worker threads that have started but not
1430 * yet terminated. The result returned by this method may differ
1431 * from {@link #getParallelism} when threads are created to
1432 * maintain parallelism when others are cooperatively blocked.
1433 *
1434 * @return the number of worker threads
1435 */
1436 public int getPoolSize() {
1437 return workerCounts >>> TOTAL_COUNT_SHIFT;
1438 }
1439
1440 /**
1441 * Returns {@code true} if this pool uses local first-in-first-out
1442 * scheduling mode for forked tasks that are never joined.
1443 *
1444 * @return {@code true} if this pool uses async mode
1445 */
1446 public boolean getAsyncMode() {
1447 return locallyFifo;
1448 }
1449
1450 /**
1451 * Returns an estimate of the number of worker threads that are
1452 * not blocked waiting to join tasks or for other managed
1453 * synchronization. This method may overestimate the
1454 * number of running threads.
1455 *
1456 * @return the number of worker threads
1457 */
1458 public int getRunningThreadCount() {
1459 return workerCounts & RUNNING_COUNT_MASK;
1460 }
1461
1462 /**
1463 * Returns an estimate of the number of threads that are currently
1464 * stealing or executing tasks. This method may overestimate the
1465 * number of active threads.
1466 *
1467 * @return the number of active threads
1468 */
1469 public int getActiveThreadCount() {
1470 return runState & ACTIVE_COUNT_MASK;
1471 }
1472
1473 /**
1474 * Returns {@code true} if all worker threads are currently idle.
1475 * An idle worker is one that cannot obtain a task to execute
1476 * because none are available to steal from other threads, and
1477 * there are no pending submissions to the pool. This method is
1478 * conservative; it might not return {@code true} immediately upon
1479 * idleness of all threads, but will eventually become true if
1480 * threads remain inactive.
1481 *
1482 * @return {@code true} if all threads are currently idle
1483 */
1484 public boolean isQuiescent() {
1485 return (runState & ACTIVE_COUNT_MASK) == 0;
1486 }
1487
1488 /**
1489 * Returns an estimate of the total number of tasks stolen from
1490 * one thread's work queue by another. The reported value
1491 * underestimates the actual total number of steals when the pool
1492 * is not quiescent. This value may be useful for monitoring and
1493 * tuning fork/join programs: in general, steal counts should be
1494 * high enough to keep threads busy, but low enough to avoid
1495 * overhead and contention across threads.
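*
* <p>For instance, a monitoring loop might sample the steal rate
* (a sketch; {@code pool} is an assumed pool reference):
* <pre> {@code
* long prev = pool.getStealCount();
* // ... let the computation run for an interval ...
* long stealsInInterval = pool.getStealCount() - prev;
* }</pre>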
1496 *
1497 * @return the number of steals
1498 */
1499 public long getStealCount() {
1500 return stealCount;
1501 }
1502
1503 /**
1504 * Returns an estimate of the total number of tasks currently held
1505 * in queues by worker threads (but not including tasks submitted
1506 * to the pool that have not begun executing). This value is only
1507 * an approximation, obtained by iterating across all threads in
1508 * the pool. This method may be useful for tuning task
1509 * granularities.
1510 *
1511 * @return the number of queued tasks
1512 */
1513 public long getQueuedTaskCount() {
1514 long count = 0;
1515 ForkJoinWorkerThread[] ws = workers;
1516 int n = ws.length;
1517 for (int i = 0; i < n; ++i) {
1518 ForkJoinWorkerThread w = ws[i];
1519 if (w != null)
1520 count += w.getQueueSize();
1521 }
1522 return count;
1523 }
1524
1525 /**
1526 * Returns an estimate of the number of tasks submitted to this
1527 * pool that have not yet begun executing. This method takes time
1528 * proportional to the number of submissions.
1529 *
1530 * @return the number of queued submissions
1531 */
1532 public int getQueuedSubmissionCount() {
1533 return submissionQueue.size();
1534 }
1535
1536 /**
1537 * Returns {@code true} if there are any tasks submitted to this
1538 * pool that have not yet begun executing.
1539 *
1540 * @return {@code true} if there are any queued submissions
1541 */
1542 public boolean hasQueuedSubmissions() {
1543 return !submissionQueue.isEmpty();
1544 }
1545
1546 /**
1547 * Removes and returns the next unexecuted submission if one is
1548 * available. This method may be useful in extensions to this
1549 * class that re-assign work in systems with multiple pools.
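*
* <p>A hedged sketch of such an extension (the class and method
* names are hypothetical):
* <pre> {@code
* class ReassigningPool extends ForkJoinPool {
*   // Transfers one queued submission to another pool, if present.
*   boolean giveWorkTo(ForkJoinPool other) {
*     ForkJoinTask<?> task = pollSubmission();
*     if (task == null)
*       return false;
*     other.execute(task);
*     return true;
*   }
* }}</pre>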
1550 *
1551 * @return the next submission, or {@code null} if none
1552 */
1553 protected ForkJoinTask<?> pollSubmission() {
1554 return submissionQueue.poll();
1555 }
1556
1557 /**
1558 * Removes all available unexecuted submitted and forked tasks
1559 * from scheduling queues and adds them to the given collection,
1560 * without altering their execution status. These may include
1561 * artificially generated or wrapped tasks. This method is
1562 * designed to be invoked only when the pool is known to be
1563 * quiescent. Invocations at other times may not remove all
1564 * tasks. A failure encountered while attempting to add elements
1565 * to collection {@code c} may result in elements being in
1566 * neither, either, or both collections when the associated
1567 * exception is thrown. The behavior of this operation is
1568 * undefined if the specified collection is modified while the
1569 * operation is in progress.
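*
* <p>A hedged usage sketch, from within a subclass and assuming
* the pool is known to be quiescent:
* <pre> {@code
* List<ForkJoinTask<?>> drained = new ArrayList<ForkJoinTask<?>>();
* if (isQuiescent())
*   drainTasksTo(drained);  // drained tasks may be re-submitted elsewhere
* }</pre>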
1570 *
1571 * @param c the collection to transfer elements into
1572 * @return the number of elements transferred
1573 */
1574 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1575 int count = submissionQueue.drainTo(c);
1576 ForkJoinWorkerThread[] ws = workers;
1577 int n = ws.length;
1578 for (int i = 0; i < n; ++i) {
1579 ForkJoinWorkerThread w = ws[i];
1580 if (w != null)
1581 count += w.drainTasksTo(c);
1582 }
1583 return count;
1584 }
1585
1586 /**
1587 * Returns a string identifying this pool, as well as its state,
1588 * including indications of run state, parallelism level, and
1589 * worker and task counts.
1590 *
1591 * @return a string identifying this pool, as well as its state
1592 */
1593 public String toString() {
1594 long st = getStealCount();
1595 long qt = getQueuedTaskCount();
1596 long qs = getQueuedSubmissionCount();
1597 int wc = workerCounts;
1598 int tc = wc >>> TOTAL_COUNT_SHIFT;
1599 int rc = wc & RUNNING_COUNT_MASK;
1600 int pc = parallelism;
1601 int rs = runState;
1602 int ac = rs & ACTIVE_COUNT_MASK;
1603 return super.toString() +
1604 "[" + runLevelToString(rs) +
1605 ", parallelism = " + pc +
1606 ", size = " + tc +
1607 ", active = " + ac +
1608 ", running = " + rc +
1609 ", steals = " + st +
1610 ", tasks = " + qt +
1611 ", submissions = " + qs +
1612 "]";
1613 }
1614
1615 private static String runLevelToString(int s) {
1616 return ((s & TERMINATED) != 0 ? "Terminated" :
1617 ((s & TERMINATING) != 0 ? "Terminating" :
1618 ((s & SHUTDOWN) != 0 ? "Shutting down" :
1619 "Running")));
1620 }
1621
1622 /**
1623 * Initiates an orderly shutdown in which previously submitted
1624 * tasks are executed, but no new tasks will be accepted.
1625 * Invocation has no additional effect if already shut down.
1626 * Tasks that are in the process of being submitted concurrently
1627 * during the course of this method may or may not be rejected.
1628 *
1629 * @throws SecurityException if a security manager exists and
1630 * the caller is not permitted to modify threads
1631 * because it does not hold {@link
1632 * java.lang.RuntimePermission}{@code ("modifyThread")}
1633 */
1634 public void shutdown() {
1635 checkPermission();
1636 advanceRunLevel(SHUTDOWN);
1637 tryTerminate(false);
1638 }
1639
1640 /**
1641 * Attempts to cancel and/or stop all tasks, and reject all
1642 * subsequently submitted tasks. Tasks that are in the process of
1643 * being submitted or executed concurrently during the course of
1644 * this method may or may not be rejected. This method cancels
1645 * both existing and unexecuted tasks, in order to permit
1646 * termination in the presence of task dependencies. So the method
1647 * always returns an empty list (unlike the case for some other
1648 * Executors).
1649 *
1650 * @return an empty list
1651 * @throws SecurityException if a security manager exists and
1652 * the caller is not permitted to modify threads
1653 * because it does not hold {@link
1654 * java.lang.RuntimePermission}{@code ("modifyThread")}
1655 */
1656 public List<Runnable> shutdownNow() {
1657 checkPermission();
1658 tryTerminate(true);
1659 return Collections.emptyList();
1660 }
1661
1662 /**
1663 * Returns {@code true} if all tasks have completed following shut down.
1664 *
1665 * @return {@code true} if all tasks have completed following shut down
1666 */
1667 public boolean isTerminated() {
1668 return runState >= TERMINATED;
1669 }
1670
1671 /**
1672 * Returns {@code true} if the process of termination has
1673 * commenced but not yet completed. This method may be useful for
1674 * debugging. A return of {@code true} reported a sufficient
1675 * period after shutdown may indicate that submitted tasks have
1676 * ignored or suppressed interruption, causing this executor not
1677 * to properly terminate.
1678 *
1679 * @return {@code true} if terminating but not yet terminated
1680 */
1681 public boolean isTerminating() {
1682 return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
1683 }
1684
1685 /**
1686 * Returns {@code true} if this pool has been shut down.
1687 *
1688 * @return {@code true} if this pool has been shut down
1689 */
1690 public boolean isShutdown() {
1691 return runState >= SHUTDOWN;
1692 }
1693
1694 /**
1695 * Blocks until all tasks have completed execution after a shutdown
1696 * request, or the timeout occurs, or the current thread is
1697 * interrupted, whichever happens first.
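*
* <p>A typical shutdown sequence (a sketch; {@code pool} is an
* assumed pool reference, and the sixty-second limit is an
* arbitrary choice):
* <pre> {@code
* pool.shutdown();
* if (!pool.awaitTermination(60, TimeUnit.SECONDS))
*   pool.shutdownNow();  // cancel tasks still executing after the timeout
* }</pre>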
1698 *
1699 * @param timeout the maximum time to wait
1700 * @param unit the time unit of the timeout argument
1701 * @return {@code true} if this executor terminated and
1702 * {@code false} if the timeout elapsed before termination
1703 * @throws InterruptedException if interrupted while waiting
1704 */
1705 public boolean awaitTermination(long timeout, TimeUnit unit)
1706 throws InterruptedException {
1707 try {
1708 return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
1709 } catch (TimeoutException ex) {
1710 return false;
1711 }
1712 }
1713
1714 /**
1715 * Interface for extending managed parallelism for tasks running
1716 * in {@link ForkJoinPool}s.
1717 *
1718 * <p>A {@code ManagedBlocker} provides two methods. Method
1719 * {@code isReleasable} must return {@code true} if blocking is
1720 * not necessary. Method {@code block} blocks the current thread
1721 * if necessary (perhaps internally invoking {@code isReleasable}
1722 * before actually blocking). The unusual methods in this API
1723 * accommodate synchronizers that may, but don't usually, block
1724 * for long periods. Similarly, they allow more efficient internal
1725 * handling of cases in which additional workers may be, but
1726 * usually are not, needed to ensure sufficient parallelism.
1727 * Toward this end, implementations of method {@code isReleasable}
1728 * must be amenable to repeated invocation.
1729 *
1730 * <p>For example, here is a ManagedBlocker based on a
1731 * ReentrantLock:
1732 * <pre> {@code
1733 * class ManagedLocker implements ManagedBlocker {
1734 * final ReentrantLock lock;
1735 * boolean hasLock = false;
1736 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
1737 * public boolean block() {
1738 * if (!hasLock)
1739 * lock.lock();
1740 * return true;
1741 * }
1742 * public boolean isReleasable() {
1743 * return hasLock || (hasLock = lock.tryLock());
1744 * }
1745 * }}</pre>
1746 *
1747 * <p>Here is a class that possibly blocks waiting for an
1748 * item on a given queue:
1749 * <pre> {@code
1750 * class QueueTaker<E> implements ManagedBlocker {
1751 * final BlockingQueue<E> queue;
1752 * volatile E item = null;
1753 * QueueTaker(BlockingQueue<E> q) { this.queue = q; }
1754 * public boolean block() throws InterruptedException {
1755 * if (item == null)
1756 * item = queue.take();
1757 * return true;
1758 * }
1759 * public boolean isReleasable() {
1760 * return item != null || (item = queue.poll()) != null;
1761 * }
1762 * public E getItem() { // call after pool.managedBlock completes
1763 * return item;
1764 * }
1765 * }}</pre>
1766 */
1767 public static interface ManagedBlocker {
1768 /**
1769 * Possibly blocks the current thread, for example waiting for
1770 * a lock or condition.
1771 *
1772 * @return {@code true} if no additional blocking is necessary
1773 * (i.e., if isReleasable would return true)
1774 * @throws InterruptedException if interrupted while waiting
1775 * (the method is not required to do so, but is allowed to)
1776 */
1777 boolean block() throws InterruptedException;
1778
1779 /**
1780 * Returns {@code true} if blocking is unnecessary.
1781 */
1782 boolean isReleasable();
1783 }
1784
1785 /**
1786 * Blocks in accord with the given blocker. If the current thread
1787 * is a {@link ForkJoinWorkerThread}, this method possibly
1788 * arranges for a spare thread to be activated if necessary to
1789 * ensure sufficient parallelism while the current thread is blocked.
1790 *
1791 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1792 * behaviorally equivalent to
1793 * <pre> {@code
1794 * while (!blocker.isReleasable())
1795 * if (blocker.block())
1796 * return;
1797 * }</pre>
1798 *
1799 * If the caller is a {@code ForkJoinTask}, then the pool may
1800 * first be expanded to ensure parallelism, and later adjusted.
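*
* <p>For example, the {@code QueueTaker} class shown above could
* be used as follows (a sketch; {@code q} is an assumed
* {@code BlockingQueue<String>}, and the caller handles the
* possible {@code InterruptedException}):
* <pre> {@code
* QueueTaker<String> taker = new QueueTaker<String>(q);
* ForkJoinPool.managedBlock(taker);  // may block the current thread
* String item = taker.getItem();
* }</pre>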
1801 *
1802 * @param blocker the blocker
1803 * @throws InterruptedException if blocker.block did so
1804 */
1805 public static void managedBlock(ManagedBlocker blocker)
1806 throws InterruptedException {
1807 Thread t = Thread.currentThread();
1808 if (t instanceof ForkJoinWorkerThread) {
1809 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
1810 w.pool.awaitBlocker(blocker);
1811 }
1812 else {
1813 do {} while (!blocker.isReleasable() && !blocker.block());
1814 }
1815 }
1816
1817 // AbstractExecutorService overrides. These rely on undocumented
1818 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
1819 // implement RunnableFuture.
1820
1821 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1822 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
1823 }
1824
1825 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1826 return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
1827 }
1828
1829 // Unsafe mechanics
1830
1831 private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1832 private static final long workerCountsOffset =
1833 objectFieldOffset("workerCounts", ForkJoinPool.class);
1834 private static final long runStateOffset =
1835 objectFieldOffset("runState", ForkJoinPool.class);
1836 private static final long eventCountOffset =
1837 objectFieldOffset("eventCount", ForkJoinPool.class);
1838 private static final long eventWaitersOffset =
1839 objectFieldOffset("eventWaiters", ForkJoinPool.class);
1840 private static final long stealCountOffset =
1841 objectFieldOffset("stealCount", ForkJoinPool.class);
1842 private static final long spareWaitersOffset =
1843 objectFieldOffset("spareWaiters", ForkJoinPool.class);
1844
1845 private static long objectFieldOffset(String field, Class<?> klazz) {
1846 try {
1847 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1848 } catch (NoSuchFieldException e) {
1849 // Convert Exception to corresponding Error
1850 NoSuchFieldError error = new NoSuchFieldError(field);
1851 error.initCause(e);
1852 throw error;
1853 }
1854 }
1855
1856 /**
1857 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
1858 * Replace with a simple call to Unsafe.getUnsafe when integrating
1859 * into a jdk.
1860 *
1861 * @return a sun.misc.Unsafe
1862 */
1863 private static sun.misc.Unsafe getUnsafe() {
1864 try {
1865 return sun.misc.Unsafe.getUnsafe();
1866 } catch (SecurityException se) {
1867 try {
1868 return java.security.AccessController.doPrivileged
1869 (new java.security
1870 .PrivilegedExceptionAction<sun.misc.Unsafe>() {
1871 public sun.misc.Unsafe run() throws Exception {
1872 java.lang.reflect.Field f = sun.misc
1873 .Unsafe.class.getDeclaredField("theUnsafe");
1874 f.setAccessible(true);
1875 return (sun.misc.Unsafe) f.get(null);
1876 }});
1877 } catch (java.security.PrivilegedActionException e) {
1878 throw new RuntimeException("Could not initialize intrinsics",
1879 e.getCause());
1880 }
1881 }
1882 }
1883 }