root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.17
Committed: Thu May 27 16:47:21 2010 UTC by dl
Branch: MAIN
Changes since 1.16: +315 -234 lines
Log Message:
Sync with jsr166y

File Contents

1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.locks.LockSupport;
15 import java.util.concurrent.locks.ReentrantLock;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.concurrent.CountDownLatch;
18
19 /**
20 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
21 * A {@code ForkJoinPool} provides the entry point for submissions
22 * from non-{@code ForkJoinTask}s, as well as management and
23 * monitoring operations.
24 *
25 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
26 * ExecutorService} mainly by virtue of employing
27 * <em>work-stealing</em>: all threads in the pool attempt to find and
28 * execute subtasks created by other active tasks (eventually blocking
29 * waiting for work if none exist). This enables efficient processing
30 * when most tasks spawn other subtasks (as do most {@code
31 * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
32 * execution of some plain {@code Runnable}- or {@code Callable}-
33 * based activities along with {@code ForkJoinTask}s. When setting
34 * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
35 * also be appropriate for use with fine-grained tasks of any form
36 * that are never joined. Otherwise, other {@code ExecutorService}
37 * implementations are typically more appropriate choices.
38 *
39 * <p>A {@code ForkJoinPool} is constructed with a given target
40 * parallelism level; by default, equal to the number of available
41 * processors. Unless configured otherwise via {@link
42 * #setMaintainsParallelism}, the pool attempts to maintain this
43 * number of active (or available) threads by dynamically adding,
44 * suspending, or resuming internal worker threads, even if some tasks
45 * are stalled waiting to join others. However, no such adjustments
46 * are performed in the face of blocked IO or other unmanaged
47 * synchronization. The nested {@link ManagedBlocker} interface
48 * enables extension of the kinds of synchronization accommodated.
49 * The target parallelism level may also be changed dynamically
50 * ({@link #setParallelism}). The total number of threads may be
51 * limited using method {@link #setMaximumPoolSize}, in which case it
52 * may become possible for the activities of a pool to stall due to
53 * the lack of available threads to process new tasks. When the pool
54 * is executing tasks, these and other configuration setting methods
55 * may only gradually affect actual pool sizes. It is normally best
56 * practice to invoke these methods only when the pool is known to be
57 * quiescent.
58 *
59 * <p>In addition to execution and lifecycle control methods, this
60 * class provides status check methods (for example
61 * {@link #getStealCount}) that are intended to aid in developing,
62 * tuning, and monitoring fork/join applications. Also, method
63 * {@link #toString} returns indications of pool state in a
64 * convenient form for informal monitoring.
65 *
66 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
67 * used for all parallel task execution in a program or subsystem.
68 * Otherwise, use would not usually outweigh the construction and
69 * bookkeeping overhead of creating a large set of threads. For
70 * example, a common pool could be used for the {@code SortTasks}
71 * illustrated in {@link RecursiveAction}. Because {@code
72 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
73 * daemon} mode, there is typically no need to explicitly {@link
74 * #shutdown} such a pool upon program exit.
75 *
76 * <pre>
77 * static final ForkJoinPool mainPool = new ForkJoinPool();
78 * ...
79 * public void sort(long[] array) {
80 * mainPool.invoke(new SortTask(array, 0, array.length));
81 * }
82 * </pre>
83 *
84 * <p><b>Implementation notes</b>: This implementation restricts the
85 * maximum number of running threads to 32767. Attempts to create
86 * pools with greater than the maximum number result in
87 * {@code IllegalArgumentException}.
88 *
89 * <p>This implementation rejects submitted tasks (that is, by throwing
90 * {@link RejectedExecutionException}) only when the pool is shut down.
91 *
92 * @since 1.7
93 * @author Doug Lea
94 */
95 public class ForkJoinPool extends AbstractExecutorService {
96
97 /*
98 * Implementation Overview
99 *
100 * This class provides the central bookkeeping and control for a
101 * set of worker threads: Submissions from non-FJ threads enter
102 * into a submission queue. Workers take these tasks and typically
103 * split them into subtasks that may be stolen by other workers.
104 * The main work-stealing mechanics implemented in class
105 * ForkJoinWorkerThread give first priority to processing tasks
106 * from their own queues (LIFO or FIFO, depending on mode), then
107 * to randomized FIFO steals of tasks in other worker queues, and
108 * lastly to new submissions. These mechanics do not consider
109 * affinities, loads, cache localities, etc., so rarely provide the
110 * best possible performance on a given machine, but portably
111 * provide good throughput by averaging over these factors.
112 * (Further, even if we did try to use such information, we do not
113 * usually have a basis for exploiting it. For example, some sets
114 * of tasks profit from cache affinities, but others are harmed by
115 * cache pollution effects.)
116 *
117 * The main throughput advantages of work-stealing stem from
118 * decentralized control -- workers mostly steal tasks from each
119 * other. We do not want to negate this by creating bottlenecks
120 * implementing the management responsibilities of this class. So
121 * we use a collection of techniques that avoid, reduce, or cope
122 * well with contention. These entail several instances of
123 * bit-packing into CASable fields to maintain only the minimally
124 * required atomicity. To enable such packing, we restrict maximum
125 * parallelism to (1<<15)-1 (enabling twice this to fit into a 16
126 * bit field), which is far in excess of normal operating range.
127 * Even though updates to some of these bookkeeping fields do
128 * sometimes contend with each other, they don't normally
129 * cache-contend with updates to others enough to warrant memory
130 * padding or isolation. So they are all held as fields of
131 * ForkJoinPool objects. The main capabilities are as follows:
132 *
133 * 1. Creating and removing workers. Workers are recorded in the
134 * "workers" array. This is an array as opposed to some other data
135 * structure to support index-based random steals by workers.
136 * Updates to the array recording new workers and unrecording
137 * terminated ones are protected from each other by a lock
138 * (workerLock) but the array is otherwise concurrently readable,
139 * and accessed directly by workers. To simplify index-based
140 * operations, the array size is always a power of two, and all
141 * readers must tolerate null slots. Currently, all worker thread
142 * creation is on-demand, triggered by task submissions,
143 * replacement of terminated workers, and/or compensation for
144 * blocked workers. However, all other support code is set up to
145 * work with other policies.
146 *
147 * 2. Bookkeeping for dynamically adding and removing workers. We
148 * maintain a given level of parallelism (or, if
149 * maintainsParallelism is false, at least avoid starvation). When
150 * some workers are known to be blocked (on joins or via
151 * ManagedBlocker), we may create or resume others to take their
152 * place until they unblock (see below). Implementing this
153 * requires counts of the number of "running" threads (i.e., those
154 * that are neither blocked nor artificially suspended) as well as
155 * the total number. These two values are packed into one field,
156 * "workerCounts" because we need accurate snapshots when deciding
157 * to create, resume or suspend. To support these decisions,
158 * updates to spare counts must be prospective (not
159 * retrospective). For example, the running count is decremented
160 * before blocking by a thread about to block as a spare, but
161 * incremented by the thread about to unblock it. Updates upon
162 * resumption of threads blocking in awaitJoin or awaitBlocker
163 * cannot usually be prospective, so the running count is in
164 * general an upper bound of the number of productively running
165 * threads. Updates to the workerCounts field sometimes transiently
166 * encounter a fair amount of contention when join dependencies
167 * are such that many threads block or unblock at about the same
168 * time. We alleviate this by sometimes bundling updates (for
169 * example blocking one thread on join and resuming a spare cancel
170 * each other out), and in most other cases performing an
171 * alternative action like releasing waiters or locating spares.
172 *
173 * 3. Maintaining global run state. The run state of the pool
174 * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to
175 * those in other Executor implementations, as well as a count of
176 * "active" workers -- those that are, or soon will be, or
177 * recently were executing tasks. The runLevel and active count
178 * are packed together in order to correctly trigger shutdown and
179 * termination. Without care, active counts can be subject to very
180 * high contention. We substantially reduce this contention by
181 * relaxing update rules. A worker must claim active status
182 * prospectively, by activating if it sees that a submitted or
183 * stealable task exists (it may find after activating that the
184 * task no longer exists). It stays active while processing this
185 * task (if it exists) and any other local subtasks it produces,
186 * until it cannot find any other tasks. It then tries
187 * inactivating (see method preStep), but upon update contention
188 * instead scans for more tasks, later retrying inactivation if it
189 * doesn't find any.
190 *
191 * 4. Managing idle workers waiting for tasks. We cannot let
192 * workers spin indefinitely scanning for tasks when none are
193 * available. On the other hand, we must quickly prod them into
194 * action when new tasks are submitted or generated. We
195 * park/unpark these idle workers using an event-count scheme.
196 * Field eventCount is incremented upon events that may enable
197 * workers that previously could not find a task to now find one:
198 * Submission of a new task to the pool, or another worker pushing
199 * a task onto a previously empty queue. (We also use this
200 * mechanism for termination and reconfiguration actions that
201 * require wakeups of idle workers). Each worker maintains its
202 * last known event count, and blocks when a scan for work did not
203 * find a task AND its lastEventCount matches the current
204 * eventCount. Waiting idle workers are recorded in a variant of
205 * Treiber stack headed by field eventWaiters which, when nonzero,
206 * encodes the thread index and count awaited for by the worker
207 * thread most recently calling eventSync. This thread in turn has
208 * a record (field nextEventWaiter) for the next waiting worker.
209 * In addition to allowing simpler decisions about need for
210 * wakeup, the event count bits in eventWaiters serve the role of
211 * tags to avoid ABA errors in Treiber stacks. To reduce delays
212 * in task diffusion, workers not otherwise occupied may invoke
213 * method releaseWaiters, that removes and signals (unparks)
214 * workers not waiting on current count. To minimize task
215 * production stalls associated with signalling, any worker pushing
216 * a task on an empty queue invokes the weaker method signalWork,
217 * that only releases idle workers until it detects interference
218 * by other threads trying to release, and lets them take
219 * over. The net effect is a tree-like diffusion of signals, where
220 * released threads (and possibly others) help with unparks. To
221 * further reduce contention effects a bit, failed CASes to
222 * increment field eventCount are tolerated without retries.
223 * Conceptually they are merged into the same event, which is OK
224 * when their only purpose is to enable workers to scan for work.
225 *
226 * 5. Managing suspension of extra workers. When a worker is about
227 * to block waiting for a join (or via ManagedBlockers), we may
228 * create a new thread to maintain parallelism level, or at least
229 * avoid starvation (see below). Usually, extra threads are needed
230 * for only very short periods, yet join dependencies are such
231 * that we sometimes need them in bursts. Rather than create new
232 * threads each time this happens, we suspend no-longer-needed
233 * extra ones as "spares". For most purposes, we don't distinguish
234 * "extra" spare threads from normal "core" threads: On each call
235 * to preStep (the only point at which we can do this) a worker
236 * checks to see if there are now too many running workers, and if
237 * so, suspends itself. Methods awaitJoin and awaitBlocker look
238 * for suspended threads to resume before considering creating a
239 * new replacement. We don't need a special data structure to
240 * maintain spares; simply scanning the workers array looking for
241 * worker.isSuspended() is fine because the calling thread is
242 * otherwise not doing anything useful anyway; we are at least as
243 * happy if after locating a spare, the caller doesn't actually
244 * block because the join is ready before we try to adjust and
245 * compensate. Note that this is intrinsically racy. One thread
246 * may become a spare at about the same time as another is
247 * needlessly being created. We counteract this and related slop
248 * in part by requiring resumed spares to immediately recheck (in
249 * preStep) to see whether they should re-suspend. The only
250 * effective difference between "extra" and "core" threads is that
251 * we allow the "extra" ones to time out and die if they are not
252 * resumed within a keep-alive interval of a few seconds. This is
253 * implemented mainly within ForkJoinWorkerThread, but requires
254 * some coordination (isTrimmed() -- meaning killed while
255 * suspended) to correctly maintain pool counts.
256 *
257 * 6. Deciding when to create new workers. The main dynamic
258 * control in this class is deciding when to create extra threads,
259 * in methods awaitJoin and awaitBlocker. We always
260 * need to create one when the number of running threads becomes
261 * zero. But because blocked joins are typically dependent, we
262 * don't necessarily need or want one-to-one replacement. Using a
263 * one-to-one compensation rule often leads to enough useless
264 * overhead creating, suspending, resuming, and/or killing threads
265 * to significantly degrade throughput. We use a rule reflecting
266 * the idea that, the more spare threads you already have, the
267 * more evidence you need to create another one. The "evidence"
268 * here takes two forms: (1) Using a creation threshold expressed
269 * in terms of the current deficit -- target minus running
270 * threads. To reduce flickering and drift around target values,
271 * the relation is quadratic: adding a spare if (dc*dc)>=(sc*pc)
272 * (where dc is deficit, sc is number of spare threads and pc is
273 * target parallelism.) (2) Using a form of adaptive
274 * spinning, requiring a number of threshold checks proportional
275 * to the number of spare threads. This effectively reduces churn
276 * at the price of systematically undershooting target parallelism
277 * when many threads are blocked. However, biasing toward
278 * undershooting partially compensates for the above mechanics to
279 * suspend extra threads, that normally lead to overshoot because
280 * we can only suspend workers in-between top-level actions. It
281 * also better copes with the fact that some of the methods in
282 * this class tend to never become compiled (but are interpreted),
283 * so some components of the entire set of controls might execute
284 * many times faster than others. And similarly for cases where
285 * the apparent lack of work is just due to GC stalls and other
286 * transient system activity.
287 *
288 * 7. Maintaining other configuration parameters and monitoring
289 * statistics. Updates to fields controlling parallelism level,
290 * max size, etc can only meaningfully take effect for individual
291 * threads upon their next top-level actions; i.e., between
292 * stealing/running tasks/submission, which are separated by calls
293 * to preStep. Memory ordering for these (assumed infrequent)
294 * reconfiguration calls is ensured by using reads and writes to
295 * volatile field workerCounts (that must be read in preStep anyway)
296 * as "fences" -- user-level reads are preceded by reads of
297 * workerCounts, and writes are followed by no-op CAS to
298 * workerCounts. The values reported by other management and
299 * monitoring methods are either computed on demand, or are kept
300 * in fields that are only updated when threads are otherwise
301 * idle.
302 *
303 * Beware that there is a lot of representation-level coupling
304 * among classes ForkJoinPool, ForkJoinWorkerThread, and
305 * ForkJoinTask. For example, direct access to "workers" array by
306 * workers, and direct access to ForkJoinTask.status by both
307 * ForkJoinPool and ForkJoinWorkerThread. There is little point
308 * trying to reduce this, since any associated future changes in
309 * representations will need to be accompanied by algorithmic
310 * changes anyway.
311 *
312 * Style notes: There are lots of inline assignments (of form
313 * "while ((local = field) != 0)") which are usually the simplest
314 * way to ensure read orderings. Also several occurrences of the
315 * unusual "do {} while(!cas...)" which is the simplest way to
316 * force an update of a CAS'ed variable. There are also a few
317 * other coding oddities that help some methods perform reasonably
318 * even when interpreted (not compiled).
319 *
320 * The order of declarations in this file is: (1) statics (2)
321 * fields (along with constants used when unpacking some of them)
322 * (3) internal control methods (4) callbacks and other support
323 * for ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
324 * methods (plus a few little helpers).
325 */
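The quadratic spare-creation rule described in point 6 above can be sketched as a standalone predicate. This is a hedged illustration only; the method and parameter names are invented for the sketch and are not identifiers used in this file:

```java
// Sketch of the spare-creation rule from point 6 of the overview:
// create another spare only when the squared deficit outweighs the
// number of existing spares times the target parallelism.
public class SpareRule {
    /**
     * @param pc      target parallelism
     * @param running number of currently running (unblocked) workers
     * @param spares  number of spare threads already created
     * @return true if the rule admits creating one more spare
     */
    static boolean shouldAddSpare(int pc, int running, int spares) {
        int dc = pc - running;           // deficit: target minus running
        return dc > 0 && dc * dc >= spares * pc;  // (dc*dc) >= (sc*pc)
    }
}
```

The quadratic form means that as spares accumulate, progressively larger deficits are needed to justify another one, which damps the flickering and drift the comment mentions.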
326
327 /**
328 * Factory for creating new {@link ForkJoinWorkerThread}s.
329 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
330 * for {@code ForkJoinWorkerThread} subclasses that extend base
331 * functionality or initialize threads with different contexts.
332 */
333 public static interface ForkJoinWorkerThreadFactory {
334 /**
335 * Returns a new worker thread operating in the given pool.
336 *
337 * @param pool the pool this thread works in
338 * @throws NullPointerException if the pool is null
339 */
340 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
341 }
342
343 /**
344 * Default ForkJoinWorkerThreadFactory implementation; creates a
345 * new ForkJoinWorkerThread.
346 */
347 static class DefaultForkJoinWorkerThreadFactory
348 implements ForkJoinWorkerThreadFactory {
349 public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
350 return new ForkJoinWorkerThread(pool);
351 }
352 }
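As a hedged sketch of the kind of subclassing factory this interface is designed for (not code from this file), a factory might pair a ForkJoinWorkerThread subclass with custom thread naming; the class names here are hypothetical:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

// Hypothetical example of a factory for a ForkJoinWorkerThread subclass
// that extends base functionality (here, just customized thread names).
public class NamedWorkerFactory
        implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    static class NamedWorker extends ForkJoinWorkerThread {
        NamedWorker(ForkJoinPool pool) {
            super(pool);                           // protected superclass ctor
            setName("app-worker-" + getPoolIndex());
        }
    }
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new NamedWorker(pool);
    }
}
```

The factory is passed to a ForkJoinPool constructor that accepts a ForkJoinWorkerThreadFactory, after which every worker the pool creates goes through newThread.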
353
354 /**
355 * Creates a new ForkJoinWorkerThread. This factory is used unless
356 * overridden in ForkJoinPool constructors.
357 */
358 public static final ForkJoinWorkerThreadFactory
359 defaultForkJoinWorkerThreadFactory =
360 new DefaultForkJoinWorkerThreadFactory();
361
362 /**
363 * Permission required for callers of methods that may start or
364 * kill threads.
365 */
366 private static final RuntimePermission modifyThreadPermission =
367 new RuntimePermission("modifyThread");
368
369 /**
370 * If there is a security manager, makes sure caller has
371 * permission to modify threads.
372 */
373 private static void checkPermission() {
374 SecurityManager security = System.getSecurityManager();
375 if (security != null)
376 security.checkPermission(modifyThreadPermission);
377 }
378
379 /**
380 * Generator for assigning sequence numbers as pool names.
381 */
382 private static final AtomicInteger poolNumberGenerator =
383 new AtomicInteger();
384
385 /**
386 * Absolute bound for parallelism level. Twice this number must
387 * fit into a 16bit field to enable word-packing for some counts.
388 */
389 private static final int MAX_THREADS = 0x7fff;
390
391 /**
392 * Array holding all worker threads in the pool. Array size must
393 * be a power of two. Updates and replacements are protected by
394 * workerLock, but the array is always kept in a consistent enough
395 * state to be randomly accessed without locking by workers
396 * performing work-stealing, as well as other traversal-based
397 * methods in this class. All readers must tolerate that some
398 * array slots may be null.
399 */
400 volatile ForkJoinWorkerThread[] workers;
401
402 /**
403 * Queue for external submissions.
404 */
405 private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
406
407 /**
408 * Lock protecting updates to workers array.
409 */
410 private final ReentrantLock workerLock;
411
412 /**
413 * Latch released upon termination.
414 */
415 private final CountDownLatch terminationLatch;
416
417 /**
418 * Creation factory for worker threads.
419 */
420 private final ForkJoinWorkerThreadFactory factory;
421
422 /**
423 * Sum of per-thread steal counts, updated only when threads are
424 * idle or terminating.
425 */
426 private volatile long stealCount;
427
428 /**
429 * Encoded record of top of treiber stack of threads waiting for
430 * events. The top 32 bits contain the count being waited for. The
431 * bottom word contains one plus the pool index of waiting worker
432 * thread.
433 */
434 private volatile long eventWaiters;
435
436 private static final int EVENT_COUNT_SHIFT = 32;
437 private static final long WAITER_INDEX_MASK = (1L << EVENT_COUNT_SHIFT)-1L;
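The encoding described in the eventWaiters comment above (awaited count in the high 32 bits, one plus the worker's pool index in the low word) reduces to the following pack/unpack arithmetic; this standalone sketch reuses the two constants defined here:

```java
// Illustrative pack/unpack of an eventWaiters record: high 32 bits hold
// the awaited event count, low 32 bits hold one plus the waiting
// worker's pool index, so a record of zero means "no waiter".
public class WaiterRecord {
    static final int EVENT_COUNT_SHIFT = 32;
    static final long WAITER_INDEX_MASK = (1L << EVENT_COUNT_SHIFT) - 1L;

    static long pack(int eventCount, int poolIndex) {
        return ((long) eventCount << EVENT_COUNT_SHIFT) | (long) (poolIndex + 1);
    }

    static int awaitedCount(long rec) {
        return (int) (rec >>> EVENT_COUNT_SHIFT);
    }

    static int waiterIndex(long rec) {      // -1 when no waiter is recorded
        return (int) (rec & WAITER_INDEX_MASK) - 1;
    }
}
```

Because the awaited count rides along in the same word as the index, it doubles as the ABA tag for the Treiber stack, as noted in point 4 of the overview.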
438
439 /**
440 * A counter for events that may wake up worker threads:
441 * - Submission of a new task to the pool
442 * - A worker pushing a task on an empty queue
443 * - termination and reconfiguration
444 */
445 private volatile int eventCount;
446
447 /**
448 * Lifecycle control. The low word contains the number of workers
449 * that are (probably) executing tasks. This value is atomically
450 * incremented before a worker gets a task to run, and decremented
451 * when worker has no tasks and cannot find any. Bits 16-18
452 * contain runLevel value. When all are zero, the pool is
453 * running. Level transitions are monotonic (running -> shutdown
454 * -> terminating -> terminated) so each transition adds a bit.
455 * These are bundled together to ensure consistent read for
456 * termination checks (i.e., that runLevel is at least SHUTDOWN
457 * and active threads is zero).
458 */
459 private volatile int runState;
460
461 // Note: The order among run level values matters.
462 private static final int RUNLEVEL_SHIFT = 16;
463 private static final int SHUTDOWN = 1 << RUNLEVEL_SHIFT;
464 private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1);
465 private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2);
466 private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;
467 private static final int ONE_ACTIVE = 1; // active update delta
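Using the constants just defined, the runState word decodes as a low 16-bit active count plus monotonic run-level bits. A minimal sketch of that decoding (methods here are illustrative, not members of this class):

```java
// Illustrative decoding of the runState word: the low 16 bits count
// active workers; bits 16-18 record the monotonic run level
// (running -> SHUTDOWN -> TERMINATING -> TERMINATED).
public class RunStateDemo {
    static final int RUNLEVEL_SHIFT = 16;
    static final int SHUTDOWN = 1 << RUNLEVEL_SHIFT;
    static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1);
    static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;

    static int activeCount(int runState) {
        return runState & ACTIVE_COUNT_MASK;
    }

    static boolean isAtLeastShutdown(int runState) {
        return (runState & SHUTDOWN) != 0;   // monotonic: bit never clears
    }

    // Levels only ever turn on, never off, so advancing is a plain OR.
    static int advance(int runState, int level) {
        return runState | level;
    }
}
```

Packing both values into one volatile int is what lets a single read consistently check "runLevel at least SHUTDOWN and active count zero" during termination.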
468
469 /**
470 * Holds number of total (i.e., created and not yet terminated)
471 * and running (i.e., not blocked on joins or other managed sync)
472 * threads, packed together to ensure consistent snapshot when
473 * making decisions about creating and suspending spare
474 * threads. Updated only by CAS. Note that adding a new worker
475 * requires incrementing both counts, since workers start off in
476 * running state. This field is also used for memory-fencing
477 * configuration parameters.
478 */
479 private volatile int workerCounts;
480
481 private static final int TOTAL_COUNT_SHIFT = 16;
482 private static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
483 private static final int ONE_RUNNING = 1;
484 private static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT;
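The workerCounts packing above works out to the following arithmetic; this sketch (illustrative helper names, same constants) shows why registering a new worker adds ONE_RUNNING|ONE_TOTAL in a single update, since workers start off running:

```java
// Illustrative arithmetic for the workerCounts packing: total count in
// the high 16 bits, running count in the low 16 bits, so both can be
// snapshotted and updated together with one CAS.
public class WorkerCountsDemo {
    static final int TOTAL_COUNT_SHIFT = 16;
    static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
    static final int ONE_RUNNING = 1;
    static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT;

    static int totalCount(int wc)   { return wc >>> TOTAL_COUNT_SHIFT; }
    static int runningCount(int wc) { return wc & RUNNING_COUNT_MASK; }

    static int addWorker(int wc)    { return wc + (ONE_RUNNING | ONE_TOTAL); }
    static int blockWorker(int wc)  { return wc - ONE_RUNNING; }
}
```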
485
486 /*
487 * Fields parallelism, maxPoolSize, and maintainsParallelism are
488 * non-volatile, but external reads/writes use workerCounts fences
489 * to ensure visibility.
490 */
491
492 /**
493 * The target parallelism level.
494 */
495 private int parallelism;
496
497 /**
498 * The maximum allowed pool size.
499 */
500 private int maxPoolSize;
501
502 /**
503 * True if using local FIFO, rather than the default LIFO, for local
504 * polling. Replicated by ForkJoinWorkerThreads.
505 */
506 private volatile boolean locallyFifo;
507
508 /**
509 * Controls whether to add spares to maintain parallelism
510 */
511 private boolean maintainsParallelism;
512
513 /**
514 * The uncaught exception handler used when any worker
515 * abruptly terminates
516 */
517 private volatile Thread.UncaughtExceptionHandler ueh;
518
519 /**
520 * Pool number, just for assigning useful names to worker threads
521 */
522 private final int poolNumber;
523
524 // utilities for updating fields
525
526 /**
527 * Adds delta to running count. Used mainly by ForkJoinTask.
528 */
529 final void updateRunningCount(int delta) {
530 int wc;
531 do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
532 wc = workerCounts,
533 wc + delta));
534 }
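The `do {} while (!cas...)` loop above is the idiom the style notes call out for forcing an update of a CAS'ed variable. A portable sketch of the same retry loop, written against AtomicInteger rather than Unsafe (names here are illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Portable sketch of the "do {} while (!cas...)" retry idiom used in
// updateRunningCount: reread the current value and retry the CAS until
// it succeeds, making the delta stick even under contention.
public class CasRetryDemo {
    static void addDelta(AtomicInteger counts, int delta) {
        int c;
        do {} while (!counts.compareAndSet(c = counts.get(), c + delta));
    }

    static int demo() {
        AtomicInteger running = new AtomicInteger(5);
        addDelta(running, 3);
        addDelta(running, -2);
        return running.get();
    }
}
```

The empty loop body is deliberate: all the work happens in the condition's inline assignment and CAS, mirroring the style used throughout this file.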
535
536 /**
537 * Decrements running count unless already zero
538 */
539 final boolean tryDecrementRunningCount() {
540 int wc = workerCounts;
541 if ((wc & RUNNING_COUNT_MASK) == 0)
542 return false;
543 return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
544 wc, wc - ONE_RUNNING);
545 }
546
547 /**
548 * Write fence for user modifications of pool parameters
549 * (parallelism, etc.). Note that it doesn't matter if CAS fails.
550 */
551 private void workerCountWriteFence() {
552 int wc;
553 UNSAFE.compareAndSwapInt(this, workerCountsOffset,
554 wc = workerCounts, wc);
555 }
556
557 /**
558 * Read fence for external reads of pool parameters
559 * (parallelism, maxPoolSize, etc.).
560 */
561 private void workerCountReadFence() {
562 int ignore = workerCounts;
563 }
564
565 /**
566 * Tries incrementing active count; fails on contention.
567 * Called by workers before executing tasks.
568 *
569 * @return true on success
570 */
571 final boolean tryIncrementActiveCount() {
572 int c;
573 return UNSAFE.compareAndSwapInt(this, runStateOffset,
574 c = runState, c + ONE_ACTIVE);
575 }
576
577 /**
578 * Tries decrementing active count; fails on contention.
579 * Called when workers cannot find tasks to run.
580 */
581 final boolean tryDecrementActiveCount() {
582 int c;
583 return UNSAFE.compareAndSwapInt(this, runStateOffset,
584 c = runState, c - ONE_ACTIVE);
585 }
586
587 /**
588 * Advances to at least the given level. Returns true if not
589 * already in at least the given level.
590 */
591 private boolean advanceRunLevel(int level) {
592 for (;;) {
593 int s = runState;
594 if ((s & level) != 0)
595 return false;
596 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | level))
597 return true;
598 }
599 }
600
601 // workers array maintenance
602
603 /**
604 * Records and returns a workers array index for new worker.
605 */
606 private int recordWorker(ForkJoinWorkerThread w) {
607 // Try using slot totalCount-1. If not available, scan and/or resize
608 int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
609 final ReentrantLock lock = this.workerLock;
610 lock.lock();
611 try {
612 ForkJoinWorkerThread[] ws = workers;
613 int nws = ws.length;
614 if (k < 0 || k >= nws || ws[k] != null) {
615 for (k = 0; k < nws && ws[k] != null; ++k)
616 ;
617 if (k == nws)
618 ws = Arrays.copyOf(ws, nws << 1);
619 }
620 ws[k] = w;
621 workers = ws; // volatile array write ensures slot visibility
622 } finally {
623 lock.unlock();
624 }
625 return k;
626 }
627
628 /**
629 * Nulls out record of worker in workers array
630 */
631 private void forgetWorker(ForkJoinWorkerThread w) {
632 int idx = w.poolIndex;
633 // Locking helps method recordWorker avoid unnecessary expansion
634 final ReentrantLock lock = this.workerLock;
635 lock.lock();
636 try {
637 ForkJoinWorkerThread[] ws = workers;
638 if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
639 ws[idx] = null;
640 } finally {
641 lock.unlock();
642 }
643 }
644
645 // adding and removing workers
646
647 /**
648 * Tries to create and add new worker. Assumes that worker counts
649 * are already updated to accommodate the worker, so adjusts on
650 * failure.
651 *
652 * @return new worker or null if creation failed
653 */
654 private ForkJoinWorkerThread addWorker() {
655 ForkJoinWorkerThread w = null;
656 try {
657 w = factory.newThread(this);
658 } finally { // Adjust on either null or exceptional factory return
659 if (w == null) {
660 onWorkerCreationFailure();
661 return null;
662 }
663 }
664 w.start(recordWorker(w), locallyFifo, ueh);
665 return w;
666 }
667
668 /**
669 * Adjusts counts upon failure to create worker
670 */
671 private void onWorkerCreationFailure() {
672 for (;;) {
673 int wc = workerCounts;
674 if ((wc >>> TOTAL_COUNT_SHIFT) > 0 &&
675 UNSAFE.compareAndSwapInt(this, workerCountsOffset,
676 wc, wc - (ONE_RUNNING|ONE_TOTAL)))
677 break;
678 }
679 tryTerminate(false); // in case of failure during shutdown
680 }
681
682 /**
683 * Creates enough total workers to establish target parallelism,
684 * giving up if terminating or addWorker fails
685 */
686 private void ensureEnoughTotalWorkers() {
687 int wc;
688 while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < parallelism &&
689 runState < TERMINATING) {
690 if ((UNSAFE.compareAndSwapInt(this, workerCountsOffset,
691 wc, wc + (ONE_RUNNING|ONE_TOTAL)) &&
692 addWorker() == null))
693 break;
694 }
695 }
696
697 /**
698 * Final callback from terminating worker. Removes record of
699 * worker from array, and adjusts counts. If pool is shutting
700 * down, tries to complete termination, else possibly replaces
701 * the worker.
702 *
703 * @param w the worker
704 */
705 final void workerTerminated(ForkJoinWorkerThread w) {
706 if (w.active) { // force inactive
707 w.active = false;
708 do {} while (!tryDecrementActiveCount());
709 }
710 forgetWorker(w);
711
712 // Decrement total count, and if was running, running count
713 // Spin (waiting for other updates) if either would be negative
714 int nr = w.isTrimmed() ? 0 : ONE_RUNNING;
715 int unit = ONE_TOTAL + nr;
716 for (;;) {
717 int wc = workerCounts;
718 int rc = wc & RUNNING_COUNT_MASK;
719 if (rc - nr < 0 || (wc >>> TOTAL_COUNT_SHIFT) == 0)
720 Thread.yield(); // back off if waiting for other updates
721 else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
722 wc, wc - unit))
723 break;
724 }
725
726 accumulateStealCount(w); // collect final count
727 if (!tryTerminate(false))
728 ensureEnoughTotalWorkers();
729 }
730
731 // Waiting for and signalling events
732
733 /**
734 * Ensures eventCount on exit is different (mod 2^32) than on
735 * entry. CAS failures are OK -- any change in count suffices.
736 */
737 private void advanceEventCount() {
738 int c;
739 UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
740 }
741
742 /**
743 * Releases workers blocked on a count not equal to current count.
744 */
745 final void releaseWaiters() {
746 long top;
747 int id;
748 while ((id = (int)((top = eventWaiters) & WAITER_INDEX_MASK)) > 0 &&
749 (int)(top >>> EVENT_COUNT_SHIFT) != eventCount) {
750 ForkJoinWorkerThread[] ws = workers;
751 ForkJoinWorkerThread w;
752 if (ws.length >= id && (w = ws[id - 1]) != null &&
753 UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
754 top, w.nextWaiter))
755 LockSupport.unpark(w);
756 }
757 }
758
759 /**
760 * Advances eventCount and releases waiters until interference by
761 * other releasing threads is detected.
762 */
763 final void signalWork() {
764 int ec;
765 UNSAFE.compareAndSwapInt(this, eventCountOffset, ec=eventCount, ec+1);
766 outer:for (;;) {
767 long top = eventWaiters;
768 ec = eventCount;
769 for (;;) {
770 ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
771 int id = (int)(top & WAITER_INDEX_MASK);
772 if (id <= 0 || (int)(top >>> EVENT_COUNT_SHIFT) == ec)
773 return;
774 if ((ws = workers).length < id || (w = ws[id - 1]) == null ||
775 !UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
776 top, top = w.nextWaiter))
777 continue outer; // possibly stale; reread
778 LockSupport.unpark(w);
779 if (top != eventWaiters) // let someone else take over
780 return;
781 }
782 }
783 }
784
785 /**
786 * If worker is inactive, blocks until terminating or event count
787 * advances from last value held by worker; in any case helps
788 * release others.
789 *
790 * @param w the calling worker thread
791 */
792 private void eventSync(ForkJoinWorkerThread w) {
793 if (!w.active) {
794 int prev = w.lastEventCount;
795 long nextTop = (((long)prev << EVENT_COUNT_SHIFT) |
796 ((long)(w.poolIndex + 1)));
797 long top;
798 while ((runState < SHUTDOWN || !tryTerminate(false)) &&
799 (((int)(top = eventWaiters) & WAITER_INDEX_MASK) == 0 ||
800 (int)(top >>> EVENT_COUNT_SHIFT) == prev) &&
801 eventCount == prev) {
802 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
803 w.nextWaiter = top, nextTop)) {
804 accumulateStealCount(w); // transfer steals while idle
805 Thread.interrupted(); // clear/ignore interrupt
806 while (eventCount == prev)
807 w.doPark();
808 break;
809 }
810 }
811 w.lastEventCount = eventCount;
812 }
813 releaseWaiters();
814 }
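eventSync, signalWork, and releaseWaiters together implement an event count: an idle worker records the count it last saw and parks until the count moves past it, while signallers advance the count and unpark waiters. A stripped-down single-waiter sketch of the same idea (class name hypothetical; the real pool packs a Treiber stack of waiters into one long):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Simplified event count: one registered waiter, no waiter stack.
public class MiniEventCount {
    private final AtomicInteger count = new AtomicInteger();
    private volatile Thread waiter;

    // Advance the count (a failed CAS is fine -- any change suffices,
    // as in advanceEventCount) and wake the parked waiter, if any.
    public void signal() {
        int c = count.get();
        count.compareAndSet(c, c + 1);
        Thread w = waiter;
        if (w != null)
            LockSupport.unpark(w);
    }

    // Block until the count differs from the value last observed.
    // Rechecking the count around park() tolerates spurious wakeups
    // and an unpark that races ahead of the park.
    public void await(int lastSeen) {
        waiter = Thread.currentThread();
        while (count.get() == lastSeen)
            LockSupport.park();
        waiter = null;
    }

    public int current() { return count.get(); }
}
```

As in eventSync, a waiter that observes the count already advanced returns without parking at all.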
815
816 /**
817 * Callback from workers invoked upon each top-level action (i.e.,
818 * stealing a task or taking a submission and running
819 * it). Performs one or both of the following:
820 *
821 * * If the worker cannot find work, updates its active status to
822 * inactive and updates activeCount unless there is contention, in
823 * which case it may try again (either in this or a subsequent
824 * call). Additionally, awaits the next task event and/or helps
825 * wake up other releasable waiters.
826 *
827 * * If there are too many running threads, suspends this worker
828 * (first forcing inactivation if necessary). If it is not
829 * resumed before a keepAlive elapses, the worker may be "trimmed"
830 * -- killed while suspended within suspendAsSpare. Otherwise,
831 * upon resume it rechecks to make sure that it is still needed.
832 *
833 * @param w the worker
834 * @param worked false if the worker scanned for work but didn't
835 * find any (in which case it may block waiting for work).
836 */
837 final void preStep(ForkJoinWorkerThread w, boolean worked) {
838 boolean active = w.active;
839 boolean inactivate = !worked & active;
840 for (;;) {
841 if (inactivate) {
842 int c = runState;
843 if (UNSAFE.compareAndSwapInt(this, runStateOffset,
844 c, c - ONE_ACTIVE))
845 inactivate = active = w.active = false;
846 }
847 int wc = workerCounts;
848 if ((wc & RUNNING_COUNT_MASK) <= parallelism) {
849 if (!worked)
850 eventSync(w);
851 return;
852 }
853 if (!(inactivate |= active) && // must inactivate to suspend
854 UNSAFE.compareAndSwapInt(this, workerCountsOffset,
855 wc, wc - ONE_RUNNING) &&
856 !w.suspendAsSpare()) // false if trimmed
857 return;
858 }
859 }
860
861 /**
862 * Adjusts counts and creates or resumes compensating threads for
863 * a worker blocking on task joinMe. First tries resuming an
864 * existing spare (which usually also avoids any count
865 * adjustment), but must then decrement running count to determine
866 * whether a new thread is needed. See above for fuller
867 * explanation. This code is sprawled out non-modularly mainly
868 * because adaptive spinning works best if the entire method is
869 * either interpreted or compiled vs having only some pieces of it
870 * compiled.
871 *
872 * @param joinMe the task to join
873 * @return task status on exit (to simplify usage by callers)
874 */
875 final int awaitJoin(ForkJoinTask<?> joinMe) {
876 int pc = parallelism;
877 boolean adj = false; // true when running count adjusted
878 int scans = 0;
879
880 while (joinMe.status >= 0) {
881 ForkJoinWorkerThread spare = null;
882 if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
883 ForkJoinWorkerThread[] ws = workers;
884 int nws = ws.length;
885 for (int i = 0; i < nws; ++i) {
886 ForkJoinWorkerThread w = ws[i];
887 if (w != null && w.isSuspended()) {
888 spare = w;
889 break;
890 }
891 }
892 if (joinMe.status < 0)
893 break;
894 }
895 int wc = workerCounts;
896 int rc = wc & RUNNING_COUNT_MASK;
897 int dc = pc - rc;
898 if (dc > 0 && spare != null && spare.tryUnsuspend()) {
899 if (adj) {
900 int c;
901 do {} while (!UNSAFE.compareAndSwapInt
902 (this, workerCountsOffset,
903 c = workerCounts, c + ONE_RUNNING));
904 }
905 adj = true;
906 LockSupport.unpark(spare);
907 }
908 else if (adj) {
909 if (dc <= 0)
910 break;
911 int tc = wc >>> TOTAL_COUNT_SHIFT;
912 if (scans > tc) {
913 int ts = (tc - pc) * pc;
914 if (rc != 0 && (dc * dc < ts || !maintainsParallelism))
915 break;
916 if (scans > ts && tc < maxPoolSize &&
917 UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
918 wc+(ONE_RUNNING|ONE_TOTAL))){
919 addWorker();
920 break;
921 }
922 }
923 }
924 else if (rc != 0)
925 adj = UNSAFE.compareAndSwapInt (this, workerCountsOffset,
926 wc, wc - ONE_RUNNING);
927 if ((scans++ & 1) == 0)
928 releaseWaiters(); // help others progress
929 else
930 Thread.yield(); // avoid starving productive threads
931 }
932
933 if (adj) {
934 joinMe.internalAwaitDone();
935 int c;
936 do {} while (!UNSAFE.compareAndSwapInt
937 (this, workerCountsOffset,
938 c = workerCounts, c + ONE_RUNNING));
939 }
940 return joinMe.status;
941 }
942
943 /**
944 * Same idea as awaitJoin
945 */
946 final void awaitBlocker(ManagedBlocker blocker, boolean maintainPar)
947 throws InterruptedException {
948 maintainPar &= maintainsParallelism;
949 int pc = parallelism;
950 boolean adj = false; // true when running count adjusted
951 int scans = 0;
952 boolean done;
953
954 for (;;) {
955 if (done = blocker.isReleasable())
956 break;
957 ForkJoinWorkerThread spare = null;
958 if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
959 ForkJoinWorkerThread[] ws = workers;
960 int nws = ws.length;
961 for (int i = 0; i < nws; ++i) {
962 ForkJoinWorkerThread w = ws[i];
963 if (w != null && w.isSuspended()) {
964 spare = w;
965 break;
966 }
967 }
968 if (done = blocker.isReleasable())
969 break;
970 }
971 int wc = workerCounts;
972 int rc = wc & RUNNING_COUNT_MASK;
973 int dc = pc - rc;
974 if (dc > 0 && spare != null && spare.tryUnsuspend()) {
975 if (adj) {
976 int c;
977 do {} while (!UNSAFE.compareAndSwapInt
978 (this, workerCountsOffset,
979 c = workerCounts, c + ONE_RUNNING));
980 }
981 adj = true;
982 LockSupport.unpark(spare);
983 }
984 else if (adj) {
985 if (dc <= 0)
986 break;
987 int tc = wc >>> TOTAL_COUNT_SHIFT;
988 if (scans > tc) {
989 int ts = (tc - pc) * pc;
990 if (rc != 0 && (dc * dc < ts || !maintainPar))
991 break;
992 if (scans > ts && tc < maxPoolSize &&
993 UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
994 wc+(ONE_RUNNING|ONE_TOTAL))){
995 addWorker();
996 break;
997 }
998 }
999 }
1000 else if (rc != 0)
1001 adj = UNSAFE.compareAndSwapInt (this, workerCountsOffset,
1002 wc, wc - ONE_RUNNING);
1003 if ((++scans & 1) == 0)
1004 releaseWaiters(); // help others progress
1005 else
1006 Thread.yield(); // avoid starving productive threads
1007 }
1008
1009 try {
1010 if (!done)
1011 do {} while (!blocker.isReleasable() && !blocker.block());
1012 } finally {
1013 if (adj) {
1014 int c;
1015 do {} while (!UNSAFE.compareAndSwapInt
1016 (this, workerCountsOffset,
1017 c = workerCounts, c + ONE_RUNNING));
1018 }
1019 }
1020 }
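awaitBlocker is the engine behind ManagedBlocker support: while a worker blocks, the pool may activate a spare or add a compensating thread. A sketch of what a user-side ManagedBlocker can look like, wrapping a blocking queue take — the class is a hypothetical example, and note that this snapshot's managedBlock entry point also takes a maintainParallelism flag, whereas the sketch relies only on the blocker contract itself:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SynchronousQueue;

// A ManagedBlocker wrapping a blocking take() on a queue, so the pool
// can compensate with a spare thread while this worker is blocked.
public class QueueTakeBlocker<E> implements ForkJoinPool.ManagedBlocker {
    private final SynchronousQueue<E> queue;
    private volatile E item;

    public QueueTakeBlocker(SynchronousQueue<E> queue) {
        this.queue = queue;
    }

    public boolean block() throws InterruptedException {
        if (item == null)
            item = queue.take();       // the potentially long block
        return true;                   // no further blocking needed
    }

    public boolean isReleasable() {
        // Done already, or an element is available without blocking.
        return item != null || (item = queue.poll()) != null;
    }

    public E item() { return item; }
}
```

A task running in the pool would pass such a blocker to ForkJoinPool.managedBlock rather than calling queue.take() directly, giving the pool the chance to keep its target parallelism while the worker waits.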
1021
1022 /**
1023 * Adjusts counts and blocks a worker performing helpJoin that
1024 * cannot find any work, unless too few other threads are running
1025 * to permit blocking.
1026 *
1027 * @return true if joinMe now done
1028 */
1029 final boolean tryAwaitBusyJoin(ForkJoinTask<?> joinMe) {
1030 int pc = parallelism;
1031 outer:for (;;) {
1032 releaseWaiters();
1033 if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
1034 ForkJoinWorkerThread[] ws = workers;
1035 int nws = ws.length;
1036 for (int i = 0; i < nws; ++i) {
1037 ForkJoinWorkerThread w = ws[i];
1038 if (w != null && w.isSuspended()) {
1039 if (joinMe.status < 0)
1040 return true;
1041 if ((workerCounts & RUNNING_COUNT_MASK) > pc)
1042 break;
1043 if (w.tryUnsuspend()) {
1044 LockSupport.unpark(w);
1045 break outer;
1046 }
1047 continue outer;
1048 }
1049 }
1050 }
1051 if (joinMe.status < 0)
1052 return true;
1053 int wc = workerCounts;
1054 if ((wc & RUNNING_COUNT_MASK) <= 2 ||
1055 (wc >>> TOTAL_COUNT_SHIFT) < pc)
1056 return false; // keep this thread alive
1057 if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1058 wc, wc - ONE_RUNNING))
1059 break;
1060 }
1061
1062 joinMe.internalAwaitDone();
1063 int c;
1064 do {} while (!UNSAFE.compareAndSwapInt
1065 (this, workerCountsOffset,
1066 c = workerCounts, c + ONE_RUNNING));
1067 return true;
1068 }
1069
1070 /**
1071 * Possibly initiates and/or completes termination.
1072 *
1073 * @param now if true, unconditionally terminate, else only
1074 * if shutdown and empty queue and no active workers
1075 * @return true if now terminating or terminated
1076 */
1077 private boolean tryTerminate(boolean now) {
1078 if (now)
1079 advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
1080 else if (runState < SHUTDOWN ||
1081 !submissionQueue.isEmpty() ||
1082 (runState & ACTIVE_COUNT_MASK) != 0)
1083 return false;
1084
1085 if (advanceRunLevel(TERMINATING))
1086 startTerminating();
1087
1088 // Finish now if all threads terminated; else in some subsequent call
1089 if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
1090 advanceRunLevel(TERMINATED);
1091 terminationLatch.countDown();
1092 }
1093 return true;
1094 }
1095
1096 /**
1097 * Actions on transition to TERMINATING
1098 */
1099 private void startTerminating() {
1100 for (int i = 0; i < 2; ++i) { // twice to mop up newly created workers
1101 cancelSubmissions();
1102 shutdownWorkers();
1103 cancelWorkerTasks();
1104 advanceEventCount();
1105 releaseWaiters();
1106 interruptWorkers();
1107 }
1108 }
1109
1110 /**
1111 * Clear out and cancel submissions, ignoring exceptions
1112 */
1113 private void cancelSubmissions() {
1114 ForkJoinTask<?> task;
1115 while ((task = submissionQueue.poll()) != null) {
1116 try {
1117 task.cancel(false);
1118 } catch (Throwable ignore) {
1119 }
1120 }
1121 }
1122
1123 /**
1124 * Sets all worker run states to at least shutdown,
1125 * also resuming suspended workers
1126 */
1127 private void shutdownWorkers() {
1128 ForkJoinWorkerThread[] ws = workers;
1129 int nws = ws.length;
1130 for (int i = 0; i < nws; ++i) {
1131 ForkJoinWorkerThread w = ws[i];
1132 if (w != null)
1133 w.shutdown();
1134 }
1135 }
1136
1137 /**
1138 * Clears out and cancels all locally queued tasks
1139 */
1140 private void cancelWorkerTasks() {
1141 ForkJoinWorkerThread[] ws = workers;
1142 int nws = ws.length;
1143 for (int i = 0; i < nws; ++i) {
1144 ForkJoinWorkerThread w = ws[i];
1145 if (w != null)
1146 w.cancelTasks();
1147 }
1148 }
1149
1150 /**
1151 * Unsticks all workers blocked on joins, etc.
1152 */
1153 private void interruptWorkers() {
1154 ForkJoinWorkerThread[] ws = workers;
1155 int nws = ws.length;
1156 for (int i = 0; i < nws; ++i) {
1157 ForkJoinWorkerThread w = ws[i];
1158 if (w != null && !w.isTerminated()) {
1159 try {
1160 w.interrupt();
1161 } catch (SecurityException ignore) {
1162 }
1163 }
1164 }
1165 }
1166
1167 // misc support for ForkJoinWorkerThread
1168
1169 /**
1170 * Returns pool number
1171 */
1172 final int getPoolNumber() {
1173 return poolNumber;
1174 }
1175
1176 /**
1177 * Accumulates steal count from a worker, clearing
1178 * the worker's value
1179 */
1180 final void accumulateStealCount(ForkJoinWorkerThread w) {
1181 int sc = w.stealCount;
1182 if (sc != 0) {
1183 long c;
1184 w.stealCount = 0;
1185 do {} while (!UNSAFE.compareAndSwapLong(this, stealCountOffset,
1186 c = stealCount, c + sc));
1187 }
1188 }
1189
1190 /**
1191 * Returns the approximate (non-atomic) number of idle threads per
1192 * active thread.
1193 */
1194 final int idlePerActive() {
1195 int ac = runState; // no mask -- artificially boosts during shutdown
1196 int pc = parallelism; // use targeted parallelism, not rc
1197 // Use exact results for small values, saturate past 4
1198 return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
1199 }
1200
1201 // Public and protected methods
1202
1203 // Constructors
1204
1205 /**
1206 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1207 * java.lang.Runtime#availableProcessors}, and using the {@linkplain
1208 * #defaultForkJoinWorkerThreadFactory default thread factory}.
1209 *
1210 * @throws SecurityException if a security manager exists and
1211 * the caller is not permitted to modify threads
1212 * because it does not hold {@link
1213 * java.lang.RuntimePermission}{@code ("modifyThread")}
1214 */
1215 public ForkJoinPool() {
1216 this(Runtime.getRuntime().availableProcessors(),
1217 defaultForkJoinWorkerThreadFactory);
1218 }
1219
1220 /**
1221 * Creates a {@code ForkJoinPool} with the indicated parallelism
1222 * level and using the {@linkplain
1223 * #defaultForkJoinWorkerThreadFactory default thread factory}.
1224 *
1225 * @param parallelism the parallelism level
1226 * @throws IllegalArgumentException if parallelism less than or
1227 * equal to zero, or greater than implementation limit
1228 * @throws SecurityException if a security manager exists and
1229 * the caller is not permitted to modify threads
1230 * because it does not hold {@link
1231 * java.lang.RuntimePermission}{@code ("modifyThread")}
1232 */
1233 public ForkJoinPool(int parallelism) {
1234 this(parallelism, defaultForkJoinWorkerThreadFactory);
1235 }
1236
1237 /**
1238 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1239 * java.lang.Runtime#availableProcessors}, and using the given
1240 * thread factory.
1241 *
1242 * @param factory the factory for creating new threads
1243 * @throws NullPointerException if the factory is null
1244 * @throws SecurityException if a security manager exists and
1245 * the caller is not permitted to modify threads
1246 * because it does not hold {@link
1247 * java.lang.RuntimePermission}{@code ("modifyThread")}
1248 */
1249 public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
1250 this(Runtime.getRuntime().availableProcessors(), factory);
1251 }
1252
1253 /**
1254 * Creates a {@code ForkJoinPool} with the given parallelism and
1255 * thread factory.
1256 *
1257 * @param parallelism the parallelism level
1258 * @param factory the factory for creating new threads
1259 * @throws IllegalArgumentException if parallelism less than or
1260 * equal to zero, or greater than implementation limit
1261 * @throws NullPointerException if the factory is null
1262 * @throws SecurityException if a security manager exists and
1263 * the caller is not permitted to modify threads
1264 * because it does not hold {@link
1265 * java.lang.RuntimePermission}{@code ("modifyThread")}
1266 */
1267 public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
1268 checkPermission();
1269 if (factory == null)
1270 throw new NullPointerException();
1271 if (parallelism <= 0 || parallelism > MAX_THREADS)
1272 throw new IllegalArgumentException();
1273 this.poolNumber = poolNumberGenerator.incrementAndGet();
1274 int arraySize = initialArraySizeFor(parallelism);
1275 this.parallelism = parallelism;
1276 this.factory = factory;
1277 this.maxPoolSize = MAX_THREADS;
1278 this.maintainsParallelism = true;
1279 this.workers = new ForkJoinWorkerThread[arraySize];
1280 this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
1281 this.workerLock = new ReentrantLock();
1282 this.terminationLatch = new CountDownLatch(1);
1283 }
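The no-arg and parallelism-only constructors above are the common entry points (the factory-taking overloads differ only in worker creation). A minimal construction sketch — note the defaults come from Runtime.availableProcessors() and the default worker-thread factory:

```java
import java.util.concurrent.ForkJoinPool;

// Construct pools via the two simplest constructor shapes and confirm
// that the parallelism level reflects the constructor argument.
public class PoolConstruction {
    public static void main(String[] args) {
        ForkJoinPool byDefault = new ForkJoinPool();  // availableProcessors()
        ForkJoinPool fixed = new ForkJoinPool(4);     // explicit parallelism

        // getParallelism reports the target level, not the current
        // worker count (which may temporarily exceed it).
        System.out.println(fixed.getParallelism());

        byDefault.shutdown();
        fixed.shutdown();
    }
}
```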
1284
1285 /**
1286 * Returns initial power of two size for workers array.
1287 * @param pc the initial parallelism level
1288 */
1289 private static int initialArraySizeFor(int pc) {
1290 // See Hacker's Delight, sec 3.2. We know MAX_THREADS < (1 << 16)
1291 int size = pc < MAX_THREADS ? pc + 1 : MAX_THREADS;
1292 size |= size >>> 1;
1293 size |= size >>> 2;
1294 size |= size >>> 4;
1295 size |= size >>> 8;
1296 return size + 1;
1297 }
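The bit-smearing in initialArraySizeFor is the standard Hacker's Delight next-power-of-two trick: OR-ing the value with right-shifted copies of itself propagates the highest set bit into every lower position, so adding one yields a power of two. A standalone sketch mirroring the method body (class name hypothetical, with the MAX_THREADS bound passed in as a parameter):

```java
// Round up to a power of two by smearing the high bit rightward.
// Given size = min(pc + 1, max), the result is the smallest power of
// two strictly greater than size; shifts up to 8 suffice because the
// bound is below 2^16.
public class NextPow2 {
    static int initialArraySizeFor(int pc, int max) {
        int size = pc < max ? pc + 1 : max;
        size |= size >>> 1;   // smear high bit right by 1
        size |= size >>> 2;   // ... by 2
        size |= size >>> 4;   // ... by 4
        size |= size >>> 8;   // ... by 8: now of the form 0b0111...1
        return size + 1;      // all-ones + 1 == power of two
    }
}
```

For example, a parallelism of 4 gives size 5, smeared to 7, returned as 8; a parallelism of 7 gives size 8, smeared to 15, returned as 16.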
1298
1299 // Execution methods
1300
1301 /**
1302 * Common code for execute, invoke and submit
1303 */
1304 private <T> void doSubmit(ForkJoinTask<T> task) {
1305 if (task == null)
1306 throw new NullPointerException();
1307 if (runState >= SHUTDOWN)
1308 throw new RejectedExecutionException();
1309 submissionQueue.offer(task);
1310 advanceEventCount();
1311 releaseWaiters();
1312 ensureEnoughTotalWorkers();
1313 }
1314
1315 /**
1316 * Performs the given task, returning its result upon completion.
1317 *
1318 * @param task the task
1319 * @return the task's result
1320 * @throws NullPointerException if the task is null
1321 * @throws RejectedExecutionException if the task cannot be
1322 * scheduled for execution
1323 */
1324 public <T> T invoke(ForkJoinTask<T> task) {
1325 doSubmit(task);
1326 return task.join();
1327 }
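invoke is the synchronous entry point: it submits the task and joins its result. A minimal divide-and-conquer example of the kind this pool is built for — the RangeSum class and its threshold of 1000 are illustrative choices, not part of this file:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sum the range [lo, hi) by recursive splitting; run via pool.invoke(),
// which submits the root task and returns its joined result.
public class RangeSum extends RecursiveTask<Long> {
    final long lo, hi;                 // half-open range

    RangeSum(long lo, long hi) { this.lo = lo; this.hi = hi; }

    protected Long compute() {
        if (hi - lo <= 1000) {         // small enough: sum directly
            long s = 0;
            for (long i = lo; i < hi; ++i) s += i;
            return s;
        }
        long mid = (lo + hi) >>> 1;    // otherwise split in half
        RangeSum left = new RangeSum(lo, mid);
        left.fork();                   // left half runs (or is stolen) async
        long right = new RangeSum(mid, hi).compute();
        return right + left.join();    // join the forked subtask
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(pool.invoke(new RangeSum(0, 10_000)));
        pool.shutdown();
    }
}
```

Computing the right half directly before joining the left keeps this worker busy instead of blocking immediately, which is the idiomatic fork/join shape.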
1328
1329 /**
1330 * Arranges for (asynchronous) execution of the given task.
1331 *
1332 * @param task the task
1333 * @throws NullPointerException if the task is null
1334 * @throws RejectedExecutionException if the task cannot be
1335 * scheduled for execution
1336 */
1337 public void execute(ForkJoinTask<?> task) {
1338 doSubmit(task);
1339 }
1340
1341 // AbstractExecutorService methods
1342
1343 /**
1344 * @throws NullPointerException if the task is null
1345 * @throws RejectedExecutionException if the task cannot be
1346 * scheduled for execution
1347 */
1348 public void execute(Runnable task) {
1349 ForkJoinTask<?> job;
1350 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1351 job = (ForkJoinTask<?>) task;
1352 else
1353 job = ForkJoinTask.adapt(task, null);
1354 doSubmit(job);
1355 }
1356
1357 /**
1358 * @throws NullPointerException if the task is null
1359 * @throws RejectedExecutionException if the task cannot be
1360 * scheduled for execution
1361 */
1362 public <T> ForkJoinTask<T> submit(Callable<T> task) {
1363 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1364 doSubmit(job);
1365 return job;
1366 }
1367
1368 /**
1369 * @throws NullPointerException if the task is null
1370 * @throws RejectedExecutionException if the task cannot be
1371 * scheduled for execution
1372 */
1373 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1374 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1375 doSubmit(job);
1376 return job;
1377 }
1378
1379 /**
1380 * @throws NullPointerException if the task is null
1381 * @throws RejectedExecutionException if the task cannot be
1382 * scheduled for execution
1383 */
1384 public ForkJoinTask<?> submit(Runnable task) {
1385 ForkJoinTask<?> job;
1386 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1387 job = (ForkJoinTask<?>) task;
1388 else
1389 job = ForkJoinTask.adapt(task, null);
1390 doSubmit(job);
1391 return job;
1392 }
1393
1394 /**
1395 * Submits a ForkJoinTask for execution.
1396 *
1397 * @param task the task to submit
1398 * @return the task
1399 * @throws NullPointerException if the task is null
1400 * @throws RejectedExecutionException if the task cannot be
1401 * scheduled for execution
1402 */
1403 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1404 doSubmit(task);
1405 return task;
1406 }
1407
1408 /**
1409 * @throws NullPointerException {@inheritDoc}
1410 * @throws RejectedExecutionException {@inheritDoc}
1411 */
1412 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1413 ArrayList<ForkJoinTask<T>> forkJoinTasks =
1414 new ArrayList<ForkJoinTask<T>>(tasks.size());
1415 for (Callable<T> task : tasks)
1416 forkJoinTasks.add(ForkJoinTask.adapt(task));
1417 invoke(new InvokeAll<T>(forkJoinTasks));
1418
1419 @SuppressWarnings({"unchecked", "rawtypes"})
1420 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1421 return futures;
1422 }
1423
1424 static final class InvokeAll<T> extends RecursiveAction {
1425 final ArrayList<ForkJoinTask<T>> tasks;
1426 InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
1427 public void compute() {
1428 try { invokeAll(tasks); }
1429 catch (Exception ignore) {}
1430 }
1431 private static final long serialVersionUID = -7914297376763021607L;
1432 }
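From the caller's side, invokeAll behaves like any ExecutorService bulk submission: each Callable is adapted to a ForkJoinTask and the returned futures are already completed when the method returns. A short usage sketch (the squaring tasks are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

// Submit a batch of Callables; invokeAll waits for all of them, so
// every returned Future is done and get() does not block.
public class BulkSubmit {
    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 4; ++i) {
            final int n = i;
            tasks.add(() -> n * n);    // square each input
        }
        int total = 0;
        for (Future<Integer> f : pool.invokeAll(tasks))
            total += f.get();          // 1 + 4 + 9 + 16
        System.out.println(total);
        pool.shutdown();
    }
}
```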
1433
1434 /**
1435 * Returns the factory used for constructing new workers.
1436 *
1437 * @return the factory used for constructing new workers
1438 */
1439 public ForkJoinWorkerThreadFactory getFactory() {
1440 return factory;
1441 }
1442
1443 /**
1444 * Returns the handler for internal worker threads that terminate
1445 * due to unrecoverable errors encountered while executing tasks.
1446 *
1447 * @return the handler, or {@code null} if none
1448 */
1449 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
1450 workerCountReadFence();
1451 return ueh;
1452 }
1453
1454 /**
1455 * Sets the handler for internal worker threads that terminate due
1456 * to unrecoverable errors encountered while executing tasks.
1457 * Unless set, the current default or ThreadGroup handler is used
1458 * as handler.
1459 *
1460 * @param h the new handler
1461 * @return the old handler, or {@code null} if none
1462 * @throws SecurityException if a security manager exists and
1463 * the caller is not permitted to modify threads
1464 * because it does not hold {@link
1465 * java.lang.RuntimePermission}{@code ("modifyThread")}
1466 */
1467 public Thread.UncaughtExceptionHandler
1468 setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
1469 checkPermission();
1470 Thread.UncaughtExceptionHandler old = ueh;
1471 if (h != old) {
1472 ueh = h;
1473 ForkJoinWorkerThread[] ws = workers;
1474 int nws = ws.length;
1475 for (int i = 0; i < nws; ++i) {
1476 ForkJoinWorkerThread w = ws[i];
1477 if (w != null)
1478 w.setUncaughtExceptionHandler(h);
1479 }
1480 }
1481 return old;
1482 }
1483
1484 /**
1485 * Sets the target parallelism level of this pool.
1486 *
1487 * @param parallelism the target parallelism
1488 * @throws IllegalArgumentException if parallelism less than or
1489 * equal to zero or greater than maximum size bounds
1490 * @throws SecurityException if a security manager exists and
1491 * the caller is not permitted to modify threads
1492 * because it does not hold {@link
1493 * java.lang.RuntimePermission}{@code ("modifyThread")}
1494 */
1495 public void setParallelism(int parallelism) {
1496 checkPermission();
1497 if (parallelism <= 0 || parallelism > maxPoolSize)
1498 throw new IllegalArgumentException();
1499 workerCountReadFence();
1500 int pc = this.parallelism;
1501 if (pc != parallelism) {
1502 this.parallelism = parallelism;
1503 workerCountWriteFence();
1504 // Release spares. If too many, some will die after re-suspend
1505 ForkJoinWorkerThread[] ws = workers;
1506 int nws = ws.length;
1507 for (int i = 0; i < nws; ++i) {
1508 ForkJoinWorkerThread w = ws[i];
1509 if (w != null && w.tryUnsuspend()) {
1510 int c;
1511 do {} while (!UNSAFE.compareAndSwapInt
1512 (this, workerCountsOffset,
1513 c = workerCounts, c + ONE_RUNNING));
1514 LockSupport.unpark(w);
1515 }
1516 }
1517 ensureEnoughTotalWorkers();
1518 advanceEventCount();
1519 releaseWaiters(); // force config recheck by existing workers
1520 }
1521 }
1522
1523 /**
1524 * Returns the targeted parallelism level of this pool.
1525 *
1526 * @return the targeted parallelism level of this pool
1527 */
1528 public int getParallelism() {
1529 // workerCountReadFence(); // inlined below
1530 int ignore = workerCounts;
1531 return parallelism;
1532 }
1533
1534 /**
1535 * Returns the number of worker threads that have started but not
1536 * yet terminated. The result returned by this method may differ
1537 * from {@link #getParallelism} when threads are created to
1538 * maintain parallelism when others are cooperatively blocked.
1539 *
1540 * @return the number of worker threads
1541 */
1542 public int getPoolSize() {
1543 return workerCounts >>> TOTAL_COUNT_SHIFT;
1544 }
1545
1546 /**
1547 * Returns the maximum number of threads allowed to exist in the
1548 * pool. Unless set using {@link #setMaximumPoolSize}, the
1549 * maximum is an implementation-defined value designed only to
1550 * prevent runaway growth.
1551 *
1552 * @return the maximum
1553 */
1554 public int getMaximumPoolSize() {
1555 workerCountReadFence();
1556 return maxPoolSize;
1557 }
1558
1559 /**
1560 * Sets the maximum number of threads allowed to exist in the
1561 * pool. The given value should normally be greater than or equal
1562 * to the {@link #getParallelism parallelism} level. Setting this
1563 * value has no effect on current pool size. It controls
1564 * construction of new threads. The use of this method may cause
1565 * tasks that intrinsically require extra threads for dependent
1566 * computations to indefinitely stall. If you are instead trying
1567 * to minimize internal thread creation, consider setting {@link
1568 * #setMaintainsParallelism} to false.
1569 *
1570 * @throws IllegalArgumentException if negative or greater than
1571 * internal implementation limit
1572 */
1573 public void setMaximumPoolSize(int newMax) {
1574 if (newMax < 0 || newMax > MAX_THREADS)
1575 throw new IllegalArgumentException();
1576 maxPoolSize = newMax;
1577 workerCountWriteFence();
1578 }
1579
1580 /**
1581 * Returns {@code true} if this pool dynamically maintains its
1582 * target parallelism level. If false, new threads are added only
1583 * to avoid possible starvation. This setting is true by default.
1584 *
1585 * @return {@code true} if maintains parallelism
1586 */
1587 public boolean getMaintainsParallelism() {
1588 workerCountReadFence();
1589 return maintainsParallelism;
1590 }
1591
1592 /**
1593 * Sets whether this pool dynamically maintains its target
1594 * parallelism level. If false, new threads are added only to
1595 * avoid possible starvation.
1596 *
1597 * @param enable {@code true} to maintain parallelism
1598 */
1599 public void setMaintainsParallelism(boolean enable) {
1600 maintainsParallelism = enable;
1601 workerCountWriteFence();
1602 }
1603
1604 /**
1605 * Establishes local first-in-first-out scheduling mode for forked
1606 * tasks that are never joined. This mode may be more appropriate
1607 * than the default locally stack-based mode in applications in which
1608 * worker threads only process asynchronous tasks. This method is
1609 * designed to be invoked only when the pool is quiescent, and
1610 * typically only before any tasks are submitted. The effects of
1611 * invocations at other times may be unpredictable.
1612 *
1613 * @param async if {@code true}, use locally FIFO scheduling
1614 * @return the previous mode
1615 * @see #getAsyncMode
1616 */
1617 public boolean setAsyncMode(boolean async) {
1618 workerCountReadFence();
1619 boolean oldMode = locallyFifo;
1620 if (oldMode != async) {
1621 locallyFifo = async;
1622 workerCountWriteFence();
1623 ForkJoinWorkerThread[] ws = workers;
1624 int nws = ws.length;
1625 for (int i = 0; i < nws; ++i) {
1626 ForkJoinWorkerThread w = ws[i];
1627 if (w != null)
1628 w.setAsyncMode(async);
1629 }
1630 }
1631 return oldMode;
1632 }
1633
1634 /**
1635 * Returns {@code true} if this pool uses local first-in-first-out
1636 * scheduling mode for forked tasks that are never joined.
1637 *
1638 * @return {@code true} if this pool uses async mode
1639 * @see #setAsyncMode
1640 */
1641 public boolean getAsyncMode() {
1642 workerCountReadFence();
1643 return locallyFifo;
1644 }
1645
1646 /**
1647 * Returns an estimate of the number of worker threads that are
1648 * not blocked waiting to join tasks or for other managed
1649 * synchronization. This method may overestimate the
1650 * number of running threads.
1651 *
1652 * @return the number of worker threads
1653 */
1654 public int getRunningThreadCount() {
1655 return workerCounts & RUNNING_COUNT_MASK;
1656 }
1657
1658 /**
1659 * Returns an estimate of the number of threads that are currently
1660 * stealing or executing tasks. This method may overestimate the
1661 * number of active threads.
1662 *
1663 * @return the number of active threads
1664 */
1665 public int getActiveThreadCount() {
1666 return runState & ACTIVE_COUNT_MASK;
1667 }
1668
1669 /**
1670 * Returns {@code true} if all worker threads are currently idle.
1671 * An idle worker is one that cannot obtain a task to execute
1672 * because none are available to steal from other threads, and
1673 * there are no pending submissions to the pool. This method is
1674 * conservative; it might not return {@code true} immediately upon
1675 * idleness of all threads, but will eventually become true if
1676 * threads remain inactive.
1677 *
1678 * @return {@code true} if all threads are currently idle
1679 */
1680 public boolean isQuiescent() {
1681 return (runState & ACTIVE_COUNT_MASK) == 0;
1682 }
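The monitoring getters above (getRunningThreadCount, getActiveThreadCount, isQuiescent) report estimates decoded from the packed workerCounts and runState words, so callers should rely only on properties that hold once the pool settles. A small sketch of polling them after all submitted work completes:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Run a trivial task, wait for the pool to quiesce, then read a few of
// the monitoring methods whose values are stable while the pool is idle.
public class PoolStats {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        pool.invoke(ForkJoinTask.adapt(() -> {}));

        // With no work pending, all workers eventually go inactive;
        // isQuiescent is conservative, so spin briefly until it agrees.
        while (!pool.isQuiescent())
            Thread.yield();

        System.out.println(pool.getParallelism());        // target level
        System.out.println(pool.getStealCount() >= 0);    // monotonic total
        System.out.println(pool.hasQueuedSubmissions());  // nothing pending
        pool.shutdown();
    }
}
```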
1683
1684 /**
1685 * Returns an estimate of the total number of tasks stolen from
1686 * one thread's work queue by another. The reported value
1687 * underestimates the actual total number of steals when the pool
1688 * is not quiescent. This value may be useful for monitoring and
1689 * tuning fork/join programs: in general, steal counts should be
1690 * high enough to keep threads busy, but low enough to avoid
1691 * overhead and contention across threads.
1692 *
1693 * @return the number of steals
1694 */
1695 public long getStealCount() {
1696 return stealCount;
1697 }
1698
1699 /**
1700 * Returns an estimate of the total number of tasks currently held
1701 * in queues by worker threads (but not including tasks submitted
1702 * to the pool that have not begun executing). This value is only
1703 * an approximation, obtained by iterating across all threads in
1704 * the pool. This method may be useful for tuning task
1705 * granularities.
1706 *
1707 * @return the number of queued tasks
1708 */
1709 public long getQueuedTaskCount() {
1710 long count = 0;
1711 ForkJoinWorkerThread[] ws = workers;
1712 int nws = ws.length;
1713 for (int i = 0; i < nws; ++i) {
1714 ForkJoinWorkerThread w = ws[i];
1715 if (w != null)
1716 count += w.getQueueSize();
1717 }
1718 return count;
1719 }
1720
1721 /**
1722 * Returns an estimate of the number of tasks submitted to this
1723 * pool that have not yet begun executing. This method takes time
1724 * proportional to the number of submissions.
1725 *
1726 * @return the number of queued submissions
1727 */
1728 public int getQueuedSubmissionCount() {
1729 return submissionQueue.size();
1730 }
1731
1732 /**
1733 * Returns {@code true} if there are any tasks submitted to this
1734 * pool that have not yet begun executing.
1735 *
1736 * @return {@code true} if there are any queued submissions
1737 */
1738 public boolean hasQueuedSubmissions() {
1739 return !submissionQueue.isEmpty();
1740 }
1741
1742 /**
1743 * Removes and returns the next unexecuted submission if one is
1744 * available. This method may be useful in extensions to this
1745 * class that re-assign work in systems with multiple pools.
1746 *
1747 * @return the next submission, or {@code null} if none
1748 */
1749 protected ForkJoinTask<?> pollSubmission() {
1750 return submissionQueue.poll();
1751 }
1752
1753 /**
1754 * Removes all available unexecuted submitted and forked tasks
1755 * from scheduling queues and adds them to the given collection,
1756 * without altering their execution status. These may include
1757 * artificially generated or wrapped tasks. This method is
1758 * designed to be invoked only when the pool is known to be
1759 * quiescent. Invocations at other times may not remove all
1760 * tasks. A failure encountered while attempting to add elements
1761 * to collection {@code c} may result in elements being in
1762 * neither, either or both collections when the associated
1763 * exception is thrown. The behavior of this operation is
1764 * undefined if the specified collection is modified while the
1765 * operation is in progress.
1766 *
1767 * @param c the collection to transfer elements into
1768 * @return the number of elements transferred
1769 */
1770 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1771 int n = submissionQueue.drainTo(c);
1772 ForkJoinWorkerThread[] ws = workers;
1773 int nws = ws.length;
1774 for (int i = 0; i < nws; ++i) {
1775 ForkJoinWorkerThread w = ws[i];
1776 if (w != null)
1777 n += w.drainTasksTo(c);
1778 }
1779 return n;
1780 }
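Since {@code pollSubmission} and {@code drainTasksTo} are protected extension hooks, using them requires a subclass. A minimal sketch, assuming the standard JDK {@code ForkJoinPool} (the {@code DrainablePool} name is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class DrainDemo {
    // Subclass solely to expose the protected extension hook.
    static class DrainablePool extends ForkJoinPool {
        List<ForkJoinTask<?>> drain() {
            List<ForkJoinTask<?>> drained = new ArrayList<>();
            // Only reliable when the pool is known to be quiescent.
            drainTasksTo(drained);
            return drained;
        }
    }

    public static void main(String[] args) {
        DrainablePool pool = new DrainablePool();
        // A freshly created, quiescent pool has nothing to transfer.
        List<ForkJoinTask<?>> drained = pool.drain();
        System.out.println("drained=" + drained.size());
        pool.shutdown();
    }
}
```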
1781
1782 /**
1783 * Returns a string identifying this pool, as well as its state,
1784 * including indications of run state, parallelism level, and
1785 * worker and task counts.
1786 *
1787 * @return a string identifying this pool, as well as its state
1788 */
1789 public String toString() {
1790 long st = getStealCount();
1791 long qt = getQueuedTaskCount();
1792 long qs = getQueuedSubmissionCount();
1793 int wc = workerCounts;
1794 int tc = wc >>> TOTAL_COUNT_SHIFT;
1795 int rc = wc & RUNNING_COUNT_MASK;
1796 int pc = parallelism;
1797 int rs = runState;
1798 int ac = rs & ACTIVE_COUNT_MASK;
1799 return super.toString() +
1800 "[" + runLevelToString(rs) +
1801 ", parallelism = " + pc +
1802 ", size = " + tc +
1803 ", active = " + ac +
1804 ", running = " + rc +
1805 ", steals = " + st +
1806 ", tasks = " + qt +
1807 ", submissions = " + qs +
1808 "]";
1809 }
1810
1811 private static String runLevelToString(int s) {
1812 return ((s & TERMINATED) != 0 ? "Terminated" :
1813 ((s & TERMINATING) != 0 ? "Terminating" :
1814 ((s & SHUTDOWN) != 0 ? "Shutting down" :
1815 "Running")));
1816 }
1817
1818 /**
1819 * Initiates an orderly shutdown in which previously submitted
1820 * tasks are executed, but no new tasks will be accepted.
1821 * Invocation has no additional effect if already shut down.
1822 * Tasks that are in the process of being submitted concurrently
1823 * during the course of this method may or may not be rejected.
1824 *
1825 * @throws SecurityException if a security manager exists and
1826 * the caller is not permitted to modify threads
1827 * because it does not hold {@link
1828 * java.lang.RuntimePermission}{@code ("modifyThread")}
1829 */
1830 public void shutdown() {
1831 checkPermission();
1832 advanceRunLevel(SHUTDOWN);
1833 tryTerminate(false);
1834 }
1835
1836 /**
1837 * Attempts to cancel and/or stop all tasks, and reject all
1838 * subsequently submitted tasks. Tasks that are in the process of
1839 * being submitted or executed concurrently during the course of
1840 * this method may or may not be rejected. This method cancels
1841 * both existing and unexecuted tasks, in order to permit
1842 * termination in the presence of task dependencies. So the method
1843 * always returns an empty list (unlike the case for some other
1844 * Executors).
1845 *
1846 * @return an empty list
1847 * @throws SecurityException if a security manager exists and
1848 * the caller is not permitted to modify threads
1849 * because it does not hold {@link
1850 * java.lang.RuntimePermission}{@code ("modifyThread")}
1851 */
1852 public List<Runnable> shutdownNow() {
1853 checkPermission();
1854 tryTerminate(true);
1855 return Collections.emptyList();
1856 }
1857
1858 /**
1859 * Returns {@code true} if all tasks have completed following shut down.
1860 *
1861 * @return {@code true} if all tasks have completed following shut down
1862 */
1863 public boolean isTerminated() {
1864 return runState >= TERMINATED;
1865 }
1866
1867 /**
1868 * Returns {@code true} if the process of termination has
1869 * commenced but not yet completed. This method may be useful for
1870 * debugging. A return of {@code true} reported a sufficient
1871 * period after shutdown may indicate that submitted tasks have
1872 * ignored or suppressed interruption, causing this executor not
1873 * to properly terminate.
1874 *
1875 * @return {@code true} if terminating but not yet terminated
1876 */
1877 public boolean isTerminating() {
1878 return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
1879 }
1880
1881 /**
1882 * Returns {@code true} if this pool has been shut down.
1883 *
1884 * @return {@code true} if this pool has been shut down
1885 */
1886 public boolean isShutdown() {
1887 return runState >= SHUTDOWN;
1888 }
1889
1890 /**
1891 * Blocks until all tasks have completed execution after a shutdown
1892 * request, or the timeout occurs, or the current thread is
1893 * interrupted, whichever happens first.
1894 *
1895 * @param timeout the maximum time to wait
1896 * @param unit the time unit of the timeout argument
1897 * @return {@code true} if this executor terminated and
1898 * {@code false} if the timeout elapsed before termination
1899 * @throws InterruptedException if interrupted while waiting
1900 */
1901 public boolean awaitTermination(long timeout, TimeUnit unit)
1902 throws InterruptedException {
1903 return terminationLatch.await(timeout, unit);
1904 }
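The shutdown methods above compose into the usual {@code ExecutorService} termination idiom. A sketch against the standard JDK API:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(() -> System.out.println("task ran"));
        pool.shutdown();                      // stop accepting new submissions
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            // Cancel stragglers; unlike other Executors this returns an empty list.
            pool.shutdownNow();
        }
        System.out.println("terminated=" + pool.isTerminated());
    }
}
```

Note that, as documented above, {@code shutdownNow} always returns an empty list because the pool cancels both queued and executing tasks rather than handing them back.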
1905
1906 /**
1907 * Interface for extending managed parallelism for tasks running
1908 * in {@link ForkJoinPool}s.
1909 *
1910 * <p>A {@code ManagedBlocker} provides two methods.
1911 * Method {@code isReleasable} must return {@code true} if
1912 * blocking is not necessary. Method {@code block} blocks the
1913 * current thread if necessary (perhaps internally invoking
1914 * {@code isReleasable} before actually blocking).
1915 *
1916 * <p>For example, here is a ManagedBlocker based on a
1917 * ReentrantLock:
1918 * <pre> {@code
1919 * class ManagedLocker implements ManagedBlocker {
1920 * final ReentrantLock lock;
1921 * boolean hasLock = false;
1922 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
1923 * public boolean block() {
1924 * if (!hasLock)
1925 * lock.lock();
1926 * return true;
1927 * }
1928 * public boolean isReleasable() {
1929 * return hasLock || (hasLock = lock.tryLock());
1930 * }
1931 * }}</pre>
1932 */
1933 public static interface ManagedBlocker {
1934 /**
1935 * Possibly blocks the current thread, for example waiting for
1936 * a lock or condition.
1937 *
1938 * @return {@code true} if no additional blocking is necessary
1939 * (i.e., if {@code isReleasable} would return {@code true})
1940 * @throws InterruptedException if interrupted while waiting
1941 * (the method is not required to do so, but is allowed to)
1942 */
1943 boolean block() throws InterruptedException;
1944
1945 /**
1946 * Returns {@code true} if blocking is unnecessary.
1947 */
1948 boolean isReleasable();
1949 }
1950
1951 /**
1952 * Blocks in accord with the given blocker. If the current thread
1953 * is a {@link ForkJoinWorkerThread}, this method possibly
1954 * arranges for a spare thread to be activated if necessary to
1955 * ensure parallelism while the current thread is blocked.
1956 *
1957 * <p>If {@code maintainParallelism} is {@code true} and the pool
1958 * supports it ({@link #getMaintainsParallelism}), this method
1959 * attempts to maintain the pool's nominal parallelism. Otherwise
1960 * it activates a thread only if necessary to avoid complete
1961 * starvation. This option may be preferable when blockages use
1962 * timeouts, or are almost always brief.
1963 *
1964 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1965 * behaviorally equivalent to
1966 * <pre> {@code
1967 * while (!blocker.isReleasable())
1968 * if (blocker.block())
1969 * return;
1970 * }</pre>
1971 *
1972 * If the caller is a {@code ForkJoinTask}, then the pool may
1973 * first be expanded to ensure parallelism, and later adjusted.
1974 *
1975 * @param blocker the blocker
1976 * @param maintainParallelism if {@code true} and supported by
1977 * this pool, attempt to maintain the pool's nominal parallelism;
1978 * otherwise activate a thread only if necessary to avoid
1979 * complete starvation.
1980 * @throws InterruptedException if blocker.block did so
1981 */
1982 public static void managedBlock(ManagedBlocker blocker,
1983 boolean maintainParallelism)
1984 throws InterruptedException {
1985 Thread t = Thread.currentThread();
1986 if (t instanceof ForkJoinWorkerThread)
1987 ((ForkJoinWorkerThread) t).pool.
1988 awaitBlocker(blocker, maintainParallelism);
1989 else
1990 awaitBlocker(blocker);
1991 }
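The behavioral equivalence shown in the javadoc above can be exercised directly. This sketch reuses the {@code ManagedLocker} from the {@code ManagedBlocker} documentation and runs the non-fork/join loop itself rather than calling {@code managedBlock} (whose signature in this snapshot takes a {@code maintainParallelism} flag that later JDK releases dropped):

```java
import java.util.concurrent.ForkJoinPool.ManagedBlocker;
import java.util.concurrent.locks.ReentrantLock;

public class BlockerDemo {
    // The ManagedLocker example from the ManagedBlocker javadoc.
    static class ManagedLocker implements ManagedBlocker {
        final ReentrantLock lock;
        boolean hasLock = false;
        ManagedLocker(ReentrantLock lock) { this.lock = lock; }
        public boolean block() {
            if (!hasLock)
                lock.lock();
            return true;
        }
        public boolean isReleasable() {
            return hasLock || (hasLock = lock.tryLock());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        ManagedLocker blocker = new ManagedLocker(lock);
        // Behavioral equivalent of managedBlock for a non-fork/join caller:
        while (!blocker.isReleasable())
            if (blocker.block())
                break;
        System.out.println("held=" + lock.isHeldByCurrentThread());
        lock.unlock();
    }
}
```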
1992
1993 /**
1994 * Performs blocking for a thread that is not a ForkJoinWorkerThread.
1995 */
1996 private static void awaitBlocker(ManagedBlocker blocker)
1997 throws InterruptedException {
1998 do {} while (!blocker.isReleasable() && !blocker.block());
1999 }
2000
2001 // AbstractExecutorService overrides. These rely on the undocumented
2002 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
2003 // implement RunnableFuture.
2004
2005 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
2006 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
2007 }
2008
2009 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
2010 return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
2011 }
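Because {@code submit} routes through these {@code newTaskFor} overrides, the {@code Future} handed back for a plain {@code Callable} is in fact an adapted {@code ForkJoinTask}. A sketch, assuming the standard JDK API:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;

public class AdaptDemo {
    public static void main(String[] args)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool();
        // The Callable is wrapped via ForkJoinTask.adapt under the covers.
        Future<Integer> f = pool.submit(() -> 6 * 7);
        System.out.println("isForkJoinTask=" + (f instanceof ForkJoinTask));
        System.out.println("result=" + f.get());
        pool.shutdown();
    }
}
```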
2012
2013 // Unsafe mechanics
2014
2015 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
2016 private static final long workerCountsOffset =
2017 objectFieldOffset("workerCounts", ForkJoinPool.class);
2018 private static final long runStateOffset =
2019 objectFieldOffset("runState", ForkJoinPool.class);
2020 private static final long eventCountOffset =
2021 objectFieldOffset("eventCount", ForkJoinPool.class);
2022 private static final long eventWaitersOffset =
2023 objectFieldOffset("eventWaiters", ForkJoinPool.class);
2024 private static final long stealCountOffset =
2025 objectFieldOffset("stealCount", ForkJoinPool.class);
2026
2027
2028 private static long objectFieldOffset(String field, Class<?> klazz) {
2029 try {
2030 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
2031 } catch (NoSuchFieldException e) {
2032 // Convert Exception to corresponding Error
2033 NoSuchFieldError error = new NoSuchFieldError(field);
2034 error.initCause(e);
2035 throw error;
2036 }
2037 }
2038 }