ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
Revision: 1.13
Committed: Wed Jul 22 01:36:51 2009 UTC (14 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.12: +3 -0 lines
Log Message:
Add @since, @author tags

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     package jsr166y;
8     import java.util.*;
9     import java.util.concurrent.*;
10     import java.util.concurrent.locks.*;
11     import java.util.concurrent.atomic.*;
12     import sun.misc.Unsafe;
13     import java.lang.reflect.*;
14    
15     /**
16 dl 1.2 * An {@link ExecutorService} for running {@link ForkJoinTask}s. A
17     * ForkJoinPool provides the entry point for submissions from
18     * non-ForkJoinTasks, as well as management and monitoring operations.
19     * Normally a single ForkJoinPool is used for a large number of
20     * submitted tasks. Otherwise, use would not usually outweigh the
21     * construction and bookkeeping overhead of creating a large set of
22     * threads.
23 dl 1.1 *
24 dl 1.2 * <p>ForkJoinPools differ from other kinds of Executors mainly in
25     * that they provide <em>work-stealing</em>: all threads in the pool
26 dl 1.1 * attempt to find and execute subtasks created by other active tasks
27     * (eventually blocking if none exist). This makes them efficient when
28 dl 1.2 * most tasks spawn other subtasks (as do most ForkJoinTasks), as well
29     * as the mixed execution of some plain Runnable- or Callable- based
30 dl 1.6 * activities along with ForkJoinTasks. When async mode is enabled via
31 jsr166 1.8 * {@code setAsyncMode}, a ForkJoinPool may also be appropriate for
32 dl 1.6 * use with fine-grained tasks that are never joined. Otherwise, other
33 dl 1.2 * ExecutorService implementations are typically more appropriate
34     * choices.
35 dl 1.1 *
36     * <p>A ForkJoinPool may be constructed with a given parallelism level
37     * (target pool size), which it attempts to maintain by dynamically
38 dl 1.2 * adding, suspending, or resuming threads, even if some tasks are
39     * waiting to join others. However, no such adjustments are performed
40     * in the face of blocked IO or other unmanaged synchronization. The
41 jsr166 1.8 * nested {@code ManagedBlocker} interface enables extension of
42 dl 1.2 * the kinds of synchronization accommodated. The target parallelism
43 jsr166 1.8 * level may also be changed dynamically ({@code setParallelism})
44 dl 1.6 * and thread construction can be limited using methods
45 jsr166 1.8 * {@code setMaximumPoolSize} and/or
46     * {@code setMaintainsParallelism}.
47 dl 1.1 *
48     * <p>In addition to execution and lifecycle control methods, this
49     * class provides status check methods (for example
50 jsr166 1.8 * {@code getStealCount}) that are intended to aid in developing,
51 dl 1.1 * tuning, and monitoring fork/join applications. Also, method
52 jsr166 1.8 * {@code toString} returns indications of pool state in a
53 dl 1.2 * convenient form for informal monitoring.
54 dl 1.1 *
55     * <p><b>Implementation notes</b>: This implementation restricts the
56 dl 1.2 * maximum number of running threads to 32767. Attempts to create
57     * pools with greater than the maximum result in
58     * IllegalArgumentExceptions.
59 jsr166 1.13 *
60     * @since 1.7
61     * @author Doug Lea
62 dl 1.1 */
63 dl 1.2 public class ForkJoinPool extends AbstractExecutorService {
64 dl 1.1
65     /*
66     * See the extended comments interspersed below for design,
67     * rationale, and walkthroughs.
68     */
69    
70     /** Mask for packing and unpacking shorts */
71     private static final int shortMask = 0xffff;
72    
73     /** Max pool size -- must be a power of two minus 1 */
74     private static final int MAX_THREADS = 0x7FFF;
75    
76     /**
77     * Factory for creating new ForkJoinWorkerThreads. A
78     * ForkJoinWorkerThreadFactory must be defined and used for
79     * ForkJoinWorkerThread subclasses that extend base functionality
80     * or initialize threads with different contexts.
81     */
82     public static interface ForkJoinWorkerThreadFactory {
83     /**
84     * Returns a new worker thread operating in the given pool.
85     *
86     * @param pool the pool this thread works in
87 jsr166 1.11 * @throws NullPointerException if pool is null
88 dl 1.1 */
89     public ForkJoinWorkerThread newThread(ForkJoinPool pool);
90     }
91    
92     /**
93     * Default ForkJoinWorkerThreadFactory implementation, creates a
94     * new ForkJoinWorkerThread.
95     */
96 dl 1.2 static class DefaultForkJoinWorkerThreadFactory
97 dl 1.1 implements ForkJoinWorkerThreadFactory {
98     public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
99     try {
100     return new ForkJoinWorkerThread(pool);
101     } catch (OutOfMemoryError oom) {
102     return null;
103     }
104     }
105     }
106    
107     /**
108 dl 1.2 * Creates a new ForkJoinWorkerThread. This factory is used unless
109     * overridden in ForkJoinPool constructors.
110 dl 1.1 */
111 dl 1.2 public static final ForkJoinWorkerThreadFactory
112 dl 1.1 defaultForkJoinWorkerThreadFactory =
113     new DefaultForkJoinWorkerThreadFactory();
114    
115     /**
116     * Permission required for callers of methods that may start or
117     * kill threads.
118     */
119     private static final RuntimePermission modifyThreadPermission =
120     new RuntimePermission("modifyThread");
121    
122     /**
123     * If there is a security manager, makes sure caller has
124     * permission to modify threads.
125     */
126     private static void checkPermission() {
127     SecurityManager security = System.getSecurityManager();
128     if (security != null)
129     security.checkPermission(modifyThreadPermission);
130     }
131    
132     /**
133     * Generator for assigning sequence numbers as pool names.
134     */
135     private static final AtomicInteger poolNumberGenerator =
136     new AtomicInteger();
137    
138     /**
139 dl 1.6 * Array holding all worker threads in the pool. Initialized upon
140     * first use. Array size must be a power of two. Updates and
141     * replacements are protected by workerLock, but it is always kept
142     * in a consistent enough state to be randomly accessed without
143     * locking by workers performing work-stealing.
144 dl 1.1 */
145     volatile ForkJoinWorkerThread[] workers;
146    
147     /**
148     * Lock protecting access to workers.
149     */
150     private final ReentrantLock workerLock;
151    
152     /**
153     * Condition for awaitTermination.
154     */
155     private final Condition termination;
156    
157     /**
158     * The uncaught exception handler used when any worker
159 jsr166 1.9 * abruptly terminates
160 dl 1.1 */
161     private Thread.UncaughtExceptionHandler ueh;
162    
163     /**
164     * Creation factory for worker threads.
165     */
166     private final ForkJoinWorkerThreadFactory factory;
167    
168     /**
169     * Head of stack of threads that were created to maintain
170     * parallelism when other threads blocked, but have since
171     * suspended when the parallelism level rose.
172     */
173     private volatile WaitQueueNode spareStack;
174    
175     /**
176     * Sum of per-thread steal counts, updated only when threads are
177     * idle or terminating.
178     */
179     private final AtomicLong stealCount;
180    
181     /**
182     * Queue for external submissions.
183     */
184     private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
185    
186     /**
187     * Head of Treiber stack for barrier sync. See below for explanation
188     */
189 dl 1.4 private volatile WaitQueueNode syncStack;
190 dl 1.1
191     /**
192     * The count for event barrier
193     */
194     private volatile long eventCount;
195    
196     /**
197     * Pool number, just for assigning useful names to worker threads
198     */
199     private final int poolNumber;
200    
201     /**
202     * The maximum allowed pool size
203     */
204     private volatile int maxPoolSize;
205    
206     /**
207     * The desired parallelism level, updated only under workerLock.
208     */
209     private volatile int parallelism;
210    
211     /**
212 dl 1.6 * True if use local fifo, not default lifo, for local polling
213     */
214     private volatile boolean locallyFifo;
215    
216     /**
217 dl 1.1 * Holds number of total (i.e., created and not yet terminated)
218     * and running (i.e., not blocked on joins or other managed sync)
219     * threads, packed into one int to ensure consistent snapshot when
220     * making decisions about creating and suspending spare
221     * threads. Updated only by CAS. Note: CASes in
222     * updateRunningCount and preJoin assume that the running count is
223     * in the low word, so they need to be modified if this changes.
224     */
225     private volatile int workerCounts;
226    
227     private static int totalCountOf(int s) { return s >>> 16; }
228     private static int runningCountOf(int s) { return s & shortMask; }
229     private static int workerCountsFor(int t, int r) { return (t << 16) + r; }
230    
231     /**
232 jsr166 1.10 * Adds delta (which may be negative) to running count. This must
233 dl 1.1 * be called before (with negative arg) and after (with positive)
234 jsr166 1.10 * any managed synchronization (i.e., mainly, joins).
235 dl 1.1 * @param delta the number to add
236     */
237     final void updateRunningCount(int delta) {
238     int s;
239     do;while (!casWorkerCounts(s = workerCounts, s + delta));
240     }
241    
242     /**
243 jsr166 1.10 * Adds delta (which may be negative) to both total and running
244 dl 1.1 * count. This must be called upon creation and termination of
245     * worker threads.
246     * @param delta the number to add
247     */
248     private void updateWorkerCount(int delta) {
249     int d = delta + (delta << 16); // add to both lo and hi parts
250     int s;
251     do;while (!casWorkerCounts(s = workerCounts, s + d));
252     }
253    
254     /**
255     * Lifecycle control. High word contains runState, low word
256     * contains the number of workers that are (probably) executing
257     * tasks. This value is atomically incremented before a worker
258     * gets a task to run, and decremented when worker has no tasks
259     * and cannot find any. These two fields are bundled together to
260     * support correct termination triggering. Note: activeCount
261     * CAS'es cheat by assuming active count is in low word, so need
262     * to be modified if this changes
263     */
264     private volatile int runControl;
265    
266     // RunState values. Order among values matters
267     private static final int RUNNING = 0;
268     private static final int SHUTDOWN = 1;
269     private static final int TERMINATING = 2;
270     private static final int TERMINATED = 3;
271    
272     private static int runStateOf(int c) { return c >>> 16; }
273     private static int activeCountOf(int c) { return c & shortMask; }
274     private static int runControlFor(int r, int a) { return (r << 16) + a; }
275    
276     /**
277 dl 1.4 * Try incrementing active count; fail on contention. Called by
278     * workers before/during executing tasks.
279 jsr166 1.11 * @return true on success
280 dl 1.1 */
281 dl 1.4 final boolean tryIncrementActiveCount() {
282     int c = runControl;
283     return casRunControl(c, c+1);
284 dl 1.1 }
285    
286     /**
287 jsr166 1.10 * Tries decrementing active count; fails on contention.
288     * Possibly triggers termination on success.
289 dl 1.1 * Called by workers when they can't find tasks.
290 dl 1.4 * @return true on success
291 dl 1.1 */
292 dl 1.4 final boolean tryDecrementActiveCount() {
293     int c = runControl;
294     int nextc = c - 1;
295     if (!casRunControl(c, nextc))
296     return false;
297 dl 1.1 if (canTerminateOnShutdown(nextc))
298     terminateOnShutdown();
299 dl 1.4 return true;
300 dl 1.1 }
301    
302     /**
303 jsr166 1.10 * Returns true if argument represents zero active count and
304 dl 1.1 * nonzero runstate, which is the triggering condition for
305     * terminating on shutdown.
306     */
307     private static boolean canTerminateOnShutdown(int c) {
308     return ((c & -c) >>> 16) != 0; // i.e. least bit is nonzero runState bit
309     }
310    
311     /**
312     * Transition run state to at least the given state. Return true
313     * if not already at least given state.
314     */
315     private boolean transitionRunStateTo(int state) {
316     for (;;) {
317     int c = runControl;
318     if (runStateOf(c) >= state)
319     return false;
320     if (casRunControl(c, runControlFor(state, activeCountOf(c))))
321     return true;
322     }
323     }
324    
325     /**
326     * Controls whether to add spares to maintain parallelism
327     */
328     private volatile boolean maintainsParallelism;
329    
330     // Constructors
331    
/**
 * Creates a ForkJoinPool whose parallelism equals the number of
 * processors reported by the runtime, using the default
 * ForkJoinWorkerThreadFactory.
 *
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool() {
    this(Runtime.getRuntime().availableProcessors(),
         defaultForkJoinWorkerThreadFactory);
}
345    
/**
 * Creates a ForkJoinPool with the indicated parallelism level,
 * using the default ForkJoinWorkerThreadFactory.
 *
 * @param parallelism the number of worker threads
 * @throws IllegalArgumentException if parallelism is less than or
 *         equal to zero
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism) {
    this(parallelism,
         defaultForkJoinWorkerThreadFactory);
}
360    
/**
 * Creates a ForkJoinPool whose parallelism equals the number of
 * processors reported by the runtime, using the given
 * ForkJoinWorkerThreadFactory.
 *
 * @param factory the factory for creating new threads
 * @throws NullPointerException if factory is null
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
    this(Runtime.getRuntime().availableProcessors(),
         factory);
}
375    
/**
 * Creates a ForkJoinPool with the given parallelism and factory.
 * Only bookkeeping state is set up here; the worker array and the
 * workers themselves are constructed lazily.
 *
 * @param parallelism the targeted number of worker threads
 * @param factory the factory for creating new threads
 * @throws IllegalArgumentException if parallelism is less than or
 *         equal to zero, or greater than implementation limit
 * @throws NullPointerException if factory is null
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
    // Argument and permission checks, in this order.
    if (parallelism < 1 || parallelism > MAX_THREADS)
        throw new IllegalArgumentException();
    if (factory == null)
        throw new NullPointerException();
    checkPermission();

    // Synchronization support.
    this.workerLock = new ReentrantLock();
    this.termination = workerLock.newCondition();

    // Queues and counters.
    this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
    this.stealCount = new AtomicLong();

    // Configuration.
    this.poolNumber = poolNumberGenerator.incrementAndGet();
    this.factory = factory;
    this.parallelism = parallelism;
    this.maxPoolSize = MAX_THREADS;
    this.maintainsParallelism = true;
}
406    
407     /**
408     * Create new worker using factory.
409     * @param index the index to assign worker
410     * @return new worker, or null of factory failed
411     */
412     private ForkJoinWorkerThread createWorker(int index) {
413     Thread.UncaughtExceptionHandler h = ueh;
414     ForkJoinWorkerThread w = factory.newThread(this);
415     if (w != null) {
416     w.poolIndex = index;
417     w.setDaemon(true);
418 dl 1.6 w.setAsyncMode(locallyFifo);
419 dl 1.1 w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index);
420     if (h != null)
421     w.setUncaughtExceptionHandler(h);
422     }
423     return w;
424     }
425    
426     /**
427 jsr166 1.10 * Returns a good size for worker array given pool size.
428 dl 1.1 * Currently requires size to be a power of two.
429     */
430     private static int arraySizeFor(int ps) {
431     return ps <= 1? 1 : (1 << (32 - Integer.numberOfLeadingZeros(ps-1)));
432     }
433    
434     /**
435 jsr166 1.10 * Creates or resizes array if necessary to hold newLength.
436     * Call only under exclusion or lock.
437 dl 1.1 * @return the array
438     */
439     private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) {
440     ForkJoinWorkerThread[] ws = workers;
441     if (ws == null)
442     return workers = new ForkJoinWorkerThread[arraySizeFor(newLength)];
443     else if (newLength > ws.length)
444     return workers = Arrays.copyOf(ws, arraySizeFor(newLength));
445     else
446     return ws;
447     }
448    
449     /**
450     * Try to shrink workers into smaller array after one or more terminate
451     */
452     private void tryShrinkWorkerArray() {
453     ForkJoinWorkerThread[] ws = workers;
454 dl 1.6 if (ws != null) {
455     int len = ws.length;
456     int last = len - 1;
457     while (last >= 0 && ws[last] == null)
458     --last;
459     int newLength = arraySizeFor(last+1);
460     if (newLength < len)
461     workers = Arrays.copyOf(ws, newLength);
462     }
463 dl 1.1 }
464    
465     /**
466 dl 1.6 * Initialize workers if necessary
467 dl 1.1 */
468 dl 1.6 final void ensureWorkerInitialization() {
469     ForkJoinWorkerThread[] ws = workers;
470     if (ws == null) {
471     final ReentrantLock lock = this.workerLock;
472     lock.lock();
473     try {
474     ws = workers;
475     if (ws == null) {
476     int ps = parallelism;
477     ws = ensureWorkerArrayCapacity(ps);
478     for (int i = 0; i < ps; ++i) {
479     ForkJoinWorkerThread w = createWorker(i);
480     if (w != null) {
481     ws[i] = w;
482     w.start();
483     updateWorkerCount(1);
484     }
485     }
486 dl 1.1 }
487 dl 1.6 } finally {
488     lock.unlock();
489 dl 1.1 }
490     }
491     }
492    
493     /**
494     * Worker creation and startup for threads added via setParallelism.
495     */
496     private void createAndStartAddedWorkers() {
497     resumeAllSpares(); // Allow spares to convert to nonspare
498     int ps = parallelism;
499     ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps);
500     int len = ws.length;
501     // Sweep through slots, to keep lowest indices most populated
502     int k = 0;
503     while (k < len) {
504     if (ws[k] != null) {
505     ++k;
506     continue;
507     }
508     int s = workerCounts;
509     int tc = totalCountOf(s);
510     int rc = runningCountOf(s);
511     if (rc >= ps || tc >= ps)
512     break;
513     if (casWorkerCounts (s, workerCountsFor(tc+1, rc+1))) {
514     ForkJoinWorkerThread w = createWorker(k);
515     if (w != null) {
516     ws[k++] = w;
517     w.start();
518     }
519     else {
520     updateWorkerCount(-1); // back out on failed creation
521     break;
522     }
523     }
524     }
525     }
526    
527     // Execution methods
528    
529     /**
530     * Common code for execute, invoke and submit
531     */
532     private <T> void doSubmit(ForkJoinTask<T> task) {
533     if (isShutdown())
534     throw new RejectedExecutionException();
535 dl 1.6 if (workers == null)
536     ensureWorkerInitialization();
537 dl 1.1 submissionQueue.offer(task);
538 dl 1.4 signalIdleWorkers();
539 dl 1.1 }
540    
541     /**
542     * Performs the given task; returning its result upon completion
543     * @param task the task
544     * @return the task's result
545     * @throws NullPointerException if task is null
546     * @throws RejectedExecutionException if pool is shut down
547     */
548     public <T> T invoke(ForkJoinTask<T> task) {
549     doSubmit(task);
550     return task.join();
551     }
552    
553     /**
554     * Arranges for (asynchronous) execution of the given task.
555     * @param task the task
556     * @throws NullPointerException if task is null
557     * @throws RejectedExecutionException if pool is shut down
558     */
559     public <T> void execute(ForkJoinTask<T> task) {
560     doSubmit(task);
561     }
562    
563     // AbstractExecutorService methods
564    
565     public void execute(Runnable task) {
566     doSubmit(new AdaptedRunnable<Void>(task, null));
567     }
568    
569     public <T> ForkJoinTask<T> submit(Callable<T> task) {
570     ForkJoinTask<T> job = new AdaptedCallable<T>(task);
571     doSubmit(job);
572     return job;
573     }
574    
575     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
576     ForkJoinTask<T> job = new AdaptedRunnable<T>(task, result);
577     doSubmit(job);
578     return job;
579     }
580    
581     public ForkJoinTask<?> submit(Runnable task) {
582     ForkJoinTask<Void> job = new AdaptedRunnable<Void>(task, null);
583     doSubmit(job);
584     return job;
585     }
586    
587     /**
588     * Adaptor for Runnables. This implements RunnableFuture
589     * to be compliant with AbstractExecutorService constraints
590     */
591     static final class AdaptedRunnable<T> extends ForkJoinTask<T>
592     implements RunnableFuture<T> {
593     final Runnable runnable;
594     final T resultOnCompletion;
595     T result;
596     AdaptedRunnable(Runnable runnable, T result) {
597     if (runnable == null) throw new NullPointerException();
598     this.runnable = runnable;
599     this.resultOnCompletion = result;
600     }
601     public T getRawResult() { return result; }
602     public void setRawResult(T v) { result = v; }
603     public boolean exec() {
604     runnable.run();
605     result = resultOnCompletion;
606     return true;
607     }
608     public void run() { invoke(); }
609     }
610    
611     /**
612     * Adaptor for Callables
613     */
614     static final class AdaptedCallable<T> extends ForkJoinTask<T>
615     implements RunnableFuture<T> {
616     final Callable<T> callable;
617     T result;
618     AdaptedCallable(Callable<T> callable) {
619     if (callable == null) throw new NullPointerException();
620     this.callable = callable;
621     }
622     public T getRawResult() { return result; }
623     public void setRawResult(T v) { result = v; }
624     public boolean exec() {
625     try {
626     result = callable.call();
627     return true;
628     } catch (Error err) {
629     throw err;
630     } catch (RuntimeException rex) {
631     throw rex;
632     } catch (Exception ex) {
633     throw new RuntimeException(ex);
634     }
635     }
636     public void run() { invoke(); }
637     }
638    
639     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
640     ArrayList<ForkJoinTask<T>> ts =
641     new ArrayList<ForkJoinTask<T>>(tasks.size());
642     for (Callable<T> c : tasks)
643     ts.add(new AdaptedCallable<T>(c));
644     invoke(new InvokeAll<T>(ts));
645     return (List<Future<T>>)(List)ts;
646     }
647    
648     static final class InvokeAll<T> extends RecursiveAction {
649     final ArrayList<ForkJoinTask<T>> tasks;
650     InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
651     public void compute() {
652     try { invokeAll(tasks); } catch(Exception ignore) {}
653     }
654     }
655    
656     // Configuration and status settings and queries
657    
658     /**
659     * Returns the factory used for constructing new workers
660     *
661     * @return the factory used for constructing new workers
662     */
663     public ForkJoinWorkerThreadFactory getFactory() {
664     return factory;
665     }
666    
667     /**
668 dl 1.2 * Returns the handler for internal worker threads that terminate
669     * due to unrecoverable errors encountered while executing tasks.
670     * @return the handler, or null if none
671     */
672     public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
673     Thread.UncaughtExceptionHandler h;
674     final ReentrantLock lock = this.workerLock;
675     lock.lock();
676     try {
677     h = ueh;
678     } finally {
679     lock.unlock();
680     }
681     return h;
682     }
683    
684     /**
685     * Sets the handler for internal worker threads that terminate due
686     * to unrecoverable errors encountered while executing tasks.
687     * Unless set, the current default or ThreadGroup handler is used
688     * as handler.
689     *
690     * @param h the new handler
691     * @return the old handler, or null if none
692     * @throws SecurityException if a security manager exists and
693     * the caller is not permitted to modify threads
694     * because it does not hold {@link
695 jsr166 1.8 * java.lang.RuntimePermission}{@code ("modifyThread")},
696 dl 1.2 */
697     public Thread.UncaughtExceptionHandler
698     setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
699     checkPermission();
700     Thread.UncaughtExceptionHandler old = null;
701     final ReentrantLock lock = this.workerLock;
702     lock.lock();
703     try {
704     old = ueh;
705     ueh = h;
706     ForkJoinWorkerThread[] ws = workers;
707 dl 1.6 if (ws != null) {
708     for (int i = 0; i < ws.length; ++i) {
709     ForkJoinWorkerThread w = ws[i];
710     if (w != null)
711     w.setUncaughtExceptionHandler(h);
712     }
713 dl 1.2 }
714     } finally {
715     lock.unlock();
716     }
717     return old;
718     }
719    
720    
721     /**
722 jsr166 1.9 * Sets the target parallelism level of this pool.
723 dl 1.1 * @param parallelism the target parallelism
724     * @throws IllegalArgumentException if parallelism less than or
725 jsr166 1.11 * equal to zero or greater than maximum size bounds
726 dl 1.1 * @throws SecurityException if a security manager exists and
727     * the caller is not permitted to modify threads
728     * because it does not hold {@link
729 jsr166 1.8 * java.lang.RuntimePermission}{@code ("modifyThread")},
730 dl 1.1 */
731     public void setParallelism(int parallelism) {
732     checkPermission();
733     if (parallelism <= 0 || parallelism > maxPoolSize)
734     throw new IllegalArgumentException();
735     final ReentrantLock lock = this.workerLock;
736     lock.lock();
737     try {
738     if (!isTerminating()) {
739     int p = this.parallelism;
740     this.parallelism = parallelism;
741     if (parallelism > p)
742     createAndStartAddedWorkers();
743     else
744     trimSpares();
745     }
746     } finally {
747     lock.unlock();
748     }
749 dl 1.4 signalIdleWorkers();
750 dl 1.1 }
751    
752     /**
753     * Returns the targeted number of worker threads in this pool.
754     *
755     * @return the targeted number of worker threads in this pool
756     */
757     public int getParallelism() {
758     return parallelism;
759     }
760    
761     /**
762     * Returns the number of worker threads that have started but not
763     * yet terminated. This result returned by this method may differ
764 jsr166 1.8 * from {@code getParallelism} when threads are created to
765 dl 1.1 * maintain parallelism when others are cooperatively blocked.
766     *
767     * @return the number of worker threads
768     */
769     public int getPoolSize() {
770     return totalCountOf(workerCounts);
771     }
772    
773     /**
774     * Returns the maximum number of threads allowed to exist in the
775     * pool, even if there are insufficient unblocked running threads.
776     * @return the maximum
777     */
778     public int getMaximumPoolSize() {
779     return maxPoolSize;
780     }
781    
782     /**
783     * Sets the maximum number of threads allowed to exist in the
784     * pool, even if there are insufficient unblocked running threads.
785     * Setting this value has no effect on current pool size. It
786     * controls construction of new threads.
787     * @throws IllegalArgumentException if negative or greater then
788 jsr166 1.11 * internal implementation limit
789 dl 1.1 */
790     public void setMaximumPoolSize(int newMax) {
791     if (newMax < 0 || newMax > MAX_THREADS)
792     throw new IllegalArgumentException();
793     maxPoolSize = newMax;
794     }
795    
796    
797     /**
798     * Returns true if this pool dynamically maintains its target
799     * parallelism level. If false, new threads are added only to
800     * avoid possible starvation.
801     * This setting is by default true;
802     * @return true if maintains parallelism
803     */
804     public boolean getMaintainsParallelism() {
805     return maintainsParallelism;
806     }
807    
808     /**
809     * Sets whether this pool dynamically maintains its target
810     * parallelism level. If false, new threads are added only to
811     * avoid possible starvation.
812     * @param enable true to maintains parallelism
813     */
814     public void setMaintainsParallelism(boolean enable) {
815     maintainsParallelism = enable;
816     }
817    
818     /**
819 dl 1.6 * Establishes local first-in-first-out scheduling mode for forked
820     * tasks that are never joined. This mode may be more appropriate
821     * than default locally stack-based mode in applications in which
822     * worker threads only process asynchronous tasks. This method is
823     * designed to be invoked only when pool is quiescent, and
824     * typically only before any tasks are submitted. The effects of
825 jsr166 1.9 * invocations at other times may be unpredictable.
826 dl 1.6 *
827     * @param async if true, use locally FIFO scheduling
828 jsr166 1.11 * @return the previous mode
829 dl 1.6 */
830     public boolean setAsyncMode(boolean async) {
831     boolean oldMode = locallyFifo;
832     locallyFifo = async;
833     ForkJoinWorkerThread[] ws = workers;
834     if (ws != null) {
835     for (int i = 0; i < ws.length; ++i) {
836     ForkJoinWorkerThread t = ws[i];
837     if (t != null)
838     t.setAsyncMode(async);
839     }
840     }
841     return oldMode;
842     }
843    
844     /**
845     * Returns true if this pool uses local first-in-first-out
846 jsr166 1.7 * scheduling mode for forked tasks that are never joined.
847 dl 1.6 *
848 jsr166 1.11 * @return true if this pool uses async mode
849 dl 1.6 */
850     public boolean getAsyncMode() {
851     return locallyFifo;
852     }
853    
854     /**
855 dl 1.2 * Returns an estimate of the number of worker threads that are
856     * not blocked waiting to join tasks or for other managed
857 dl 1.1 * synchronization.
858     *
859     * @return the number of worker threads
860     */
861     public int getRunningThreadCount() {
862     return runningCountOf(workerCounts);
863     }
864    
865     /**
866 dl 1.2 * Returns an estimate of the number of threads that are currently
867 dl 1.1 * stealing or executing tasks. This method may overestimate the
868     * number of active threads.
869 jsr166 1.11 * @return the number of active threads
870 dl 1.1 */
871     public int getActiveThreadCount() {
872     return activeCountOf(runControl);
873     }
874    
875     /**
876 dl 1.2 * Returns an estimate of the number of threads that are currently
877 dl 1.1 * idle waiting for tasks. This method may underestimate the
878     * number of idle threads.
879 jsr166 1.11 * @return the number of idle threads
880 dl 1.1 */
881     final int getIdleThreadCount() {
882     int c = runningCountOf(workerCounts) - activeCountOf(runControl);
883     return (c <= 0)? 0 : c;
884     }
885    
886     /**
887     * Returns true if all worker threads are currently idle. An idle
888     * worker is one that cannot obtain a task to execute because none
889     * are available to steal from other threads, and there are no
890     * pending submissions to the pool. This method is conservative:
891     * It might not return true immediately upon idleness of all
892     * threads, but will eventually become true if threads remain
893     * inactive.
894     * @return true if all threads are currently idle
895     */
896     public boolean isQuiescent() {
897     return activeCountOf(runControl) == 0;
898     }
899    
900     /**
901     * Returns an estimate of the total number of tasks stolen from
902     * one thread's work queue by another. The reported value
903     * underestimates the actual total number of steals when the pool
904     * is not quiescent. This value may be useful for monitoring and
905     * tuning fork/join programs: In general, steal counts should be
906     * high enough to keep threads busy, but low enough to avoid
907     * overhead and contention across threads.
908 jsr166 1.11 * @return the number of steals
909 dl 1.1 */
910     public long getStealCount() {
911     return stealCount.get();
912     }
913    
914     /**
915     * Accumulate steal count from a worker. Call only
916     * when worker known to be idle.
917     */
918     private void updateStealCount(ForkJoinWorkerThread w) {
919     int sc = w.getAndClearStealCount();
920     if (sc != 0)
921     stealCount.addAndGet(sc);
922     }
923    
924     /**
925 dl 1.2 * Returns an estimate of the total number of tasks currently held
926     * in queues by worker threads (but not including tasks submitted
927     * to the pool that have not begun executing). This value is only
928     * an approximation, obtained by iterating across all threads in
929     * the pool. This method may be useful for tuning task
930     * granularities.
931 jsr166 1.11 * @return the number of queued tasks
932 dl 1.1 */
933     public long getQueuedTaskCount() {
934     long count = 0;
935     ForkJoinWorkerThread[] ws = workers;
936 dl 1.6 if (ws != null) {
937     for (int i = 0; i < ws.length; ++i) {
938     ForkJoinWorkerThread t = ws[i];
939     if (t != null)
940     count += t.getQueueSize();
941     }
942 dl 1.1 }
943     return count;
944     }
945    
946     /**
947 dl 1.2 * Returns an estimate of the number tasks submitted to this pool
948 dl 1.1 * that have not yet begun executing. This method takes time
949     * proportional to the number of submissions.
950 jsr166 1.11 * @return the number of queued submissions
951 dl 1.1 */
952     public int getQueuedSubmissionCount() {
953     return submissionQueue.size();
954     }
955    
956     /**
957     * Returns true if there are any tasks submitted to this pool
958     * that have not yet begun executing.
959 jsr166 1.11 * @return {@code true} if there are any queued submissions
960 dl 1.1 */
961     public boolean hasQueuedSubmissions() {
962     return !submissionQueue.isEmpty();
963     }
964    
965     /**
966     * Removes and returns the next unexecuted submission if one is
967     * available. This method may be useful in extensions to this
968     * class that re-assign work in systems with multiple pools.
969     * @return the next submission, or null if none
970     */
971     protected ForkJoinTask<?> pollSubmission() {
972     return submissionQueue.poll();
973     }
974    
975     /**
976 dl 1.6 * Removes all available unexecuted submitted and forked tasks
977     * from scheduling queues and adds them to the given collection,
978     * without altering their execution status. These may include
979 jsr166 1.9 * artificially generated or wrapped tasks. This method is designed
980 dl 1.6 * to be invoked only when the pool is known to be
981     * quiescent. Invocations at other times may not remove all
982     * tasks. A failure encountered while attempting to add elements
983 jsr166 1.8 * to collection {@code c} may result in elements being in
984 dl 1.6 * neither, either or both collections when the associated
985     * exception is thrown. The behavior of this operation is
986     * undefined if the specified collection is modified while the
987     * operation is in progress.
988     * @param c the collection to transfer elements into
989     * @return the number of elements transferred
990     */
991     protected int drainTasksTo(Collection<ForkJoinTask<?>> c) {
992     int n = submissionQueue.drainTo(c);
993     ForkJoinWorkerThread[] ws = workers;
994     if (ws != null) {
995     for (int i = 0; i < ws.length; ++i) {
996     ForkJoinWorkerThread w = ws[i];
997     if (w != null)
998     n += w.drainTasksTo(c);
999     }
1000     }
1001     return n;
1002     }
1003    
1004     /**
1005 dl 1.1 * Returns a string identifying this pool, as well as its state,
1006     * including indications of run state, parallelism level, and
1007     * worker and task counts.
1008     *
1009     * @return a string identifying this pool, as well as its state
1010     */
1011     public String toString() {
1012     int ps = parallelism;
1013     int wc = workerCounts;
1014     int rc = runControl;
1015     long st = getStealCount();
1016     long qt = getQueuedTaskCount();
1017     long qs = getQueuedSubmissionCount();
1018     return super.toString() +
1019     "[" + runStateToString(runStateOf(rc)) +
1020     ", parallelism = " + ps +
1021     ", size = " + totalCountOf(wc) +
1022     ", active = " + activeCountOf(rc) +
1023     ", running = " + runningCountOf(wc) +
1024     ", steals = " + st +
1025     ", tasks = " + qt +
1026     ", submissions = " + qs +
1027     "]";
1028     }
1029    
1030     private static String runStateToString(int rs) {
1031     switch(rs) {
1032     case RUNNING: return "Running";
1033     case SHUTDOWN: return "Shutting down";
1034     case TERMINATING: return "Terminating";
1035     case TERMINATED: return "Terminated";
1036     default: throw new Error("Unknown run state");
1037     }
1038     }
1039    
1040     // lifecycle control
1041    
1042     /**
1043     * Initiates an orderly shutdown in which previously submitted
1044     * tasks are executed, but no new tasks will be accepted.
1045     * Invocation has no additional effect if already shut down.
1046     * Tasks that are in the process of being submitted concurrently
1047     * during the course of this method may or may not be rejected.
1048     * @throws SecurityException if a security manager exists and
1049     * the caller is not permitted to modify threads
1050     * because it does not hold {@link
1051 jsr166 1.8 * java.lang.RuntimePermission}{@code ("modifyThread")},
1052 dl 1.1 */
1053     public void shutdown() {
1054     checkPermission();
1055     transitionRunStateTo(SHUTDOWN);
1056     if (canTerminateOnShutdown(runControl))
1057     terminateOnShutdown();
1058     }
1059    
1060     /**
1061     * Attempts to stop all actively executing tasks, and cancels all
1062     * waiting tasks. Tasks that are in the process of being
1063     * submitted or executed concurrently during the course of this
1064     * method may or may not be rejected. Unlike some other executors,
1065 dl 1.6 * this method cancels rather than collects non-executed tasks
1066     * upon termination, so always returns an empty list. However, you
1067 jsr166 1.8 * can use method {@code drainTasksTo} before invoking this
1068 dl 1.6 * method to transfer unexecuted tasks to another collection.
1069 dl 1.1 * @return an empty list
1070     * @throws SecurityException if a security manager exists and
1071     * the caller is not permitted to modify threads
1072     * because it does not hold {@link
1073 jsr166 1.8 * java.lang.RuntimePermission}{@code ("modifyThread")},
1074 dl 1.1 */
1075     public List<Runnable> shutdownNow() {
1076     checkPermission();
1077     terminate();
1078     return Collections.emptyList();
1079     }
1080    
1081     /**
1082 jsr166 1.8 * Returns {@code true} if all tasks have completed following shut down.
1083 dl 1.1 *
1084 jsr166 1.8 * @return {@code true} if all tasks have completed following shut down
1085 dl 1.1 */
1086     public boolean isTerminated() {
1087     return runStateOf(runControl) == TERMINATED;
1088     }
1089    
1090     /**
1091 jsr166 1.8 * Returns {@code true} if the process of termination has
1092 dl 1.1 * commenced but possibly not yet completed.
1093     *
1094 jsr166 1.8 * @return {@code true} if terminating
1095 dl 1.1 */
1096     public boolean isTerminating() {
1097     return runStateOf(runControl) >= TERMINATING;
1098     }
1099    
1100     /**
1101 jsr166 1.8 * Returns {@code true} if this pool has been shut down.
1102 dl 1.1 *
1103 jsr166 1.8 * @return {@code true} if this pool has been shut down
1104 dl 1.1 */
1105     public boolean isShutdown() {
1106     return runStateOf(runControl) >= SHUTDOWN;
1107     }
1108    
1109     /**
1110     * Blocks until all tasks have completed execution after a shutdown
1111     * request, or the timeout occurs, or the current thread is
1112     * interrupted, whichever happens first.
1113     *
1114     * @param timeout the maximum time to wait
1115     * @param unit the time unit of the timeout argument
1116 jsr166 1.8 * @return {@code true} if this executor terminated and
1117     * {@code false} if the timeout elapsed before termination
1118 dl 1.1 * @throws InterruptedException if interrupted while waiting
1119     */
1120     public boolean awaitTermination(long timeout, TimeUnit unit)
1121     throws InterruptedException {
1122     long nanos = unit.toNanos(timeout);
1123     final ReentrantLock lock = this.workerLock;
1124     lock.lock();
1125     try {
1126     for (;;) {
1127     if (isTerminated())
1128     return true;
1129     if (nanos <= 0)
1130     return false;
1131     nanos = termination.awaitNanos(nanos);
1132     }
1133     } finally {
1134     lock.unlock();
1135     }
1136     }
1137    
1138     // Shutdown and termination support
1139    
1140     /**
1141     * Callback from terminating worker. Null out the corresponding
1142     * workers slot, and if terminating, try to terminate, else try to
1143     * shrink workers array.
1144     * @param w the worker
1145     */
1146     final void workerTerminated(ForkJoinWorkerThread w) {
1147     updateStealCount(w);
1148     updateWorkerCount(-1);
1149     final ReentrantLock lock = this.workerLock;
1150     lock.lock();
1151     try {
1152     ForkJoinWorkerThread[] ws = workers;
1153 dl 1.6 if (ws != null) {
1154     int idx = w.poolIndex;
1155     if (idx >= 0 && idx < ws.length && ws[idx] == w)
1156     ws[idx] = null;
1157     if (totalCountOf(workerCounts) == 0) {
1158     terminate(); // no-op if already terminating
1159     transitionRunStateTo(TERMINATED);
1160     termination.signalAll();
1161     }
1162     else if (!isTerminating()) {
1163     tryShrinkWorkerArray();
1164     tryResumeSpare(true); // allow replacement
1165     }
1166 dl 1.1 }
1167     } finally {
1168     lock.unlock();
1169     }
1170 dl 1.4 signalIdleWorkers();
1171 dl 1.1 }
1172    
1173     /**
1174     * Initiate termination.
1175     */
1176     private void terminate() {
1177     if (transitionRunStateTo(TERMINATING)) {
1178     stopAllWorkers();
1179     resumeAllSpares();
1180 dl 1.4 signalIdleWorkers();
1181 dl 1.1 cancelQueuedSubmissions();
1182     cancelQueuedWorkerTasks();
1183     interruptUnterminatedWorkers();
1184 dl 1.4 signalIdleWorkers(); // resignal after interrupt
1185 dl 1.1 }
1186     }
1187    
1188     /**
1189 jsr166 1.10 * Possibly terminates when on shutdown state.
1190 dl 1.1 */
1191     private void terminateOnShutdown() {
1192     if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl))
1193     terminate();
1194     }
1195    
1196     /**
1197 jsr166 1.10 * Clears out and cancels submissions.
1198 dl 1.1 */
1199     private void cancelQueuedSubmissions() {
1200     ForkJoinTask<?> task;
1201     while ((task = pollSubmission()) != null)
1202     task.cancel(false);
1203     }
1204    
1205     /**
1206 jsr166 1.10 * Cleans out worker queues.
1207 dl 1.1 */
1208     private void cancelQueuedWorkerTasks() {
1209     final ReentrantLock lock = this.workerLock;
1210     lock.lock();
1211     try {
1212     ForkJoinWorkerThread[] ws = workers;
1213 dl 1.6 if (ws != null) {
1214     for (int i = 0; i < ws.length; ++i) {
1215     ForkJoinWorkerThread t = ws[i];
1216     if (t != null)
1217     t.cancelTasks();
1218     }
1219 dl 1.1 }
1220     } finally {
1221     lock.unlock();
1222     }
1223     }
1224    
1225     /**
1226 jsr166 1.10 * Sets each worker's status to terminating. Requires lock to avoid
1227     * conflicts with add/remove.
1228 dl 1.1 */
1229     private void stopAllWorkers() {
1230     final ReentrantLock lock = this.workerLock;
1231     lock.lock();
1232     try {
1233     ForkJoinWorkerThread[] ws = workers;
1234 dl 1.6 if (ws != null) {
1235     for (int i = 0; i < ws.length; ++i) {
1236     ForkJoinWorkerThread t = ws[i];
1237     if (t != null)
1238     t.shutdownNow();
1239     }
1240 dl 1.1 }
1241     } finally {
1242     lock.unlock();
1243     }
1244     }
1245    
1246     /**
1247 jsr166 1.10 * Interrupts all unterminated workers. This is not required for
1248 dl 1.1 * sake of internal control, but may help unstick user code during
1249     * shutdown.
1250     */
1251     private void interruptUnterminatedWorkers() {
1252     final ReentrantLock lock = this.workerLock;
1253     lock.lock();
1254     try {
1255     ForkJoinWorkerThread[] ws = workers;
1256 dl 1.6 if (ws != null) {
1257     for (int i = 0; i < ws.length; ++i) {
1258     ForkJoinWorkerThread t = ws[i];
1259     if (t != null && !t.isTerminated()) {
1260     try {
1261     t.interrupt();
1262     } catch (SecurityException ignore) {
1263     }
1264 dl 1.1 }
1265     }
1266     }
1267     } finally {
1268     lock.unlock();
1269     }
1270     }
1271    
1272    
1273     /*
1274 dl 1.4 * Nodes for event barrier to manage idle threads. Queue nodes
1275     * are basic Treiber stack nodes, also used for spare stack.
1276 dl 1.1 *
1277     * The event barrier has an event count and a wait queue (actually
1278     * a Treiber stack). Workers are enabled to look for work when
1279 dl 1.4 * the eventCount is incremented. If they fail to find work, they
1280     * may wait for next count. Upon release, threads help others wake
1281     * up.
1282     *
1283     * Synchronization events occur only in enough contexts to
1284     * maintain overall liveness:
1285 dl 1.1 *
1286     * - Submission of a new task to the pool
1287 dl 1.4 * - Resizes or other changes to the workers array
1288 dl 1.1 * - pool termination
1289     * - A worker pushing a task on an empty queue
1290     *
1291 dl 1.4 * The case of pushing a task occurs often enough, and is heavy
1292     * enough compared to simple stack pushes, to require special
1293     * handling: Method signalWork returns without advancing count if
1294     * the queue appears to be empty. This would ordinarily result in
1295     * races causing some queued waiters not to be woken up. To avoid
1296     * this, the first worker enqueued in method sync (see
1297     * syncIsReleasable) rescans for tasks after being enqueued, and
1298     * helps signal if any are found. This works well because the
1299     * worker has nothing better to do, and so might as well help
1300     * alleviate the overhead and contention on the threads actually
1301     * doing work. Also, since event count increments on task
1302     * availability exist to maintain liveness (rather than to force
1303     * refreshes etc), it is OK for callers to exit early if
1304     * contending with another signaller.
1305 dl 1.1 */
1306     static final class WaitQueueNode {
1307     WaitQueueNode next; // only written before enqueued
1308     volatile ForkJoinWorkerThread thread; // nulled to cancel wait
1309     final long count; // unused for spare stack
1310 dl 1.4
1311     WaitQueueNode(long c, ForkJoinWorkerThread w) {
1312 dl 1.1 count = c;
1313     thread = w;
1314     }
1315 dl 1.4
1316     /**
1317 jsr166 1.10 * Wakes up waiter, returning false if known to already
1318 dl 1.4 */
1319     boolean signal() {
1320 dl 1.1 ForkJoinWorkerThread t = thread;
1321 dl 1.4 if (t == null)
1322     return false;
1323 dl 1.1 thread = null;
1324 dl 1.4 LockSupport.unpark(t);
1325     return true;
1326     }
1327    
1328     /**
1329 jsr166 1.10 * Awaits release on sync.
1330 dl 1.4 */
1331     void awaitSyncRelease(ForkJoinPool p) {
1332     while (thread != null && !p.syncIsReleasable(this))
1333     LockSupport.park(this);
1334     }
1335    
1336     /**
1337 jsr166 1.10 * Awaits resumption as spare.
1338 dl 1.4 */
1339     void awaitSpareRelease() {
1340     while (thread != null) {
1341     if (!Thread.interrupted())
1342     LockSupport.park(this);
1343 dl 1.1 }
1344     }
1345     }
1346    
1347     /**
1348 dl 1.4 * Ensures that no thread is waiting for count to advance from the
1349     * current value of eventCount read on entry to this method, by
1350     * releasing waiting threads if necessary.
1351     * @return the count
1352 dl 1.1 */
1353 dl 1.4 final long ensureSync() {
1354     long c = eventCount;
1355     WaitQueueNode q;
1356     while ((q = syncStack) != null && q.count < c) {
1357     if (casBarrierStack(q, null)) {
1358 dl 1.1 do {
1359 dl 1.4 q.signal();
1360 dl 1.1 } while ((q = q.next) != null);
1361     break;
1362     }
1363     }
1364     return c;
1365     }
1366    
1367     /**
1368 dl 1.4 * Increments event count and releases waiting threads.
1369 dl 1.1 */
1370 dl 1.4 private void signalIdleWorkers() {
1371 dl 1.1 long c;
1372     do;while (!casEventCount(c = eventCount, c+1));
1373 dl 1.4 ensureSync();
1374 dl 1.1 }
1375    
1376     /**
1377 jsr166 1.10 * Signals threads waiting to poll a task. Because method sync
1378 dl 1.4 * rechecks availability, it is OK to only proceed if queue
1379     * appears to be non-empty, and OK to skip under contention to
1380     * increment count (since some other thread succeeded).
1381 dl 1.1 */
1382 dl 1.4 final void signalWork() {
1383 dl 1.1 long c;
1384 dl 1.4 WaitQueueNode q;
1385     if (syncStack != null &&
1386     casEventCount(c = eventCount, c+1) &&
1387     (((q = syncStack) != null && q.count <= c) &&
1388     (!casBarrierStack(q, q.next) || !q.signal())))
1389     ensureSync();
1390 dl 1.1 }
1391    
1392     /**
1393 dl 1.4 * Waits until event count advances from last value held by
1394     * caller, or if excess threads, caller is resumed as spare, or
1395     * caller or pool is terminating. Updates caller's event on exit.
1396 dl 1.1 * @param w the calling worker thread
1397     */
1398 dl 1.4 final void sync(ForkJoinWorkerThread w) {
1399     updateStealCount(w); // Transfer w's count while it is idle
1400 dl 1.1
1401 dl 1.4 while (!w.isShutdown() && !isTerminating() && !suspendIfSpare(w)) {
1402     long prev = w.lastEventCount;
1403 dl 1.1 WaitQueueNode node = null;
1404 dl 1.4 WaitQueueNode h;
1405 jsr166 1.5 while (eventCount == prev &&
1406 dl 1.4 ((h = syncStack) == null || h.count == prev)) {
1407     if (node == null)
1408     node = new WaitQueueNode(prev, w);
1409     if (casBarrierStack(node.next = h, node)) {
1410     node.awaitSyncRelease(this);
1411 dl 1.1 break;
1412     }
1413     }
1414 dl 1.4 long ec = ensureSync();
1415     if (ec != prev) {
1416     w.lastEventCount = ec;
1417     break;
1418     }
1419     }
1420     }
1421    
1422     /**
1423     * Returns true if worker waiting on sync can proceed:
1424     * - on signal (thread == null)
1425     * - on event count advance (winning race to notify vs signaller)
1426     * - on Interrupt
1427 jsr166 1.5 * - if the first queued node, we find work available
1428     * If node was not signalled and event count not advanced on exit,
1429 dl 1.4 * then we also help advance event count.
1430     * @return true if node can be released
1431     */
1432     final boolean syncIsReleasable(WaitQueueNode node) {
1433     long prev = node.count;
1434     if (!Thread.interrupted() && node.thread != null &&
1435     (node.next != null ||
1436     !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1437     eventCount == prev)
1438     return false;
1439     if (node.thread != null) {
1440     node.thread = null;
1441 dl 1.1 long ec = eventCount;
1442 dl 1.4 if (prev <= ec) // help signal
1443     casEventCount(ec, ec+1);
1444 dl 1.1 }
1445 dl 1.4 return true;
1446     }
1447    
1448     /**
1449     * Returns true if a new sync event occurred since last call to
1450     * sync or this method, if so, updating caller's count.
1451     */
1452     final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1453     long lc = w.lastEventCount;
1454     long ec = ensureSync();
1455     if (ec == lc)
1456     return false;
1457     w.lastEventCount = ec;
1458     return true;
1459 dl 1.1 }
1460    
1461     // Parallelism maintenance
1462    
1463     /**
1464 jsr166 1.10 * Decrements running count; if too low, adds spare.
1465 dl 1.1 *
1466     * Conceptually, all we need to do here is add or resume a
1467     * spare thread when one is about to block (and remove or
1468     * suspend it later when unblocked -- see suspendIfSpare).
1469     * However, implementing this idea requires coping with
1470     * several problems: We have imperfect information about the
1471     * states of threads. Some count updates can and usually do
1472     * lag run state changes, despite arrangements to keep them
1473     * accurate (for example, when possible, updating counts
1474     * before signalling or resuming), especially when running on
1475     * dynamic JVMs that don't optimize the infrequent paths that
1476     * update counts. Generating too many threads can make these
1477     * problems become worse, because excess threads are more
1478     * likely to be context-switched with others, slowing them all
1479     * down, especially if there is no work available, so all are
1480     * busy scanning or idling. Also, excess spare threads can
1481     * only be suspended or removed when they are idle, not
1482     * immediately when they aren't needed. So adding threads will
1483     * raise parallelism level for longer than necessary. Also,
1484 jsr166 1.9 * FJ applications often encounter highly transient peaks when
1485 dl 1.1 * many threads are blocked joining, but for less time than it
1486     * takes to create or resume spares.
1487     *
1488     * @param joinMe if non-null, return early if done
1489     * @param maintainParallelism if true, try to stay within
1490     * target counts, else create only to avoid starvation
1491     * @return true if joinMe known to be done
1492     */
1493     final boolean preJoin(ForkJoinTask<?> joinMe, boolean maintainParallelism) {
1494     maintainParallelism &= maintainsParallelism; // overrride
1495     boolean dec = false; // true when running count decremented
1496     while (spareStack == null || !tryResumeSpare(dec)) {
1497     int counts = workerCounts;
1498     if (dec || (dec = casWorkerCounts(counts, --counts))) { // CAS cheat
1499     if (!needSpare(counts, maintainParallelism))
1500     break;
1501     if (joinMe.status < 0)
1502     return true;
1503     if (tryAddSpare(counts))
1504     break;
1505     }
1506     }
1507     return false;
1508     }
1509    
1510     /**
1511     * Same idea as preJoin
1512     */
1513     final boolean preBlock(ManagedBlocker blocker, boolean maintainParallelism){
1514     maintainParallelism &= maintainsParallelism;
1515     boolean dec = false;
1516     while (spareStack == null || !tryResumeSpare(dec)) {
1517     int counts = workerCounts;
1518     if (dec || (dec = casWorkerCounts(counts, --counts))) {
1519     if (!needSpare(counts, maintainParallelism))
1520     break;
1521     if (blocker.isReleasable())
1522     return true;
1523     if (tryAddSpare(counts))
1524     break;
1525     }
1526     }
1527     return false;
1528     }
1529    
1530     /**
1531     * Returns true if a spare thread appears to be needed. If
1532     * maintaining parallelism, returns true when the deficit in
1533     * running threads is more than the surplus of total threads, and
1534     * there is apparently some work to do. This self-limiting rule
1535     * means that the more threads that have already been added, the
1536     * less parallelism we will tolerate before adding another.
1537     * @param counts current worker counts
1538     * @param maintainParallelism try to maintain parallelism
1539     */
1540     private boolean needSpare(int counts, boolean maintainParallelism) {
1541     int ps = parallelism;
1542     int rc = runningCountOf(counts);
1543     int tc = totalCountOf(counts);
1544     int runningDeficit = ps - rc;
1545     int totalSurplus = tc - ps;
1546     return (tc < maxPoolSize &&
1547     (rc == 0 || totalSurplus < 0 ||
1548     (maintainParallelism &&
1549 jsr166 1.5 runningDeficit > totalSurplus &&
1550 dl 1.4 ForkJoinWorkerThread.hasQueuedTasks(workers))));
1551 dl 1.1 }
1552 jsr166 1.5
1553 dl 1.1 /**
1554 jsr166 1.10 * Adds a spare worker if lock available and no more than the
1555     * expected numbers of threads exist.
1556 dl 1.1 * @return true if successful
1557     */
1558     private boolean tryAddSpare(int expectedCounts) {
1559     final ReentrantLock lock = this.workerLock;
1560     int expectedRunning = runningCountOf(expectedCounts);
1561     int expectedTotal = totalCountOf(expectedCounts);
1562     boolean success = false;
1563     boolean locked = false;
1564     // confirm counts while locking; CAS after obtaining lock
1565     try {
1566     for (;;) {
1567     int s = workerCounts;
1568     int tc = totalCountOf(s);
1569     int rc = runningCountOf(s);
1570     if (rc > expectedRunning || tc > expectedTotal)
1571     break;
1572     if (!locked && !(locked = lock.tryLock()))
1573     break;
1574     if (casWorkerCounts(s, workerCountsFor(tc+1, rc+1))) {
1575     createAndStartSpare(tc);
1576     success = true;
1577     break;
1578     }
1579     }
1580     } finally {
1581     if (locked)
1582     lock.unlock();
1583     }
1584     return success;
1585     }
1586    
1587     /**
1588 jsr166 1.10 * Adds the kth spare worker. On entry, pool counts are already
1589 dl 1.1 * adjusted to reflect addition.
1590     */
1591     private void createAndStartSpare(int k) {
1592     ForkJoinWorkerThread w = null;
1593     ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(k + 1);
1594     int len = ws.length;
1595     // Probably, we can place at slot k. If not, find empty slot
1596     if (k < len && ws[k] != null) {
1597     for (k = 0; k < len && ws[k] != null; ++k)
1598     ;
1599     }
1600 dl 1.3 if (k < len && !isTerminating() && (w = createWorker(k)) != null) {
1601 dl 1.1 ws[k] = w;
1602     w.start();
1603     }
1604     else
1605     updateWorkerCount(-1); // adjust on failure
1606 dl 1.4 signalIdleWorkers();
1607 dl 1.1 }
1608    
1609     /**
1610 jsr166 1.10 * Suspends calling thread w if there are excess threads. Called
1611     * only from sync. Spares are enqueued in a Treiber stack using
1612     * the same WaitQueueNodes as barriers. They are resumed mainly
1613     * in preJoin, but are also woken on pool events that require all
1614     * threads to check run state.
1615 dl 1.1 * @param w the caller
1616     */
1617     private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1618     WaitQueueNode node = null;
1619     int s;
1620     while (parallelism < runningCountOf(s = workerCounts)) {
1621     if (node == null)
1622 dl 1.4 node = new WaitQueueNode(0, w);
1623 dl 1.1 if (casWorkerCounts(s, s-1)) { // representation-dependent
1624     // push onto stack
1625     do;while (!casSpareStack(node.next = spareStack, node));
1626     // block until released by resumeSpare
1627 dl 1.4 node.awaitSpareRelease();
1628 dl 1.1 return true;
1629     }
1630     }
1631     return false;
1632     }
1633    
1634     /**
1635 jsr166 1.10 * Tries to pop and resume a spare thread.
1636 dl 1.1 * @param updateCount if true, increment running count on success
1637     * @return true if successful
1638     */
1639     private boolean tryResumeSpare(boolean updateCount) {
1640     WaitQueueNode q;
1641     while ((q = spareStack) != null) {
1642     if (casSpareStack(q, q.next)) {
1643     if (updateCount)
1644     updateRunningCount(1);
1645     q.signal();
1646     return true;
1647     }
1648     }
1649     return false;
1650     }
1651    
1652     /**
1653 jsr166 1.10 * Pops and resumes all spare threads. Same idea as ensureSync.
1654 dl 1.1 * @return true if any spares released
1655     */
1656     private boolean resumeAllSpares() {
1657     WaitQueueNode q;
1658     while ( (q = spareStack) != null) {
1659     if (casSpareStack(q, null)) {
1660     do {
1661     updateRunningCount(1);
1662     q.signal();
1663     } while ((q = q.next) != null);
1664     return true;
1665     }
1666     }
1667     return false;
1668     }
1669    
1670     /**
1671 jsr166 1.10 * Pops and shuts down excessive spare threads. Call only while
1672 dl 1.1 * holding lock. This is not guaranteed to eliminate all excess
1673     * threads, only those suspended as spares, which are the ones
1674     * unlikely to be needed in the future.
1675     */
1676     private void trimSpares() {
1677     int surplus = totalCountOf(workerCounts) - parallelism;
1678     WaitQueueNode q;
1679     while (surplus > 0 && (q = spareStack) != null) {
1680     if (casSpareStack(q, null)) {
1681     do {
1682     updateRunningCount(1);
1683     ForkJoinWorkerThread w = q.thread;
1684     if (w != null && surplus > 0 &&
1685     runningCountOf(workerCounts) > 0 && w.shutdown())
1686     --surplus;
1687     q.signal();
1688     } while ((q = q.next) != null);
1689     }
1690     }
1691     }
1692    
/**
 * Interface for extending managed parallelism for tasks running
 * in ForkJoinPools. A ManagedBlocker provides two methods.
 * Method {@code isReleasable} must return true if blocking is not
 * necessary. Method {@code block} blocks the current thread if
 * necessary (perhaps internally invoking {@code isReleasable}
 * before actually blocking).
 * <p>For example, here is a ManagedBlocker based on a
 * ReentrantLock:
 * <pre>
 *   class ManagedLocker implements ManagedBlocker {
 *     final ReentrantLock lock;
 *     boolean hasLock = false;
 *     ManagedLocker(ReentrantLock lock) { this.lock = lock; }
 *     public boolean block() {
 *        if (!hasLock)
 *           lock.lock();
 *        return true;
 *     }
 *     public boolean isReleasable() {
 *        return hasLock || (hasLock = lock.tryLock());
 *     }
 *   }
 * </pre>
 */
public interface ManagedBlocker {
    /**
     * Possibly blocks the current thread, for example waiting for
     * a lock or condition.
     *
     * @return true if no additional blocking is necessary (i.e.,
     *         if isReleasable would return true)
     * @throws InterruptedException if interrupted while waiting
     *         (the method is not required to do so, but is allowed to)
     */
    boolean block() throws InterruptedException;

    /**
     * Returns true if blocking is unnecessary.
     *
     * @return true if blocking is unnecessary
     */
    boolean isReleasable();
}
1734    
1735     /**
1736     * Blocks in accord with the given blocker. If the current thread
1737     * is a ForkJoinWorkerThread, this method possibly arranges for a
1738     * spare thread to be activated if necessary to ensure parallelism
1739     * while the current thread is blocked. If
1740 jsr166 1.8 * {@code maintainParallelism} is true and the pool supports
1741 dl 1.2 * it ({@link #getMaintainsParallelism}), this method attempts to
1742 dl 1.1 * maintain the pool's nominal parallelism. Otherwise if activates
1743     * a thread only if necessary to avoid complete starvation. This
1744     * option may be preferable when blockages use timeouts, or are
1745     * almost always brief.
1746     *
1747     * <p> If the caller is not a ForkJoinTask, this method is behaviorally
1748     * equivalent to
1749     * <pre>
1750     * while (!blocker.isReleasable())
1751     * if (blocker.block())
1752     * return;
1753     * </pre>
1754     * If the caller is a ForkJoinTask, then the pool may first
1755     * be expanded to ensure parallelism, and later adjusted.
1756     *
1757     * @param blocker the blocker
1758     * @param maintainParallelism if true and supported by this pool,
1759     * attempt to maintain the pool's nominal parallelism; otherwise
1760     * activate a thread only if necessary to avoid complete
1761     * starvation.
1762 jsr166 1.11 * @throws InterruptedException if blocker.block did so
1763 dl 1.1 */
1764     public static void managedBlock(ManagedBlocker blocker,
1765     boolean maintainParallelism)
1766     throws InterruptedException {
1767     Thread t = Thread.currentThread();
1768     ForkJoinPool pool = (t instanceof ForkJoinWorkerThread?
1769     ((ForkJoinWorkerThread)t).pool : null);
1770     if (!blocker.isReleasable()) {
1771     try {
1772     if (pool == null ||
1773     !pool.preBlock(blocker, maintainParallelism))
1774     awaitBlocker(blocker);
1775     } finally {
1776     if (pool != null)
1777     pool.updateRunningCount(1);
1778     }
1779     }
1780     }
1781    
1782     private static void awaitBlocker(ManagedBlocker blocker)
1783     throws InterruptedException {
1784     do;while (!blocker.isReleasable() && !blocker.block());
1785     }
1786    
1787 dl 1.2 // AbstractExecutorService overrides
1788    
1789     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1790     return new AdaptedRunnable(runnable, value);
1791     }
1792    
1793     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1794     return new AdaptedCallable(callable);
1795     }
1796    
1797 dl 1.1
1798     // Temporary Unsafe mechanics for preliminary release
1799 jsr166 1.5 private static Unsafe getUnsafe() throws Throwable {
1800     try {
1801     return Unsafe.getUnsafe();
1802     } catch (SecurityException se) {
1803     try {
1804     return java.security.AccessController.doPrivileged
1805     (new java.security.PrivilegedExceptionAction<Unsafe>() {
1806     public Unsafe run() throws Exception {
1807     return getUnsafePrivileged();
1808     }});
1809     } catch (java.security.PrivilegedActionException e) {
1810     throw e.getCause();
1811     }
1812     }
1813     }
1814    
1815     private static Unsafe getUnsafePrivileged()
1816     throws NoSuchFieldException, IllegalAccessException {
1817     Field f = Unsafe.class.getDeclaredField("theUnsafe");
1818     f.setAccessible(true);
1819     return (Unsafe) f.get(null);
1820     }
1821    
1822     private static long fieldOffset(String fieldName)
1823     throws NoSuchFieldException {
1824 jsr166 1.12 return UNSAFE.objectFieldOffset
1825 jsr166 1.5 (ForkJoinPool.class.getDeclaredField(fieldName));
1826     }
1827 dl 1.1
// The Unsafe instance and the offsets of the ForkJoinPool fields
// that the CAS helpers operate on. All are assigned exactly once
// by the static initializer below.
static final Unsafe UNSAFE;
static final long eventCountOffset;
static final long workerCountsOffset;
static final long runControlOffset;
static final long syncStackOffset;
static final long spareStackOffset;

static {
    try {
        // UNSAFE must be assigned first: fieldOffset reads it.
        UNSAFE = getUnsafe();
        eventCountOffset = fieldOffset("eventCount");
        workerCountsOffset = fieldOffset("workerCounts");
        runControlOffset = fieldOffset("runControl");
        syncStackOffset = fieldOffset("syncStack");
        spareStackOffset = fieldOffset("spareStack");
    } catch (Throwable e) {
        // getUnsafe is declared to throw Throwable; any failure is
        // rethrown unchecked with the original cause attached.
        throw new RuntimeException("Could not initialize intrinsics", e);
    }
}
1847    
1848     private boolean casEventCount(long cmp, long val) {
1849 jsr166 1.12 return UNSAFE.compareAndSwapLong(this, eventCountOffset, cmp, val);
1850 dl 1.1 }
1851     private boolean casWorkerCounts(int cmp, int val) {
1852 jsr166 1.12 return UNSAFE.compareAndSwapInt(this, workerCountsOffset, cmp, val);
1853 dl 1.1 }
1854     private boolean casRunControl(int cmp, int val) {
1855 jsr166 1.12 return UNSAFE.compareAndSwapInt(this, runControlOffset, cmp, val);
1856 dl 1.1 }
1857     private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) {
1858 jsr166 1.12 return UNSAFE.compareAndSwapObject(this, spareStackOffset, cmp, val);
1859 dl 1.1 }
1860     private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) {
1861 jsr166 1.12 return UNSAFE.compareAndSwapObject(this, syncStackOffset, cmp, val);
1862 dl 1.1 }
1863     }