ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
Revision: 1.11
Committed: Wed Aug 12 04:10:59 2009 UTC (14 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.10: +41 -12 lines
Log Message:
sync with jsr166 package

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package java.util.concurrent;
8    
9     import java.util.ArrayList;
10     import java.util.Arrays;
11     import java.util.Collection;
12     import java.util.Collections;
13     import java.util.List;
14     import java.util.concurrent.locks.Condition;
15     import java.util.concurrent.locks.LockSupport;
16     import java.util.concurrent.locks.ReentrantLock;
17     import java.util.concurrent.atomic.AtomicInteger;
18     import java.util.concurrent.atomic.AtomicLong;
19    
20     /**
21 jsr166 1.4 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
22 jsr166 1.8 * A {@code ForkJoinPool} provides the entry point for submissions
23     * from non-{@code ForkJoinTask}s, as well as management and
24 jsr166 1.11 * monitoring operations.
25 jsr166 1.1 *
26 jsr166 1.9 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
27     * ExecutorService} mainly by virtue of employing
28     * <em>work-stealing</em>: all threads in the pool attempt to find and
29     * execute subtasks created by other active tasks (eventually blocking
30     * waiting for work if none exist). This enables efficient processing
31     * when most tasks spawn other subtasks (as do most {@code
32     * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
33     * execution of some plain {@code Runnable}- or {@code Callable}-
34     * based activities along with {@code ForkJoinTask}s. When setting
35     * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
36     * also be appropriate for use with fine-grained tasks of any form
37     * that are never joined. Otherwise, other {@code ExecutorService}
38     * implementations are typically more appropriate choices.
39 jsr166 1.1 *
40 jsr166 1.9 * <p>A {@code ForkJoinPool} is constructed with a given target
41     * parallelism level; by default, equal to the number of available
42     * processors. Unless configured otherwise via {@link
43     * #setMaintainsParallelism}, the pool attempts to maintain this
44     * number of active (or available) threads by dynamically adding,
45     * suspending, or resuming internal worker threads, even if some tasks
46 jsr166 1.10 * are stalled waiting to join others. However, no such adjustments
47     * are performed in the face of blocked IO or other unmanaged
48 jsr166 1.8 * synchronization. The nested {@link ManagedBlocker} interface
49     * enables extension of the kinds of synchronization accommodated.
50     * The target parallelism level may also be changed dynamically
51 jsr166 1.9 * ({@link #setParallelism}). The total number of threads may be
52     * limited using method {@link #setMaximumPoolSize}, in which case it
53     * may become possible for the activities of a pool to stall due to
54     * the lack of available threads to process new tasks.
55 jsr166 1.1 *
56     * <p>In addition to execution and lifecycle control methods, this
57     * class provides status check methods (for example
58 jsr166 1.4 * {@link #getStealCount}) that are intended to aid in developing,
59 jsr166 1.1 * tuning, and monitoring fork/join applications. Also, method
60 jsr166 1.4 * {@link #toString} returns indications of pool state in a
61 jsr166 1.1 * convenient form for informal monitoring.
62     *
63 jsr166 1.9 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
64     * used for all parallel task execution in a program or subsystem.
65     * Otherwise, use would not usually outweigh the construction and
66     * bookkeeping overhead of creating a large set of threads. For
67     * example, a common pool could be used for the {@code SortTasks}
68     * illustrated in {@link RecursiveAction}. Because {@code
69     * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
70     * daemon} mode, there is typically no need to explicitly {@link
71     * #shutdown} such a pool upon program exit.
72     *
73     * <pre>
74     * static final ForkJoinPool mainPool = new ForkJoinPool();
75     * ...
76     * public void sort(long[] array) {
77     * mainPool.invoke(new SortTask(array, 0, array.length));
78     * }
79     * </pre>
80     *
81 jsr166 1.1 * <p><b>Implementation notes</b>: This implementation restricts the
82     * maximum number of running threads to 32767. Attempts to create
83 jsr166 1.11 * pools with greater than the maximum number result in
84 jsr166 1.8 * {@code IllegalArgumentException}.
85 jsr166 1.1 *
86 jsr166 1.11 * <p>This implementation rejects submitted tasks (that is, by throwing
87     * {@link RejectedExecutionException}) only when the pool is shut down.
88     *
89 jsr166 1.1 * @since 1.7
90     * @author Doug Lea
91     */
92     public class ForkJoinPool extends AbstractExecutorService {
93    
94     /*
95     * See the extended comments interspersed below for design,
96     * rationale, and walkthroughs.
97     */
98    
99     /** Mask for packing and unpacking shorts */
100     private static final int shortMask = 0xffff;
101    
102     /** Max pool size -- must be a power of two minus 1 */
103     private static final int MAX_THREADS = 0x7FFF;
104    
105     /**
106 jsr166 1.8 * Factory for creating new {@link ForkJoinWorkerThread}s.
107     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
108     * for {@code ForkJoinWorkerThread} subclasses that extend base
109     * functionality or initialize threads with different contexts.
110 jsr166 1.1 */
111     public static interface ForkJoinWorkerThreadFactory {
112     /**
113     * Returns a new worker thread operating in the given pool.
114     *
115     * @param pool the pool this thread works in
116 jsr166 1.11 * @throws NullPointerException if the pool is null
117 jsr166 1.1 */
118     public ForkJoinWorkerThread newThread(ForkJoinPool pool);
119     }
120    
121     /**
122     * Default ForkJoinWorkerThreadFactory implementation; creates a
123     * new ForkJoinWorkerThread.
124     */
125     static class DefaultForkJoinWorkerThreadFactory
126     implements ForkJoinWorkerThreadFactory {
127     public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
128     try {
129     return new ForkJoinWorkerThread(pool);
130     } catch (OutOfMemoryError oom) {
131     return null;
132     }
133     }
134     }
135    
136     /**
137     * Creates a new ForkJoinWorkerThread. This factory is used unless
138     * overridden in ForkJoinPool constructors.
139     */
140     public static final ForkJoinWorkerThreadFactory
141     defaultForkJoinWorkerThreadFactory =
142     new DefaultForkJoinWorkerThreadFactory();
143    
144     /**
145     * Permission required for callers of methods that may start or
146     * kill threads.
147     */
148     private static final RuntimePermission modifyThreadPermission =
149     new RuntimePermission("modifyThread");
150    
151     /**
152     * If there is a security manager, makes sure caller has
153     * permission to modify threads.
154     */
155     private static void checkPermission() {
156     SecurityManager security = System.getSecurityManager();
157     if (security != null)
158     security.checkPermission(modifyThreadPermission);
159     }
160    
161     /**
162     * Generator for assigning sequence numbers as pool names.
163     */
164     private static final AtomicInteger poolNumberGenerator =
165     new AtomicInteger();
166    
167     /**
168     * Array holding all worker threads in the pool. Initialized upon
169     * first use. Array size must be a power of two. Updates and
170     * replacements are protected by workerLock, but it is always kept
171     * in a consistent enough state to be randomly accessed without
172     * locking by workers performing work-stealing.
173     */
174     volatile ForkJoinWorkerThread[] workers;
175    
176     /**
177     * Lock protecting access to workers.
178     */
179     private final ReentrantLock workerLock;
180    
181     /**
182     * Condition for awaitTermination.
183     */
184     private final Condition termination;
185    
186     /**
187     * The uncaught exception handler used when any worker
188     * abruptly terminates
189     */
190     private Thread.UncaughtExceptionHandler ueh;
191    
192     /**
193     * Creation factory for worker threads.
194     */
195     private final ForkJoinWorkerThreadFactory factory;
196    
197     /**
198     * Head of stack of threads that were created to maintain
199     * parallelism when other threads blocked, but have since
200     * suspended when the parallelism level rose.
201     */
202     private volatile WaitQueueNode spareStack;
203    
204     /**
205     * Sum of per-thread steal counts, updated only when threads are
206     * idle or terminating.
207     */
208     private final AtomicLong stealCount;
209    
210     /**
211     * Queue for external submissions.
212     */
213     private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
214    
215     /**
216     * Head of Treiber stack for barrier sync. See below for explanation.
217     */
218     private volatile WaitQueueNode syncStack;
219    
220     /**
221     * The count for event barrier
222     */
223     private volatile long eventCount;
224    
225     /**
226     * Pool number, just for assigning useful names to worker threads
227     */
228     private final int poolNumber;
229    
230     /**
231     * The maximum allowed pool size
232     */
233     private volatile int maxPoolSize;
234    
235     /**
236     * The desired parallelism level, updated only under workerLock.
237     */
238     private volatile int parallelism;
239    
240     /**
241     * True if use local fifo, not default lifo, for local polling
242     */
243     private volatile boolean locallyFifo;
244    
245     /**
246     * Holds number of total (i.e., created and not yet terminated)
247     * and running (i.e., not blocked on joins or other managed sync)
248     * threads, packed into one int to ensure consistent snapshot when
249     * making decisions about creating and suspending spare
250     * threads. Updated only by CAS. Note: CASes in
251     * updateRunningCount and preJoin assume that running active count
252     * is in low word, so need to be modified if this changes.
253     */
254     private volatile int workerCounts;
255    
256     private static int totalCountOf(int s) { return s >>> 16; }
257     private static int runningCountOf(int s) { return s & shortMask; }
258     private static int workerCountsFor(int t, int r) { return (t << 16) + r; }
259    
260     /**
261     * Adds delta (which may be negative) to running count. This must
262     * be called before (with negative arg) and after (with positive)
263     * any managed synchronization (i.e., mainly, joins).
264     *
265     * @param delta the number to add
266     */
267     final void updateRunningCount(int delta) {
268     int s;
269     do {} while (!casWorkerCounts(s = workerCounts, s + delta));
270     }
271    
272     /**
273     * Adds delta (which may be negative) to both total and running
274     * count. This must be called upon creation and termination of
275     * worker threads.
276     *
277     * @param delta the number to add
278     */
279     private void updateWorkerCount(int delta) {
280     int d = delta + (delta << 16); // add to both lo and hi parts
281     int s;
282     do {} while (!casWorkerCounts(s = workerCounts, s + d));
283     }
284    
285     /**
286     * Lifecycle control. High word contains runState, low word
287     * contains the number of workers that are (probably) executing
288     * tasks. This value is atomically incremented before a worker
289     * gets a task to run, and decremented when worker has no tasks
290     * and cannot find any. These two fields are bundled together to
291     * support correct termination triggering. Note: activeCount
292     * CAS'es cheat by assuming active count is in low word, so need
293     * to be modified if this changes
294     */
295     private volatile int runControl;
296    
297     // RunState values. Order among values matters
298     private static final int RUNNING = 0;
299     private static final int SHUTDOWN = 1;
300     private static final int TERMINATING = 2;
301     private static final int TERMINATED = 3;
302    
303     private static int runStateOf(int c) { return c >>> 16; }
304     private static int activeCountOf(int c) { return c & shortMask; }
305     private static int runControlFor(int r, int a) { return (r << 16) + a; }
306    
307     /**
308     * Tries incrementing active count; fails on contention.
309     * Called by workers before/during executing tasks.
310     *
311     * @return true on success
312     */
313     final boolean tryIncrementActiveCount() {
314     int c = runControl;
315     return casRunControl(c, c+1);
316     }
317    
318     /**
319     * Tries decrementing active count; fails on contention.
320     * Possibly triggers termination on success.
321     * Called by workers when they can't find tasks.
322     *
323     * @return true on success
324     */
325     final boolean tryDecrementActiveCount() {
326     int c = runControl;
327     int nextc = c - 1;
328     if (!casRunControl(c, nextc))
329     return false;
330     if (canTerminateOnShutdown(nextc))
331     terminateOnShutdown();
332     return true;
333     }
334    
335     /**
336 jsr166 1.4 * Returns {@code true} if argument represents zero active count
337     * and nonzero runstate, which is the triggering condition for
338 jsr166 1.1 * terminating on shutdown.
339     */
340     private static boolean canTerminateOnShutdown(int c) {
341     // i.e. least bit is nonzero runState bit
342     return ((c & -c) >>> 16) != 0;
343     }
344    
345     /**
346     * Transition run state to at least the given state. Return true
347     * if not already at least given state.
348     */
349     private boolean transitionRunStateTo(int state) {
350     for (;;) {
351     int c = runControl;
352     if (runStateOf(c) >= state)
353     return false;
354     if (casRunControl(c, runControlFor(state, activeCountOf(c))))
355     return true;
356     }
357     }
358    
359     /**
360     * Controls whether to add spares to maintain parallelism
361     */
362     private volatile boolean maintainsParallelism;
363    
364     // Constructors
365    
366     /**
367 jsr166 1.9 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
368     * java.lang.Runtime#availableProcessors}, and using the {@linkplain
369     * #defaultForkJoinWorkerThreadFactory default thread factory}.
370 jsr166 1.1 *
371     * @throws SecurityException if a security manager exists and
372     * the caller is not permitted to modify threads
373     * because it does not hold {@link
374     * java.lang.RuntimePermission}{@code ("modifyThread")}
375     */
376     public ForkJoinPool() {
377     this(Runtime.getRuntime().availableProcessors(),
378     defaultForkJoinWorkerThreadFactory);
379     }
380    
381     /**
382 jsr166 1.9 * Creates a {@code ForkJoinPool} with the indicated parallelism
383     * level and using the {@linkplain
384     * #defaultForkJoinWorkerThreadFactory default thread factory}.
385 jsr166 1.1 *
386 jsr166 1.9 * @param parallelism the parallelism level
387 jsr166 1.1 * @throws IllegalArgumentException if parallelism less than or
388 jsr166 1.11 * equal to zero, or greater than implementation limit
389 jsr166 1.1 * @throws SecurityException if a security manager exists and
390     * the caller is not permitted to modify threads
391     * because it does not hold {@link
392     * java.lang.RuntimePermission}{@code ("modifyThread")}
393     */
394     public ForkJoinPool(int parallelism) {
395     this(parallelism, defaultForkJoinWorkerThreadFactory);
396     }
397    
398     /**
399 jsr166 1.9 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
400     * java.lang.Runtime#availableProcessors}, and using the given
401     * thread factory.
402 jsr166 1.1 *
403     * @param factory the factory for creating new threads
404 jsr166 1.11 * @throws NullPointerException if the factory is null
405 jsr166 1.1 * @throws SecurityException if a security manager exists and
406     * the caller is not permitted to modify threads
407     * because it does not hold {@link
408     * java.lang.RuntimePermission}{@code ("modifyThread")}
409     */
410     public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
411     this(Runtime.getRuntime().availableProcessors(), factory);
412     }
413    
414     /**
415 jsr166 1.8 * Creates a {@code ForkJoinPool} with the given parallelism and
416     * thread factory.
417 jsr166 1.1 *
418 jsr166 1.9 * @param parallelism the parallelism level
419 jsr166 1.1 * @param factory the factory for creating new threads
420     * @throws IllegalArgumentException if parallelism less than or
421 jsr166 1.11 * equal to zero, or greater than implementation limit
422     * @throws NullPointerException if the factory is null
423 jsr166 1.1 * @throws SecurityException if a security manager exists and
424     * the caller is not permitted to modify threads
425     * because it does not hold {@link
426     * java.lang.RuntimePermission}{@code ("modifyThread")}
427     */
428     public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
429     if (parallelism <= 0 || parallelism > MAX_THREADS)
430     throw new IllegalArgumentException();
431     if (factory == null)
432     throw new NullPointerException();
433     checkPermission();
434     this.factory = factory;
435     this.parallelism = parallelism;
436     this.maxPoolSize = MAX_THREADS;
437     this.maintainsParallelism = true;
438     this.poolNumber = poolNumberGenerator.incrementAndGet();
439     this.workerLock = new ReentrantLock();
440     this.termination = workerLock.newCondition();
441     this.stealCount = new AtomicLong();
442     this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
443     // worker array and workers are lazily constructed
444     }
445    
446     /**
447     * Creates a new worker thread using factory.
448     *
449     * @param index the index to assign worker
450 jsr166 1.8 * @return new worker, or null if factory failed
451 jsr166 1.1 */
452     private ForkJoinWorkerThread createWorker(int index) {
453     Thread.UncaughtExceptionHandler h = ueh;
454     ForkJoinWorkerThread w = factory.newThread(this);
455     if (w != null) {
456     w.poolIndex = index;
457     w.setDaemon(true);
458     w.setAsyncMode(locallyFifo);
459     w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index);
460     if (h != null)
461     w.setUncaughtExceptionHandler(h);
462     }
463     return w;
464     }
465    
466     /**
467     * Returns a good size for worker array given pool size.
468     * Currently requires size to be a power of two.
469     */
470     private static int arraySizeFor(int poolSize) {
471 jsr166 1.9 if (poolSize <= 1)
472     return 1;
473     // See Hackers Delight, sec 3.2
474     int c = poolSize >= MAX_THREADS ? MAX_THREADS : (poolSize - 1);
475     c |= c >>> 1;
476     c |= c >>> 2;
477     c |= c >>> 4;
478     c |= c >>> 8;
479     c |= c >>> 16;
480     return c + 1;
481 jsr166 1.1 }
482    
483     /**
484     * Creates or resizes array if necessary to hold newLength.
485     * Call only under exclusion.
486     *
487     * @return the array
488     */
489     private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) {
490     ForkJoinWorkerThread[] ws = workers;
491     if (ws == null)
492     return workers = new ForkJoinWorkerThread[arraySizeFor(newLength)];
493     else if (newLength > ws.length)
494     return workers = Arrays.copyOf(ws, arraySizeFor(newLength));
495     else
496     return ws;
497     }
498    
499     /**
500     * Tries to shrink workers into smaller array after one or more terminate.
501     */
502     private void tryShrinkWorkerArray() {
503     ForkJoinWorkerThread[] ws = workers;
504     if (ws != null) {
505     int len = ws.length;
506     int last = len - 1;
507     while (last >= 0 && ws[last] == null)
508     --last;
509     int newLength = arraySizeFor(last+1);
510     if (newLength < len)
511     workers = Arrays.copyOf(ws, newLength);
512     }
513     }
514    
515     /**
516     * Initializes workers if necessary.
517     */
518     final void ensureWorkerInitialization() {
519     ForkJoinWorkerThread[] ws = workers;
520     if (ws == null) {
521     final ReentrantLock lock = this.workerLock;
522     lock.lock();
523     try {
524     ws = workers;
525     if (ws == null) {
526     int ps = parallelism;
527     ws = ensureWorkerArrayCapacity(ps);
528     for (int i = 0; i < ps; ++i) {
529     ForkJoinWorkerThread w = createWorker(i);
530     if (w != null) {
531     ws[i] = w;
532     w.start();
533     updateWorkerCount(1);
534     }
535     }
536     }
537     } finally {
538     lock.unlock();
539     }
540     }
541     }
542    
543     /**
544     * Worker creation and startup for threads added via setParallelism.
545     */
546     private void createAndStartAddedWorkers() {
547     resumeAllSpares(); // Allow spares to convert to nonspare
548     int ps = parallelism;
549     ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps);
550     int len = ws.length;
551     // Sweep through slots, to keep lowest indices most populated
552     int k = 0;
553     while (k < len) {
554     if (ws[k] != null) {
555     ++k;
556     continue;
557     }
558     int s = workerCounts;
559     int tc = totalCountOf(s);
560     int rc = runningCountOf(s);
561     if (rc >= ps || tc >= ps)
562     break;
563     if (casWorkerCounts (s, workerCountsFor(tc+1, rc+1))) {
564     ForkJoinWorkerThread w = createWorker(k);
565     if (w != null) {
566     ws[k++] = w;
567     w.start();
568     }
569     else {
570     updateWorkerCount(-1); // back out on failed creation
571     break;
572     }
573     }
574     }
575     }
576    
577     // Execution methods
578    
579     /**
580     * Common code for execute, invoke and submit
581     */
582     private <T> void doSubmit(ForkJoinTask<T> task) {
583 jsr166 1.2 if (task == null)
584     throw new NullPointerException();
585 jsr166 1.1 if (isShutdown())
586     throw new RejectedExecutionException();
587     if (workers == null)
588     ensureWorkerInitialization();
589     submissionQueue.offer(task);
590     signalIdleWorkers();
591     }
592    
593     /**
594     * Performs the given task, returning its result upon completion.
595     *
596     * @param task the task
597     * @return the task's result
598 jsr166 1.11 * @throws NullPointerException if the task is null
599     * @throws RejectedExecutionException if the task cannot be
600     * scheduled for execution
601 jsr166 1.1 */
602     public <T> T invoke(ForkJoinTask<T> task) {
603     doSubmit(task);
604     return task.join();
605     }
606    
607     /**
608     * Arranges for (asynchronous) execution of the given task.
609     *
610     * @param task the task
611 jsr166 1.11 * @throws NullPointerException if the task is null
612     * @throws RejectedExecutionException if the task cannot be
613     * scheduled for execution
614 jsr166 1.1 */
615 jsr166 1.8 public void execute(ForkJoinTask<?> task) {
616 jsr166 1.1 doSubmit(task);
617     }
618    
619     // AbstractExecutorService methods
620    
621 jsr166 1.11 /**
622     * @throws NullPointerException if the task is null
623     * @throws RejectedExecutionException if the task cannot be
624     * scheduled for execution
625     */
626 jsr166 1.1 public void execute(Runnable task) {
627 jsr166 1.2 ForkJoinTask<?> job;
628 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
629     job = (ForkJoinTask<?>) task;
630 jsr166 1.2 else
631 jsr166 1.7 job = ForkJoinTask.adapt(task, null);
632 jsr166 1.2 doSubmit(job);
633 jsr166 1.1 }
634    
635 jsr166 1.11 /**
636     * @throws NullPointerException if the task is null
637     * @throws RejectedExecutionException if the task cannot be
638     * scheduled for execution
639     */
640 jsr166 1.1 public <T> ForkJoinTask<T> submit(Callable<T> task) {
641 jsr166 1.7 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
642 jsr166 1.1 doSubmit(job);
643     return job;
644     }
645    
646 jsr166 1.11 /**
647     * @throws NullPointerException if the task is null
648     * @throws RejectedExecutionException if the task cannot be
649     * scheduled for execution
650     */
651 jsr166 1.1 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
652 jsr166 1.7 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
653 jsr166 1.1 doSubmit(job);
654     return job;
655     }
656    
657 jsr166 1.11 /**
658     * @throws NullPointerException if the task is null
659     * @throws RejectedExecutionException if the task cannot be
660     * scheduled for execution
661     */
662 jsr166 1.1 public ForkJoinTask<?> submit(Runnable task) {
663 jsr166 1.2 ForkJoinTask<?> job;
664 jsr166 1.3 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
665     job = (ForkJoinTask<?>) task;
666 jsr166 1.2 else
667 jsr166 1.7 job = ForkJoinTask.adapt(task, null);
668 jsr166 1.1 doSubmit(job);
669     return job;
670     }
671    
672     /**
673 jsr166 1.2 * Submits a ForkJoinTask for execution.
674     *
675     * @param task the task to submit
676     * @return the task
677 jsr166 1.11 * @throws NullPointerException if the task is null
678 jsr166 1.2 * @throws RejectedExecutionException if the task cannot be
679     * scheduled for execution
680     */
681     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
682     doSubmit(task);
683     return task;
684     }
685    
686 jsr166 1.1
687 jsr166 1.11 /**
688     * @throws NullPointerException {@inheritDoc}
689     * @throws RejectedExecutionException {@inheritDoc}
690     */
691 jsr166 1.1 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
692     ArrayList<ForkJoinTask<T>> forkJoinTasks =
693     new ArrayList<ForkJoinTask<T>>(tasks.size());
694     for (Callable<T> task : tasks)
695 jsr166 1.7 forkJoinTasks.add(ForkJoinTask.adapt(task));
696 jsr166 1.1 invoke(new InvokeAll<T>(forkJoinTasks));
697    
698     @SuppressWarnings({"unchecked", "rawtypes"})
699     List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
700     return futures;
701     }
702    
703     static final class InvokeAll<T> extends RecursiveAction {
704     final ArrayList<ForkJoinTask<T>> tasks;
705     InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
706     public void compute() {
707     try { invokeAll(tasks); }
708     catch (Exception ignore) {}
709     }
710     private static final long serialVersionUID = -7914297376763021607L;
711     }
712    
713     // Configuration and status settings and queries
714    
715     /**
716     * Returns the factory used for constructing new workers.
717     *
718     * @return the factory used for constructing new workers
719     */
720     public ForkJoinWorkerThreadFactory getFactory() {
721     return factory;
722     }
723    
724     /**
725     * Returns the handler for internal worker threads that terminate
726     * due to unrecoverable errors encountered while executing tasks.
727     *
728 jsr166 1.4 * @return the handler, or {@code null} if none
729 jsr166 1.1 */
730     public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
731     Thread.UncaughtExceptionHandler h;
732     final ReentrantLock lock = this.workerLock;
733     lock.lock();
734     try {
735     h = ueh;
736     } finally {
737     lock.unlock();
738     }
739     return h;
740     }
741    
742     /**
743     * Sets the handler for internal worker threads that terminate due
744     * to unrecoverable errors encountered while executing tasks.
745     * Unless set, the current default or ThreadGroup handler is used
746     * as handler.
747     *
748     * @param h the new handler
749 jsr166 1.4 * @return the old handler, or {@code null} if none
750 jsr166 1.1 * @throws SecurityException if a security manager exists and
751     * the caller is not permitted to modify threads
752     * because it does not hold {@link
753     * java.lang.RuntimePermission}{@code ("modifyThread")}
754     */
755     public Thread.UncaughtExceptionHandler
756     setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
757     checkPermission();
758     Thread.UncaughtExceptionHandler old = null;
759     final ReentrantLock lock = this.workerLock;
760     lock.lock();
761     try {
762     old = ueh;
763     ueh = h;
764     ForkJoinWorkerThread[] ws = workers;
765     if (ws != null) {
766     for (int i = 0; i < ws.length; ++i) {
767     ForkJoinWorkerThread w = ws[i];
768     if (w != null)
769     w.setUncaughtExceptionHandler(h);
770     }
771     }
772     } finally {
773     lock.unlock();
774     }
775     return old;
776     }
777    
778    
779     /**
780     * Sets the target parallelism level of this pool.
781     *
782     * @param parallelism the target parallelism
783     * @throws IllegalArgumentException if parallelism less than or
784     * equal to zero or greater than maximum size bounds
785     * @throws SecurityException if a security manager exists and
786     * the caller is not permitted to modify threads
787     * because it does not hold {@link
788     * java.lang.RuntimePermission}{@code ("modifyThread")}
789     */
790     public void setParallelism(int parallelism) {
791     checkPermission();
792     if (parallelism <= 0 || parallelism > maxPoolSize)
793     throw new IllegalArgumentException();
794     final ReentrantLock lock = this.workerLock;
795     lock.lock();
796     try {
797 jsr166 1.9 if (isProcessingTasks()) {
798 jsr166 1.1 int p = this.parallelism;
799     this.parallelism = parallelism;
800     if (parallelism > p)
801     createAndStartAddedWorkers();
802     else
803     trimSpares();
804     }
805     } finally {
806     lock.unlock();
807     }
808     signalIdleWorkers();
809     }
810    
811     /**
812 jsr166 1.9 * Returns the targeted parallelism level of this pool.
813 jsr166 1.1 *
814 jsr166 1.9 * @return the targeted parallelism level of this pool
815 jsr166 1.1 */
816     public int getParallelism() {
817     return parallelism;
818     }
819    
820     /**
821     * Returns the number of worker threads that have started but not
822     * yet terminated. This result returned by this method may differ
823 jsr166 1.4 * from {@link #getParallelism} when threads are created to
824 jsr166 1.1 * maintain parallelism when others are cooperatively blocked.
825     *
826     * @return the number of worker threads
827     */
828     public int getPoolSize() {
829     return totalCountOf(workerCounts);
830     }
831    
832     /**
833     * Returns the maximum number of threads allowed to exist in the
834 jsr166 1.10 * pool. Unless set using {@link #setMaximumPoolSize}, the
835 jsr166 1.9 * maximum is an implementation-defined value designed only to
836     * prevent runaway growth.
837 jsr166 1.1 *
838     * @return the maximum
839     */
840     public int getMaximumPoolSize() {
841     return maxPoolSize;
842     }
843    
844     /**
845     * Sets the maximum number of threads allowed to exist in the
846 jsr166 1.10 * pool. The given value should normally be greater than or equal
847     * to the {@link #getParallelism parallelism} level. Setting this
848     * value has no effect on current pool size. It controls
849     * construction of new threads.
850 jsr166 1.1 *
851 jsr166 1.8 * @throws IllegalArgumentException if negative or greater than
852 jsr166 1.1 * internal implementation limit
853     */
854     public void setMaximumPoolSize(int newMax) {
855     if (newMax < 0 || newMax > MAX_THREADS)
856     throw new IllegalArgumentException();
857     maxPoolSize = newMax;
858     }
859    
860    
861     /**
862 jsr166 1.4 * Returns {@code true} if this pool dynamically maintains its
863     * target parallelism level. If false, new threads are added only
864     * to avoid possible starvation. This setting is by default true.
865 jsr166 1.1 *
866 jsr166 1.4 * @return {@code true} if maintains parallelism
867 jsr166 1.1 */
868     public boolean getMaintainsParallelism() {
869     return maintainsParallelism;
870     }
871    
872     /**
873     * Sets whether this pool dynamically maintains its target
874     * parallelism level. If false, new threads are added only to
875     * avoid possible starvation.
876     *
877 jsr166 1.4 * @param enable {@code true} to maintain parallelism
878 jsr166 1.1 */
879     public void setMaintainsParallelism(boolean enable) {
880     maintainsParallelism = enable;
881     }
882    
883     /**
884     * Establishes local first-in-first-out scheduling mode for forked
885     * tasks that are never joined. This mode may be more appropriate
886     * than default locally stack-based mode in applications in which
887     * worker threads only process asynchronous tasks. This method is
888 jsr166 1.4 * designed to be invoked only when the pool is quiescent, and
889 jsr166 1.1 * typically only before any tasks are submitted. The effects of
890     * invocations at other times may be unpredictable.
891     *
892 jsr166 1.4 * @param async if {@code true}, use locally FIFO scheduling
893 jsr166 1.1 * @return the previous mode
894 jsr166 1.4 * @see #getAsyncMode
895 jsr166 1.1 */
896     public boolean setAsyncMode(boolean async) {
897     boolean oldMode = locallyFifo;
898     locallyFifo = async;
899     ForkJoinWorkerThread[] ws = workers;
900     if (ws != null) {
901     for (int i = 0; i < ws.length; ++i) {
902     ForkJoinWorkerThread t = ws[i];
903     if (t != null)
904     t.setAsyncMode(async);
905     }
906     }
907     return oldMode;
908     }
909    
910     /**
911 jsr166 1.4 * Returns {@code true} if this pool uses local first-in-first-out
912 jsr166 1.1 * scheduling mode for forked tasks that are never joined.
913     *
914 jsr166 1.4 * @return {@code true} if this pool uses async mode
915     * @see #setAsyncMode
916 jsr166 1.1 */
917     public boolean getAsyncMode() {
918     return locallyFifo;
919     }
920    
921     /**
922     * Returns an estimate of the number of worker threads that are
923     * not blocked waiting to join tasks or for other managed
924     * synchronization.
925     *
926     * @return the number of worker threads
927     */
928     public int getRunningThreadCount() {
929     return runningCountOf(workerCounts);
930     }
931    
932     /**
933     * Returns an estimate of the number of threads that are currently
934     * stealing or executing tasks. This method may overestimate the
935     * number of active threads.
936     *
937     * @return the number of active threads
938     */
939     public int getActiveThreadCount() {
940     return activeCountOf(runControl);
941     }
942    
943     /**
944     * Returns an estimate of the number of threads that are currently
945     * idle waiting for tasks. This method may underestimate the
946     * number of idle threads.
947     *
948     * @return the number of idle threads
949     */
950     final int getIdleThreadCount() {
951     int c = runningCountOf(workerCounts) - activeCountOf(runControl);
952     return (c <= 0) ? 0 : c;
953     }
954    
955     /**
956 jsr166 1.4 * Returns {@code true} if all worker threads are currently idle.
957     * An idle worker is one that cannot obtain a task to execute
958     * because none are available to steal from other threads, and
959     * there are no pending submissions to the pool. This method is
960     * conservative; it might not return {@code true} immediately upon
961     * idleness of all threads, but will eventually become true if
962     * threads remain inactive.
963 jsr166 1.1 *
964 jsr166 1.4 * @return {@code true} if all threads are currently idle
965 jsr166 1.1 */
966     public boolean isQuiescent() {
967     return activeCountOf(runControl) == 0;
968     }
969    
970     /**
971     * Returns an estimate of the total number of tasks stolen from
972     * one thread's work queue by another. The reported value
973     * underestimates the actual total number of steals when the pool
974     * is not quiescent. This value may be useful for monitoring and
975     * tuning fork/join programs: in general, steal counts should be
976     * high enough to keep threads busy, but low enough to avoid
977     * overhead and contention across threads.
978     *
979     * @return the number of steals
980     */
981     public long getStealCount() {
982     return stealCount.get();
983     }
984    
985     /**
986     * Accumulates steal count from a worker.
987     * Call only when worker known to be idle.
988     */
989     private void updateStealCount(ForkJoinWorkerThread w) {
990     int sc = w.getAndClearStealCount();
991     if (sc != 0)
992     stealCount.addAndGet(sc);
993     }
994    
995     /**
996     * Returns an estimate of the total number of tasks currently held
997     * in queues by worker threads (but not including tasks submitted
998     * to the pool that have not begun executing). This value is only
999     * an approximation, obtained by iterating across all threads in
1000     * the pool. This method may be useful for tuning task
1001     * granularities.
1002     *
1003     * @return the number of queued tasks
1004     */
1005     public long getQueuedTaskCount() {
1006     long count = 0;
1007     ForkJoinWorkerThread[] ws = workers;
1008     if (ws != null) {
1009     for (int i = 0; i < ws.length; ++i) {
1010     ForkJoinWorkerThread t = ws[i];
1011     if (t != null)
1012     count += t.getQueueSize();
1013     }
1014     }
1015     return count;
1016     }
1017    
1018     /**
1019 jsr166 1.8 * Returns an estimate of the number of tasks submitted to this
1020     * pool that have not yet begun executing. This method takes time
1021 jsr166 1.1 * proportional to the number of submissions.
1022     *
1023     * @return the number of queued submissions
1024     */
1025     public int getQueuedSubmissionCount() {
1026     return submissionQueue.size();
1027     }
1028    
1029     /**
1030 jsr166 1.4 * Returns {@code true} if there are any tasks submitted to this
1031     * pool that have not yet begun executing.
1032 jsr166 1.1 *
1033     * @return {@code true} if there are any queued submissions
1034     */
1035     public boolean hasQueuedSubmissions() {
1036     return !submissionQueue.isEmpty();
1037     }
1038    
1039     /**
1040     * Removes and returns the next unexecuted submission if one is
1041     * available. This method may be useful in extensions to this
1042     * class that re-assign work in systems with multiple pools.
1043     *
1044 jsr166 1.4 * @return the next submission, or {@code null} if none
1045 jsr166 1.1 */
1046     protected ForkJoinTask<?> pollSubmission() {
1047     return submissionQueue.poll();
1048     }
1049    
1050     /**
1051     * Removes all available unexecuted submitted and forked tasks
1052     * from scheduling queues and adds them to the given collection,
1053     * without altering their execution status. These may include
1054 jsr166 1.8 * artificially generated or wrapped tasks. This method is
1055     * designed to be invoked only when the pool is known to be
1056 jsr166 1.1 * quiescent. Invocations at other times may not remove all
1057     * tasks. A failure encountered while attempting to add elements
1058     * to collection {@code c} may result in elements being in
1059     * neither, either or both collections when the associated
1060     * exception is thrown. The behavior of this operation is
1061     * undefined if the specified collection is modified while the
1062     * operation is in progress.
1063     *
1064     * @param c the collection to transfer elements into
1065     * @return the number of elements transferred
1066     */
1067 jsr166 1.5 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1068 jsr166 1.1 int n = submissionQueue.drainTo(c);
1069     ForkJoinWorkerThread[] ws = workers;
1070     if (ws != null) {
1071     for (int i = 0; i < ws.length; ++i) {
1072     ForkJoinWorkerThread w = ws[i];
1073     if (w != null)
1074     n += w.drainTasksTo(c);
1075     }
1076     }
1077     return n;
1078     }
1079    
1080     /**
1081     * Returns a string identifying this pool, as well as its state,
1082     * including indications of run state, parallelism level, and
1083     * worker and task counts.
1084     *
1085     * @return a string identifying this pool, as well as its state
1086     */
1087     public String toString() {
1088     int ps = parallelism;
1089     int wc = workerCounts;
1090     int rc = runControl;
1091     long st = getStealCount();
1092     long qt = getQueuedTaskCount();
1093     long qs = getQueuedSubmissionCount();
1094     return super.toString() +
1095     "[" + runStateToString(runStateOf(rc)) +
1096     ", parallelism = " + ps +
1097     ", size = " + totalCountOf(wc) +
1098     ", active = " + activeCountOf(rc) +
1099     ", running = " + runningCountOf(wc) +
1100     ", steals = " + st +
1101     ", tasks = " + qt +
1102     ", submissions = " + qs +
1103     "]";
1104     }
1105    
1106     private static String runStateToString(int rs) {
1107     switch(rs) {
1108     case RUNNING: return "Running";
1109     case SHUTDOWN: return "Shutting down";
1110     case TERMINATING: return "Terminating";
1111     case TERMINATED: return "Terminated";
1112     default: throw new Error("Unknown run state");
1113     }
1114     }
1115    
1116     // lifecycle control
1117    
1118     /**
1119     * Initiates an orderly shutdown in which previously submitted
1120     * tasks are executed, but no new tasks will be accepted.
1121     * Invocation has no additional effect if already shut down.
1122     * Tasks that are in the process of being submitted concurrently
1123     * during the course of this method may or may not be rejected.
1124     *
1125     * @throws SecurityException if a security manager exists and
1126     * the caller is not permitted to modify threads
1127     * because it does not hold {@link
1128     * java.lang.RuntimePermission}{@code ("modifyThread")}
1129     */
1130     public void shutdown() {
1131     checkPermission();
1132     transitionRunStateTo(SHUTDOWN);
1133 jsr166 1.6 if (canTerminateOnShutdown(runControl)) {
1134     if (workers == null) { // shutting down before workers created
1135     final ReentrantLock lock = this.workerLock;
1136     lock.lock();
1137     try {
1138     if (workers == null) {
1139     terminate();
1140     transitionRunStateTo(TERMINATED);
1141     termination.signalAll();
1142     }
1143     } finally {
1144     lock.unlock();
1145     }
1146     }
1147 jsr166 1.1 terminateOnShutdown();
1148 jsr166 1.6 }
1149 jsr166 1.1 }
1150    
1151     /**
1152 jsr166 1.9 * Attempts to cancel and/or stop all tasks, and reject all
1153     * subsequently submitted tasks. Tasks that are in the process of
1154     * being submitted or executed concurrently during the course of
1155     * this method may or may not be rejected. This method cancels
1156     * both existing and unexecuted tasks, in order to permit
1157     * termination in the presence of task dependencies. So the method
1158     * always returns an empty list (unlike the case for some other
1159     * Executors).
1160 jsr166 1.1 *
1161     * @return an empty list
1162     * @throws SecurityException if a security manager exists and
1163     * the caller is not permitted to modify threads
1164     * because it does not hold {@link
1165     * java.lang.RuntimePermission}{@code ("modifyThread")}
1166     */
1167     public List<Runnable> shutdownNow() {
1168     checkPermission();
1169     terminate();
1170     return Collections.emptyList();
1171     }
1172    
1173     /**
1174     * Returns {@code true} if all tasks have completed following shut down.
1175     *
1176     * @return {@code true} if all tasks have completed following shut down
1177     */
1178     public boolean isTerminated() {
1179     return runStateOf(runControl) == TERMINATED;
1180     }
1181    
1182     /**
1183     * Returns {@code true} if the process of termination has
1184 jsr166 1.9 * commenced but not yet completed. This method may be useful for
1185     * debugging. A return of {@code true} reported a sufficient
1186     * period after shutdown may indicate that submitted tasks have
1187     * ignored or suppressed interruption, causing this executor not
1188     * to properly terminate.
1189 jsr166 1.1 *
1190 jsr166 1.9 * @return {@code true} if terminating but not yet terminated
1191 jsr166 1.1 */
1192     public boolean isTerminating() {
1193 jsr166 1.9 return runStateOf(runControl) == TERMINATING;
1194 jsr166 1.1 }
1195    
1196     /**
1197     * Returns {@code true} if this pool has been shut down.
1198     *
1199     * @return {@code true} if this pool has been shut down
1200     */
1201     public boolean isShutdown() {
1202     return runStateOf(runControl) >= SHUTDOWN;
1203     }
1204    
1205     /**
1206 jsr166 1.9 * Returns true if pool is not terminating or terminated.
1207     * Used internally to suppress execution when terminating.
1208     */
1209     final boolean isProcessingTasks() {
1210     return runStateOf(runControl) < TERMINATING;
1211     }
1212    
1213     /**
1214 jsr166 1.1 * Blocks until all tasks have completed execution after a shutdown
1215     * request, or the timeout occurs, or the current thread is
1216     * interrupted, whichever happens first.
1217     *
1218     * @param timeout the maximum time to wait
1219     * @param unit the time unit of the timeout argument
1220     * @return {@code true} if this executor terminated and
1221     * {@code false} if the timeout elapsed before termination
1222     * @throws InterruptedException if interrupted while waiting
1223     */
1224     public boolean awaitTermination(long timeout, TimeUnit unit)
1225     throws InterruptedException {
1226     long nanos = unit.toNanos(timeout);
1227     final ReentrantLock lock = this.workerLock;
1228     lock.lock();
1229     try {
1230     for (;;) {
1231     if (isTerminated())
1232     return true;
1233     if (nanos <= 0)
1234     return false;
1235     nanos = termination.awaitNanos(nanos);
1236     }
1237     } finally {
1238     lock.unlock();
1239     }
1240     }
1241    
1242     // Shutdown and termination support
1243    
1244     /**
1245     * Callback from terminating worker. Nulls out the corresponding
1246     * workers slot, and if terminating, tries to terminate; else
1247     * tries to shrink workers array.
1248     *
1249     * @param w the worker
1250     */
1251     final void workerTerminated(ForkJoinWorkerThread w) {
1252     updateStealCount(w);
1253     updateWorkerCount(-1);
1254     final ReentrantLock lock = this.workerLock;
1255     lock.lock();
1256     try {
1257     ForkJoinWorkerThread[] ws = workers;
1258     if (ws != null) {
1259     int idx = w.poolIndex;
1260     if (idx >= 0 && idx < ws.length && ws[idx] == w)
1261     ws[idx] = null;
1262     if (totalCountOf(workerCounts) == 0) {
1263     terminate(); // no-op if already terminating
1264     transitionRunStateTo(TERMINATED);
1265     termination.signalAll();
1266     }
1267 jsr166 1.9 else if (isProcessingTasks()) {
1268 jsr166 1.1 tryShrinkWorkerArray();
1269     tryResumeSpare(true); // allow replacement
1270     }
1271     }
1272     } finally {
1273     lock.unlock();
1274     }
1275     signalIdleWorkers();
1276     }
1277    
1278     /**
1279     * Initiates termination.
1280     */
1281     private void terminate() {
1282     if (transitionRunStateTo(TERMINATING)) {
1283     stopAllWorkers();
1284     resumeAllSpares();
1285     signalIdleWorkers();
1286     cancelQueuedSubmissions();
1287     cancelQueuedWorkerTasks();
1288     interruptUnterminatedWorkers();
1289     signalIdleWorkers(); // resignal after interrupt
1290     }
1291     }
1292    
1293     /**
1294     * Possibly terminates when on shutdown state.
1295     */
1296     private void terminateOnShutdown() {
1297     if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl))
1298     terminate();
1299     }
1300    
1301     /**
1302     * Clears out and cancels submissions.
1303     */
1304     private void cancelQueuedSubmissions() {
1305     ForkJoinTask<?> task;
1306     while ((task = pollSubmission()) != null)
1307     task.cancel(false);
1308     }
1309    
1310     /**
1311     * Cleans out worker queues.
1312     */
1313     private void cancelQueuedWorkerTasks() {
1314     final ReentrantLock lock = this.workerLock;
1315     lock.lock();
1316     try {
1317     ForkJoinWorkerThread[] ws = workers;
1318     if (ws != null) {
1319     for (int i = 0; i < ws.length; ++i) {
1320     ForkJoinWorkerThread t = ws[i];
1321     if (t != null)
1322     t.cancelTasks();
1323     }
1324     }
1325     } finally {
1326     lock.unlock();
1327     }
1328     }
1329    
1330     /**
1331     * Sets each worker's status to terminating. Requires lock to avoid
1332     * conflicts with add/remove.
1333     */
1334     private void stopAllWorkers() {
1335     final ReentrantLock lock = this.workerLock;
1336     lock.lock();
1337     try {
1338     ForkJoinWorkerThread[] ws = workers;
1339     if (ws != null) {
1340     for (int i = 0; i < ws.length; ++i) {
1341     ForkJoinWorkerThread t = ws[i];
1342     if (t != null)
1343     t.shutdownNow();
1344     }
1345     }
1346     } finally {
1347     lock.unlock();
1348     }
1349     }
1350    
1351     /**
1352     * Interrupts all unterminated workers. This is not required for
1353     * sake of internal control, but may help unstick user code during
1354     * shutdown.
1355     */
1356     private void interruptUnterminatedWorkers() {
1357     final ReentrantLock lock = this.workerLock;
1358     lock.lock();
1359     try {
1360     ForkJoinWorkerThread[] ws = workers;
1361     if (ws != null) {
1362     for (int i = 0; i < ws.length; ++i) {
1363     ForkJoinWorkerThread t = ws[i];
1364     if (t != null && !t.isTerminated()) {
1365     try {
1366     t.interrupt();
1367     } catch (SecurityException ignore) {
1368     }
1369     }
1370     }
1371     }
1372     } finally {
1373     lock.unlock();
1374     }
1375     }
1376    
1377    
1378     /*
1379     * Nodes for event barrier to manage idle threads. Queue nodes
1380     * are basic Treiber stack nodes, also used for spare stack.
1381     *
1382     * The event barrier has an event count and a wait queue (actually
1383     * a Treiber stack). Workers are enabled to look for work when
1384     * the eventCount is incremented. If they fail to find work, they
1385     * may wait for next count. Upon release, threads help others wake
1386     * up.
1387     *
1388     * Synchronization events occur only in enough contexts to
1389     * maintain overall liveness:
1390     *
1391     * - Submission of a new task to the pool
1392     * - Resizes or other changes to the workers array
1393     * - pool termination
1394     * - A worker pushing a task on an empty queue
1395     *
1396     * The case of pushing a task occurs often enough, and is heavy
1397     * enough compared to simple stack pushes, to require special
1398     * handling: Method signalWork returns without advancing count if
1399     * the queue appears to be empty. This would ordinarily result in
1400     * races causing some queued waiters not to be woken up. To avoid
1401     * this, the first worker enqueued in method sync (see
1402     * syncIsReleasable) rescans for tasks after being enqueued, and
1403     * helps signal if any are found. This works well because the
1404     * worker has nothing better to do, and so might as well help
1405     * alleviate the overhead and contention on the threads actually
1406     * doing work. Also, since event counts increments on task
1407     * availability exist to maintain liveness (rather than to force
1408     * refreshes etc), it is OK for callers to exit early if
1409     * contending with another signaller.
1410     */
1411     static final class WaitQueueNode {
1412     WaitQueueNode next; // only written before enqueued
1413     volatile ForkJoinWorkerThread thread; // nulled to cancel wait
1414     final long count; // unused for spare stack
1415    
1416     WaitQueueNode(long c, ForkJoinWorkerThread w) {
1417     count = c;
1418     thread = w;
1419     }
1420    
1421     /**
1422     * Wakes up waiter, returning false if known to already
1423     */
1424     boolean signal() {
1425     ForkJoinWorkerThread t = thread;
1426     if (t == null)
1427     return false;
1428     thread = null;
1429     LockSupport.unpark(t);
1430     return true;
1431     }
1432    
1433     /**
1434     * Awaits release on sync.
1435     */
1436     void awaitSyncRelease(ForkJoinPool p) {
1437     while (thread != null && !p.syncIsReleasable(this))
1438     LockSupport.park(this);
1439     }
1440    
1441     /**
1442     * Awaits resumption as spare.
1443     */
1444     void awaitSpareRelease() {
1445     while (thread != null) {
1446     if (!Thread.interrupted())
1447     LockSupport.park(this);
1448     }
1449     }
1450     }
1451    
1452     /**
1453     * Ensures that no thread is waiting for count to advance from the
1454     * current value of eventCount read on entry to this method, by
1455     * releasing waiting threads if necessary.
1456     *
1457     * @return the count
1458     */
1459     final long ensureSync() {
1460     long c = eventCount;
1461     WaitQueueNode q;
1462     while ((q = syncStack) != null && q.count < c) {
1463     if (casBarrierStack(q, null)) {
1464     do {
1465     q.signal();
1466     } while ((q = q.next) != null);
1467     break;
1468     }
1469     }
1470     return c;
1471     }
1472    
1473     /**
1474     * Increments event count and releases waiting threads.
1475     */
1476     private void signalIdleWorkers() {
1477     long c;
1478     do {} while (!casEventCount(c = eventCount, c+1));
1479     ensureSync();
1480     }
1481    
1482     /**
1483     * Signals threads waiting to poll a task. Because method sync
1484     * rechecks availability, it is OK to only proceed if queue
1485     * appears to be non-empty, and OK to skip under contention to
1486     * increment count (since some other thread succeeded).
1487     */
1488     final void signalWork() {
1489     long c;
1490     WaitQueueNode q;
1491     if (syncStack != null &&
1492     casEventCount(c = eventCount, c+1) &&
1493     (((q = syncStack) != null && q.count <= c) &&
1494     (!casBarrierStack(q, q.next) || !q.signal())))
1495     ensureSync();
1496     }
1497    
1498     /**
1499     * Waits until event count advances from last value held by
1500     * caller, or if excess threads, caller is resumed as spare, or
1501     * caller or pool is terminating. Updates caller's event on exit.
1502     *
1503     * @param w the calling worker thread
1504     */
1505     final void sync(ForkJoinWorkerThread w) {
1506     updateStealCount(w); // Transfer w's count while it is idle
1507    
1508 jsr166 1.9 while (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1509 jsr166 1.1 long prev = w.lastEventCount;
1510     WaitQueueNode node = null;
1511     WaitQueueNode h;
1512     while (eventCount == prev &&
1513     ((h = syncStack) == null || h.count == prev)) {
1514     if (node == null)
1515     node = new WaitQueueNode(prev, w);
1516     if (casBarrierStack(node.next = h, node)) {
1517     node.awaitSyncRelease(this);
1518     break;
1519     }
1520     }
1521     long ec = ensureSync();
1522     if (ec != prev) {
1523     w.lastEventCount = ec;
1524     break;
1525     }
1526     }
1527     }
1528    
1529     /**
1530 jsr166 1.4 * Returns {@code true} if worker waiting on sync can proceed:
1531 jsr166 1.1 * - on signal (thread == null)
1532     * - on event count advance (winning race to notify vs signaller)
1533     * - on interrupt
1534     * - if the first queued node, we find work available
1535     * If node was not signalled and event count not advanced on exit,
1536     * then we also help advance event count.
1537     *
1538 jsr166 1.4 * @return {@code true} if node can be released
1539 jsr166 1.1 */
1540     final boolean syncIsReleasable(WaitQueueNode node) {
1541     long prev = node.count;
1542     if (!Thread.interrupted() && node.thread != null &&
1543     (node.next != null ||
1544     !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1545     eventCount == prev)
1546     return false;
1547     if (node.thread != null) {
1548     node.thread = null;
1549     long ec = eventCount;
1550     if (prev <= ec) // help signal
1551     casEventCount(ec, ec+1);
1552     }
1553     return true;
1554     }
1555    
1556     /**
1557 jsr166 1.4 * Returns {@code true} if a new sync event occurred since last
1558     * call to sync or this method, if so, updating caller's count.
1559 jsr166 1.1 */
1560     final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1561     long lc = w.lastEventCount;
1562     long ec = ensureSync();
1563     if (ec == lc)
1564     return false;
1565     w.lastEventCount = ec;
1566     return true;
1567     }
1568    
1569     // Parallelism maintenance
1570    
1571     /**
1572     * Decrements running count; if too low, adds spare.
1573     *
1574     * Conceptually, all we need to do here is add or resume a
1575     * spare thread when one is about to block (and remove or
1576     * suspend it later when unblocked -- see suspendIfSpare).
1577     * However, implementing this idea requires coping with
1578     * several problems: we have imperfect information about the
1579     * states of threads. Some count updates can and usually do
1580     * lag run state changes, despite arrangements to keep them
1581     * accurate (for example, when possible, updating counts
1582     * before signalling or resuming), especially when running on
1583     * dynamic JVMs that don't optimize the infrequent paths that
1584     * update counts. Generating too many threads can make these
1585     * problems become worse, because excess threads are more
1586     * likely to be context-switched with others, slowing them all
1587     * down, especially if there is no work available, so all are
1588     * busy scanning or idling. Also, excess spare threads can
1589     * only be suspended or removed when they are idle, not
1590     * immediately when they aren't needed. So adding threads will
1591     * raise parallelism level for longer than necessary. Also,
1592     * FJ applications often encounter highly transient peaks when
1593     * many threads are blocked joining, but for less time than it
1594     * takes to create or resume spares.
1595     *
1596     * @param joinMe if non-null, return early if done
1597     * @param maintainParallelism if true, try to stay within
1598     * target counts, else create only to avoid starvation
1599     * @return true if joinMe known to be done
1600     */
1601     final boolean preJoin(ForkJoinTask<?> joinMe,
1602     boolean maintainParallelism) {
1603     maintainParallelism &= maintainsParallelism; // overrride
1604     boolean dec = false; // true when running count decremented
1605     while (spareStack == null || !tryResumeSpare(dec)) {
1606     int counts = workerCounts;
1607     if (dec || (dec = casWorkerCounts(counts, --counts))) {
1608     if (!needSpare(counts, maintainParallelism))
1609     break;
1610     if (joinMe.status < 0)
1611     return true;
1612     if (tryAddSpare(counts))
1613     break;
1614     }
1615     }
1616     return false;
1617     }
1618    
1619     /**
1620     * Same idea as preJoin
1621     */
1622     final boolean preBlock(ManagedBlocker blocker,
1623     boolean maintainParallelism) {
1624     maintainParallelism &= maintainsParallelism;
1625     boolean dec = false;
1626     while (spareStack == null || !tryResumeSpare(dec)) {
1627     int counts = workerCounts;
1628     if (dec || (dec = casWorkerCounts(counts, --counts))) {
1629     if (!needSpare(counts, maintainParallelism))
1630     break;
1631     if (blocker.isReleasable())
1632     return true;
1633     if (tryAddSpare(counts))
1634     break;
1635     }
1636     }
1637     return false;
1638     }
1639    
1640     /**
1641 jsr166 1.4 * Returns {@code true} if a spare thread appears to be needed.
1642     * If maintaining parallelism, returns true when the deficit in
1643 jsr166 1.1 * running threads is more than the surplus of total threads, and
1644     * there is apparently some work to do. This self-limiting rule
1645     * means that the more threads that have already been added, the
1646     * less parallelism we will tolerate before adding another.
1647     *
1648     * @param counts current worker counts
1649     * @param maintainParallelism try to maintain parallelism
1650     */
1651     private boolean needSpare(int counts, boolean maintainParallelism) {
1652     int ps = parallelism;
1653     int rc = runningCountOf(counts);
1654     int tc = totalCountOf(counts);
1655     int runningDeficit = ps - rc;
1656     int totalSurplus = tc - ps;
1657     return (tc < maxPoolSize &&
1658     (rc == 0 || totalSurplus < 0 ||
1659     (maintainParallelism &&
1660     runningDeficit > totalSurplus &&
1661     ForkJoinWorkerThread.hasQueuedTasks(workers))));
1662     }
1663    
1664     /**
1665     * Adds a spare worker if lock available and no more than the
1666     * expected numbers of threads exist.
1667     *
1668     * @return true if successful
1669     */
1670     private boolean tryAddSpare(int expectedCounts) {
1671     final ReentrantLock lock = this.workerLock;
1672     int expectedRunning = runningCountOf(expectedCounts);
1673     int expectedTotal = totalCountOf(expectedCounts);
1674     boolean success = false;
1675     boolean locked = false;
1676     // confirm counts while locking; CAS after obtaining lock
1677     try {
1678     for (;;) {
1679     int s = workerCounts;
1680     int tc = totalCountOf(s);
1681     int rc = runningCountOf(s);
1682     if (rc > expectedRunning || tc > expectedTotal)
1683     break;
1684     if (!locked && !(locked = lock.tryLock()))
1685     break;
1686     if (casWorkerCounts(s, workerCountsFor(tc+1, rc+1))) {
1687     createAndStartSpare(tc);
1688     success = true;
1689     break;
1690     }
1691     }
1692     } finally {
1693     if (locked)
1694     lock.unlock();
1695     }
1696     return success;
1697     }
1698    
1699     /**
1700     * Adds the kth spare worker. On entry, pool counts are already
1701     * adjusted to reflect addition.
1702     */
1703     private void createAndStartSpare(int k) {
1704     ForkJoinWorkerThread w = null;
1705     ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(k + 1);
1706     int len = ws.length;
1707     // Probably, we can place at slot k. If not, find empty slot
1708     if (k < len && ws[k] != null) {
1709     for (k = 0; k < len && ws[k] != null; ++k)
1710     ;
1711     }
1712 jsr166 1.9 if (k < len && isProcessingTasks() && (w = createWorker(k)) != null) {
1713 jsr166 1.1 ws[k] = w;
1714     w.start();
1715     }
1716     else
1717     updateWorkerCount(-1); // adjust on failure
1718     signalIdleWorkers();
1719     }
1720    
1721     /**
1722     * Suspends calling thread w if there are excess threads. Called
1723     * only from sync. Spares are enqueued in a Treiber stack using
1724     * the same WaitQueueNodes as barriers. They are resumed mainly
1725     * in preJoin, but are also woken on pool events that require all
1726     * threads to check run state.
1727     *
1728     * @param w the caller
1729     */
1730     private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1731     WaitQueueNode node = null;
1732     int s;
1733     while (parallelism < runningCountOf(s = workerCounts)) {
1734     if (node == null)
1735     node = new WaitQueueNode(0, w);
1736     if (casWorkerCounts(s, s-1)) { // representation-dependent
1737     // push onto stack
1738     do {} while (!casSpareStack(node.next = spareStack, node));
1739     // block until released by resumeSpare
1740     node.awaitSpareRelease();
1741     return true;
1742     }
1743     }
1744     return false;
1745     }
1746    
1747     /**
1748     * Tries to pop and resume a spare thread.
1749     *
1750     * @param updateCount if true, increment running count on success
1751     * @return true if successful
1752     */
1753     private boolean tryResumeSpare(boolean updateCount) {
1754     WaitQueueNode q;
1755     while ((q = spareStack) != null) {
1756     if (casSpareStack(q, q.next)) {
1757     if (updateCount)
1758     updateRunningCount(1);
1759     q.signal();
1760     return true;
1761     }
1762     }
1763     return false;
1764     }
1765    
1766     /**
1767     * Pops and resumes all spare threads. Same idea as ensureSync.
1768     *
1769     * @return true if any spares released
1770     */
1771     private boolean resumeAllSpares() {
1772     WaitQueueNode q;
1773     while ( (q = spareStack) != null) {
1774     if (casSpareStack(q, null)) {
1775     do {
1776     updateRunningCount(1);
1777     q.signal();
1778     } while ((q = q.next) != null);
1779     return true;
1780     }
1781     }
1782     return false;
1783     }
1784    
1785     /**
1786     * Pops and shuts down excessive spare threads. Call only while
1787     * holding lock. This is not guaranteed to eliminate all excess
1788     * threads, only those suspended as spares, which are the ones
1789     * unlikely to be needed in the future.
1790     */
1791     private void trimSpares() {
1792     int surplus = totalCountOf(workerCounts) - parallelism;
1793     WaitQueueNode q;
1794     while (surplus > 0 && (q = spareStack) != null) {
1795     if (casSpareStack(q, null)) {
1796     do {
1797     updateRunningCount(1);
1798     ForkJoinWorkerThread w = q.thread;
1799     if (w != null && surplus > 0 &&
1800     runningCountOf(workerCounts) > 0 && w.shutdown())
1801     --surplus;
1802     q.signal();
1803     } while ((q = q.next) != null);
1804     }
1805     }
1806     }
1807    
1808     /**
1809     * Interface for extending managed parallelism for tasks running
1810 jsr166 1.8 * in {@link ForkJoinPool}s.
1811     *
1812     * <p>A {@code ManagedBlocker} provides two methods.
1813 jsr166 1.4 * Method {@code isReleasable} must return {@code true} if
1814     * blocking is not necessary. Method {@code block} blocks the
1815     * current thread if necessary (perhaps internally invoking
1816 jsr166 1.8 * {@code isReleasable} before actually blocking).
1817 jsr166 1.1 *
1818     * <p>For example, here is a ManagedBlocker based on a
1819     * ReentrantLock:
1820     * <pre> {@code
1821     * class ManagedLocker implements ManagedBlocker {
1822     * final ReentrantLock lock;
1823     * boolean hasLock = false;
1824     * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
1825     * public boolean block() {
1826     * if (!hasLock)
1827     * lock.lock();
1828     * return true;
1829     * }
1830     * public boolean isReleasable() {
1831     * return hasLock || (hasLock = lock.tryLock());
1832     * }
1833     * }}</pre>
1834     */
1835     public static interface ManagedBlocker {
1836     /**
1837     * Possibly blocks the current thread, for example waiting for
1838     * a lock or condition.
1839     *
1840 jsr166 1.4 * @return {@code true} if no additional blocking is necessary
1841     * (i.e., if isReleasable would return true)
1842 jsr166 1.1 * @throws InterruptedException if interrupted while waiting
1843     * (the method is not required to do so, but is allowed to)
1844     */
1845     boolean block() throws InterruptedException;
1846    
1847     /**
1848 jsr166 1.4 * Returns {@code true} if blocking is unnecessary.
1849 jsr166 1.1 */
1850     boolean isReleasable();
1851     }
1852    
1853     /**
1854     * Blocks in accord with the given blocker. If the current thread
1855 jsr166 1.8 * is a {@link ForkJoinWorkerThread}, this method possibly
1856     * arranges for a spare thread to be activated if necessary to
1857     * ensure parallelism while the current thread is blocked.
1858     *
1859     * <p>If {@code maintainParallelism} is {@code true} and the pool
1860     * supports it ({@link #getMaintainsParallelism}), this method
1861     * attempts to maintain the pool's nominal parallelism. Otherwise
1862     * it activates a thread only if necessary to avoid complete
1863     * starvation. This option may be preferable when blockages use
1864     * timeouts, or are almost always brief.
1865 jsr166 1.1 *
1866 jsr166 1.8 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1867     * behaviorally equivalent to
1868 jsr166 1.1 * <pre> {@code
1869     * while (!blocker.isReleasable())
1870     * if (blocker.block())
1871     * return;
1872     * }</pre>
1873 jsr166 1.8 *
1874     * If the caller is a {@code ForkJoinTask}, then the pool may
1875     * first be expanded to ensure parallelism, and later adjusted.
1876 jsr166 1.1 *
1877     * @param blocker the blocker
1878 jsr166 1.4 * @param maintainParallelism if {@code true} and supported by
1879     * this pool, attempt to maintain the pool's nominal parallelism;
1880     * otherwise activate a thread only if necessary to avoid
1881     * complete starvation.
1882 jsr166 1.1 * @throws InterruptedException if blocker.block did so
1883     */
1884     public static void managedBlock(ManagedBlocker blocker,
1885     boolean maintainParallelism)
1886     throws InterruptedException {
1887     Thread t = Thread.currentThread();
1888     ForkJoinPool pool = ((t instanceof ForkJoinWorkerThread) ?
1889     ((ForkJoinWorkerThread) t).pool : null);
1890     if (!blocker.isReleasable()) {
1891     try {
1892     if (pool == null ||
1893     !pool.preBlock(blocker, maintainParallelism))
1894     awaitBlocker(blocker);
1895     } finally {
1896     if (pool != null)
1897     pool.updateRunningCount(1);
1898     }
1899     }
1900     }
1901    
1902     private static void awaitBlocker(ManagedBlocker blocker)
1903     throws InterruptedException {
1904     do {} while (!blocker.isReleasable() && !blocker.block());
1905     }
1906    
1907 jsr166 1.7 // AbstractExecutorService overrides. These rely on undocumented
1908     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
1909     // implement RunnableFuture.
1910 jsr166 1.1
1911     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1912 jsr166 1.7 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
1913 jsr166 1.1 }
1914    
1915     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1916 jsr166 1.7 return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
1917 jsr166 1.1 }
1918    
1919     // Unsafe mechanics
1920    
1921     private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1922 jsr166 1.2 private static final long eventCountOffset =
1923 jsr166 1.3 objectFieldOffset("eventCount", ForkJoinPool.class);
1924 jsr166 1.2 private static final long workerCountsOffset =
1925 jsr166 1.3 objectFieldOffset("workerCounts", ForkJoinPool.class);
1926 jsr166 1.2 private static final long runControlOffset =
1927 jsr166 1.3 objectFieldOffset("runControl", ForkJoinPool.class);
1928 jsr166 1.2 private static final long syncStackOffset =
1929 jsr166 1.3 objectFieldOffset("syncStack",ForkJoinPool.class);
1930 jsr166 1.2 private static final long spareStackOffset =
1931 jsr166 1.3 objectFieldOffset("spareStack", ForkJoinPool.class);
1932 jsr166 1.1
1933     private boolean casEventCount(long cmp, long val) {
1934     return UNSAFE.compareAndSwapLong(this, eventCountOffset, cmp, val);
1935     }
1936     private boolean casWorkerCounts(int cmp, int val) {
1937     return UNSAFE.compareAndSwapInt(this, workerCountsOffset, cmp, val);
1938     }
1939     private boolean casRunControl(int cmp, int val) {
1940     return UNSAFE.compareAndSwapInt(this, runControlOffset, cmp, val);
1941     }
1942     private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) {
1943     return UNSAFE.compareAndSwapObject(this, spareStackOffset, cmp, val);
1944     }
1945     private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) {
1946     return UNSAFE.compareAndSwapObject(this, syncStackOffset, cmp, val);
1947     }
1948 jsr166 1.3
1949     private static long objectFieldOffset(String field, Class<?> klazz) {
1950     try {
1951     return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1952     } catch (NoSuchFieldException e) {
1953     // Convert Exception to corresponding Error
1954     NoSuchFieldError error = new NoSuchFieldError(field);
1955     error.initCause(e);
1956     throw error;
1957     }
1958     }
1959 jsr166 1.1 }