ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.12
Committed: Mon Nov 16 04:57:09 2009 UTC (14 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.11: +1 -1 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.locks.Condition;
15 import java.util.concurrent.locks.LockSupport;
16 import java.util.concurrent.locks.ReentrantLock;
17 import java.util.concurrent.atomic.AtomicInteger;
18 import java.util.concurrent.atomic.AtomicLong;
19
20 /**
21 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
22 * A {@code ForkJoinPool} provides the entry point for submissions
23 * from non-{@code ForkJoinTask}s, as well as management and
24 * monitoring operations.
25 *
26 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
27 * ExecutorService} mainly by virtue of employing
28 * <em>work-stealing</em>: all threads in the pool attempt to find and
29 * execute subtasks created by other active tasks (eventually blocking
30 * waiting for work if none exist). This enables efficient processing
31 * when most tasks spawn other subtasks (as do most {@code
32 * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
33 * execution of some plain {@code Runnable}- or {@code Callable}-
34 * based activities along with {@code ForkJoinTask}s. When setting
35 * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
36 * also be appropriate for use with fine-grained tasks of any form
37 * that are never joined. Otherwise, other {@code ExecutorService}
38 * implementations are typically more appropriate choices.
39 *
40 * <p>A {@code ForkJoinPool} is constructed with a given target
41 * parallelism level; by default, equal to the number of available
42 * processors. Unless configured otherwise via {@link
43 * #setMaintainsParallelism}, the pool attempts to maintain this
44 * number of active (or available) threads by dynamically adding,
45 * suspending, or resuming internal worker threads, even if some tasks
46 * are stalled waiting to join others. However, no such adjustments
47 * are performed in the face of blocked IO or other unmanaged
48 * synchronization. The nested {@link ManagedBlocker} interface
49 * enables extension of the kinds of synchronization accommodated.
50 * The target parallelism level may also be changed dynamically
51 * ({@link #setParallelism}). The total number of threads may be
52 * limited using method {@link #setMaximumPoolSize}, in which case it
53 * may become possible for the activities of a pool to stall due to
54 * the lack of available threads to process new tasks.
55 *
56 * <p>In addition to execution and lifecycle control methods, this
57 * class provides status check methods (for example
58 * {@link #getStealCount}) that are intended to aid in developing,
59 * tuning, and monitoring fork/join applications. Also, method
60 * {@link #toString} returns indications of pool state in a
61 * convenient form for informal monitoring.
62 *
63 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
64 * used for all parallel task execution in a program or subsystem.
65 * Otherwise, use would not usually outweigh the construction and
66 * bookkeeping overhead of creating a large set of threads. For
67 * example, a common pool could be used for the {@code SortTasks}
68 * illustrated in {@link RecursiveAction}. Because {@code
69 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
70 * daemon} mode, there is typically no need to explicitly {@link
71 * #shutdown} such a pool upon program exit.
72 *
73 * <pre>
74 * static final ForkJoinPool mainPool = new ForkJoinPool();
75 * ...
76 * public void sort(long[] array) {
77 * mainPool.invoke(new SortTask(array, 0, array.length));
78 * }
79 * </pre>
80 *
81 * <p><b>Implementation notes</b>: This implementation restricts the
82 * maximum number of running threads to 32767. Attempts to create
83 * pools with greater than the maximum number result in
84 * {@code IllegalArgumentException}.
85 *
86 * <p>This implementation rejects submitted tasks (that is, by throwing
87 * {@link RejectedExecutionException}) only when the pool is shut down.
88 *
89 * @since 1.7
90 * @author Doug Lea
91 */
92 public class ForkJoinPool extends AbstractExecutorService {
93
94 /*
95 * See the extended comments interspersed below for design,
96 * rationale, and walkthroughs.
97 */
98
99 /** Mask selecting the low 16 bits of a packed int; used to unpack the low-word counts. */
100 private static final int shortMask = 0xffff;
101
102 /** Max pool size (0x7FFF == 32767) -- must be a power of two minus 1 */
103 private static final int MAX_THREADS = 0x7FFF;
104
/**
 * Factory for creating new {@link ForkJoinWorkerThread}s.
 * Supply a {@code ForkJoinWorkerThreadFactory} whenever the pool
 * should run {@code ForkJoinWorkerThread} subclasses that extend
 * base functionality or initialize threads with different contexts.
 */
public interface ForkJoinWorkerThreadFactory {
    /**
     * Returns a new worker thread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @return the new worker thread
     * @throws NullPointerException if the pool is null
     */
    ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
120
121 /**
122 * Default ForkJoinWorkerThreadFactory implementation; creates a
123 * new ForkJoinWorkerThread.
124 */
125 static class DefaultForkJoinWorkerThreadFactory
126 implements ForkJoinWorkerThreadFactory {
127 public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
128 try {
129 return new ForkJoinWorkerThread(pool);
130 } catch (OutOfMemoryError oom) {
131 return null;
132 }
133 }
134 }
135
136 /**
137 * Creates a new ForkJoinWorkerThread. This factory is used unless
138 * overridden in ForkJoinPool constructors (the constructors that take
 * no factory argument pass this instance).
139 */
140 public static final ForkJoinWorkerThreadFactory
141 defaultForkJoinWorkerThreadFactory =
142 new DefaultForkJoinWorkerThreadFactory();
143
144 /**
145 * Permission required for callers of methods that may start or
146 * kill threads. Checked by checkPermission() when a security
 * manager is installed.
147 */
148 private static final RuntimePermission modifyThreadPermission =
149 new RuntimePermission("modifyThread");
150
151 /**
152 * If there is a security manager, makes sure caller has
153 * permission to modify threads.
154 */
155 private static void checkPermission() {
156 SecurityManager security = System.getSecurityManager();
157 if (security != null)
158 security.checkPermission(modifyThreadPermission);
159 }
160
161 /**
162 * Generator for assigning sequence numbers as pool names. Each
 * constructed pool takes the next value as its poolNumber.
163 */
164 private static final AtomicInteger poolNumberGenerator =
165 new AtomicInteger();
166
167 /**
168 * Array holding all worker threads in the pool. Initialized upon
169 * first use. Array size must be a power of two. Updates and
170 * replacements are protected by workerLock, but it is always kept
171 * in a consistent enough state to be randomly accessed without
172 * locking by workers performing work-stealing.
173 */
174 volatile ForkJoinWorkerThread[] workers;
175
176 /**
177 * Lock protecting access to workers.
178 */
179 private final ReentrantLock workerLock;
180
181 /**
182 * Condition for awaitTermination; created from workerLock.
183 */
184 private final Condition termination;
185
186 /**
187 * The uncaught exception handler used when any worker
188 * abruptly terminates. The public getter and setter access
 * this field while holding workerLock.
189 */
190 private Thread.UncaughtExceptionHandler ueh;
191
192 /**
193 * Creation factory for worker threads.
194 */
195 private final ForkJoinWorkerThreadFactory factory;
196
197 /**
198 * Head of stack of threads that were created to maintain
199 * parallelism when other threads blocked, but have since
200 * suspended when the parallelism level rose.
201 */
202 private volatile WaitQueueNode spareStack;
203
204 /**
205 * Sum of per-thread steal counts, updated only when threads are
206 * idle or terminating.
207 */
208 private final AtomicLong stealCount;
209
210 /**
211 * Queue for external submissions.
212 */
213 private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
214
215 /**
216 * Head of Treiber stack for barrier sync. See below for explanation.
217 */
218 private volatile WaitQueueNode syncStack;
219
220 /**
221 * The count for the event barrier.
222 */
223 private volatile long eventCount;
224
225 /**
226 * Pool number, just for assigning useful names to worker threads.
227 */
228 private final int poolNumber;
229
230 /**
231 * The maximum allowed pool size.
232 */
233 private volatile int maxPoolSize;
234
235 /**
236 * The desired parallelism level, updated only under workerLock.
237 */
238 private volatile int parallelism;
239
240 /**
241 * True if workers use local FIFO, not the default LIFO, order for local polling.
242 */
243 private volatile boolean locallyFifo;
244
245 /**
246 * Holds number of total (i.e., created and not yet terminated)
247 * and running (i.e., not blocked on joins or other managed sync)
248 * threads, packed into one int to ensure consistent snapshot when
249 * making decisions about creating and suspending spare
250 * threads. Updated only by CAS. Note: CASes in
251 * updateRunningCount and preJoin assume that running active count
252 * is in low word, so need to be modified if this changes.
 * The total count occupies the high 16 bits and the running count
 * the low 16 bits (see totalCountOf and runningCountOf).
253 */
254 private volatile int workerCounts;
255
256 private static int totalCountOf(int s) { return s >>> 16; }
257 private static int runningCountOf(int s) { return s & shortMask; }
258 private static int workerCountsFor(int t, int r) { return (t << 16) + r; }
259
260 /**
261 * Adds delta (which may be negative) to running count. This must
262 * be called before (with negative arg) and after (with positive)
263 * any managed synchronization (i.e., mainly, joins).
264 *
265 * @param delta the number to add
266 */
267 final void updateRunningCount(int delta) {
268 int s;
269 do {} while (!casWorkerCounts(s = workerCounts, s + delta));
270 }
271
272 /**
273 * Adds delta (which may be negative) to both total and running
274 * count. This must be called upon creation and termination of
275 * worker threads.
276 *
277 * @param delta the number to add
278 */
279 private void updateWorkerCount(int delta) {
280 int d = delta + (delta << 16); // add to both lo and hi parts
281 int s;
282 do {} while (!casWorkerCounts(s = workerCounts, s + d));
283 }
284
285 /**
286 * Lifecycle control. High word contains runState, low word
287 * contains the number of workers that are (probably) executing
288 * tasks. This value is atomically incremented before a worker
289 * gets a task to run, and decremented when worker has no tasks
290 * and cannot find any. These two fields are bundled together to
291 * support correct termination triggering. Note: activeCount
292 * CAS'es cheat by assuming active count is in low word, so need
293 * to be modified if this changes.
294 */
295 private volatile int runControl;
296
297 // RunState values. Order among values matters: transitionRunStateTo compares states with >=.
298 private static final int RUNNING = 0;
299 private static final int SHUTDOWN = 1;
300 private static final int TERMINATING = 2;
301 private static final int TERMINATED = 3;
302
303 private static int runStateOf(int c) { return c >>> 16; }
304 private static int activeCountOf(int c) { return c & shortMask; }
305 private static int runControlFor(int r, int a) { return (r << 16) + a; }
306
307 /**
308 * Tries incrementing active count; fails on contention.
309 * Called by workers before/during executing tasks.
310 *
311 * @return true on success
312 */
313 final boolean tryIncrementActiveCount() {
314 int c = runControl;
315 return casRunControl(c, c+1);
316 }
317
318 /**
319 * Tries decrementing active count; fails on contention.
320 * Possibly triggers termination on success.
321 * Called by workers when they can't find tasks.
322 *
323 * @return true on success
324 */
325 final boolean tryDecrementActiveCount() {
326 int c = runControl;
327 int nextc = c - 1;
328 if (!casRunControl(c, nextc))
329 return false;
330 if (canTerminateOnShutdown(nextc))
331 terminateOnShutdown();
332 return true;
333 }
334
335 /**
336 * Returns {@code true} if argument represents zero active count
337 * and nonzero runstate, which is the triggering condition for
338 * terminating on shutdown.
339 */
340 private static boolean canTerminateOnShutdown(int c) {
341 // i.e. least bit is nonzero runState bit
342 return ((c & -c) >>> 16) != 0;
343 }
344
345 /**
346 * Transition run state to at least the given state. Return true
347 * if not already at least given state.
348 */
349 private boolean transitionRunStateTo(int state) {
350 for (;;) {
351 int c = runControl;
352 if (runStateOf(c) >= state)
353 return false;
354 if (casRunControl(c, runControlFor(state, activeCountOf(c))))
355 return true;
356 }
357 }
358
359 /**
360 * Controls whether to add spares to maintain parallelism.
 * Set to true by the constructor; changed via setMaintainsParallelism.
361 */
362 private volatile boolean maintainsParallelism;
363
364 // Constructors
365
/**
 * Creates a {@code ForkJoinPool} whose parallelism equals {@link
 * java.lang.Runtime#availableProcessors} and which uses the
 * {@linkplain #defaultForkJoinWorkerThreadFactory default thread
 * factory}.
 *
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool() {
    this(Runtime.getRuntime().availableProcessors(),
         defaultForkJoinWorkerThreadFactory);
}
380
/**
 * Creates a {@code ForkJoinPool} with the indicated parallelism
 * level, using the {@linkplain
 * #defaultForkJoinWorkerThreadFactory default thread factory}.
 *
 * @param parallelism the parallelism level
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory);
}
397
/**
 * Creates a {@code ForkJoinPool} whose parallelism equals {@link
 * java.lang.Runtime#availableProcessors} and which uses the given
 * thread factory.
 *
 * @param factory the factory for creating new threads
 * @throws NullPointerException if the factory is null
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
    this(Runtime.getRuntime().availableProcessors(), factory);
}
413
414 /**
415 * Creates a {@code ForkJoinPool} with the given parallelism and
416 * thread factory.
417 *
418 * @param parallelism the parallelism level
419 * @param factory the factory for creating new threads
420 * @throws IllegalArgumentException if parallelism less than or
421 * equal to zero, or greater than implementation limit
422 * @throws NullPointerException if the factory is null
423 * @throws SecurityException if a security manager exists and
424 * the caller is not permitted to modify threads
425 * because it does not hold {@link
426 * java.lang.RuntimePermission}{@code ("modifyThread")}
427 */
428 public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
429 if (parallelism <= 0 || parallelism > MAX_THREADS)
430 throw new IllegalArgumentException();
431 if (factory == null)
432 throw new NullPointerException();
433 checkPermission();
434 this.factory = factory;
435 this.parallelism = parallelism;
436 this.maxPoolSize = MAX_THREADS;
437 this.maintainsParallelism = true;
438 this.poolNumber = poolNumberGenerator.incrementAndGet();
439 this.workerLock = new ReentrantLock();
440 this.termination = workerLock.newCondition();
441 this.stealCount = new AtomicLong();
442 this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
443 // worker array and workers are lazily constructed
444 }
445
446 /**
447 * Creates a new worker thread using factory.
448 *
449 * @param index the index to assign worker
450 * @return new worker, or null if factory failed
451 */
452 private ForkJoinWorkerThread createWorker(int index) {
453 Thread.UncaughtExceptionHandler h = ueh;
454 ForkJoinWorkerThread w = factory.newThread(this);
455 if (w != null) {
456 w.poolIndex = index;
457 w.setDaemon(true);
458 w.setAsyncMode(locallyFifo);
459 w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index);
460 if (h != null)
461 w.setUncaughtExceptionHandler(h);
462 }
463 return w;
464 }
465
466 /**
467 * Returns a good size for worker array given pool size.
468 * Currently requires size to be a power of two.
469 */
470 private static int arraySizeFor(int poolSize) {
471 if (poolSize <= 1)
472 return 1;
473 // See Hackers Delight, sec 3.2
474 int c = poolSize >= MAX_THREADS ? MAX_THREADS : (poolSize - 1);
475 c |= c >>> 1;
476 c |= c >>> 2;
477 c |= c >>> 4;
478 c |= c >>> 8;
479 c |= c >>> 16;
480 return c + 1;
481 }
482
483 /**
484 * Creates or resizes array if necessary to hold newLength.
485 * Call only under exclusion.
486 *
487 * @return the array
488 */
489 private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) {
490 ForkJoinWorkerThread[] ws = workers;
491 if (ws == null)
492 return workers = new ForkJoinWorkerThread[arraySizeFor(newLength)];
493 else if (newLength > ws.length)
494 return workers = Arrays.copyOf(ws, arraySizeFor(newLength));
495 else
496 return ws;
497 }
498
499 /**
500 * Tries to shrink workers into smaller array after one or more terminate.
501 */
502 private void tryShrinkWorkerArray() {
503 ForkJoinWorkerThread[] ws = workers;
504 if (ws != null) {
505 int len = ws.length;
506 int last = len - 1;
507 while (last >= 0 && ws[last] == null)
508 --last;
509 int newLength = arraySizeFor(last+1);
510 if (newLength < len)
511 workers = Arrays.copyOf(ws, newLength);
512 }
513 }
514
515 /**
516 * Initializes workers if necessary.
517 */
518 final void ensureWorkerInitialization() {
519 ForkJoinWorkerThread[] ws = workers;
520 if (ws == null) {
521 final ReentrantLock lock = this.workerLock;
522 lock.lock();
523 try {
524 ws = workers;
525 if (ws == null) {
526 int ps = parallelism;
527 ws = ensureWorkerArrayCapacity(ps);
528 for (int i = 0; i < ps; ++i) {
529 ForkJoinWorkerThread w = createWorker(i);
530 if (w != null) {
531 ws[i] = w;
532 w.start();
533 updateWorkerCount(1);
534 }
535 }
536 }
537 } finally {
538 lock.unlock();
539 }
540 }
541 }
542
543 /**
544 * Worker creation and startup for threads added via setParallelism.
545 */
546 private void createAndStartAddedWorkers() {
547 resumeAllSpares(); // Allow spares to convert to nonspare
548 int ps = parallelism;
549 ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps);
550 int len = ws.length;
551 // Sweep through slots, to keep lowest indices most populated
552 int k = 0;
553 while (k < len) {
554 if (ws[k] != null) {
555 ++k;
556 continue;
557 }
558 int s = workerCounts;
559 int tc = totalCountOf(s);
560 int rc = runningCountOf(s);
561 if (rc >= ps || tc >= ps)
562 break;
563 if (casWorkerCounts (s, workerCountsFor(tc+1, rc+1))) {
564 ForkJoinWorkerThread w = createWorker(k);
565 if (w != null) {
566 ws[k++] = w;
567 w.start();
568 }
569 else {
570 updateWorkerCount(-1); // back out on failed creation
571 break;
572 }
573 }
574 }
575 }
576
577 // Execution methods
578
579 /**
580 * Common code for execute, invoke and submit
581 */
582 private <T> void doSubmit(ForkJoinTask<T> task) {
583 if (task == null)
584 throw new NullPointerException();
585 if (isShutdown())
586 throw new RejectedExecutionException();
587 if (workers == null)
588 ensureWorkerInitialization();
589 submissionQueue.offer(task);
590 signalIdleWorkers();
591 }
592
593 /**
594 * Performs the given task, returning its result upon completion.
595 *
596 * @param task the task
597 * @return the task's result
598 * @throws NullPointerException if the task is null
599 * @throws RejectedExecutionException if the task cannot be
600 * scheduled for execution
601 */
602 public <T> T invoke(ForkJoinTask<T> task) {
603 doSubmit(task);
604 return task.join();
605 }
606
607 /**
608 * Arranges for (asynchronous) execution of the given task.
609 *
610 * @param task the task
611 * @throws NullPointerException if the task is null
612 * @throws RejectedExecutionException if the task cannot be
613 * scheduled for execution
614 */
615 public void execute(ForkJoinTask<?> task) {
616 doSubmit(task);
617 }
618
619 // AbstractExecutorService methods
620
621 /**
622 * @throws NullPointerException if the task is null
623 * @throws RejectedExecutionException if the task cannot be
624 * scheduled for execution
625 */
626 public void execute(Runnable task) {
627 ForkJoinTask<?> job;
628 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
629 job = (ForkJoinTask<?>) task;
630 else
631 job = ForkJoinTask.adapt(task, null);
632 doSubmit(job);
633 }
634
635 /**
636 * @throws NullPointerException if the task is null
637 * @throws RejectedExecutionException if the task cannot be
638 * scheduled for execution
639 */
640 public <T> ForkJoinTask<T> submit(Callable<T> task) {
641 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
642 doSubmit(job);
643 return job;
644 }
645
646 /**
647 * @throws NullPointerException if the task is null
648 * @throws RejectedExecutionException if the task cannot be
649 * scheduled for execution
650 */
651 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
652 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
653 doSubmit(job);
654 return job;
655 }
656
657 /**
658 * @throws NullPointerException if the task is null
659 * @throws RejectedExecutionException if the task cannot be
660 * scheduled for execution
661 */
662 public ForkJoinTask<?> submit(Runnable task) {
663 ForkJoinTask<?> job;
664 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
665 job = (ForkJoinTask<?>) task;
666 else
667 job = ForkJoinTask.adapt(task, null);
668 doSubmit(job);
669 return job;
670 }
671
672 /**
673 * Submits a ForkJoinTask for execution.
674 *
675 * @param task the task to submit
676 * @return the task
677 * @throws NullPointerException if the task is null
678 * @throws RejectedExecutionException if the task cannot be
679 * scheduled for execution
680 */
681 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
682 doSubmit(task);
683 return task;
684 }
685
686
687 /**
688 * @throws NullPointerException {@inheritDoc}
689 * @throws RejectedExecutionException {@inheritDoc}
690 */
691 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
692 ArrayList<ForkJoinTask<T>> forkJoinTasks =
693 new ArrayList<ForkJoinTask<T>>(tasks.size());
694 for (Callable<T> task : tasks)
695 forkJoinTasks.add(ForkJoinTask.adapt(task));
696 invoke(new InvokeAll<T>(forkJoinTasks));
697
698 @SuppressWarnings({"unchecked", "rawtypes"})
699 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
700 return futures;
701 }
702
703 static final class InvokeAll<T> extends RecursiveAction {
704 final ArrayList<ForkJoinTask<T>> tasks;
705 InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
706 public void compute() {
707 try { invokeAll(tasks); }
708 catch (Exception ignore) {}
709 }
710 private static final long serialVersionUID = -7914297376763021607L;
711 }
712
713 // Configuration and status settings and queries
714
715 /**
716 * Returns the factory used for constructing new workers.
717 *
718 * @return the factory used for constructing new workers
719 */
720 public ForkJoinWorkerThreadFactory getFactory() {
721 return factory;
722 }
723
724 /**
725 * Returns the handler for internal worker threads that terminate
726 * due to unrecoverable errors encountered while executing tasks.
727 *
728 * @return the handler, or {@code null} if none
729 */
730 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
731 Thread.UncaughtExceptionHandler h;
732 final ReentrantLock lock = this.workerLock;
733 lock.lock();
734 try {
735 h = ueh;
736 } finally {
737 lock.unlock();
738 }
739 return h;
740 }
741
742 /**
743 * Sets the handler for internal worker threads that terminate due
744 * to unrecoverable errors encountered while executing tasks.
745 * Unless set, the current default or ThreadGroup handler is used
746 * as handler.
747 *
748 * @param h the new handler
749 * @return the old handler, or {@code null} if none
750 * @throws SecurityException if a security manager exists and
751 * the caller is not permitted to modify threads
752 * because it does not hold {@link
753 * java.lang.RuntimePermission}{@code ("modifyThread")}
754 */
755 public Thread.UncaughtExceptionHandler
756 setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
757 checkPermission();
758 Thread.UncaughtExceptionHandler old = null;
759 final ReentrantLock lock = this.workerLock;
760 lock.lock();
761 try {
762 old = ueh;
763 ueh = h;
764 ForkJoinWorkerThread[] ws = workers;
765 if (ws != null) {
766 for (int i = 0; i < ws.length; ++i) {
767 ForkJoinWorkerThread w = ws[i];
768 if (w != null)
769 w.setUncaughtExceptionHandler(h);
770 }
771 }
772 } finally {
773 lock.unlock();
774 }
775 return old;
776 }
777
778
779 /**
780 * Sets the target parallelism level of this pool.
781 *
782 * @param parallelism the target parallelism
783 * @throws IllegalArgumentException if parallelism less than or
784 * equal to zero or greater than maximum size bounds
785 * @throws SecurityException if a security manager exists and
786 * the caller is not permitted to modify threads
787 * because it does not hold {@link
788 * java.lang.RuntimePermission}{@code ("modifyThread")}
789 */
790 public void setParallelism(int parallelism) {
791 checkPermission();
792 if (parallelism <= 0 || parallelism > maxPoolSize)
793 throw new IllegalArgumentException();
794 final ReentrantLock lock = this.workerLock;
795 lock.lock();
796 try {
797 if (isProcessingTasks()) {
798 int p = this.parallelism;
799 this.parallelism = parallelism;
800 if (parallelism > p)
801 createAndStartAddedWorkers();
802 else
803 trimSpares();
804 }
805 } finally {
806 lock.unlock();
807 }
808 signalIdleWorkers();
809 }
810
811 /**
812 * Returns the targeted parallelism level of this pool.
813 *
814 * @return the targeted parallelism level of this pool
815 */
816 public int getParallelism() {
817 return parallelism;
818 }
819
820 /**
821 * Returns the number of worker threads that have started but not
822 * yet terminated. This result returned by this method may differ
823 * from {@link #getParallelism} when threads are created to
824 * maintain parallelism when others are cooperatively blocked.
825 *
826 * @return the number of worker threads
827 */
828 public int getPoolSize() {
829 return totalCountOf(workerCounts);
830 }
831
832 /**
833 * Returns the maximum number of threads allowed to exist in the
834 * pool. Unless set using {@link #setMaximumPoolSize}, the
835 * maximum is an implementation-defined value designed only to
836 * prevent runaway growth.
837 *
838 * @return the maximum
839 */
840 public int getMaximumPoolSize() {
841 return maxPoolSize;
842 }
843
844 /**
845 * Sets the maximum number of threads allowed to exist in the
846 * pool. The given value should normally be greater than or equal
847 * to the {@link #getParallelism parallelism} level. Setting this
848 * value has no effect on current pool size. It controls
849 * construction of new threads.
850 *
851 * @throws IllegalArgumentException if negative or greater than
852 * internal implementation limit
853 */
854 public void setMaximumPoolSize(int newMax) {
855 if (newMax < 0 || newMax > MAX_THREADS)
856 throw new IllegalArgumentException();
857 maxPoolSize = newMax;
858 }
859
860
861 /**
862 * Returns {@code true} if this pool dynamically maintains its
863 * target parallelism level. If false, new threads are added only
864 * to avoid possible starvation. This setting is by default true.
865 *
866 * @return {@code true} if maintains parallelism
867 */
868 public boolean getMaintainsParallelism() {
869 return maintainsParallelism;
870 }
871
872 /**
873 * Sets whether this pool dynamically maintains its target
874 * parallelism level. If false, new threads are added only to
875 * avoid possible starvation.
876 *
877 * @param enable {@code true} to maintain parallelism
878 */
879 public void setMaintainsParallelism(boolean enable) {
880 maintainsParallelism = enable;
881 }
882
883 /**
884 * Establishes local first-in-first-out scheduling mode for forked
885 * tasks that are never joined. This mode may be more appropriate
886 * than default locally stack-based mode in applications in which
887 * worker threads only process asynchronous tasks. This method is
888 * designed to be invoked only when the pool is quiescent, and
889 * typically only before any tasks are submitted. The effects of
890 * invocations at other times may be unpredictable.
891 *
892 * @param async if {@code true}, use locally FIFO scheduling
893 * @return the previous mode
894 * @see #getAsyncMode
895 */
896 public boolean setAsyncMode(boolean async) {
897 boolean oldMode = locallyFifo;
898 locallyFifo = async;
899 ForkJoinWorkerThread[] ws = workers;
900 if (ws != null) {
901 for (int i = 0; i < ws.length; ++i) {
902 ForkJoinWorkerThread t = ws[i];
903 if (t != null)
904 t.setAsyncMode(async);
905 }
906 }
907 return oldMode;
908 }
909
910 /**
911 * Returns {@code true} if this pool uses local first-in-first-out
912 * scheduling mode for forked tasks that are never joined.
913 *
914 * @return {@code true} if this pool uses async mode
915 * @see #setAsyncMode
916 */
917 public boolean getAsyncMode() {
918 return locallyFifo;
919 }
920
921 /**
922 * Returns an estimate of the number of worker threads that are
923 * not blocked waiting to join tasks or for other managed
924 * synchronization.
925 *
926 * @return the number of worker threads
927 */
928 public int getRunningThreadCount() {
929 return runningCountOf(workerCounts);
930 }
931
932 /**
933 * Returns an estimate of the number of threads that are currently
934 * stealing or executing tasks. This method may overestimate the
935 * number of active threads.
936 *
937 * @return the number of active threads
938 */
939 public int getActiveThreadCount() {
940 return activeCountOf(runControl);
941 }
942
943 /**
944 * Returns an estimate of the number of threads that are currently
945 * idle waiting for tasks. This method may underestimate the
946 * number of idle threads.
947 *
948 * @return the number of idle threads
949 */
950 final int getIdleThreadCount() {
951 int c = runningCountOf(workerCounts) - activeCountOf(runControl);
952 return (c <= 0) ? 0 : c;
953 }
954
955 /**
956 * Returns {@code true} if all worker threads are currently idle.
957 * An idle worker is one that cannot obtain a task to execute
958 * because none are available to steal from other threads, and
959 * there are no pending submissions to the pool. This method is
960 * conservative; it might not return {@code true} immediately upon
961 * idleness of all threads, but will eventually become true if
962 * threads remain inactive.
963 *
964 * @return {@code true} if all threads are currently idle
965 */
966 public boolean isQuiescent() {
967 return activeCountOf(runControl) == 0;
968 }
969
970 /**
971 * Returns an estimate of the total number of tasks stolen from
972 * one thread's work queue by another. The reported value
973 * underestimates the actual total number of steals when the pool
974 * is not quiescent. This value may be useful for monitoring and
975 * tuning fork/join programs: in general, steal counts should be
976 * high enough to keep threads busy, but low enough to avoid
977 * overhead and contention across threads.
978 *
979 * @return the number of steals
980 */
981 public long getStealCount() {
982 return stealCount.get();
983 }
984
985 /**
986 * Accumulates steal count from a worker.
987 * Call only when worker known to be idle.
988 */
989 private void updateStealCount(ForkJoinWorkerThread w) {
990 int sc = w.getAndClearStealCount();
991 if (sc != 0)
992 stealCount.addAndGet(sc);
993 }
994
995 /**
996 * Returns an estimate of the total number of tasks currently held
997 * in queues by worker threads (but not including tasks submitted
998 * to the pool that have not begun executing). This value is only
999 * an approximation, obtained by iterating across all threads in
1000 * the pool. This method may be useful for tuning task
1001 * granularities.
1002 *
1003 * @return the number of queued tasks
1004 */
1005 public long getQueuedTaskCount() {
1006 long count = 0;
1007 ForkJoinWorkerThread[] ws = workers;
1008 if (ws != null) {
1009 for (int i = 0; i < ws.length; ++i) {
1010 ForkJoinWorkerThread t = ws[i];
1011 if (t != null)
1012 count += t.getQueueSize();
1013 }
1014 }
1015 return count;
1016 }
1017
1018 /**
1019 * Returns an estimate of the number of tasks submitted to this
1020 * pool that have not yet begun executing. This method takes time
1021 * proportional to the number of submissions.
1022 *
1023 * @return the number of queued submissions
1024 */
1025 public int getQueuedSubmissionCount() {
1026 return submissionQueue.size();
1027 }
1028
1029 /**
1030 * Returns {@code true} if there are any tasks submitted to this
1031 * pool that have not yet begun executing.
1032 *
1033 * @return {@code true} if there are any queued submissions
1034 */
1035 public boolean hasQueuedSubmissions() {
1036 return !submissionQueue.isEmpty();
1037 }
1038
1039 /**
1040 * Removes and returns the next unexecuted submission if one is
1041 * available. This method may be useful in extensions to this
1042 * class that re-assign work in systems with multiple pools.
1043 *
1044 * @return the next submission, or {@code null} if none
1045 */
1046 protected ForkJoinTask<?> pollSubmission() {
1047 return submissionQueue.poll();
1048 }
1049
1050 /**
1051 * Removes all available unexecuted submitted and forked tasks
1052 * from scheduling queues and adds them to the given collection,
1053 * without altering their execution status. These may include
1054 * artificially generated or wrapped tasks. This method is
1055 * designed to be invoked only when the pool is known to be
1056 * quiescent. Invocations at other times may not remove all
1057 * tasks. A failure encountered while attempting to add elements
1058 * to collection {@code c} may result in elements being in
1059 * neither, either or both collections when the associated
1060 * exception is thrown. The behavior of this operation is
1061 * undefined if the specified collection is modified while the
1062 * operation is in progress.
1063 *
1064 * @param c the collection to transfer elements into
1065 * @return the number of elements transferred
1066 */
1067 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1068 int n = submissionQueue.drainTo(c);
1069 ForkJoinWorkerThread[] ws = workers;
1070 if (ws != null) {
1071 for (int i = 0; i < ws.length; ++i) {
1072 ForkJoinWorkerThread w = ws[i];
1073 if (w != null)
1074 n += w.drainTasksTo(c);
1075 }
1076 }
1077 return n;
1078 }
1079
1080 /**
1081 * Returns a string identifying this pool, as well as its state,
1082 * including indications of run state, parallelism level, and
1083 * worker and task counts.
1084 *
1085 * @return a string identifying this pool, as well as its state
1086 */
1087 public String toString() {
1088 int ps = parallelism;
1089 int wc = workerCounts;
1090 int rc = runControl;
1091 long st = getStealCount();
1092 long qt = getQueuedTaskCount();
1093 long qs = getQueuedSubmissionCount();
1094 return super.toString() +
1095 "[" + runStateToString(runStateOf(rc)) +
1096 ", parallelism = " + ps +
1097 ", size = " + totalCountOf(wc) +
1098 ", active = " + activeCountOf(rc) +
1099 ", running = " + runningCountOf(wc) +
1100 ", steals = " + st +
1101 ", tasks = " + qt +
1102 ", submissions = " + qs +
1103 "]";
1104 }
1105
1106 private static String runStateToString(int rs) {
1107 switch (rs) {
1108 case RUNNING: return "Running";
1109 case SHUTDOWN: return "Shutting down";
1110 case TERMINATING: return "Terminating";
1111 case TERMINATED: return "Terminated";
1112 default: throw new Error("Unknown run state");
1113 }
1114 }
1115
1116 // lifecycle control
1117
1118 /**
1119 * Initiates an orderly shutdown in which previously submitted
1120 * tasks are executed, but no new tasks will be accepted.
1121 * Invocation has no additional effect if already shut down.
1122 * Tasks that are in the process of being submitted concurrently
1123 * during the course of this method may or may not be rejected.
1124 *
1125 * @throws SecurityException if a security manager exists and
1126 * the caller is not permitted to modify threads
1127 * because it does not hold {@link
1128 * java.lang.RuntimePermission}{@code ("modifyThread")}
1129 */
1130 public void shutdown() {
1131 checkPermission();
1132 transitionRunStateTo(SHUTDOWN);
1133 if (canTerminateOnShutdown(runControl)) {
1134 if (workers == null) { // shutting down before workers created
1135 final ReentrantLock lock = this.workerLock;
1136 lock.lock();
1137 try {
1138 if (workers == null) {
1139 terminate();
1140 transitionRunStateTo(TERMINATED);
1141 termination.signalAll();
1142 }
1143 } finally {
1144 lock.unlock();
1145 }
1146 }
1147 terminateOnShutdown();
1148 }
1149 }
1150
1151 /**
1152 * Attempts to cancel and/or stop all tasks, and reject all
1153 * subsequently submitted tasks. Tasks that are in the process of
1154 * being submitted or executed concurrently during the course of
1155 * this method may or may not be rejected. This method cancels
1156 * both existing and unexecuted tasks, in order to permit
1157 * termination in the presence of task dependencies. So the method
1158 * always returns an empty list (unlike the case for some other
1159 * Executors).
1160 *
1161 * @return an empty list
1162 * @throws SecurityException if a security manager exists and
1163 * the caller is not permitted to modify threads
1164 * because it does not hold {@link
1165 * java.lang.RuntimePermission}{@code ("modifyThread")}
1166 */
1167 public List<Runnable> shutdownNow() {
1168 checkPermission();
1169 terminate();
1170 return Collections.emptyList();
1171 }
1172
1173 /**
1174 * Returns {@code true} if all tasks have completed following shut down.
1175 *
1176 * @return {@code true} if all tasks have completed following shut down
1177 */
1178 public boolean isTerminated() {
1179 return runStateOf(runControl) == TERMINATED;
1180 }
1181
1182 /**
1183 * Returns {@code true} if the process of termination has
1184 * commenced but not yet completed. This method may be useful for
1185 * debugging. A return of {@code true} reported a sufficient
1186 * period after shutdown may indicate that submitted tasks have
1187 * ignored or suppressed interruption, causing this executor not
1188 * to properly terminate.
1189 *
1190 * @return {@code true} if terminating but not yet terminated
1191 */
1192 public boolean isTerminating() {
1193 return runStateOf(runControl) == TERMINATING;
1194 }
1195
1196 /**
1197 * Returns {@code true} if this pool has been shut down.
1198 *
1199 * @return {@code true} if this pool has been shut down
1200 */
1201 public boolean isShutdown() {
1202 return runStateOf(runControl) >= SHUTDOWN;
1203 }
1204
1205 /**
1206 * Returns true if pool is not terminating or terminated.
1207 * Used internally to suppress execution when terminating.
1208 */
1209 final boolean isProcessingTasks() {
1210 return runStateOf(runControl) < TERMINATING;
1211 }
1212
1213 /**
1214 * Blocks until all tasks have completed execution after a shutdown
1215 * request, or the timeout occurs, or the current thread is
1216 * interrupted, whichever happens first.
1217 *
1218 * @param timeout the maximum time to wait
1219 * @param unit the time unit of the timeout argument
1220 * @return {@code true} if this executor terminated and
1221 * {@code false} if the timeout elapsed before termination
1222 * @throws InterruptedException if interrupted while waiting
1223 */
1224 public boolean awaitTermination(long timeout, TimeUnit unit)
1225 throws InterruptedException {
1226 long nanos = unit.toNanos(timeout);
1227 final ReentrantLock lock = this.workerLock;
1228 lock.lock();
1229 try {
1230 for (;;) {
1231 if (isTerminated())
1232 return true;
1233 if (nanos <= 0)
1234 return false;
1235 nanos = termination.awaitNanos(nanos);
1236 }
1237 } finally {
1238 lock.unlock();
1239 }
1240 }
1241
1242 // Shutdown and termination support
1243
1244 /**
1245 * Callback from terminating worker. Nulls out the corresponding
1246 * workers slot, and if terminating, tries to terminate; else
1247 * tries to shrink workers array.
1248 *
1249 * @param w the worker
1250 */
1251 final void workerTerminated(ForkJoinWorkerThread w) {
1252 updateStealCount(w);
1253 updateWorkerCount(-1);
1254 final ReentrantLock lock = this.workerLock;
1255 lock.lock();
1256 try {
1257 ForkJoinWorkerThread[] ws = workers;
1258 if (ws != null) {
1259 int idx = w.poolIndex;
1260 if (idx >= 0 && idx < ws.length && ws[idx] == w)
1261 ws[idx] = null;
1262 if (totalCountOf(workerCounts) == 0) {
1263 terminate(); // no-op if already terminating
1264 transitionRunStateTo(TERMINATED);
1265 termination.signalAll();
1266 }
1267 else if (isProcessingTasks()) {
1268 tryShrinkWorkerArray();
1269 tryResumeSpare(true); // allow replacement
1270 }
1271 }
1272 } finally {
1273 lock.unlock();
1274 }
1275 signalIdleWorkers();
1276 }
1277
1278 /**
1279 * Initiates termination.
1280 */
1281 private void terminate() {
1282 if (transitionRunStateTo(TERMINATING)) {
1283 stopAllWorkers();
1284 resumeAllSpares();
1285 signalIdleWorkers();
1286 cancelQueuedSubmissions();
1287 cancelQueuedWorkerTasks();
1288 interruptUnterminatedWorkers();
1289 signalIdleWorkers(); // resignal after interrupt
1290 }
1291 }
1292
1293 /**
1294 * Possibly terminates when on shutdown state.
1295 */
1296 private void terminateOnShutdown() {
1297 if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl))
1298 terminate();
1299 }
1300
1301 /**
1302 * Clears out and cancels submissions.
1303 */
1304 private void cancelQueuedSubmissions() {
1305 ForkJoinTask<?> task;
1306 while ((task = pollSubmission()) != null)
1307 task.cancel(false);
1308 }
1309
1310 /**
1311 * Cleans out worker queues.
1312 */
1313 private void cancelQueuedWorkerTasks() {
1314 final ReentrantLock lock = this.workerLock;
1315 lock.lock();
1316 try {
1317 ForkJoinWorkerThread[] ws = workers;
1318 if (ws != null) {
1319 for (int i = 0; i < ws.length; ++i) {
1320 ForkJoinWorkerThread t = ws[i];
1321 if (t != null)
1322 t.cancelTasks();
1323 }
1324 }
1325 } finally {
1326 lock.unlock();
1327 }
1328 }
1329
1330 /**
1331 * Sets each worker's status to terminating. Requires lock to avoid
1332 * conflicts with add/remove.
1333 */
1334 private void stopAllWorkers() {
1335 final ReentrantLock lock = this.workerLock;
1336 lock.lock();
1337 try {
1338 ForkJoinWorkerThread[] ws = workers;
1339 if (ws != null) {
1340 for (int i = 0; i < ws.length; ++i) {
1341 ForkJoinWorkerThread t = ws[i];
1342 if (t != null)
1343 t.shutdownNow();
1344 }
1345 }
1346 } finally {
1347 lock.unlock();
1348 }
1349 }
1350
1351 /**
1352 * Interrupts all unterminated workers. This is not required for
1353 * sake of internal control, but may help unstick user code during
1354 * shutdown.
1355 */
1356 private void interruptUnterminatedWorkers() {
1357 final ReentrantLock lock = this.workerLock;
1358 lock.lock();
1359 try {
1360 ForkJoinWorkerThread[] ws = workers;
1361 if (ws != null) {
1362 for (int i = 0; i < ws.length; ++i) {
1363 ForkJoinWorkerThread t = ws[i];
1364 if (t != null && !t.isTerminated()) {
1365 try {
1366 t.interrupt();
1367 } catch (SecurityException ignore) {
1368 }
1369 }
1370 }
1371 }
1372 } finally {
1373 lock.unlock();
1374 }
1375 }
1376
1377
1378 /*
1379 * Nodes for event barrier to manage idle threads. Queue nodes
1380 * are basic Treiber stack nodes, also used for spare stack.
1381 *
1382 * The event barrier has an event count and a wait queue (actually
1383 * a Treiber stack). Workers are enabled to look for work when
1384 * the eventCount is incremented. If they fail to find work, they
1385 * may wait for next count. Upon release, threads help others wake
1386 * up.
1387 *
1388 * Synchronization events occur only in enough contexts to
1389 * maintain overall liveness:
1390 *
1391 * - Submission of a new task to the pool
1392 * - Resizes or other changes to the workers array
1393 * - pool termination
1394 * - A worker pushing a task on an empty queue
1395 *
1396 * The case of pushing a task occurs often enough, and is heavy
1397 * enough compared to simple stack pushes, to require special
1398 * handling: Method signalWork returns without advancing count if
1399 * the queue appears to be empty. This would ordinarily result in
1400 * races causing some queued waiters not to be woken up. To avoid
1401 * this, the first worker enqueued in method sync (see
1402 * syncIsReleasable) rescans for tasks after being enqueued, and
1403 * helps signal if any are found. This works well because the
1404 * worker has nothing better to do, and so might as well help
1405 * alleviate the overhead and contention on the threads actually
1406 * doing work. Also, since event counts increments on task
1407 * availability exist to maintain liveness (rather than to force
1408 * refreshes etc), it is OK for callers to exit early if
1409 * contending with another signaller.
1410 */
1411 static final class WaitQueueNode {
1412 WaitQueueNode next; // only written before enqueued
1413 volatile ForkJoinWorkerThread thread; // nulled to cancel wait
1414 final long count; // unused for spare stack
1415
1416 WaitQueueNode(long c, ForkJoinWorkerThread w) {
1417 count = c;
1418 thread = w;
1419 }
1420
1421 /**
1422 * Wakes up waiter, returning false if known to already
1423 */
1424 boolean signal() {
1425 ForkJoinWorkerThread t = thread;
1426 if (t == null)
1427 return false;
1428 thread = null;
1429 LockSupport.unpark(t);
1430 return true;
1431 }
1432
1433 /**
1434 * Awaits release on sync.
1435 */
1436 void awaitSyncRelease(ForkJoinPool p) {
1437 while (thread != null && !p.syncIsReleasable(this))
1438 LockSupport.park(this);
1439 }
1440
1441 /**
1442 * Awaits resumption as spare.
1443 */
1444 void awaitSpareRelease() {
1445 while (thread != null) {
1446 if (!Thread.interrupted())
1447 LockSupport.park(this);
1448 }
1449 }
1450 }
1451
1452 /**
1453 * Ensures that no thread is waiting for count to advance from the
1454 * current value of eventCount read on entry to this method, by
1455 * releasing waiting threads if necessary.
1456 *
1457 * @return the count
1458 */
1459 final long ensureSync() {
1460 long c = eventCount;
1461 WaitQueueNode q;
1462 while ((q = syncStack) != null && q.count < c) {
1463 if (casBarrierStack(q, null)) {
1464 do {
1465 q.signal();
1466 } while ((q = q.next) != null);
1467 break;
1468 }
1469 }
1470 return c;
1471 }
1472
1473 /**
1474 * Increments event count and releases waiting threads.
1475 */
1476 private void signalIdleWorkers() {
1477 long c;
1478 do {} while (!casEventCount(c = eventCount, c+1));
1479 ensureSync();
1480 }
1481
1482 /**
1483 * Signals threads waiting to poll a task. Because method sync
1484 * rechecks availability, it is OK to only proceed if queue
1485 * appears to be non-empty, and OK to skip under contention to
1486 * increment count (since some other thread succeeded).
1487 */
1488 final void signalWork() {
1489 long c;
1490 WaitQueueNode q;
1491 if (syncStack != null &&
1492 casEventCount(c = eventCount, c+1) &&
1493 (((q = syncStack) != null && q.count <= c) &&
1494 (!casBarrierStack(q, q.next) || !q.signal())))
1495 ensureSync();
1496 }
1497
1498 /**
1499 * Waits until event count advances from last value held by
1500 * caller, or if excess threads, caller is resumed as spare, or
1501 * caller or pool is terminating. Updates caller's event on exit.
1502 *
1503 * @param w the calling worker thread
1504 */
1505 final void sync(ForkJoinWorkerThread w) {
1506 updateStealCount(w); // Transfer w's count while it is idle
1507
1508 while (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1509 long prev = w.lastEventCount;
1510 WaitQueueNode node = null;
1511 WaitQueueNode h;
1512 while (eventCount == prev &&
1513 ((h = syncStack) == null || h.count == prev)) {
1514 if (node == null)
1515 node = new WaitQueueNode(prev, w);
1516 if (casBarrierStack(node.next = h, node)) {
1517 node.awaitSyncRelease(this);
1518 break;
1519 }
1520 }
1521 long ec = ensureSync();
1522 if (ec != prev) {
1523 w.lastEventCount = ec;
1524 break;
1525 }
1526 }
1527 }
1528
1529 /**
1530 * Returns {@code true} if worker waiting on sync can proceed:
1531 * - on signal (thread == null)
1532 * - on event count advance (winning race to notify vs signaller)
1533 * - on interrupt
1534 * - if the first queued node, we find work available
1535 * If node was not signalled and event count not advanced on exit,
1536 * then we also help advance event count.
1537 *
1538 * @return {@code true} if node can be released
1539 */
1540 final boolean syncIsReleasable(WaitQueueNode node) {
1541 long prev = node.count;
1542 if (!Thread.interrupted() && node.thread != null &&
1543 (node.next != null ||
1544 !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1545 eventCount == prev)
1546 return false;
1547 if (node.thread != null) {
1548 node.thread = null;
1549 long ec = eventCount;
1550 if (prev <= ec) // help signal
1551 casEventCount(ec, ec+1);
1552 }
1553 return true;
1554 }
1555
1556 /**
1557 * Returns {@code true} if a new sync event occurred since last
1558 * call to sync or this method, if so, updating caller's count.
1559 */
1560 final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1561 long lc = w.lastEventCount;
1562 long ec = ensureSync();
1563 if (ec == lc)
1564 return false;
1565 w.lastEventCount = ec;
1566 return true;
1567 }
1568
1569 // Parallelism maintenance
1570
1571 /**
1572 * Decrements running count; if too low, adds spare.
1573 *
1574 * Conceptually, all we need to do here is add or resume a
1575 * spare thread when one is about to block (and remove or
1576 * suspend it later when unblocked -- see suspendIfSpare).
1577 * However, implementing this idea requires coping with
1578 * several problems: we have imperfect information about the
1579 * states of threads. Some count updates can and usually do
1580 * lag run state changes, despite arrangements to keep them
1581 * accurate (for example, when possible, updating counts
1582 * before signalling or resuming), especially when running on
1583 * dynamic JVMs that don't optimize the infrequent paths that
1584 * update counts. Generating too many threads can make these
1585 * problems become worse, because excess threads are more
1586 * likely to be context-switched with others, slowing them all
1587 * down, especially if there is no work available, so all are
1588 * busy scanning or idling. Also, excess spare threads can
1589 * only be suspended or removed when they are idle, not
1590 * immediately when they aren't needed. So adding threads will
1591 * raise parallelism level for longer than necessary. Also,
1592 * FJ applications often encounter highly transient peaks when
1593 * many threads are blocked joining, but for less time than it
1594 * takes to create or resume spares.
1595 *
1596 * @param joinMe if non-null, return early if done
1597 * @param maintainParallelism if true, try to stay within
1598 * target counts, else create only to avoid starvation
1599 * @return true if joinMe known to be done
1600 */
1601 final boolean preJoin(ForkJoinTask<?> joinMe,
1602 boolean maintainParallelism) {
1603 maintainParallelism &= maintainsParallelism; // overrride
1604 boolean dec = false; // true when running count decremented
1605 while (spareStack == null || !tryResumeSpare(dec)) {
1606 int counts = workerCounts;
1607 if (dec || (dec = casWorkerCounts(counts, --counts))) {
1608 if (!needSpare(counts, maintainParallelism))
1609 break;
1610 if (joinMe.status < 0)
1611 return true;
1612 if (tryAddSpare(counts))
1613 break;
1614 }
1615 }
1616 return false;
1617 }
1618
1619 /**
1620 * Same idea as preJoin
1621 */
1622 final boolean preBlock(ManagedBlocker blocker,
1623 boolean maintainParallelism) {
1624 maintainParallelism &= maintainsParallelism;
1625 boolean dec = false;
1626 while (spareStack == null || !tryResumeSpare(dec)) {
1627 int counts = workerCounts;
1628 if (dec || (dec = casWorkerCounts(counts, --counts))) {
1629 if (!needSpare(counts, maintainParallelism))
1630 break;
1631 if (blocker.isReleasable())
1632 return true;
1633 if (tryAddSpare(counts))
1634 break;
1635 }
1636 }
1637 return false;
1638 }
1639
1640 /**
1641 * Returns {@code true} if a spare thread appears to be needed.
1642 * If maintaining parallelism, returns true when the deficit in
1643 * running threads is more than the surplus of total threads, and
1644 * there is apparently some work to do. This self-limiting rule
1645 * means that the more threads that have already been added, the
1646 * less parallelism we will tolerate before adding another.
1647 *
1648 * @param counts current worker counts
1649 * @param maintainParallelism try to maintain parallelism
1650 */
1651 private boolean needSpare(int counts, boolean maintainParallelism) {
1652 int ps = parallelism;
1653 int rc = runningCountOf(counts);
1654 int tc = totalCountOf(counts);
1655 int runningDeficit = ps - rc;
1656 int totalSurplus = tc - ps;
1657 return (tc < maxPoolSize &&
1658 (rc == 0 || totalSurplus < 0 ||
1659 (maintainParallelism &&
1660 runningDeficit > totalSurplus &&
1661 ForkJoinWorkerThread.hasQueuedTasks(workers))));
1662 }
1663
1664 /**
1665 * Adds a spare worker if lock available and no more than the
1666 * expected numbers of threads exist.
1667 *
1668 * @return true if successful
1669 */
1670 private boolean tryAddSpare(int expectedCounts) {
1671 final ReentrantLock lock = this.workerLock;
1672 int expectedRunning = runningCountOf(expectedCounts);
1673 int expectedTotal = totalCountOf(expectedCounts);
1674 boolean success = false;
1675 boolean locked = false;
1676 // confirm counts while locking; CAS after obtaining lock
1677 try {
1678 for (;;) {
1679 int s = workerCounts;
1680 int tc = totalCountOf(s);
1681 int rc = runningCountOf(s);
1682 if (rc > expectedRunning || tc > expectedTotal)
1683 break;
1684 if (!locked && !(locked = lock.tryLock()))
1685 break;
1686 if (casWorkerCounts(s, workerCountsFor(tc+1, rc+1))) {
1687 createAndStartSpare(tc);
1688 success = true;
1689 break;
1690 }
1691 }
1692 } finally {
1693 if (locked)
1694 lock.unlock();
1695 }
1696 return success;
1697 }
1698
1699 /**
1700 * Adds the kth spare worker. On entry, pool counts are already
1701 * adjusted to reflect addition.
1702 */
1703 private void createAndStartSpare(int k) {
1704 ForkJoinWorkerThread w = null;
1705 ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(k + 1);
1706 int len = ws.length;
1707 // Probably, we can place at slot k. If not, find empty slot
1708 if (k < len && ws[k] != null) {
1709 for (k = 0; k < len && ws[k] != null; ++k)
1710 ;
1711 }
1712 if (k < len && isProcessingTasks() && (w = createWorker(k)) != null) {
1713 ws[k] = w;
1714 w.start();
1715 }
1716 else
1717 updateWorkerCount(-1); // adjust on failure
1718 signalIdleWorkers();
1719 }
1720
1721 /**
1722 * Suspends calling thread w if there are excess threads. Called
1723 * only from sync. Spares are enqueued in a Treiber stack using
1724 * the same WaitQueueNodes as barriers. They are resumed mainly
1725 * in preJoin, but are also woken on pool events that require all
1726 * threads to check run state.
1727 *
1728 * @param w the caller
1729 */
1730 private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1731 WaitQueueNode node = null;
1732 int s;
1733 while (parallelism < runningCountOf(s = workerCounts)) {
1734 if (node == null)
1735 node = new WaitQueueNode(0, w);
1736 if (casWorkerCounts(s, s-1)) { // representation-dependent
1737 // push onto stack
1738 do {} while (!casSpareStack(node.next = spareStack, node));
1739 // block until released by resumeSpare
1740 node.awaitSpareRelease();
1741 return true;
1742 }
1743 }
1744 return false;
1745 }
1746
1747 /**
1748 * Tries to pop and resume a spare thread.
1749 *
1750 * @param updateCount if true, increment running count on success
1751 * @return true if successful
1752 */
1753 private boolean tryResumeSpare(boolean updateCount) {
1754 WaitQueueNode q;
1755 while ((q = spareStack) != null) {
1756 if (casSpareStack(q, q.next)) {
1757 if (updateCount)
1758 updateRunningCount(1);
1759 q.signal();
1760 return true;
1761 }
1762 }
1763 return false;
1764 }
1765
1766 /**
1767 * Pops and resumes all spare threads. Same idea as ensureSync.
1768 *
1769 * @return true if any spares released
1770 */
1771 private boolean resumeAllSpares() {
1772 WaitQueueNode q;
1773 while ( (q = spareStack) != null) {
1774 if (casSpareStack(q, null)) {
1775 do {
1776 updateRunningCount(1);
1777 q.signal();
1778 } while ((q = q.next) != null);
1779 return true;
1780 }
1781 }
1782 return false;
1783 }
1784
1785 /**
1786 * Pops and shuts down excessive spare threads. Call only while
1787 * holding lock. This is not guaranteed to eliminate all excess
1788 * threads, only those suspended as spares, which are the ones
1789 * unlikely to be needed in the future.
1790 */
1791 private void trimSpares() {
1792 int surplus = totalCountOf(workerCounts) - parallelism;
1793 WaitQueueNode q;
1794 while (surplus > 0 && (q = spareStack) != null) {
1795 if (casSpareStack(q, null)) {
1796 do {
1797 updateRunningCount(1);
1798 ForkJoinWorkerThread w = q.thread;
1799 if (w != null && surplus > 0 &&
1800 runningCountOf(workerCounts) > 0 && w.shutdown())
1801 --surplus;
1802 q.signal();
1803 } while ((q = q.next) != null);
1804 }
1805 }
1806 }
1807
1808 /**
1809 * Interface for extending managed parallelism for tasks running
1810 * in {@link ForkJoinPool}s.
1811 *
1812 * <p>A {@code ManagedBlocker} provides two methods.
1813 * Method {@code isReleasable} must return {@code true} if
1814 * blocking is not necessary. Method {@code block} blocks the
1815 * current thread if necessary (perhaps internally invoking
1816 * {@code isReleasable} before actually blocking).
1817 *
1818 * <p>For example, here is a ManagedBlocker based on a
1819 * ReentrantLock:
1820 * <pre> {@code
1821 * class ManagedLocker implements ManagedBlocker {
1822 * final ReentrantLock lock;
1823 * boolean hasLock = false;
1824 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
1825 * public boolean block() {
1826 * if (!hasLock)
1827 * lock.lock();
1828 * return true;
1829 * }
1830 * public boolean isReleasable() {
1831 * return hasLock || (hasLock = lock.tryLock());
1832 * }
1833 * }}</pre>
1834 */
1835 public static interface ManagedBlocker {
1836 /**
1837 * Possibly blocks the current thread, for example waiting for
1838 * a lock or condition.
1839 *
1840 * @return {@code true} if no additional blocking is necessary
1841 * (i.e., if isReleasable would return true)
1842 * @throws InterruptedException if interrupted while waiting
1843 * (the method is not required to do so, but is allowed to)
1844 */
1845 boolean block() throws InterruptedException;
1846
1847 /**
1848 * Returns {@code true} if blocking is unnecessary.
1849 */
1850 boolean isReleasable();
1851 }
1852
1853 /**
1854 * Blocks in accord with the given blocker. If the current thread
1855 * is a {@link ForkJoinWorkerThread}, this method possibly
1856 * arranges for a spare thread to be activated if necessary to
1857 * ensure parallelism while the current thread is blocked.
1858 *
1859 * <p>If {@code maintainParallelism} is {@code true} and the pool
1860 * supports it ({@link #getMaintainsParallelism}), this method
1861 * attempts to maintain the pool's nominal parallelism. Otherwise
1862 * it activates a thread only if necessary to avoid complete
1863 * starvation. This option may be preferable when blockages use
1864 * timeouts, or are almost always brief.
1865 *
1866 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1867 * behaviorally equivalent to
1868 * <pre> {@code
1869 * while (!blocker.isReleasable())
1870 * if (blocker.block())
1871 * return;
1872 * }</pre>
1873 *
1874 * If the caller is a {@code ForkJoinTask}, then the pool may
1875 * first be expanded to ensure parallelism, and later adjusted.
1876 *
1877 * @param blocker the blocker
1878 * @param maintainParallelism if {@code true} and supported by
1879 * this pool, attempt to maintain the pool's nominal parallelism;
1880 * otherwise activate a thread only if necessary to avoid
1881 * complete starvation.
1882 * @throws InterruptedException if blocker.block did so
1883 */
1884 public static void managedBlock(ManagedBlocker blocker,
1885 boolean maintainParallelism)
1886 throws InterruptedException {
1887 Thread t = Thread.currentThread();
1888 ForkJoinPool pool = ((t instanceof ForkJoinWorkerThread) ?
1889 ((ForkJoinWorkerThread) t).pool : null);
1890 if (!blocker.isReleasable()) {
1891 try {
1892 if (pool == null ||
1893 !pool.preBlock(blocker, maintainParallelism))
1894 awaitBlocker(blocker);
1895 } finally {
1896 if (pool != null)
1897 pool.updateRunningCount(1);
1898 }
1899 }
1900 }
1901
1902 private static void awaitBlocker(ManagedBlocker blocker)
1903 throws InterruptedException {
1904 do {} while (!blocker.isReleasable() && !blocker.block());
1905 }
1906
1907 // AbstractExecutorService overrides. These rely on undocumented
1908 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
1909 // implement RunnableFuture.
1910
1911 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1912 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
1913 }
1914
1915 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1916 return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
1917 }
1918
1919 // Unsafe mechanics
1920
1921 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1922 private static final long eventCountOffset =
1923 objectFieldOffset("eventCount", ForkJoinPool.class);
1924 private static final long workerCountsOffset =
1925 objectFieldOffset("workerCounts", ForkJoinPool.class);
1926 private static final long runControlOffset =
1927 objectFieldOffset("runControl", ForkJoinPool.class);
1928 private static final long syncStackOffset =
1929 objectFieldOffset("syncStack",ForkJoinPool.class);
1930 private static final long spareStackOffset =
1931 objectFieldOffset("spareStack", ForkJoinPool.class);
1932
1933 private boolean casEventCount(long cmp, long val) {
1934 return UNSAFE.compareAndSwapLong(this, eventCountOffset, cmp, val);
1935 }
1936 private boolean casWorkerCounts(int cmp, int val) {
1937 return UNSAFE.compareAndSwapInt(this, workerCountsOffset, cmp, val);
1938 }
1939 private boolean casRunControl(int cmp, int val) {
1940 return UNSAFE.compareAndSwapInt(this, runControlOffset, cmp, val);
1941 }
1942 private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) {
1943 return UNSAFE.compareAndSwapObject(this, spareStackOffset, cmp, val);
1944 }
1945 private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) {
1946 return UNSAFE.compareAndSwapObject(this, syncStackOffset, cmp, val);
1947 }
1948
1949 private static long objectFieldOffset(String field, Class<?> klazz) {
1950 try {
1951 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1952 } catch (NoSuchFieldException e) {
1953 // Convert Exception to corresponding Error
1954 NoSuchFieldError error = new NoSuchFieldError(field);
1955 error.initCause(e);
1956 throw error;
1957 }
1958 }
1959 }