ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.13
Committed: Sat Dec 5 11:43:01 2009 UTC (14 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.12: +78 -105 lines
Log Message:
Sync with jsr166y version

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.locks.Condition;
15 import java.util.concurrent.locks.LockSupport;
16 import java.util.concurrent.locks.ReentrantLock;
17 import java.util.concurrent.atomic.AtomicInteger;
18 import java.util.concurrent.atomic.AtomicLong;
19
20 /**
21 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
22 * A {@code ForkJoinPool} provides the entry point for submissions
23 * from non-{@code ForkJoinTask}s, as well as management and
24 * monitoring operations.
25 *
26 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
27 * ExecutorService} mainly by virtue of employing
28 * <em>work-stealing</em>: all threads in the pool attempt to find and
29 * execute subtasks created by other active tasks (eventually blocking
30 * waiting for work if none exist). This enables efficient processing
31 * when most tasks spawn other subtasks (as do most {@code
32 * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
33 * execution of some plain {@code Runnable}- or {@code Callable}-
34 * based activities along with {@code ForkJoinTask}s. When setting
35 * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
36 * also be appropriate for use with fine-grained tasks of any form
37 * that are never joined. Otherwise, other {@code ExecutorService}
38 * implementations are typically more appropriate choices.
39 *
40 * <p>A {@code ForkJoinPool} is constructed with a given target
41 * parallelism level; by default, equal to the number of available
42 * processors. Unless configured otherwise via {@link
43 * #setMaintainsParallelism}, the pool attempts to maintain this
44 * number of active (or available) threads by dynamically adding,
45 * suspending, or resuming internal worker threads, even if some tasks
46 * are stalled waiting to join others. However, no such adjustments
47 * are performed in the face of blocked IO or other unmanaged
48 * synchronization. The nested {@link ManagedBlocker} interface
49 * enables extension of the kinds of synchronization accommodated.
50 * The target parallelism level may also be changed dynamically
51 * ({@link #setParallelism}). The total number of threads may be
52 * limited using method {@link #setMaximumPoolSize}, in which case it
53 * may become possible for the activities of a pool to stall due to
54 * the lack of available threads to process new tasks.
55 *
56 * <p>In addition to execution and lifecycle control methods, this
57 * class provides status check methods (for example
58 * {@link #getStealCount}) that are intended to aid in developing,
59 * tuning, and monitoring fork/join applications. Also, method
60 * {@link #toString} returns indications of pool state in a
61 * convenient form for informal monitoring.
62 *
63 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
64 * used for all parallel task execution in a program or subsystem.
65 * Otherwise, use would not usually outweigh the construction and
66 * bookkeeping overhead of creating a large set of threads. For
67 * example, a common pool could be used for the {@code SortTasks}
68 * illustrated in {@link RecursiveAction}. Because {@code
69 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
70 * daemon} mode, there is typically no need to explicitly {@link
71 * #shutdown} such a pool upon program exit.
72 *
73 * <pre>
74 * static final ForkJoinPool mainPool = new ForkJoinPool();
75 * ...
76 * public void sort(long[] array) {
77 * mainPool.invoke(new SortTask(array, 0, array.length));
78 * }
79 * </pre>
80 *
81 * <p><b>Implementation notes</b>: This implementation restricts the
82 * maximum number of running threads to 32767. Attempts to create
83 * pools with greater than the maximum number result in
84 * {@code IllegalArgumentException}.
85 *
86 * <p>This implementation rejects submitted tasks (that is, by throwing
87 * {@link RejectedExecutionException}) only when the pool is shut down.
88 *
89 * @since 1.7
90 * @author Doug Lea
91 */
92 public class ForkJoinPool extends AbstractExecutorService {
93
94 /*
95 * See the extended comments interspersed below for design,
96 * rationale, and walkthroughs.
97 */
98
99 /** Mask for packing and unpacking shorts */
100 private static final int shortMask = 0xffff;
101
102 /** Max pool size -- must be a power of two minus 1 */
103 private static final int MAX_THREADS = 0x7FFF;
104
105 /**
106 * Factory for creating new {@link ForkJoinWorkerThread}s.
107 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
108 * for {@code ForkJoinWorkerThread} subclasses that extend base
109 * functionality or initialize threads with different contexts.
110 */
111 public static interface ForkJoinWorkerThreadFactory {
112 /**
113 * Returns a new worker thread operating in the given pool.
114 *
115 * @param pool the pool this thread works in
116 * @throws NullPointerException if the pool is null
117 */
118 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
119 }
120
121 /**
122 * Default ForkJoinWorkerThreadFactory implementation; creates a
123 * new ForkJoinWorkerThread.
124 */
125 static class DefaultForkJoinWorkerThreadFactory
126 implements ForkJoinWorkerThreadFactory {
127 public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
128 try {
129 return new ForkJoinWorkerThread(pool);
130 } catch (OutOfMemoryError oom) {
131 return null;
132 }
133 }
134 }
135
136 /**
137 * Creates a new ForkJoinWorkerThread. This factory is used unless
138 * overridden in ForkJoinPool constructors.
139 */
140 public static final ForkJoinWorkerThreadFactory
141 defaultForkJoinWorkerThreadFactory =
142 new DefaultForkJoinWorkerThreadFactory();
143
144 /**
145 * Permission required for callers of methods that may start or
146 * kill threads.
147 */
148 private static final RuntimePermission modifyThreadPermission =
149 new RuntimePermission("modifyThread");
150
151 /**
152 * If there is a security manager, makes sure caller has
153 * permission to modify threads.
154 */
155 private static void checkPermission() {
156 SecurityManager security = System.getSecurityManager();
157 if (security != null)
158 security.checkPermission(modifyThreadPermission);
159 }
160
161 /**
162 * Generator for assigning sequence numbers as pool names.
163 */
164 private static final AtomicInteger poolNumberGenerator =
165 new AtomicInteger();
166
167 /**
168 * Array holding all worker threads in the pool. Initialized upon
169 * first use. Array size must be a power of two. Updates and
170 * replacements are protected by workerLock, but it is always kept
171 * in a consistent enough state to be randomly accessed without
172 * locking by workers performing work-stealing.
173 */
174 volatile ForkJoinWorkerThread[] workers;
175
176 /**
177 * Lock protecting access to workers.
178 */
179 private final ReentrantLock workerLock;
180
181 /**
182 * Condition for awaitTermination.
183 */
184 private final Condition termination;
185
186 /**
187 * The uncaught exception handler used when any worker
188 * abruptly terminates
189 */
190 private Thread.UncaughtExceptionHandler ueh;
191
192 /**
193 * Creation factory for worker threads.
194 */
195 private final ForkJoinWorkerThreadFactory factory;
196
197 /**
198 * Head of stack of threads that were created to maintain
199 * parallelism when other threads blocked, but have since
200 * suspended when the parallelism level rose.
201 */
202 private volatile WaitQueueNode spareStack;
203
204 /**
205 * Sum of per-thread steal counts, updated only when threads are
206 * idle or terminating.
207 */
208 private final AtomicLong stealCount;
209
210 /**
211 * Queue for external submissions.
212 */
213 private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
214
215 /**
216 * Head of Treiber stack for barrier sync. See below for explanation.
217 */
218 private volatile WaitQueueNode syncStack;
219
220 /**
221 * The count for event barrier
222 */
223 private volatile long eventCount;
224
225 /**
226 * Pool number, just for assigning useful names to worker threads
227 */
228 private final int poolNumber;
229
230 /**
231 * The maximum allowed pool size
232 */
233 private volatile int maxPoolSize;
234
235 /**
236 * The desired parallelism level, updated only under workerLock.
237 */
238 private volatile int parallelism;
239
240 /**
241 * True if use local fifo, not default lifo, for local polling
242 */
243 private volatile boolean locallyFifo;
244
245 /**
246 * Holds number of total (i.e., created and not yet terminated)
247 * and running (i.e., not blocked on joins or other managed sync)
248 * threads, packed into one int to ensure consistent snapshot when
249 * making decisions about creating and suspending spare
250 * threads. Updated only by CAS. Note: CASes in
251 * updateRunningCount and preJoin assume that running active count
252 * is in low word, so need to be modified if this changes.
253 */
254 private volatile int workerCounts;
255
256 private static int totalCountOf(int s) { return s >>> 16; }
257 private static int runningCountOf(int s) { return s & shortMask; }
258 private static int workerCountsFor(int t, int r) { return (t << 16) + r; }
259
260 /**
261 * Adds delta (which may be negative) to running count. This must
262 * be called before (with negative arg) and after (with positive)
263 * any managed synchronization (i.e., mainly, joins).
264 *
265 * @param delta the number to add
266 */
267 final void updateRunningCount(int delta) {
268 int s;
269 do {} while (!casWorkerCounts(s = workerCounts, s + delta));
270 }
271
272 /**
273 * Adds delta (which may be negative) to both total and running
274 * count. This must be called upon creation and termination of
275 * worker threads.
276 *
277 * @param delta the number to add
278 */
279 private void updateWorkerCount(int delta) {
280 int d = delta + (delta << 16); // add to both lo and hi parts
281 int s;
282 do {} while (!casWorkerCounts(s = workerCounts, s + d));
283 }
284
285 /**
286 * Lifecycle control. High word contains runState, low word
287 * contains the number of workers that are (probably) executing
288 * tasks. This value is atomically incremented before a worker
289 * gets a task to run, and decremented when worker has no tasks
290 * and cannot find any. These two fields are bundled together to
291 * support correct termination triggering. Note: activeCount
292 * CAS'es cheat by assuming active count is in low word, so need
293 * to be modified if this changes
294 */
295 private volatile int runControl;
296
297 // RunState values. Order among values matters
298 private static final int RUNNING = 0;
299 private static final int SHUTDOWN = 1;
300 private static final int TERMINATING = 2;
301 private static final int TERMINATED = 3;
302
303 private static int runStateOf(int c) { return c >>> 16; }
304 private static int activeCountOf(int c) { return c & shortMask; }
305 private static int runControlFor(int r, int a) { return (r << 16) + a; }
306
307 /**
308 * Tries incrementing active count; fails on contention.
309 * Called by workers before/during executing tasks.
310 *
311 * @return true on success
312 */
313 final boolean tryIncrementActiveCount() {
314 int c = runControl;
315 return casRunControl(c, c+1);
316 }
317
318 /**
319 * Tries decrementing active count; fails on contention.
320 * Possibly triggers termination on success.
321 * Called by workers when they can't find tasks.
322 *
323 * @return true on success
324 */
325 final boolean tryDecrementActiveCount() {
326 int c = runControl;
327 int nextc = c - 1;
328 if (!casRunControl(c, nextc))
329 return false;
330 if (canTerminateOnShutdown(nextc))
331 terminateOnShutdown();
332 return true;
333 }
334
335 /**
336 * Returns {@code true} if argument represents zero active count
337 * and nonzero runstate, which is the triggering condition for
338 * terminating on shutdown.
339 */
340 private static boolean canTerminateOnShutdown(int c) {
341 // i.e. least bit is nonzero runState bit
342 return ((c & -c) >>> 16) != 0;
343 }
344
345 /**
346 * Transition run state to at least the given state. Return true
347 * if not already at least given state.
348 */
349 private boolean transitionRunStateTo(int state) {
350 for (;;) {
351 int c = runControl;
352 if (runStateOf(c) >= state)
353 return false;
354 if (casRunControl(c, runControlFor(state, activeCountOf(c))))
355 return true;
356 }
357 }
358
359 /**
360 * Controls whether to add spares to maintain parallelism
361 */
362 private volatile boolean maintainsParallelism;
363
364 // Constructors
365
366 /**
367 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
368 * java.lang.Runtime#availableProcessors}, and using the {@linkplain
369 * #defaultForkJoinWorkerThreadFactory default thread factory}.
370 *
371 * @throws SecurityException if a security manager exists and
372 * the caller is not permitted to modify threads
373 * because it does not hold {@link
374 * java.lang.RuntimePermission}{@code ("modifyThread")}
375 */
376 public ForkJoinPool() {
377 this(Runtime.getRuntime().availableProcessors(),
378 defaultForkJoinWorkerThreadFactory);
379 }
380
381 /**
382 * Creates a {@code ForkJoinPool} with the indicated parallelism
383 * level and using the {@linkplain
384 * #defaultForkJoinWorkerThreadFactory default thread factory}.
385 *
386 * @param parallelism the parallelism level
387 * @throws IllegalArgumentException if parallelism less than or
388 * equal to zero, or greater than implementation limit
389 * @throws SecurityException if a security manager exists and
390 * the caller is not permitted to modify threads
391 * because it does not hold {@link
392 * java.lang.RuntimePermission}{@code ("modifyThread")}
393 */
394 public ForkJoinPool(int parallelism) {
395 this(parallelism, defaultForkJoinWorkerThreadFactory);
396 }
397
398 /**
399 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
400 * java.lang.Runtime#availableProcessors}, and using the given
401 * thread factory.
402 *
403 * @param factory the factory for creating new threads
404 * @throws NullPointerException if the factory is null
405 * @throws SecurityException if a security manager exists and
406 * the caller is not permitted to modify threads
407 * because it does not hold {@link
408 * java.lang.RuntimePermission}{@code ("modifyThread")}
409 */
410 public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
411 this(Runtime.getRuntime().availableProcessors(), factory);
412 }
413
414 /**
415 * Creates a {@code ForkJoinPool} with the given parallelism and
416 * thread factory.
417 *
418 * @param parallelism the parallelism level
419 * @param factory the factory for creating new threads
420 * @throws IllegalArgumentException if parallelism less than or
421 * equal to zero, or greater than implementation limit
422 * @throws NullPointerException if the factory is null
423 * @throws SecurityException if a security manager exists and
424 * the caller is not permitted to modify threads
425 * because it does not hold {@link
426 * java.lang.RuntimePermission}{@code ("modifyThread")}
427 */
428 public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
429 if (parallelism <= 0 || parallelism > MAX_THREADS)
430 throw new IllegalArgumentException();
431 if (factory == null)
432 throw new NullPointerException();
433 checkPermission();
434 this.factory = factory;
435 this.parallelism = parallelism;
436 this.maxPoolSize = MAX_THREADS;
437 this.maintainsParallelism = true;
438 this.poolNumber = poolNumberGenerator.incrementAndGet();
439 this.workerLock = new ReentrantLock();
440 this.termination = workerLock.newCondition();
441 this.stealCount = new AtomicLong();
442 this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
443 // worker array and workers are lazily constructed
444 }
445
446 /**
447 * Creates a new worker thread using factory.
448 *
449 * @param index the index to assign worker
450 * @return new worker, or null if factory failed
451 */
452 private ForkJoinWorkerThread createWorker(int index) {
453 Thread.UncaughtExceptionHandler h = ueh;
454 ForkJoinWorkerThread w = factory.newThread(this);
455 if (w != null) {
456 w.poolIndex = index;
457 w.setDaemon(true);
458 w.setAsyncMode(locallyFifo);
459 w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index);
460 if (h != null)
461 w.setUncaughtExceptionHandler(h);
462 }
463 return w;
464 }
465
466 /**
467 * Returns a good size for worker array given pool size.
468 * Currently requires size to be a power of two.
469 */
470 private static int arraySizeFor(int poolSize) {
471 if (poolSize <= 1)
472 return 1;
473 // See Hackers Delight, sec 3.2
474 int c = poolSize >= MAX_THREADS ? MAX_THREADS : (poolSize - 1);
475 c |= c >>> 1;
476 c |= c >>> 2;
477 c |= c >>> 4;
478 c |= c >>> 8;
479 c |= c >>> 16;
480 return c + 1;
481 }
482
483 /**
484 * Creates or resizes array if necessary to hold newLength.
485 * Call only under exclusion.
486 *
487 * @return the array
488 */
489 private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) {
490 ForkJoinWorkerThread[] ws = workers;
491 if (ws == null)
492 return workers = new ForkJoinWorkerThread[arraySizeFor(newLength)];
493 else if (newLength > ws.length)
494 return workers = Arrays.copyOf(ws, arraySizeFor(newLength));
495 else
496 return ws;
497 }
498
499 /**
500 * Tries to shrink workers into smaller array after one or more terminate.
501 */
502 private void tryShrinkWorkerArray() {
503 ForkJoinWorkerThread[] ws = workers;
504 if (ws != null) {
505 int len = ws.length;
506 int last = len - 1;
507 while (last >= 0 && ws[last] == null)
508 --last;
509 int newLength = arraySizeFor(last+1);
510 if (newLength < len)
511 workers = Arrays.copyOf(ws, newLength);
512 }
513 }
514
515 /**
516 * Initializes workers if necessary.
517 */
518 final void ensureWorkerInitialization() {
519 ForkJoinWorkerThread[] ws = workers;
520 if (ws == null) {
521 final ReentrantLock lock = this.workerLock;
522 lock.lock();
523 try {
524 ws = workers;
525 if (ws == null) {
526 int ps = parallelism;
527 updateWorkerCount(ps);
528 ws = ensureWorkerArrayCapacity(ps);
529 for (int i = 0; i < ps; ++i) {
530 ForkJoinWorkerThread w = createWorker(i);
531 if (w != null) {
532 ws[i] = w;
533 w.start();
534 }
535 else
536 updateWorkerCount(-1);
537 }
538 }
539 } finally {
540 lock.unlock();
541 }
542 }
543 }
544
545 /**
546 * Worker creation and startup for threads added via setParallelism.
547 */
548 private void createAndStartAddedWorkers() {
549 resumeAllSpares(); // Allow spares to convert to nonspare
550 int ps = parallelism;
551 ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps);
552 int len = ws.length;
553 // Sweep through slots, to keep lowest indices most populated
554 int k = 0;
555 while (k < len) {
556 if (ws[k] != null) {
557 ++k;
558 continue;
559 }
560 int s = workerCounts;
561 int tc = totalCountOf(s);
562 int rc = runningCountOf(s);
563 if (rc >= ps || tc >= ps)
564 break;
565 if (casWorkerCounts (s, workerCountsFor(tc+1, rc+1))) {
566 ForkJoinWorkerThread w = createWorker(k);
567 if (w != null) {
568 ws[k++] = w;
569 w.start();
570 }
571 else {
572 updateWorkerCount(-1); // back out on failed creation
573 break;
574 }
575 }
576 }
577 }
578
579 // Execution methods
580
581 /**
582 * Common code for execute, invoke and submit
583 */
584 private <T> void doSubmit(ForkJoinTask<T> task) {
585 if (task == null)
586 throw new NullPointerException();
587 if (isShutdown())
588 throw new RejectedExecutionException();
589 if (workers == null)
590 ensureWorkerInitialization();
591 submissionQueue.offer(task);
592 signalIdleWorkers();
593 }
594
595 /**
596 * Performs the given task, returning its result upon completion.
597 *
598 * @param task the task
599 * @return the task's result
600 * @throws NullPointerException if the task is null
601 * @throws RejectedExecutionException if the task cannot be
602 * scheduled for execution
603 */
604 public <T> T invoke(ForkJoinTask<T> task) {
605 doSubmit(task);
606 return task.join();
607 }
608
609 /**
610 * Arranges for (asynchronous) execution of the given task.
611 *
612 * @param task the task
613 * @throws NullPointerException if the task is null
614 * @throws RejectedExecutionException if the task cannot be
615 * scheduled for execution
616 */
617 public void execute(ForkJoinTask<?> task) {
618 doSubmit(task);
619 }
620
621 // AbstractExecutorService methods
622
623 /**
624 * @throws NullPointerException if the task is null
625 * @throws RejectedExecutionException if the task cannot be
626 * scheduled for execution
627 */
628 public void execute(Runnable task) {
629 ForkJoinTask<?> job;
630 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
631 job = (ForkJoinTask<?>) task;
632 else
633 job = ForkJoinTask.adapt(task, null);
634 doSubmit(job);
635 }
636
637 /**
638 * @throws NullPointerException if the task is null
639 * @throws RejectedExecutionException if the task cannot be
640 * scheduled for execution
641 */
642 public <T> ForkJoinTask<T> submit(Callable<T> task) {
643 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
644 doSubmit(job);
645 return job;
646 }
647
648 /**
649 * @throws NullPointerException if the task is null
650 * @throws RejectedExecutionException if the task cannot be
651 * scheduled for execution
652 */
653 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
654 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
655 doSubmit(job);
656 return job;
657 }
658
659 /**
660 * @throws NullPointerException if the task is null
661 * @throws RejectedExecutionException if the task cannot be
662 * scheduled for execution
663 */
664 public ForkJoinTask<?> submit(Runnable task) {
665 ForkJoinTask<?> job;
666 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
667 job = (ForkJoinTask<?>) task;
668 else
669 job = ForkJoinTask.adapt(task, null);
670 doSubmit(job);
671 return job;
672 }
673
674 /**
675 * Submits a ForkJoinTask for execution.
676 *
677 * @param task the task to submit
678 * @return the task
679 * @throws NullPointerException if the task is null
680 * @throws RejectedExecutionException if the task cannot be
681 * scheduled for execution
682 */
683 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
684 doSubmit(task);
685 return task;
686 }
687
688
689 /**
690 * @throws NullPointerException {@inheritDoc}
691 * @throws RejectedExecutionException {@inheritDoc}
692 */
693 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
694 ArrayList<ForkJoinTask<T>> forkJoinTasks =
695 new ArrayList<ForkJoinTask<T>>(tasks.size());
696 for (Callable<T> task : tasks)
697 forkJoinTasks.add(ForkJoinTask.adapt(task));
698 invoke(new InvokeAll<T>(forkJoinTasks));
699
700 @SuppressWarnings({"unchecked", "rawtypes"})
701 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
702 return futures;
703 }
704
705 static final class InvokeAll<T> extends RecursiveAction {
706 final ArrayList<ForkJoinTask<T>> tasks;
707 InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
708 public void compute() {
709 try { invokeAll(tasks); }
710 catch (Exception ignore) {}
711 }
712 private static final long serialVersionUID = -7914297376763021607L;
713 }
714
715 // Configuration and status settings and queries
716
717 /**
718 * Returns the factory used for constructing new workers.
719 *
720 * @return the factory used for constructing new workers
721 */
722 public ForkJoinWorkerThreadFactory getFactory() {
723 return factory;
724 }
725
726 /**
727 * Returns the handler for internal worker threads that terminate
728 * due to unrecoverable errors encountered while executing tasks.
729 *
730 * @return the handler, or {@code null} if none
731 */
732 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
733 Thread.UncaughtExceptionHandler h;
734 final ReentrantLock lock = this.workerLock;
735 lock.lock();
736 try {
737 h = ueh;
738 } finally {
739 lock.unlock();
740 }
741 return h;
742 }
743
744 /**
745 * Sets the handler for internal worker threads that terminate due
746 * to unrecoverable errors encountered while executing tasks.
747 * Unless set, the current default or ThreadGroup handler is used
748 * as handler.
749 *
750 * @param h the new handler
751 * @return the old handler, or {@code null} if none
752 * @throws SecurityException if a security manager exists and
753 * the caller is not permitted to modify threads
754 * because it does not hold {@link
755 * java.lang.RuntimePermission}{@code ("modifyThread")}
756 */
757 public Thread.UncaughtExceptionHandler
758 setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
759 checkPermission();
760 Thread.UncaughtExceptionHandler old = null;
761 final ReentrantLock lock = this.workerLock;
762 lock.lock();
763 try {
764 old = ueh;
765 ueh = h;
766 ForkJoinWorkerThread[] ws = workers;
767 if (ws != null) {
768 for (int i = 0; i < ws.length; ++i) {
769 ForkJoinWorkerThread w = ws[i];
770 if (w != null)
771 w.setUncaughtExceptionHandler(h);
772 }
773 }
774 } finally {
775 lock.unlock();
776 }
777 return old;
778 }
779
780
781 /**
782 * Sets the target parallelism level of this pool.
783 *
784 * @param parallelism the target parallelism
785 * @throws IllegalArgumentException if parallelism less than or
786 * equal to zero or greater than maximum size bounds
787 * @throws SecurityException if a security manager exists and
788 * the caller is not permitted to modify threads
789 * because it does not hold {@link
790 * java.lang.RuntimePermission}{@code ("modifyThread")}
791 */
792 public void setParallelism(int parallelism) {
793 checkPermission();
794 if (parallelism <= 0 || parallelism > maxPoolSize)
795 throw new IllegalArgumentException();
796 final ReentrantLock lock = this.workerLock;
797 lock.lock();
798 try {
799 if (isProcessingTasks()) {
800 int p = this.parallelism;
801 this.parallelism = parallelism;
802 if (workers != null) {
803 if (parallelism > p)
804 createAndStartAddedWorkers();
805 else
806 trimSpares();
807 }
808 }
809 } finally {
810 lock.unlock();
811 }
812 signalIdleWorkers();
813 }
814
815 /**
816 * Returns the targeted parallelism level of this pool.
817 *
818 * @return the targeted parallelism level of this pool
819 */
820 public int getParallelism() {
821 return parallelism;
822 }
823
824 /**
825 * Returns the number of worker threads that have started but not
826 * yet terminated. This result returned by this method may differ
827 * from {@link #getParallelism} when threads are created to
828 * maintain parallelism when others are cooperatively blocked.
829 *
830 * @return the number of worker threads
831 */
832 public int getPoolSize() {
833 return totalCountOf(workerCounts);
834 }
835
836 /**
837 * Returns the maximum number of threads allowed to exist in the
838 * pool. Unless set using {@link #setMaximumPoolSize}, the
839 * maximum is an implementation-defined value designed only to
840 * prevent runaway growth.
841 *
842 * @return the maximum
843 */
844 public int getMaximumPoolSize() {
845 return maxPoolSize;
846 }
847
848 /**
849 * Sets the maximum number of threads allowed to exist in the
850 * pool. The given value should normally be greater than or equal
851 * to the {@link #getParallelism parallelism} level. Setting this
852 * value has no effect on current pool size. It controls
853 * construction of new threads.
854 *
855 * @throws IllegalArgumentException if negative or greater than
856 * internal implementation limit
857 */
858 public void setMaximumPoolSize(int newMax) {
859 if (newMax < 0 || newMax > MAX_THREADS)
860 throw new IllegalArgumentException();
861 maxPoolSize = newMax;
862 }
863
864
865 /**
866 * Returns {@code true} if this pool dynamically maintains its
867 * target parallelism level. If false, new threads are added only
868 * to avoid possible starvation. This setting is by default true.
869 *
870 * @return {@code true} if maintains parallelism
871 */
872 public boolean getMaintainsParallelism() {
873 return maintainsParallelism;
874 }
875
876 /**
877 * Sets whether this pool dynamically maintains its target
878 * parallelism level. If false, new threads are added only to
879 * avoid possible starvation.
880 *
881 * @param enable {@code true} to maintain parallelism
882 */
883 public void setMaintainsParallelism(boolean enable) {
884 maintainsParallelism = enable;
885 }
886
887 /**
888 * Establishes local first-in-first-out scheduling mode for forked
889 * tasks that are never joined. This mode may be more appropriate
890 * than default locally stack-based mode in applications in which
891 * worker threads only process asynchronous tasks. This method is
892 * designed to be invoked only when the pool is quiescent, and
893 * typically only before any tasks are submitted. The effects of
894 * invocations at other times may be unpredictable.
895 *
896 * @param async if {@code true}, use locally FIFO scheduling
897 * @return the previous mode
898 * @see #getAsyncMode
899 */
900 public boolean setAsyncMode(boolean async) {
901 boolean oldMode = locallyFifo;
902 locallyFifo = async;
903 ForkJoinWorkerThread[] ws = workers;
904 if (ws != null) {
905 for (int i = 0; i < ws.length; ++i) {
906 ForkJoinWorkerThread t = ws[i];
907 if (t != null)
908 t.setAsyncMode(async);
909 }
910 }
911 return oldMode;
912 }
913
914 /**
915 * Returns {@code true} if this pool uses local first-in-first-out
916 * scheduling mode for forked tasks that are never joined.
917 *
918 * @return {@code true} if this pool uses async mode
919 * @see #setAsyncMode
920 */
921 public boolean getAsyncMode() {
922 return locallyFifo;
923 }
924
925 /**
926 * Returns an estimate of the number of worker threads that are
927 * not blocked waiting to join tasks or for other managed
928 * synchronization.
929 *
930 * @return the number of worker threads
931 */
932 public int getRunningThreadCount() {
933 return runningCountOf(workerCounts);
934 }
935
936 /**
937 * Returns an estimate of the number of threads that are currently
938 * stealing or executing tasks. This method may overestimate the
939 * number of active threads.
940 *
941 * @return the number of active threads
942 */
943 public int getActiveThreadCount() {
944 return activeCountOf(runControl);
945 }
946
947 /**
948 * Returns an estimate of the number of threads that are currently
949 * idle waiting for tasks. This method may underestimate the
950 * number of idle threads.
951 *
952 * @return the number of idle threads
953 */
954 final int getIdleThreadCount() {
955 int c = runningCountOf(workerCounts) - activeCountOf(runControl);
956 return (c <= 0) ? 0 : c;
957 }
958
959 /**
960 * Returns {@code true} if all worker threads are currently idle.
961 * An idle worker is one that cannot obtain a task to execute
962 * because none are available to steal from other threads, and
963 * there are no pending submissions to the pool. This method is
964 * conservative; it might not return {@code true} immediately upon
965 * idleness of all threads, but will eventually become true if
966 * threads remain inactive.
967 *
968 * @return {@code true} if all threads are currently idle
969 */
970 public boolean isQuiescent() {
971 return activeCountOf(runControl) == 0;
972 }
973
974 /**
975 * Returns an estimate of the total number of tasks stolen from
976 * one thread's work queue by another. The reported value
977 * underestimates the actual total number of steals when the pool
978 * is not quiescent. This value may be useful for monitoring and
979 * tuning fork/join programs: in general, steal counts should be
980 * high enough to keep threads busy, but low enough to avoid
981 * overhead and contention across threads.
982 *
983 * @return the number of steals
984 */
985 public long getStealCount() {
986 return stealCount.get();
987 }
988
989 /**
990 * Accumulates steal count from a worker.
991 * Call only when worker known to be idle.
992 */
993 private void updateStealCount(ForkJoinWorkerThread w) {
994 int sc = w.getAndClearStealCount();
995 if (sc != 0)
996 stealCount.addAndGet(sc);
997 }
998
999 /**
1000 * Returns an estimate of the total number of tasks currently held
1001 * in queues by worker threads (but not including tasks submitted
1002 * to the pool that have not begun executing). This value is only
1003 * an approximation, obtained by iterating across all threads in
1004 * the pool. This method may be useful for tuning task
1005 * granularities.
1006 *
1007 * @return the number of queued tasks
1008 */
1009 public long getQueuedTaskCount() {
1010 long count = 0;
1011 ForkJoinWorkerThread[] ws = workers;
1012 if (ws != null) {
1013 for (int i = 0; i < ws.length; ++i) {
1014 ForkJoinWorkerThread t = ws[i];
1015 if (t != null)
1016 count += t.getQueueSize();
1017 }
1018 }
1019 return count;
1020 }
1021
1022 /**
1023 * Returns an estimate of the number of tasks submitted to this
1024 * pool that have not yet begun executing. This method takes time
1025 * proportional to the number of submissions.
1026 *
1027 * @return the number of queued submissions
1028 */
1029 public int getQueuedSubmissionCount() {
1030 return submissionQueue.size();
1031 }
1032
1033 /**
1034 * Returns {@code true} if there are any tasks submitted to this
1035 * pool that have not yet begun executing.
1036 *
1037 * @return {@code true} if there are any queued submissions
1038 */
1039 public boolean hasQueuedSubmissions() {
1040 return !submissionQueue.isEmpty();
1041 }
1042
1043 /**
1044 * Removes and returns the next unexecuted submission if one is
1045 * available. This method may be useful in extensions to this
1046 * class that re-assign work in systems with multiple pools.
1047 *
1048 * @return the next submission, or {@code null} if none
1049 */
1050 protected ForkJoinTask<?> pollSubmission() {
1051 return submissionQueue.poll();
1052 }
1053
1054 /**
1055 * Removes all available unexecuted submitted and forked tasks
1056 * from scheduling queues and adds them to the given collection,
1057 * without altering their execution status. These may include
1058 * artificially generated or wrapped tasks. This method is
1059 * designed to be invoked only when the pool is known to be
1060 * quiescent. Invocations at other times may not remove all
1061 * tasks. A failure encountered while attempting to add elements
1062 * to collection {@code c} may result in elements being in
1063 * neither, either or both collections when the associated
1064 * exception is thrown. The behavior of this operation is
1065 * undefined if the specified collection is modified while the
1066 * operation is in progress.
1067 *
1068 * @param c the collection to transfer elements into
1069 * @return the number of elements transferred
1070 */
1071 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1072 int n = submissionQueue.drainTo(c);
1073 ForkJoinWorkerThread[] ws = workers;
1074 if (ws != null) {
1075 for (int i = 0; i < ws.length; ++i) {
1076 ForkJoinWorkerThread w = ws[i];
1077 if (w != null)
1078 n += w.drainTasksTo(c);
1079 }
1080 }
1081 return n;
1082 }
1083
1084 /**
1085 * Returns a string identifying this pool, as well as its state,
1086 * including indications of run state, parallelism level, and
1087 * worker and task counts.
1088 *
1089 * @return a string identifying this pool, as well as its state
1090 */
1091 public String toString() {
1092 int ps = parallelism;
1093 int wc = workerCounts;
1094 int rc = runControl;
1095 long st = getStealCount();
1096 long qt = getQueuedTaskCount();
1097 long qs = getQueuedSubmissionCount();
1098 return super.toString() +
1099 "[" + runStateToString(runStateOf(rc)) +
1100 ", parallelism = " + ps +
1101 ", size = " + totalCountOf(wc) +
1102 ", active = " + activeCountOf(rc) +
1103 ", running = " + runningCountOf(wc) +
1104 ", steals = " + st +
1105 ", tasks = " + qt +
1106 ", submissions = " + qs +
1107 "]";
1108 }
1109
1110 private static String runStateToString(int rs) {
1111 switch (rs) {
1112 case RUNNING: return "Running";
1113 case SHUTDOWN: return "Shutting down";
1114 case TERMINATING: return "Terminating";
1115 case TERMINATED: return "Terminated";
1116 default: throw new Error("Unknown run state");
1117 }
1118 }
1119
1120 // lifecycle control
1121
1122 /**
1123 * Initiates an orderly shutdown in which previously submitted
1124 * tasks are executed, but no new tasks will be accepted.
1125 * Invocation has no additional effect if already shut down.
1126 * Tasks that are in the process of being submitted concurrently
1127 * during the course of this method may or may not be rejected.
1128 *
1129 * @throws SecurityException if a security manager exists and
1130 * the caller is not permitted to modify threads
1131 * because it does not hold {@link
1132 * java.lang.RuntimePermission}{@code ("modifyThread")}
1133 */
1134 public void shutdown() {
1135 checkPermission();
1136 transitionRunStateTo(SHUTDOWN);
1137 if (canTerminateOnShutdown(runControl)) {
1138 if (workers == null) { // shutting down before workers created
1139 final ReentrantLock lock = this.workerLock;
1140 lock.lock();
1141 try {
1142 if (workers == null) {
1143 terminate();
1144 transitionRunStateTo(TERMINATED);
1145 termination.signalAll();
1146 }
1147 } finally {
1148 lock.unlock();
1149 }
1150 }
1151 terminateOnShutdown();
1152 }
1153 }
1154
1155 /**
1156 * Attempts to cancel and/or stop all tasks, and reject all
1157 * subsequently submitted tasks. Tasks that are in the process of
1158 * being submitted or executed concurrently during the course of
1159 * this method may or may not be rejected. This method cancels
1160 * both existing and unexecuted tasks, in order to permit
1161 * termination in the presence of task dependencies. So the method
1162 * always returns an empty list (unlike the case for some other
1163 * Executors).
1164 *
1165 * @return an empty list
1166 * @throws SecurityException if a security manager exists and
1167 * the caller is not permitted to modify threads
1168 * because it does not hold {@link
1169 * java.lang.RuntimePermission}{@code ("modifyThread")}
1170 */
1171 public List<Runnable> shutdownNow() {
1172 checkPermission();
1173 terminate();
1174 return Collections.emptyList();
1175 }
1176
1177 /**
1178 * Returns {@code true} if all tasks have completed following shut down.
1179 *
1180 * @return {@code true} if all tasks have completed following shut down
1181 */
1182 public boolean isTerminated() {
1183 return runStateOf(runControl) == TERMINATED;
1184 }
1185
1186 /**
1187 * Returns {@code true} if the process of termination has
1188 * commenced but not yet completed. This method may be useful for
1189 * debugging. A return of {@code true} reported a sufficient
1190 * period after shutdown may indicate that submitted tasks have
1191 * ignored or suppressed interruption, causing this executor not
1192 * to properly terminate.
1193 *
1194 * @return {@code true} if terminating but not yet terminated
1195 */
1196 public boolean isTerminating() {
1197 return runStateOf(runControl) == TERMINATING;
1198 }
1199
1200 /**
1201 * Returns {@code true} if this pool has been shut down.
1202 *
1203 * @return {@code true} if this pool has been shut down
1204 */
1205 public boolean isShutdown() {
1206 return runStateOf(runControl) >= SHUTDOWN;
1207 }
1208
1209 /**
1210 * Returns true if pool is not terminating or terminated.
1211 * Used internally to suppress execution when terminating.
1212 */
1213 final boolean isProcessingTasks() {
1214 return runStateOf(runControl) < TERMINATING;
1215 }
1216
1217 /**
1218 * Blocks until all tasks have completed execution after a shutdown
1219 * request, or the timeout occurs, or the current thread is
1220 * interrupted, whichever happens first.
1221 *
1222 * @param timeout the maximum time to wait
1223 * @param unit the time unit of the timeout argument
1224 * @return {@code true} if this executor terminated and
1225 * {@code false} if the timeout elapsed before termination
1226 * @throws InterruptedException if interrupted while waiting
1227 */
1228 public boolean awaitTermination(long timeout, TimeUnit unit)
1229 throws InterruptedException {
1230 long nanos = unit.toNanos(timeout);
1231 final ReentrantLock lock = this.workerLock;
1232 lock.lock();
1233 try {
1234 for (;;) {
1235 if (isTerminated())
1236 return true;
1237 if (nanos <= 0)
1238 return false;
1239 nanos = termination.awaitNanos(nanos);
1240 }
1241 } finally {
1242 lock.unlock();
1243 }
1244 }
1245
1246 // Shutdown and termination support
1247
1248 /**
1249 * Callback from terminating worker. Nulls out the corresponding
1250 * workers slot, and if terminating, tries to terminate; else
1251 * tries to shrink workers array.
1252 *
1253 * @param w the worker
1254 */
1255 final void workerTerminated(ForkJoinWorkerThread w) {
1256 updateStealCount(w);
1257 updateWorkerCount(-1);
1258 final ReentrantLock lock = this.workerLock;
1259 lock.lock();
1260 try {
1261 ForkJoinWorkerThread[] ws = workers;
1262 if (ws != null) {
1263 int idx = w.poolIndex;
1264 if (idx >= 0 && idx < ws.length && ws[idx] == w)
1265 ws[idx] = null;
1266 if (totalCountOf(workerCounts) == 0) {
1267 terminate(); // no-op if already terminating
1268 transitionRunStateTo(TERMINATED);
1269 termination.signalAll();
1270 }
1271 else if (isProcessingTasks()) {
1272 tryShrinkWorkerArray();
1273 tryResumeSpare(true); // allow replacement
1274 }
1275 }
1276 } finally {
1277 lock.unlock();
1278 }
1279 signalIdleWorkers();
1280 }
1281
1282 /**
1283 * Initiates termination.
1284 */
1285 private void terminate() {
1286 if (transitionRunStateTo(TERMINATING)) {
1287 stopAllWorkers();
1288 resumeAllSpares();
1289 signalIdleWorkers();
1290 cancelQueuedSubmissions();
1291 cancelQueuedWorkerTasks();
1292 interruptUnterminatedWorkers();
1293 signalIdleWorkers(); // resignal after interrupt
1294 }
1295 }
1296
1297 /**
1298 * Possibly terminates when on shutdown state.
1299 */
1300 private void terminateOnShutdown() {
1301 if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl))
1302 terminate();
1303 }
1304
1305 /**
1306 * Clears out and cancels submissions.
1307 */
1308 private void cancelQueuedSubmissions() {
1309 ForkJoinTask<?> task;
1310 while ((task = pollSubmission()) != null)
1311 task.cancel(false);
1312 }
1313
1314 /**
1315 * Cleans out worker queues.
1316 */
1317 private void cancelQueuedWorkerTasks() {
1318 final ReentrantLock lock = this.workerLock;
1319 lock.lock();
1320 try {
1321 ForkJoinWorkerThread[] ws = workers;
1322 if (ws != null) {
1323 for (int i = 0; i < ws.length; ++i) {
1324 ForkJoinWorkerThread t = ws[i];
1325 if (t != null)
1326 t.cancelTasks();
1327 }
1328 }
1329 } finally {
1330 lock.unlock();
1331 }
1332 }
1333
1334 /**
1335 * Sets each worker's status to terminating. Requires lock to avoid
1336 * conflicts with add/remove.
1337 */
1338 private void stopAllWorkers() {
1339 final ReentrantLock lock = this.workerLock;
1340 lock.lock();
1341 try {
1342 ForkJoinWorkerThread[] ws = workers;
1343 if (ws != null) {
1344 for (int i = 0; i < ws.length; ++i) {
1345 ForkJoinWorkerThread t = ws[i];
1346 if (t != null)
1347 t.shutdownNow();
1348 }
1349 }
1350 } finally {
1351 lock.unlock();
1352 }
1353 }
1354
1355 /**
1356 * Interrupts all unterminated workers. This is not required for
1357 * sake of internal control, but may help unstick user code during
1358 * shutdown.
1359 */
1360 private void interruptUnterminatedWorkers() {
1361 final ReentrantLock lock = this.workerLock;
1362 lock.lock();
1363 try {
1364 ForkJoinWorkerThread[] ws = workers;
1365 if (ws != null) {
1366 for (int i = 0; i < ws.length; ++i) {
1367 ForkJoinWorkerThread t = ws[i];
1368 if (t != null && !t.isTerminated()) {
1369 try {
1370 t.interrupt();
1371 } catch (SecurityException ignore) {
1372 }
1373 }
1374 }
1375 }
1376 } finally {
1377 lock.unlock();
1378 }
1379 }
1380
1381 /*
1382 * Nodes for event barrier to manage idle threads. Queue nodes
1383 * are basic Treiber stack nodes, also used for spare stack.
1384 *
1385 * The event barrier has an event count and a wait queue (actually
1386 * a Treiber stack). Workers are enabled to look for work when
1387 * the eventCount is incremented. If they fail to find work, they
1388 * may wait for next count. Upon release, threads help others wake
1389 * up.
1390 *
1391 * Synchronization events occur only in enough contexts to
1392 * maintain overall liveness:
1393 *
1394 * - Submission of a new task to the pool
1395 * - Resizes or other changes to the workers array
1396 * - pool termination
1397 * - A worker pushing a task on an empty queue
1398 *
1399 * The case of pushing a task occurs often enough, and is heavy
1400 * enough compared to simple stack pushes, to require special
1401 * handling: Method signalWork returns without advancing count if
1402 * the queue appears to be empty. This would ordinarily result in
1403 * races causing some queued waiters not to be woken up. To avoid
1404 * this, the first worker enqueued in method sync rescans for
1405 * tasks after being enqueued, and helps signal if any are
1406 * found. This works well because the worker has nothing better to
1407 * do, and so might as well help alleviate the overhead and
1408 * contention on the threads actually doing work. Also, since
1409 * event counts increments on task availability exist to maintain
1410 * liveness (rather than to force refreshes etc), it is OK for
1411 * callers to exit early if contending with another signaller.
1412 */
1413 static final class WaitQueueNode {
1414 WaitQueueNode next; // only written before enqueued
1415 volatile ForkJoinWorkerThread thread; // nulled to cancel wait
1416 final long count; // unused for spare stack
1417
1418 WaitQueueNode(long c, ForkJoinWorkerThread w) {
1419 count = c;
1420 thread = w;
1421 }
1422
1423 /**
1424 * Wakes up waiter, also clearing thread field
1425 */
1426 void signal() {
1427 ForkJoinWorkerThread t = thread;
1428 if (t != null) {
1429 thread = null;
1430 LockSupport.unpark(t);
1431 }
1432 }
1433 }
1434
1435 /**
1436 * Ensures that no thread is waiting for count to advance from the
1437 * current value of eventCount read on entry to this method, by
1438 * releasing waiting threads if necessary.
1439 */
1440 final void ensureSync() {
1441 long c = eventCount;
1442 WaitQueueNode q;
1443 while ((q = syncStack) != null && q.count < c) {
1444 if (casBarrierStack(q, null)) {
1445 do {
1446 q.signal();
1447 } while ((q = q.next) != null);
1448 break;
1449 }
1450 }
1451 }
1452
1453 /**
1454 * Increments event count and releases waiting threads.
1455 */
1456 private void signalIdleWorkers() {
1457 long c;
1458 do {} while (!casEventCount(c = eventCount, c+1));
1459 ensureSync();
1460 }
1461
1462 /**
1463 * Signals threads waiting to poll a task. Because method sync
1464 * rechecks availability, it is OK to only proceed if queue
1465 * appears to be non-empty, and OK if CAS to increment count
1466 * fails (since some other thread succeeded).
1467 */
1468 final void signalWork() {
1469 if (syncStack != null) {
1470 long c = eventCount;
1471 casEventCount(c, c+1);
1472 WaitQueueNode q = syncStack;
1473 if (q != null && q.count <= c) {
1474 if (casBarrierStack(q, q.next))
1475 q.signal();
1476 else
1477 ensureSync(); // awaken all on contention
1478 }
1479 }
1480 }
1481
1482 /**
1483 * Possibly blocks until event count advances from last value held
1484 * by caller, or if excess threads, caller is resumed as spare, or
1485 * caller or pool is terminating. Updates caller's event on exit.
1486 *
1487 * @param w the calling worker thread
1488 */
1489 final void sync(ForkJoinWorkerThread w) {
1490 updateStealCount(w); // Transfer w's count while it is idle
1491
1492 if (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1493 long prev = w.lastEventCount;
1494 WaitQueueNode node = null;
1495 WaitQueueNode h;
1496 long c;
1497 while ((c = eventCount) == prev &&
1498 ((h = syncStack) == null || h.count == prev)) {
1499 if (node == null)
1500 node = new WaitQueueNode(prev, w);
1501 if (casBarrierStack(node.next = h, node)) {
1502 if (!Thread.interrupted() &&
1503 node.thread != null &&
1504 eventCount == prev &&
1505 (h != null || // cover signalWork race
1506 (!ForkJoinWorkerThread.hasQueuedTasks(workers) &&
1507 eventCount == prev)))
1508 LockSupport.park(this);
1509 c = eventCount;
1510 if (node.thread != null) { // help signal if not unparked
1511 node.thread = null;
1512 if (c == prev)
1513 casEventCount(prev, prev + 1);
1514 }
1515 break;
1516 }
1517 }
1518 w.lastEventCount = c;
1519 ensureSync();
1520 }
1521 }
1522
1523 /**
1524 * Returns {@code true} if a new sync event occurred since last
1525 * call to sync or this method, if so, updating caller's count.
1526 */
1527 final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1528 long wc = w.lastEventCount;
1529 long c = eventCount;
1530 if (wc != c)
1531 w.lastEventCount = c;
1532 ensureSync();
1533 return wc != c || wc != eventCount;
1534 }
1535
1536 // Parallelism maintenance
1537
1538 /**
1539 * Decrements running count; if too low, adds spare.
1540 *
1541 * Conceptually, all we need to do here is add or resume a
1542 * spare thread when one is about to block (and remove or
1543 * suspend it later when unblocked -- see suspendIfSpare).
1544 * However, implementing this idea requires coping with
1545 * several problems: we have imperfect information about the
1546 * states of threads. Some count updates can and usually do
1547 * lag run state changes, despite arrangements to keep them
1548 * accurate (for example, when possible, updating counts
1549 * before signalling or resuming), especially when running on
1550 * dynamic JVMs that don't optimize the infrequent paths that
1551 * update counts. Generating too many threads can make these
1552 * problems become worse, because excess threads are more
1553 * likely to be context-switched with others, slowing them all
1554 * down, especially if there is no work available, so all are
1555 * busy scanning or idling. Also, excess spare threads can
1556 * only be suspended or removed when they are idle, not
1557 * immediately when they aren't needed. So adding threads will
1558 * raise parallelism level for longer than necessary. Also,
1559 * FJ applications often encounter highly transient peaks when
1560 * many threads are blocked joining, but for less time than it
1561 * takes to create or resume spares.
1562 *
1563 * @param joinMe if non-null, return early if done
1564 * @param maintainParallelism if true, try to stay within
1565 * target counts, else create only to avoid starvation
1566 * @return true if joinMe known to be done
1567 */
1568 final boolean preJoin(ForkJoinTask<?> joinMe,
1569 boolean maintainParallelism) {
1570 maintainParallelism &= maintainsParallelism; // overrride
1571 boolean dec = false; // true when running count decremented
1572 while (spareStack == null || !tryResumeSpare(dec)) {
1573 int counts = workerCounts;
1574 if (dec || (dec = casWorkerCounts(counts, --counts))) {
1575 if (!needSpare(counts, maintainParallelism))
1576 break;
1577 if (joinMe.status < 0)
1578 return true;
1579 if (tryAddSpare(counts))
1580 break;
1581 }
1582 }
1583 return false;
1584 }
1585
1586 /**
1587 * Same idea as preJoin
1588 */
1589 final boolean preBlock(ManagedBlocker blocker,
1590 boolean maintainParallelism) {
1591 maintainParallelism &= maintainsParallelism;
1592 boolean dec = false;
1593 while (spareStack == null || !tryResumeSpare(dec)) {
1594 int counts = workerCounts;
1595 if (dec || (dec = casWorkerCounts(counts, --counts))) {
1596 if (!needSpare(counts, maintainParallelism))
1597 break;
1598 if (blocker.isReleasable())
1599 return true;
1600 if (tryAddSpare(counts))
1601 break;
1602 }
1603 }
1604 return false;
1605 }
1606
1607 /**
1608 * Returns {@code true} if a spare thread appears to be needed.
1609 * If maintaining parallelism, returns true when the deficit in
1610 * running threads is more than the surplus of total threads, and
1611 * there is apparently some work to do. This self-limiting rule
1612 * means that the more threads that have already been added, the
1613 * less parallelism we will tolerate before adding another.
1614 *
1615 * @param counts current worker counts
1616 * @param maintainParallelism try to maintain parallelism
1617 */
1618 private boolean needSpare(int counts, boolean maintainParallelism) {
1619 int ps = parallelism;
1620 int rc = runningCountOf(counts);
1621 int tc = totalCountOf(counts);
1622 int runningDeficit = ps - rc;
1623 int totalSurplus = tc - ps;
1624 return (tc < maxPoolSize &&
1625 (rc == 0 || totalSurplus < 0 ||
1626 (maintainParallelism &&
1627 runningDeficit > totalSurplus &&
1628 ForkJoinWorkerThread.hasQueuedTasks(workers))));
1629 }
1630
1631 /**
1632 * Adds a spare worker if lock available and no more than the
1633 * expected numbers of threads exist.
1634 *
1635 * @return true if successful
1636 */
1637 private boolean tryAddSpare(int expectedCounts) {
1638 final ReentrantLock lock = this.workerLock;
1639 int expectedRunning = runningCountOf(expectedCounts);
1640 int expectedTotal = totalCountOf(expectedCounts);
1641 boolean success = false;
1642 boolean locked = false;
1643 // confirm counts while locking; CAS after obtaining lock
1644 try {
1645 for (;;) {
1646 int s = workerCounts;
1647 int tc = totalCountOf(s);
1648 int rc = runningCountOf(s);
1649 if (rc > expectedRunning || tc > expectedTotal)
1650 break;
1651 if (!locked && !(locked = lock.tryLock()))
1652 break;
1653 if (casWorkerCounts(s, workerCountsFor(tc+1, rc+1))) {
1654 createAndStartSpare(tc);
1655 success = true;
1656 break;
1657 }
1658 }
1659 } finally {
1660 if (locked)
1661 lock.unlock();
1662 }
1663 return success;
1664 }
1665
1666 /**
1667 * Adds the kth spare worker. On entry, pool counts are already
1668 * adjusted to reflect addition.
1669 */
1670 private void createAndStartSpare(int k) {
1671 ForkJoinWorkerThread w = null;
1672 ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(k + 1);
1673 int len = ws.length;
1674 // Probably, we can place at slot k. If not, find empty slot
1675 if (k < len && ws[k] != null) {
1676 for (k = 0; k < len && ws[k] != null; ++k)
1677 ;
1678 }
1679 if (k < len && isProcessingTasks() && (w = createWorker(k)) != null) {
1680 ws[k] = w;
1681 w.start();
1682 }
1683 else
1684 updateWorkerCount(-1); // adjust on failure
1685 signalIdleWorkers();
1686 }
1687
1688 /**
1689 * Suspends calling thread w if there are excess threads. Called
1690 * only from sync. Spares are enqueued in a Treiber stack using
1691 * the same WaitQueueNodes as barriers. They are resumed mainly
1692 * in preJoin, but are also woken on pool events that require all
1693 * threads to check run state.
1694 *
1695 * @param w the caller
1696 */
1697 private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1698 WaitQueueNode node = null;
1699 for (;;) {
1700 int s = workerCounts;
1701 int rc = runningCountOf(s);
1702 int tc = totalCountOf(s);
1703 int ps = parallelism;
1704 // use tc as bound if rc transiently out of sync
1705 if (tc <= ps || rc <= ps)
1706 return false; // not a spare
1707 if (node == null)
1708 node = new WaitQueueNode(0, w);
1709 if (casWorkerCounts(s, workerCountsFor(tc, rc - 1)))
1710 break;
1711 }
1712 // push onto stack
1713 do {} while (!casSpareStack(node.next = spareStack, node));
1714 // block until released by resumeSpare
1715 while (!Thread.interrupted() && node.thread != null)
1716 LockSupport.park(this);
1717 return true;
1718 }
1719
1720 /**
1721 * Tries to pop and resume a spare thread.
1722 *
1723 * @param updateCount if true, increment running count on success
1724 * @return true if successful
1725 */
1726 private boolean tryResumeSpare(boolean updateCount) {
1727 WaitQueueNode q;
1728 while ((q = spareStack) != null) {
1729 if (casSpareStack(q, q.next)) {
1730 if (updateCount)
1731 updateRunningCount(1);
1732 q.signal();
1733 return true;
1734 }
1735 }
1736 return false;
1737 }
1738
1739 /**
1740 * Pops and resumes all spare threads. Same idea as ensureSync.
1741 *
1742 * @return true if any spares released
1743 */
1744 private boolean resumeAllSpares() {
1745 WaitQueueNode q;
1746 while ( (q = spareStack) != null) {
1747 if (casSpareStack(q, null)) {
1748 do {
1749 updateRunningCount(1);
1750 q.signal();
1751 } while ((q = q.next) != null);
1752 return true;
1753 }
1754 }
1755 return false;
1756 }
1757
1758 /**
1759 * Pops and shuts down excessive spare threads. Call only while
1760 * holding lock. This is not guaranteed to eliminate all excess
1761 * threads, only those suspended as spares, which are the ones
1762 * unlikely to be needed in the future.
1763 */
1764 private void trimSpares() {
1765 int surplus = totalCountOf(workerCounts) - parallelism;
1766 WaitQueueNode q;
1767 while (surplus > 0 && (q = spareStack) != null) {
1768 if (casSpareStack(q, null)) {
1769 do {
1770 updateRunningCount(1);
1771 ForkJoinWorkerThread w = q.thread;
1772 if (w != null && surplus > 0 &&
1773 runningCountOf(workerCounts) > 0 && w.shutdown())
1774 --surplus;
1775 q.signal();
1776 } while ((q = q.next) != null);
1777 }
1778 }
1779 }
1780
1781 /**
1782 * Interface for extending managed parallelism for tasks running
1783 * in {@link ForkJoinPool}s.
1784 *
1785 * <p>A {@code ManagedBlocker} provides two methods.
1786 * Method {@code isReleasable} must return {@code true} if
1787 * blocking is not necessary. Method {@code block} blocks the
1788 * current thread if necessary (perhaps internally invoking
1789 * {@code isReleasable} before actually blocking).
1790 *
1791 * <p>For example, here is a ManagedBlocker based on a
1792 * ReentrantLock:
1793 * <pre> {@code
1794 * class ManagedLocker implements ManagedBlocker {
1795 * final ReentrantLock lock;
1796 * boolean hasLock = false;
1797 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
1798 * public boolean block() {
1799 * if (!hasLock)
1800 * lock.lock();
1801 * return true;
1802 * }
1803 * public boolean isReleasable() {
1804 * return hasLock || (hasLock = lock.tryLock());
1805 * }
1806 * }}</pre>
1807 */
1808 public static interface ManagedBlocker {
1809 /**
1810 * Possibly blocks the current thread, for example waiting for
1811 * a lock or condition.
1812 *
1813 * @return {@code true} if no additional blocking is necessary
1814 * (i.e., if isReleasable would return true)
1815 * @throws InterruptedException if interrupted while waiting
1816 * (the method is not required to do so, but is allowed to)
1817 */
1818 boolean block() throws InterruptedException;
1819
1820 /**
1821 * Returns {@code true} if blocking is unnecessary.
1822 */
1823 boolean isReleasable();
1824 }
1825
1826 /**
1827 * Blocks in accord with the given blocker. If the current thread
1828 * is a {@link ForkJoinWorkerThread}, this method possibly
1829 * arranges for a spare thread to be activated if necessary to
1830 * ensure parallelism while the current thread is blocked.
1831 *
1832 * <p>If {@code maintainParallelism} is {@code true} and the pool
1833 * supports it ({@link #getMaintainsParallelism}), this method
1834 * attempts to maintain the pool's nominal parallelism. Otherwise
1835 * it activates a thread only if necessary to avoid complete
1836 * starvation. This option may be preferable when blockages use
1837 * timeouts, or are almost always brief.
1838 *
1839 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1840 * behaviorally equivalent to
1841 * <pre> {@code
1842 * while (!blocker.isReleasable())
1843 * if (blocker.block())
1844 * return;
1845 * }</pre>
1846 *
1847 * If the caller is a {@code ForkJoinTask}, then the pool may
1848 * first be expanded to ensure parallelism, and later adjusted.
1849 *
1850 * @param blocker the blocker
1851 * @param maintainParallelism if {@code true} and supported by
1852 * this pool, attempt to maintain the pool's nominal parallelism;
1853 * otherwise activate a thread only if necessary to avoid
1854 * complete starvation.
1855 * @throws InterruptedException if blocker.block did so
1856 */
1857 public static void managedBlock(ManagedBlocker blocker,
1858 boolean maintainParallelism)
1859 throws InterruptedException {
1860 Thread t = Thread.currentThread();
1861 ForkJoinPool pool = ((t instanceof ForkJoinWorkerThread) ?
1862 ((ForkJoinWorkerThread) t).pool : null);
1863 if (!blocker.isReleasable()) {
1864 try {
1865 if (pool == null ||
1866 !pool.preBlock(blocker, maintainParallelism))
1867 awaitBlocker(blocker);
1868 } finally {
1869 if (pool != null)
1870 pool.updateRunningCount(1);
1871 }
1872 }
1873 }
1874
1875 private static void awaitBlocker(ManagedBlocker blocker)
1876 throws InterruptedException {
1877 do {} while (!blocker.isReleasable() && !blocker.block());
1878 }
1879
1880 // AbstractExecutorService overrides. These rely on undocumented
1881 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
1882 // implement RunnableFuture.
1883
1884 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1885 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
1886 }
1887
1888 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1889 return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
1890 }
1891
1892 // Unsafe mechanics
1893
1894 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1895 private static final long eventCountOffset =
1896 objectFieldOffset("eventCount", ForkJoinPool.class);
1897 private static final long workerCountsOffset =
1898 objectFieldOffset("workerCounts", ForkJoinPool.class);
1899 private static final long runControlOffset =
1900 objectFieldOffset("runControl", ForkJoinPool.class);
1901 private static final long syncStackOffset =
1902 objectFieldOffset("syncStack",ForkJoinPool.class);
1903 private static final long spareStackOffset =
1904 objectFieldOffset("spareStack", ForkJoinPool.class);
1905
1906 private boolean casEventCount(long cmp, long val) {
1907 return UNSAFE.compareAndSwapLong(this, eventCountOffset, cmp, val);
1908 }
1909 private boolean casWorkerCounts(int cmp, int val) {
1910 return UNSAFE.compareAndSwapInt(this, workerCountsOffset, cmp, val);
1911 }
1912 private boolean casRunControl(int cmp, int val) {
1913 return UNSAFE.compareAndSwapInt(this, runControlOffset, cmp, val);
1914 }
1915 private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) {
1916 return UNSAFE.compareAndSwapObject(this, spareStackOffset, cmp, val);
1917 }
1918 private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) {
1919 return UNSAFE.compareAndSwapObject(this, syncStackOffset, cmp, val);
1920 }
1921
1922 private static long objectFieldOffset(String field, Class<?> klazz) {
1923 try {
1924 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1925 } catch (NoSuchFieldException e) {
1926 // Convert Exception to corresponding Error
1927 NoSuchFieldError error = new NoSuchFieldError(field);
1928 error.initCause(e);
1929 throw error;
1930 }
1931 }
1932 }