546 |
|
* task execution. This protects against interrupts that are |
547 |
|
* intended to wake up a worker thread waiting for a task from |
548 |
|
* instead interrupting a task being run. We implement a simple |
549 |
< |
* non-reentrant mutual exclusion lock rather than use ReentrantLock |
550 |
< |
* because we do not want worker tasks to be able to reacquire the |
551 |
< |
* lock when they invoke pool control methods like setCorePoolSize. |
549 |
> |
* non-reentrant mutual exclusion lock rather than use |
550 |
> |
* ReentrantLock because we do not want worker tasks to be able to |
551 |
> |
* reacquire the lock when they invoke pool control methods like |
552 |
> |
* setCorePoolSize. Additionally, to suppress interrupts until |
553 |
> |
* the thread actually starts running tasks, we initialize lock |
554 |
> |
* state to a negative value, and clear it upon start (in |
555 |
> |
* runWorker). |
556 |
|
*/ |
557 |
|
private final class Worker |
558 |
|
extends AbstractQueuedSynchronizer |
576 |
|
* @param firstTask the first task (null if none) |
577 |
|
*/ |
578 |
|
Worker(Runnable firstTask) { |
579 |
+ |
setState(-1); // inhibit interrupts until runWorker |
580 |
|
this.firstTask = firstTask; |
581 |
|
this.thread = getThreadFactory().newThread(this); |
582 |
|
} |
592 |
|
// The value 1 represents the locked state. |
593 |
|
|
594 |
|
protected boolean isHeldExclusively() { |
595 |
< |
return getState() == 1; |
595 |
> |
return getState() != 0; |
596 |
|
} |
597 |
|
|
598 |
|
protected boolean tryAcquire(int unused) { |
613 |
|
public boolean tryLock() { return tryAcquire(1); } |
614 |
|
public void unlock() { release(1); } |
615 |
|
public boolean isLocked() { return isHeldExclusively(); } |
616 |
+ |
|
617 |
+ |
void interruptIfStarted() { |
618 |
+ |
Thread t; |
619 |
+ |
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { |
620 |
+ |
try { |
621 |
+ |
t.interrupt(); |
622 |
+ |
} catch (SecurityException ignore) { |
623 |
+ |
} |
624 |
+ |
} |
625 |
+ |
} |
626 |
|
} |
627 |
|
|
628 |
|
/* |
721 |
|
final ReentrantLock mainLock = this.mainLock; |
722 |
|
mainLock.lock(); |
723 |
|
try { |
724 |
< |
for (Worker w : workers) { |
725 |
< |
try { |
711 |
< |
w.thread.interrupt(); |
712 |
< |
} catch (SecurityException ignore) { |
713 |
< |
} |
714 |
< |
} |
724 |
> |
for (Worker w : workers) |
725 |
> |
w.interruptIfStarted(); |
726 |
|
} finally { |
727 |
|
mainLock.unlock(); |
728 |
|
} |
779 |
|
|
780 |
|
private static final boolean ONLY_ONE = true; |
781 |
|
|
771 |
– |
/** |
772 |
– |
* Ensures that unless the pool is stopping, the current thread |
773 |
– |
* does not have its interrupt set. This requires a double-check |
774 |
– |
* of state in case the interrupt was cleared concurrently with a |
775 |
– |
* shutdownNow -- if so, the interrupt is re-enabled. |
776 |
– |
*/ |
777 |
– |
private void clearInterruptsForTaskRun() { |
778 |
– |
if (runStateLessThan(ctl.get(), STOP) && |
779 |
– |
Thread.interrupted() && |
780 |
– |
runStateAtLeast(ctl.get(), STOP)) |
781 |
– |
Thread.currentThread().interrupt(); |
782 |
– |
} |
783 |
– |
|
782 |
|
/* |
783 |
|
* Misc utilities, most of which are also exported to |
784 |
|
* ScheduledThreadPoolExecutor |
888 |
|
} |
889 |
|
|
890 |
|
boolean workerStarted = false; |
891 |
+ |
boolean workerAdded = false; |
892 |
|
Worker w = null; |
893 |
|
try { |
894 |
|
final ReentrantLock mainLock = this.mainLock; |
895 |
|
w = new Worker(firstTask); |
896 |
|
final Thread t = w.thread; |
897 |
< |
if (t.isAlive()) // precheck that t is startable |
898 |
< |
throw new IllegalThreadStateException(); |
899 |
< |
mainLock.lock(); |
900 |
< |
try { |
901 |
< |
// Recheck while holding lock. |
902 |
< |
// Back out on ThreadFactory failure or if |
903 |
< |
// shut down before lock acquired. |
904 |
< |
int c = ctl.get(); |
905 |
< |
int rs = runStateOf(c); |
906 |
< |
|
907 |
< |
if (t == null || |
908 |
< |
(rs >= SHUTDOWN && |
909 |
< |
! (rs == SHUTDOWN && |
910 |
< |
firstTask == null))) |
911 |
< |
return false; |
912 |
< |
|
913 |
< |
workers.add(w); |
914 |
< |
|
915 |
< |
int s = workers.size(); |
916 |
< |
if (s > largestPoolSize) |
917 |
< |
largestPoolSize = s; |
918 |
< |
} finally { |
919 |
< |
mainLock.unlock(); |
897 |
> |
if (t != null) { |
898 |
> |
mainLock.lock(); |
899 |
> |
try { |
900 |
> |
// Recheck while holding lock. |
901 |
> |
// Back out on ThreadFactory failure or if |
902 |
> |
// shut down before lock acquired. |
903 |
> |
int c = ctl.get(); |
904 |
> |
int rs = runStateOf(c); |
905 |
> |
|
906 |
> |
if (rs < SHUTDOWN || |
907 |
> |
(rs == SHUTDOWN && firstTask == null)) { |
908 |
> |
if (t.isAlive()) // precheck that t is startable |
909 |
> |
throw new IllegalThreadStateException(); |
910 |
> |
workers.add(w); |
911 |
> |
int s = workers.size(); |
912 |
> |
if (s > largestPoolSize) |
913 |
> |
largestPoolSize = s; |
914 |
> |
workerAdded = true; |
915 |
> |
} |
916 |
> |
} finally { |
917 |
> |
mainLock.unlock(); |
918 |
> |
} |
919 |
> |
if (workerAdded) { |
920 |
> |
t.start(); |
921 |
> |
workerStarted = true; |
922 |
> |
} |
923 |
|
} |
922 |
– |
|
923 |
– |
t.start(); |
924 |
– |
// It is possible (but unlikely) for a thread to have been |
925 |
– |
// added to workers, but not yet started, during transition to |
926 |
– |
// STOP, which could result in a rare missed interrupt, |
927 |
– |
// because Thread.interrupt is not guaranteed to have any |
928 |
– |
// effect on a non-yet-started Thread (see Thread#interrupt). |
929 |
– |
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) |
930 |
– |
t.interrupt(); |
931 |
– |
|
932 |
– |
return workerStarted = true; |
924 |
|
} finally { |
925 |
|
if (! workerStarted) |
926 |
|
addWorkerFailed(w); |
927 |
|
} |
928 |
+ |
return workerStarted; |
929 |
|
} |
930 |
|
|
931 |
|
/** |
1092 |
|
* @param w the worker |
1093 |
|
*/ |
1094 |
|
final void runWorker(Worker w) { |
1095 |
+ |
Thread wt = Thread.currentThread(); |
1096 |
|
Runnable task = w.firstTask; |
1097 |
|
w.firstTask = null; |
1098 |
+ |
w.unlock(); // allow interrupts |
1099 |
|
boolean completedAbruptly = true; |
1100 |
|
try { |
1101 |
|
while (task != null || (task = getTask()) != null) { |
1102 |
|
w.lock(); |
1103 |
< |
clearInterruptsForTaskRun(); |
1103 |
> |
// If pool is stopping, ensure thread is interrupted; |
1104 |
> |
// if not, ensure thread is not interrupted. This |
1105 |
> |
// requires a recheck in second case to deal with |
1106 |
> |
// shutdownNow race while clearing interrupt |
1107 |
> |
if ((runStateAtLeast(ctl.get(), STOP) || |
1108 |
> |
(Thread.interrupted() && |
1109 |
> |
runStateAtLeast(ctl.get(), STOP))) && |
1110 |
> |
!wt.isInterrupted()) |
1111 |
> |
wt.interrupt(); |
1112 |
|
try { |
1113 |
< |
beforeExecute(w.thread, task); |
1113 |
> |
beforeExecute(wt, task); |
1114 |
|
Throwable thrown = null; |
1115 |
|
try { |
1116 |
|
task.run(); |