192 |
|
private volatile int poolSize; |
193 |
|
|
194 |
|
/** |
195 |
< |
* Shutdown status, becomes (and remains) nonzero when shutdown called. |
195 |
> |
* Lifecycle state |
196 |
|
*/ |
197 |
< |
private volatile int shutdownStatus; |
197 |
> |
private volatile int runState; |
198 |
|
|
199 |
< |
// Special values for status |
199 |
> |
// Special values for runState |
200 |
|
/** Normal, not-shutdown mode */ |
201 |
< |
private static final int NOT_SHUTDOWN = 0; |
201 |
> |
private static final int RUNNING = 0; |
202 |
|
/** Controlled shutdown mode */ |
203 |
< |
private static final int SHUTDOWN_WHEN_IDLE = 1; |
204 |
< |
/*8 Immediate shutdown mode */ |
205 |
< |
private static final int SHUTDOWN_NOW = 2; |
206 |
< |
|
207 |
< |
/** |
208 |
< |
* Latch that becomes true when all threads terminate after shutdown. |
209 |
< |
*/ |
210 |
< |
private volatile boolean isTerminated; |
203 |
> |
private static final int SHUTDOWN = 1; |
204 |
> |
/** Immediate shutdown mode */ |
205 |
> |
private static final int STOP = 2; |
206 |
> |
/** Final state */ |
207 |
> |
private static final int TERMINATED = 3; |
208 |
|
|
209 |
|
/** |
210 |
|
* Handler called when saturated or shutdown in execute. |
228 |
|
private long completedTaskCount; |
229 |
|
|
230 |
|
/** |
231 |
< |
* The default thread facotry |
231 |
> |
* The default thread factory |
232 |
|
*/ |
233 |
|
private static final ThreadFactory defaultThreadFactory = |
234 |
|
new ThreadFactory() { |
250 |
|
handler.rejectedExecution(command, this); |
251 |
|
} |
252 |
|
|
256 |
– |
|
253 |
|
/** |
254 |
|
* Create and return a new thread running firstTask as its first |
255 |
|
* task. Call only while holding mainLock |
268 |
|
return t; |
269 |
|
} |
270 |
|
|
271 |
< |
// addIfUnderCorePoolSize is non-private; accessible to ScheduledExecutor |
271 |
> |
|
272 |
|
|
273 |
|
/** |
274 |
|
* Create and start a new thread running firstTask as its first |
277 |
|
* null if none) |
278 |
|
* @return true if successful. |
279 |
|
*/ |
280 |
< |
boolean addIfUnderCorePoolSize(Runnable firstTask) { |
280 |
> |
private boolean addIfUnderCorePoolSize(Runnable firstTask) { |
281 |
|
Thread t = null; |
282 |
|
mainLock.lock(); |
283 |
|
try { |
328 |
|
*/ |
329 |
|
private Runnable getTask() throws InterruptedException { |
330 |
|
for (;;) { |
331 |
< |
int stat = shutdownStatus; |
332 |
< |
if (stat == SHUTDOWN_NOW) |
333 |
< |
return null; |
334 |
< |
if (stat == SHUTDOWN_WHEN_IDLE) // help drain queue before dying |
335 |
< |
return workQueue.poll(); |
336 |
< |
if (poolSize <= corePoolSize) // untimed wait if core |
337 |
< |
return workQueue.take(); |
338 |
< |
long timeout = keepAliveTime; |
339 |
< |
if (timeout <= 0) // must die immediately for 0 timeout |
340 |
< |
return null; |
341 |
< |
Runnable task = workQueue.poll(timeout, TimeUnit.NANOSECONDS); |
342 |
< |
if (task != null) |
343 |
< |
return task; |
344 |
< |
if (poolSize > corePoolSize) // timed out |
331 |
> |
switch(runState) { |
332 |
> |
case RUNNING: { |
333 |
> |
if (poolSize <= corePoolSize) // untimed wait if core |
334 |
> |
return workQueue.take(); |
335 |
> |
|
336 |
> |
long timeout = keepAliveTime; |
337 |
> |
if (timeout <= 0) // die immediately for 0 timeout |
338 |
> |
return null; |
339 |
> |
Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS); |
340 |
> |
if (r != null) |
341 |
> |
return r; |
342 |
> |
if (poolSize > corePoolSize) // timed out |
343 |
> |
return null; |
344 |
> |
// else, after timeout, pool shrank so shouldn't die, so retry |
345 |
> |
break; |
346 |
> |
} |
347 |
> |
|
348 |
> |
case SHUTDOWN: { |
349 |
> |
// Help drain queue |
350 |
> |
Runnable r = workQueue.poll(); |
351 |
> |
if (r != null) |
352 |
> |
return r; |
353 |
> |
|
354 |
> |
// Check if can terminate |
355 |
> |
if (workQueue.isEmpty()) { |
356 |
> |
interruptIdleWorkers(); |
357 |
> |
return null; |
358 |
> |
} |
359 |
> |
|
360 |
> |
// There could still be delayed tasks in queue. |
361 |
> |
// Wait for one, re-checking state upon interruption |
362 |
> |
try { |
363 |
> |
return workQueue.take(); |
364 |
> |
} |
365 |
> |
catch(InterruptedException ignore) { |
366 |
> |
} |
367 |
> |
break; |
368 |
> |
} |
369 |
> |
|
370 |
> |
case STOP: |
371 |
|
return null; |
372 |
< |
// else, after timeout, pool shrank so shouldn't die, so retry |
372 |
> |
default: |
373 |
> |
assert false; |
374 |
> |
} |
375 |
> |
} |
376 |
> |
} |
377 |
> |
|
378 |
> |
/** |
379 |
> |
* Wake up all threads that might be waiting for tasks. |
380 |
> |
*/ |
381 |
> |
void interruptIdleWorkers() { |
382 |
> |
mainLock.lock(); |
383 |
> |
try { |
384 |
> |
for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) |
385 |
> |
it.next().interruptIfIdle(); |
386 |
> |
} finally { |
387 |
> |
mainLock.unlock(); |
388 |
|
} |
389 |
|
} |
390 |
|
|
393 |
|
* @param w the worker |
394 |
|
*/ |
395 |
|
private void workerDone(Worker w) { |
359 |
– |
boolean allDone = false; |
396 |
|
mainLock.lock(); |
397 |
|
try { |
398 |
|
completedTaskCount += w.completedTasks; |
399 |
|
workers.remove(w); |
364 |
– |
|
400 |
|
if (--poolSize > 0) |
401 |
|
return; |
402 |
|
|
403 |
< |
// If this was last thread, deal with potential shutdown |
404 |
< |
int stat = shutdownStatus; |
403 |
> |
// Else, this is the last thread. Deal with potential shutdown. |
404 |
> |
|
405 |
> |
int state = runState; |
406 |
> |
assert state != TERMINATED; |
407 |
|
|
408 |
< |
// If there are queued tasks but no threads, create replacement. |
409 |
< |
if (stat != SHUTDOWN_NOW) { |
408 |
> |
if (state != STOP) { |
409 |
> |
// If there are queued tasks but no threads, create |
410 |
> |
// replacement. |
411 |
|
Runnable r = workQueue.poll(); |
412 |
|
if (r != null) { |
413 |
|
addThread(r).start(); |
414 |
|
return; |
415 |
|
} |
378 |
– |
} |
416 |
|
|
417 |
< |
// if no tasks and not shutdown, can exit without replacement |
418 |
< |
if (stat == NOT_SHUTDOWN) |
419 |
< |
return; |
417 |
> |
// If there are some (presumably delayed) tasks but |
418 |
> |
// none pollable, create an idle replacement to wait. |
419 |
> |
if (!workQueue.isEmpty()) { |
420 |
> |
addThread(null).start(); |
421 |
> |
return; |
422 |
> |
} |
423 |
> |
|
424 |
> |
// Otherwise, we can exit without replacement |
425 |
> |
if (state == RUNNING) |
426 |
> |
return; |
427 |
> |
} |
428 |
|
|
429 |
< |
allDone = true; |
430 |
< |
isTerminated = true; |
429 |
> |
// Either state is STOP, or state is SHUTDOWN and there is |
430 |
> |
// no work to do. So we can terminate. |
431 |
> |
runState = TERMINATED; |
432 |
|
termination.signalAll(); |
433 |
+ |
// fall through to call terminate() outside of lock. |
434 |
|
} finally { |
435 |
|
mainLock.unlock(); |
436 |
|
} |
437 |
|
|
438 |
< |
if (allDone) // call outside lock |
439 |
< |
terminated(); |
438 |
> |
assert runState == TERMINATED; |
439 |
> |
terminated(); |
440 |
|
} |
441 |
|
|
442 |
|
/** |
505 |
|
try { |
506 |
|
// Abort now if immediate cancel. Otherwise, we have |
507 |
|
// committed to run this task. |
508 |
< |
if (shutdownStatus == SHUTDOWN_NOW) |
508 |
> |
if (runState == STOP) |
509 |
|
return; |
510 |
|
|
511 |
|
Thread.interrupted(); // clear interrupt status on entry |
719 |
|
*/ |
720 |
|
public void execute(Runnable command) { |
721 |
|
for (;;) { |
722 |
< |
if (shutdownStatus != NOT_SHUTDOWN) { |
722 |
> |
if (runState != RUNNING) { |
723 |
|
reject(command); |
724 |
|
return; |
725 |
|
} |
741 |
|
public void shutdown() { |
742 |
|
mainLock.lock(); |
743 |
|
try { |
744 |
< |
if (shutdownStatus == NOT_SHUTDOWN) // don't override shutdownNow |
745 |
< |
shutdownStatus = SHUTDOWN_WHEN_IDLE; |
699 |
< |
|
744 |
> |
if (runState == RUNNING) // don't override shutdownNow |
745 |
> |
runState = SHUTDOWN; |
746 |
|
for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) |
747 |
|
it.next().interruptIfIdle(); |
748 |
|
} finally { |
750 |
|
} |
751 |
|
} |
752 |
|
|
753 |
+ |
|
754 |
|
public List shutdownNow() { |
755 |
|
mainLock.lock(); |
756 |
|
try { |
757 |
< |
shutdownStatus = SHUTDOWN_NOW; |
757 |
> |
if (runState != TERMINATED) |
758 |
> |
runState = STOP; |
759 |
|
for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) |
760 |
|
it.next().interruptNow(); |
761 |
|
} finally { |
765 |
|
} |
766 |
|
|
767 |
|
public boolean isShutdown() { |
768 |
< |
return shutdownStatus != NOT_SHUTDOWN; |
768 |
> |
return runState != RUNNING; |
769 |
> |
} |
770 |
> |
|
771 |
> |
/** |
772 |
> |
* Return true if this executor is in the process of terminating |
773 |
> |
* after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not |
774 |
> |
* completely terminated. This method may be useful for |
775 |
> |
* debugging. A return of <tt>true</tt> reported a sufficient |
776 |
> |
* period after shutdown may indicate that submitted tasks have |
777 |
> |
* ignored or suppressed interruption, causing this executor not |
778 |
> |
* to properly terminate. |
779 |
> |
* @return true if terminating but not yet terminated. |
780 |
> |
*/ |
781 |
> |
public boolean isTerminating() { |
782 |
> |
return runState == STOP; |
783 |
|
} |
784 |
|
|
785 |
|
public boolean isTerminated() { |
786 |
< |
return isTerminated; |
786 |
> |
return runState == TERMINATED; |
787 |
|
} |
788 |
|
|
789 |
|
public boolean awaitTermination(long timeout, TimeUnit unit) |
869 |
|
|
870 |
|
|
871 |
|
/** |
872 |
< |
* Removes from the work queue all {@link Cancellable} tasks |
873 |
< |
* that have been cancelled. This method can be useful as a |
874 |
< |
* storage reclamation operation, that has no other impact |
875 |
< |
* on functionality. Cancelled tasks are never executed, but |
876 |
< |
* may accumulate in work queues until worker threads can |
877 |
< |
* actively remove them. Invoking this method ensures that they |
878 |
< |
* are instead removed now. |
872 |
> |
* Tries to remove from the work queue all {@link Cancellable} |
873 |
> |
* tasks that have been cancelled. This method can be useful as a |
874 |
> |
* storage reclamation operation, that has no other impact on |
875 |
> |
* functionality. Cancelled tasks are never executed, but may |
876 |
> |
* accumulate in work queues until worker threads can actively |
877 |
> |
* remove them. Invoking this method instead tries to remove them now. |
878 |
> |
* However, this method may fail to remove all such tasks in |
879 |
> |
* the presence of interference by other threads. |
880 |
|
*/ |
881 |
|
|
882 |
|
public void purge() { |
883 |
< |
Iterator<Runnable> it = getQueue().iterator(); |
884 |
< |
while (it.hasNext()) { |
885 |
< |
Runnable r = it.next(); |
886 |
< |
if (r instanceof Cancellable) { |
887 |
< |
Cancellable c = (Cancellable)r; |
888 |
< |
if (c.isCancelled()) |
889 |
< |
it.remove(); |
883 |
> |
// Fail if we encounter interference during traversal |
884 |
> |
try { |
885 |
> |
Iterator<Runnable> it = getQueue().iterator(); |
886 |
> |
while (it.hasNext()) { |
887 |
> |
Runnable r = it.next(); |
888 |
> |
if (r instanceof Cancellable) { |
889 |
> |
Cancellable c = (Cancellable)r; |
890 |
> |
if (c.isCancelled()) |
891 |
> |
it.remove(); |
892 |
> |
} |
893 |
|
} |
894 |
|
} |
895 |
+ |
catch(ConcurrentModificationException ex) { |
896 |
+ |
return; |
897 |
+ |
} |
898 |
|
} |
899 |
|
|
900 |
|
/** |
942 |
|
} |
943 |
|
|
944 |
|
/** |
945 |
+ |
* Start a core thread, causing it to idly wait for work. This |
946 |
+ |
* overrides the default policy of starting core threads only when |
947 |
+ |
* new tasks are executed. This method will return <tt>false</tt> |
948 |
+ |
* if all core threads have already been started. |
949 |
+ |
* @return true if a thread was started |
950 |
+ |
*/ |
951 |
+ |
public boolean prestartCoreThread() { |
952 |
+ |
return addIfUnderCorePoolSize(null); |
953 |
+ |
} |
954 |
+ |
|
955 |
+ |
/** |
956 |
+ |
* Start all core threads, causing them to idly wait for work. This |
957 |
+ |
* overrides the default policy of starting core threads only when |
958 |
+ |
* new tasks are executed. |
959 |
+ |
* @return the number of threads started. |
960 |
+ |
*/ |
961 |
+ |
public int prestartAllCoreThreads() { |
962 |
+ |
int n = 0; |
963 |
+ |
while (addIfUnderCorePoolSize(null)) |
964 |
+ |
++n; |
965 |
+ |
return n; |
966 |
+ |
} |
967 |
+ |
|
968 |
+ |
/** |
969 |
|
* Sets the maximum allowed number of threads. This overrides any |
970 |
|
* value set in the constructor. If the new value is smaller than |
971 |
|
* the current value, excess existing threads will be |