ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/forkjoin/ForkJoinPool.java
Revision: 1.35
Committed: Tue Jan 6 14:34:58 2009 UTC (15 years, 4 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.34: +0 -0 lines
State: FILE REMOVED
Log Message:
Repackaging

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y.forkjoin;
8 import java.util.*;
9 import java.util.concurrent.*;
10 import java.util.concurrent.locks.*;
11 import java.util.concurrent.atomic.*;
12 import sun.misc.Unsafe;
13 import java.lang.reflect.*;
14
15
16 /**
17 * Host for a group of ForkJoinWorkerThreads that perform
18 * ForkJoinTasks. A ForkJoinPool also provides the entry point for
19 * tasks submitted from non-ForkJoinTasks, as well as management and
20 * monitoring operations. Normally a single ForkJoinPool is used for
21 * a large number of submitted tasks. Otherwise, the benefits of use
22 * would not always outweigh the construction overhead of creating a
23 * large set of threads and the associated startup bookkeeping.
24 *
25 * <p> Class ForkJoinPool does not implement the ExecutorService
26 * interface because it only executes ForkJoinTasks, not arbitrary
27 * Runnables. However, for the sake of uniformity, it supports
28 * analogous lifecycle control methods such as shutdown.
29 *
30 * <p>A ForkJoinPool may be constructed with any number of worker
31 * threads, and worker threads may be added and removed dynamically.
32 * However, as a general rule, using a pool size of the number of
33 * processors on a given system (as arranged by the default
34 * constructor) will result in the best performance. Resizing may be
35 * expensive and may cause transient imbalances and slowdowns.
36 *
37 * <p>In addition to execution and lifecycle control methods, this
38 * class provides status check methods (for example
39 * <tt>getStealCount</tt>) that are intended to aid in developing,
40 * tuning, and monitoring fork/join applications.
41 */
42 public class ForkJoinPool implements ForkJoinExecutor {
43
44 /*
45 * This is an overhauled version of the framework described in "A
46 * Java Fork/Join Framework" by Doug Lea, in Proceedings of the ACM
47 * JavaGrande Conference, June 2000
48 * (http://gee.cs.oswego.edu/dl/papers/fj.pdf). It retains most of
49 * the basic structure, but includes a number of algorithmic
50 * improvements, along with integration with other
51 * java.util.concurrent components.
52 */
53
54 /**
55 * Factory for creating new ForkJoinWorkerThreads. A
56 * ForkJoinWorkerThreadFactory must be defined and used for
57 * ForkJoinWorkerThread subclasses that extend base functionality
58 * or initialize threads with different contexts.
59 */
60 public static interface ForkJoinWorkerThreadFactory {
61 /**
62 * Returns a new worker thread operating in the given pool.
63 *
64 * @param pool the pool this thread works in
65 * @throws NullPointerException if pool is null;
66 */
67 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
68 }
69
70 /**
71 * The default ForkJoinWorkerThreadFactory, used unless overridden
72 * in ForkJoinPool constructors.
73 */
74 public static class DefaultForkJoinWorkerThreadFactory
75 implements ForkJoinWorkerThreadFactory {
76 public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
77 return new ForkJoinWorkerThread(pool);
78 }
79 }
80
81 private static final DefaultForkJoinWorkerThreadFactory
82 defaultForkJoinWorkerThreadFactory =
83 new DefaultForkJoinWorkerThreadFactory();
84
85
86 /**
87 * Permission required for callers of methods that may start or
88 * kill threads.
89 */
90 private static final RuntimePermission modifyThreadPermission =
91 new RuntimePermission("modifyThread");
92
93 /**
94 * If there is a security manager, makes sure caller has
95 * permission to modify threads.
96 */
97 private static void checkPermission() {
98 SecurityManager security = System.getSecurityManager();
99 if (security != null)
100 security.checkPermission(modifyThreadPermission);
101 }
102
103 /**
104 * Generator for pool sequence numbers, which are embedded in worker thread names.
105 */
106 private static final AtomicInteger poolNumberGenerator =
107 new AtomicInteger();
108
109 /**
110 * Array holding all worker threads in the pool. Array size must
111 * be a power of two. Acts similarly to a CopyOnWriteArrayList --
112 * updates are protected by workerLock. But it additionally
113 * allows in-place nulling out or replacements of slots upon
114 * termination. All uses of this array should first assign as
115 * local, and must screen out nulls. Note: ForkJoinWorkerThreads
116 * directly access this array.
117 */
118 volatile ForkJoinWorkerThread[] workers;
119
120 /**
121 * Pool wide synchronization control. Workers are enabled to look
122 * for work when the barrier's count is incremented. If they fail
123 * to find some, they may wait for next count. Synchronization
124 * events occur only in enough contexts to maintain overall
125 * liveness:
126 *
127 * - Submission of a new task
128 * - Termination of pool or worker
129 *
130 * So, signals and waits occur relatively rarely during normal
131 * processing, which minimizes contention on this global
132 * synchronizer. Even so, the PoolBarrier is designed to minimize
133 * blockages by threads that have better things to do.
134 */
135 private final PoolBarrier poolBarrier;
136
137 /**
138 * Lock protecting access to workers.
139 */
140 private final ReentrantLock workerLock;
141
142 /**
143 * Condition for awaitTermination.
144 */
145 private final Condition termination;
146
147 /**
148 * Lifecycle control.
149 */
150 private final RunState runState;
151
152 /**
153 * The number of submissions that are running in pool.
154 */
155 private final AtomicInteger runningSubmissions;
156
157 /**
158 * The uncaught exception handler used when any worker
159 * abruptly terminates.
160 */
161 private Thread.UncaughtExceptionHandler ueh;
162
163 /**
164 * Creation factory for worker threads.
165 */
166 private final ForkJoinWorkerThreadFactory factory;
167
168 /**
169 * Head and tail of embedded submission queue. (The queue is
170 * embedded to avoid hostile memory placements.)
171 */
172 private volatile SQNode sqHead;
173 private volatile SQNode sqTail;
174
175 /**
176 * Number of workers that are (probably) executing tasks.
177 * Atomically incremented when a worker gets a task to run, and
178 * decremented when worker has no tasks and cannot find any. This
179 * is updated only via CAS. It is inlined here rather than a
180 * stand-alone field to minimize memory thrashing when it is
181 * heavily contended during ramp-up/down of tasks.
182 */
183 private volatile int activeCount;
184
185 /**
186 * The current targeted pool size. Updated only under worker lock
187 * but volatile to allow concurrent reads.
188 */
189 private volatile int poolSize;
190
191 /**
192 * The number of workers that have started but not yet terminated.
193 * Accessed only under workerLock.
194 */
195 private int runningWorkers;
196
197 /**
198 * Pool number, just for assigning useful names to worker threads
199 */
200 private final int poolNumber;
201
202 /**
203 * Return a good size for worker array given pool size.
204 * Currently requires size to be a power of two.
205 */
206 private static int workerSizeFor(int ps) {
207 return ps <= 1? 1 : (1 << (32 - Integer.numberOfLeadingZeros(ps-1)));
208 }
209
210 /**
211 * Create new worker using factory.
212 * @param index the index to assign worker
213 */
214 private ForkJoinWorkerThread createWorker(int index) {
215 ForkJoinWorkerThread w = factory.newThread(this);
216 w.setDaemon(true);
217 w.setWorkerPoolIndex(index);
218 w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index);
219 Thread.UncaughtExceptionHandler h = ueh;
220 if (h != null)
221 w.setUncaughtExceptionHandler(h);
222 return w;
223 }
224
/**
 * Creates a ForkJoinPool whose size equals the number of
 * processors available on the system, using the default
 * ForkJoinWorkerThreadFactory.
 *
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>
 */
public ForkJoinPool() {
    this(Runtime.getRuntime().availableProcessors(), defaultForkJoinWorkerThreadFactory);
}
238
/**
 * Creates a ForkJoinPool with the indicated number of worker
 * threads, using the default ForkJoinWorkerThreadFactory.
 *
 * @param poolSize the number of worker threads
 * @throws IllegalArgumentException if poolSize is less than or
 *         equal to zero
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>
 */
public ForkJoinPool(int poolSize) {
    this(poolSize,
         defaultForkJoinWorkerThreadFactory);
}
253
/**
 * Creates a ForkJoinPool whose size equals the number of
 * processors available on the system, using the given
 * ForkJoinWorkerThreadFactory.
 *
 * @param factory the factory for creating new threads
 * @throws NullPointerException if factory is null
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>
 */
public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
    this(Runtime.getRuntime().availableProcessors(),
         factory);
}
268
269 /**
270 * Creates a ForkJoinPool with the indicated number of worker
271 * threads and the given factory.
272 *
273 * <p> You can also add and remove threads while the pool is
274 * running. But it is generally more efficient and leads to more
275 * predictable performance to initialize the pool with a
276 * sufficient number of threads to support the desired concurrency
277 * level and leave this value fixed.
278 *
279 * @param poolSize the number of worker threads
280 * @param factory the factory for creating new threads
281 * @throws IllegalArgumentException if poolSize less than or
282 * equal to zero
283 * @throws NullPointerException if factory is null
284 * @throws SecurityException if a security manager exists and
285 * the caller is not permitted to modify threads
286 * because it does not hold {@link
287 * java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
288 */
289 public ForkJoinPool(int poolSize, ForkJoinWorkerThreadFactory factory) {
290 if (poolSize <= 0)
291 throw new IllegalArgumentException();
292 if (factory == null)
293 throw new NullPointerException();
294 checkPermission();
295 this.poolSize = poolSize;
296 this.factory = factory;
297 this.poolNumber = poolNumberGenerator.incrementAndGet();
298 this.poolBarrier = new PoolBarrier();
299 this.runState = new RunState();
300 this.runningSubmissions = new AtomicInteger();
301 this.workerLock = new ReentrantLock();
302 this.termination = workerLock.newCondition();
303 SQNode dummy = new SQNode(null);
304 this.sqHead = dummy;
305 this.sqTail = dummy;
306 createAndStartWorkers(poolSize);
307 }
308
309 /**
310 * Initial worker array and worker creation and startup. (This
311 * must be done under lock to avoid interference by some of the
312 * newly started threads while creating others.)
313 */
314 private void createAndStartWorkers(int ps) {
315 final ReentrantLock lock = this.workerLock;
316 lock.lock();
317 try {
318 ForkJoinWorkerThread[] ws =
319 new ForkJoinWorkerThread[workerSizeFor(ps)];
320 workers = ws;
321 for (int i = 0; i < ps; ++i)
322 ws[i] = createWorker(i);
323 for (int i = 0; i < ps; ++i) {
324 ws[i].start();
325 ++runningWorkers;
326 }
327 } finally {
328 lock.unlock();
329 }
330 }
331
332 /**
333 * Performs the given task; returning its result upon completion
334 * @param task the task
335 * @return the task's result
336 * @throws NullPointerException if task is null
337 * @throws RejectedExecutionException if pool is shut down
338 */
339 public <T> T invoke(ForkJoinTask<T> task) {
340 return doSubmit(task).awaitInvoke();
341 }
342
343 /**
344 * Arranges for (asynchronous) execution of the given task,
345 * returning a {@link Future} that may be used to obtain results
346 * upon completion. (The only supported operations on this object
347 * are those defined in the <tt>Future</tt> interface.)
348 * @param task the task
349 * @return a Future that can be used to get the task's results.
350 * @throws NullPointerException if task is null
351 * @throws RejectedExecutionException if pool is shut down
352 */
353 public <T> Future<T> submit(ForkJoinTask<T> task) {
354 return doSubmit(task);
355 }
356
357 /**
358 * Arranges for (asynchronous) execution of the given task.
359 * @param task the task
360 * @throws NullPointerException if task is null
361 * @throws RejectedExecutionException if pool is shut down
362 */
363 public <T> void execute(ForkJoinTask<T> task) {
364 doSubmit(task);
365 }
366
367 /**
368 * Common code for invoke and submit
369 */
370 private <T> Submission<T> doSubmit(ForkJoinTask<T> task) {
371 if (task == null)
372 throw new NullPointerException();
373 if (runState.isAtLeastShutdown())
374 throw new RejectedExecutionException();
375 Submission<T> job = new Submission<T>(task, this);
376 addSubmission(job);
377 poolBarrier.signal();
378 return job;
379 }
380
381 /**
382 * Returns the targeted number of worker threads in this pool.
383 * This value does not necessarily reflect transient changes as
384 * threads are added, removed, or abruptly terminate.
385 *
386 * @return the number of worker threads in this pool
387 */
388 public int getPoolSize() {
389 return poolSize;
390 }
391
392 /**
393 * Equivalent to {@link #getPoolSize}.
394 *
395 * @return the number of worker threads in this pool
396 */
397 public int getParallelismLevel() {
398 return poolSize;
399 }
400
401 /**
402 * Returns the number of worker threads that have started but not
403 * yet terminated. This result returned by this method may differ
404 * from <tt>getPoolSize</tt> when threads are added, removed, or
405 * abruptly terminate.
406 *
407 * @return the number of worker threads
408 */
409 public int getRunningWorkerCount() {
410 int r;
411 final ReentrantLock lock = this.workerLock;
412 lock.lock();
413 try {
414 r = runningWorkers;
415 } finally {
416 lock.unlock();
417 }
418 return r;
419 }
420
421 /**
422 * Sets the handler for internal worker threads that terminate due
423 * to unrecoverable errors encountered while executing tasks.
424 * Unless set, the current default or ThreadGroup handler is used
425 * as handler.
426 *
427 * @param h the new handler
428 * @return the old handler, or null if none
429 * @throws SecurityException if a security manager exists and
430 * the caller is not permitted to modify threads
431 * because it does not hold {@link
432 * java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
433 */
434 public Thread.UncaughtExceptionHandler
435 setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
436 checkPermission();
437 Thread.UncaughtExceptionHandler old = null;
438 final ReentrantLock lock = this.workerLock;
439 lock.lock();
440 try {
441 old = ueh;
442 ueh = h;
443 ForkJoinWorkerThread[] ws = workers;
444 for (int i = 0; i < ws.length; ++i) {
445 ForkJoinWorkerThread w = ws[i];
446 if (w != null)
447 w.setUncaughtExceptionHandler(h);
448 }
449 } finally {
450 lock.unlock();
451 }
452 return old;
453 }
454
455 /**
456 * Returns the handler for internal worker threads that terminate
457 * due to unrecoverable errors encountered while executing tasks.
458 * @return the handler, or null if none
459 */
460 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
461 Thread.UncaughtExceptionHandler h;
462 final ReentrantLock lock = this.workerLock;
463 lock.lock();
464 try {
465 h = ueh;
466 } finally {
467 lock.unlock();
468 }
469 return h;
470 }
471
472 /**
473 * Tries to adds the indicated number of new worker threads to the
474 * pool. This method may be used to increase the amount of
475 * parallelism available to tasks. The actual number of
476 * threads added may be less than requested if the pool
477 * is terminating or terminated
478 * @return the number of threads added
479 * @throws SecurityException if a security manager exists and
480 * the caller is not permitted to modify threads
481 * because it does not hold {@link
482 * java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
483 */
484 public int addWorkers(int numberToAdd) {
485 int nadded = 0;
486 checkPermission();
487 final ReentrantLock lock = this.workerLock;
488 lock.lock();
489 try {
490 if (!runState.isAtLeastStopping()) {
491 ForkJoinWorkerThread[] ws = workers;
492 int len = ws.length;
493 int newLen = len + numberToAdd;
494 int newSize = workerSizeFor(newLen);
495 ForkJoinWorkerThread[] nws =
496 new ForkJoinWorkerThread[newSize];
497 System.arraycopy(ws, 0, nws, 0, len);
498 for (int i = len; i < newLen; ++i)
499 nws[i] = createWorker(i);
500 workers = nws;
501 for (int i = len; i < newLen; ++i) {
502 nws[i].start();
503 ++runningWorkers;
504 }
505 poolSize += numberToAdd;
506 nadded = numberToAdd;
507 }
508 } finally {
509 lock.unlock();
510 }
511 return nadded;
512 }
513
514 /**
515 * Tries to remove the indicated number of worker threads from the
516 * pool. The workers will exit the next time they are idle. This
517 * method may be used to decrease the amount of parallelism
518 * available to tasks. The actual number of workers removed
519 * may be less than requested if the pool size would become
520 * zero or the pool is terminating or terminated.
521 * @return the number removed.
522 * @throws SecurityException if a security manager exists and
523 * the caller is not permitted to modify threads
524 * because it does not hold {@link
525 * java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
526 */
527 public int removeWorkers(int numberToRemove) {
528 int nremoved = 0;
529 checkPermission();
530 final ReentrantLock lock = this.workerLock;
531 lock.lock();
532 try {
533 // shutdown in rightmost order to enable shrinkage in
534 // workerTerminated
535 ForkJoinWorkerThread[] ws = workers;
536 int k = ws.length;
537 while (!runState.isAtLeastStopping() &&
538 --k > 0 && // don't kill ws[0]
539 nremoved < numberToRemove) {
540 ForkJoinWorkerThread w = ws[k];
541 if (w != null) {
542 RunState rs = w.getRunState();
543 if (rs.transitionToShutdown()) {
544 --poolSize;
545 ++nremoved;
546 }
547 }
548 }
549 } finally {
550 lock.unlock();
551 }
552 return nremoved;
553 }
554
555 /**
556 * Tries to add or remove workers to attain the given pool size.
557 * This may fail to attain the given target if the pool is
558 * terminating or terminated.
559 * @param newSize the target poolSize
560 * @return the pool size upon exit of this method
561 * @throws IllegalArgumentException if newSize less than or
562 * equal to zero
563 * @throws SecurityException if a security manager exists and
564 * the caller is not permitted to modify threads
565 * because it does not hold {@link
566 * java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
567 */
568 public int setPoolSize(int newSize) {
569 checkPermission();
570 if (newSize <= 0)
571 throw new IllegalArgumentException();
572 final ReentrantLock lock = this.workerLock;
573 lock.lock();
574 try {
575 int ps = poolSize;
576 if (newSize > ps)
577 addWorkers(newSize - ps);
578 else if (newSize < ps)
579 removeWorkers(ps - newSize);
580 } finally {
581 lock.unlock();
582 }
583 return poolSize;
584 }
585
586 /**
587 * Callback from terminating worker.
588 * @param w the worker
589 * @param ex the exception causing abrupt termination, or null if
590 * completed normally
591 */
592 final void workerTerminated(ForkJoinWorkerThread w, Throwable ex) {
593 try {
594 final ReentrantLock lock = this.workerLock;
595 lock.lock();
596 try {
597 // Unless stopping, null slot, and if rightmost slots
598 // now null, shrink
599 if (!runState.isAtLeastStopping()) {
600 int idx = w.getWorkerPoolIndex();
601 ForkJoinWorkerThread[] ws = workers;
602 int len = ws.length;
603 if (idx >= 0 && idx < len && ws[idx] == w) {
604 ws[idx] = null;
605 int newlen = len;
606 while (newlen > 0 && ws[newlen-1] == null)
607 --newlen;
608 if (newlen < len) {
609 int newSize = workerSizeFor(newlen);
610 if (newSize < len) {
611 ForkJoinWorkerThread[] nws =
612 new ForkJoinWorkerThread[newSize];
613 System.arraycopy(ws, 0, nws, 0, newlen);
614 workers = nws;
615 poolBarrier.signal();
616 }
617 }
618 }
619 }
620 if (--runningWorkers == 0) {
621 terminate(); // no-op if already stopping
622 runState.transitionToTerminated();
623 termination.signalAll();
624 }
625 } finally {
626 lock.unlock();
627 }
628 } finally {
629 if (ex != null)
630 ForkJoinTask.rethrowException(ex);
631 }
632 }
633
634 // lifecycle control
635
636 /**
637 * Initiates an orderly shutdown in which previously submitted
638 * tasks are executed, but no new tasks will be accepted.
639 * Invocation has no additional effect if already shut down.
640 * Tasks that are in the process of being submitted concurrently
641 * during the course of this method may or may not be rejected.
642 * @throws SecurityException if a security manager exists and
643 * the caller is not permitted to modify threads
644 * because it does not hold {@link
645 * java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
646 */
647 public void shutdown() {
648 checkPermission();
649 runState.transitionToShutdown();
650 tryTerminateOnShutdown();
651 }
652
653 /**
654 * Attempts to stop all actively executing tasks, and cancels all
655 * waiting tasks. Tasks that are in the process of being
656 * submitted or executed concurrently during the course of this
657 * method may or may not be rejected.
658 * @throws SecurityException if a security manager exists and
659 * the caller is not permitted to modify threads
660 * because it does not hold {@link
661 * java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
662 */
663 public void shutdownNow() {
664 checkPermission();
665 terminate();
666 }
667
668 /**
669 * Returns <tt>true</tt> if this pool has been shut down.
670 *
671 * @return <tt>true</tt> if this pool has been shut down
672 */
673 public boolean isShutdown() {
674 return runState.isAtLeastShutdown();
675 }
676
677 /**
678 * Returns <tt>true</tt> if all tasks have completed following shut down.
679 *
680 * @return <tt>true</tt> if all tasks have completed following shut down
681 */
682 public boolean isTerminated() {
683 return runState.isTerminated();
684 }
685
686 /**
687 * Returns <tt>true</tt> if termination has commenced but has
688 * not yet completed.
689 *
690 * @return <tt>true</tt> if in the process of terminating
691 */
692 public boolean isTerminating() {
693 return runState.isStopping();
694 }
695
696 /**
697 * Blocks until all tasks have completed execution after a shutdown
698 * request, or the timeout occurs, or the current thread is
699 * interrupted, whichever happens first.
700 *
701 * @param timeout the maximum time to wait
702 * @param unit the time unit of the timeout argument
703 * @return <tt>true</tt> if this executor terminated and
704 * <tt>false</tt> if the timeout elapsed before termination
705 * @throws InterruptedException if interrupted while waiting
706 */
707 public boolean awaitTermination(long timeout, TimeUnit unit)
708 throws InterruptedException {
709 long nanos = unit.toNanos(timeout);
710 final ReentrantLock lock = this.workerLock;
711 lock.lock();
712 try {
713 for (;;) {
714 if (runState.isTerminated())
715 return true;
716 if (nanos <= 0)
717 return false;
718 nanos = termination.awaitNanos(nanos);
719 }
720 } finally {
721 lock.unlock();
722 }
723 }
724
725 /**
726 * Initiate termination.
727 */
728 private void terminate() {
729 if (runState.transitionToStopping()) {
730 stopAllWorkers();
731 cancelQueuedSubmissions();
732 cancelQueuedWorkerTasks();
733 interruptUnterminatedWorkers();
734 }
735 }
736
737 /**
738 * Check for termination in shutdown state
739 */
740 private void tryTerminateOnShutdown() {
741 if (runState.isAtLeastShutdown() &&
742 runningSubmissions.get() == 0 &&
743 !hasQueuedSubmissions() &&
744 runningSubmissions.get() == 0) // recheck
745 terminate();
746 }
747
748 /**
749 * Clear out and cancel submissions
750 */
751 private void cancelQueuedSubmissions() {
752 Submission<?> task;
753 while (hasQueuedSubmissions() && (task = pollSubmission()) != null)
754 task.cancel(false);
755 }
756
757 /**
758 * Clean out worker queues.
759 */
760 private void cancelQueuedWorkerTasks() {
761 final ReentrantLock lock = this.workerLock;
762 lock.lock();
763 try {
764 ForkJoinWorkerThread[] ws = workers;
765 for (int i = 0; i < ws.length; ++i) {
766 ForkJoinWorkerThread t = ws[i];
767 if (t != null)
768 t.cancelTasks();
769 }
770 } finally {
771 lock.unlock();
772 }
773 }
774
775 /**
776 * Set each worker's status to stopping. Requires lock to avoid
777 * conflicts with add/remove
778 */
779 private void stopAllWorkers() {
780 final ReentrantLock lock = this.workerLock;
781 lock.lock();
782 try {
783 ForkJoinWorkerThread[] ws = workers;
784 for (int i = 0; i < ws.length; ++i) {
785 ForkJoinWorkerThread t = ws[i];
786 if (t != null) {
787 RunState rs = t.getRunState();
788 rs.transitionToStopping();
789 }
790 }
791 } finally {
792 lock.unlock();
793 }
794 poolBarrier.signal();
795 }
796
797 /**
798 * Interrupt all unterminated workers. This is not required for
799 * sake of internal control, but may help unstick user code during
800 * shutdown.
801 */
802 private void interruptUnterminatedWorkers() {
803 final ReentrantLock lock = this.workerLock;
804 lock.lock();
805 try {
806 ForkJoinWorkerThread[] ws = workers;
807 for (int i = 0; i < ws.length; ++i) {
808 ForkJoinWorkerThread t = ws[i];
809 if (t != null) {
810 RunState rs = t.getRunState();
811 if (!rs.isTerminated()) {
812 try {
813 t.interrupt();
814 } catch (SecurityException ignore) {
815 }
816 }
817 }
818 }
819 } finally {
820 lock.unlock();
821 }
822 }
823
824 // Status queries
825
826 /**
827 * Returns true if all worker threads are currently idle. An idle
828 * worker is one that cannot obtain a task to execute because none
829 * are available to steal from other threads, and there are no
830 * pending submissions to the pool. This method is conservative:
831 * It might not return true immediately upon idleness of all
832 * threads, but will eventually become true if threads remain
833 * inactive.
834 * @return true if all threads are currently idle
835 */
836 public final boolean isQuiescent() {
837 return activeCount == 0;
838 }
839
840 /**
841 * Returns the approximate number of threads that are
842 * currently executing tasks. This method may overestimate
843 * the number of active threads.
844 * @return the number of active threads.
845 */
846 public final int getActiveThreadCount() {
847 return activeCount;
848 }
849
850 /**
851 * Returns the approximate number of threads that are currently
852 * idle waiting for tasks. This method may underestimate the
853 * number of idle threads.
854 * @return the number of idle threads.
855 */
856 public final int getIdleThreadCount() {
857 return poolSize - activeCount;
858 }
859
860 /**
861 * Returns the total number of tasks stolen from one thread's work
862 * queue by another. This value is only an approximation, obtained
863 * by iterating across all threads in the pool, and may
864 * mis-estimate the actual total number of steals when the pool is
865 * not quiescent. But the value is still useful for monitoring and
866 * tuning fork/join programs: In general, steal counts should be
867 * high enough to keep threads busy, but low enough to avoid
868 * overhead and contention across threads.
869 * @return the number of steals.
870 */
871 public long getStealCount() {
872 long sum = 0;
873 ForkJoinWorkerThread[] ws = workers;
874 for (int i = 0; i < ws.length; ++i) {
875 ForkJoinWorkerThread t = ws[i];
876 if (t != null)
877 sum += t.getWorkerStealCount();
878 }
879 return sum;
880 }
881
882 /**
883 * Returns the total number of tasks currently held in queues by
884 * worker threads (but not including tasks submitted to the pool
885 * that have not begun executing). This value is only an
886 * approximation, obtained by iterating across all threads in the
887 * pool. This method may be useful for tuning task granularities.
888 * @return the number of tasks.
889 */
890 public long getTotalPerThreadQueueSize() {
891 long count = 0;
892 ForkJoinWorkerThread[] ws = workers;
893 for (int i = 0; i < ws.length; ++i) {
894 ForkJoinWorkerThread t = ws[i];
895 if (t != null)
896 count += t.getQueueSize();
897 }
898 return count;
899 }
900
901 /**
902 * Returns the number of tasks that have been submitted (via
903 * <tt>submit</tt> or <tt>invoke</tt>) and are currently executing
904 * in the pool.
905 * @return the number of tasks.
906 */
907 public int getActiveSubmissionCount() {
908 return runningSubmissions.get();
909 }
910
911 /**
912 * Returns the factory used for constructing new workers
913 *
914 * @return the factory used for constructing new workers
915 */
916 public ForkJoinWorkerThreadFactory getFactory() {
917 return factory;
918 }
919
920 // Callbacks from submissions
921
922 /**
923 * Callback on starting execution of externally submitted job.
924 */
925 final void submissionStarting() {
926 runningSubmissions.incrementAndGet();
927 }
928
929 /**
930 * Completion callback from externally submitted job.
931 */
932 final void submissionCompleted() {
933 if (runningSubmissions.decrementAndGet() == 0 &&
934 runState.isAtLeastShutdown())
935 tryTerminateOnShutdown();
936 }
937
938 /**
939 * Wait for a pool event, if necessary. Called only by workers.
940 */
941 final long barrierSync(long eventCount) {
942 return poolBarrier.sync(eventCount);
943 }
944
945 /**
946 * Embedded submission queue holds submissions not yet started by
947 * workers. This is a variant of a Michael/Scott queue that
948 * supports a fast check for apparent emptiness. This class
949 * opportunistically subclasses AtomicReference for next-field.
950 */
951 static final class SQNode extends AtomicReference<SQNode> {
952 Submission<?> submission;
953 SQNode(Submission<?> s) { submission = s; }
954 }
955
956 /**
957 * Quick check for likely non-emptiness. Returns true if an
958 * add completed but not yet fully taken.
959 */
960 final boolean mayHaveQueuedSubmissions() {
961 return sqHead != sqTail;
962 }
963
964 /**
965 * Returns true if there are any tasks submitted to this pool
966 * that have not yet begun executing.
967 * @return <tt>true</tt> if there are any queued submissions.
968 */
969 public boolean hasQueuedSubmissions() {
970 for (;;) {
971 SQNode h = sqHead;
972 SQNode t = sqTail;
973 SQNode f = h.get();
974 if (h == sqHead) {
975 if (f == null)
976 return false;
977 else if (h != t)
978 return true;
979 else
980 casSqTail(t, f);
981 }
982 }
983 }
984
985 final void addSubmission(Submission<?> x) {
986 SQNode n = new SQNode(x);
987 for (;;) {
988 SQNode t = sqTail;
989 SQNode s = t.get();
990 if (t == sqTail) {
991 if (s != null)
992 casSqTail(t, s);
993 else if (t.compareAndSet(s, n)) {
994 casSqTail(t, n);
995 return;
996 }
997 }
998 }
999 }
1000
1001 final Submission<?> pollSubmission() {
1002 for (;;) {
1003 SQNode h = sqHead;
1004 SQNode t = sqTail;
1005 SQNode f = h.get();
1006 if (h == sqHead) {
1007 if (f == null)
1008 return null;
1009 else if (h == t)
1010 casSqTail(t, f);
1011 else if (casSqHead(h, f)) {
1012 Submission<?> x = f.submission;
1013 f.submission = null;
1014 return x;
1015 }
1016 }
1017 }
1018 }
1019
// Temporary Unsafe mechanics for preliminary release

/** Unsafe instance used for the CAS operations on this class's fields. */
static final Unsafe _unsafe;
/** Field offset of activeCount, as reported by Unsafe. */
static final long activeCountOffset;
/** Field offset of sqHead, as reported by Unsafe. */
static final long sqHeadOffset;
/** Field offset of sqTail, as reported by Unsafe. */
static final long sqTailOffset;

static {
    try {
        final Unsafe u;
        if (ForkJoinPool.class.getClassLoader() == null) {
            u = Unsafe.getUnsafe();
        } else {
            // With a non-null class loader, fetch the instance
            // reflectively from the "theUnsafe" field instead.
            final Field holder = Unsafe.class.getDeclaredField("theUnsafe");
            holder.setAccessible(true);
            u = (Unsafe) holder.get(null);
        }
        _unsafe = u;
        final Class<ForkJoinPool> self = ForkJoinPool.class;
        activeCountOffset =
            u.objectFieldOffset(self.getDeclaredField("activeCount"));
        sqHeadOffset =
            u.objectFieldOffset(self.getDeclaredField("sqHead"));
        sqTailOffset =
            u.objectFieldOffset(self.getDeclaredField("sqTail"));
    } catch (Exception e) {
        throw new RuntimeException("Could not initialize intrinsics", e);
    }
}
1046
1047 final boolean tryIncrementActiveCount() {
1048 int c = activeCount;
1049 return _unsafe.compareAndSwapInt(this, activeCountOffset,
1050 c, c+1);
1051 }
1052
1053 final boolean tryDecrementActiveCount() {
1054 int c = activeCount;
1055 return _unsafe.compareAndSwapInt(this, activeCountOffset,
1056 c, c-1);
1057 }
1058
1059 final void incrementActiveCount() {
1060 while (!tryIncrementActiveCount())
1061 ;
1062 }
1063
1064 final void decrementActiveCount() {
1065 while (!tryDecrementActiveCount())
1066 ;
1067 }
1068
1069 private boolean casSqTail(SQNode cmp, SQNode val) {
1070 return _unsafe.compareAndSwapObject(this, sqTailOffset, cmp, val);
1071 }
1072
1073 private boolean casSqHead(SQNode cmp, SQNode val) {
1074 return _unsafe.compareAndSwapObject(this, sqHeadOffset, cmp, val);
1075 }
1076 }