1 |
/* |
2 |
* Written by Doug Lea with assistance from members of JCP JSR-166 |
3 |
* Expert Group and released to the public domain, as explained at |
4 |
* http://creativecommons.org/licenses/publicdomain |
5 |
*/ |
6 |
|
7 |
package jsr166y.forkjoin; |
8 |
import java.util.*; |
9 |
import java.util.concurrent.*; |
10 |
import java.util.concurrent.locks.*; |
11 |
import java.util.concurrent.atomic.*; |
12 |
import sun.misc.Unsafe; |
13 |
import java.lang.reflect.*; |
14 |
|
15 |
|
16 |
/** |
17 |
* Host for a group of ForkJoinWorkerThreads that perform |
18 |
* ForkJoinTasks. A ForkJoinPool also provides the entry point for |
19 |
* tasks submitted from non-ForkJoinTasks, as well as management and |
20 |
* monitoring operations. Normally a single ForkJoinPool is used for |
21 |
* a large number of submitted tasks. Otherwise, use would not always |
22 |
* outweigh the construction overhead of creating a large set of |
23 |
* threads and the associated startup bookkeeping. |
24 |
* |
25 |
* <p> Class ForkJoinPool does not implement the ExecutorService |
26 |
* interface because it only executes ForkJoinTasks, not arbitrary |
27 |
* Runnables. However, for the sake of uniformity, it supports |
28 |
* analogous lifecycle control methods such as shutdown. |
29 |
* |
30 |
* <p>A ForkJoinPool may be constructed with any number of worker |
31 |
* threads, and worker threads may be added and removed dynamically. |
32 |
* However, as a general rule, using a pool size of the number of |
33 |
* processors on a given system (as arranged by the default |
34 |
* constructor) will result in the best performance. Resizing may be |
35 |
* expensive and may cause transient imbalances and slowdowns. |
36 |
* |
37 |
* <p>In addition to execution and lifecycle control methods, this |
38 |
* class provides status check methods (for example |
39 |
* <tt>getStealCount</tt>) that are intended to aid in developing, |
40 |
* tuning, and monitoring fork/join applications. |
41 |
*/ |
42 |
public class ForkJoinPool implements ForkJoinExecutor { |
43 |
|
44 |
/* |
45 |
* This is an overhauled version of the framework described in "A |
46 |
* Java Fork/Join Framework" by Doug Lea, in, Proceedings, ACM |
47 |
* JavaGrande Conference, June 2000 |
48 |
* (http://gee.cs.oswego.edu/dl/papers/fj.pdf). It retains most of |
49 |
* the basic structure, but includes a number of algorithmic |
50 |
* improvements, along with integration with other |
51 |
* java.util.concurrent components. |
52 |
*/ |
53 |
|
54 |
/** |
55 |
* Factory for creating new ForkJoinWorkerThreads. A |
56 |
* ForkJoinWorkerThreadFactory must be defined and used for |
57 |
* ForkJoinWorkerThread subclasses that extend base functionality |
58 |
* or initialize threads with different contexts. |
59 |
*/ |
60 |
public static interface ForkJoinWorkerThreadFactory { |
61 |
/** |
62 |
* Returns a new worker thread operating in the given pool. |
63 |
* |
64 |
* @param pool the pool this thread works in |
65 |
* @throws NullPointerException if pool is null; |
66 |
*/ |
67 |
public ForkJoinWorkerThread newThread(ForkJoinPool pool); |
68 |
} |
69 |
|
70 |
/** |
71 |
* The default ForkJoinWorkerThreadFactory, used unless overridden |
72 |
* in ForkJoinPool constructors. |
73 |
*/ |
74 |
public static class DefaultForkJoinWorkerThreadFactory |
75 |
implements ForkJoinWorkerThreadFactory { |
76 |
public ForkJoinWorkerThread newThread(ForkJoinPool pool) { |
77 |
return new ForkJoinWorkerThread(pool); |
78 |
} |
79 |
} |
80 |
|
81 |
private static final DefaultForkJoinWorkerThreadFactory |
82 |
defaultForkJoinWorkerThreadFactory = |
83 |
new DefaultForkJoinWorkerThreadFactory(); |
84 |
|
85 |
|
86 |
/** |
87 |
* Permission required for callers of methods that may start or |
88 |
* kill threads. |
89 |
*/ |
90 |
private static final RuntimePermission modifyThreadPermission = |
91 |
new RuntimePermission("modifyThread"); |
92 |
|
93 |
/** |
94 |
* If there is a security manager, makes sure caller has |
95 |
* permission to modify threads. |
96 |
*/ |
97 |
private static void checkPermission() { |
98 |
SecurityManager security = System.getSecurityManager(); |
99 |
if (security != null) |
100 |
security.checkPermission(modifyThreadPermission); |
101 |
} |
102 |
|
103 |
/** |
104 |
* Generator for assigning sequence numbers as thread names. |
105 |
*/ |
106 |
private static final AtomicInteger poolNumberGenerator = |
107 |
new AtomicInteger(); |
108 |
|
109 |
/** |
110 |
* Array holding all worker threads in the pool. Array size must |
111 |
* be a power of two. Acts similarly to a CopyOnWriteArrayList -- |
112 |
* updates are protected by workerLock. But it additionally |
113 |
* allows in-place nulling out or replacements of slots upon |
114 |
* termination. All uses of this array should first assign as |
115 |
* local, and must screen out nulls. Note: ForkJoinWorkerThreads |
116 |
* directly access this array. |
117 |
*/ |
118 |
volatile ForkJoinWorkerThread[] workers; |
119 |
|
120 |
/** |
121 |
* Pool wide synchronization control. Workers are enabled to look |
122 |
* for work when the barrier's count is incremented. If they fail |
123 |
* to find some, they may wait for next count. Synchronization |
124 |
* events occur only in enough contexts to maintain overall |
125 |
* liveness: |
126 |
* |
127 |
* - Submission of a new task |
128 |
* - Termination of pool or worker |
129 |
* |
130 |
* So, signals and waits occur relatively rarely during normal |
131 |
* processing, which minimizes contention on this global |
132 |
* synchronizer. Even so, the PoolBarrier is designed to minimize |
133 |
* blockages by threads that have better things to do. |
134 |
*/ |
135 |
private final PoolBarrier poolBarrier; |
136 |
|
137 |
/** |
138 |
* Lock protecting access to workers. |
139 |
*/ |
140 |
private final ReentrantLock workerLock; |
141 |
|
142 |
/** |
143 |
* Condition for awaitTermination. |
144 |
*/ |
145 |
private final Condition termination; |
146 |
|
147 |
/** |
148 |
* Lifecycle control. |
149 |
*/ |
150 |
private final RunState runState; |
151 |
|
152 |
/** |
153 |
* The number of submissions that are running in pool. |
154 |
*/ |
155 |
private final AtomicInteger runningSubmissions; |
156 |
|
157 |
/** |
158 |
* The uncaught exception handler used when any worker |
159 |
* abruptly terminates |
160 |
*/ |
161 |
private Thread.UncaughtExceptionHandler ueh; |
162 |
|
163 |
/** |
164 |
* Creation factory for worker threads. |
165 |
*/ |
166 |
private final ForkJoinWorkerThreadFactory factory; |
167 |
|
168 |
/** |
169 |
* Head and tail of embedded submission queue. (The queue is |
170 |
* embedded to avoid hostile memory placements.) |
171 |
*/ |
172 |
private volatile SQNode sqHead; |
173 |
private volatile SQNode sqTail; |
174 |
|
175 |
/** |
176 |
* Number of workers that are (probably) executing tasks. |
177 |
* Atomically incremented when a worker gets a task to run, and |
178 |
* decremented when worker has no tasks and cannot find any. This |
179 |
* is updated only via CAS. It is inlined here rather than a |
180 |
* stand-alone field to minimize memory thrashing when it is |
181 |
* heavily contended during ramp-up/down of tasks. |
182 |
*/ |
183 |
private volatile int activeCount; |
184 |
|
185 |
/** |
186 |
* The current targeted pool size. Updated only under worker lock |
187 |
* but volatile to allow concurrent reads. |
188 |
*/ |
189 |
private volatile int poolSize; |
190 |
|
191 |
/** |
192 |
* The number of workers that have started but not yet terminated |
193 |
* Accessed only under workerLock. |
194 |
*/ |
195 |
private int runningWorkers; |
196 |
|
197 |
/** |
198 |
* Pool number, just for assigning useful names to worker threads |
199 |
*/ |
200 |
private final int poolNumber; |
201 |
|
202 |
/** |
203 |
* Return a good size for worker array given pool size. |
204 |
* Currently requires size to be a power of two. |
205 |
*/ |
206 |
private static int workerSizeFor(int ps) { |
207 |
return ps <= 1? 1 : (1 << (32 - Integer.numberOfLeadingZeros(ps-1))); |
208 |
} |
209 |
|
210 |
/** |
211 |
* Create new worker using factory. |
212 |
* @param index the index to assign worker |
213 |
*/ |
214 |
private ForkJoinWorkerThread createWorker(int index) { |
215 |
ForkJoinWorkerThread w = factory.newThread(this); |
216 |
w.setDaemon(true); |
217 |
w.setWorkerPoolIndex(index); |
218 |
w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index); |
219 |
Thread.UncaughtExceptionHandler h = ueh; |
220 |
if (h != null) |
221 |
w.setUncaughtExceptionHandler(h); |
222 |
return w; |
223 |
} |
224 |
|
225 |
/** |
226 |
* Creates a ForkJoinPool with a pool size equal to the number of |
227 |
* processors available on the system and using the default |
228 |
* ForkJoinWorkerThreadFactory, |
229 |
* @throws SecurityException if a security manager exists and |
230 |
* the caller is not permitted to modify threads |
231 |
* because it does not hold {@link |
232 |
* java.lang.RuntimePermission}<tt>("modifyThread")</tt>, |
233 |
*/ |
234 |
public ForkJoinPool() { |
235 |
this(Runtime.getRuntime().availableProcessors(), |
236 |
defaultForkJoinWorkerThreadFactory); |
237 |
} |
238 |
|
239 |
/** |
240 |
* Creates a ForkJoinPool with the indicated number of Worker |
241 |
* threads, and using the default ForkJoinWorkerThreadFactory, |
242 |
* @param poolSize the number of worker threads |
243 |
* @throws IllegalArgumentException if poolSize less than or |
244 |
* equal to zero |
245 |
* @throws SecurityException if a security manager exists and |
246 |
* the caller is not permitted to modify threads |
247 |
* because it does not hold {@link |
248 |
* java.lang.RuntimePermission}<tt>("modifyThread")</tt>, |
249 |
*/ |
250 |
public ForkJoinPool(int poolSize) { |
251 |
this(poolSize, defaultForkJoinWorkerThreadFactory); |
252 |
} |
253 |
|
254 |
/** |
255 |
* Creates a ForkJoinPool with a pool size equal to the number of |
256 |
* processors available on the system and using the given |
257 |
* ForkJoinWorkerThreadFactory, |
258 |
* @param factory the factory for creating new threads |
259 |
* @throws NullPointerException if factory is null |
260 |
* @throws SecurityException if a security manager exists and |
261 |
* the caller is not permitted to modify threads |
262 |
* because it does not hold {@link |
263 |
* java.lang.RuntimePermission}<tt>("modifyThread")</tt>, |
264 |
*/ |
265 |
public ForkJoinPool(ForkJoinWorkerThreadFactory factory) { |
266 |
this(Runtime.getRuntime().availableProcessors(), factory); |
267 |
} |
268 |
|
269 |
/** |
270 |
* Creates a ForkJoinPool with the indicated number of worker |
271 |
* threads and the given factory. |
272 |
* |
273 |
* <p> You can also add and remove threads while the pool is |
274 |
* running. But it is generally more efficient and leads to more |
275 |
* predictable performance to initialize the pool with a |
276 |
* sufficient number of threads to support the desired concurrency |
277 |
* level and leave this value fixed. |
278 |
* |
279 |
* @param poolSize the number of worker threads |
280 |
* @param factory the factory for creating new threads |
281 |
* @throws IllegalArgumentException if poolSize less than or |
282 |
* equal to zero |
283 |
* @throws NullPointerException if factory is null |
284 |
* @throws SecurityException if a security manager exists and |
285 |
* the caller is not permitted to modify threads |
286 |
* because it does not hold {@link |
287 |
* java.lang.RuntimePermission}<tt>("modifyThread")</tt>, |
288 |
*/ |
289 |
public ForkJoinPool(int poolSize, ForkJoinWorkerThreadFactory factory) { |
290 |
if (poolSize <= 0) |
291 |
throw new IllegalArgumentException(); |
292 |
if (factory == null) |
293 |
throw new NullPointerException(); |
294 |
checkPermission(); |
295 |
this.poolSize = poolSize; |
296 |
this.factory = factory; |
297 |
this.poolNumber = poolNumberGenerator.incrementAndGet(); |
298 |
this.poolBarrier = new PoolBarrier(); |
299 |
this.runState = new RunState(); |
300 |
this.runningSubmissions = new AtomicInteger(); |
301 |
this.workerLock = new ReentrantLock(); |
302 |
this.termination = workerLock.newCondition(); |
303 |
SQNode dummy = new SQNode(null); |
304 |
this.sqHead = dummy; |
305 |
this.sqTail = dummy; |
306 |
createAndStartWorkers(poolSize); |
307 |
} |
308 |
|
309 |
/** |
310 |
* Initial worker array and worker creation and startup. (This |
311 |
* must be done under lock to avoid interference by some of the |
312 |
* newly started threads while creating others.) |
313 |
*/ |
314 |
private void createAndStartWorkers(int ps) { |
315 |
final ReentrantLock lock = this.workerLock; |
316 |
lock.lock(); |
317 |
try { |
318 |
ForkJoinWorkerThread[] ws = |
319 |
new ForkJoinWorkerThread[workerSizeFor(ps)]; |
320 |
workers = ws; |
321 |
for (int i = 0; i < ps; ++i) |
322 |
ws[i] = createWorker(i); |
323 |
for (int i = 0; i < ps; ++i) { |
324 |
ws[i].start(); |
325 |
++runningWorkers; |
326 |
} |
327 |
} finally { |
328 |
lock.unlock(); |
329 |
} |
330 |
} |
331 |
|
332 |
/** |
333 |
* Performs the given task; returning its result upon completion |
334 |
* @param task the task |
335 |
* @return the task's result |
336 |
* @throws NullPointerException if task is null |
337 |
* @throws RejectedExecutionException if pool is shut down |
338 |
*/ |
339 |
public <T> T invoke(ForkJoinTask<T> task) { |
340 |
return doSubmit(task).awaitInvoke(); |
341 |
} |
342 |
|
343 |
/** |
344 |
* Arranges for (asynchronous) execution of the given task, |
345 |
* returning a {@link Future} that may be used to obtain results |
346 |
* upon completion. (The only supported operations on this object |
347 |
* are those defined in the <tt>Future</tt> interface.) |
348 |
* @param task the task |
349 |
* @return a Future that can be used to get the task's results. |
350 |
* @throws NullPointerException if task is null |
351 |
* @throws RejectedExecutionException if pool is shut down |
352 |
*/ |
353 |
public <T> Future<T> submit(ForkJoinTask<T> task) { |
354 |
return doSubmit(task); |
355 |
} |
356 |
|
357 |
/** |
358 |
* Arranges for (asynchronous) execution of the given task. |
359 |
* @param task the task |
360 |
* @throws NullPointerException if task is null |
361 |
* @throws RejectedExecutionException if pool is shut down |
362 |
*/ |
363 |
public <T> void execute(ForkJoinTask<T> task) { |
364 |
doSubmit(task); |
365 |
} |
366 |
|
367 |
/** |
368 |
* Common code for invoke and submit |
369 |
*/ |
370 |
private <T> Submission<T> doSubmit(ForkJoinTask<T> task) { |
371 |
if (task == null) |
372 |
throw new NullPointerException(); |
373 |
if (runState.isAtLeastShutdown()) |
374 |
throw new RejectedExecutionException(); |
375 |
Submission<T> job = new Submission<T>(task, this); |
376 |
addSubmission(job); |
377 |
poolBarrier.signal(); |
378 |
return job; |
379 |
} |
380 |
|
381 |
/** |
382 |
* Returns the targeted number of worker threads in this pool. |
383 |
* This value does not necessarily reflect transient changes as |
384 |
* threads are added, removed, or abruptly terminate. |
385 |
* |
386 |
* @return the number of worker threads in this pool |
387 |
*/ |
388 |
public int getPoolSize() { |
389 |
return poolSize; |
390 |
} |
391 |
|
392 |
/** |
393 |
* Equivalent to {@link #getPoolSize}. |
394 |
* |
395 |
* @return the number of worker threads in this pool |
396 |
*/ |
397 |
public int getParallelismLevel() { |
398 |
return poolSize; |
399 |
} |
400 |
|
401 |
/** |
402 |
* Returns the number of worker threads that have started but not |
403 |
* yet terminated. This result returned by this method may differ |
404 |
* from <tt>getPoolSize</tt> when threads are added, removed, or |
405 |
* abruptly terminate. |
406 |
* |
407 |
* @return the number of worker threads |
408 |
*/ |
409 |
public int getRunningWorkerCount() { |
410 |
int r; |
411 |
final ReentrantLock lock = this.workerLock; |
412 |
lock.lock(); |
413 |
try { |
414 |
r = runningWorkers; |
415 |
} finally { |
416 |
lock.unlock(); |
417 |
} |
418 |
return r; |
419 |
} |
420 |
|
421 |
/** |
422 |
* Sets the handler for internal worker threads that terminate due |
423 |
* to unrecoverable errors encountered while executing tasks. |
424 |
* Unless set, the current default or ThreadGroup handler is used |
425 |
* as handler. |
426 |
* |
427 |
* @param h the new handler |
428 |
* @return the old handler, or null if none |
429 |
* @throws SecurityException if a security manager exists and |
430 |
* the caller is not permitted to modify threads |
431 |
* because it does not hold {@link |
432 |
* java.lang.RuntimePermission}<tt>("modifyThread")</tt>, |
433 |
*/ |
434 |
public Thread.UncaughtExceptionHandler |
435 |
setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) { |
436 |
checkPermission(); |
437 |
Thread.UncaughtExceptionHandler old = null; |
438 |
final ReentrantLock lock = this.workerLock; |
439 |
lock.lock(); |
440 |
try { |
441 |
old = ueh; |
442 |
ueh = h; |
443 |
ForkJoinWorkerThread[] ws = workers; |
444 |
for (int i = 0; i < ws.length; ++i) { |
445 |
ForkJoinWorkerThread w = ws[i]; |
446 |
if (w != null) |
447 |
w.setUncaughtExceptionHandler(h); |
448 |
} |
449 |
} finally { |
450 |
lock.unlock(); |
451 |
} |
452 |
return old; |
453 |
} |
454 |
|
455 |
/** |
456 |
* Returns the handler for internal worker threads that terminate |
457 |
* due to unrecoverable errors encountered while executing tasks. |
458 |
* @return the handler, or null if none |
459 |
*/ |
460 |
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { |
461 |
Thread.UncaughtExceptionHandler h; |
462 |
final ReentrantLock lock = this.workerLock; |
463 |
lock.lock(); |
464 |
try { |
465 |
h = ueh; |
466 |
} finally { |
467 |
lock.unlock(); |
468 |
} |
469 |
return h; |
470 |
} |
471 |
|
472 |
/** |
473 |
* Tries to adds the indicated number of new worker threads to the |
474 |
* pool. This method may be used to increase the amount of |
475 |
* parallelism available to tasks. The actual number of |
476 |
* threads added may be less than requested if the pool |
477 |
* is terminating or terminated |
478 |
* @return the number of threads added |
479 |
* @throws SecurityException if a security manager exists and |
480 |
* the caller is not permitted to modify threads |
481 |
* because it does not hold {@link |
482 |
* java.lang.RuntimePermission}<tt>("modifyThread")</tt>, |
483 |
*/ |
484 |
public int addWorkers(int numberToAdd) { |
485 |
int nadded = 0; |
486 |
checkPermission(); |
487 |
final ReentrantLock lock = this.workerLock; |
488 |
lock.lock(); |
489 |
try { |
490 |
if (!runState.isAtLeastStopping()) { |
491 |
ForkJoinWorkerThread[] ws = workers; |
492 |
int len = ws.length; |
493 |
int newLen = len + numberToAdd; |
494 |
int newSize = workerSizeFor(newLen); |
495 |
ForkJoinWorkerThread[] nws = |
496 |
new ForkJoinWorkerThread[newSize]; |
497 |
System.arraycopy(ws, 0, nws, 0, len); |
498 |
for (int i = len; i < newLen; ++i) |
499 |
nws[i] = createWorker(i); |
500 |
workers = nws; |
501 |
for (int i = len; i < newLen; ++i) { |
502 |
nws[i].start(); |
503 |
++runningWorkers; |
504 |
} |
505 |
poolSize += numberToAdd; |
506 |
nadded = numberToAdd; |
507 |
} |
508 |
} finally { |
509 |
lock.unlock(); |
510 |
} |
511 |
return nadded; |
512 |
} |
513 |
|
514 |
/** |
515 |
* Tries to remove the indicated number of worker threads from the |
516 |
* pool. The workers will exit the next time they are idle. This |
517 |
* method may be used to decrease the amount of parallelism |
518 |
* available to tasks. The actual number of workers removed |
519 |
* may be less than requested if the pool size would become |
520 |
* zero or the pool is terminating or terminated. |
521 |
* @return the number removed. |
522 |
* @throws SecurityException if a security manager exists and |
523 |
* the caller is not permitted to modify threads |
524 |
* because it does not hold {@link |
525 |
* java.lang.RuntimePermission}<tt>("modifyThread")</tt>, |
526 |
*/ |
527 |
public int removeWorkers(int numberToRemove) { |
528 |
int nremoved = 0; |
529 |
checkPermission(); |
530 |
final ReentrantLock lock = this.workerLock; |
531 |
lock.lock(); |
532 |
try { |
533 |
// shutdown in rightmost order to enable shrinkage in |
534 |
// workerTerminated |
535 |
ForkJoinWorkerThread[] ws = workers; |
536 |
int k = ws.length; |
537 |
while (!runState.isAtLeastStopping() && |
538 |
--k > 0 && // don't kill ws[0] |
539 |
nremoved < numberToRemove) { |
540 |
ForkJoinWorkerThread w = ws[k]; |
541 |
if (w != null) { |
542 |
RunState rs = w.getRunState(); |
543 |
if (rs.transitionToShutdown()) { |
544 |
--poolSize; |
545 |
++nremoved; |
546 |
} |
547 |
} |
548 |
} |
549 |
} finally { |
550 |
lock.unlock(); |
551 |
} |
552 |
return nremoved; |
553 |
} |
554 |
|
555 |
/** |
556 |
* Tries to add or remove workers to attain the given pool size. |
557 |
* This may fail to attain the given target if the pool is |
558 |
* terminating or terminated. |
559 |
* @param newSize the target poolSize |
560 |
* @return the pool size upon exit of this method |
561 |
* @throws IllegalArgumentException if newSize less than or |
562 |
* equal to zero |
563 |
* @throws SecurityException if a security manager exists and |
564 |
* the caller is not permitted to modify threads |
565 |
* because it does not hold {@link |
566 |
* java.lang.RuntimePermission}<tt>("modifyThread")</tt>, |
567 |
*/ |
568 |
public int setPoolSize(int newSize) { |
569 |
checkPermission(); |
570 |
if (newSize <= 0) |
571 |
throw new IllegalArgumentException(); |
572 |
final ReentrantLock lock = this.workerLock; |
573 |
lock.lock(); |
574 |
try { |
575 |
int ps = poolSize; |
576 |
if (newSize > ps) |
577 |
addWorkers(newSize - ps); |
578 |
else if (newSize < ps) |
579 |
removeWorkers(ps - newSize); |
580 |
} finally { |
581 |
lock.unlock(); |
582 |
} |
583 |
return poolSize; |
584 |
} |
585 |
|
586 |
/** |
587 |
* Callback from terminating worker. |
588 |
* @param w the worker |
589 |
* @param ex the exception causing abrupt termination, or null if |
590 |
* completed normally |
591 |
*/ |
592 |
final void workerTerminated(ForkJoinWorkerThread w, Throwable ex) { |
593 |
try { |
594 |
final ReentrantLock lock = this.workerLock; |
595 |
lock.lock(); |
596 |
try { |
597 |
// Unless stopping, null slot, and if rightmost slots |
598 |
// now null, shrink |
599 |
if (!runState.isAtLeastStopping()) { |
600 |
int idx = w.getWorkerPoolIndex(); |
601 |
ForkJoinWorkerThread[] ws = workers; |
602 |
int len = ws.length; |
603 |
if (idx >= 0 && idx < len && ws[idx] == w) { |
604 |
ws[idx] = null; |
605 |
int newlen = len; |
606 |
while (newlen > 0 && ws[newlen-1] == null) |
607 |
--newlen; |
608 |
if (newlen < len) { |
609 |
int newSize = workerSizeFor(newlen); |
610 |
if (newSize < len) { |
611 |
ForkJoinWorkerThread[] nws = |
612 |
new ForkJoinWorkerThread[newSize]; |
613 |
System.arraycopy(ws, 0, nws, 0, newlen); |
614 |
workers = nws; |
615 |
poolBarrier.signal(); |
616 |
} |
617 |
} |
618 |
} |
619 |
} |
620 |
if (--runningWorkers == 0) { |
621 |
terminate(); // no-op if already stopping |
622 |
runState.transitionToTerminated(); |
623 |
termination.signalAll(); |
624 |
} |
625 |
} finally { |
626 |
lock.unlock(); |
627 |
} |
628 |
} finally { |
629 |
if (ex != null) |
630 |
ForkJoinTask.rethrowException(ex); |
631 |
} |
632 |
} |
633 |
|
634 |
// lifecycle control |
635 |
|
636 |
/** |
637 |
* Initiates an orderly shutdown in which previously submitted |
638 |
* tasks are executed, but no new tasks will be accepted. |
639 |
* Invocation has no additional effect if already shut down. |
640 |
* Tasks that are in the process of being submitted concurrently |
641 |
* during the course of this method may or may not be rejected. |
642 |
* @throws SecurityException if a security manager exists and |
643 |
* the caller is not permitted to modify threads |
644 |
* because it does not hold {@link |
645 |
* java.lang.RuntimePermission}<tt>("modifyThread")</tt>, |
646 |
*/ |
647 |
public void shutdown() { |
648 |
checkPermission(); |
649 |
runState.transitionToShutdown(); |
650 |
tryTerminateOnShutdown(); |
651 |
} |
652 |
|
653 |
/** |
654 |
* Attempts to stop all actively executing tasks, and cancels all |
655 |
* waiting tasks. Tasks that are in the process of being |
656 |
* submitted or executed concurrently during the course of this |
657 |
* method may or may not be rejected. |
658 |
* @throws SecurityException if a security manager exists and |
659 |
* the caller is not permitted to modify threads |
660 |
* because it does not hold {@link |
661 |
* java.lang.RuntimePermission}<tt>("modifyThread")</tt>, |
662 |
*/ |
663 |
public void shutdownNow() { |
664 |
checkPermission(); |
665 |
terminate(); |
666 |
} |
667 |
|
668 |
/** |
669 |
* Returns <tt>true</tt> if this pool has been shut down. |
670 |
* |
671 |
* @return <tt>true</tt> if this pool has been shut down |
672 |
*/ |
673 |
public boolean isShutdown() { |
674 |
return runState.isAtLeastShutdown(); |
675 |
} |
676 |
|
677 |
/** |
678 |
* Returns <tt>true</tt> if all tasks have completed following shut down. |
679 |
* |
680 |
* @return <tt>true</tt> if all tasks have completed following shut down |
681 |
*/ |
682 |
public boolean isTerminated() { |
683 |
return runState.isTerminated(); |
684 |
} |
685 |
|
686 |
/** |
687 |
* Returns <tt>true</tt> if termination has commenced but has |
688 |
* not yet completed. |
689 |
* |
690 |
* @return <tt>true</tt> if in the process of terminating |
691 |
*/ |
692 |
public boolean isTerminating() { |
693 |
return runState.isStopping(); |
694 |
} |
695 |
|
696 |
/** |
697 |
* Blocks until all tasks have completed execution after a shutdown |
698 |
* request, or the timeout occurs, or the current thread is |
699 |
* interrupted, whichever happens first. |
700 |
* |
701 |
* @param timeout the maximum time to wait |
702 |
* @param unit the time unit of the timeout argument |
703 |
* @return <tt>true</tt> if this executor terminated and |
704 |
* <tt>false</tt> if the timeout elapsed before termination |
705 |
* @throws InterruptedException if interrupted while waiting |
706 |
*/ |
707 |
public boolean awaitTermination(long timeout, TimeUnit unit) |
708 |
throws InterruptedException { |
709 |
long nanos = unit.toNanos(timeout); |
710 |
final ReentrantLock lock = this.workerLock; |
711 |
lock.lock(); |
712 |
try { |
713 |
for (;;) { |
714 |
if (runState.isTerminated()) |
715 |
return true; |
716 |
if (nanos <= 0) |
717 |
return false; |
718 |
nanos = termination.awaitNanos(nanos); |
719 |
} |
720 |
} finally { |
721 |
lock.unlock(); |
722 |
} |
723 |
} |
724 |
|
725 |
/** |
726 |
* Initiate termination. |
727 |
*/ |
728 |
private void terminate() { |
729 |
if (runState.transitionToStopping()) { |
730 |
stopAllWorkers(); |
731 |
cancelQueuedSubmissions(); |
732 |
cancelQueuedWorkerTasks(); |
733 |
interruptUnterminatedWorkers(); |
734 |
} |
735 |
} |
736 |
|
737 |
/** |
738 |
* Check for termination in shutdown state |
739 |
*/ |
740 |
private void tryTerminateOnShutdown() { |
741 |
if (runState.isAtLeastShutdown() && |
742 |
runningSubmissions.get() == 0 && |
743 |
!hasQueuedSubmissions() && |
744 |
runningSubmissions.get() == 0) // recheck |
745 |
terminate(); |
746 |
} |
747 |
|
748 |
/** |
749 |
* Clear out and cancel submissions |
750 |
*/ |
751 |
private void cancelQueuedSubmissions() { |
752 |
Submission<?> task; |
753 |
while (hasQueuedSubmissions() && (task = pollSubmission()) != null) |
754 |
task.cancel(false); |
755 |
} |
756 |
|
757 |
/** |
758 |
* Clean out worker queues. |
759 |
*/ |
760 |
private void cancelQueuedWorkerTasks() { |
761 |
final ReentrantLock lock = this.workerLock; |
762 |
lock.lock(); |
763 |
try { |
764 |
ForkJoinWorkerThread[] ws = workers; |
765 |
for (int i = 0; i < ws.length; ++i) { |
766 |
ForkJoinWorkerThread t = ws[i]; |
767 |
if (t != null) |
768 |
t.cancelTasks(); |
769 |
} |
770 |
} finally { |
771 |
lock.unlock(); |
772 |
} |
773 |
} |
774 |
|
775 |
/** |
776 |
* Set each worker's status to stopping. Requires lock to avoid |
777 |
* conflicts with add/remove |
778 |
*/ |
779 |
private void stopAllWorkers() { |
780 |
final ReentrantLock lock = this.workerLock; |
781 |
lock.lock(); |
782 |
try { |
783 |
ForkJoinWorkerThread[] ws = workers; |
784 |
for (int i = 0; i < ws.length; ++i) { |
785 |
ForkJoinWorkerThread t = ws[i]; |
786 |
if (t != null) { |
787 |
RunState rs = t.getRunState(); |
788 |
rs.transitionToStopping(); |
789 |
} |
790 |
} |
791 |
} finally { |
792 |
lock.unlock(); |
793 |
} |
794 |
poolBarrier.signal(); |
795 |
} |
796 |
|
797 |
/** |
798 |
* Interrupt all unterminated workers. This is not required for |
799 |
* sake of internal control, but may help unstick user code during |
800 |
* shutdown. |
801 |
*/ |
802 |
private void interruptUnterminatedWorkers() { |
803 |
final ReentrantLock lock = this.workerLock; |
804 |
lock.lock(); |
805 |
try { |
806 |
ForkJoinWorkerThread[] ws = workers; |
807 |
for (int i = 0; i < ws.length; ++i) { |
808 |
ForkJoinWorkerThread t = ws[i]; |
809 |
if (t != null) { |
810 |
RunState rs = t.getRunState(); |
811 |
if (!rs.isTerminated()) { |
812 |
try { |
813 |
t.interrupt(); |
814 |
} catch (SecurityException ignore) { |
815 |
} |
816 |
} |
817 |
} |
818 |
} |
819 |
} finally { |
820 |
lock.unlock(); |
821 |
} |
822 |
} |
823 |
|
824 |
// Status queries |
825 |
|
826 |
/** |
827 |
* Returns true if all worker threads are currently idle. An idle |
828 |
* worker is one that cannot obtain a task to execute because none |
829 |
* are available to steal from other threads, and there are no |
830 |
* pending submissions to the pool. This method is conservative: |
831 |
* It might not return true immediately upon idleness of all |
832 |
* threads, but will eventually become true if threads remain |
833 |
* inactive. |
834 |
* @return true if all threads are currently idle |
835 |
*/ |
836 |
public final boolean isQuiescent() { |
837 |
return activeCount == 0; |
838 |
} |
839 |
|
840 |
/** |
841 |
* Returns the approximate number of threads that are |
842 |
* currently executing tasks. This method may overestimate |
843 |
* the number of active threads. |
844 |
* @return the number of active threads. |
845 |
*/ |
846 |
public final int getActiveThreadCount() { |
847 |
return activeCount; |
848 |
} |
849 |
|
850 |
/** |
851 |
* Returns the approximate number of threads that are currently |
852 |
* idle waiting for tasks. This method may underestimate the |
853 |
* number of idle threads. |
854 |
* @return the number of idle threads. |
855 |
*/ |
856 |
public final int getIdleThreadCount() { |
857 |
return poolSize - activeCount; |
858 |
} |
859 |
|
860 |
/** |
861 |
* Returns the total number of tasks stolen from one thread's work |
862 |
* queue by another. This value is only an approximation, obtained |
863 |
* by iterating across all threads in the pool, and may |
864 |
* mis-estimate the actual total number of steals when the pool is |
865 |
* not quiescent. But the value is still useful for monitoring and |
866 |
* tuning fork/join programs: In general, steal counts should be |
867 |
* high enough to keep threads busy, but low enough to avoid |
868 |
* overhead and contention across threads. |
869 |
* @return the number of steals. |
870 |
*/ |
871 |
public long getStealCount() { |
872 |
long sum = 0; |
873 |
ForkJoinWorkerThread[] ws = workers; |
874 |
for (int i = 0; i < ws.length; ++i) { |
875 |
ForkJoinWorkerThread t = ws[i]; |
876 |
if (t != null) |
877 |
sum += t.getWorkerStealCount(); |
878 |
} |
879 |
return sum; |
880 |
} |
881 |
|
882 |
/** |
883 |
* Returns the total number of tasks currently held in queues by |
884 |
* worker threads (but not including tasks submitted to the pool |
885 |
* that have not begun executing). This value is only an |
886 |
* approximation, obtained by iterating across all threads in the |
887 |
* pool. This method may be useful for tuning task granularities. |
888 |
* @return the number of tasks. |
889 |
*/ |
890 |
public long getTotalPerThreadQueueSize() { |
891 |
long count = 0; |
892 |
ForkJoinWorkerThread[] ws = workers; |
893 |
for (int i = 0; i < ws.length; ++i) { |
894 |
ForkJoinWorkerThread t = ws[i]; |
895 |
if (t != null) |
896 |
count += t.getQueueSize(); |
897 |
} |
898 |
return count; |
899 |
} |
900 |
|
901 |
/** |
902 |
* Returns the number of tasks that have been submitted (via |
903 |
* <tt>submit</tt> or <tt>invoke</tt>) and are currently executing |
904 |
* in the pool. |
905 |
* @return the number of tasks. |
906 |
*/ |
907 |
public int getActiveSubmissionCount() { |
908 |
return runningSubmissions.get(); |
909 |
} |
910 |
|
911 |
/** |
912 |
* Returns the factory used for constructing new workers |
913 |
* |
914 |
* @return the factory used for constructing new workers |
915 |
*/ |
916 |
public ForkJoinWorkerThreadFactory getFactory() { |
917 |
return factory; |
918 |
} |
919 |
|
920 |
// Callbacks from submissions |
921 |
|
922 |
/** |
923 |
* Callback on starting execution of externally submitted job. |
924 |
*/ |
925 |
final void submissionStarting() { |
926 |
runningSubmissions.incrementAndGet(); |
927 |
} |
928 |
|
929 |
/** |
930 |
* Completion callback from externally submitted job. |
931 |
*/ |
932 |
final void submissionCompleted() { |
933 |
if (runningSubmissions.decrementAndGet() == 0 && |
934 |
runState.isAtLeastShutdown()) |
935 |
tryTerminateOnShutdown(); |
936 |
} |
937 |
|
938 |
/** |
939 |
* Wait for a pool event, if necessary. Called only by workers. |
940 |
*/ |
941 |
final long barrierSync(long eventCount) { |
942 |
return poolBarrier.sync(eventCount); |
943 |
} |
944 |
|
945 |
/** |
946 |
* Embedded submission queue holds submissions not yet started by |
947 |
* workers. This is a variant of a Michael/Scott queue that |
948 |
* supports a fast check for apparent emptiness. This class |
949 |
* opportunistically subclasses AtomicReference for next-field. |
950 |
*/ |
951 |
static final class SQNode extends AtomicReference<SQNode> { |
952 |
Submission<?> submission; |
953 |
SQNode(Submission<?> s) { submission = s; } |
954 |
} |
955 |
|
956 |
/** |
957 |
* Quick check for likely non-emptiness. Returns true if an |
958 |
* add completed but not yet fully taken. |
959 |
*/ |
960 |
final boolean mayHaveQueuedSubmissions() { |
961 |
return sqHead != sqTail; |
962 |
} |
963 |
|
964 |
/** |
965 |
* Returns true if there are any tasks submitted to this pool |
966 |
* that have not yet begun executing. |
967 |
* @return <tt>true</tt> if there are any queued submissions. |
968 |
*/ |
969 |
public boolean hasQueuedSubmissions() { |
970 |
for (;;) { |
971 |
SQNode h = sqHead; |
972 |
SQNode t = sqTail; |
973 |
SQNode f = h.get(); |
974 |
if (h == sqHead) { |
975 |
if (f == null) |
976 |
return false; |
977 |
else if (h != t) |
978 |
return true; |
979 |
else |
980 |
casSqTail(t, f); |
981 |
} |
982 |
} |
983 |
} |
984 |
|
985 |
final void addSubmission(Submission<?> x) { |
986 |
SQNode n = new SQNode(x); |
987 |
for (;;) { |
988 |
SQNode t = sqTail; |
989 |
SQNode s = t.get(); |
990 |
if (t == sqTail) { |
991 |
if (s != null) |
992 |
casSqTail(t, s); |
993 |
else if (t.compareAndSet(s, n)) { |
994 |
casSqTail(t, n); |
995 |
return; |
996 |
} |
997 |
} |
998 |
} |
999 |
} |
1000 |
|
1001 |
final Submission<?> pollSubmission() { |
1002 |
for (;;) { |
1003 |
SQNode h = sqHead; |
1004 |
SQNode t = sqTail; |
1005 |
SQNode f = h.get(); |
1006 |
if (h == sqHead) { |
1007 |
if (f == null) |
1008 |
return null; |
1009 |
else if (h == t) |
1010 |
casSqTail(t, f); |
1011 |
else if (casSqHead(h, f)) { |
1012 |
Submission<?> x = f.submission; |
1013 |
f.submission = null; |
1014 |
return x; |
1015 |
} |
1016 |
} |
1017 |
} |
1018 |
} |
1019 |
|
1020 |
// Temporary Unsafe mechanics for preliminary release |
1021 |
|
1022 |
static final Unsafe _unsafe; |
1023 |
static final long activeCountOffset; |
1024 |
static final long sqHeadOffset; |
1025 |
static final long sqTailOffset; |
1026 |
|
1027 |
static { |
1028 |
try { |
1029 |
if (ForkJoinPool.class.getClassLoader() != null) { |
1030 |
Field f = Unsafe.class.getDeclaredField("theUnsafe"); |
1031 |
f.setAccessible(true); |
1032 |
_unsafe = (Unsafe)f.get(null); |
1033 |
} |
1034 |
else |
1035 |
_unsafe = Unsafe.getUnsafe(); |
1036 |
activeCountOffset = _unsafe.objectFieldOffset |
1037 |
(ForkJoinPool.class.getDeclaredField("activeCount")); |
1038 |
sqHeadOffset = _unsafe.objectFieldOffset |
1039 |
(ForkJoinPool.class.getDeclaredField("sqHead")); |
1040 |
sqTailOffset = _unsafe.objectFieldOffset |
1041 |
(ForkJoinPool.class.getDeclaredField("sqTail")); |
1042 |
} catch (Exception e) { |
1043 |
throw new RuntimeException("Could not initialize intrinsics", e); |
1044 |
} |
1045 |
} |
1046 |
|
1047 |
final boolean tryIncrementActiveCount() { |
1048 |
int c = activeCount; |
1049 |
return _unsafe.compareAndSwapInt(this, activeCountOffset, |
1050 |
c, c+1); |
1051 |
} |
1052 |
|
1053 |
final boolean tryDecrementActiveCount() { |
1054 |
int c = activeCount; |
1055 |
return _unsafe.compareAndSwapInt(this, activeCountOffset, |
1056 |
c, c-1); |
1057 |
} |
1058 |
|
1059 |
final void incrementActiveCount() { |
1060 |
while (!tryIncrementActiveCount()) |
1061 |
; |
1062 |
} |
1063 |
|
1064 |
final void decrementActiveCount() { |
1065 |
while (!tryDecrementActiveCount()) |
1066 |
; |
1067 |
} |
1068 |
|
1069 |
private boolean casSqTail(SQNode cmp, SQNode val) { |
1070 |
return _unsafe.compareAndSwapObject(this, sqTailOffset, cmp, val); |
1071 |
} |
1072 |
|
1073 |
private boolean casSqHead(SQNode cmp, SQNode val) { |
1074 |
return _unsafe.compareAndSwapObject(this, sqHeadOffset, cmp, val); |
1075 |
} |
1076 |
} |