root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.20
Committed: Mon Sep 1 14:27:11 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.19: +10 -11 lines
Log Message:
Fix html

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * An {@link ExecutorService} that executes each submitted task using
13 * one of possibly several pooled threads.
14 *
15 * <p>Thread pools address two different problems: they usually
16 * provide improved performance when executing large numbers of
17 * asynchronous tasks, due to reduced per-task invocation overhead,
18 * and they provide a means of bounding and managing the resources,
19 * including threads, consumed when executing a collection of tasks.
20 * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
21 * statistics, such as the number of completed tasks, that may be
22 * useful for monitoring and tuning.
23 *
24 * <p>To be useful across a wide range of contexts, this class
25 * provides many adjustable parameters and extensibility hooks. For
26 * example, it can be configured to create a new thread for each task,
27 * or even to execute tasks sequentially in a single thread, in
28 * addition to its most common configuration, which reuses a pool of
29 * threads. However, programmers are urged to use the more convenient
30 * {@link Executors} factory methods {@link
31 * Executors#newCachedThreadPool} (unbounded thread pool, with
32 * automatic thread reclamation), {@link Executors#newFixedThreadPool}
33 * (fixed size thread pool) and {@link
34 * Executors#newSingleThreadExecutor} (single background thread), that
35 * preconfigure settings for the most common usage scenarios.
36 *
37 *
38 * <h3>Tuning guide</h3>
39 * <dl>
40 *
41 * <dt>Core and maximum pool size</dt>
42 *
43 * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
44 * pool size according to the bounds set by corePoolSize and
45 * maximumPoolSize. When a new task is submitted, and fewer than
46 * corePoolSize threads are running, a new thread is created to handle
47 * the request, even if other worker threads are idle. If there are
48 * more than corePoolSize but fewer than maximumPoolSize threads
49 * running, a new thread will be created only if the queue is full.
50 * By setting corePoolSize and maximumPoolSize the same, you create a
51 * fixed-size thread pool. By default, even core threads are only
52 * created and started when needed by new tasks, but this can be
53 * overridden dynamically using method <tt>prestartCoreThread</tt>.
54 * </dd>
55 *
56 * <dt>Keep-alive</dt>
57 *
58 * <dd>The keepAliveTime determines what happens to idle threads. If
59 * the pool currently has more than the core number of threads, excess
60 * threads will be terminated if they have been idle for more than the
61 * keepAliveTime.</dd>
62 *
63 * <dt>Queueing</dt>
64 *
65 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
66 * submitted tasks. A good default is a {@link SynchronousQueue} that
67 * hands off tasks to threads without otherwise holding them. This
68 * policy avoids lockups when handling sets of requests that might
69 * have internal dependencies. Using an unbounded queue (for example
70 * a {@link LinkedBlockingQueue}) will cause new tasks to be queued in
71 * cases where all corePoolSize threads are busy, so no more than
72 * corePoolSize threads will be created. This may be appropriate when
73 * each task is completely independent of others, so tasks cannot
74 * affect each other's execution; for example, in a web page server.
75 * When given a choice, a <tt>ThreadPoolExecutor</tt> always prefers
76 * adding a new thread rather than queueing if there are currently
77 * fewer than the current getCorePoolSize threads running, but
78 * otherwise always prefers queuing a request rather than adding a new
79 * thread.
80 *
81 * <p>While queuing can be useful in smoothing out transient bursts of
82 * requests, especially in socket-based services, it is not very well
83 * behaved when commands continue to arrive on average faster than
84 * they can be processed. Queue sizes and maximum pool sizes can
85 * often be traded off for each other. Using large queues and small
86 * pools minimizes CPU usage, OS resources, and context-switching
87 * overhead, but can lead to artificially low throughput. If tasks
88 * frequently block (for example if they are I/O bound), a system may
89 * be able to schedule time for more threads than you otherwise
90 * allow. Use of small queues or queueless handoffs generally requires
91 * larger pool sizes, which keeps CPUs busier but may encounter
92 * unacceptable scheduling overhead, which also decreases throughput.
93 * </dd>
94 *
95 * <dt>Creating new threads</dt>
96 *
97 * <dd>New threads are created using a {@link ThreadFactory}. By
98 * default, threads are created simply with the <tt>new
99 * Thread(Runnable)</tt> constructor, but by supplying a different
100 * ThreadFactory, you can alter the thread's name, thread group,
101 * priority, daemon status, etc. </dd>
102 *
103 * <dt>Before and after intercepts</dt>
104 *
105 * <dd>This class has overridable methods that are called before and
106 * after execution of each task. These can be used to manipulate the
107 * execution environment, for example, reinitializing ThreadLocals,
108 * gathering statistics, or adding log entries. </dd>
109 *
110 * <dt>Rejected tasks</dt>
111 *
112 * <dd>There are a number of factors which can bound the number of
113 * tasks which can execute at once, including the maximum pool size
114 * and the queuing mechanism used. If the executor determines that a
115 * task cannot be executed because it has been refused by the queue
116 * and no threads are available, or because the executor has been shut
117 * down, the {@link RejectedExecutionHandler}
118 * <tt>rejectedExecution</tt> method is invoked. The default
119 * (<tt>AbortPolicy</tt>) handler throws a runtime {@link
120 * RejectedExecutionException} upon rejection. </dd>
121 *
122 * </dl>
123 *
124 * @since 1.5
125 * @author Doug Lea
126 */
127 public class ThreadPoolExecutor implements ExecutorService {
128 /**
129 * Queue used for holding tasks and handing off to worker threads.
130 */
131 private final BlockingQueue<Runnable> workQueue;
132
133 /**
134 * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
135 * workers set.
136 */
137 private final ReentrantLock mainLock = new ReentrantLock();
138
139 /**
140 * Wait condition to support awaitTermination
141 */
142 private final Condition termination = mainLock.newCondition();
143
144 /**
145 * Set containing all worker threads in pool.
146 */
147 private final HashSet<Worker> workers = new HashSet<Worker>();
148
149 /**
150 * Timeout in nanoseconds for idle threads waiting for work.
151 * Threads use this timeout only when there are more than
152 * corePoolSize present. Otherwise they wait forever for new work.
153 */
154 private volatile long keepAliveTime;
155
156 /**
157 * Core pool size, updated only while holding mainLock,
158 * but volatile to allow concurrent readability even
159 * during updates.
160 */
161 private volatile int corePoolSize;
162
163 /**
164 * Maximum pool size, updated only while holding mainLock
165 * but volatile to allow concurrent readability even
166 * during updates.
167 */
168 private volatile int maximumPoolSize;
169
170 /**
171 * Current pool size, updated only while holding mainLock
172 * but volatile to allow concurrent readability even
173 * during updates.
174 */
175 private volatile int poolSize;
176
177 /**
178 * Lifecycle state
179 */
180 private volatile int runState;
181
182 // Special values for runState
183 /** Normal, not-shutdown mode */
184 private static final int RUNNING = 0;
185 /** Controlled shutdown mode */
186 private static final int SHUTDOWN = 1;
187 /** Immediate shutdown mode */
188 private static final int STOP = 2;
189 /** Final state */
190 private static final int TERMINATED = 3;
191
192 /**
193 * Handler called when saturated or shutdown in execute.
194 */
195 private volatile RejectedExecutionHandler handler = defaultHandler;
196
197 /**
198 * Factory for new threads.
199 */
200 private volatile ThreadFactory threadFactory = defaultThreadFactory;
201
202 /**
203 * Tracks largest attained pool size.
204 */
205 private int largestPoolSize;
206
207 /**
208 * Counter for completed tasks. Updated only on termination of
209 * worker threads.
210 */
211 private long completedTaskCount;
212
213 /**
214 * The default thread factory
215 */
216 private static final ThreadFactory defaultThreadFactory =
217 new ThreadFactory() {
218 public Thread newThread(Runnable r) {
219 return new Thread(r);
220 }
221 };
222
223 /**
224 * The default rejected execution handler
225 */
226 private static final RejectedExecutionHandler defaultHandler =
227 new AbortPolicy();
228
229 /**
230 * Invoke the rejected execution handler for the given command.
231 */
232 void reject(Runnable command) {
233 handler.rejectedExecution(command, this);
234 }
235
236 /**
237 * Create and return a new thread running firstTask as its first
238 * task. Call only while holding mainLock
239 * @param firstTask the task the new thread should run first (or
240 * null if none)
241 * @return the new thread
242 */
243 private Thread addThread(Runnable firstTask) {
244 Worker w = new Worker(firstTask);
245 Thread t = threadFactory.newThread(w);
246 w.thread = t;
247 workers.add(w);
248 int nt = ++poolSize;
249 if (nt > largestPoolSize)
250 largestPoolSize = nt;
251 return t;
252 }
253
254
255
256 /**
257 * Create and start a new thread running firstTask as its first
258 * task, only if less than corePoolSize threads are running.
259 * @param firstTask the task the new thread should run first (or
260 * null if none)
261 * @return true if successful.
262 */
263 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
264 Thread t = null;
265 mainLock.lock();
266 try {
267 if (poolSize < corePoolSize)
268 t = addThread(firstTask);
269 } finally {
270 mainLock.unlock();
271 }
272 if (t == null)
273 return false;
274 t.start();
275 return true;
276 }
277
278 /**
279 * Create and start a new thread only if less than maximumPoolSize
280 * threads are running. The new thread runs as its first task the
281 * next task in queue, or if there is none, the given task.
282 * @param firstTask the task the new thread should run first (or
283 * null if none)
284 * @return null on failure, else the first task to be run by new thread.
285 */
286 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
287 Thread t = null;
288 Runnable next = null;
289 mainLock.lock();
290 try {
291 if (poolSize < maximumPoolSize) {
292 next = workQueue.poll();
293 if (next == null)
294 next = firstTask;
295 t = addThread(next);
296 }
297 } finally {
298 mainLock.unlock();
299 }
300 if (t == null)
301 return null;
302 t.start();
303 return next;
304 }
305
306
307 /**
308 * Get the next task for a worker thread to run.
309 * @return the task
310 * @throws InterruptedException if interrupted while waiting for task
311 */
312 private Runnable getTask() throws InterruptedException {
313 for (;;) {
314 switch(runState) {
315 case RUNNING: {
316 if (poolSize <= corePoolSize) // untimed wait if core
317 return workQueue.take();
318
319 long timeout = keepAliveTime;
320 if (timeout <= 0) // die immediately for 0 timeout
321 return null;
322 Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
323 if (r != null)
324 return r;
325 if (poolSize > corePoolSize) // timed out
326 return null;
327 // else, after timeout, pool shrank so shouldn't die, so retry
328 break;
329 }
330
331 case SHUTDOWN: {
332 // Help drain queue
333 Runnable r = workQueue.poll();
334 if (r != null)
335 return r;
336
337 // Check if can terminate
338 if (workQueue.isEmpty()) {
339 interruptIdleWorkers();
340 return null;
341 }
342
343 // There could still be delayed tasks in queue.
344 // Wait for one, re-checking state upon interruption
345 try {
346 return workQueue.take();
347 }
348 catch(InterruptedException ignore) {
349 }
350 break;
351 }
352
353 case STOP:
354 return null;
355 default:
356 assert false;
357 }
358 }
359 }
360
361 /**
362 * Wake up all threads that might be waiting for tasks.
363 */
364 void interruptIdleWorkers() {
365 mainLock.lock();
366 try {
367 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
368 it.next().interruptIfIdle();
369 } finally {
370 mainLock.unlock();
371 }
372 }
373
374 /**
375 * Perform bookkeeping for a terminated worker thread.
376 * @param w the worker
377 */
378 private void workerDone(Worker w) {
379 mainLock.lock();
380 try {
381 completedTaskCount += w.completedTasks;
382 workers.remove(w);
383 if (--poolSize > 0)
384 return;
385
386 // Else, this is the last thread. Deal with potential shutdown.
387
388 int state = runState;
389 assert state != TERMINATED;
390
391 if (state != STOP) {
392 // If there are queued tasks but no threads, create
393 // replacement.
394 Runnable r = workQueue.poll();
395 if (r != null) {
396 addThread(r).start();
397 return;
398 }
399
400 // If there are some (presumably delayed) tasks but
401 // none pollable, create an idle replacement to wait.
402 if (!workQueue.isEmpty()) {
403 addThread(null).start();
404 return;
405 }
406
407 // Otherwise, we can exit without replacement
408 if (state == RUNNING)
409 return;
410 }
411
412 // Either state is STOP, or state is SHUTDOWN and there is
413 // no work to do. So we can terminate.
414 runState = TERMINATED;
415 termination.signalAll();
416 // fall through to call terminated() outside of lock.
417 } finally {
418 mainLock.unlock();
419 }
420
421 assert runState == TERMINATED;
422 terminated();
423 }
424
425 /**
426 * Worker threads
427 */
428 private class Worker implements Runnable {
429
430 /**
431 * The runLock is acquired and released surrounding each task
432 * execution. It mainly protects against interrupts that are
433 * intended to cancel the worker thread from instead
434 * interrupting the task being run.
435 */
436 private final ReentrantLock runLock = new ReentrantLock();
437
438 /**
439 * Initial task to run before entering run loop
440 */
441 private Runnable firstTask;
442
443 /**
444 * Per thread completed task counter; accumulated
445 * into completedTaskCount upon termination.
446 */
447 volatile long completedTasks;
448
449 /**
450 * Thread this worker is running in. Acts as a final field,
451 * but cannot be set until thread is created.
452 */
453 Thread thread;
454
455 Worker(Runnable firstTask) {
456 this.firstTask = firstTask;
457 }
458
459 boolean isActive() {
460 return runLock.isLocked();
461 }
462
463 /**
464 * Interrupt thread if not running a task
465 */
466 void interruptIfIdle() {
467 if (runLock.tryLock()) {
468 try {
469 thread.interrupt();
470 } finally {
471 runLock.unlock();
472 }
473 }
474 }
475
476 /**
477 * Cause thread to die even if running a task.
478 */
479 void interruptNow() {
480 thread.interrupt();
481 }
482
483 /**
484 * Run a single task between before/after methods.
485 */
486 private void runTask(Runnable task) {
487 runLock.lock();
488 try {
489 // Abort now if immediate cancel. Otherwise, we have
490 // committed to run this task.
491 if (runState == STOP)
492 return;
493
494 Thread.interrupted(); // clear interrupt status on entry
495 boolean ran = false;
496 beforeExecute(thread, task);
497 try {
498 task.run();
499 ran = true;
500 afterExecute(task, null);
501 ++completedTasks;
502 } catch(RuntimeException ex) {
503 if (!ran)
504 afterExecute(task, ex);
505 // Else the exception occurred within
506 // afterExecute itself in which case we don't
507 // want to call it again.
508 throw ex;
509 }
510 } finally {
511 runLock.unlock();
512 }
513 }
514
515 /**
516 * Main run loop
517 */
518 public void run() {
519 try {
520 for (;;) {
521 Runnable task;
522 if (firstTask != null) {
523 task = firstTask;
524 firstTask = null;
525 } else {
526 task = getTask();
527 if (task == null)
528 break;
529 }
530 runTask(task);
531 task = null; // unnecessary but can help GC
532 }
533 } catch(InterruptedException ie) {
534 // fall through
535 } finally {
536 workerDone(this);
537 }
538 }
539 }
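/*
Editorial sketch, not part of revision 1.20. The runTask method above brackets
each task with beforeExecute and afterExecute. A minimal illustration of
overriding those hooks follows, assuming the API as it later shipped in the JDK
(beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable)); this
draft may differ. The class name CountingPoolSketch and its fields are
hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that counts how often each hook is invoked.
class CountingPoolSketch extends ThreadPoolExecutor {
    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger after = new AtomicInteger();

    CountingPoolSketch() {
        super(2, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        before.incrementAndGet();   // runs in the worker thread, before r.run()
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        after.incrementAndGet();    // t is non-null if r.run() threw
    }

    // Runs n no-op tasks and returns {beforeCount, afterCount}.
    static int[] runTasks(int n) {
        CountingPoolSketch pool = new CountingPoolSketch();
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.before.get(), pool.after.get() };
    }

    public static void main(String[] args) {
        int[] counts = runTasks(3);
        System.out.println("before=" + counts[0] + " after=" + counts[1]);
    }
}
```

Both hooks run in the worker thread, so shared state they touch should be
thread-safe; the sketch uses atomic counters for that reason.
*/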
540
541 // Public methods
542
543 /**
544 * Creates a new <tt>ThreadPoolExecutor</tt> with the given
545 * initial parameters. It may be more convenient to use one of
546 * the {@link Executors} factory methods instead of this general
547 * purpose constructor.
548 *
549 * @param corePoolSize the number of threads to keep in the
550 * pool, even if they are idle.
551 * @param maximumPoolSize the maximum number of threads to allow in the
552 * pool.
553 * @param keepAliveTime when the number of threads is greater than
554 * the core, this is the maximum time that excess idle threads
555 * will wait for new tasks before terminating.
556 * @param unit the time unit for the keepAliveTime
557 * argument.
558 * @param workQueue the queue to use for holding tasks before they
559 * are executed. This queue will hold only the <tt>Runnable</tt>
560 * tasks submitted by the <tt>execute</tt> method.
561 * @throws IllegalArgumentException if corePoolSize or
562 * keepAliveTime is less than zero, if maximumPoolSize is less than or
563 * equal to zero, or if corePoolSize is greater than maximumPoolSize.
564 * @throws NullPointerException if <tt>workQueue</tt> is null
565 */
566 public ThreadPoolExecutor(int corePoolSize,
567 int maximumPoolSize,
568 long keepAliveTime,
569 TimeUnit unit,
570 BlockingQueue<Runnable> workQueue) {
571 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
572 defaultThreadFactory, defaultHandler);
573 }
574
575 /**
576 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
577 * parameters.
578 *
579 * @param corePoolSize the number of threads to keep in the
580 * pool, even if they are idle.
581 * @param maximumPoolSize the maximum number of threads to allow in the
582 * pool.
583 * @param keepAliveTime when the number of threads is greater than
584 * the core, this is the maximum time that excess idle threads
585 * will wait for new tasks before terminating.
586 * @param unit the time unit for the keepAliveTime
587 * argument.
588 * @param workQueue the queue to use for holding tasks before they
589 * are executed. This queue will hold only the <tt>Runnable</tt>
590 * tasks submitted by the <tt>execute</tt> method.
591 * @param threadFactory the factory to use when the executor
592 * creates a new thread.
593 * @throws IllegalArgumentException if corePoolSize or
594 * keepAliveTime is less than zero, if maximumPoolSize is less than or
595 * equal to zero, or if corePoolSize is greater than maximumPoolSize.
596 * @throws NullPointerException if <tt>workQueue</tt>
597 * or <tt>threadFactory</tt> is null.
598 */
599 public ThreadPoolExecutor(int corePoolSize,
600 int maximumPoolSize,
601 long keepAliveTime,
602 TimeUnit unit,
603 BlockingQueue<Runnable> workQueue,
604 ThreadFactory threadFactory) {
605
606 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
607 threadFactory, defaultHandler);
608 }
609
610 /**
611 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
612 * parameters.
613 *
614 * @param corePoolSize the number of threads to keep in the
615 * pool, even if they are idle.
616 * @param maximumPoolSize the maximum number of threads to allow in the
617 * pool.
618 * @param keepAliveTime when the number of threads is greater than
619 * the core, this is the maximum time that excess idle threads
620 * will wait for new tasks before terminating.
621 * @param unit the time unit for the keepAliveTime
622 * argument.
623 * @param workQueue the queue to use for holding tasks before they
624 * are executed. This queue will hold only the <tt>Runnable</tt>
625 * tasks submitted by the <tt>execute</tt> method.
626 * @param handler the handler to use when execution is blocked
627 * because the thread bounds and queue capacities are reached.
628 * @throws IllegalArgumentException if corePoolSize or
629 * keepAliveTime is less than zero, if maximumPoolSize is less than or
630 * equal to zero, or if corePoolSize is greater than maximumPoolSize.
631 * @throws NullPointerException if <tt>workQueue</tt>
632 * or <tt>handler</tt> is null.
633 */
634 public ThreadPoolExecutor(int corePoolSize,
635 int maximumPoolSize,
636 long keepAliveTime,
637 TimeUnit unit,
638 BlockingQueue<Runnable> workQueue,
639 RejectedExecutionHandler handler) {
640 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
641 defaultThreadFactory, handler);
642 }
643
644 /**
645 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
646 * parameters.
647 *
648 * @param corePoolSize the number of threads to keep in the
649 * pool, even if they are idle.
650 * @param maximumPoolSize the maximum number of threads to allow in the
651 * pool.
652 * @param keepAliveTime when the number of threads is greater than
653 * the core, this is the maximum time that excess idle threads
654 * will wait for new tasks before terminating.
655 * @param unit the time unit for the keepAliveTime
656 * argument.
657 * @param workQueue the queue to use for holding tasks before they
658 * are executed. This queue will hold only the <tt>Runnable</tt>
659 * tasks submitted by the <tt>execute</tt> method.
660 * @param threadFactory the factory to use when the executor
661 * creates a new thread.
662 * @param handler the handler to use when execution is blocked
663 * because the thread bounds and queue capacities are reached.
664 * @throws IllegalArgumentException if corePoolSize or
665 * keepAliveTime is less than zero, if maximumPoolSize is less than or
666 * equal to zero, or if corePoolSize is greater than maximumPoolSize.
667 * @throws NullPointerException if <tt>workQueue</tt>
668 * or <tt>threadFactory</tt> or <tt>handler</tt> is null.
669 */
670 public ThreadPoolExecutor(int corePoolSize,
671 int maximumPoolSize,
672 long keepAliveTime,
673 TimeUnit unit,
674 BlockingQueue<Runnable> workQueue,
675 ThreadFactory threadFactory,
676 RejectedExecutionHandler handler) {
677 if (corePoolSize < 0 ||
678 maximumPoolSize <= 0 ||
679 maximumPoolSize < corePoolSize ||
680 keepAliveTime < 0)
681 throw new IllegalArgumentException();
682 if (workQueue == null || threadFactory == null || handler == null)
683 throw new NullPointerException();
684 this.corePoolSize = corePoolSize;
685 this.maximumPoolSize = maximumPoolSize;
686 this.workQueue = workQueue;
687 this.keepAliveTime = unit.toNanos(keepAliveTime);
688 this.threadFactory = threadFactory;
689 this.handler = handler;
690 }
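/*
Editorial sketch, not part of revision 1.20. The constructors above accept a
ThreadFactory so callers can control thread names, daemon status and similar
attributes. A minimal illustration follows, assuming the six-argument
constructor as it later shipped in the JDK. The helper named(...) and the
"demo-worker" prefix are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class NamedThreadFactorySketch {
    // Hypothetical factory: names threads "<prefix>-<n>" and marks them daemon.
    static ThreadFactory named(final String prefix) {
        final AtomicInteger seq = new AtomicInteger();
        return new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, prefix + "-" + seq.incrementAndGet());
                t.setDaemon(true);
                return t;
            }
        };
    }

    // Runs one task and returns the name of the thread that executed it.
    static String workerName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), named("demo-worker"));
        final AtomicReference<String> seen = new AtomicReference<String>();
        pool.execute(() -> seen.set(Thread.currentThread().getName()));
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(workerName());
    }
}
```

With a single core thread, the only worker is the first thread the factory
creates, so the task observes the name "demo-worker-1".
*/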
691
692
693 /**
694 * Executes the given task sometime in the future. The task
695 * may execute in a new thread or in an existing pooled thread.
696 *
697 * If the task cannot be submitted for execution, either because this
698 * executor has been shut down or because its capacity has been reached,
699 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
700 *
701 * @param command the task to execute
702 * @throws RejectedExecutionException at discretion of
703 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
704 * for execution
705 */
706 public void execute(Runnable command) {
707 for (;;) {
708 if (runState != RUNNING) {
709 reject(command);
710 return;
711 }
712 if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
713 return;
714 if (workQueue.offer(command))
715 return;
716 Runnable r = addIfUnderMaximumPoolSize(command);
717 if (r == command)
718 return;
719 if (r == null) {
720 reject(command);
721 return;
722 }
723 // else retry
724 }
725 }
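/*
Editorial sketch, not part of revision 1.20. The execute method above tries, in
order: start a core thread, queue the task, start a thread up to
maximumPoolSize, and finally reject. A minimal illustration of that ordering
follows, assuming the API as it later shipped in the JDK and the default
AbortPolicy handler. The class name ExecuteOrderSketch is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteOrderSketch {
    // Submits four blocking tasks to a pool with core=1, max=2, queue capacity=1.
    // Returns {poolSize, queuedTasks, rejectedTasks} observed while all are blocked.
    static int[] saturate() {
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++;   // queue is full and the pool is at its maximum
            }
        }
        int[] result = { pool.getPoolSize(), pool.getQueue().size(), rejected };
        release.countDown();  // let the blocked tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        int[] r = saturate();
        System.out.println("poolSize=" + r[0] + " queued=" + r[1] + " rejected=" + r[2]);
    }
}
```

The first submission starts the core thread, the second fills the queue, the
third forces a second thread, and the fourth is rejected, giving a pool size of
2 with 1 queued and 1 rejected task.
*/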
726
727 public void shutdown() {
728 mainLock.lock();
729 try {
730 if (runState == RUNNING) // don't override shutdownNow
731 runState = SHUTDOWN;
732 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
733 it.next().interruptIfIdle();
734 } finally {
735 mainLock.unlock();
736 }
737 }
738
739
740 public List shutdownNow() {
741 mainLock.lock();
742 try {
743 if (runState != TERMINATED)
744 runState = STOP;
745 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
746 it.next().interruptNow();
747 } finally {
748 mainLock.unlock();
749 }
750 return Arrays.asList(workQueue.toArray());
751 }
752
753 public boolean isShutdown() {
754 return runState != RUNNING;
755 }
756
757 /**
758 * Return true if this executor is in the process of terminating
759 * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
760 * completely terminated. This method may be useful for
761 * debugging. A return of <tt>true</tt> reported a sufficient
762 * period after shutdown may indicate that submitted tasks have
763 * ignored or suppressed interruption, causing this executor not
764 * to properly terminate.
765 * @return true if terminating but not yet terminated.
766 */
767 public boolean isTerminating() {
768 return runState == STOP;
769 }
770
771 public boolean isTerminated() {
772 return runState == TERMINATED;
773 }
774
775 public boolean awaitTermination(long timeout, TimeUnit unit)
776 throws InterruptedException {
777 mainLock.lock();
778 try {
779 return runState == TERMINATED || termination.await(timeout, unit); // skip the wait if already terminated
780 } finally {
781 mainLock.unlock();
782 }
783 }
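/*
Editorial sketch, not part of revision 1.20. The lifecycle methods above
(shutdown, isShutdown, isTerminated, awaitTermination) are usually combined as
shown below. This is a minimal illustration assuming the API as it later
shipped in the JDK; the class name ShutdownSketch is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {
    // Returns {isShutdown after shutdown(), isTerminated while a task still runs,
    //          late submission rejected, awaitTermination result, isTerminated at end}.
    static boolean[] lifecycle() {
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.shutdown();                                  // stop accepting new tasks
        boolean shutdownSeen = pool.isShutdown();
        boolean terminatedEarly = pool.isTerminated();    // a task is still pending
        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;                              // default handler throws
        }
        release.countDown();                              // let the task complete
        boolean awaited = false;
        try {
            awaited = pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {
            shutdownSeen, terminatedEarly, rejected, awaited, pool.isTerminated() };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(lifecycle()));
    }
}
```

shutdown() lets already submitted tasks finish, so isTerminated() stays false
until the blocked task is released.
*/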
784
785 /**
786 * Invokes <tt>shutdown</tt> when this executor is no longer
787 * referenced.
788 */
789 protected void finalize() {
790 shutdown();
791 }
792
793 /**
794 * Sets the thread factory used to create new threads.
795 *
796 * @param threadFactory the new thread factory
797 * @see #getThreadFactory
798 */
799 public void setThreadFactory(ThreadFactory threadFactory) {
800 this.threadFactory = threadFactory;
801 }
802
803 /**
804 * Returns the thread factory used to create new threads.
805 *
806 * @return the current thread factory
807 * @see #setThreadFactory
808 */
809 public ThreadFactory getThreadFactory() {
810 return threadFactory;
811 }
812
813 /**
814 * Sets a new handler for unexecutable tasks.
815 *
816 * @param handler the new handler
817 * @see #getRejectedExecutionHandler
818 */
819 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
820 this.handler = handler;
821 }
822
823 /**
824 * Returns the current handler for unexecutable tasks.
825 *
826 * @return the current handler
827 * @see #setRejectedExecutionHandler
828 */
829 public RejectedExecutionHandler getRejectedExecutionHandler() {
830 return handler;
831 }
832
833 /**
834 * Returns the task queue used by this executor. Access to the
835 * task queue is intended primarily for debugging and monitoring.
836 * This queue may be in active use. Retrieving the task queue
837 * does not prevent queued tasks from executing.
838 *
839 * @return the task queue
840 */
841 public BlockingQueue<Runnable> getQueue() {
842 return workQueue;
843 }
844
845 /**
846 * Removes this task from the internal queue if it is present, thus
847 * causing it not to be run if it has not already started. This
848 * method may be useful as one part of a cancellation scheme.
849 *
850 * @param task the task to remove
851 * @return true if the task was removed
852 */
853 public boolean remove(Runnable task) {
854 return getQueue().remove(task);
855 }
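/*
Editorial sketch, not part of revision 1.20. The remove method above only
affects tasks that are still waiting in the queue. A minimal illustration
follows, assuming the API as it later shipped in the JDK; the class name
RemoveQueuedTaskSketch is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class RemoveQueuedTaskSketch {
    // Returns {result of remove(), whether the removed task ever ran}.
    static boolean[] removeQueued() {
        final CountDownLatch release = new CountDownLatch(1);
        final AtomicBoolean ran = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        // Occupy the only worker thread so the next task has to wait in the queue.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Runnable queued = () -> ran.set(true);
        pool.execute(queued);
        boolean removed = pool.remove(queued);   // still queued, so it can be removed
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { removed, ran.get() };
    }

    public static void main(String[] args) {
        boolean[] r = removeQueued();
        System.out.println("removed=" + r[0] + " ran=" + r[1]);
    }
}
```

The same Runnable instance that was passed to execute must be passed to remove;
a task that a worker has already dequeued is not affected.
*/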
856
857
858 /**
859 * Tries to remove from the work queue all {@link Cancellable}
860 * tasks that have been cancelled. This method can be useful as a
861 * storage reclamation operation, that has no other impact on
862 * functionality. Cancelled tasks are never executed, but may
863 * accumulate in work queues until worker threads can actively
864 * remove them. Invoking this method instead tries to remove them now.
865 * However, this method may fail to remove all such tasks in
866 * the presence of interference by other threads.
867 */
868
869 public void purge() {
870 // Fail if we encounter interference during traversal
871 try {
872 Iterator<Runnable> it = getQueue().iterator();
873 while (it.hasNext()) {
874 Runnable r = it.next();
875 if (r instanceof Cancellable) {
876 Cancellable c = (Cancellable)r;
877 if (c.isCancelled())
878 it.remove();
879 }
880 }
881 }
882 catch(ConcurrentModificationException ex) {
883 return;
884 }
885 }
886
887 /**
888 * Sets the core number of threads. This overrides any value set
889 * in the constructor. If the new value is smaller than the
890 * current value, excess existing threads will be terminated when
891 * they next become idle.
892 *
893 * @param corePoolSize the new core size
894 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
895 * is less than zero
896 * @see #getCorePoolSize
897 */
898 public void setCorePoolSize(int corePoolSize) {
899 if (corePoolSize < 0)
900 throw new IllegalArgumentException();
901 mainLock.lock();
902 try {
903 int extra = this.corePoolSize - corePoolSize;
904 this.corePoolSize = corePoolSize;
905 if (extra > 0 && poolSize > corePoolSize) {
906 Iterator<Worker> it = workers.iterator();
907 while (it.hasNext() &&
908 extra > 0 &&
909 poolSize > corePoolSize &&
910 workQueue.remainingCapacity() == 0) {
911 it.next().interruptIfIdle();
912 --extra;
913 }
914 }
915
916 } finally {
917 mainLock.unlock();
918 }
919 }
920
921 /**
922 * Returns the core number of threads.
923 *
924 * @return the core number of threads
925 * @see #setCorePoolSize
926 */
927 public int getCorePoolSize() {
928 return corePoolSize;
929 }
930
931 /**
932 * Start a core thread, causing it to idly wait for work. This
933 * overrides the default policy of starting core threads only when
934 * new tasks are executed. This method will return <tt>false</tt>
935 * if all core threads have already been started.
936 * @return true if a thread was started
937 */
938 public boolean prestartCoreThread() {
939 return addIfUnderCorePoolSize(null);
940 }
941
942 /**
943 * Start all core threads, causing them to idly wait for work. This
944 * overrides the default policy of starting core threads only when
945 * new tasks are executed.
946 * @return the number of threads started.
947 */
948 public int prestartAllCoreThreads() {
949 int n = 0;
950 while (addIfUnderCorePoolSize(null))
951 ++n;
952 return n;
953 }
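/*
Editorial sketch, not part of revision 1.20. The prestart methods above start
core threads before any task arrives. A minimal illustration follows, assuming
the API as it later shipped in the JDK; the class name PrestartSketch is
hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartSketch {
    // Returns {pool size before, threads started by prestartAllCoreThreads,
    //          pool size after}.
    static int[] prestart(int core) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            core, core, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        int before = pool.getPoolSize();              // threads are created lazily
        int started = pool.prestartAllCoreThreads();  // start idle core threads now
        int after = pool.getPoolSize();
        pool.shutdown();                              // idle workers exit
        return new int[] { before, started, after };
    }

    public static void main(String[] args) {
        int[] r = prestart(3);
        System.out.println("before=" + r[0] + " started=" + r[1] + " after=" + r[2]);
    }
}
```

Prestarting trades some idle resource use for lower latency on the first
submissions.
*/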
954
955 /**
956 * Sets the maximum allowed number of threads. This overrides any
957 * value set in the constructor. If the new value is smaller than
958 * the current value, excess existing threads will be
959 * terminated when they next become idle.
960 *
961 * @param maximumPoolSize the new maximum
962 * @throws IllegalArgumentException if maximumPoolSize is less than or equal
963 * to zero, or less than the {@link #getCorePoolSize core pool size}
964 * @see #getMaximumPoolSize
965 */
966 public void setMaximumPoolSize(int maximumPoolSize) {
967 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
968 throw new IllegalArgumentException();
969 mainLock.lock();
970 try {
971 int extra = this.maximumPoolSize - maximumPoolSize;
972 this.maximumPoolSize = maximumPoolSize;
973 if (extra > 0 && poolSize > maximumPoolSize) {
974 Iterator<Worker> it = workers.iterator();
975 while (it.hasNext() &&
976 extra > 0 &&
977 poolSize > maximumPoolSize) {
978 it.next().interruptIfIdle();
979 --extra;
980 }
981 }
982 } finally {
983 mainLock.unlock();
984 }
985 }
986
987 /**
988 * Returns the maximum allowed number of threads.
989 *
990 * @return the maximum allowed number of threads
991 * @see #setMaximumPoolSize
992 */
993 public int getMaximumPoolSize() {
994 return maximumPoolSize;
995 }
996
997 /**
998 * Sets the time limit for which threads may remain idle before
999 * being terminated. If there are more than the core number of
1000 * threads currently in the pool, after waiting this amount of
1001 * time without processing a task, excess threads will be
1002 * terminated. This overrides any value set in the constructor.
1003 * @param time the time to wait. A time value of zero will cause
1004 * excess threads to terminate immediately after executing tasks.
1005 * @param unit the time unit of the time argument
1006 * @throws IllegalArgumentException if time is less than zero
1007 * @see #getKeepAliveTime
1008 */
1009 public void setKeepAliveTime(long time, TimeUnit unit) {
1010 if (time < 0)
1011 throw new IllegalArgumentException();
1012 this.keepAliveTime = unit.toNanos(time);
1013 }
1014
1015 /**
1016 * Returns the thread keep-alive time, which is the amount of time
1017 * that threads in excess of the core pool size may remain
1018 * idle before being terminated.
1019 *
1020 * @param unit the desired time unit of the result
1021 * @return the time limit
1022 * @see #setKeepAliveTime
1023 */
1024 public long getKeepAliveTime(TimeUnit unit) {
1025 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1026 }
1027
1028 /* Statistics */
1029
1030 /**
1031 * Returns the current number of threads in the pool.
1032 *
1033 * @return the number of threads
1034 */
1035 public int getPoolSize() {
1036 return poolSize;
1037 }
1038
1039 /**
1040 * Returns the approximate number of threads that are actively
1041 * executing tasks.
1042 *
1043 * @return the number of threads
1044 */
1045 public int getActiveCount() {
1046 mainLock.lock();
1047 try {
1048 int n = 0;
1049 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1050 if (it.next().isActive())
1051 ++n;
1052 }
1053 return n;
1054 } finally {
1055 mainLock.unlock();
1056 }
1057 }
1058
1059 /**
1060 * Returns the largest number of threads that have ever
1061 * simultaneously been in the pool.
1062 *
1063 * @return the number of threads
1064 */
1065 public int getLargestPoolSize() {
1066 mainLock.lock();
1067 try {
1068 return largestPoolSize;
1069 } finally {
1070 mainLock.unlock();
1071 }
1072 }
1073
1074 /**
1075 * Returns the approximate total number of tasks that have been
1076 * scheduled for execution. Because the states of tasks and
1077 * threads may change dynamically during computation, the returned
1078 * value is only an approximation, but one that does not ever
1079 * decrease across successive calls.
1080 *
1081 * @return the number of tasks
1082 */
1083 public long getTaskCount() {
1084 mainLock.lock();
1085 try {
1086 long n = completedTaskCount;
1087 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1088 Worker w = it.next();
1089 n += w.completedTasks;
1090 if (w.isActive())
1091 ++n;
1092 }
1093 return n + workQueue.size();
1094 } finally {
1095 mainLock.unlock();
1096 }
1097 }
1098
1099 /**
1100 * Returns the approximate total number of tasks that have
1101 * completed execution. Because the states of tasks and threads
1102 * may change dynamically during computation, the returned value
1103 * is only an approximation, but one that does not ever decrease
1104 * across successive calls.
1105 *
1106 * @return the number of tasks
1107 */
1108 public long getCompletedTaskCount() {
1109 mainLock.lock();
1110 try {
1111 long n = completedTaskCount;
1112 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1113 n += it.next().completedTasks;
1114 return n;
1115 } finally {
1116 mainLock.unlock();
1117 }
1118 }
1119
1120 /**
1121 * Method invoked prior to executing the given Runnable in the
1122 * given thread. This method may be used to re-initialize
1123 * ThreadLocals, or to perform logging. Note: To properly nest
1124 * multiple overridings, subclasses should generally invoke
1125 * <tt>super.beforeExecute</tt> at the end of this method.
1126 *
1127 * @param t the thread that will run task r.
1128 * @param r the task that will be executed.
1129 */
1130 protected void beforeExecute(Thread t, Runnable r) { }
1131
1132 /**
1133 * Method invoked upon completion of execution of the given
1134 * Runnable. If non-null, the Throwable is the uncaught exception
1135 * that caused execution to terminate abruptly. Note: To properly
1136 * nest multiple overridings, subclasses should generally invoke
1137 * <tt>super.afterExecute</tt> at the beginning of this method.
1138 *
1139 * @param r the runnable that has completed.
1140 * @param t the exception that caused termination, or null if
1141 * execution completed normally.
1142 */
1143 protected void afterExecute(Runnable r, Throwable t) { }
1144
1145 /**
1146 * Method invoked when the Executor has terminated. Default
1147 * implementation does nothing. Note: To properly nest multiple
1148 * overridings, subclasses should generally invoke
1149 * <tt>super.terminated</tt> within this method.
1150 */
1151 protected void terminated() { }
1152
1153 /**
1154 * A handler for unexecutable tasks that runs them directly in the
1155 * calling thread of <tt>execute</tt>, or discards them if the executor is
1156 * shut down. This is the default <tt>RejectedExecutionHandler</tt>.
1157 */
1158 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1159
1160 /**
1161 * Constructs a <tt>CallerRunsPolicy</tt>.
1162 */
1163 public CallerRunsPolicy() { }
1164
1165 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1166 if (!e.isShutdown()) {
1167 r.run();
1168 }
1169 }
1170 }
1171
1172 /**
1173 * A handler for unexecutable tasks that throws a
1174 * <tt>RejectedExecutionException</tt>.
1175 */
1176 public static class AbortPolicy implements RejectedExecutionHandler {
1177
1178 /**
1179 * Constructs an <tt>AbortPolicy</tt>.
1180 */
1181 public AbortPolicy() { }
1182
1183 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1184 throw new RejectedExecutionException();
1185 }
1186 }
1187
1188 /**
1189 * A handler for unexecutable tasks that waits until the task can be
1190 * submitted for execution.
1191 */
1192 public static class WaitPolicy implements RejectedExecutionHandler {
1193 /**
1194 * Constructs a <tt>WaitPolicy</tt>.
1195 */
1196 public WaitPolicy() { }
1197
1198 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1199 if (!e.isShutdown()) {
1200 try {
1201 e.getQueue().put(r);
1202 } catch (InterruptedException ie) {
1203 Thread.currentThread().interrupt();
1204 throw new RejectedExecutionException(ie);
1205 }
1206 }
1207 }
1208 }
1209
1210 /**
1211 * A handler for unexecutable tasks that silently discards these tasks.
1212 */
1213 public static class DiscardPolicy implements RejectedExecutionHandler {
1214
1215 /**
1216 * Constructs a <tt>DiscardPolicy</tt>.
1217 */
1218 public DiscardPolicy() { }
1219
1220 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1221 }
1222 }
1223
1224 /**
1225 * A handler for unexecutable tasks that discards the oldest unhandled
1226 * request and retries <tt>execute</tt>, unless the executor is shut down.
1227 */
1228 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1229 /**
1230 * Constructs a <tt>DiscardOldestPolicy</tt>.
1231 */
1232 public DiscardOldestPolicy() { }
1233
1234 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1235 if (!e.isShutdown()) {
1236 e.getQueue().poll();
1237 e.execute(r);
1238 }
1239 }
1240 }
1241 }
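
The listing above documents the statistics accessors, the <tt>beforeExecute</tt>/<tt>afterExecute</tt> hooks, and the rejection handlers. The following is a minimal usage sketch, not part of the file. It assumes the class as released in the JDK (`java.util.concurrent.ThreadPoolExecutor`), which may differ in details from the revision shown here. The names `PoolStatsSketch`, `CountingPool`, and `runTasks` are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolStatsSketch {

    /** Hypothetical subclass that counts how often each hook is invoked. */
    static class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger before = new AtomicInteger();
        final AtomicInteger after = new AtomicInteger();

        CountingPool(int core, int max, int queueCapacity) {
            // Small bounded queue so that saturation (and thus the
            // rejection handler) is reachable with only a few tasks.
            super(core, max, 1, TimeUnit.SECONDS,
                  new ArrayBlockingQueue<Runnable>(queueCapacity),
                  new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            before.incrementAndGet();
            super.beforeExecute(t, r);   // super call at the end, as the Javadoc advises
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);    // super call at the beginning
            after.incrementAndGet();
        }
    }

    /**
     * Submits the given number of trivial tasks and returns
     * {threads prestarted, tasks run, completedTaskCount,
     *  beforeExecute calls, afterExecute calls, largestPoolSize}.
     */
    static long[] runTasks(int tasks) {
        CountingPool pool = new CountingPool(2, 4, 2);
        int started = pool.prestartAllCoreThreads();
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            // Rejected tasks run in this (calling) thread under CallerRunsPolicy;
            // those bypass the hooks and the pool's completed-task statistics.
            pool.execute(ran::incrementAndGet);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return new long[] {
            started, ran.get(), pool.getCompletedTaskCount(),
            pool.before.get(), pool.after.get(), pool.getLargestPoolSize()
        };
    }

    public static void main(String[] args) {
        long[] s = runTasks(20);
        System.out.println("prestarted=" + s[0] + " ran=" + s[1]
            + " completedByPool=" + s[2] + " before=" + s[3]
            + " after=" + s[4] + " largestPoolSize=" + s[5]);
    }
}
```

In this sketch every submitted task runs exactly once, either in a pool thread or in the submitting thread, so the "ran" count equals the number submitted, while the completed-task count covers only tasks that pool threads executed. How many tasks fall back to the caller depends on timing and will vary from run to run.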