ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
Revision: 1.50
Committed: Fri Dec 4 12:09:46 2009 UTC (14 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.49: +67 -94 lines
Log Message:
Fix spare mis-identification; plus steps to simplify eventCount sync

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8
9 import java.util.concurrent.*;
10
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.concurrent.locks.Condition;
17 import java.util.concurrent.locks.LockSupport;
18 import java.util.concurrent.locks.ReentrantLock;
19 import java.util.concurrent.atomic.AtomicInteger;
20 import java.util.concurrent.atomic.AtomicLong;
21
22 /**
23 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
24 * A {@code ForkJoinPool} provides the entry point for submissions
25 * from non-{@code ForkJoinTask}s, as well as management and
26 * monitoring operations.
27 *
28 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
29 * ExecutorService} mainly by virtue of employing
30 * <em>work-stealing</em>: all threads in the pool attempt to find and
31 * execute subtasks created by other active tasks (eventually blocking
32 * waiting for work if none exist). This enables efficient processing
33 * when most tasks spawn other subtasks (as do most {@code
34 * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
35 * execution of some plain {@code Runnable}- or {@code Callable}-
36 * based activities along with {@code ForkJoinTask}s. When setting
37 * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
38 * also be appropriate for use with fine-grained tasks of any form
39 * that are never joined. Otherwise, other {@code ExecutorService}
40 * implementations are typically more appropriate choices.
41 *
42 * <p>A {@code ForkJoinPool} is constructed with a given target
43 * parallelism level; by default, equal to the number of available
44 * processors. Unless configured otherwise via {@link
45 * #setMaintainsParallelism}, the pool attempts to maintain this
46 * number of active (or available) threads by dynamically adding,
47 * suspending, or resuming internal worker threads, even if some tasks
48 * are stalled waiting to join others. However, no such adjustments
49 * are performed in the face of blocked IO or other unmanaged
50 * synchronization. The nested {@link ManagedBlocker} interface
51 * enables extension of the kinds of synchronization accommodated.
52 * The target parallelism level may also be changed dynamically
53 * ({@link #setParallelism}). The total number of threads may be
54 * limited using method {@link #setMaximumPoolSize}, in which case it
55 * may become possible for the activities of a pool to stall due to
56 * the lack of available threads to process new tasks.
57 *
58 * <p>In addition to execution and lifecycle control methods, this
59 * class provides status check methods (for example
60 * {@link #getStealCount}) that are intended to aid in developing,
61 * tuning, and monitoring fork/join applications. Also, method
62 * {@link #toString} returns indications of pool state in a
63 * convenient form for informal monitoring.
64 *
65 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
66 * used for all parallel task execution in a program or subsystem.
67 * Otherwise, use would not usually outweigh the construction and
68 * bookkeeping overhead of creating a large set of threads. For
69 * example, a common pool could be used for the {@code SortTasks}
70 * illustrated in {@link RecursiveAction}. Because {@code
71 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
72 * daemon} mode, there is typically no need to explicitly {@link
73 * #shutdown} such a pool upon program exit.
74 *
75 * <pre>
76 * static final ForkJoinPool mainPool = new ForkJoinPool();
77 * ...
78 * public void sort(long[] array) {
79 * mainPool.invoke(new SortTask(array, 0, array.length));
80 * }
81 * </pre>
82 *
83 * <p><b>Implementation notes</b>: This implementation restricts the
84 * maximum number of running threads to 32767. Attempts to create
85 * pools with greater than the maximum number result in
86 * {@code IllegalArgumentException}.
87 *
88 * <p>This implementation rejects submitted tasks (that is, by throwing
89 * {@link RejectedExecutionException}) only when the pool is shut down.
90 *
91 * @since 1.7
92 * @author Doug Lea
93 */
94 public class ForkJoinPool extends AbstractExecutorService {
95
96 /*
97 * See the extended comments interspersed below for design,
98 * rationale, and walkthroughs.
99 */
100
101 /** Mask for packing and unpacking shorts */
102 private static final int shortMask = 0xffff;
103
104 /** Max pool size -- must be a power of two minus 1 */
105 private static final int MAX_THREADS = 0x7FFF;
106
107 /**
108 * Factory for creating new {@link ForkJoinWorkerThread}s.
109 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
110 * for {@code ForkJoinWorkerThread} subclasses that extend base
111 * functionality or initialize threads with different contexts.
112 */
113 public static interface ForkJoinWorkerThreadFactory {
114 /**
115 * Returns a new worker thread operating in the given pool.
116 *
117 * @param pool the pool this thread works in
118 * @throws NullPointerException if the pool is null
119 */
120 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
121 }
122
123 /**
124 * Default ForkJoinWorkerThreadFactory implementation; creates a
125 * new ForkJoinWorkerThread.
126 */
127 static class DefaultForkJoinWorkerThreadFactory
128 implements ForkJoinWorkerThreadFactory {
129 public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
130 try {
131 return new ForkJoinWorkerThread(pool);
132 } catch (OutOfMemoryError oom) {
133 return null;
134 }
135 }
136 }
137
138 /**
139 * Creates a new ForkJoinWorkerThread. This factory is used unless
140 * overridden in ForkJoinPool constructors.
141 */
142 public static final ForkJoinWorkerThreadFactory
143 defaultForkJoinWorkerThreadFactory =
144 new DefaultForkJoinWorkerThreadFactory();
145
146 /**
147 * Permission required for callers of methods that may start or
148 * kill threads.
149 */
150 private static final RuntimePermission modifyThreadPermission =
151 new RuntimePermission("modifyThread");
152
153 /**
154 * If there is a security manager, makes sure caller has
155 * permission to modify threads.
156 */
157 private static void checkPermission() {
158 SecurityManager security = System.getSecurityManager();
159 if (security != null)
160 security.checkPermission(modifyThreadPermission);
161 }
162
163 /**
164 * Generator for assigning sequence numbers as pool names.
165 */
166 private static final AtomicInteger poolNumberGenerator =
167 new AtomicInteger();
168
169 /**
170 * Array holding all worker threads in the pool. Initialized upon
171 * first use. Array size must be a power of two. Updates and
172 * replacements are protected by workerLock, but it is always kept
173 * in a consistent enough state to be randomly accessed without
174 * locking by workers performing work-stealing.
175 */
176 volatile ForkJoinWorkerThread[] workers;
177
178 /**
179 * Lock protecting access to workers.
180 */
181 private final ReentrantLock workerLock;
182
183 /**
184 * Condition for awaitTermination.
185 */
186 private final Condition termination;
187
188 /**
189 * The uncaught exception handler used when any worker
190 * abruptly terminates
191 */
192 private Thread.UncaughtExceptionHandler ueh;
193
194 /**
195 * Creation factory for worker threads.
196 */
197 private final ForkJoinWorkerThreadFactory factory;
198
199 /**
200 * Head of stack of threads that were created to maintain
201 * parallelism when other threads blocked, but have since
202 * suspended when the parallelism level rose.
203 */
204 private volatile WaitQueueNode spareStack;
205
206 /**
207 * Sum of per-thread steal counts, updated only when threads are
208 * idle or terminating.
209 */
210 private final AtomicLong stealCount;
211
212 /**
213 * Queue for external submissions.
214 */
215 private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
216
217 /**
218 * Head of Treiber stack for barrier sync. See below for explanation.
219 */
220 private volatile WaitQueueNode syncStack;
221
222 /**
223 * The count for event barrier
224 */
225 private volatile long eventCount;
226
227 /**
228 * Pool number, just for assigning useful names to worker threads
229 */
230 private final int poolNumber;
231
232 /**
233 * The maximum allowed pool size
234 */
235 private volatile int maxPoolSize;
236
237 /**
238 * The desired parallelism level, updated only under workerLock.
239 */
240 private volatile int parallelism;
241
242 /**
243 * True if use local fifo, not default lifo, for local polling
244 */
245 private volatile boolean locallyFifo;
246
247 /**
248 * Holds number of total (i.e., created and not yet terminated)
249 * and running (i.e., not blocked on joins or other managed sync)
250 * threads, packed into one int to ensure consistent snapshot when
251 * making decisions about creating and suspending spare
252 * threads. Updated only by CAS. Note: CASes in
253 * updateRunningCount and preJoin assume that running active count
254 * is in low word, so need to be modified if this changes.
255 */
256 private volatile int workerCounts;
257
258 private static int totalCountOf(int s) { return s >>> 16; }
259 private static int runningCountOf(int s) { return s & shortMask; }
260 private static int workerCountsFor(int t, int r) { return (t << 16) + r; }
261
262 /**
263 * Adds delta (which may be negative) to running count. This must
264 * be called before (with negative arg) and after (with positive)
265 * any managed synchronization (i.e., mainly, joins).
266 *
267 * @param delta the number to add
268 */
269 final void updateRunningCount(int delta) {
270 int s;
271 do {} while (!casWorkerCounts(s = workerCounts, s + delta));
272 }
273
274 /**
275 * Adds delta (which may be negative) to both total and running
276 * count. This must be called upon creation and termination of
277 * worker threads.
278 *
279 * @param delta the number to add
280 */
281 private void updateWorkerCount(int delta) {
282 int d = delta + (delta << 16); // add to both lo and hi parts
283 int s;
284 do {} while (!casWorkerCounts(s = workerCounts, s + d));
285 }
286
287 /**
288 * Lifecycle control. High word contains runState, low word
289 * contains the number of workers that are (probably) executing
290 * tasks. This value is atomically incremented before a worker
291 * gets a task to run, and decremented when worker has no tasks
292 * and cannot find any. These two fields are bundled together to
293 * support correct termination triggering. Note: activeCount
294 * CAS'es cheat by assuming active count is in low word, so need
295 * to be modified if this changes
296 */
297 private volatile int runControl;
298
299 // RunState values. Order among values matters
300 private static final int RUNNING = 0;
301 private static final int SHUTDOWN = 1;
302 private static final int TERMINATING = 2;
303 private static final int TERMINATED = 3;
304
305 private static int runStateOf(int c) { return c >>> 16; }
306 private static int activeCountOf(int c) { return c & shortMask; }
307 private static int runControlFor(int r, int a) { return (r << 16) + a; }
308
309 /**
310 * Tries incrementing active count; fails on contention.
311 * Called by workers before/during executing tasks.
312 *
313 * @return true on success
314 */
315 final boolean tryIncrementActiveCount() {
316 int c = runControl;
317 return casRunControl(c, c+1);
318 }
319
320 /**
321 * Tries decrementing active count; fails on contention.
322 * Possibly triggers termination on success.
323 * Called by workers when they can't find tasks.
324 *
325 * @return true on success
326 */
327 final boolean tryDecrementActiveCount() {
328 int c = runControl;
329 int nextc = c - 1;
330 if (!casRunControl(c, nextc))
331 return false;
332 if (canTerminateOnShutdown(nextc))
333 terminateOnShutdown();
334 return true;
335 }
336
337 /**
338 * Returns {@code true} if argument represents zero active count
339 * and nonzero runstate, which is the triggering condition for
340 * terminating on shutdown.
341 */
342 private static boolean canTerminateOnShutdown(int c) {
343 // i.e. least bit is nonzero runState bit
344 return ((c & -c) >>> 16) != 0;
345 }
346
347 /**
348 * Transition run state to at least the given state. Return true
349 * if not already at least given state.
350 */
351 private boolean transitionRunStateTo(int state) {
352 for (;;) {
353 int c = runControl;
354 if (runStateOf(c) >= state)
355 return false;
356 if (casRunControl(c, runControlFor(state, activeCountOf(c))))
357 return true;
358 }
359 }
360
361 /**
362 * Controls whether to add spares to maintain parallelism
363 */
364 private volatile boolean maintainsParallelism;
365
366 // Constructors
367
/**
 * Constructs a {@code ForkJoinPool} whose parallelism equals {@link
 * java.lang.Runtime#availableProcessors} and whose threads come from
 * the {@linkplain #defaultForkJoinWorkerThreadFactory default thread
 * factory}.
 *
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool() {
    this(Runtime.getRuntime().availableProcessors(),
         defaultForkJoinWorkerThreadFactory);
}
382
/**
 * Constructs a {@code ForkJoinPool} with the requested parallelism
 * level whose threads come from the {@linkplain
 * #defaultForkJoinWorkerThreadFactory default thread factory}.
 *
 * @param parallelism the parallelism level
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism) {
    this(parallelism,
         defaultForkJoinWorkerThreadFactory);
}
399
/**
 * Constructs a {@code ForkJoinPool} whose parallelism equals {@link
 * java.lang.Runtime#availableProcessors} and whose threads come from
 * the supplied factory.
 *
 * @param factory the factory for creating new threads
 * @throws NullPointerException if the factory is null
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
    this(Runtime.getRuntime().availableProcessors(),
         factory);
}
415
/**
 * Creates a {@code ForkJoinPool} with the given parallelism and
 * thread factory. The maximum pool size starts at the implementation
 * limit and parallelism maintenance is enabled. No worker threads
 * are started here; the worker array and workers are constructed
 * lazily.
 *
 * @param parallelism the parallelism level
 * @param factory the factory for creating new threads
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws NullPointerException if the factory is null
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
    // Argument checks come first (range, then null), before the
    // permission check, so the exception precedence is unchanged.
    if (parallelism <= 0 || parallelism > MAX_THREADS)
        throw new IllegalArgumentException(
            "parallelism must be between 1 and " + MAX_THREADS
            + ": " + parallelism);
    if (factory == null)
        throw new NullPointerException("factory");
    checkPermission();
    this.factory = factory;
    this.parallelism = parallelism;
    this.maxPoolSize = MAX_THREADS;
    this.maintainsParallelism = true;
    this.poolNumber = poolNumberGenerator.incrementAndGet();
    this.workerLock = new ReentrantLock();
    this.termination = workerLock.newCondition();
    this.stealCount = new AtomicLong();
    this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
    // worker array and workers are lazily constructed
}
447
448 /**
449 * Creates a new worker thread using factory.
450 *
451 * @param index the index to assign worker
452 * @return new worker, or null if factory failed
453 */
454 private ForkJoinWorkerThread createWorker(int index) {
455 Thread.UncaughtExceptionHandler h = ueh;
456 ForkJoinWorkerThread w = factory.newThread(this);
457 if (w != null) {
458 w.poolIndex = index;
459 w.setDaemon(true);
460 w.setAsyncMode(locallyFifo);
461 w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index);
462 if (h != null)
463 w.setUncaughtExceptionHandler(h);
464 }
465 return w;
466 }
467
468 /**
469 * Returns a good size for worker array given pool size.
470 * Currently requires size to be a power of two.
471 */
472 private static int arraySizeFor(int poolSize) {
473 if (poolSize <= 1)
474 return 1;
475 // See Hackers Delight, sec 3.2
476 int c = poolSize >= MAX_THREADS ? MAX_THREADS : (poolSize - 1);
477 c |= c >>> 1;
478 c |= c >>> 2;
479 c |= c >>> 4;
480 c |= c >>> 8;
481 c |= c >>> 16;
482 return c + 1;
483 }
484
485 /**
486 * Creates or resizes array if necessary to hold newLength.
487 * Call only under exclusion.
488 *
489 * @return the array
490 */
491 private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) {
492 ForkJoinWorkerThread[] ws = workers;
493 if (ws == null)
494 return workers = new ForkJoinWorkerThread[arraySizeFor(newLength)];
495 else if (newLength > ws.length)
496 return workers = Arrays.copyOf(ws, arraySizeFor(newLength));
497 else
498 return ws;
499 }
500
501 /**
502 * Tries to shrink workers into smaller array after one or more terminate.
503 */
504 private void tryShrinkWorkerArray() {
505 ForkJoinWorkerThread[] ws = workers;
506 if (ws != null) {
507 int len = ws.length;
508 int last = len - 1;
509 while (last >= 0 && ws[last] == null)
510 --last;
511 int newLength = arraySizeFor(last+1);
512 if (newLength < len)
513 workers = Arrays.copyOf(ws, newLength);
514 }
515 }
516
517 /**
518 * Initializes workers if necessary.
519 */
520 final void ensureWorkerInitialization() {
521 ForkJoinWorkerThread[] ws = workers;
522 if (ws == null) {
523 final ReentrantLock lock = this.workerLock;
524 lock.lock();
525 try {
526 ws = workers;
527 if (ws == null) {
528 int ps = parallelism;
529 updateWorkerCount(ps);
530 ws = ensureWorkerArrayCapacity(ps);
531 for (int i = 0; i < ps; ++i) {
532 ForkJoinWorkerThread w = createWorker(i);
533 if (w != null) {
534 ws[i] = w;
535 w.start();
536 }
537 else
538 updateWorkerCount(-1);
539 }
540 }
541 } finally {
542 lock.unlock();
543 }
544 }
545 }
546
547 /**
548 * Worker creation and startup for threads added via setParallelism.
549 */
550 private void createAndStartAddedWorkers() {
551 resumeAllSpares(); // Allow spares to convert to nonspare
552 int ps = parallelism;
553 ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps);
554 int len = ws.length;
555 // Sweep through slots, to keep lowest indices most populated
556 int k = 0;
557 while (k < len) {
558 if (ws[k] != null) {
559 ++k;
560 continue;
561 }
562 int s = workerCounts;
563 int tc = totalCountOf(s);
564 int rc = runningCountOf(s);
565 if (rc >= ps || tc >= ps)
566 break;
567 if (casWorkerCounts (s, workerCountsFor(tc+1, rc+1))) {
568 ForkJoinWorkerThread w = createWorker(k);
569 if (w != null) {
570 ws[k++] = w;
571 w.start();
572 }
573 else {
574 updateWorkerCount(-1); // back out on failed creation
575 break;
576 }
577 }
578 }
579 }
580
581 // Execution methods
582
583 /**
584 * Common code for execute, invoke and submit
585 */
586 private <T> void doSubmit(ForkJoinTask<T> task) {
587 if (task == null)
588 throw new NullPointerException();
589 if (isShutdown())
590 throw new RejectedExecutionException();
591 if (workers == null)
592 ensureWorkerInitialization();
593 submissionQueue.offer(task);
594 signalIdleWorkers();
595 }
596
597 /**
598 * Performs the given task, returning its result upon completion.
599 *
600 * @param task the task
601 * @return the task's result
602 * @throws NullPointerException if the task is null
603 * @throws RejectedExecutionException if the task cannot be
604 * scheduled for execution
605 */
606 public <T> T invoke(ForkJoinTask<T> task) {
607 doSubmit(task);
608 return task.join();
609 }
610
611 /**
612 * Arranges for (asynchronous) execution of the given task.
613 *
614 * @param task the task
615 * @throws NullPointerException if the task is null
616 * @throws RejectedExecutionException if the task cannot be
617 * scheduled for execution
618 */
619 public void execute(ForkJoinTask<?> task) {
620 doSubmit(task);
621 }
622
623 // AbstractExecutorService methods
624
625 /**
626 * @throws NullPointerException if the task is null
627 * @throws RejectedExecutionException if the task cannot be
628 * scheduled for execution
629 */
630 public void execute(Runnable task) {
631 ForkJoinTask<?> job;
632 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
633 job = (ForkJoinTask<?>) task;
634 else
635 job = ForkJoinTask.adapt(task, null);
636 doSubmit(job);
637 }
638
639 /**
640 * @throws NullPointerException if the task is null
641 * @throws RejectedExecutionException if the task cannot be
642 * scheduled for execution
643 */
644 public <T> ForkJoinTask<T> submit(Callable<T> task) {
645 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
646 doSubmit(job);
647 return job;
648 }
649
650 /**
651 * @throws NullPointerException if the task is null
652 * @throws RejectedExecutionException if the task cannot be
653 * scheduled for execution
654 */
655 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
656 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
657 doSubmit(job);
658 return job;
659 }
660
661 /**
662 * @throws NullPointerException if the task is null
663 * @throws RejectedExecutionException if the task cannot be
664 * scheduled for execution
665 */
666 public ForkJoinTask<?> submit(Runnable task) {
667 ForkJoinTask<?> job;
668 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
669 job = (ForkJoinTask<?>) task;
670 else
671 job = ForkJoinTask.adapt(task, null);
672 doSubmit(job);
673 return job;
674 }
675
676 /**
677 * Submits a ForkJoinTask for execution.
678 *
679 * @param task the task to submit
680 * @return the task
681 * @throws NullPointerException if the task is null
682 * @throws RejectedExecutionException if the task cannot be
683 * scheduled for execution
684 */
685 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
686 doSubmit(task);
687 return task;
688 }
689
690
691 /**
692 * @throws NullPointerException {@inheritDoc}
693 * @throws RejectedExecutionException {@inheritDoc}
694 */
695 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
696 ArrayList<ForkJoinTask<T>> forkJoinTasks =
697 new ArrayList<ForkJoinTask<T>>(tasks.size());
698 for (Callable<T> task : tasks)
699 forkJoinTasks.add(ForkJoinTask.adapt(task));
700 invoke(new InvokeAll<T>(forkJoinTasks));
701
702 @SuppressWarnings({"unchecked", "rawtypes"})
703 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
704 return futures;
705 }
706
707 static final class InvokeAll<T> extends RecursiveAction {
708 final ArrayList<ForkJoinTask<T>> tasks;
709 InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
710 public void compute() {
711 try { invokeAll(tasks); }
712 catch (Exception ignore) {}
713 }
714 private static final long serialVersionUID = -7914297376763021607L;
715 }
716
717 // Configuration and status settings and queries
718
719 /**
720 * Returns the factory used for constructing new workers.
721 *
722 * @return the factory used for constructing new workers
723 */
724 public ForkJoinWorkerThreadFactory getFactory() {
725 return factory;
726 }
727
728 /**
729 * Returns the handler for internal worker threads that terminate
730 * due to unrecoverable errors encountered while executing tasks.
731 *
732 * @return the handler, or {@code null} if none
733 */
734 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
735 Thread.UncaughtExceptionHandler h;
736 final ReentrantLock lock = this.workerLock;
737 lock.lock();
738 try {
739 h = ueh;
740 } finally {
741 lock.unlock();
742 }
743 return h;
744 }
745
746 /**
747 * Sets the handler for internal worker threads that terminate due
748 * to unrecoverable errors encountered while executing tasks.
749 * Unless set, the current default or ThreadGroup handler is used
750 * as handler.
751 *
752 * @param h the new handler
753 * @return the old handler, or {@code null} if none
754 * @throws SecurityException if a security manager exists and
755 * the caller is not permitted to modify threads
756 * because it does not hold {@link
757 * java.lang.RuntimePermission}{@code ("modifyThread")}
758 */
759 public Thread.UncaughtExceptionHandler
760 setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
761 checkPermission();
762 Thread.UncaughtExceptionHandler old = null;
763 final ReentrantLock lock = this.workerLock;
764 lock.lock();
765 try {
766 old = ueh;
767 ueh = h;
768 ForkJoinWorkerThread[] ws = workers;
769 if (ws != null) {
770 for (int i = 0; i < ws.length; ++i) {
771 ForkJoinWorkerThread w = ws[i];
772 if (w != null)
773 w.setUncaughtExceptionHandler(h);
774 }
775 }
776 } finally {
777 lock.unlock();
778 }
779 return old;
780 }
781
782
783 /**
784 * Sets the target parallelism level of this pool.
785 *
786 * @param parallelism the target parallelism
787 * @throws IllegalArgumentException if parallelism less than or
788 * equal to zero or greater than maximum size bounds
789 * @throws SecurityException if a security manager exists and
790 * the caller is not permitted to modify threads
791 * because it does not hold {@link
792 * java.lang.RuntimePermission}{@code ("modifyThread")}
793 */
794 public void setParallelism(int parallelism) {
795 checkPermission();
796 if (parallelism <= 0 || parallelism > maxPoolSize)
797 throw new IllegalArgumentException();
798 final ReentrantLock lock = this.workerLock;
799 lock.lock();
800 try {
801 if (isProcessingTasks()) {
802 int p = this.parallelism;
803 this.parallelism = parallelism;
804 if (workers != null) {
805 if (parallelism > p)
806 createAndStartAddedWorkers();
807 else
808 trimSpares();
809 }
810 }
811 } finally {
812 lock.unlock();
813 }
814 signalIdleWorkers();
815 }
816
817 /**
818 * Returns the targeted parallelism level of this pool.
819 *
820 * @return the targeted parallelism level of this pool
821 */
822 public int getParallelism() {
823 return parallelism;
824 }
825
826 /**
827 * Returns the number of worker threads that have started but not
828 * yet terminated. This result returned by this method may differ
829 * from {@link #getParallelism} when threads are created to
830 * maintain parallelism when others are cooperatively blocked.
831 *
832 * @return the number of worker threads
833 */
834 public int getPoolSize() {
835 return totalCountOf(workerCounts);
836 }
837
838 /**
839 * Returns the maximum number of threads allowed to exist in the
840 * pool. Unless set using {@link #setMaximumPoolSize}, the
841 * maximum is an implementation-defined value designed only to
842 * prevent runaway growth.
843 *
844 * @return the maximum
845 */
846 public int getMaximumPoolSize() {
847 return maxPoolSize;
848 }
849
850 /**
851 * Sets the maximum number of threads allowed to exist in the
852 * pool. The given value should normally be greater than or equal
853 * to the {@link #getParallelism parallelism} level. Setting this
854 * value has no effect on current pool size. It controls
855 * construction of new threads.
856 *
857 * @throws IllegalArgumentException if negative or greater than
858 * internal implementation limit
859 */
860 public void setMaximumPoolSize(int newMax) {
861 if (newMax < 0 || newMax > MAX_THREADS)
862 throw new IllegalArgumentException();
863 maxPoolSize = newMax;
864 }
865
866
867 /**
868 * Returns {@code true} if this pool dynamically maintains its
869 * target parallelism level. If false, new threads are added only
870 * to avoid possible starvation. This setting is by default true.
871 *
872 * @return {@code true} if maintains parallelism
873 */
874 public boolean getMaintainsParallelism() {
875 return maintainsParallelism;
876 }
877
878 /**
879 * Sets whether this pool dynamically maintains its target
880 * parallelism level. If false, new threads are added only to
881 * avoid possible starvation.
882 *
883 * @param enable {@code true} to maintain parallelism
884 */
885 public void setMaintainsParallelism(boolean enable) {
886 maintainsParallelism = enable;
887 }
888
889 /**
890 * Establishes local first-in-first-out scheduling mode for forked
891 * tasks that are never joined. This mode may be more appropriate
892 * than default locally stack-based mode in applications in which
893 * worker threads only process asynchronous tasks. This method is
894 * designed to be invoked only when the pool is quiescent, and
895 * typically only before any tasks are submitted. The effects of
896 * invocations at other times may be unpredictable.
897 *
898 * @param async if {@code true}, use locally FIFO scheduling
899 * @return the previous mode
900 * @see #getAsyncMode
901 */
902 public boolean setAsyncMode(boolean async) {
903 boolean oldMode = locallyFifo;
904 locallyFifo = async;
905 ForkJoinWorkerThread[] ws = workers;
906 if (ws != null) {
907 for (int i = 0; i < ws.length; ++i) {
908 ForkJoinWorkerThread t = ws[i];
909 if (t != null)
910 t.setAsyncMode(async);
911 }
912 }
913 return oldMode;
914 }
915
916 /**
917 * Returns {@code true} if this pool uses local first-in-first-out
918 * scheduling mode for forked tasks that are never joined.
919 *
920 * @return {@code true} if this pool uses async mode
921 * @see #setAsyncMode
922 */
923 public boolean getAsyncMode() {
924 return locallyFifo;
925 }
926
927 /**
928 * Returns an estimate of the number of worker threads that are
929 * not blocked waiting to join tasks or for other managed
930 * synchronization.
931 *
932 * @return the number of worker threads
933 */
934 public int getRunningThreadCount() {
935 return runningCountOf(workerCounts);
936 }
937
938 /**
939 * Returns an estimate of the number of threads that are currently
940 * stealing or executing tasks. This method may overestimate the
941 * number of active threads.
942 *
943 * @return the number of active threads
944 */
945 public int getActiveThreadCount() {
946 return activeCountOf(runControl);
947 }
948
949 /**
950 * Returns an estimate of the number of threads that are currently
951 * idle waiting for tasks. This method may underestimate the
952 * number of idle threads.
953 *
954 * @return the number of idle threads
955 */
956 final int getIdleThreadCount() {
957 int c = runningCountOf(workerCounts) - activeCountOf(runControl);
958 return (c <= 0) ? 0 : c;
959 }
960
961 /**
962 * Returns {@code true} if all worker threads are currently idle.
963 * An idle worker is one that cannot obtain a task to execute
964 * because none are available to steal from other threads, and
965 * there are no pending submissions to the pool. This method is
966 * conservative; it might not return {@code true} immediately upon
967 * idleness of all threads, but will eventually become true if
968 * threads remain inactive.
969 *
970 * @return {@code true} if all threads are currently idle
971 */
972 public boolean isQuiescent() {
973 return activeCountOf(runControl) == 0;
974 }
975
976 /**
977 * Returns an estimate of the total number of tasks stolen from
978 * one thread's work queue by another. The reported value
979 * underestimates the actual total number of steals when the pool
980 * is not quiescent. This value may be useful for monitoring and
981 * tuning fork/join programs: in general, steal counts should be
982 * high enough to keep threads busy, but low enough to avoid
983 * overhead and contention across threads.
984 *
985 * @return the number of steals
986 */
987 public long getStealCount() {
988 return stealCount.get();
989 }
990
991 /**
992 * Accumulates steal count from a worker.
993 * Call only when worker known to be idle.
994 */
995 private void updateStealCount(ForkJoinWorkerThread w) {
996 int sc = w.getAndClearStealCount();
997 if (sc != 0)
998 stealCount.addAndGet(sc);
999 }
1000
1001 /**
1002 * Returns an estimate of the total number of tasks currently held
1003 * in queues by worker threads (but not including tasks submitted
1004 * to the pool that have not begun executing). This value is only
1005 * an approximation, obtained by iterating across all threads in
1006 * the pool. This method may be useful for tuning task
1007 * granularities.
1008 *
1009 * @return the number of queued tasks
1010 */
1011 public long getQueuedTaskCount() {
1012 long count = 0;
1013 ForkJoinWorkerThread[] ws = workers;
1014 if (ws != null) {
1015 for (int i = 0; i < ws.length; ++i) {
1016 ForkJoinWorkerThread t = ws[i];
1017 if (t != null)
1018 count += t.getQueueSize();
1019 }
1020 }
1021 return count;
1022 }
1023
1024 /**
1025 * Returns an estimate of the number of tasks submitted to this
1026 * pool that have not yet begun executing. This method takes time
1027 * proportional to the number of submissions.
1028 *
1029 * @return the number of queued submissions
1030 */
1031 public int getQueuedSubmissionCount() {
1032 return submissionQueue.size();
1033 }
1034
1035 /**
1036 * Returns {@code true} if there are any tasks submitted to this
1037 * pool that have not yet begun executing.
1038 *
1039 * @return {@code true} if there are any queued submissions
1040 */
1041 public boolean hasQueuedSubmissions() {
1042 return !submissionQueue.isEmpty();
1043 }
1044
1045 /**
1046 * Removes and returns the next unexecuted submission if one is
1047 * available. This method may be useful in extensions to this
1048 * class that re-assign work in systems with multiple pools.
1049 *
1050 * @return the next submission, or {@code null} if none
1051 */
1052 protected ForkJoinTask<?> pollSubmission() {
1053 return submissionQueue.poll();
1054 }
1055
1056 /**
1057 * Removes all available unexecuted submitted and forked tasks
1058 * from scheduling queues and adds them to the given collection,
1059 * without altering their execution status. These may include
1060 * artificially generated or wrapped tasks. This method is
1061 * designed to be invoked only when the pool is known to be
1062 * quiescent. Invocations at other times may not remove all
1063 * tasks. A failure encountered while attempting to add elements
1064 * to collection {@code c} may result in elements being in
1065 * neither, either or both collections when the associated
1066 * exception is thrown. The behavior of this operation is
1067 * undefined if the specified collection is modified while the
1068 * operation is in progress.
1069 *
1070 * @param c the collection to transfer elements into
1071 * @return the number of elements transferred
1072 */
1073 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1074 int n = submissionQueue.drainTo(c);
1075 ForkJoinWorkerThread[] ws = workers;
1076 if (ws != null) {
1077 for (int i = 0; i < ws.length; ++i) {
1078 ForkJoinWorkerThread w = ws[i];
1079 if (w != null)
1080 n += w.drainTasksTo(c);
1081 }
1082 }
1083 return n;
1084 }
1085
1086 /**
1087 * Returns a string identifying this pool, as well as its state,
1088 * including indications of run state, parallelism level, and
1089 * worker and task counts.
1090 *
1091 * @return a string identifying this pool, as well as its state
1092 */
1093 public String toString() {
1094 int ps = parallelism;
1095 int wc = workerCounts;
1096 int rc = runControl;
1097 long st = getStealCount();
1098 long qt = getQueuedTaskCount();
1099 long qs = getQueuedSubmissionCount();
1100 return super.toString() +
1101 "[" + runStateToString(runStateOf(rc)) +
1102 ", parallelism = " + ps +
1103 ", size = " + totalCountOf(wc) +
1104 ", active = " + activeCountOf(rc) +
1105 ", running = " + runningCountOf(wc) +
1106 ", steals = " + st +
1107 ", tasks = " + qt +
1108 ", submissions = " + qs +
1109 "]";
1110 }
1111
1112 private static String runStateToString(int rs) {
1113 switch (rs) {
1114 case RUNNING: return "Running";
1115 case SHUTDOWN: return "Shutting down";
1116 case TERMINATING: return "Terminating";
1117 case TERMINATED: return "Terminated";
1118 default: throw new Error("Unknown run state");
1119 }
1120 }
1121
1122 // lifecycle control
1123
1124 /**
1125 * Initiates an orderly shutdown in which previously submitted
1126 * tasks are executed, but no new tasks will be accepted.
1127 * Invocation has no additional effect if already shut down.
1128 * Tasks that are in the process of being submitted concurrently
1129 * during the course of this method may or may not be rejected.
1130 *
1131 * @throws SecurityException if a security manager exists and
1132 * the caller is not permitted to modify threads
1133 * because it does not hold {@link
1134 * java.lang.RuntimePermission}{@code ("modifyThread")}
1135 */
1136 public void shutdown() {
1137 checkPermission();
1138 transitionRunStateTo(SHUTDOWN);
1139 if (canTerminateOnShutdown(runControl)) {
1140 if (workers == null) { // shutting down before workers created
1141 final ReentrantLock lock = this.workerLock;
1142 lock.lock();
1143 try {
1144 if (workers == null) {
1145 terminate();
1146 transitionRunStateTo(TERMINATED);
1147 termination.signalAll();
1148 }
1149 } finally {
1150 lock.unlock();
1151 }
1152 }
1153 terminateOnShutdown();
1154 }
1155 }
1156
1157 /**
1158 * Attempts to cancel and/or stop all tasks, and reject all
1159 * subsequently submitted tasks. Tasks that are in the process of
1160 * being submitted or executed concurrently during the course of
1161 * this method may or may not be rejected. This method cancels
1162 * both existing and unexecuted tasks, in order to permit
1163 * termination in the presence of task dependencies. So the method
1164 * always returns an empty list (unlike the case for some other
1165 * Executors).
1166 *
1167 * @return an empty list
1168 * @throws SecurityException if a security manager exists and
1169 * the caller is not permitted to modify threads
1170 * because it does not hold {@link
1171 * java.lang.RuntimePermission}{@code ("modifyThread")}
1172 */
1173 public List<Runnable> shutdownNow() {
1174 checkPermission();
1175 terminate();
1176 return Collections.emptyList();
1177 }
1178
1179 /**
1180 * Returns {@code true} if all tasks have completed following shut down.
1181 *
1182 * @return {@code true} if all tasks have completed following shut down
1183 */
1184 public boolean isTerminated() {
1185 return runStateOf(runControl) == TERMINATED;
1186 }
1187
1188 /**
1189 * Returns {@code true} if the process of termination has
1190 * commenced but not yet completed. This method may be useful for
1191 * debugging. A return of {@code true} reported a sufficient
1192 * period after shutdown may indicate that submitted tasks have
1193 * ignored or suppressed interruption, causing this executor not
1194 * to properly terminate.
1195 *
1196 * @return {@code true} if terminating but not yet terminated
1197 */
1198 public boolean isTerminating() {
1199 return runStateOf(runControl) == TERMINATING;
1200 }
1201
1202 /**
1203 * Returns {@code true} if this pool has been shut down.
1204 *
1205 * @return {@code true} if this pool has been shut down
1206 */
1207 public boolean isShutdown() {
1208 return runStateOf(runControl) >= SHUTDOWN;
1209 }
1210
1211 /**
1212 * Returns true if pool is not terminating or terminated.
1213 * Used internally to suppress execution when terminating.
1214 */
1215 final boolean isProcessingTasks() {
1216 return runStateOf(runControl) < TERMINATING;
1217 }
1218
1219 /**
1220 * Blocks until all tasks have completed execution after a shutdown
1221 * request, or the timeout occurs, or the current thread is
1222 * interrupted, whichever happens first.
1223 *
1224 * @param timeout the maximum time to wait
1225 * @param unit the time unit of the timeout argument
1226 * @return {@code true} if this executor terminated and
1227 * {@code false} if the timeout elapsed before termination
1228 * @throws InterruptedException if interrupted while waiting
1229 */
1230 public boolean awaitTermination(long timeout, TimeUnit unit)
1231 throws InterruptedException {
1232 long nanos = unit.toNanos(timeout);
1233 final ReentrantLock lock = this.workerLock;
1234 lock.lock();
1235 try {
1236 for (;;) {
1237 if (isTerminated())
1238 return true;
1239 if (nanos <= 0)
1240 return false;
1241 nanos = termination.awaitNanos(nanos);
1242 }
1243 } finally {
1244 lock.unlock();
1245 }
1246 }
1247
1248 // Shutdown and termination support
1249
1250 /**
1251 * Callback from terminating worker. Nulls out the corresponding
1252 * workers slot, and if terminating, tries to terminate; else
1253 * tries to shrink workers array.
1254 *
1255 * @param w the worker
1256 */
1257 final void workerTerminated(ForkJoinWorkerThread w) {
1258 updateStealCount(w);
1259 updateWorkerCount(-1);
1260 final ReentrantLock lock = this.workerLock;
1261 lock.lock();
1262 try {
1263 ForkJoinWorkerThread[] ws = workers;
1264 if (ws != null) {
1265 int idx = w.poolIndex;
1266 if (idx >= 0 && idx < ws.length && ws[idx] == w)
1267 ws[idx] = null;
1268 if (totalCountOf(workerCounts) == 0) {
1269 terminate(); // no-op if already terminating
1270 transitionRunStateTo(TERMINATED);
1271 termination.signalAll();
1272 }
1273 else if (isProcessingTasks()) {
1274 tryShrinkWorkerArray();
1275 tryResumeSpare(true); // allow replacement
1276 }
1277 }
1278 } finally {
1279 lock.unlock();
1280 }
1281 signalIdleWorkers();
1282 }
1283
1284 /**
1285 * Initiates termination.
1286 */
1287 private void terminate() {
1288 if (transitionRunStateTo(TERMINATING)) {
1289 stopAllWorkers();
1290 resumeAllSpares();
1291 signalIdleWorkers();
1292 cancelQueuedSubmissions();
1293 cancelQueuedWorkerTasks();
1294 interruptUnterminatedWorkers();
1295 signalIdleWorkers(); // resignal after interrupt
1296 }
1297 }
1298
1299 /**
1300 * Possibly terminates when on shutdown state.
1301 */
1302 private void terminateOnShutdown() {
1303 if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl))
1304 terminate();
1305 }
1306
1307 /**
1308 * Clears out and cancels submissions.
1309 */
1310 private void cancelQueuedSubmissions() {
1311 ForkJoinTask<?> task;
1312 while ((task = pollSubmission()) != null)
1313 task.cancel(false);
1314 }
1315
1316 /**
1317 * Cleans out worker queues.
1318 */
1319 private void cancelQueuedWorkerTasks() {
1320 final ReentrantLock lock = this.workerLock;
1321 lock.lock();
1322 try {
1323 ForkJoinWorkerThread[] ws = workers;
1324 if (ws != null) {
1325 for (int i = 0; i < ws.length; ++i) {
1326 ForkJoinWorkerThread t = ws[i];
1327 if (t != null)
1328 t.cancelTasks();
1329 }
1330 }
1331 } finally {
1332 lock.unlock();
1333 }
1334 }
1335
1336 /**
1337 * Sets each worker's status to terminating. Requires lock to avoid
1338 * conflicts with add/remove.
1339 */
1340 private void stopAllWorkers() {
1341 final ReentrantLock lock = this.workerLock;
1342 lock.lock();
1343 try {
1344 ForkJoinWorkerThread[] ws = workers;
1345 if (ws != null) {
1346 for (int i = 0; i < ws.length; ++i) {
1347 ForkJoinWorkerThread t = ws[i];
1348 if (t != null)
1349 t.shutdownNow();
1350 }
1351 }
1352 } finally {
1353 lock.unlock();
1354 }
1355 }
1356
1357 /**
1358 * Interrupts all unterminated workers. This is not required for
1359 * sake of internal control, but may help unstick user code during
1360 * shutdown.
1361 */
1362 private void interruptUnterminatedWorkers() {
1363 final ReentrantLock lock = this.workerLock;
1364 lock.lock();
1365 try {
1366 ForkJoinWorkerThread[] ws = workers;
1367 if (ws != null) {
1368 for (int i = 0; i < ws.length; ++i) {
1369 ForkJoinWorkerThread t = ws[i];
1370 if (t != null && !t.isTerminated()) {
1371 try {
1372 t.interrupt();
1373 } catch (SecurityException ignore) {
1374 }
1375 }
1376 }
1377 }
1378 } finally {
1379 lock.unlock();
1380 }
1381 }
1382
1383 /*
1384 * Nodes for event barrier to manage idle threads. Queue nodes
1385 * are basic Treiber stack nodes, also used for spare stack.
1386 *
1387 * The event barrier has an event count and a wait queue (actually
1388 * a Treiber stack). Workers are enabled to look for work when
1389 * the eventCount is incremented. If they fail to find work, they
1390 * may wait for next count. Upon release, threads help others wake
1391 * up.
1392 *
1393 * Synchronization events occur only in enough contexts to
1394 * maintain overall liveness:
1395 *
1396 * - Submission of a new task to the pool
1397 * - Resizes or other changes to the workers array
1398 * - pool termination
1399 * - A worker pushing a task on an empty queue
1400 *
1401 * The case of pushing a task occurs often enough, and is heavy
1402 * enough compared to simple stack pushes, to require special
1403 * handling: Method signalWork returns without advancing count if
1404 * the queue appears to be empty. This would ordinarily result in
1405 * races causing some queued waiters not to be woken up. To avoid
1406 * this, the first worker enqueued in method sync rescans for
1407 * tasks after being enqueued, and helps signal if any are
1408 * found. This works well because the worker has nothing better to
1409 * do, and so might as well help alleviate the overhead and
 * contention on the threads actually doing work. Also, since
 * event count increments on task availability exist only to
 * maintain liveness (rather than to force refreshes, etc.), it is
 * OK for callers to exit early if contending with another signaller.
1414 */
1415 static final class WaitQueueNode {
1416 WaitQueueNode next; // only written before enqueued
1417 volatile ForkJoinWorkerThread thread; // nulled to cancel wait
1418 final long count; // unused for spare stack
1419
1420 WaitQueueNode(long c, ForkJoinWorkerThread w) {
1421 count = c;
1422 thread = w;
1423 }
1424
1425 /**
1426 * Wakes up waiter, returning false if known to already be awake
1427 */
1428 boolean signal() {
1429 ForkJoinWorkerThread t = thread;
1430 if (t == null)
1431 return false;
1432 thread = null;
1433 LockSupport.unpark(t);
1434 return true;
1435 }
1436 }
1437
/**
 * Ensures that no thread is waiting for count to advance from the
 * current value of eventCount read on entry to this method, by
 * releasing waiting threads if necessary.
 *
 * <p>Waiters are released only when the node on top of the sync
 * stack was enqueued for a count older than the one read here. In
 * that case the whole stack is detached with a single CAS and
 * every node on it is signalled.
 */
final void ensureSync() {
    long c = eventCount; // snapshot; later advances are not chased
    WaitQueueNode q;
    while ((q = syncStack) != null && q.count < c) {
        // Detach the entire stack at once; on CAS failure the loop
        // re-reads the top and re-tests its count.
        if (casBarrierStack(q, null)) {
            do {
                q.signal();
            } while ((q = q.next) != null);
            break;
        }
    }
}
1455
1456 /**
1457 * Increments event count and releases waiting threads.
1458 */
1459 private void signalIdleWorkers() {
1460 long c;
1461 do {} while (!casEventCount(c = eventCount, c+1));
1462 ensureSync();
1463 }
1464
/**
 * Signals threads waiting to poll a task. Because method sync
 * rechecks availability, it is OK to only proceed if queue
 * appears to be non-empty, and OK if CAS to increment count
 * fails (since some other thread succeeded).
 *
 * <p>The "queue" tested here is the sync (wait) stack: when it is
 * empty this method does nothing at all. Otherwise it attempts one
 * event-count increment and tries to pop and wake just the top
 * waiter, falling back to {@code ensureSync} if the pop fails or
 * the popped waiter was already awake.
 */
final void signalWork() {
    if (syncStack != null) {
        long c;
        // Single attempt; the result is deliberately ignored.
        casEventCount(c = eventCount, c+1);
        WaitQueueNode q = syncStack;
        // Only consider a waiter enqueued at or before the count
        // read above.
        if (q != null && q.count <= c &&
            (!casBarrierStack(q, q.next) || !q.signal()))
            ensureSync();
    }
}
1481
/**
 * Waits until event count advances from last value held by
 * caller, or if excess threads, caller is resumed as spare, or
 * caller or pool is terminating. Updates caller's event on exit.
 *
 * <p>Note that the caller parks at most once per call: after its
 * node has been pushed onto the sync stack and the (optional) park
 * returns, the loop is exited regardless of why the park returned.
 * Nothing is done beyond the steal-count transfer if the worker is
 * shut down, the pool is no longer processing tasks, or the caller
 * was suspended as a spare.
 *
 * @param w the calling worker thread
 */
final void sync(ForkJoinWorkerThread w) {
    updateStealCount(w); // Transfer w's count while it is idle

    if (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
        long prev = w.lastEventCount;
        WaitQueueNode node = null; // created lazily, reused across CAS retries
        WaitQueueNode h;
        boolean helpSignal = false;
        // Enqueue only while the count is unchanged and the stack is
        // either empty or headed by a waiter for the same count.
        while (eventCount == prev &&
               ((h = syncStack) == null || h.count == prev)) {
            if (node == null)
                node = new WaitQueueNode(prev, w);
            if (casBarrierStack(node.next = h, node)) {
                // Recheck after enqueuing: skip parking if interrupted,
                // already signalled, or the count has moved.
                if (!Thread.interrupted() && node.thread != null &&
                    eventCount == prev) {
                    if (h == null && // cover signalWork race
                        ForkJoinWorkerThread.hasQueuedTasks(workers))
                        helpSignal = true;
                    else
                        LockSupport.park(this);
                }
                // Cancel the wait so a later signal() on this node
                // reports the waiter as already awake.
                if (node.thread != null)
                    node.thread = null;
                break;
            }
        }
        long ec = eventCount;
        if (ec != prev)
            w.lastEventCount = ec;
        else if (helpSignal)
            casEventCount(ec, ec + 1); // advance on behalf of the pusher
        ensureSync();
    }
}
1523
1524
/**
 * Returns {@code true} if a new sync event occurred since last
 * call to sync or this method, if so, updating caller's count.
 *
 * @param w the calling worker thread
 * @return {@code true} if the event count differs from the value
 *         last recorded by {@code w}, either at the first read or
 *         when re-read after releasing waiters
 */
final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
    long lc = w.lastEventCount;
    long ec = eventCount;
    if (lc != ec)
        w.lastEventCount = ec;
    ensureSync();
    // The second read of eventCount catches an advance that happened
    // after the first read; the worker's recorded count is not
    // updated for that case here.
    return lc != ec || lc != eventCount;
}
1537
1538 // Parallelism maintenance
1539
1540 /**
1541 * Decrements running count; if too low, adds spare.
1542 *
1543 * Conceptually, all we need to do here is add or resume a
1544 * spare thread when one is about to block (and remove or
1545 * suspend it later when unblocked -- see suspendIfSpare).
1546 * However, implementing this idea requires coping with
1547 * several problems: we have imperfect information about the
1548 * states of threads. Some count updates can and usually do
1549 * lag run state changes, despite arrangements to keep them
1550 * accurate (for example, when possible, updating counts
1551 * before signalling or resuming), especially when running on
1552 * dynamic JVMs that don't optimize the infrequent paths that
1553 * update counts. Generating too many threads can make these
1554 * problems become worse, because excess threads are more
1555 * likely to be context-switched with others, slowing them all
1556 * down, especially if there is no work available, so all are
1557 * busy scanning or idling. Also, excess spare threads can
1558 * only be suspended or removed when they are idle, not
1559 * immediately when they aren't needed. So adding threads will
1560 * raise parallelism level for longer than necessary. Also,
1561 * FJ applications often encounter highly transient peaks when
1562 * many threads are blocked joining, but for less time than it
1563 * takes to create or resume spares.
1564 *
1565 * @param joinMe if non-null, return early if done
1566 * @param maintainParallelism if true, try to stay within
1567 * target counts, else create only to avoid starvation
1568 * @return true if joinMe known to be done
1569 */
1570 final boolean preJoin(ForkJoinTask<?> joinMe,
1571 boolean maintainParallelism) {
1572 maintainParallelism &= maintainsParallelism; // overrride
1573 boolean dec = false; // true when running count decremented
1574 while (spareStack == null || !tryResumeSpare(dec)) {
1575 int counts = workerCounts;
1576 if (dec || (dec = casWorkerCounts(counts, --counts))) {
1577 if (!needSpare(counts, maintainParallelism))
1578 break;
1579 if (joinMe.status < 0)
1580 return true;
1581 if (tryAddSpare(counts))
1582 break;
1583 }
1584 }
1585 return false;
1586 }
1587
/**
 * Same idea as preJoin, but for a {@code ManagedBlocker}: resumes
 * or adds a spare before the caller blocks, decrementing the
 * running count on the way.
 *
 * @param blocker the blocker about to be waited on; polled via
 *        {@code isReleasable} before a spare is added
 * @param maintainParallelism if true (and the pool maintains
 *        parallelism), try to stay within target counts, else
 *        create only to avoid starvation
 * @return true if {@code blocker} reported itself releasable
 */
final boolean preBlock(ManagedBlocker blocker,
                       boolean maintainParallelism) {
    maintainParallelism &= maintainsParallelism; // pool setting overrides
    boolean dec = false; // true when running count decremented
    while (spareStack == null || !tryResumeSpare(dec)) {
        int counts = workerCounts;
        // Decrement the running count exactly once across retries.
        if (dec || (dec = casWorkerCounts(counts, --counts))) {
            if (!needSpare(counts, maintainParallelism))
                break;
            if (blocker.isReleasable())
                return true;
            if (tryAddSpare(counts))
                break;
        }
    }
    return false;
}
1608
1609 /**
1610 * Returns {@code true} if a spare thread appears to be needed.
1611 * If maintaining parallelism, returns true when the deficit in
1612 * running threads is more than the surplus of total threads, and
1613 * there is apparently some work to do. This self-limiting rule
1614 * means that the more threads that have already been added, the
1615 * less parallelism we will tolerate before adding another.
1616 *
1617 * @param counts current worker counts
1618 * @param maintainParallelism try to maintain parallelism
1619 */
1620 private boolean needSpare(int counts, boolean maintainParallelism) {
1621 int ps = parallelism;
1622 int rc = runningCountOf(counts);
1623 int tc = totalCountOf(counts);
1624 int runningDeficit = ps - rc;
1625 int totalSurplus = tc - ps;
1626 return (tc < maxPoolSize &&
1627 (rc == 0 || totalSurplus < 0 ||
1628 (maintainParallelism &&
1629 runningDeficit > totalSurplus &&
1630 ForkJoinWorkerThread.hasQueuedTasks(workers))));
1631 }
1632
/**
 * Adds a spare worker if lock available and no more than the
 * expected numbers of threads exist.
 *
 * <p>Gives up without adding when the current running or total
 * count exceeds the expected one, or when the worker lock cannot
 * be acquired without blocking.
 *
 * @param expectedCounts the packed worker counts the caller based
 *        its decision on
 * @return true if successful
 */
private boolean tryAddSpare(int expectedCounts) {
    final ReentrantLock lock = this.workerLock;
    int expectedRunning = runningCountOf(expectedCounts);
    int expectedTotal = totalCountOf(expectedCounts);
    boolean success = false;
    boolean locked = false;
    // confirm counts while locking; CAS after obtaining lock
    try {
        for (;;) {
            int s = workerCounts;
            int tc = totalCountOf(s);
            int rc = runningCountOf(s);
            if (rc > expectedRunning || tc > expectedTotal)
                break;
            // tryLock is attempted at most once; later iterations
            // (CAS retries) reuse the lock already held.
            if (!locked && !(locked = lock.tryLock()))
                break;
            // Counts are bumped before the thread exists;
            // createAndStartSpare undoes this if creation fails.
            if (casWorkerCounts(s, workerCountsFor(tc+1, rc+1))) {
                createAndStartSpare(tc);
                success = true;
                break;
            }
        }
    } finally {
        if (locked)
            lock.unlock();
    }
    return success;
}
1667
1668 /**
1669 * Adds the kth spare worker. On entry, pool counts are already
1670 * adjusted to reflect addition.
1671 */
1672 private void createAndStartSpare(int k) {
1673 ForkJoinWorkerThread w = null;
1674 ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(k + 1);
1675 int len = ws.length;
1676 // Probably, we can place at slot k. If not, find empty slot
1677 if (k < len && ws[k] != null) {
1678 for (k = 0; k < len && ws[k] != null; ++k)
1679 ;
1680 }
1681 if (k < len && isProcessingTasks() && (w = createWorker(k)) != null) {
1682 ws[k] = w;
1683 w.start();
1684 }
1685 else
1686 updateWorkerCount(-1); // adjust on failure
1687 signalIdleWorkers();
1688 }
1689
/**
 * Suspends calling thread w if there are excess threads. Called
 * only from sync. Spares are enqueued in a Treiber stack using
 * the same WaitQueueNodes as barriers. They are resumed mainly
 * in preJoin, but are also woken on pool events that require all
 * threads to check run state.
 *
 * @param w the caller
 * @return {@code true} if the caller was suspended as a spare
 *         (and has since been released or interrupted),
 *         {@code false} if it is not a spare
 */
private boolean suspendIfSpare(ForkJoinWorkerThread w) {
    WaitQueueNode node = null;
    for (;;) {
        int p = parallelism;
        int s = workerCounts;
        int r = runningCountOf(s);
        int t = totalCountOf(s);
        // use t as bound if r transiently out of sync
        if (t <= p || r <= p)
            return false; // not a spare
        if (node == null)
            node = new WaitQueueNode(0, w);
        // Drop the running count (total unchanged) before suspending.
        if (casWorkerCounts(s, workerCountsFor(t, r - 1)))
            break;
    }
    // push onto stack
    do {} while (!casSpareStack(node.next = spareStack, node));
    // block until the node is signalled (its thread field is nulled)
    // or this thread is interrupted
    while (!Thread.interrupted() && node.thread != null)
        LockSupport.park(this);
    return true;
}
1721
1722 /**
1723 * Tries to pop and resume a spare thread.
1724 *
1725 * @param updateCount if true, increment running count on success
1726 * @return true if successful
1727 */
1728 private boolean tryResumeSpare(boolean updateCount) {
1729 WaitQueueNode q;
1730 while ((q = spareStack) != null) {
1731 if (casSpareStack(q, q.next)) {
1732 if (updateCount)
1733 updateRunningCount(1);
1734 q.signal();
1735 return true;
1736 }
1737 }
1738 return false;
1739 }
1740
1741 /**
1742 * Pops and resumes all spare threads. Same idea as ensureSync.
1743 *
1744 * @return true if any spares released
1745 */
1746 private boolean resumeAllSpares() {
1747 WaitQueueNode q;
1748 while ( (q = spareStack) != null) {
1749 if (casSpareStack(q, null)) {
1750 do {
1751 updateRunningCount(1);
1752 q.signal();
1753 } while ((q = q.next) != null);
1754 return true;
1755 }
1756 }
1757 return false;
1758 }
1759
/**
 * Pops and shuts down excessive spare threads. Call only while
 * holding lock. This is not guaranteed to eliminate all excess
 * threads, only those suspended as spares, which are the ones
 * unlikely to be needed in the future.
 *
 * <p>Every popped spare is signalled, whether or not it was shut
 * down.
 */
private void trimSpares() {
    int surplus = totalCountOf(workerCounts) - parallelism;
    WaitQueueNode q;
    while (surplus > 0 && (q = spareStack) != null) {
        // Detach the whole spare stack at once, then walk it.
        if (casSpareStack(q, null)) {
            do {
                // Re-add the running count that suspendIfSpare
                // removed when this spare was suspended.
                updateRunningCount(1);
                ForkJoinWorkerThread w = q.thread;
                // Shut down only while a surplus remains and at
                // least one thread is still counted as running.
                if (w != null && surplus > 0 &&
                    runningCountOf(workerCounts) > 0 && w.shutdown())
                    --surplus;
                q.signal();
            } while ((q = q.next) != null);
        }
    }
}
1782
/**
 * Interface for extending managed parallelism for tasks running
 * in {@link ForkJoinPool}s.
 *
 * <p>A {@code ManagedBlocker} provides two methods.
 * Method {@code isReleasable} must return {@code true} if
 * blocking is not necessary. Method {@code block} blocks the
 * current thread if necessary (perhaps internally invoking
 * {@code isReleasable} before actually blocking).
 *
 * <p>For example, here is a ManagedBlocker based on a
 * ReentrantLock:
 *  <pre> {@code
 * class ManagedLocker implements ManagedBlocker {
 *   final ReentrantLock lock;
 *   boolean hasLock = false;
 *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
 *   public boolean block() {
 *     if (!hasLock)
 *       lock.lock();
 *     return true;
 *   }
 *   public boolean isReleasable() {
 *     return hasLock || (hasLock = lock.tryLock());
 *   }
 * }}</pre>
 */
public static interface ManagedBlocker {
    /**
     * Possibly blocks the current thread, for example waiting for
     * a lock or condition.
     *
     * @return {@code true} if no additional blocking is necessary
     *         (i.e., if isReleasable would return true)
     * @throws InterruptedException if interrupted while waiting
     *         (the method is not required to do so, but is allowed to)
     */
    boolean block() throws InterruptedException;

    /**
     * Returns {@code true} if blocking is unnecessary.
     *
     * @return {@code true} if blocking is unnecessary
     */
    boolean isReleasable();
}
1827
1828 /**
1829 * Blocks in accord with the given blocker. If the current thread
1830 * is a {@link ForkJoinWorkerThread}, this method possibly
1831 * arranges for a spare thread to be activated if necessary to
1832 * ensure parallelism while the current thread is blocked.
1833 *
1834 * <p>If {@code maintainParallelism} is {@code true} and the pool
1835 * supports it ({@link #getMaintainsParallelism}), this method
1836 * attempts to maintain the pool's nominal parallelism. Otherwise
1837 * it activates a thread only if necessary to avoid complete
1838 * starvation. This option may be preferable when blockages use
1839 * timeouts, or are almost always brief.
1840 *
1841 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1842 * behaviorally equivalent to
1843 * <pre> {@code
1844 * while (!blocker.isReleasable())
1845 * if (blocker.block())
1846 * return;
1847 * }</pre>
1848 *
1849 * If the caller is a {@code ForkJoinTask}, then the pool may
1850 * first be expanded to ensure parallelism, and later adjusted.
1851 *
1852 * @param blocker the blocker
1853 * @param maintainParallelism if {@code true} and supported by
1854 * this pool, attempt to maintain the pool's nominal parallelism;
1855 * otherwise activate a thread only if necessary to avoid
1856 * complete starvation.
1857 * @throws InterruptedException if blocker.block did so
1858 */
1859 public static void managedBlock(ManagedBlocker blocker,
1860 boolean maintainParallelism)
1861 throws InterruptedException {
1862 Thread t = Thread.currentThread();
1863 ForkJoinPool pool = ((t instanceof ForkJoinWorkerThread) ?
1864 ((ForkJoinWorkerThread) t).pool : null);
1865 if (!blocker.isReleasable()) {
1866 try {
1867 if (pool == null ||
1868 !pool.preBlock(blocker, maintainParallelism))
1869 awaitBlocker(blocker);
1870 } finally {
1871 if (pool != null)
1872 pool.updateRunningCount(1);
1873 }
1874 }
1875 }
1876
1877 private static void awaitBlocker(ManagedBlocker blocker)
1878 throws InterruptedException {
1879 do {} while (!blocker.isReleasable() && !blocker.block());
1880 }
1881
1882 // AbstractExecutorService overrides. These rely on undocumented
1883 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
1884 // implement RunnableFuture.
1885
1886 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1887 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
1888 }
1889
1890 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1891 return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
1892 }
1893
1894 // Unsafe mechanics
1895
// Unsafe instance used by the cas* helpers below. It must be
// initialized before the offsets, since objectFieldOffset uses it.
private static final sun.misc.Unsafe UNSAFE = getUnsafe();
// Field offsets within ForkJoinPool, looked up once by field name.
private static final long eventCountOffset =
    objectFieldOffset("eventCount", ForkJoinPool.class);
private static final long workerCountsOffset =
    objectFieldOffset("workerCounts", ForkJoinPool.class);
private static final long runControlOffset =
    objectFieldOffset("runControl", ForkJoinPool.class);
private static final long syncStackOffset =
    objectFieldOffset("syncStack",ForkJoinPool.class);
private static final long spareStackOffset =
    objectFieldOffset("spareStack", ForkJoinPool.class);
1907
1908 private boolean casEventCount(long cmp, long val) {
1909 return UNSAFE.compareAndSwapLong(this, eventCountOffset, cmp, val);
1910 }
1911 private boolean casWorkerCounts(int cmp, int val) {
1912 return UNSAFE.compareAndSwapInt(this, workerCountsOffset, cmp, val);
1913 }
1914 private boolean casRunControl(int cmp, int val) {
1915 return UNSAFE.compareAndSwapInt(this, runControlOffset, cmp, val);
1916 }
1917 private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) {
1918 return UNSAFE.compareAndSwapObject(this, spareStackOffset, cmp, val);
1919 }
1920 private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) {
1921 return UNSAFE.compareAndSwapObject(this, syncStackOffset, cmp, val);
1922 }
1923
1924 private static long objectFieldOffset(String field, Class<?> klazz) {
1925 try {
1926 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1927 } catch (NoSuchFieldException e) {
1928 // Convert Exception to corresponding Error
1929 NoSuchFieldError error = new NoSuchFieldError(field);
1930 error.initCause(e);
1931 throw error;
1932 }
1933 }
1934
1935 /**
1936 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
1937 * Replace with a simple call to Unsafe.getUnsafe when integrating
1938 * into a jdk.
1939 *
1940 * @return a sun.misc.Unsafe
1941 */
1942 private static sun.misc.Unsafe getUnsafe() {
1943 try {
1944 return sun.misc.Unsafe.getUnsafe();
1945 } catch (SecurityException se) {
1946 try {
1947 return java.security.AccessController.doPrivileged
1948 (new java.security
1949 .PrivilegedExceptionAction<sun.misc.Unsafe>() {
1950 public sun.misc.Unsafe run() throws Exception {
1951 java.lang.reflect.Field f = sun.misc
1952 .Unsafe.class.getDeclaredField("theUnsafe");
1953 f.setAccessible(true);
1954 return (sun.misc.Unsafe) f.get(null);
1955 }});
1956 } catch (java.security.PrivilegedActionException e) {
1957 throw new RuntimeException("Could not initialize intrinsics",
1958 e.getCause());
1959 }
1960 }
1961 }
1962 }