ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
Revision: 1.49
Committed: Mon Nov 16 04:57:09 2009 UTC (14 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.48: +1 -1 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8
9 import java.util.concurrent.*;
10
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.concurrent.locks.Condition;
17 import java.util.concurrent.locks.LockSupport;
18 import java.util.concurrent.locks.ReentrantLock;
19 import java.util.concurrent.atomic.AtomicInteger;
20 import java.util.concurrent.atomic.AtomicLong;
21
22 /**
23 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
24 * A {@code ForkJoinPool} provides the entry point for submissions
25 * from non-{@code ForkJoinTask}s, as well as management and
26 * monitoring operations.
27 *
28 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
29 * ExecutorService} mainly by virtue of employing
30 * <em>work-stealing</em>: all threads in the pool attempt to find and
31 * execute subtasks created by other active tasks (eventually blocking
32 * waiting for work if none exist). This enables efficient processing
33 * when most tasks spawn other subtasks (as do most {@code
34 * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
35 * execution of some plain {@code Runnable}- or {@code Callable}-
36 * based activities along with {@code ForkJoinTask}s. When setting
37 * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
38 * also be appropriate for use with fine-grained tasks of any form
39 * that are never joined. Otherwise, other {@code ExecutorService}
40 * implementations are typically more appropriate choices.
41 *
42 * <p>A {@code ForkJoinPool} is constructed with a given target
43 * parallelism level; by default, equal to the number of available
44 * processors. Unless configured otherwise via {@link
45 * #setMaintainsParallelism}, the pool attempts to maintain this
46 * number of active (or available) threads by dynamically adding,
47 * suspending, or resuming internal worker threads, even if some tasks
48 * are stalled waiting to join others. However, no such adjustments
49 * are performed in the face of blocked IO or other unmanaged
50 * synchronization. The nested {@link ManagedBlocker} interface
51 * enables extension of the kinds of synchronization accommodated.
52 * The target parallelism level may also be changed dynamically
53 * ({@link #setParallelism}). The total number of threads may be
54 * limited using method {@link #setMaximumPoolSize}, in which case it
55 * may become possible for the activities of a pool to stall due to
56 * the lack of available threads to process new tasks.
57 *
58 * <p>In addition to execution and lifecycle control methods, this
59 * class provides status check methods (for example
60 * {@link #getStealCount}) that are intended to aid in developing,
61 * tuning, and monitoring fork/join applications. Also, method
62 * {@link #toString} returns indications of pool state in a
63 * convenient form for informal monitoring.
64 *
65 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
66 * used for all parallel task execution in a program or subsystem.
67 * Otherwise, use would not usually outweigh the construction and
68 * bookkeeping overhead of creating a large set of threads. For
69 * example, a common pool could be used for the {@code SortTasks}
70 * illustrated in {@link RecursiveAction}. Because {@code
71 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
72 * daemon} mode, there is typically no need to explicitly {@link
73 * #shutdown} such a pool upon program exit.
74 *
75 * <pre>
76 * static final ForkJoinPool mainPool = new ForkJoinPool();
77 * ...
78 * public void sort(long[] array) {
79 * mainPool.invoke(new SortTask(array, 0, array.length));
80 * }
81 * </pre>
82 *
83 * <p><b>Implementation notes</b>: This implementation restricts the
84 * maximum number of running threads to 32767. Attempts to create
85 * pools with greater than the maximum number result in
86 * {@code IllegalArgumentException}.
87 *
88 * <p>This implementation rejects submitted tasks (that is, by throwing
89 * {@link RejectedExecutionException}) only when the pool is shut down.
90 *
91 * @since 1.7
92 * @author Doug Lea
93 */
94 public class ForkJoinPool extends AbstractExecutorService {
95
96 /*
97 * See the extended comments interspersed below for design,
98 * rationale, and walkthroughs.
99 */
100
101 /** Mask for packing and unpacking shorts */
102 private static final int shortMask = 0xffff;
103
104 /** Max pool size -- must be a power of two minus 1 */
105 private static final int MAX_THREADS = 0x7FFF;
106
107 /**
108 * Factory for creating new {@link ForkJoinWorkerThread}s.
109 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
110 * for {@code ForkJoinWorkerThread} subclasses that extend base
111 * functionality or initialize threads with different contexts.
112 */
113 public static interface ForkJoinWorkerThreadFactory {
114 /**
115 * Returns a new worker thread operating in the given pool.
116 *
117 * @param pool the pool this thread works in
118 * @throws NullPointerException if the pool is null
119 */
120 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
121 }
122
123 /**
124 * Default ForkJoinWorkerThreadFactory implementation; creates a
125 * new ForkJoinWorkerThread.
126 */
127 static class DefaultForkJoinWorkerThreadFactory
128 implements ForkJoinWorkerThreadFactory {
129 public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
130 try {
131 return new ForkJoinWorkerThread(pool);
132 } catch (OutOfMemoryError oom) {
133 return null;
134 }
135 }
136 }
137
138 /**
139 * Creates a new ForkJoinWorkerThread. This factory is used unless
140 * overridden in ForkJoinPool constructors.
141 */
142 public static final ForkJoinWorkerThreadFactory
143 defaultForkJoinWorkerThreadFactory =
144 new DefaultForkJoinWorkerThreadFactory();
145
146 /**
147 * Permission required for callers of methods that may start or
148 * kill threads.
149 */
150 private static final RuntimePermission modifyThreadPermission =
151 new RuntimePermission("modifyThread");
152
153 /**
154 * If there is a security manager, makes sure caller has
155 * permission to modify threads.
156 */
157 private static void checkPermission() {
158 SecurityManager security = System.getSecurityManager();
159 if (security != null)
160 security.checkPermission(modifyThreadPermission);
161 }
162
163 /**
164 * Generator for assigning sequence numbers as pool names.
165 */
166 private static final AtomicInteger poolNumberGenerator =
167 new AtomicInteger();
168
169 /**
170 * Array holding all worker threads in the pool. Initialized upon
171 * first use. Array size must be a power of two. Updates and
172 * replacements are protected by workerLock, but it is always kept
173 * in a consistent enough state to be randomly accessed without
174 * locking by workers performing work-stealing.
175 */
176 volatile ForkJoinWorkerThread[] workers;
177
178 /**
179 * Lock protecting access to workers.
180 */
181 private final ReentrantLock workerLock;
182
183 /**
184 * Condition for awaitTermination.
185 */
186 private final Condition termination;
187
188 /**
189 * The uncaught exception handler used when any worker
190 * abruptly terminates.
191 */
192 private Thread.UncaughtExceptionHandler ueh;
193
194 /**
195 * Creation factory for worker threads.
196 */
197 private final ForkJoinWorkerThreadFactory factory;
198
199 /**
200 * Head of stack of threads that were created to maintain
201 * parallelism when other threads blocked, but have since
202 * suspended when the parallelism level rose.
203 */
204 private volatile WaitQueueNode spareStack;
205
206 /**
207 * Sum of per-thread steal counts, updated only when threads are
208 * idle or terminating.
209 */
210 private final AtomicLong stealCount;
211
212 /**
213 * Queue for external submissions.
214 */
215 private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
216
217 /**
218 * Head of Treiber stack for barrier sync. See below for explanation.
219 */
220 private volatile WaitQueueNode syncStack;
221
222 /**
223 * The count for the event barrier.
224 */
225 private volatile long eventCount;
226
227 /**
228 * Pool number, just for assigning useful names to worker threads
229 */
230 private final int poolNumber;
231
232 /**
233 * The maximum allowed pool size
234 */
235 private volatile int maxPoolSize;
236
237 /**
238 * The desired parallelism level, updated only under workerLock.
239 */
240 private volatile int parallelism;
241
242 /**
243 * True if local polling uses FIFO order instead of the default LIFO.
244 */
245 private volatile boolean locallyFifo;
246
247 /**
248 * Holds number of total (i.e., created and not yet terminated)
249 * and running (i.e., not blocked on joins or other managed sync)
250 * threads, packed into one int to ensure consistent snapshot when
251 * making decisions about creating and suspending spare
252 * threads. Updated only by CAS. Note: CASes in
253 * updateRunningCount and preJoin assume that the running count
254 * is in low word, so need to be modified if this changes.
255 */
256 private volatile int workerCounts;
257
258 private static int totalCountOf(int s) { return s >>> 16; }
259 private static int runningCountOf(int s) { return s & shortMask; }
260 private static int workerCountsFor(int t, int r) { return (t << 16) + r; }
261
262 /**
263 * Adds delta (which may be negative) to running count. This must
264 * be called before (with negative arg) and after (with positive)
265 * any managed synchronization (i.e., mainly, joins).
266 *
267 * @param delta the number to add
268 */
269 final void updateRunningCount(int delta) {
270 int s;
271 do {} while (!casWorkerCounts(s = workerCounts, s + delta));
272 }
273
274 /**
275 * Adds delta (which may be negative) to both total and running
276 * count. This must be called upon creation and termination of
277 * worker threads.
278 *
279 * @param delta the number to add
280 */
281 private void updateWorkerCount(int delta) {
282 int d = delta + (delta << 16); // add to both lo and hi parts
283 int s;
284 do {} while (!casWorkerCounts(s = workerCounts, s + d));
285 }
286
287 /**
288 * Lifecycle control. High word contains runState, low word
289 * contains the number of workers that are (probably) executing
290 * tasks. This value is atomically incremented before a worker
291 * gets a task to run, and decremented when worker has no tasks
292 * and cannot find any. These two fields are bundled together to
293 * support correct termination triggering. Note: activeCount
294 * CAS'es cheat by assuming active count is in low word, so need
295 * to be modified if this changes
296 */
297 private volatile int runControl;
298
299 // RunState values. Order among values matters: transitionRunStateTo compares them with >=
300 private static final int RUNNING = 0;
301 private static final int SHUTDOWN = 1;
302 private static final int TERMINATING = 2;
303 private static final int TERMINATED = 3;
304
305 private static int runStateOf(int c) { return c >>> 16; }
306 private static int activeCountOf(int c) { return c & shortMask; }
307 private static int runControlFor(int r, int a) { return (r << 16) + a; }
308
309 /**
310 * Tries incrementing active count; fails on contention.
311 * Called by workers before/during executing tasks.
312 *
313 * @return true on success
314 */
315 final boolean tryIncrementActiveCount() {
316 int c = runControl;
317 return casRunControl(c, c+1);
318 }
319
320 /**
321 * Tries decrementing active count; fails on contention.
322 * Possibly triggers termination on success.
323 * Called by workers when they can't find tasks.
324 *
325 * @return true on success
326 */
327 final boolean tryDecrementActiveCount() {
328 int c = runControl;
329 int nextc = c - 1;
330 if (!casRunControl(c, nextc))
331 return false;
332 if (canTerminateOnShutdown(nextc))
333 terminateOnShutdown();
334 return true;
335 }
336
337 /**
338 * Returns {@code true} if argument represents zero active count
339 * and nonzero runstate, which is the triggering condition for
340 * terminating on shutdown.
341 */
342 private static boolean canTerminateOnShutdown(int c) {
343 // i.e. least bit is nonzero runState bit
344 return ((c & -c) >>> 16) != 0;
345 }
346
347 /**
348 * Transitions run state to at least the given state. Returns {@code true}
349 * if the run state was not already at least the given state.
350 */
351 private boolean transitionRunStateTo(int state) {
352 for (;;) {
353 int c = runControl;
354 if (runStateOf(c) >= state)
355 return false;
356 if (casRunControl(c, runControlFor(state, activeCountOf(c))))
357 return true;
358 }
359 }
360
361 /**
362 * Controls whether to add spares to maintain parallelism
363 */
364 private volatile boolean maintainsParallelism;
365
366 // Constructors
367
368 /**
369 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
370 * java.lang.Runtime#availableProcessors}, and using the {@linkplain
371 * #defaultForkJoinWorkerThreadFactory default thread factory}.
372 *
373 * @throws SecurityException if a security manager exists and
374 * the caller is not permitted to modify threads
375 * because it does not hold {@link
376 * java.lang.RuntimePermission}{@code ("modifyThread")}
377 */
public ForkJoinPool() {
    // Parallelism defaults to the number of available processors.
    this(Runtime.getRuntime().availableProcessors(), defaultForkJoinWorkerThreadFactory);
}
382
383 /**
384 * Creates a {@code ForkJoinPool} with the indicated parallelism
385 * level and using the {@linkplain
386 * #defaultForkJoinWorkerThreadFactory default thread factory}.
387 *
388 * @param parallelism the parallelism level
389 * @throws IllegalArgumentException if parallelism less than or
390 * equal to zero, or greater than implementation limit
391 * @throws SecurityException if a security manager exists and
392 * the caller is not permitted to modify threads
393 * because it does not hold {@link
394 * java.lang.RuntimePermission}{@code ("modifyThread")}
395 */
public ForkJoinPool(int parallelism) {
    // Validation happens in the two-argument constructor.
    this(parallelism,
         defaultForkJoinWorkerThreadFactory);
}
399
400 /**
401 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
402 * java.lang.Runtime#availableProcessors}, and using the given
403 * thread factory.
404 *
405 * @param factory the factory for creating new threads
406 * @throws NullPointerException if the factory is null
407 * @throws SecurityException if a security manager exists and
408 * the caller is not permitted to modify threads
409 * because it does not hold {@link
410 * java.lang.RuntimePermission}{@code ("modifyThread")}
411 */
public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
    // Parallelism defaults to the number of available processors;
    // the factory is validated by the two-argument constructor.
    this(Runtime.getRuntime().availableProcessors(),
         factory);
}
415
416 /**
417 * Creates a {@code ForkJoinPool} with the given parallelism and
418 * thread factory.
419 *
420 * @param parallelism the parallelism level
421 * @param factory the factory for creating new threads
422 * @throws IllegalArgumentException if parallelism less than or
423 * equal to zero, or greater than implementation limit
424 * @throws NullPointerException if the factory is null
425 * @throws SecurityException if a security manager exists and
426 * the caller is not permitted to modify threads
427 * because it does not hold {@link
428 * java.lang.RuntimePermission}{@code ("modifyThread")}
429 */
430 public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
431 if (parallelism <= 0 || parallelism > MAX_THREADS)
432 throw new IllegalArgumentException();
433 if (factory == null)
434 throw new NullPointerException();
435 checkPermission();
436 this.factory = factory;
437 this.parallelism = parallelism;
438 this.maxPoolSize = MAX_THREADS;
439 this.maintainsParallelism = true;
440 this.poolNumber = poolNumberGenerator.incrementAndGet();
441 this.workerLock = new ReentrantLock();
442 this.termination = workerLock.newCondition();
443 this.stealCount = new AtomicLong();
444 this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
445 // worker array and workers are lazily constructed
446 }
447
448 /**
449 * Creates a new worker thread using factory.
450 *
451 * @param index the index to assign worker
452 * @return new worker, or null if factory failed
453 */
454 private ForkJoinWorkerThread createWorker(int index) {
455 Thread.UncaughtExceptionHandler h = ueh;
456 ForkJoinWorkerThread w = factory.newThread(this);
457 if (w != null) {
458 w.poolIndex = index;
459 w.setDaemon(true);
460 w.setAsyncMode(locallyFifo);
461 w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index);
462 if (h != null)
463 w.setUncaughtExceptionHandler(h);
464 }
465 return w;
466 }
467
468 /**
469 * Returns a good size for worker array given pool size.
470 * Currently requires size to be a power of two.
471 */
472 private static int arraySizeFor(int poolSize) {
473 if (poolSize <= 1)
474 return 1;
475 // See Hackers Delight, sec 3.2
476 int c = poolSize >= MAX_THREADS ? MAX_THREADS : (poolSize - 1);
477 c |= c >>> 1;
478 c |= c >>> 2;
479 c |= c >>> 4;
480 c |= c >>> 8;
481 c |= c >>> 16;
482 return c + 1;
483 }
484
485 /**
486 * Creates or resizes array if necessary to hold newLength.
487 * Call only under exclusion.
488 *
489 * @return the array
490 */
491 private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) {
492 ForkJoinWorkerThread[] ws = workers;
493 if (ws == null)
494 return workers = new ForkJoinWorkerThread[arraySizeFor(newLength)];
495 else if (newLength > ws.length)
496 return workers = Arrays.copyOf(ws, arraySizeFor(newLength));
497 else
498 return ws;
499 }
500
501 /**
502 * Tries to shrink workers into smaller array after one or more terminate.
503 */
504 private void tryShrinkWorkerArray() {
505 ForkJoinWorkerThread[] ws = workers;
506 if (ws != null) {
507 int len = ws.length;
508 int last = len - 1;
509 while (last >= 0 && ws[last] == null)
510 --last;
511 int newLength = arraySizeFor(last+1);
512 if (newLength < len)
513 workers = Arrays.copyOf(ws, newLength);
514 }
515 }
516
517 /**
518 * Initializes workers if necessary.
519 */
520 final void ensureWorkerInitialization() {
521 ForkJoinWorkerThread[] ws = workers;
522 if (ws == null) {
523 final ReentrantLock lock = this.workerLock;
524 lock.lock();
525 try {
526 ws = workers;
527 if (ws == null) {
528 int ps = parallelism;
529 ws = ensureWorkerArrayCapacity(ps);
530 for (int i = 0; i < ps; ++i) {
531 ForkJoinWorkerThread w = createWorker(i);
532 if (w != null) {
533 ws[i] = w;
534 w.start();
535 updateWorkerCount(1);
536 }
537 }
538 }
539 } finally {
540 lock.unlock();
541 }
542 }
543 }
544
545 /**
546 * Worker creation and startup for threads added via setParallelism.
547 */
548 private void createAndStartAddedWorkers() {
549 resumeAllSpares(); // Allow spares to convert to nonspare
550 int ps = parallelism;
551 ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps);
552 int len = ws.length;
553 // Sweep through slots, to keep lowest indices most populated
554 int k = 0;
555 while (k < len) {
556 if (ws[k] != null) {
557 ++k;
558 continue;
559 }
560 int s = workerCounts;
561 int tc = totalCountOf(s);
562 int rc = runningCountOf(s);
563 if (rc >= ps || tc >= ps)
564 break;
565 if (casWorkerCounts (s, workerCountsFor(tc+1, rc+1))) {
566 ForkJoinWorkerThread w = createWorker(k);
567 if (w != null) {
568 ws[k++] = w;
569 w.start();
570 }
571 else {
572 updateWorkerCount(-1); // back out on failed creation
573 break;
574 }
575 }
576 }
577 }
578
579 // Execution methods
580
581 /**
582 * Common code for execute, invoke and submit
583 */
584 private <T> void doSubmit(ForkJoinTask<T> task) {
585 if (task == null)
586 throw new NullPointerException();
587 if (isShutdown())
588 throw new RejectedExecutionException();
589 if (workers == null)
590 ensureWorkerInitialization();
591 submissionQueue.offer(task);
592 signalIdleWorkers();
593 }
594
595 /**
596 * Performs the given task, returning its result upon completion.
597 *
598 * @param task the task
599 * @return the task's result
600 * @throws NullPointerException if the task is null
601 * @throws RejectedExecutionException if the task cannot be
602 * scheduled for execution
603 */
604 public <T> T invoke(ForkJoinTask<T> task) {
605 doSubmit(task);
606 return task.join();
607 }
608
609 /**
610 * Arranges for (asynchronous) execution of the given task.
611 *
612 * @param task the task
613 * @throws NullPointerException if the task is null
614 * @throws RejectedExecutionException if the task cannot be
615 * scheduled for execution
616 */
617 public void execute(ForkJoinTask<?> task) {
618 doSubmit(task);
619 }
620
621 // AbstractExecutorService methods
622
623 /**
624 * @throws NullPointerException if the task is null
625 * @throws RejectedExecutionException if the task cannot be
626 * scheduled for execution
627 */
628 public void execute(Runnable task) {
629 ForkJoinTask<?> job;
630 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
631 job = (ForkJoinTask<?>) task;
632 else
633 job = ForkJoinTask.adapt(task, null);
634 doSubmit(job);
635 }
636
637 /**
638 * @throws NullPointerException if the task is null
639 * @throws RejectedExecutionException if the task cannot be
640 * scheduled for execution
641 */
642 public <T> ForkJoinTask<T> submit(Callable<T> task) {
643 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
644 doSubmit(job);
645 return job;
646 }
647
648 /**
649 * @throws NullPointerException if the task is null
650 * @throws RejectedExecutionException if the task cannot be
651 * scheduled for execution
652 */
653 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
654 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
655 doSubmit(job);
656 return job;
657 }
658
659 /**
660 * @throws NullPointerException if the task is null
661 * @throws RejectedExecutionException if the task cannot be
662 * scheduled for execution
663 */
664 public ForkJoinTask<?> submit(Runnable task) {
665 ForkJoinTask<?> job;
666 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
667 job = (ForkJoinTask<?>) task;
668 else
669 job = ForkJoinTask.adapt(task, null);
670 doSubmit(job);
671 return job;
672 }
673
674 /**
675 * Submits a ForkJoinTask for execution.
676 *
677 * @param task the task to submit
678 * @return the task
679 * @throws NullPointerException if the task is null
680 * @throws RejectedExecutionException if the task cannot be
681 * scheduled for execution
682 */
683 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
684 doSubmit(task);
685 return task;
686 }
687
688
689 /**
690 * @throws NullPointerException {@inheritDoc}
691 * @throws RejectedExecutionException {@inheritDoc}
692 */
693 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
694 ArrayList<ForkJoinTask<T>> forkJoinTasks =
695 new ArrayList<ForkJoinTask<T>>(tasks.size());
696 for (Callable<T> task : tasks)
697 forkJoinTasks.add(ForkJoinTask.adapt(task));
698 invoke(new InvokeAll<T>(forkJoinTasks));
699
700 @SuppressWarnings({"unchecked", "rawtypes"})
701 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
702 return futures;
703 }
704
705 static final class InvokeAll<T> extends RecursiveAction {
706 final ArrayList<ForkJoinTask<T>> tasks;
707 InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
708 public void compute() {
709 try { invokeAll(tasks); }
710 catch (Exception ignore) {}
711 }
712 private static final long serialVersionUID = -7914297376763021607L;
713 }
714
715 // Configuration and status settings and queries
716
717 /**
718 * Returns the factory used for constructing new workers.
719 *
720 * @return the factory used for constructing new workers
721 */
722 public ForkJoinWorkerThreadFactory getFactory() {
723 return factory;
724 }
725
726 /**
727 * Returns the handler for internal worker threads that terminate
728 * due to unrecoverable errors encountered while executing tasks.
729 *
730 * @return the handler, or {@code null} if none
731 */
732 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
733 Thread.UncaughtExceptionHandler h;
734 final ReentrantLock lock = this.workerLock;
735 lock.lock();
736 try {
737 h = ueh;
738 } finally {
739 lock.unlock();
740 }
741 return h;
742 }
743
744 /**
745 * Sets the handler for internal worker threads that terminate due
746 * to unrecoverable errors encountered while executing tasks.
747 * Unless set, the current default or ThreadGroup handler is used
748 * as handler.
749 *
750 * @param h the new handler
751 * @return the old handler, or {@code null} if none
752 * @throws SecurityException if a security manager exists and
753 * the caller is not permitted to modify threads
754 * because it does not hold {@link
755 * java.lang.RuntimePermission}{@code ("modifyThread")}
756 */
757 public Thread.UncaughtExceptionHandler
758 setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
759 checkPermission();
760 Thread.UncaughtExceptionHandler old = null;
761 final ReentrantLock lock = this.workerLock;
762 lock.lock();
763 try {
764 old = ueh;
765 ueh = h;
766 ForkJoinWorkerThread[] ws = workers;
767 if (ws != null) {
768 for (int i = 0; i < ws.length; ++i) {
769 ForkJoinWorkerThread w = ws[i];
770 if (w != null)
771 w.setUncaughtExceptionHandler(h);
772 }
773 }
774 } finally {
775 lock.unlock();
776 }
777 return old;
778 }
779
780
781 /**
782 * Sets the target parallelism level of this pool.
783 *
784 * @param parallelism the target parallelism
785 * @throws IllegalArgumentException if parallelism less than or
786 * equal to zero or greater than maximum size bounds
787 * @throws SecurityException if a security manager exists and
788 * the caller is not permitted to modify threads
789 * because it does not hold {@link
790 * java.lang.RuntimePermission}{@code ("modifyThread")}
791 */
792 public void setParallelism(int parallelism) {
793 checkPermission();
794 if (parallelism <= 0 || parallelism > maxPoolSize)
795 throw new IllegalArgumentException();
796 final ReentrantLock lock = this.workerLock;
797 lock.lock();
798 try {
799 if (isProcessingTasks()) {
800 int p = this.parallelism;
801 this.parallelism = parallelism;
802 if (parallelism > p)
803 createAndStartAddedWorkers();
804 else
805 trimSpares();
806 }
807 } finally {
808 lock.unlock();
809 }
810 signalIdleWorkers();
811 }
812
813 /**
814 * Returns the targeted parallelism level of this pool.
815 *
816 * @return the targeted parallelism level of this pool
817 */
818 public int getParallelism() {
819 return parallelism;
820 }
821
822 /**
823 * Returns the number of worker threads that have started but not
824 * yet terminated. The result returned by this method may differ
825 * from {@link #getParallelism} when threads are created to
826 * maintain parallelism when others are cooperatively blocked.
827 *
828 * @return the number of worker threads
829 */
830 public int getPoolSize() {
831 return totalCountOf(workerCounts);
832 }
833
834 /**
835 * Returns the maximum number of threads allowed to exist in the
836 * pool. Unless set using {@link #setMaximumPoolSize}, the
837 * maximum is an implementation-defined value designed only to
838 * prevent runaway growth.
839 *
840 * @return the maximum
841 */
842 public int getMaximumPoolSize() {
843 return maxPoolSize;
844 }
845
846 /**
847 * Sets the maximum number of threads allowed to exist in the
848 * pool. The given value should normally be greater than or equal
849 * to the {@link #getParallelism parallelism} level. Setting this
850 * value has no effect on current pool size. It controls
851 * construction of new threads.
852 *
853 * @throws IllegalArgumentException if negative or greater than
854 * internal implementation limit
855 */
856 public void setMaximumPoolSize(int newMax) {
857 if (newMax < 0 || newMax > MAX_THREADS)
858 throw new IllegalArgumentException();
859 maxPoolSize = newMax;
860 }
861
862
863 /**
864 * Returns {@code true} if this pool dynamically maintains its
865 * target parallelism level. If false, new threads are added only
866 * to avoid possible starvation. This setting is by default true.
867 *
868 * @return {@code true} if maintains parallelism
869 */
870 public boolean getMaintainsParallelism() {
871 return maintainsParallelism;
872 }
873
874 /**
875 * Sets whether this pool dynamically maintains its target
876 * parallelism level. If false, new threads are added only to
877 * avoid possible starvation.
878 *
879 * @param enable {@code true} to maintain parallelism
880 */
881 public void setMaintainsParallelism(boolean enable) {
882 maintainsParallelism = enable;
883 }
884
885 /**
886 * Establishes local first-in-first-out scheduling mode for forked
887 * tasks that are never joined. This mode may be more appropriate
888 * than default locally stack-based mode in applications in which
889 * worker threads only process asynchronous tasks. This method is
890 * designed to be invoked only when the pool is quiescent, and
891 * typically only before any tasks are submitted. The effects of
892 * invocations at other times may be unpredictable.
893 *
894 * @param async if {@code true}, use locally FIFO scheduling
895 * @return the previous mode
896 * @see #getAsyncMode
897 */
898 public boolean setAsyncMode(boolean async) {
899 boolean oldMode = locallyFifo;
900 locallyFifo = async;
901 ForkJoinWorkerThread[] ws = workers;
902 if (ws != null) {
903 for (int i = 0; i < ws.length; ++i) {
904 ForkJoinWorkerThread t = ws[i];
905 if (t != null)
906 t.setAsyncMode(async);
907 }
908 }
909 return oldMode;
910 }
911
912 /**
913 * Returns {@code true} if this pool uses local first-in-first-out
914 * scheduling mode for forked tasks that are never joined.
915 *
916 * @return {@code true} if this pool uses async mode
917 * @see #setAsyncMode
918 */
919 public boolean getAsyncMode() {
920 return locallyFifo;
921 }
922
923 /**
924 * Returns an estimate of the number of worker threads that are
925 * not blocked waiting to join tasks or for other managed
926 * synchronization.
927 *
928 * @return the number of worker threads
929 */
930 public int getRunningThreadCount() {
931 return runningCountOf(workerCounts);
932 }
933
934 /**
935 * Returns an estimate of the number of threads that are currently
936 * stealing or executing tasks. This method may overestimate the
937 * number of active threads.
938 *
939 * @return the number of active threads
940 */
941 public int getActiveThreadCount() {
942 return activeCountOf(runControl);
943 }
944
945 /**
946 * Returns an estimate of the number of threads that are currently
947 * idle waiting for tasks. This method may underestimate the
948 * number of idle threads.
949 *
950 * @return the number of idle threads
951 */
952 final int getIdleThreadCount() {
953 int c = runningCountOf(workerCounts) - activeCountOf(runControl);
954 return (c <= 0) ? 0 : c;
955 }
956
957 /**
958 * Returns {@code true} if all worker threads are currently idle.
959 * An idle worker is one that cannot obtain a task to execute
960 * because none are available to steal from other threads, and
961 * there are no pending submissions to the pool. This method is
962 * conservative; it might not return {@code true} immediately upon
963 * idleness of all threads, but will eventually become true if
964 * threads remain inactive.
965 *
966 * @return {@code true} if all threads are currently idle
967 */
968 public boolean isQuiescent() {
969 return activeCountOf(runControl) == 0;
970 }
971
972 /**
973 * Returns an estimate of the total number of tasks stolen from
974 * one thread's work queue by another. The reported value
975 * underestimates the actual total number of steals when the pool
976 * is not quiescent. This value may be useful for monitoring and
977 * tuning fork/join programs: in general, steal counts should be
978 * high enough to keep threads busy, but low enough to avoid
979 * overhead and contention across threads.
980 *
981 * @return the number of steals
982 */
983 public long getStealCount() {
984 return stealCount.get();
985 }
986
987 /**
988 * Accumulates steal count from a worker.
989 * Call only when worker known to be idle.
990 */
991 private void updateStealCount(ForkJoinWorkerThread w) {
992 int sc = w.getAndClearStealCount();
993 if (sc != 0)
994 stealCount.addAndGet(sc);
995 }
996
997 /**
998 * Returns an estimate of the total number of tasks currently held
999 * in queues by worker threads (but not including tasks submitted
1000 * to the pool that have not begun executing). This value is only
1001 * an approximation, obtained by iterating across all threads in
1002 * the pool. This method may be useful for tuning task
1003 * granularities.
1004 *
1005 * @return the number of queued tasks
1006 */
1007 public long getQueuedTaskCount() {
1008 long count = 0;
1009 ForkJoinWorkerThread[] ws = workers;
1010 if (ws != null) {
1011 for (int i = 0; i < ws.length; ++i) {
1012 ForkJoinWorkerThread t = ws[i];
1013 if (t != null)
1014 count += t.getQueueSize();
1015 }
1016 }
1017 return count;
1018 }
1019
1020 /**
1021 * Returns an estimate of the number of tasks submitted to this
1022 * pool that have not yet begun executing. This method takes time
1023 * proportional to the number of submissions.
1024 *
1025 * @return the number of queued submissions
1026 */
1027 public int getQueuedSubmissionCount() {
1028 return submissionQueue.size();
1029 }
1030
1031 /**
1032 * Returns {@code true} if there are any tasks submitted to this
1033 * pool that have not yet begun executing.
1034 *
1035 * @return {@code true} if there are any queued submissions
1036 */
1037 public boolean hasQueuedSubmissions() {
1038 return !submissionQueue.isEmpty();
1039 }
1040
1041 /**
1042 * Removes and returns the next unexecuted submission if one is
1043 * available. This method may be useful in extensions to this
1044 * class that re-assign work in systems with multiple pools.
1045 *
1046 * @return the next submission, or {@code null} if none
1047 */
1048 protected ForkJoinTask<?> pollSubmission() {
1049 return submissionQueue.poll();
1050 }
1051
1052 /**
1053 * Removes all available unexecuted submitted and forked tasks
1054 * from scheduling queues and adds them to the given collection,
1055 * without altering their execution status. These may include
1056 * artificially generated or wrapped tasks. This method is
1057 * designed to be invoked only when the pool is known to be
1058 * quiescent. Invocations at other times may not remove all
1059 * tasks. A failure encountered while attempting to add elements
1060 * to collection {@code c} may result in elements being in
1061 * neither, either or both collections when the associated
1062 * exception is thrown. The behavior of this operation is
1063 * undefined if the specified collection is modified while the
1064 * operation is in progress.
1065 *
1066 * @param c the collection to transfer elements into
1067 * @return the number of elements transferred
1068 */
1069 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1070 int n = submissionQueue.drainTo(c);
1071 ForkJoinWorkerThread[] ws = workers;
1072 if (ws != null) {
1073 for (int i = 0; i < ws.length; ++i) {
1074 ForkJoinWorkerThread w = ws[i];
1075 if (w != null)
1076 n += w.drainTasksTo(c);
1077 }
1078 }
1079 return n;
1080 }
1081
1082 /**
1083 * Returns a string identifying this pool, as well as its state,
1084 * including indications of run state, parallelism level, and
1085 * worker and task counts.
1086 *
1087 * @return a string identifying this pool, as well as its state
1088 */
1089 public String toString() {
1090 int ps = parallelism;
1091 int wc = workerCounts;
1092 int rc = runControl;
1093 long st = getStealCount();
1094 long qt = getQueuedTaskCount();
1095 long qs = getQueuedSubmissionCount();
1096 return super.toString() +
1097 "[" + runStateToString(runStateOf(rc)) +
1098 ", parallelism = " + ps +
1099 ", size = " + totalCountOf(wc) +
1100 ", active = " + activeCountOf(rc) +
1101 ", running = " + runningCountOf(wc) +
1102 ", steals = " + st +
1103 ", tasks = " + qt +
1104 ", submissions = " + qs +
1105 "]";
1106 }
1107
1108 private static String runStateToString(int rs) {
1109 switch (rs) {
1110 case RUNNING: return "Running";
1111 case SHUTDOWN: return "Shutting down";
1112 case TERMINATING: return "Terminating";
1113 case TERMINATED: return "Terminated";
1114 default: throw new Error("Unknown run state");
1115 }
1116 }
1117
1118 // lifecycle control
1119
1120 /**
1121 * Initiates an orderly shutdown in which previously submitted
1122 * tasks are executed, but no new tasks will be accepted.
1123 * Invocation has no additional effect if already shut down.
1124 * Tasks that are in the process of being submitted concurrently
1125 * during the course of this method may or may not be rejected.
1126 *
1127 * @throws SecurityException if a security manager exists and
1128 * the caller is not permitted to modify threads
1129 * because it does not hold {@link
1130 * java.lang.RuntimePermission}{@code ("modifyThread")}
1131 */
1132 public void shutdown() {
1133 checkPermission();
1134 transitionRunStateTo(SHUTDOWN);
1135 if (canTerminateOnShutdown(runControl)) {
1136 if (workers == null) { // shutting down before workers created
1137 final ReentrantLock lock = this.workerLock;
1138 lock.lock();
1139 try {
1140 if (workers == null) {
1141 terminate();
1142 transitionRunStateTo(TERMINATED);
1143 termination.signalAll();
1144 }
1145 } finally {
1146 lock.unlock();
1147 }
1148 }
1149 terminateOnShutdown();
1150 }
1151 }
1152
1153 /**
1154 * Attempts to cancel and/or stop all tasks, and reject all
1155 * subsequently submitted tasks. Tasks that are in the process of
1156 * being submitted or executed concurrently during the course of
1157 * this method may or may not be rejected. This method cancels
1158 * both existing and unexecuted tasks, in order to permit
1159 * termination in the presence of task dependencies. So the method
1160 * always returns an empty list (unlike the case for some other
1161 * Executors).
1162 *
1163 * @return an empty list
1164 * @throws SecurityException if a security manager exists and
1165 * the caller is not permitted to modify threads
1166 * because it does not hold {@link
1167 * java.lang.RuntimePermission}{@code ("modifyThread")}
1168 */
1169 public List<Runnable> shutdownNow() {
1170 checkPermission();
1171 terminate();
1172 return Collections.emptyList();
1173 }
1174
1175 /**
1176 * Returns {@code true} if all tasks have completed following shut down.
1177 *
1178 * @return {@code true} if all tasks have completed following shut down
1179 */
1180 public boolean isTerminated() {
1181 return runStateOf(runControl) == TERMINATED;
1182 }
1183
1184 /**
1185 * Returns {@code true} if the process of termination has
1186 * commenced but not yet completed. This method may be useful for
1187 * debugging. A return of {@code true} reported a sufficient
1188 * period after shutdown may indicate that submitted tasks have
1189 * ignored or suppressed interruption, causing this executor not
1190 * to properly terminate.
1191 *
1192 * @return {@code true} if terminating but not yet terminated
1193 */
1194 public boolean isTerminating() {
1195 return runStateOf(runControl) == TERMINATING;
1196 }
1197
1198 /**
1199 * Returns {@code true} if this pool has been shut down.
1200 *
1201 * @return {@code true} if this pool has been shut down
1202 */
1203 public boolean isShutdown() {
1204 return runStateOf(runControl) >= SHUTDOWN;
1205 }
1206
1207 /**
1208 * Returns true if pool is not terminating or terminated.
1209 * Used internally to suppress execution when terminating.
1210 */
1211 final boolean isProcessingTasks() {
1212 return runStateOf(runControl) < TERMINATING;
1213 }
1214
1215 /**
1216 * Blocks until all tasks have completed execution after a shutdown
1217 * request, or the timeout occurs, or the current thread is
1218 * interrupted, whichever happens first.
1219 *
1220 * @param timeout the maximum time to wait
1221 * @param unit the time unit of the timeout argument
1222 * @return {@code true} if this executor terminated and
1223 * {@code false} if the timeout elapsed before termination
1224 * @throws InterruptedException if interrupted while waiting
1225 */
1226 public boolean awaitTermination(long timeout, TimeUnit unit)
1227 throws InterruptedException {
1228 long nanos = unit.toNanos(timeout);
1229 final ReentrantLock lock = this.workerLock;
1230 lock.lock();
1231 try {
1232 for (;;) {
1233 if (isTerminated())
1234 return true;
1235 if (nanos <= 0)
1236 return false;
1237 nanos = termination.awaitNanos(nanos);
1238 }
1239 } finally {
1240 lock.unlock();
1241 }
1242 }
1243
1244 // Shutdown and termination support
1245
1246 /**
1247 * Callback from terminating worker. Nulls out the corresponding
1248 * workers slot, and if terminating, tries to terminate; else
1249 * tries to shrink workers array.
1250 *
1251 * @param w the worker
1252 */
1253 final void workerTerminated(ForkJoinWorkerThread w) {
1254 updateStealCount(w);
1255 updateWorkerCount(-1);
1256 final ReentrantLock lock = this.workerLock;
1257 lock.lock();
1258 try {
1259 ForkJoinWorkerThread[] ws = workers;
1260 if (ws != null) {
1261 int idx = w.poolIndex;
1262 if (idx >= 0 && idx < ws.length && ws[idx] == w)
1263 ws[idx] = null;
1264 if (totalCountOf(workerCounts) == 0) {
1265 terminate(); // no-op if already terminating
1266 transitionRunStateTo(TERMINATED);
1267 termination.signalAll();
1268 }
1269 else if (isProcessingTasks()) {
1270 tryShrinkWorkerArray();
1271 tryResumeSpare(true); // allow replacement
1272 }
1273 }
1274 } finally {
1275 lock.unlock();
1276 }
1277 signalIdleWorkers();
1278 }
1279
1280 /**
1281 * Initiates termination.
1282 */
1283 private void terminate() {
1284 if (transitionRunStateTo(TERMINATING)) {
1285 stopAllWorkers();
1286 resumeAllSpares();
1287 signalIdleWorkers();
1288 cancelQueuedSubmissions();
1289 cancelQueuedWorkerTasks();
1290 interruptUnterminatedWorkers();
1291 signalIdleWorkers(); // resignal after interrupt
1292 }
1293 }
1294
1295 /**
1296 * Possibly terminates when on shutdown state.
1297 */
1298 private void terminateOnShutdown() {
1299 if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl))
1300 terminate();
1301 }
1302
1303 /**
1304 * Clears out and cancels submissions.
1305 */
1306 private void cancelQueuedSubmissions() {
1307 ForkJoinTask<?> task;
1308 while ((task = pollSubmission()) != null)
1309 task.cancel(false);
1310 }
1311
1312 /**
1313 * Cleans out worker queues.
1314 */
1315 private void cancelQueuedWorkerTasks() {
1316 final ReentrantLock lock = this.workerLock;
1317 lock.lock();
1318 try {
1319 ForkJoinWorkerThread[] ws = workers;
1320 if (ws != null) {
1321 for (int i = 0; i < ws.length; ++i) {
1322 ForkJoinWorkerThread t = ws[i];
1323 if (t != null)
1324 t.cancelTasks();
1325 }
1326 }
1327 } finally {
1328 lock.unlock();
1329 }
1330 }
1331
1332 /**
1333 * Sets each worker's status to terminating. Requires lock to avoid
1334 * conflicts with add/remove.
1335 */
1336 private void stopAllWorkers() {
1337 final ReentrantLock lock = this.workerLock;
1338 lock.lock();
1339 try {
1340 ForkJoinWorkerThread[] ws = workers;
1341 if (ws != null) {
1342 for (int i = 0; i < ws.length; ++i) {
1343 ForkJoinWorkerThread t = ws[i];
1344 if (t != null)
1345 t.shutdownNow();
1346 }
1347 }
1348 } finally {
1349 lock.unlock();
1350 }
1351 }
1352
1353 /**
1354 * Interrupts all unterminated workers. This is not required for
1355 * sake of internal control, but may help unstick user code during
1356 * shutdown.
1357 */
1358 private void interruptUnterminatedWorkers() {
1359 final ReentrantLock lock = this.workerLock;
1360 lock.lock();
1361 try {
1362 ForkJoinWorkerThread[] ws = workers;
1363 if (ws != null) {
1364 for (int i = 0; i < ws.length; ++i) {
1365 ForkJoinWorkerThread t = ws[i];
1366 if (t != null && !t.isTerminated()) {
1367 try {
1368 t.interrupt();
1369 } catch (SecurityException ignore) {
1370 }
1371 }
1372 }
1373 }
1374 } finally {
1375 lock.unlock();
1376 }
1377 }
1378
1379
1380 /*
1381 * Nodes for event barrier to manage idle threads. Queue nodes
1382 * are basic Treiber stack nodes, also used for spare stack.
1383 *
1384 * The event barrier has an event count and a wait queue (actually
1385 * a Treiber stack). Workers are enabled to look for work when
1386 * the eventCount is incremented. If they fail to find work, they
1387 * may wait for next count. Upon release, threads help others wake
1388 * up.
1389 *
1390 * Synchronization events occur only in enough contexts to
1391 * maintain overall liveness:
1392 *
1393 * - Submission of a new task to the pool
1394 * - Resizes or other changes to the workers array
1395 * - pool termination
1396 * - A worker pushing a task on an empty queue
1397 *
1398 * The case of pushing a task occurs often enough, and is heavy
1399 * enough compared to simple stack pushes, to require special
1400 * handling: Method signalWork returns without advancing count if
1401 * the queue appears to be empty. This would ordinarily result in
1402 * races causing some queued waiters not to be woken up. To avoid
1403 * this, the first worker enqueued in method sync (see
1404 * syncIsReleasable) rescans for tasks after being enqueued, and
1405 * helps signal if any are found. This works well because the
1406 * worker has nothing better to do, and so might as well help
1407 * alleviate the overhead and contention on the threads actually
1408 * doing work. Also, since event count increments on task
1409 * availability exist to maintain liveness (rather than to force
1410 * refreshes etc), it is OK for callers to exit early if
1411 * contending with another signaller.
1412 */
/**
 * Stack node holding a waiting worker thread. The same node type is
 * pushed on the sync (event barrier) stack and on the spare stack;
 * {@code count} records the event count a sync waiter is waiting to
 * advance from.
 */
static final class WaitQueueNode {
    WaitQueueNode next; // only written before enqueued
    volatile ForkJoinWorkerThread thread; // nulled to cancel wait
    final long count; // unused for spare stack

    /**
     * Creates a node for worker {@code w} recording count {@code c}.
     *
     * @param c the event count associated with the wait
     * @param w the waiting worker
     */
    WaitQueueNode(long c, ForkJoinWorkerThread w) {
        count = c;
        thread = w;
    }

    /**
     * Wakes up waiter, returning false if known to already be
     * released (the {@code thread} field is already null).
     * Otherwise clears the field and unparks the thread.
     *
     * @return {@code false} if the waiter was already released,
     *         {@code true} if this call cleared the field and
     *         unparked the thread
     */
    boolean signal() {
        ForkJoinWorkerThread t = thread;
        if (t == null)
            return false;
        thread = null;
        LockSupport.unpark(t);
        return true;
    }

    /**
     * Awaits release on sync: parks until the {@code thread} field
     * is nulled or the pool reports this node as releasable.
     *
     * @param p the pool whose {@code syncIsReleasable} is consulted
     */
    void awaitSyncRelease(ForkJoinPool p) {
        while (thread != null && !p.syncIsReleasable(this))
            LockSupport.park(this);
    }

    /**
     * Awaits resumption as spare: loops until the {@code thread}
     * field is nulled. Each pass clears any pending interrupt
     * status, and parks only if none was pending.
     */
    void awaitSpareRelease() {
        while (thread != null) {
            if (!Thread.interrupted())
                LockSupport.park(this);
        }
    }
}
1453
/**
 * Ensures that no thread is waiting for count to advance from the
 * current value of eventCount read on entry to this method, by
 * releasing waiting threads if necessary.
 *
 * @return the event count read on entry
 */
final long ensureSync() {
    long c = eventCount;
    WaitQueueNode q;
    // Only act while the top node was enqueued for an older count.
    while ((q = syncStack) != null && q.count < c) {
        // Detach the whole stack in one CAS, then signal every node
        // on the detached chain; retry the loop if the CAS fails.
        if (casBarrierStack(q, null)) {
            do {
                q.signal();
            } while ((q = q.next) != null);
            break;
        }
    }
    return c;
}
1474
1475 /**
1476 * Increments event count and releases waiting threads.
1477 */
1478 private void signalIdleWorkers() {
1479 long c;
1480 do {} while (!casEventCount(c = eventCount, c+1));
1481 ensureSync();
1482 }
1483
/**
 * Signals threads waiting to poll a task. Because method sync
 * rechecks availability, it is OK to only proceed if queue
 * appears to be non-empty, and OK to skip under contention to
 * increment count (since some other thread succeeded).
 *
 * <p>Steps, in evaluation order: do nothing if the sync stack is
 * empty or if the single attempt to advance the event count fails;
 * otherwise, if the top node was enqueued at or before the old
 * count, try to pop and signal just that node, falling back to
 * {@code ensureSync} when the pop fails or the node turns out to be
 * already released.
 */
final void signalWork() {
    long c;
    WaitQueueNode q;
    if (syncStack != null &&
        casEventCount(c = eventCount, c+1) &&
        (((q = syncStack) != null && q.count <= c) &&
         (!casBarrierStack(q, q.next) || !q.signal())))
        ensureSync();
}
1499
/**
 * Waits until event count advances from last value held by
 * caller, or if excess threads, caller is resumed as spare, or
 * caller or pool is terminating. Updates caller's event on exit.
 *
 * @param w the calling worker thread
 */
final void sync(ForkJoinWorkerThread w) {
    updateStealCount(w); // Transfer w's count while it is idle

    while (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
        long prev = w.lastEventCount;
        WaitQueueNode node = null;
        WaitQueueNode h;
        // Try to enqueue only while the count is unchanged and the
        // stack is empty or headed by a node for the same count.
        while (eventCount == prev &&
               ((h = syncStack) == null || h.count == prev)) {
            if (node == null)
                node = new WaitQueueNode(prev, w); // created lazily, reused on CAS retry
            if (casBarrierStack(node.next = h, node)) {
                node.awaitSyncRelease(this);
                break;
            }
        }
        long ec = ensureSync();
        // Exit once a newer count is observed; otherwise loop again.
        if (ec != prev) {
            w.lastEventCount = ec;
            break;
        }
    }
}
1530
/**
 * Returns {@code true} if worker waiting on sync can proceed:
 *  - on signal (thread == null)
 *  - on event count advance (winning race to notify vs signaller)
 *  - on interrupt
 *  - if the first queued node, we find work available
 * If node was not signalled and event count not advanced on exit,
 * then we also help advance event count.
 *
 * @param node the caller's node on the sync stack
 * @return {@code true} if node can be released
 */
final boolean syncIsReleasable(WaitQueueNode node) {
    long prev = node.count;
    // Keep waiting only if: not interrupted (the check also clears
    // interrupt status), not yet signalled, either not the first
    // queued node or no queued tasks are found, and the event count
    // is still the one the node was enqueued with.
    if (!Thread.interrupted() && node.thread != null &&
        (node.next != null ||
         !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
        eventCount == prev)
        return false;
    if (node.thread != null) {
        node.thread = null; // cancel the wait ourselves
        long ec = eventCount;
        if (prev <= ec) // help signal
            casEventCount(ec, ec+1);
    }
    return true;
}
1557
1558 /**
1559 * Returns {@code true} if a new sync event occurred since last
1560 * call to sync or this method, if so, updating caller's count.
1561 */
1562 final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1563 long lc = w.lastEventCount;
1564 long ec = ensureSync();
1565 if (ec == lc)
1566 return false;
1567 w.lastEventCount = ec;
1568 return true;
1569 }
1570
1571 // Parallelism maintenance
1572
/**
 * Decrements running count; if too low, adds spare.
 *
 * Conceptually, all we need to do here is add or resume a
 * spare thread when one is about to block (and remove or
 * suspend it later when unblocked -- see suspendIfSpare).
 * However, implementing this idea requires coping with
 * several problems: we have imperfect information about the
 * states of threads. Some count updates can and usually do
 * lag run state changes, despite arrangements to keep them
 * accurate (for example, when possible, updating counts
 * before signalling or resuming), especially when running on
 * dynamic JVMs that don't optimize the infrequent paths that
 * update counts. Generating too many threads can make these
 * problems become worse, because excess threads are more
 * likely to be context-switched with others, slowing them all
 * down, especially if there is no work available, so all are
 * busy scanning or idling. Also, excess spare threads can
 * only be suspended or removed when they are idle, not
 * immediately when they aren't needed. So adding threads will
 * raise parallelism level for longer than necessary. Also,
 * FJ applications often encounter highly transient peaks when
 * many threads are blocked joining, but for less time than it
 * takes to create or resume spares.
 *
 * @param joinMe if non-null, return early if done
 * @param maintainParallelism if true, try to stay within
 * target counts, else create only to avoid starvation
 * @return true if joinMe known to be done
 */
final boolean preJoin(ForkJoinTask<?> joinMe,
                      boolean maintainParallelism) {
    maintainParallelism &= maintainsParallelism; // override
    boolean dec = false;  // true when running count decremented
    // Loop until a suspended spare is resumed or one of the exits
    // below is taken. Passing dec to tryResumeSpare makes it add
    // the count back when we have already decremented it.
    while (spareStack == null || !tryResumeSpare(dec)) {
        int counts = workerCounts;
        // Decrement at most once; afterwards counts is a fresh read.
        if (dec || (dec = casWorkerCounts(counts, --counts))) {
            if (!needSpare(counts, maintainParallelism))
                break;
            if (joinMe.status < 0)
                return true;
            if (tryAddSpare(counts))
                break;
        }
    }
    return false;
}
1620
/**
 * Same idea as preJoin, but for a {@code ManagedBlocker}:
 * decrements the running count and, if a spare appears to be
 * needed, resumes or adds one, returning early when the blocker
 * reports that blocking is no longer necessary.
 *
 * @param blocker the blocker about to be waited on
 * @param maintainParallelism if true, try to stay within
 * target counts, else create only to avoid starvation
 * @return true if blocker known to be releasable
 */
final boolean preBlock(ManagedBlocker blocker,
                       boolean maintainParallelism) {
    maintainParallelism &= maintainsParallelism; // pool setting overrides
    boolean dec = false; // true when running count decremented
    while (spareStack == null || !tryResumeSpare(dec)) {
        int counts = workerCounts;
        // Decrement at most once; afterwards counts is a fresh read.
        if (dec || (dec = casWorkerCounts(counts, --counts))) {
            if (!needSpare(counts, maintainParallelism))
                break;
            if (blocker.isReleasable())
                return true;
            if (tryAddSpare(counts))
                break;
        }
    }
    return false;
}
1641
1642 /**
1643 * Returns {@code true} if a spare thread appears to be needed.
1644 * If maintaining parallelism, returns true when the deficit in
1645 * running threads is more than the surplus of total threads, and
1646 * there is apparently some work to do. This self-limiting rule
1647 * means that the more threads that have already been added, the
1648 * less parallelism we will tolerate before adding another.
1649 *
1650 * @param counts current worker counts
1651 * @param maintainParallelism try to maintain parallelism
1652 */
1653 private boolean needSpare(int counts, boolean maintainParallelism) {
1654 int ps = parallelism;
1655 int rc = runningCountOf(counts);
1656 int tc = totalCountOf(counts);
1657 int runningDeficit = ps - rc;
1658 int totalSurplus = tc - ps;
1659 return (tc < maxPoolSize &&
1660 (rc == 0 || totalSurplus < 0 ||
1661 (maintainParallelism &&
1662 runningDeficit > totalSurplus &&
1663 ForkJoinWorkerThread.hasQueuedTasks(workers))));
1664 }
1665
/**
 * Adds a spare worker if lock available and no more than the
 * expected numbers of threads exist.
 *
 * @param expectedCounts the worker counts the caller based its
 * decision on; the attempt is abandoned if the current running or
 * total count exceeds the corresponding expected value
 * @return true if successful
 */
private boolean tryAddSpare(int expectedCounts) {
    final ReentrantLock lock = this.workerLock;
    int expectedRunning = runningCountOf(expectedCounts);
    int expectedTotal = totalCountOf(expectedCounts);
    boolean success = false;
    boolean locked = false;
    // confirm counts while locking; CAS after obtaining lock
    try {
        for (;;) {
            int s = workerCounts;
            int tc = totalCountOf(s);
            int rc = runningCountOf(s);
            if (rc > expectedRunning || tc > expectedTotal)
                break;
            // Non-blocking: give up if the lock is not free. Once
            // held, later iterations skip the tryLock.
            if (!locked && !(locked = lock.tryLock()))
                break;
            // Reserve the new thread in the counts before creating it.
            if (casWorkerCounts(s, workerCountsFor(tc+1, rc+1))) {
                createAndStartSpare(tc);
                success = true;
                break;
            }
        }
    } finally {
        if (locked)
            lock.unlock();
    }
    return success;
}
1700
/**
 * Adds the kth spare worker. On entry, pool counts are already
 * adjusted to reflect addition. If no worker is started (no free
 * slot, pool no longer processing tasks, or worker creation returns
 * null), the count adjustment is undone. Idle workers are signalled
 * in either case.
 *
 * @param k the preferred slot index for the new worker
 */
private void createAndStartSpare(int k) {
    ForkJoinWorkerThread w = null;
    ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(k + 1);
    int len = ws.length;
    // Probably, we can place at slot k. If not, find empty slot
    if (k < len && ws[k] != null) {
        for (k = 0; k < len && ws[k] != null; ++k)
            ;
    }
    if (k < len && isProcessingTasks() && (w = createWorker(k)) != null) {
        ws[k] = w;
        w.start();
    }
    else
        updateWorkerCount(-1); // adjust on failure
    signalIdleWorkers();
}
1722
/**
 * Suspends calling thread w if there are excess threads.  Called
 * only from sync.  Spares are enqueued in a Treiber stack using
 * the same WaitQueueNodes as barriers.  They are resumed mainly
 * in preJoin, but are also woken on pool events that require all
 * threads to check run state.
 *
 * @param w the caller
 * @return {@code true} if the caller was suspended and has since
 * been released; {@code false} if the running count did not exceed
 * the parallelism level
 */
private boolean suspendIfSpare(ForkJoinWorkerThread w) {
    WaitQueueNode node = null;
    int s;
    while (parallelism < runningCountOf(s = workerCounts)) {
        if (node == null)
            node = new WaitQueueNode(0, w); // count is unused for spares
        if (casWorkerCounts(s, s-1)) { // representation-dependent
            // push onto stack
            do {} while (!casSpareStack(node.next = spareStack, node));
            // block until released by resumeSpare
            node.awaitSpareRelease();
            return true;
        }
    }
    return false;
}
1748
/**
 * Tries to pop and resume a spare thread.
 *
 * @param updateCount if true, increment running count on success
 * @return true if successful; false if the spare stack was empty
 */
private boolean tryResumeSpare(boolean updateCount) {
    WaitQueueNode q;
    // Retry the pop while the stack is non-empty and the CAS loses.
    while ((q = spareStack) != null) {
        if (casSpareStack(q, q.next)) {
            // Adjust the count before waking the spare.
            if (updateCount)
                updateRunningCount(1);
            q.signal();
            return true;
        }
    }
    return false;
}
1767
/**
 * Pops and resumes all spare threads. Same idea as ensureSync:
 * the whole stack is detached with one CAS and every node on the
 * detached chain is signalled, incrementing the running count once
 * per node.
 *
 * @return true if any spares released
 */
private boolean resumeAllSpares() {
    WaitQueueNode q;
    while ( (q = spareStack) != null) {
        if (casSpareStack(q, null)) {
            do {
                updateRunningCount(1);
                q.signal();
            } while ((q = q.next) != null);
            return true;
        }
    }
    return false;
}
1786
/**
 * Pops and shuts down excessive spare threads. Call only while
 * holding lock. This is not guaranteed to eliminate all excess
 * threads, only those suspended as spares, which are the ones
 * unlikely to be needed in the future.
 */
private void trimSpares() {
    // Surplus: threads beyond the parallelism level.
    int surplus = totalCountOf(workerCounts) - parallelism;
    WaitQueueNode q;
    while (surplus > 0 && (q = spareStack) != null) {
        // Detach the whole spare stack, then walk the detached chain.
        if (casSpareStack(q, null)) {
            do {
                updateRunningCount(1);
                ForkJoinWorkerThread w = q.thread;
                // Mark the spare for shutdown only while a surplus
                // remains and some thread is still counted as running.
                if (w != null && surplus > 0 &&
                    runningCountOf(workerCounts) > 0 && w.shutdown())
                    --surplus;
                // Every detached spare is woken, shut down or not.
                q.signal();
            } while ((q = q.next) != null);
        }
    }
}
1809
/**
 * Interface for extending managed parallelism for tasks running
 * in {@link ForkJoinPool}s.
 *
 * <p>A {@code ManagedBlocker} provides two methods.
 * Method {@code isReleasable} must return {@code true} if
 * blocking is not necessary. Method {@code block} blocks the
 * current thread if necessary (perhaps internally invoking
 * {@code isReleasable} before actually blocking).
 *
 * <p>For example, here is a ManagedBlocker based on a
 * ReentrantLock:
 *  <pre> {@code
 * class ManagedLocker implements ManagedBlocker {
 *   final ReentrantLock lock;
 *   boolean hasLock = false;
 *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
 *   public boolean block() {
 *     if (!hasLock)
 *       lock.lock();
 *     return true;
 *   }
 *   public boolean isReleasable() {
 *     return hasLock || (hasLock = lock.tryLock());
 *   }
 * }}</pre>
 */
public static interface ManagedBlocker {
    /**
     * Possibly blocks the current thread, for example waiting for
     * a lock or condition.
     *
     * @return {@code true} if no additional blocking is necessary
     * (i.e., if isReleasable would return true)
     * @throws InterruptedException if interrupted while waiting
     * (the method is not required to do so, but is allowed to)
     */
    boolean block() throws InterruptedException;

    /**
     * Returns {@code true} if blocking is unnecessary.
     *
     * @return {@code true} if blocking is unnecessary
     */
    boolean isReleasable();
}
1854
1855 /**
1856 * Blocks in accord with the given blocker. If the current thread
1857 * is a {@link ForkJoinWorkerThread}, this method possibly
1858 * arranges for a spare thread to be activated if necessary to
1859 * ensure parallelism while the current thread is blocked.
1860 *
1861 * <p>If {@code maintainParallelism} is {@code true} and the pool
1862 * supports it ({@link #getMaintainsParallelism}), this method
1863 * attempts to maintain the pool's nominal parallelism. Otherwise
1864 * it activates a thread only if necessary to avoid complete
1865 * starvation. This option may be preferable when blockages use
1866 * timeouts, or are almost always brief.
1867 *
1868 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1869 * behaviorally equivalent to
1870 * <pre> {@code
1871 * while (!blocker.isReleasable())
1872 * if (blocker.block())
1873 * return;
1874 * }</pre>
1875 *
1876 * If the caller is a {@code ForkJoinTask}, then the pool may
1877 * first be expanded to ensure parallelism, and later adjusted.
1878 *
1879 * @param blocker the blocker
1880 * @param maintainParallelism if {@code true} and supported by
1881 * this pool, attempt to maintain the pool's nominal parallelism;
1882 * otherwise activate a thread only if necessary to avoid
1883 * complete starvation.
1884 * @throws InterruptedException if blocker.block did so
1885 */
1886 public static void managedBlock(ManagedBlocker blocker,
1887 boolean maintainParallelism)
1888 throws InterruptedException {
1889 Thread t = Thread.currentThread();
1890 ForkJoinPool pool = ((t instanceof ForkJoinWorkerThread) ?
1891 ((ForkJoinWorkerThread) t).pool : null);
1892 if (!blocker.isReleasable()) {
1893 try {
1894 if (pool == null ||
1895 !pool.preBlock(blocker, maintainParallelism))
1896 awaitBlocker(blocker);
1897 } finally {
1898 if (pool != null)
1899 pool.updateRunningCount(1);
1900 }
1901 }
1902 }
1903
1904 private static void awaitBlocker(ManagedBlocker blocker)
1905 throws InterruptedException {
1906 do {} while (!blocker.isReleasable() && !blocker.block());
1907 }
1908
1909 // AbstractExecutorService overrides. These rely on undocumented
1910 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
1911 // implement RunnableFuture.
1912
1913 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1914 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
1915 }
1916
1917 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1918 return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
1919 }
1920
1921 // Unsafe mechanics
1922
// Unsafe instance used for the compare-and-swap helpers below.
private static final sun.misc.Unsafe UNSAFE = getUnsafe();
// Offsets of the ForkJoinPool fields updated via CAS; each is looked
// up by field name, so the string must match the declared field.
private static final long eventCountOffset =
    objectFieldOffset("eventCount", ForkJoinPool.class);
private static final long workerCountsOffset =
    objectFieldOffset("workerCounts", ForkJoinPool.class);
private static final long runControlOffset =
    objectFieldOffset("runControl", ForkJoinPool.class);
private static final long syncStackOffset =
    objectFieldOffset("syncStack",ForkJoinPool.class);
private static final long spareStackOffset =
    objectFieldOffset("spareStack", ForkJoinPool.class);
1934
/**
 * Atomically sets the {@code eventCount} field to {@code val} if it
 * currently holds {@code cmp}.
 *
 * @param cmp the expected current value
 * @param val the new value
 * @return true if the field was updated
 */
private boolean casEventCount(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, eventCountOffset, cmp, val);
}
/**
 * Atomically sets the {@code workerCounts} field to {@code val} if
 * it currently holds {@code cmp}.
 *
 * @param cmp the expected current value
 * @param val the new value
 * @return true if the field was updated
 */
private boolean casWorkerCounts(int cmp, int val) {
    return UNSAFE.compareAndSwapInt(this, workerCountsOffset, cmp, val);
}
/**
 * Atomically sets the {@code runControl} field to {@code val} if it
 * currently holds {@code cmp}.
 *
 * @param cmp the expected current value
 * @param val the new value
 * @return true if the field was updated
 */
private boolean casRunControl(int cmp, int val) {
    return UNSAFE.compareAndSwapInt(this, runControlOffset, cmp, val);
}
/**
 * Atomically sets the {@code spareStack} field (top of the spare
 * stack) to {@code val} if it currently refers to {@code cmp}.
 *
 * @param cmp the expected current top node
 * @param val the new top node
 * @return true if the field was updated
 */
private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) {
    return UNSAFE.compareAndSwapObject(this, spareStackOffset, cmp, val);
}
/**
 * Atomically sets the {@code syncStack} field (top of the event
 * barrier wait stack) to {@code val} if it currently refers to
 * {@code cmp}.
 *
 * @param cmp the expected current top node
 * @param val the new top node
 * @return true if the field was updated
 */
private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) {
    return UNSAFE.compareAndSwapObject(this, syncStackOffset, cmp, val);
}
1950
1951 private static long objectFieldOffset(String field, Class<?> klazz) {
1952 try {
1953 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1954 } catch (NoSuchFieldException e) {
1955 // Convert Exception to corresponding Error
1956 NoSuchFieldError error = new NoSuchFieldError(field);
1957 error.initCause(e);
1958 throw error;
1959 }
1960 }
1961
1962 /**
1963 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
1964 * Replace with a simple call to Unsafe.getUnsafe when integrating
1965 * into a jdk.
1966 *
1967 * @return a sun.misc.Unsafe
1968 */
1969 private static sun.misc.Unsafe getUnsafe() {
1970 try {
1971 return sun.misc.Unsafe.getUnsafe();
1972 } catch (SecurityException se) {
1973 try {
1974 return java.security.AccessController.doPrivileged
1975 (new java.security
1976 .PrivilegedExceptionAction<sun.misc.Unsafe>() {
1977 public sun.misc.Unsafe run() throws Exception {
1978 java.lang.reflect.Field f = sun.misc
1979 .Unsafe.class.getDeclaredField("theUnsafe");
1980 f.setAccessible(true);
1981 return (sun.misc.Unsafe) f.get(null);
1982 }});
1983 } catch (java.security.PrivilegedActionException e) {
1984 throw new RuntimeException("Could not initialize intrinsics",
1985 e.getCause());
1986 }
1987 }
1988 }
1989 }