290 |
|
* @author Doug Lea |
291 |
|
*/ |
292 |
|
public class ThreadPoolExecutor extends AbstractExecutorService { |
293 |
< |
/** |
294 |
< |
* Only used to force toArray() to produce a Runnable[]. |
293 |
> |
|
294 |
> |
/* |
295 |
> |
* A TPE manages a largish set of control fields, mainly runState, |
296 |
> |
* poolSize, corePoolSize, maximumPoolSize. In general, state |
297 |
> |
* changes only occur within mainLock regions, but nearly all |
298 |
> |
* fields are volatile, so can be read outside of locked |
299 |
> |
* regions. This enables the most performance-critical actions, |
300 |
> |
* such as enqueuing and dequeing tasks in workQueue, to normally |
301 |
> |
* proceed without holding this lock when they see that the state |
302 |
> |
* allows actions. This sometimes requires a form of double-check. |
303 |
> |
* For example when it appears that poolSize is less than |
304 |
> |
* corePoolSize, addIfUnderCorePoolSize is called, which checks |
305 |
> |
* sizes and runState under the lock before actually creating a |
306 |
> |
* new thread. |
307 |
> |
* |
308 |
> |
* The main lifecyle control is via runState, taking on values: |
309 |
> |
* RUNNING: Accept new tasks and process queued tasks |
310 |
> |
* SHUTDOWN: Don't accept new tasks, but process queued tasks |
311 |
> |
* STOP: Don't accept new tasks, don't process queued tasks, |
312 |
> |
* and interrupt in-progress tasks |
313 |
> |
* TERMINATED: Same as stop, plus all threads have terminated |
314 |
> |
* with transitions: |
315 |
> |
* |
316 |
> |
* RUNNING -> SHUTDOWN |
317 |
> |
* On invocation of shutdown() when pool or queue nonempty |
318 |
> |
* {RUNNING or SHUTDOWN} -> STOP |
319 |
> |
* On invocation of shutdownNow() when pool or queue nonempty |
320 |
> |
* {SHUTDOWN or STOP} -> TERMINATED |
321 |
> |
* When both queue and pool become empty |
322 |
> |
* RUNNING -> TERMINATED |
323 |
> |
* On invocation of shutdown when both queue and pool empty |
324 |
> |
* (This bypasses creating a new thread just to cause termination) |
325 |
> |
* |
326 |
|
*/ |
296 |
– |
private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0]; |
327 |
|
|
328 |
|
/** |
329 |
|
* Permission for checking shutdown |
337 |
|
private final BlockingQueue<Runnable> workQueue; |
338 |
|
|
339 |
|
/** |
340 |
< |
* Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and |
341 |
< |
* workers set. |
340 |
> |
* Lock held on updates to poolSize, corePoolSize, |
341 |
> |
* maximumPoolSize, runState, and workers set. |
342 |
|
*/ |
343 |
|
private final ReentrantLock mainLock = new ReentrantLock(); |
344 |
|
|
391 |
|
*/ |
392 |
|
volatile int runState; |
393 |
|
|
394 |
< |
// Special values for runState |
394 |
> |
/* |
395 |
> |
* Special values for runState. The numerical order among values |
396 |
> |
* matters. The runState monotonically increases over time, but |
397 |
> |
* need not hit each state. |
398 |
> |
*/ |
399 |
|
/** Normal, not-shutdown mode */ |
400 |
|
static final int RUNNING = 0; |
401 |
|
/** Controlled shutdown mode */ |
447 |
|
* @return the new thread, or null if threadFactory fails to create thread |
448 |
|
*/ |
449 |
|
private Thread addThread(Runnable firstTask) { |
416 |
– |
if (runState == TERMINATED) // Don't create thread if terminated |
417 |
– |
return null; |
450 |
|
Worker w = new Worker(firstTask); |
451 |
|
Thread t = threadFactory.newThread(w); |
452 |
|
if (t != null) { |
471 |
|
final ReentrantLock mainLock = this.mainLock; |
472 |
|
mainLock.lock(); |
473 |
|
try { |
474 |
< |
if (poolSize < corePoolSize) |
474 |
> |
if (poolSize < corePoolSize && runState == RUNNING) |
475 |
|
t = addThread(firstTask); |
476 |
|
} finally { |
477 |
|
mainLock.unlock(); |
500 |
|
final ReentrantLock mainLock = this.mainLock; |
501 |
|
mainLock.lock(); |
502 |
|
try { |
503 |
< |
if (poolSize < maximumPoolSize) { |
503 |
> |
if (poolSize < maximumPoolSize && runState == RUNNING) { |
504 |
|
Runnable next = workQueue.poll(); |
505 |
|
if (next == null) { |
506 |
|
next = firstTask; |
518 |
|
return status; |
519 |
|
} |
520 |
|
|
489 |
– |
|
521 |
|
/** |
522 |
|
* Gets the next task for a worker thread to run. |
523 |
|
* @return the task |
559 |
|
return workQueue.take(); |
560 |
|
} |
561 |
|
|
562 |
< |
case STOP: |
562 |
> |
default: // stopping/stopped |
563 |
|
return null; |
533 |
– |
default: |
534 |
– |
assert false; |
564 |
|
} |
565 |
|
} catch (InterruptedException ie) { |
566 |
|
// On interruption, re-check runstate |
568 |
|
} |
569 |
|
} |
570 |
|
|
571 |
+ |
|
572 |
+ |
/** |
573 |
+ |
* Rejects a task that was queued concurrently with a call to |
574 |
+ |
* shutdownNow. If still present in the queue, this task must be |
575 |
+ |
* removed and rejected to preserve shutdownNow guarantees. |
576 |
+ |
* @param command the task |
577 |
+ |
*/ |
578 |
+ |
private void rejectIfQueued(Runnable command) { |
579 |
+ |
final ReentrantLock mainLock = this.mainLock; |
580 |
+ |
mainLock.lock(); |
581 |
+ |
boolean present; |
582 |
+ |
try { |
583 |
+ |
present = workQueue.remove(command); |
584 |
+ |
} finally { |
585 |
+ |
mainLock.unlock(); |
586 |
+ |
} |
587 |
+ |
if (present) |
588 |
+ |
reject(command); |
589 |
+ |
} |
590 |
+ |
|
591 |
|
/** |
592 |
|
* Wakes up all threads that might be waiting for tasks. |
593 |
|
*/ |
612 |
|
try { |
613 |
|
completedTaskCount += w.completedTasks; |
614 |
|
workers.remove(w); |
615 |
< |
if (--poolSize > 0) |
616 |
< |
return; |
617 |
< |
|
618 |
< |
// Else, this is the last thread. Deal with potential shutdown. |
619 |
< |
|
620 |
< |
int state = runState; |
621 |
< |
assert state != TERMINATED; |
622 |
< |
|
574 |
< |
if (state != STOP) { |
575 |
< |
// If there are queued tasks but no threads, create |
576 |
< |
// replacement thread. We must create it initially |
577 |
< |
// idle to avoid orphaned tasks in case addThread |
578 |
< |
// fails. This also handles case of delayed tasks |
579 |
< |
// that will sometime later become runnable. |
580 |
< |
if (!workQueue.isEmpty()) { |
615 |
> |
if (--poolSize == 0) { // Deal with potential shutdown. |
616 |
> |
int state = runState; |
617 |
> |
// If not stopping and there are queued tasks but no |
618 |
> |
// threads, create replacement thread. We must create |
619 |
> |
// it initially idle to avoid orphaned tasks in case |
620 |
> |
// addThread fails. This also handles case of delayed |
621 |
> |
// tasks that will sometime later become runnable. |
622 |
> |
if (state < STOP && !workQueue.isEmpty()) { |
623 |
|
Thread t = addThread(null); |
624 |
|
if (t != null) |
625 |
|
t.start(); |
626 |
< |
return; |
626 |
> |
state = RUNNING; // to cause termination check to fail |
627 |
> |
} |
628 |
> |
if (state == STOP || state == SHUTDOWN) { // can terminate |
629 |
> |
runState = TERMINATED; |
630 |
> |
termination.signalAll(); |
631 |
> |
terminated(); |
632 |
|
} |
586 |
– |
|
587 |
– |
// Otherwise, we can exit without replacement |
588 |
– |
if (state == RUNNING) |
589 |
– |
return; |
633 |
|
} |
591 |
– |
|
592 |
– |
// Either state is STOP, or state is SHUTDOWN and there is |
593 |
– |
// no work to do. So we can terminate. |
594 |
– |
termination.signalAll(); |
595 |
– |
runState = TERMINATED; |
596 |
– |
// fall through to call terminate() outside of lock. |
634 |
|
} finally { |
635 |
|
mainLock.unlock(); |
636 |
|
} |
600 |
– |
|
601 |
– |
assert runState == TERMINATED; |
602 |
– |
terminated(); |
637 |
|
} |
638 |
|
|
639 |
|
/** |
640 |
|
* Worker threads |
641 |
|
*/ |
642 |
< |
private class Worker implements Runnable { |
642 |
> |
private final class Worker implements Runnable { |
643 |
|
|
644 |
|
/** |
645 |
|
* The runLock is acquired and released surrounding each task |
703 |
|
runLock.lock(); |
704 |
|
try { |
705 |
|
// If not shutting down then clear an outstanding interrupt. |
706 |
< |
if (runState != STOP && |
706 |
> |
if (runState < STOP && |
707 |
|
Thread.interrupted() && |
708 |
< |
runState == STOP) // Re-interrupt if stopped after clearing |
708 |
> |
runState >= STOP) // Re-interrupt if stopped after clearing |
709 |
|
thread.interrupt(); |
710 |
|
boolean ran = false; |
711 |
|
beforeExecute(thread, task); |
736 |
|
firstTask = null; |
737 |
|
while (task != null || (task = getTask()) != null) { |
738 |
|
runTask(task); |
739 |
< |
task = null; // unnecessary but can help GC |
739 |
> |
task = null; |
740 |
|
} |
741 |
|
} finally { |
742 |
|
workerDone(this); |
894 |
|
this.handler = handler; |
895 |
|
} |
896 |
|
|
863 |
– |
|
897 |
|
/** |
898 |
|
* Executes the given task sometime in the future. The task |
899 |
|
* may execute in a new thread or in an existing pooled thread. |
911 |
|
public void execute(Runnable command) { |
912 |
|
if (command == null) |
913 |
|
throw new NullPointerException(); |
914 |
< |
for (;;) { |
882 |
< |
if (runState != RUNNING) { |
883 |
< |
reject(command); |
884 |
< |
return; |
885 |
< |
} |
914 |
> |
while (runState == RUNNING) { |
915 |
|
if (poolSize < corePoolSize && addIfUnderCorePoolSize(command)) |
916 |
|
return; |
917 |
< |
if (workQueue.offer(command)) |
917 |
> |
if (workQueue.offer(command)) {// recheck state after queuing |
918 |
> |
if (runState != RUNNING) |
919 |
> |
rejectIfQueued(command); |
920 |
> |
else if (poolSize < corePoolSize) |
921 |
> |
addIfUnderCorePoolSize(null); |
922 |
|
return; |
923 |
+ |
} |
924 |
|
int status = addIfUnderMaximumPoolSize(command); |
925 |
< |
if (status > 0) // created new thread |
925 |
> |
if (status > 0) // Created new thread to handle task |
926 |
|
return; |
927 |
< |
if (status == 0) { // failed to create thread |
928 |
< |
reject(command); |
929 |
< |
return; |
896 |
< |
} |
897 |
< |
// Retry if created a new thread but it is busy with another task |
927 |
> |
if (status == 0) // Cannot create thread |
928 |
> |
break; |
929 |
> |
// Retry if created thread but it is busy with another task |
930 |
|
} |
931 |
+ |
|
932 |
+ |
reject(command); // is shutdown or can't create thread or queue task |
933 |
+ |
return; |
934 |
|
} |
935 |
|
|
936 |
|
/** |
950 |
|
if (security != null) |
951 |
|
security.checkPermission(shutdownPerm); |
952 |
|
|
918 |
– |
boolean fullyTerminated = false; |
953 |
|
final ReentrantLock mainLock = this.mainLock; |
954 |
|
mainLock.lock(); |
955 |
|
try { |
956 |
< |
if (workers.size() > 0) { |
956 |
> |
if (security != null) { |
957 |
|
// Check if caller can modify worker threads. This |
958 |
|
// might not be true even if passed above check, if |
959 |
|
// the SecurityManager treats some threads specially. |
960 |
< |
if (security != null) { |
961 |
< |
for (Worker w: workers) |
962 |
< |
security.checkAccess(w.thread); |
929 |
< |
} |
960 |
> |
for (Worker w: workers) |
961 |
> |
security.checkAccess(w.thread); |
962 |
> |
} |
963 |
|
|
964 |
< |
int state = runState; |
965 |
< |
if (state == RUNNING) // don't override shutdownNow |
966 |
< |
runState = SHUTDOWN; |
964 |
> |
int state = runState; |
965 |
> |
if (state == RUNNING) // don't override shutdownNow |
966 |
> |
runState = SHUTDOWN; |
967 |
> |
int nworkers = 0; |
968 |
|
|
969 |
< |
try { |
970 |
< |
for (Worker w: workers) |
971 |
< |
w.interruptIfIdle(); |
972 |
< |
} catch (SecurityException se) { |
939 |
< |
// If SecurityManager allows above checks, but |
940 |
< |
// then unexpectedly throws exception when |
941 |
< |
// interrupting threads (which it ought not do), |
942 |
< |
// back out as cleanly as we can. Some threads may |
943 |
< |
// have been killed but we remain in non-shutdown |
944 |
< |
// state. |
945 |
< |
runState = state; |
946 |
< |
throw se; |
969 |
> |
try { |
970 |
> |
for (Worker w: workers) { |
971 |
> |
w.interruptIfIdle(); |
972 |
> |
++nworkers; |
973 |
|
} |
974 |
+ |
} catch (SecurityException se) { |
975 |
+ |
// If SecurityManager allows above checks, but |
976 |
+ |
// then unexpectedly throws exception when |
977 |
+ |
// interrupting threads (which it ought not do), |
978 |
+ |
// back out as cleanly as we can. Some threads may |
979 |
+ |
// have been killed but we remain in non-shutdown |
980 |
+ |
// state. |
981 |
+ |
runState = state; |
982 |
+ |
throw se; |
983 |
|
} |
984 |
< |
else { // If no workers, trigger full termination now |
985 |
< |
fullyTerminated = true; |
984 |
> |
|
985 |
> |
// If no live workers, act on one's behalf to terminate |
986 |
> |
if (nworkers == 0 && state != TERMINATED) { |
987 |
|
runState = TERMINATED; |
988 |
|
termination.signalAll(); |
989 |
+ |
terminated(); |
990 |
|
} |
991 |
|
} finally { |
992 |
|
mainLock.unlock(); |
993 |
|
} |
957 |
– |
if (fullyTerminated) |
958 |
– |
terminated(); |
994 |
|
} |
995 |
|
|
996 |
|
|
997 |
|
/** |
998 |
|
* Attempts to stop all actively executing tasks, halts the |
999 |
|
* processing of waiting tasks, and returns a list of the tasks |
1000 |
< |
* that were awaiting execution. |
1000 |
> |
* that were awaiting execution. These tasks are drained (removed) |
1001 |
> |
* from the task queue upon return from this method. |
1002 |
|
* |
1003 |
|
* <p>There are no guarantees beyond best-effort attempts to stop |
1004 |
|
* processing actively executing tasks. This implementation |
1018 |
|
if (security != null) |
1019 |
|
security.checkPermission(shutdownPerm); |
1020 |
|
|
985 |
– |
boolean fullyTerminated = false; |
1021 |
|
final ReentrantLock mainLock = this.mainLock; |
1022 |
|
mainLock.lock(); |
1023 |
|
try { |
1024 |
< |
if (workers.size() > 0) { |
1025 |
< |
if (security != null) { |
1026 |
< |
for (Worker w: workers) |
1027 |
< |
security.checkAccess(w.thread); |
993 |
< |
} |
1024 |
> |
if (security != null) { |
1025 |
> |
for (Worker w: workers) |
1026 |
> |
security.checkAccess(w.thread); |
1027 |
> |
} |
1028 |
|
|
1029 |
< |
int state = runState; |
1030 |
< |
if (state != TERMINATED) |
1031 |
< |
runState = STOP; |
1032 |
< |
try { |
1033 |
< |
for (Worker w : workers) |
1034 |
< |
w.interruptNow(); |
1035 |
< |
} catch (SecurityException se) { |
1036 |
< |
runState = state; // back out; |
1003 |
< |
throw se; |
1029 |
> |
int state = runState; |
1030 |
> |
if (state != TERMINATED) |
1031 |
> |
runState = STOP; |
1032 |
> |
int nworkers = 0; |
1033 |
> |
try { |
1034 |
> |
for (Worker w : workers) { |
1035 |
> |
w.interruptNow(); |
1036 |
> |
++nworkers; |
1037 |
|
} |
1038 |
+ |
} catch (SecurityException se) { |
1039 |
+ |
runState = state; // back out; |
1040 |
+ |
throw se; |
1041 |
|
} |
1042 |
< |
else { // If no workers, trigger full termination now |
1043 |
< |
fullyTerminated = true; |
1042 |
> |
|
1043 |
> |
if (nworkers == 0 && state != TERMINATED) { |
1044 |
|
runState = TERMINATED; |
1045 |
|
termination.signalAll(); |
1046 |
+ |
terminated(); |
1047 |
|
} |
1048 |
+ |
|
1049 |
+ |
List<Runnable> taskList = new ArrayList<Runnable>(); |
1050 |
+ |
workQueue.drainTo(taskList); |
1051 |
+ |
return taskList; |
1052 |
|
} finally { |
1053 |
|
mainLock.unlock(); |
1054 |
|
} |
1014 |
– |
if (fullyTerminated) |
1015 |
– |
terminated(); |
1016 |
– |
return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY)); |
1055 |
|
} |
1056 |
|
|
1057 |
|
public boolean isShutdown() { |
1234 |
|
// We have to create initially-idle threads here |
1235 |
|
// because we otherwise have no recourse about |
1236 |
|
// what to do with a dequeued task if addThread fails. |
1237 |
< |
while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize ) { |
1237 |
> |
while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize && |
1238 |
> |
runState < STOP) { |
1239 |
|
Thread t = addThread(null); |
1240 |
|
if (t != null) |
1241 |
|
t.start(); |
1456 |
|
} |
1457 |
|
|
1458 |
|
/** |
1459 |
< |
* Returns the approximate total number of tasks that have been |
1459 |
> |
* Returns the approximate total number of tasks that have ever been |
1460 |
|
* scheduled for execution. Because the states of tasks and |
1461 |
|
* threads may change dynamically during computation, the returned |
1462 |
|
* value is only an approximation, but one that does not ever |