ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
Revision: 1.33
Committed: Fri Jul 31 16:27:08 2009 UTC (14 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.32: +10 -61 lines
Log Message:
Refactor Adapted tasks into ForkJoinTask; mesh peek/pollNextLocalTask specs and code

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8 jsr166 1.22
9 dl 1.1 import java.util.concurrent.*;
10 jsr166 1.22
11     import java.util.ArrayList;
12     import java.util.Arrays;
13     import java.util.Collection;
14     import java.util.Collections;
15     import java.util.List;
16     import java.util.concurrent.locks.Condition;
17     import java.util.concurrent.locks.LockSupport;
18     import java.util.concurrent.locks.ReentrantLock;
19     import java.util.concurrent.atomic.AtomicInteger;
20     import java.util.concurrent.atomic.AtomicLong;
21 dl 1.1
22     /**
23 jsr166 1.29 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
24     * A ForkJoinPool provides the entry point for submissions from
25 dl 1.2 * non-ForkJoinTasks, as well as management and monitoring operations.
26     * Normally a single ForkJoinPool is used for a large number of
27     * submitted tasks. Otherwise, use would not usually outweigh the
28     * construction and bookkeeping overhead of creating a large set of
29     * threads.
30 dl 1.1 *
31 dl 1.2 * <p>ForkJoinPools differ from other kinds of Executors mainly in
32     * that they provide <em>work-stealing</em>: all threads in the pool
33 dl 1.1 * attempt to find and execute subtasks created by other active tasks
34     * (eventually blocking if none exist). This makes them efficient when
35 dl 1.2 * most tasks spawn other subtasks (as do most ForkJoinTasks), as well
36     * as the mixed execution of some plain Runnable- or Callable- based
37 jsr166 1.29 * activities along with ForkJoinTasks. When setting {@linkplain
38     * #setAsyncMode async mode}, a ForkJoinPool may also be appropriate
39     * for use with fine-grained tasks that are never joined. Otherwise,
40     * other ExecutorService implementations are typically more
41     * appropriate choices.
42 dl 1.1 *
43     * <p>A ForkJoinPool may be constructed with a given parallelism level
44     * (target pool size), which it attempts to maintain by dynamically
45 dl 1.2 * adding, suspending, or resuming threads, even if some tasks are
46     * waiting to join others. However, no such adjustments are performed
47     * in the face of blocked IO or other unmanaged synchronization. The
48 jsr166 1.29 * nested {@link ManagedBlocker} interface enables extension of
49 dl 1.2 * the kinds of synchronization accommodated. The target parallelism
50 jsr166 1.29 * level may also be changed dynamically ({@link #setParallelism})
51 dl 1.6 * and thread construction can be limited using methods
52 jsr166 1.29 * {@link #setMaximumPoolSize} and/or
53     * {@link #setMaintainsParallelism}.
54 dl 1.1 *
55     * <p>In addition to execution and lifecycle control methods, this
56     * class provides status check methods (for example
57 jsr166 1.29 * {@link #getStealCount}) that are intended to aid in developing,
58 dl 1.1 * tuning, and monitoring fork/join applications. Also, method
59 jsr166 1.29 * {@link #toString} returns indications of pool state in a
60 dl 1.2 * convenient form for informal monitoring.
61 dl 1.1 *
62     * <p><b>Implementation notes</b>: This implementation restricts the
63 dl 1.2 * maximum number of running threads to 32767. Attempts to create
64     * pools with greater than the maximum result in
65     * IllegalArgumentExceptions.
66 jsr166 1.16 *
67     * @since 1.7
68     * @author Doug Lea
69 dl 1.1 */
70 dl 1.2 public class ForkJoinPool extends AbstractExecutorService {
71 dl 1.1
72     /*
73     * See the extended comments interspersed below for design,
74     * rationale, and walkthroughs.
75     */
76    
77     /** Mask for packing and unpacking shorts */
78     private static final int shortMask = 0xffff;
79    
80     /** Max pool size -- must be a power of two minus 1 */
81     private static final int MAX_THREADS = 0x7FFF;
82    
83     /**
84     * Factory for creating new ForkJoinWorkerThreads. A
85     * ForkJoinWorkerThreadFactory must be defined and used for
86     * ForkJoinWorkerThread subclasses that extend base functionality
87     * or initialize threads with different contexts.
88     */
89     public static interface ForkJoinWorkerThreadFactory {
90     /**
91     * Returns a new worker thread operating in the given pool.
92     *
93     * @param pool the pool this thread works in
94 jsr166 1.16 * @throws NullPointerException if pool is null
95 dl 1.1 */
96     public ForkJoinWorkerThread newThread(ForkJoinPool pool);
97     }
98    
99     /**
100 jsr166 1.17 * Default ForkJoinWorkerThreadFactory implementation; creates a
101 dl 1.1 * new ForkJoinWorkerThread.
102     */
103 dl 1.2 static class DefaultForkJoinWorkerThreadFactory
104 dl 1.1 implements ForkJoinWorkerThreadFactory {
105     public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
106     try {
107     return new ForkJoinWorkerThread(pool);
108     } catch (OutOfMemoryError oom) {
109     return null;
110     }
111     }
112     }
113    
114     /**
115 dl 1.2 * Creates a new ForkJoinWorkerThread. This factory is used unless
116     * overridden in ForkJoinPool constructors.
117 dl 1.1 */
118 dl 1.2 public static final ForkJoinWorkerThreadFactory
119 dl 1.1 defaultForkJoinWorkerThreadFactory =
120     new DefaultForkJoinWorkerThreadFactory();
121    
122     /**
123     * Permission required for callers of methods that may start or
124     * kill threads.
125     */
126     private static final RuntimePermission modifyThreadPermission =
127     new RuntimePermission("modifyThread");
128    
129     /**
130     * If there is a security manager, makes sure caller has
131     * permission to modify threads.
132     */
133     private static void checkPermission() {
134     SecurityManager security = System.getSecurityManager();
135     if (security != null)
136     security.checkPermission(modifyThreadPermission);
137     }
138    
139     /**
140     * Generator for assigning sequence numbers as pool names.
141     */
142     private static final AtomicInteger poolNumberGenerator =
143     new AtomicInteger();
144    
145     /**
146 dl 1.6 * Array holding all worker threads in the pool. Initialized upon
147     * first use. Array size must be a power of two. Updates and
148     * replacements are protected by workerLock, but it is always kept
149     * in a consistent enough state to be randomly accessed without
150     * locking by workers performing work-stealing.
151 dl 1.1 */
152     volatile ForkJoinWorkerThread[] workers;
153    
154     /**
155     * Lock protecting access to workers.
156     */
157     private final ReentrantLock workerLock;
158    
159     /**
160     * Condition for awaitTermination.
161     */
162     private final Condition termination;
163    
164     /**
165     * The uncaught exception handler used when any worker
166 jsr166 1.16 * abruptly terminates
167 dl 1.1 */
168     private Thread.UncaughtExceptionHandler ueh;
169    
170     /**
171     * Creation factory for worker threads.
172     */
173     private final ForkJoinWorkerThreadFactory factory;
174    
175     /**
176     * Head of stack of threads that were created to maintain
177     * parallelism when other threads blocked, but have since
178     * suspended when the parallelism level rose.
179     */
180     private volatile WaitQueueNode spareStack;
181    
182     /**
183     * Sum of per-thread steal counts, updated only when threads are
184     * idle or terminating.
185     */
186     private final AtomicLong stealCount;
187    
188     /**
189     * Queue for external submissions.
190     */
191     private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
192    
193     /**
194 jsr166 1.17 * Head of Treiber stack for barrier sync. See below for explanation.
195 dl 1.1 */
196 dl 1.4 private volatile WaitQueueNode syncStack;
197 dl 1.1
198     /**
199     * The count for event barrier
200     */
201     private volatile long eventCount;
202    
203     /**
204     * Pool number, just for assigning useful names to worker threads
205     */
206     private final int poolNumber;
207    
208     /**
209     * The maximum allowed pool size
210     */
211     private volatile int maxPoolSize;
212    
213     /**
214     * The desired parallelism level, updated only under workerLock.
215     */
216     private volatile int parallelism;
217    
218     /**
219 dl 1.6 * True if use local fifo, not default lifo, for local polling
220     */
221     private volatile boolean locallyFifo;
222    
223     /**
224 dl 1.1 * Holds number of total (i.e., created and not yet terminated)
225     * and running (i.e., not blocked on joins or other managed sync)
226     * threads, packed into one int to ensure consistent snapshot when
227     * making decisions about creating and suspending spare
228     * threads. Updated only by CAS. Note: CASes in
229 dl 1.14 * updateRunningCount and preJoin assume that running active count
230 jsr166 1.16 * is in low word, so need to be modified if this changes.
231 dl 1.1 */
232     private volatile int workerCounts;
233    
234     private static int totalCountOf(int s) { return s >>> 16; }
235     private static int runningCountOf(int s) { return s & shortMask; }
236     private static int workerCountsFor(int t, int r) { return (t << 16) + r; }
237    
238     /**
239 jsr166 1.16 * Adds delta (which may be negative) to running count. This must
240 dl 1.1 * be called before (with negative arg) and after (with positive)
241 jsr166 1.16 * any managed synchronization (i.e., mainly, joins).
242 jsr166 1.17 *
243 dl 1.1 * @param delta the number to add
244     */
245     final void updateRunningCount(int delta) {
246     int s;
247 jsr166 1.17 do {} while (!casWorkerCounts(s = workerCounts, s + delta));
248 dl 1.1 }
249    
250     /**
251 jsr166 1.16 * Adds delta (which may be negative) to both total and running
252 dl 1.1 * count. This must be called upon creation and termination of
253     * worker threads.
254 jsr166 1.17 *
255 dl 1.1 * @param delta the number to add
256     */
257     private void updateWorkerCount(int delta) {
258     int d = delta + (delta << 16); // add to both lo and hi parts
259     int s;
260 jsr166 1.17 do {} while (!casWorkerCounts(s = workerCounts, s + d));
261 dl 1.1 }
262    
263     /**
264     * Lifecycle control. High word contains runState, low word
265     * contains the number of workers that are (probably) executing
266     * tasks. This value is atomically incremented before a worker
267     * gets a task to run, and decremented when worker has no tasks
268     * and cannot find any. These two fields are bundled together to
269     * support correct termination triggering. Note: activeCount
270     * CAS'es cheat by assuming active count is in low word, so need
271     * to be modified if this changes
272     */
273     private volatile int runControl;
274    
275     // RunState values. Order among values matters
276     private static final int RUNNING = 0;
277     private static final int SHUTDOWN = 1;
278     private static final int TERMINATING = 2;
279     private static final int TERMINATED = 3;
280    
281     private static int runStateOf(int c) { return c >>> 16; }
282     private static int activeCountOf(int c) { return c & shortMask; }
283     private static int runControlFor(int r, int a) { return (r << 16) + a; }
284    
285     /**
286 jsr166 1.17 * Tries incrementing active count; fails on contention.
287     * Called by workers before/during executing tasks.
288     *
289 jsr166 1.16 * @return true on success
290 dl 1.1 */
291 dl 1.4 final boolean tryIncrementActiveCount() {
292     int c = runControl;
293     return casRunControl(c, c+1);
294 dl 1.1 }
295    
296     /**
297 jsr166 1.16 * Tries decrementing active count; fails on contention.
298     * Possibly triggers termination on success.
299 dl 1.1 * Called by workers when they can't find tasks.
300 jsr166 1.17 *
301 dl 1.4 * @return true on success
302 dl 1.1 */
303 dl 1.4 final boolean tryDecrementActiveCount() {
304     int c = runControl;
305     int nextc = c - 1;
306     if (!casRunControl(c, nextc))
307     return false;
308 dl 1.1 if (canTerminateOnShutdown(nextc))
309     terminateOnShutdown();
310 dl 1.4 return true;
311 dl 1.1 }
312    
313     /**
314 jsr166 1.28 * Returns {@code true} if argument represents zero active count
315     * and nonzero runstate, which is the triggering condition for
316 dl 1.1 * terminating on shutdown.
317     */
318     private static boolean canTerminateOnShutdown(int c) {
319 jsr166 1.17 // i.e. least bit is nonzero runState bit
320     return ((c & -c) >>> 16) != 0;
321 dl 1.1 }
322    
323     /**
324     * Transition run state to at least the given state. Return true
325     * if not already at least given state.
326     */
327     private boolean transitionRunStateTo(int state) {
328     for (;;) {
329     int c = runControl;
330     if (runStateOf(c) >= state)
331     return false;
332     if (casRunControl(c, runControlFor(state, activeCountOf(c))))
333     return true;
334     }
335     }
336    
337     /**
338     * Controls whether to add spares to maintain parallelism
339     */
340     private volatile boolean maintainsParallelism;
341    
342     // Constructors
343    
344     /**
345     * Creates a ForkJoinPool with a pool size equal to the number of
346 jsr166 1.17 * processors available on the system, using the default
347     * ForkJoinWorkerThreadFactory.
348     *
349 dl 1.1 * @throws SecurityException if a security manager exists and
350     * the caller is not permitted to modify threads
351     * because it does not hold {@link
352 jsr166 1.17 * java.lang.RuntimePermission}{@code ("modifyThread")}
353 dl 1.1 */
354     public ForkJoinPool() {
355     this(Runtime.getRuntime().availableProcessors(),
356     defaultForkJoinWorkerThreadFactory);
357     }
358    
359     /**
360 jsr166 1.16 * Creates a ForkJoinPool with the indicated parallelism level
361 jsr166 1.17 * threads and using the default ForkJoinWorkerThreadFactory.
362     *
363 dl 1.1 * @param parallelism the number of worker threads
364     * @throws IllegalArgumentException if parallelism less than or
365     * equal to zero
366     * @throws SecurityException if a security manager exists and
367     * the caller is not permitted to modify threads
368     * because it does not hold {@link
369 jsr166 1.17 * java.lang.RuntimePermission}{@code ("modifyThread")}
370 dl 1.1 */
371     public ForkJoinPool(int parallelism) {
372     this(parallelism, defaultForkJoinWorkerThreadFactory);
373     }
374    
375     /**
376 dl 1.2 * Creates a ForkJoinPool with parallelism equal to the number of
377 dl 1.1 * processors available on the system and using the given
378 jsr166 1.17 * ForkJoinWorkerThreadFactory.
379     *
380 dl 1.1 * @param factory the factory for creating new threads
381     * @throws NullPointerException if factory is null
382     * @throws SecurityException if a security manager exists and
383     * the caller is not permitted to modify threads
384     * because it does not hold {@link
385 jsr166 1.17 * java.lang.RuntimePermission}{@code ("modifyThread")}
386 dl 1.1 */
387     public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
388     this(Runtime.getRuntime().availableProcessors(), factory);
389     }
390    
391     /**
392 dl 1.2 * Creates a ForkJoinPool with the given parallelism and factory.
393 dl 1.1 *
394     * @param parallelism the targeted number of worker threads
395     * @param factory the factory for creating new threads
396     * @throws IllegalArgumentException if parallelism less than or
397 jsr166 1.16 * equal to zero, or greater than implementation limit
398 dl 1.1 * @throws NullPointerException if factory is null
399     * @throws SecurityException if a security manager exists and
400     * the caller is not permitted to modify threads
401     * because it does not hold {@link
402 jsr166 1.17 * java.lang.RuntimePermission}{@code ("modifyThread")}
403 dl 1.1 */
404     public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
405     if (parallelism <= 0 || parallelism > MAX_THREADS)
406     throw new IllegalArgumentException();
407     if (factory == null)
408     throw new NullPointerException();
409     checkPermission();
410     this.factory = factory;
411     this.parallelism = parallelism;
412     this.maxPoolSize = MAX_THREADS;
413     this.maintainsParallelism = true;
414     this.poolNumber = poolNumberGenerator.incrementAndGet();
415     this.workerLock = new ReentrantLock();
416     this.termination = workerLock.newCondition();
417     this.stealCount = new AtomicLong();
418     this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
419 dl 1.6 // worker array and workers are lazily constructed
420 dl 1.1 }
421    
422     /**
423 jsr166 1.17 * Creates a new worker thread using factory.
424     *
425 dl 1.1 * @param index the index to assign worker
426     * @return new worker, or null of factory failed
427     */
428     private ForkJoinWorkerThread createWorker(int index) {
429     Thread.UncaughtExceptionHandler h = ueh;
430     ForkJoinWorkerThread w = factory.newThread(this);
431     if (w != null) {
432     w.poolIndex = index;
433     w.setDaemon(true);
434 dl 1.6 w.setAsyncMode(locallyFifo);
435 dl 1.1 w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index);
436     if (h != null)
437     w.setUncaughtExceptionHandler(h);
438     }
439     return w;
440     }
441    
442     /**
443 jsr166 1.16 * Returns a good size for worker array given pool size.
444 dl 1.1 * Currently requires size to be a power of two.
445     */
446 jsr166 1.17 private static int arraySizeFor(int poolSize) {
447     return (poolSize <= 1) ? 1 :
448     (1 << (32 - Integer.numberOfLeadingZeros(poolSize-1)));
449 dl 1.1 }
450    
451     /**
452 jsr166 1.16 * Creates or resizes array if necessary to hold newLength.
453     * Call only under exclusion.
454     *
455 dl 1.1 * @return the array
456     */
457     private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) {
458     ForkJoinWorkerThread[] ws = workers;
459     if (ws == null)
460     return workers = new ForkJoinWorkerThread[arraySizeFor(newLength)];
461     else if (newLength > ws.length)
462     return workers = Arrays.copyOf(ws, arraySizeFor(newLength));
463     else
464     return ws;
465     }
466    
467     /**
468 jsr166 1.17 * Tries to shrink workers into smaller array after one or more terminate.
469 dl 1.1 */
470     private void tryShrinkWorkerArray() {
471     ForkJoinWorkerThread[] ws = workers;
472 dl 1.6 if (ws != null) {
473     int len = ws.length;
474     int last = len - 1;
475     while (last >= 0 && ws[last] == null)
476     --last;
477     int newLength = arraySizeFor(last+1);
478     if (newLength < len)
479     workers = Arrays.copyOf(ws, newLength);
480     }
481 dl 1.1 }
482    
483     /**
484 jsr166 1.17 * Initializes workers if necessary.
485 dl 1.1 */
486 dl 1.6 final void ensureWorkerInitialization() {
487     ForkJoinWorkerThread[] ws = workers;
488     if (ws == null) {
489     final ReentrantLock lock = this.workerLock;
490     lock.lock();
491     try {
492     ws = workers;
493     if (ws == null) {
494     int ps = parallelism;
495     ws = ensureWorkerArrayCapacity(ps);
496     for (int i = 0; i < ps; ++i) {
497     ForkJoinWorkerThread w = createWorker(i);
498     if (w != null) {
499     ws[i] = w;
500     w.start();
501     updateWorkerCount(1);
502     }
503     }
504 dl 1.1 }
505 dl 1.6 } finally {
506     lock.unlock();
507 dl 1.1 }
508     }
509     }
510    
511     /**
512     * Worker creation and startup for threads added via setParallelism.
513     */
514     private void createAndStartAddedWorkers() {
515     resumeAllSpares(); // Allow spares to convert to nonspare
516     int ps = parallelism;
517     ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps);
518     int len = ws.length;
519     // Sweep through slots, to keep lowest indices most populated
520     int k = 0;
521     while (k < len) {
522     if (ws[k] != null) {
523     ++k;
524     continue;
525     }
526     int s = workerCounts;
527     int tc = totalCountOf(s);
528     int rc = runningCountOf(s);
529     if (rc >= ps || tc >= ps)
530     break;
531     if (casWorkerCounts (s, workerCountsFor(tc+1, rc+1))) {
532     ForkJoinWorkerThread w = createWorker(k);
533     if (w != null) {
534     ws[k++] = w;
535     w.start();
536     }
537     else {
538     updateWorkerCount(-1); // back out on failed creation
539     break;
540     }
541     }
542     }
543     }
544    
545     // Execution methods
546    
547     /**
548     * Common code for execute, invoke and submit
549     */
550     private <T> void doSubmit(ForkJoinTask<T> task) {
551 dl 1.23 if (task == null)
552     throw new NullPointerException();
553 dl 1.1 if (isShutdown())
554     throw new RejectedExecutionException();
555 dl 1.6 if (workers == null)
556     ensureWorkerInitialization();
557 dl 1.1 submissionQueue.offer(task);
558 dl 1.4 signalIdleWorkers();
559 dl 1.1 }
560    
561     /**
562 jsr166 1.17 * Performs the given task, returning its result upon completion.
563     *
564 dl 1.1 * @param task the task
565     * @return the task's result
566     * @throws NullPointerException if task is null
567     * @throws RejectedExecutionException if pool is shut down
568     */
569     public <T> T invoke(ForkJoinTask<T> task) {
570     doSubmit(task);
571     return task.join();
572     }
573    
574     /**
575     * Arranges for (asynchronous) execution of the given task.
576 jsr166 1.17 *
577 dl 1.1 * @param task the task
578     * @throws NullPointerException if task is null
579     * @throws RejectedExecutionException if pool is shut down
580     */
581     public <T> void execute(ForkJoinTask<T> task) {
582     doSubmit(task);
583     }
584    
585     // AbstractExecutorService methods
586    
587     public void execute(Runnable task) {
588 dl 1.23 ForkJoinTask<?> job;
589 jsr166 1.26 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
590     job = (ForkJoinTask<?>) task;
591 dl 1.23 else
592 dl 1.33 job = ForkJoinTask.adapt(task, null);
593 dl 1.23 doSubmit(job);
594 dl 1.1 }
595    
596     public <T> ForkJoinTask<T> submit(Callable<T> task) {
597 dl 1.33 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
598 dl 1.1 doSubmit(job);
599     return job;
600     }
601    
602     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
603 dl 1.33 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
604 dl 1.1 doSubmit(job);
605     return job;
606     }
607    
608     public ForkJoinTask<?> submit(Runnable task) {
609 dl 1.23 ForkJoinTask<?> job;
610 jsr166 1.26 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
611     job = (ForkJoinTask<?>) task;
612 dl 1.23 else
613 dl 1.33 job = ForkJoinTask.adapt(task, null);
614 dl 1.1 doSubmit(job);
615     return job;
616     }
617    
618     /**
619 dl 1.23 * Submits a ForkJoinTask for execution.
620     *
621     * @param task the task to submit
622     * @return the task
623     * @throws RejectedExecutionException if the task cannot be
624     * scheduled for execution
625     * @throws NullPointerException if the task is null
626     */
627     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
628     doSubmit(task);
629     return task;
630     }
631    
632 dl 1.1
633     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
634 jsr166 1.20 ArrayList<ForkJoinTask<T>> forkJoinTasks =
635 dl 1.1 new ArrayList<ForkJoinTask<T>>(tasks.size());
636 jsr166 1.20 for (Callable<T> task : tasks)
637 dl 1.33 forkJoinTasks.add(ForkJoinTask.adapt(task));
638 jsr166 1.20 invoke(new InvokeAll<T>(forkJoinTasks));
639    
640     @SuppressWarnings({"unchecked", "rawtypes"})
641     List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
642     return futures;
643 dl 1.1 }
644    
645     static final class InvokeAll<T> extends RecursiveAction {
646     final ArrayList<ForkJoinTask<T>> tasks;
647     InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
648     public void compute() {
649 jsr166 1.17 try { invokeAll(tasks); }
650     catch (Exception ignore) {}
651 dl 1.1 }
652 jsr166 1.18 private static final long serialVersionUID = -7914297376763021607L;
653 dl 1.1 }
654    
655     // Configuration and status settings and queries
656    
657     /**
658 jsr166 1.17 * Returns the factory used for constructing new workers.
659 dl 1.1 *
660     * @return the factory used for constructing new workers
661     */
662     public ForkJoinWorkerThreadFactory getFactory() {
663     return factory;
664     }
665    
666     /**
667 dl 1.2 * Returns the handler for internal worker threads that terminate
668     * due to unrecoverable errors encountered while executing tasks.
669 jsr166 1.17 *
670 jsr166 1.28 * @return the handler, or {@code null} if none
671 dl 1.2 */
672     public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
673     Thread.UncaughtExceptionHandler h;
674     final ReentrantLock lock = this.workerLock;
675     lock.lock();
676     try {
677     h = ueh;
678     } finally {
679     lock.unlock();
680     }
681     return h;
682     }
683    
684     /**
685     * Sets the handler for internal worker threads that terminate due
686     * to unrecoverable errors encountered while executing tasks.
687     * Unless set, the current default or ThreadGroup handler is used
688     * as handler.
689     *
690     * @param h the new handler
691 jsr166 1.28 * @return the old handler, or {@code null} if none
692 dl 1.2 * @throws SecurityException if a security manager exists and
693     * the caller is not permitted to modify threads
694     * because it does not hold {@link
695 jsr166 1.17 * java.lang.RuntimePermission}{@code ("modifyThread")}
696 dl 1.2 */
697     public Thread.UncaughtExceptionHandler
698     setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
699     checkPermission();
700     Thread.UncaughtExceptionHandler old = null;
701     final ReentrantLock lock = this.workerLock;
702     lock.lock();
703     try {
704     old = ueh;
705     ueh = h;
706     ForkJoinWorkerThread[] ws = workers;
707 dl 1.6 if (ws != null) {
708     for (int i = 0; i < ws.length; ++i) {
709     ForkJoinWorkerThread w = ws[i];
710     if (w != null)
711     w.setUncaughtExceptionHandler(h);
712     }
713 dl 1.2 }
714     } finally {
715     lock.unlock();
716     }
717     return old;
718     }
719    
720    
721     /**
722 jsr166 1.16 * Sets the target parallelism level of this pool.
723 jsr166 1.17 *
724 dl 1.1 * @param parallelism the target parallelism
725     * @throws IllegalArgumentException if parallelism less than or
726 jsr166 1.16 * equal to zero or greater than maximum size bounds
727 dl 1.1 * @throws SecurityException if a security manager exists and
728     * the caller is not permitted to modify threads
729     * because it does not hold {@link
730 jsr166 1.17 * java.lang.RuntimePermission}{@code ("modifyThread")}
731 dl 1.1 */
732     public void setParallelism(int parallelism) {
733     checkPermission();
734     if (parallelism <= 0 || parallelism > maxPoolSize)
735     throw new IllegalArgumentException();
736     final ReentrantLock lock = this.workerLock;
737     lock.lock();
738     try {
739     if (!isTerminating()) {
740     int p = this.parallelism;
741     this.parallelism = parallelism;
742     if (parallelism > p)
743     createAndStartAddedWorkers();
744     else
745     trimSpares();
746     }
747     } finally {
748     lock.unlock();
749     }
750 dl 1.4 signalIdleWorkers();
751 dl 1.1 }
752    
753     /**
754     * Returns the targeted number of worker threads in this pool.
755     *
756     * @return the targeted number of worker threads in this pool
757     */
758     public int getParallelism() {
759     return parallelism;
760     }
761    
762     /**
763     * Returns the number of worker threads that have started but not
764     * yet terminated. This result returned by this method may differ
765 jsr166 1.29 * from {@link #getParallelism} when threads are created to
766 dl 1.1 * maintain parallelism when others are cooperatively blocked.
767     *
768     * @return the number of worker threads
769     */
770     public int getPoolSize() {
771     return totalCountOf(workerCounts);
772     }
773    
774     /**
775     * Returns the maximum number of threads allowed to exist in the
776     * pool, even if there are insufficient unblocked running threads.
777 jsr166 1.17 *
778 dl 1.1 * @return the maximum
779     */
780     public int getMaximumPoolSize() {
781     return maxPoolSize;
782     }
783    
784     /**
785     * Sets the maximum number of threads allowed to exist in the
786     * pool, even if there are insufficient unblocked running threads.
787     * Setting this value has no effect on current pool size. It
788     * controls construction of new threads.
789 jsr166 1.17 *
790 dl 1.1 * @throws IllegalArgumentException if negative or greater then
791 jsr166 1.16 * internal implementation limit
792 dl 1.1 */
793     public void setMaximumPoolSize(int newMax) {
794     if (newMax < 0 || newMax > MAX_THREADS)
795     throw new IllegalArgumentException();
796     maxPoolSize = newMax;
797     }
798    
799    
800     /**
801 jsr166 1.28 * Returns {@code true} if this pool dynamically maintains its
802     * target parallelism level. If false, new threads are added only
803     * to avoid possible starvation. This setting is by default true.
804 jsr166 1.17 *
805 jsr166 1.28 * @return {@code true} if maintains parallelism
806 dl 1.1 */
807     public boolean getMaintainsParallelism() {
808     return maintainsParallelism;
809     }
810    
811     /**
812     * Sets whether this pool dynamically maintains its target
813     * parallelism level. If false, new threads are added only to
814     * avoid possible starvation.
815 jsr166 1.17 *
816 jsr166 1.28 * @param enable {@code true} to maintain parallelism
817 dl 1.1 */
818     public void setMaintainsParallelism(boolean enable) {
819     maintainsParallelism = enable;
820     }
821    
822     /**
823 dl 1.6 * Establishes local first-in-first-out scheduling mode for forked
824     * tasks that are never joined. This mode may be more appropriate
825     * than default locally stack-based mode in applications in which
826     * worker threads only process asynchronous tasks. This method is
827 jsr166 1.29 * designed to be invoked only when the pool is quiescent, and
828 dl 1.6 * typically only before any tasks are submitted. The effects of
829 jsr166 1.16 * invocations at other times may be unpredictable.
830 dl 1.6 *
831 jsr166 1.29 * @param async if {@code true}, use locally FIFO scheduling
832 jsr166 1.16 * @return the previous mode
833 jsr166 1.29 * @see #getAsyncMode
834 dl 1.6 */
835     public boolean setAsyncMode(boolean async) {
836     boolean oldMode = locallyFifo;
837     locallyFifo = async;
838     ForkJoinWorkerThread[] ws = workers;
839     if (ws != null) {
840     for (int i = 0; i < ws.length; ++i) {
841     ForkJoinWorkerThread t = ws[i];
842     if (t != null)
843     t.setAsyncMode(async);
844     }
845     }
846     return oldMode;
847     }
848    
849     /**
850 jsr166 1.28 * Returns {@code true} if this pool uses local first-in-first-out
851 jsr166 1.16 * scheduling mode for forked tasks that are never joined.
852 dl 1.6 *
853 jsr166 1.28 * @return {@code true} if this pool uses async mode
854 jsr166 1.29 * @see #setAsyncMode
855 dl 1.6 */
856     public boolean getAsyncMode() {
857     return locallyFifo;
858     }
859    
860     /**
861 dl 1.2 * Returns an estimate of the number of worker threads that are
862     * not blocked waiting to join tasks or for other managed
863 dl 1.1 * synchronization.
864     *
865     * @return the number of worker threads
866     */
867     public int getRunningThreadCount() {
868     return runningCountOf(workerCounts);
869     }
870    
871     /**
872 dl 1.2 * Returns an estimate of the number of threads that are currently
873 dl 1.1 * stealing or executing tasks. This method may overestimate the
874     * number of active threads.
875 jsr166 1.17 *
876 jsr166 1.16 * @return the number of active threads
877 dl 1.1 */
878     public int getActiveThreadCount() {
879     return activeCountOf(runControl);
880     }
881    
882     /**
883 dl 1.2 * Returns an estimate of the number of threads that are currently
884 dl 1.1 * idle waiting for tasks. This method may underestimate the
885     * number of idle threads.
886 jsr166 1.17 *
887 jsr166 1.16 * @return the number of idle threads
888 dl 1.1 */
889     final int getIdleThreadCount() {
890     int c = runningCountOf(workerCounts) - activeCountOf(runControl);
891 jsr166 1.17 return (c <= 0) ? 0 : c;
892 dl 1.1 }
893    
894     /**
895 jsr166 1.28 * Returns {@code true} if all worker threads are currently idle.
896     * An idle worker is one that cannot obtain a task to execute
897     * because none are available to steal from other threads, and
898     * there are no pending submissions to the pool. This method is
899     * conservative; it might not return {@code true} immediately upon
900     * idleness of all threads, but will eventually become true if
901     * threads remain inactive.
902 jsr166 1.17 *
903 jsr166 1.28 * @return {@code true} if all threads are currently idle
904 dl 1.1 */
905     public boolean isQuiescent() {
906     return activeCountOf(runControl) == 0;
907     }
908    
909     /**
910     * Returns an estimate of the total number of tasks stolen from
911     * one thread's work queue by another. The reported value
912     * underestimates the actual total number of steals when the pool
913     * is not quiescent. This value may be useful for monitoring and
914 jsr166 1.17 * tuning fork/join programs: in general, steal counts should be
915 dl 1.1 * high enough to keep threads busy, but low enough to avoid
916     * overhead and contention across threads.
917 jsr166 1.17 *
918 jsr166 1.16 * @return the number of steals
919 dl 1.1 */
920     public long getStealCount() {
921     return stealCount.get();
922     }
923    
924     /**
925 jsr166 1.17 * Accumulates steal count from a worker.
926     * Call only when worker known to be idle.
927 dl 1.1 */
928     private void updateStealCount(ForkJoinWorkerThread w) {
929     int sc = w.getAndClearStealCount();
930     if (sc != 0)
931     stealCount.addAndGet(sc);
932     }
933    
934     /**
935 dl 1.2 * Returns an estimate of the total number of tasks currently held
936     * in queues by worker threads (but not including tasks submitted
937     * to the pool that have not begun executing). This value is only
938     * an approximation, obtained by iterating across all threads in
939     * the pool. This method may be useful for tuning task
940     * granularities.
941 jsr166 1.17 *
942 jsr166 1.16 * @return the number of queued tasks
943 dl 1.1 */
944     public long getQueuedTaskCount() {
945     long count = 0;
946     ForkJoinWorkerThread[] ws = workers;
947 dl 1.6 if (ws != null) {
948     for (int i = 0; i < ws.length; ++i) {
949     ForkJoinWorkerThread t = ws[i];
950     if (t != null)
951     count += t.getQueueSize();
952     }
953 dl 1.1 }
954     return count;
955     }
956    
957     /**
958 dl 1.2 * Returns an estimate of the number tasks submitted to this pool
959 dl 1.1 * that have not yet begun executing. This method takes time
960     * proportional to the number of submissions.
961 jsr166 1.17 *
962 jsr166 1.16 * @return the number of queued submissions
963 dl 1.1 */
964     public int getQueuedSubmissionCount() {
965     return submissionQueue.size();
966     }
967    
968     /**
969 jsr166 1.28 * Returns {@code true} if there are any tasks submitted to this
970     * pool that have not yet begun executing.
971 jsr166 1.17 *
972 jsr166 1.16 * @return {@code true} if there are any queued submissions
973 dl 1.1 */
974     public boolean hasQueuedSubmissions() {
975     return !submissionQueue.isEmpty();
976     }
977    
978     /**
979     * Removes and returns the next unexecuted submission if one is
980     * available. This method may be useful in extensions to this
981     * class that re-assign work in systems with multiple pools.
982 jsr166 1.17 *
983 jsr166 1.28 * @return the next submission, or {@code null} if none
984 dl 1.1 */
985     protected ForkJoinTask<?> pollSubmission() {
986     return submissionQueue.poll();
987     }
988    
989     /**
990 dl 1.6 * Removes all available unexecuted submitted and forked tasks
991     * from scheduling queues and adds them to the given collection,
992     * without altering their execution status. These may include
993 jsr166 1.16 * artificially generated or wrapped tasks. This method is designed
994 dl 1.6 * to be invoked only when the pool is known to be
995     * quiescent. Invocations at other times may not remove all
996     * tasks. A failure encountered while attempting to add elements
997 jsr166 1.16 * to collection {@code c} may result in elements being in
998 dl 1.6 * neither, either or both collections when the associated
999     * exception is thrown. The behavior of this operation is
1000     * undefined if the specified collection is modified while the
1001     * operation is in progress.
1002 jsr166 1.17 *
1003 dl 1.6 * @param c the collection to transfer elements into
1004     * @return the number of elements transferred
1005     */
1006 dl 1.30 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1007 dl 1.6 int n = submissionQueue.drainTo(c);
1008     ForkJoinWorkerThread[] ws = workers;
1009     if (ws != null) {
1010     for (int i = 0; i < ws.length; ++i) {
1011     ForkJoinWorkerThread w = ws[i];
1012     if (w != null)
1013     n += w.drainTasksTo(c);
1014     }
1015     }
1016     return n;
1017     }
1018    
1019     /**
1020 dl 1.1 * Returns a string identifying this pool, as well as its state,
1021     * including indications of run state, parallelism level, and
1022     * worker and task counts.
1023     *
1024     * @return a string identifying this pool, as well as its state
1025     */
1026     public String toString() {
1027     int ps = parallelism;
1028     int wc = workerCounts;
1029     int rc = runControl;
1030     long st = getStealCount();
1031     long qt = getQueuedTaskCount();
1032     long qs = getQueuedSubmissionCount();
1033     return super.toString() +
1034     "[" + runStateToString(runStateOf(rc)) +
1035     ", parallelism = " + ps +
1036     ", size = " + totalCountOf(wc) +
1037     ", active = " + activeCountOf(rc) +
1038     ", running = " + runningCountOf(wc) +
1039     ", steals = " + st +
1040     ", tasks = " + qt +
1041     ", submissions = " + qs +
1042     "]";
1043     }
1044    
1045     private static String runStateToString(int rs) {
1046     switch(rs) {
1047     case RUNNING: return "Running";
1048     case SHUTDOWN: return "Shutting down";
1049     case TERMINATING: return "Terminating";
1050     case TERMINATED: return "Terminated";
1051     default: throw new Error("Unknown run state");
1052     }
1053     }
1054    
1055     // lifecycle control
1056    
1057     /**
1058     * Initiates an orderly shutdown in which previously submitted
1059     * tasks are executed, but no new tasks will be accepted.
1060     * Invocation has no additional effect if already shut down.
1061     * Tasks that are in the process of being submitted concurrently
1062     * during the course of this method may or may not be rejected.
1063 jsr166 1.17 *
1064 dl 1.1 * @throws SecurityException if a security manager exists and
1065     * the caller is not permitted to modify threads
1066     * because it does not hold {@link
1067 jsr166 1.17 * java.lang.RuntimePermission}{@code ("modifyThread")}
1068 dl 1.1 */
1069     public void shutdown() {
1070     checkPermission();
1071     transitionRunStateTo(SHUTDOWN);
1072 dl 1.31 if (canTerminateOnShutdown(runControl)) {
1073     if (workers == null) { // shutting down before workers created
1074     final ReentrantLock lock = this.workerLock;
1075     lock.lock();
1076     try {
1077     if (workers == null) {
1078     terminate();
1079     transitionRunStateTo(TERMINATED);
1080     termination.signalAll();
1081     }
1082     } finally {
1083     lock.unlock();
1084     }
1085     }
1086 dl 1.1 terminateOnShutdown();
1087 dl 1.31 }
1088 dl 1.1 }
1089    
1090     /**
1091     * Attempts to stop all actively executing tasks, and cancels all
1092     * waiting tasks. Tasks that are in the process of being
1093     * submitted or executed concurrently during the course of this
1094     * method may or may not be rejected. Unlike some other executors,
1095 dl 1.6 * this method cancels rather than collects non-executed tasks
1096     * upon termination, so always returns an empty list. However, you
1097 jsr166 1.29 * can use method {@link #drainTasksTo} before invoking this
1098 dl 1.6 * method to transfer unexecuted tasks to another collection.
1099 jsr166 1.17 *
1100 dl 1.1 * @return an empty list
1101     * @throws SecurityException if a security manager exists and
1102     * the caller is not permitted to modify threads
1103     * because it does not hold {@link
1104 jsr166 1.17 * java.lang.RuntimePermission}{@code ("modifyThread")}
1105 dl 1.1 */
1106     public List<Runnable> shutdownNow() {
1107     checkPermission();
1108     terminate();
1109     return Collections.emptyList();
1110     }
1111    
1112     /**
1113 jsr166 1.16 * Returns {@code true} if all tasks have completed following shut down.
1114 dl 1.1 *
1115 jsr166 1.16 * @return {@code true} if all tasks have completed following shut down
1116 dl 1.1 */
1117     public boolean isTerminated() {
1118     return runStateOf(runControl) == TERMINATED;
1119     }
1120    
1121     /**
1122 jsr166 1.16 * Returns {@code true} if the process of termination has
1123 dl 1.1 * commenced but possibly not yet completed.
1124     *
1125 jsr166 1.16 * @return {@code true} if terminating
1126 dl 1.1 */
1127     public boolean isTerminating() {
1128     return runStateOf(runControl) >= TERMINATING;
1129     }
1130    
1131     /**
1132 jsr166 1.16 * Returns {@code true} if this pool has been shut down.
1133 dl 1.1 *
1134 jsr166 1.16 * @return {@code true} if this pool has been shut down
1135 dl 1.1 */
1136     public boolean isShutdown() {
1137     return runStateOf(runControl) >= SHUTDOWN;
1138     }
1139    
1140     /**
1141     * Blocks until all tasks have completed execution after a shutdown
1142     * request, or the timeout occurs, or the current thread is
1143     * interrupted, whichever happens first.
1144     *
1145     * @param timeout the maximum time to wait
1146     * @param unit the time unit of the timeout argument
1147 jsr166 1.16 * @return {@code true} if this executor terminated and
1148     * {@code false} if the timeout elapsed before termination
1149 dl 1.1 * @throws InterruptedException if interrupted while waiting
1150     */
1151     public boolean awaitTermination(long timeout, TimeUnit unit)
1152     throws InterruptedException {
1153     long nanos = unit.toNanos(timeout);
1154     final ReentrantLock lock = this.workerLock;
1155     lock.lock();
1156     try {
1157     for (;;) {
1158     if (isTerminated())
1159     return true;
1160     if (nanos <= 0)
1161     return false;
1162     nanos = termination.awaitNanos(nanos);
1163     }
1164     } finally {
1165     lock.unlock();
1166     }
1167     }
1168    
1169     // Shutdown and termination support
1170    
1171     /**
1172 jsr166 1.17 * Callback from terminating worker. Nulls out the corresponding
1173     * workers slot, and if terminating, tries to terminate; else
1174     * tries to shrink workers array.
1175     *
1176 dl 1.1 * @param w the worker
1177     */
1178     final void workerTerminated(ForkJoinWorkerThread w) {
1179     updateStealCount(w);
1180     updateWorkerCount(-1);
1181     final ReentrantLock lock = this.workerLock;
1182     lock.lock();
1183     try {
1184     ForkJoinWorkerThread[] ws = workers;
1185 dl 1.6 if (ws != null) {
1186     int idx = w.poolIndex;
1187     if (idx >= 0 && idx < ws.length && ws[idx] == w)
1188     ws[idx] = null;
1189     if (totalCountOf(workerCounts) == 0) {
1190     terminate(); // no-op if already terminating
1191     transitionRunStateTo(TERMINATED);
1192     termination.signalAll();
1193     }
1194     else if (!isTerminating()) {
1195     tryShrinkWorkerArray();
1196     tryResumeSpare(true); // allow replacement
1197     }
1198 dl 1.1 }
1199     } finally {
1200     lock.unlock();
1201     }
1202 dl 1.4 signalIdleWorkers();
1203 dl 1.1 }
1204    
1205     /**
1206 jsr166 1.17 * Initiates termination.
1207 dl 1.1 */
1208     private void terminate() {
1209     if (transitionRunStateTo(TERMINATING)) {
1210     stopAllWorkers();
1211     resumeAllSpares();
1212 dl 1.4 signalIdleWorkers();
1213 dl 1.1 cancelQueuedSubmissions();
1214     cancelQueuedWorkerTasks();
1215     interruptUnterminatedWorkers();
1216 dl 1.4 signalIdleWorkers(); // resignal after interrupt
1217 dl 1.1 }
1218     }
1219    
1220     /**
1221 jsr166 1.16 * Possibly terminates when on shutdown state.
1222 dl 1.1 */
1223     private void terminateOnShutdown() {
1224     if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl))
1225     terminate();
1226     }
1227    
1228     /**
1229 jsr166 1.16 * Clears out and cancels submissions.
1230 dl 1.1 */
1231     private void cancelQueuedSubmissions() {
1232     ForkJoinTask<?> task;
1233     while ((task = pollSubmission()) != null)
1234     task.cancel(false);
1235     }
1236    
1237     /**
1238 jsr166 1.16 * Cleans out worker queues.
1239 dl 1.1 */
1240     private void cancelQueuedWorkerTasks() {
1241     final ReentrantLock lock = this.workerLock;
1242     lock.lock();
1243     try {
1244     ForkJoinWorkerThread[] ws = workers;
1245 dl 1.6 if (ws != null) {
1246     for (int i = 0; i < ws.length; ++i) {
1247     ForkJoinWorkerThread t = ws[i];
1248     if (t != null)
1249     t.cancelTasks();
1250     }
1251 dl 1.1 }
1252     } finally {
1253     lock.unlock();
1254     }
1255     }
1256    
1257     /**
1258 jsr166 1.16 * Sets each worker's status to terminating. Requires lock to avoid
1259     * conflicts with add/remove.
1260 dl 1.1 */
1261     private void stopAllWorkers() {
1262     final ReentrantLock lock = this.workerLock;
1263     lock.lock();
1264     try {
1265     ForkJoinWorkerThread[] ws = workers;
1266 dl 1.6 if (ws != null) {
1267     for (int i = 0; i < ws.length; ++i) {
1268     ForkJoinWorkerThread t = ws[i];
1269     if (t != null)
1270     t.shutdownNow();
1271     }
1272 dl 1.1 }
1273     } finally {
1274     lock.unlock();
1275     }
1276     }
1277    
1278     /**
1279 jsr166 1.16 * Interrupts all unterminated workers. This is not required for
1280 dl 1.1 * sake of internal control, but may help unstick user code during
1281     * shutdown.
1282     */
1283     private void interruptUnterminatedWorkers() {
1284     final ReentrantLock lock = this.workerLock;
1285     lock.lock();
1286     try {
1287     ForkJoinWorkerThread[] ws = workers;
1288 dl 1.6 if (ws != null) {
1289     for (int i = 0; i < ws.length; ++i) {
1290     ForkJoinWorkerThread t = ws[i];
1291     if (t != null && !t.isTerminated()) {
1292     try {
1293     t.interrupt();
1294     } catch (SecurityException ignore) {
1295     }
1296 dl 1.1 }
1297     }
1298     }
1299     } finally {
1300     lock.unlock();
1301     }
1302     }
1303    
1304    
1305     /*
1306 dl 1.4 * Nodes for event barrier to manage idle threads. Queue nodes
1307     * are basic Treiber stack nodes, also used for spare stack.
1308 dl 1.1 *
1309     * The event barrier has an event count and a wait queue (actually
1310     * a Treiber stack). Workers are enabled to look for work when
1311 dl 1.4 * the eventCount is incremented. If they fail to find work, they
1312     * may wait for next count. Upon release, threads help others wake
1313     * up.
1314     *
1315     * Synchronization events occur only in enough contexts to
1316     * maintain overall liveness:
1317 dl 1.1 *
1318     * - Submission of a new task to the pool
1319 dl 1.4 * - Resizes or other changes to the workers array
1320 dl 1.1 * - pool termination
1321     * - A worker pushing a task on an empty queue
1322     *
1323 dl 1.4 * The case of pushing a task occurs often enough, and is heavy
1324     * enough compared to simple stack pushes, to require special
1325     * handling: Method signalWork returns without advancing count if
1326     * the queue appears to be empty. This would ordinarily result in
1327     * races causing some queued waiters not to be woken up. To avoid
1328     * this, the first worker enqueued in method sync (see
1329     * syncIsReleasable) rescans for tasks after being enqueued, and
1330     * helps signal if any are found. This works well because the
1331     * worker has nothing better to do, and so might as well help
1332     * alleviate the overhead and contention on the threads actually
1333     * doing work. Also, since event counts increments on task
1334     * availability exist to maintain liveness (rather than to force
1335     * refreshes etc), it is OK for callers to exit early if
1336     * contending with another signaller.
1337 dl 1.1 */
1338     static final class WaitQueueNode {
1339     WaitQueueNode next; // only written before enqueued
1340     volatile ForkJoinWorkerThread thread; // nulled to cancel wait
1341     final long count; // unused for spare stack
1342 dl 1.4
1343     WaitQueueNode(long c, ForkJoinWorkerThread w) {
1344 dl 1.1 count = c;
1345     thread = w;
1346     }
1347 dl 1.4
1348     /**
1349 jsr166 1.16 * Wakes up waiter, returning false if known to already
1350 dl 1.4 */
1351     boolean signal() {
1352 dl 1.1 ForkJoinWorkerThread t = thread;
1353 dl 1.4 if (t == null)
1354     return false;
1355 dl 1.1 thread = null;
1356 dl 1.4 LockSupport.unpark(t);
1357     return true;
1358     }
1359    
1360     /**
1361 jsr166 1.16 * Awaits release on sync.
1362 dl 1.4 */
1363     void awaitSyncRelease(ForkJoinPool p) {
1364     while (thread != null && !p.syncIsReleasable(this))
1365     LockSupport.park(this);
1366     }
1367    
1368     /**
1369 jsr166 1.16 * Awaits resumption as spare.
1370 dl 1.4 */
1371     void awaitSpareRelease() {
1372     while (thread != null) {
1373     if (!Thread.interrupted())
1374     LockSupport.park(this);
1375 dl 1.1 }
1376     }
1377     }
1378    
1379     /**
1380 dl 1.4 * Ensures that no thread is waiting for count to advance from the
1381     * current value of eventCount read on entry to this method, by
1382     * releasing waiting threads if necessary.
1383 jsr166 1.17 *
1384 dl 1.4 * @return the count
1385 dl 1.1 */
1386 dl 1.4 final long ensureSync() {
1387     long c = eventCount;
1388     WaitQueueNode q;
1389     while ((q = syncStack) != null && q.count < c) {
1390     if (casBarrierStack(q, null)) {
1391 dl 1.1 do {
1392 dl 1.4 q.signal();
1393 dl 1.1 } while ((q = q.next) != null);
1394     break;
1395     }
1396     }
1397     return c;
1398     }
1399    
1400     /**
1401 dl 1.4 * Increments event count and releases waiting threads.
1402 dl 1.1 */
1403 dl 1.4 private void signalIdleWorkers() {
1404 dl 1.1 long c;
1405 jsr166 1.17 do {} while (!casEventCount(c = eventCount, c+1));
1406 dl 1.4 ensureSync();
1407 dl 1.1 }
1408    
1409     /**
1410 jsr166 1.16 * Signals threads waiting to poll a task. Because method sync
1411 dl 1.4 * rechecks availability, it is OK to only proceed if queue
1412     * appears to be non-empty, and OK to skip under contention to
1413     * increment count (since some other thread succeeded).
1414 dl 1.1 */
1415 dl 1.4 final void signalWork() {
1416 dl 1.1 long c;
1417 dl 1.4 WaitQueueNode q;
1418     if (syncStack != null &&
1419     casEventCount(c = eventCount, c+1) &&
1420     (((q = syncStack) != null && q.count <= c) &&
1421     (!casBarrierStack(q, q.next) || !q.signal())))
1422     ensureSync();
1423 dl 1.1 }
1424    
1425     /**
1426 dl 1.4 * Waits until event count advances from last value held by
1427     * caller, or if excess threads, caller is resumed as spare, or
1428     * caller or pool is terminating. Updates caller's event on exit.
1429 jsr166 1.17 *
1430 dl 1.1 * @param w the calling worker thread
1431     */
1432 dl 1.4 final void sync(ForkJoinWorkerThread w) {
1433     updateStealCount(w); // Transfer w's count while it is idle
1434 dl 1.1
1435 dl 1.4 while (!w.isShutdown() && !isTerminating() && !suspendIfSpare(w)) {
1436     long prev = w.lastEventCount;
1437 dl 1.1 WaitQueueNode node = null;
1438 dl 1.4 WaitQueueNode h;
1439 jsr166 1.5 while (eventCount == prev &&
1440 dl 1.4 ((h = syncStack) == null || h.count == prev)) {
1441     if (node == null)
1442     node = new WaitQueueNode(prev, w);
1443     if (casBarrierStack(node.next = h, node)) {
1444     node.awaitSyncRelease(this);
1445 dl 1.1 break;
1446     }
1447     }
1448 dl 1.4 long ec = ensureSync();
1449     if (ec != prev) {
1450     w.lastEventCount = ec;
1451     break;
1452     }
1453     }
1454     }
1455    
1456     /**
1457 jsr166 1.28 * Returns {@code true} if worker waiting on sync can proceed:
1458 dl 1.4 * - on signal (thread == null)
1459     * - on event count advance (winning race to notify vs signaller)
1460 jsr166 1.17 * - on interrupt
1461 jsr166 1.5 * - if the first queued node, we find work available
1462     * If node was not signalled and event count not advanced on exit,
1463 dl 1.4 * then we also help advance event count.
1464 jsr166 1.17 *
1465 jsr166 1.28 * @return {@code true} if node can be released
1466 dl 1.4 */
1467     final boolean syncIsReleasable(WaitQueueNode node) {
1468     long prev = node.count;
1469     if (!Thread.interrupted() && node.thread != null &&
1470     (node.next != null ||
1471     !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1472     eventCount == prev)
1473     return false;
1474     if (node.thread != null) {
1475     node.thread = null;
1476 dl 1.1 long ec = eventCount;
1477 dl 1.4 if (prev <= ec) // help signal
1478     casEventCount(ec, ec+1);
1479 dl 1.1 }
1480 dl 1.4 return true;
1481     }
1482    
1483     /**
1484 jsr166 1.28 * Returns {@code true} if a new sync event occurred since last
1485     * call to sync or this method, if so, updating caller's count.
1486 dl 1.4 */
1487     final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1488     long lc = w.lastEventCount;
1489     long ec = ensureSync();
1490     if (ec == lc)
1491     return false;
1492     w.lastEventCount = ec;
1493     return true;
1494 dl 1.1 }
1495    
1496     // Parallelism maintenance
1497    
1498     /**
1499 jsr166 1.16 * Decrements running count; if too low, adds spare.
1500 dl 1.1 *
1501     * Conceptually, all we need to do here is add or resume a
1502     * spare thread when one is about to block (and remove or
1503     * suspend it later when unblocked -- see suspendIfSpare).
1504     * However, implementing this idea requires coping with
1505 jsr166 1.17 * several problems: we have imperfect information about the
1506 dl 1.1 * states of threads. Some count updates can and usually do
1507     * lag run state changes, despite arrangements to keep them
1508     * accurate (for example, when possible, updating counts
1509     * before signalling or resuming), especially when running on
1510     * dynamic JVMs that don't optimize the infrequent paths that
1511     * update counts. Generating too many threads can make these
1512     * problems become worse, because excess threads are more
1513     * likely to be context-switched with others, slowing them all
1514     * down, especially if there is no work available, so all are
1515     * busy scanning or idling. Also, excess spare threads can
1516     * only be suspended or removed when they are idle, not
1517     * immediately when they aren't needed. So adding threads will
1518     * raise parallelism level for longer than necessary. Also,
1519 jsr166 1.16 * FJ applications often encounter highly transient peaks when
1520 dl 1.1 * many threads are blocked joining, but for less time than it
1521     * takes to create or resume spares.
1522     *
1523     * @param joinMe if non-null, return early if done
1524     * @param maintainParallelism if true, try to stay within
1525     * target counts, else create only to avoid starvation
1526     * @return true if joinMe known to be done
1527     */
1528 jsr166 1.17 final boolean preJoin(ForkJoinTask<?> joinMe,
1529     boolean maintainParallelism) {
1530 dl 1.1 maintainParallelism &= maintainsParallelism; // overrride
1531     boolean dec = false; // true when running count decremented
1532     while (spareStack == null || !tryResumeSpare(dec)) {
1533     int counts = workerCounts;
1534 jsr166 1.17 if (dec || (dec = casWorkerCounts(counts, --counts))) {
1535     // CAS cheat
1536 dl 1.1 if (!needSpare(counts, maintainParallelism))
1537     break;
1538     if (joinMe.status < 0)
1539     return true;
1540     if (tryAddSpare(counts))
1541     break;
1542     }
1543     }
1544     return false;
1545     }
1546    
1547     /**
1548     * Same idea as preJoin
1549     */
1550 jsr166 1.17 final boolean preBlock(ManagedBlocker blocker,
1551     boolean maintainParallelism) {
1552 dl 1.1 maintainParallelism &= maintainsParallelism;
1553     boolean dec = false;
1554     while (spareStack == null || !tryResumeSpare(dec)) {
1555     int counts = workerCounts;
1556     if (dec || (dec = casWorkerCounts(counts, --counts))) {
1557     if (!needSpare(counts, maintainParallelism))
1558     break;
1559     if (blocker.isReleasable())
1560     return true;
1561     if (tryAddSpare(counts))
1562     break;
1563     }
1564     }
1565     return false;
1566     }
1567    
1568     /**
1569 jsr166 1.28 * Returns {@code true} if a spare thread appears to be needed.
1570     * If maintaining parallelism, returns true when the deficit in
1571 dl 1.1 * running threads is more than the surplus of total threads, and
1572     * there is apparently some work to do. This self-limiting rule
1573     * means that the more threads that have already been added, the
1574     * less parallelism we will tolerate before adding another.
1575 jsr166 1.17 *
1576 dl 1.1 * @param counts current worker counts
1577     * @param maintainParallelism try to maintain parallelism
1578     */
1579     private boolean needSpare(int counts, boolean maintainParallelism) {
1580     int ps = parallelism;
1581     int rc = runningCountOf(counts);
1582     int tc = totalCountOf(counts);
1583     int runningDeficit = ps - rc;
1584     int totalSurplus = tc - ps;
1585     return (tc < maxPoolSize &&
1586     (rc == 0 || totalSurplus < 0 ||
1587     (maintainParallelism &&
1588 jsr166 1.5 runningDeficit > totalSurplus &&
1589 dl 1.4 ForkJoinWorkerThread.hasQueuedTasks(workers))));
1590 dl 1.1 }
1591 jsr166 1.5
1592 dl 1.1 /**
1593 jsr166 1.16 * Adds a spare worker if lock available and no more than the
1594     * expected numbers of threads exist.
1595 jsr166 1.17 *
1596 dl 1.1 * @return true if successful
1597     */
1598     private boolean tryAddSpare(int expectedCounts) {
1599     final ReentrantLock lock = this.workerLock;
1600     int expectedRunning = runningCountOf(expectedCounts);
1601     int expectedTotal = totalCountOf(expectedCounts);
1602     boolean success = false;
1603     boolean locked = false;
1604     // confirm counts while locking; CAS after obtaining lock
1605     try {
1606     for (;;) {
1607     int s = workerCounts;
1608     int tc = totalCountOf(s);
1609     int rc = runningCountOf(s);
1610     if (rc > expectedRunning || tc > expectedTotal)
1611     break;
1612     if (!locked && !(locked = lock.tryLock()))
1613     break;
1614     if (casWorkerCounts(s, workerCountsFor(tc+1, rc+1))) {
1615     createAndStartSpare(tc);
1616     success = true;
1617     break;
1618     }
1619     }
1620     } finally {
1621     if (locked)
1622     lock.unlock();
1623     }
1624     return success;
1625     }
1626    
1627     /**
1628 jsr166 1.16 * Adds the kth spare worker. On entry, pool counts are already
1629 dl 1.1 * adjusted to reflect addition.
1630     */
1631     private void createAndStartSpare(int k) {
1632     ForkJoinWorkerThread w = null;
1633     ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(k + 1);
1634     int len = ws.length;
1635     // Probably, we can place at slot k. If not, find empty slot
1636     if (k < len && ws[k] != null) {
1637     for (k = 0; k < len && ws[k] != null; ++k)
1638     ;
1639     }
1640 dl 1.3 if (k < len && !isTerminating() && (w = createWorker(k)) != null) {
1641 dl 1.1 ws[k] = w;
1642     w.start();
1643     }
1644     else
1645     updateWorkerCount(-1); // adjust on failure
1646 dl 1.4 signalIdleWorkers();
1647 dl 1.1 }
1648    
1649     /**
1650 jsr166 1.16 * Suspends calling thread w if there are excess threads. Called
1651     * only from sync. Spares are enqueued in a Treiber stack using
1652     * the same WaitQueueNodes as barriers. They are resumed mainly
1653     * in preJoin, but are also woken on pool events that require all
1654     * threads to check run state.
1655 jsr166 1.17 *
1656 dl 1.1 * @param w the caller
1657     */
1658     private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1659     WaitQueueNode node = null;
1660     int s;
1661     while (parallelism < runningCountOf(s = workerCounts)) {
1662     if (node == null)
1663 dl 1.4 node = new WaitQueueNode(0, w);
1664 dl 1.1 if (casWorkerCounts(s, s-1)) { // representation-dependent
1665     // push onto stack
1666 jsr166 1.17 do {} while (!casSpareStack(node.next = spareStack, node));
1667 dl 1.1 // block until released by resumeSpare
1668 dl 1.4 node.awaitSpareRelease();
1669 dl 1.1 return true;
1670     }
1671     }
1672     return false;
1673     }
1674    
1675     /**
1676 jsr166 1.16 * Tries to pop and resume a spare thread.
1677 jsr166 1.17 *
1678 dl 1.1 * @param updateCount if true, increment running count on success
1679     * @return true if successful
1680     */
1681     private boolean tryResumeSpare(boolean updateCount) {
1682     WaitQueueNode q;
1683     while ((q = spareStack) != null) {
1684     if (casSpareStack(q, q.next)) {
1685     if (updateCount)
1686     updateRunningCount(1);
1687     q.signal();
1688     return true;
1689     }
1690     }
1691     return false;
1692     }
1693    
1694     /**
1695 jsr166 1.16 * Pops and resumes all spare threads. Same idea as ensureSync.
1696 jsr166 1.17 *
1697 dl 1.1 * @return true if any spares released
1698     */
1699     private boolean resumeAllSpares() {
1700     WaitQueueNode q;
1701     while ( (q = spareStack) != null) {
1702     if (casSpareStack(q, null)) {
1703     do {
1704     updateRunningCount(1);
1705     q.signal();
1706     } while ((q = q.next) != null);
1707     return true;
1708     }
1709     }
1710     return false;
1711     }
1712    
1713     /**
1714 jsr166 1.16 * Pops and shuts down excessive spare threads. Call only while
1715 dl 1.1 * holding lock. This is not guaranteed to eliminate all excess
1716     * threads, only those suspended as spares, which are the ones
1717     * unlikely to be needed in the future.
1718     */
1719     private void trimSpares() {
1720     int surplus = totalCountOf(workerCounts) - parallelism;
1721     WaitQueueNode q;
1722     while (surplus > 0 && (q = spareStack) != null) {
1723     if (casSpareStack(q, null)) {
1724     do {
1725     updateRunningCount(1);
1726     ForkJoinWorkerThread w = q.thread;
1727     if (w != null && surplus > 0 &&
1728     runningCountOf(workerCounts) > 0 && w.shutdown())
1729     --surplus;
1730     q.signal();
1731     } while ((q = q.next) != null);
1732     }
1733     }
1734     }
1735    
1736     /**
1737     * Interface for extending managed parallelism for tasks running
1738     * in ForkJoinPools. A ManagedBlocker provides two methods.
1739 jsr166 1.28 * Method {@code isReleasable} must return {@code true} if
1740     * blocking is not necessary. Method {@code block} blocks the
1741     * current thread if necessary (perhaps internally invoking
1742     * {@code isReleasable} before actually blocking.).
1743 jsr166 1.17 *
1744 dl 1.1 * <p>For example, here is a ManagedBlocker based on a
1745     * ReentrantLock:
1746 jsr166 1.17 * <pre> {@code
1747     * class ManagedLocker implements ManagedBlocker {
1748     * final ReentrantLock lock;
1749     * boolean hasLock = false;
1750     * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
1751     * public boolean block() {
1752     * if (!hasLock)
1753     * lock.lock();
1754     * return true;
1755     * }
1756     * public boolean isReleasable() {
1757     * return hasLock || (hasLock = lock.tryLock());
1758 dl 1.1 * }
1759 jsr166 1.17 * }}</pre>
1760 dl 1.1 */
1761     public static interface ManagedBlocker {
1762     /**
1763     * Possibly blocks the current thread, for example waiting for
1764     * a lock or condition.
1765 jsr166 1.17 *
1766 jsr166 1.28 * @return {@code true} if no additional blocking is necessary
1767     * (i.e., if isReleasable would return true)
1768 dl 1.1 * @throws InterruptedException if interrupted while waiting
1769 jsr166 1.17 * (the method is not required to do so, but is allowed to)
1770 dl 1.1 */
1771     boolean block() throws InterruptedException;
1772    
1773     /**
1774 jsr166 1.28 * Returns {@code true} if blocking is unnecessary.
1775 dl 1.1 */
1776     boolean isReleasable();
1777     }
1778    
1779     /**
1780     * Blocks in accord with the given blocker. If the current thread
1781     * is a ForkJoinWorkerThread, this method possibly arranges for a
1782     * spare thread to be activated if necessary to ensure parallelism
1783     * while the current thread is blocked. If
1784 jsr166 1.28 * {@code maintainParallelism} is {@code true} and the pool supports
1785 dl 1.2 * it ({@link #getMaintainsParallelism}), this method attempts to
1786 jsr166 1.17 * maintain the pool's nominal parallelism. Otherwise it activates
1787 dl 1.1 * a thread only if necessary to avoid complete starvation. This
1788     * option may be preferable when blockages use timeouts, or are
1789     * almost always brief.
1790     *
1791     * <p> If the caller is not a ForkJoinTask, this method is behaviorally
1792     * equivalent to
1793 jsr166 1.17 * <pre> {@code
1794     * while (!blocker.isReleasable())
1795     * if (blocker.block())
1796     * return;
1797     * }</pre>
1798 dl 1.1 * If the caller is a ForkJoinTask, then the pool may first
1799     * be expanded to ensure parallelism, and later adjusted.
1800     *
1801     * @param blocker the blocker
1802 jsr166 1.28 * @param maintainParallelism if {@code true} and supported by
1803     * this pool, attempt to maintain the pool's nominal parallelism;
1804     * otherwise activate a thread only if necessary to avoid
1805     * complete starvation.
1806 jsr166 1.16 * @throws InterruptedException if blocker.block did so
1807 dl 1.1 */
1808     public static void managedBlock(ManagedBlocker blocker,
1809     boolean maintainParallelism)
1810     throws InterruptedException {
1811     Thread t = Thread.currentThread();
1812 jsr166 1.17 ForkJoinPool pool = ((t instanceof ForkJoinWorkerThread) ?
1813     ((ForkJoinWorkerThread) t).pool : null);
1814 dl 1.1 if (!blocker.isReleasable()) {
1815     try {
1816     if (pool == null ||
1817     !pool.preBlock(blocker, maintainParallelism))
1818     awaitBlocker(blocker);
1819     } finally {
1820     if (pool != null)
1821     pool.updateRunningCount(1);
1822     }
1823     }
1824     }
1825    
1826     private static void awaitBlocker(ManagedBlocker blocker)
1827     throws InterruptedException {
1828 jsr166 1.17 do {} while (!blocker.isReleasable() && !blocker.block());
1829 dl 1.1 }
1830    
1831 dl 1.33 // AbstractExecutorService overrides. These rely on undocumented
1832     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
1833     // implement RunnableFuture.
1834 dl 1.2
1835     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1836 dl 1.33 return (RunnableFuture<T>)ForkJoinTask.adapt(runnable, value);
1837 dl 1.2 }
1838    
1839     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1840 dl 1.33 return (RunnableFuture<T>)ForkJoinTask.adapt(callable);
1841 dl 1.2 }
1842    
1843 jsr166 1.27 // Unsafe mechanics
1844 dl 1.1
1845 jsr166 1.21 private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1846 jsr166 1.25 private static final long eventCountOffset =
1847 jsr166 1.27 objectFieldOffset("eventCount", ForkJoinPool.class);
1848 jsr166 1.25 private static final long workerCountsOffset =
1849 jsr166 1.27 objectFieldOffset("workerCounts", ForkJoinPool.class);
1850 jsr166 1.25 private static final long runControlOffset =
1851 jsr166 1.27 objectFieldOffset("runControl", ForkJoinPool.class);
1852 jsr166 1.25 private static final long syncStackOffset =
1853 jsr166 1.27 objectFieldOffset("syncStack",ForkJoinPool.class);
1854 jsr166 1.25 private static final long spareStackOffset =
1855 jsr166 1.27 objectFieldOffset("spareStack", ForkJoinPool.class);
1856 jsr166 1.21
1857 dl 1.1 private boolean casEventCount(long cmp, long val) {
1858 jsr166 1.16 return UNSAFE.compareAndSwapLong(this, eventCountOffset, cmp, val);
1859 dl 1.1 }
1860     private boolean casWorkerCounts(int cmp, int val) {
1861 jsr166 1.16 return UNSAFE.compareAndSwapInt(this, workerCountsOffset, cmp, val);
1862 dl 1.1 }
1863     private boolean casRunControl(int cmp, int val) {
1864 jsr166 1.16 return UNSAFE.compareAndSwapInt(this, runControlOffset, cmp, val);
1865 dl 1.1 }
1866     private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) {
1867 jsr166 1.16 return UNSAFE.compareAndSwapObject(this, spareStackOffset, cmp, val);
1868 dl 1.1 }
1869     private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) {
1870 jsr166 1.16 return UNSAFE.compareAndSwapObject(this, syncStackOffset, cmp, val);
1871 dl 1.1 }
1872 jsr166 1.27
1873     private static long objectFieldOffset(String field, Class<?> klazz) {
1874     try {
1875     return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1876     } catch (NoSuchFieldException e) {
1877     // Convert Exception to corresponding Error
1878     NoSuchFieldError error = new NoSuchFieldError(field);
1879     error.initCause(e);
1880     throw error;
1881     }
1882     }
1883    
1884     /**
1885     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
1886     * Replace with a simple call to Unsafe.getUnsafe when integrating
1887     * into a jdk.
1888     *
1889     * @return a sun.misc.Unsafe
1890     */
1891     private static sun.misc.Unsafe getUnsafe() {
1892     try {
1893     return sun.misc.Unsafe.getUnsafe();
1894     } catch (SecurityException se) {
1895     try {
1896     return java.security.AccessController.doPrivileged
1897     (new java.security
1898     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
1899     public sun.misc.Unsafe run() throws Exception {
1900     java.lang.reflect.Field f = sun.misc
1901     .Unsafe.class.getDeclaredField("theUnsafe");
1902     f.setAccessible(true);
1903     return (sun.misc.Unsafe) f.get(null);
1904     }});
1905     } catch (java.security.PrivilegedActionException e) {
1906     throw new RuntimeException("Could not initialize intrinsics",
1907     e.getCause());
1908     }
1909     }
1910     }
1911 dl 1.1 }