root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.20
Committed: Mon Sep 1 14:27:11 2003 UTC by dl
Branch: MAIN
Changes since 1.19: +10 -11 lines
Log Message:
Fix html

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.2 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.9 import java.util.concurrent.locks.*;
9 dl 1.2 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.17 * An {@link ExecutorService} that executes each submitted task using
13     * one of possibly several pooled threads.
14 tim 1.1 *
15 dl 1.17 * <p>Thread pools address two different problems: they usually
16     * provide improved performance when executing large numbers of
17     * asynchronous tasks, due to reduced per-task invocation overhead,
18     * and they provide a means of bounding and managing the resources,
19     * including threads, consumed when executing a collection of tasks.
20 dl 1.20 * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
21     * statistics, such as the number of completed tasks, that may be
22     * useful for monitoring and tuning.
23 dl 1.17 *
24 tim 1.1 * <p>To be useful across a wide range of contexts, this class
25 dl 1.17 * provides many adjustable parameters and extensibility hooks. For
26     * example, it can be configured to create a new thread for each task,
27     * or even to execute tasks sequentially in a single thread, in
28     * addition to its most common configuration, which reuses a pool of
29     * threads. However, programmers are urged to use the more convenient
30 dl 1.20 * {@link Executors} factory methods {@link
31     * Executors#newCachedThreadPool} (unbounded thread pool, with
32     * automatic thread reclamation), {@link Executors#newFixedThreadPool}
33     * (fixed size thread pool) and {@link
34     * Executors#newSingleThreadExecutor} (single background thread), that
35     * preconfigure settings for the most common usage scenarios.
36 dl 1.17 *
37 tim 1.1 *
38     * <h3>Tuning guide</h3>
39     * <dl>
40 dl 1.2 *
41     * <dt>Core and maximum pool size</dt>
42     *
43 dl 1.19 * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
44     * pool size according to the bounds set by corePoolSize and
45     * maximumPoolSize. When a new task is submitted, and fewer than
46     * corePoolSize threads are running, a new thread is created to handle
47     * the request, even if other worker threads are idle. If there are
48     * more than corePoolSize but fewer than maximumPoolSize threads
49     * running, a new thread will be created only if the queue is full.
50     * By setting corePoolSize and maximumPoolSize the same, you create a
51     * fixed-size thread pool. By default, even core threads are only
52     * created and started when needed by new tasks, but this can be
53     * overridden dynamically using method <tt>prestartCoreThread</tt>.
54 dl 1.17 * </dd>
55 dl 1.2 *
56 tim 1.10 * <dt>Keep-alive</dt>
57 dl 1.2 *
58     * <dd>The keepAliveTime determines what happens to idle threads. If
59     * the pool currently has more than the core number of threads, excess
60     * threads will be terminated if they have been idle for more than the
61     * keepAliveTime.</dd>
62     *
63 tim 1.10 * <dt>Queueing</dt>
64     *
65 dl 1.19 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
66     * submitted tasks. A good default is a {@link SynchronousQueue} that
67     * hands off tasks to threads without otherwise holding them. This
68     * policy avoids lockups when handling sets of requests that might
69     * have internal dependencies. Using an unbounded queue (for example
70     * a {@link LinkedBlockingQueue}) will cause new tasks to be queued in
71     * cases where all corePoolSize threads are busy, so no more than
72 dl 1.20 * corePoolSize threads will be created. This may be appropriate when
73 dl 1.19 * each task is completely independent of others, so tasks cannot
74     * affect each other's execution; for example, in a web page server.
75     * When given a choice, a <tt>ThreadPoolExecutor</tt> always prefers
76     * adding a new thread rather than queuing if there are currently
77     * fewer than corePoolSize threads running, but
78     * otherwise always prefers queuing a request rather than adding a new
79     * thread.
80 tim 1.1 *
81     * <p>While queuing can be useful in smoothing out transient bursts of
82     * requests, especially in socket-based services, it is not very well
83     * behaved when commands continue to arrive on average faster than
84 dl 1.17 * they can be processed. Queue sizes and maximum pool sizes can
85     * often be traded off for each other. Using large queues and small
86     * pools minimizes CPU usage, OS resources, and context-switching
87     * overhead, but can lead to artificially low throughput. If tasks
88     * frequently block (for example if they are I/O bound), a system may
89     * be able to schedule time for more threads than you otherwise
90     * allow. Use of small queues or queueless handoffs generally requires
91     * larger pool sizes, which keeps CPUs busier but may encounter
92     * unacceptable scheduling overhead, which also decreases throughput.
93 tim 1.1 * </dd>
94 dl 1.2 *
95 tim 1.1 * <dt>Creating new threads</dt>
96 dl 1.2 *
97 dl 1.19 * <dd>New threads are created using a {@link ThreadFactory}. By
98     * default, threads are created simply with the <tt>new
99     * Thread(Runnable)</tt> constructor, but by supplying a different
100     * ThreadFactory, you can alter the thread's name, thread group,
101     * priority, daemon status, etc. </dd>
102 dl 1.2 *
103 tim 1.1 * <dt>Before and after intercepts</dt>
104 dl 1.2 *
105 dl 1.19 * <dd>This class has overridable methods that are called before and
106     * after execution of each task. These can be used to manipulate the
107     * execution environment, for example, reinitializing ThreadLocals,
108     * gathering statistics, or adding log entries. </dd>
109 dl 1.2 *
110 dl 1.17 * <dt>Rejected tasks</dt>
111 dl 1.2 *
112     * <dd>There are a number of factors which can bound the number of
113     * tasks which can execute at once, including the maximum pool size
114     * and the queuing mechanism used. If the executor determines that a
115     * task cannot be executed because it has been refused by the queue
116     * and no threads are available, or because the executor has been shut
117 dl 1.19 * down, the {@link RejectedExecutionHandler}
118     * <tt>rejectedExecution</tt> method is invoked. The default
119     * (<tt>AbortPolicy</tt>) handler throws a runtime {@link
120     * RejectedExecutionException} upon rejection. </dd>
121 dl 1.2 *
122 tim 1.1 * </dl>
123     *
124     * @since 1.5
125 dl 1.8 * @author Doug Lea
126 tim 1.1 */
127 dl 1.2 public class ThreadPoolExecutor implements ExecutorService {
128     /**
129     * Queue used for holding tasks and handing off to worker threads.
130 tim 1.10 */
131 dl 1.2 private final BlockingQueue<Runnable> workQueue;
132    
133     /**
134     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
135     * workers set.
136 tim 1.10 */
137 dl 1.2 private final ReentrantLock mainLock = new ReentrantLock();
138    
139     /**
140     * Wait condition to support awaitTermination
141 tim 1.10 */
142 dl 1.2 private final Condition termination = mainLock.newCondition();
143    
144     /**
145     * Set containing all worker threads in pool.
146 tim 1.10 */
147 dl 1.17 private final HashSet<Worker> workers = new HashSet<Worker>();
148 dl 1.2
149     /**
150     * Timeout in nanoseconds for idle threads waiting for work.
151     * Threads use this timeout only when there are more than
152     * corePoolSize present. Otherwise they wait forever for new work.
153 tim 1.10 */
154 dl 1.2 private volatile long keepAliveTime;
155    
156     /**
157     * Core pool size, updated only while holding mainLock,
158     * but volatile to allow concurrent readability even
159     * during updates.
160 tim 1.10 */
161 dl 1.2 private volatile int corePoolSize;
162    
163     /**
164     * Maximum pool size, updated only while holding mainLock
165     * but volatile to allow concurrent readability even
166     * during updates.
167 tim 1.10 */
168 dl 1.2 private volatile int maximumPoolSize;
169    
170     /**
171     * Current pool size, updated only while holding mainLock
172     * but volatile to allow concurrent readability even
173     * during updates.
174 tim 1.10 */
175 dl 1.2 private volatile int poolSize;
176    
177     /**
178 dl 1.16 * Lifecycle state
179 tim 1.10 */
180 dl 1.16 private volatile int runState;
181 dl 1.2
182 dl 1.16 // Special values for runState
183 dl 1.8 /** Normal, not-shutdown mode */
184 dl 1.16 private static final int RUNNING = 0;
185 dl 1.8 /** Controlled shutdown mode */
186 dl 1.16 private static final int SHUTDOWN = 1;
187     /** Immediate shutdown mode */
188     private static final int STOP = 2;
189     /** Final state */
190     private static final int TERMINATED = 3;
191 dl 1.2
192     /**
193     * Handler called when saturated or shutdown in execute.
194 tim 1.10 */
195 dl 1.2 private volatile RejectedExecutionHandler handler = defaultHandler;
196    
197     /**
198     * Factory for new threads.
199 tim 1.10 */
200 dl 1.2 private volatile ThreadFactory threadFactory = defaultThreadFactory;
201    
202     /**
203     * Tracks largest attained pool size.
204 tim 1.10 */
205 dl 1.2 private int largestPoolSize;
206    
207     /**
208     * Counter for completed tasks. Updated only on termination of
209     * worker threads.
210 tim 1.10 */
211 dl 1.2 private long completedTaskCount;
212    
213 dl 1.8 /**
214 dl 1.16 * The default thread factory
215 dl 1.8 */
216 tim 1.10 private static final ThreadFactory defaultThreadFactory =
217 dl 1.2 new ThreadFactory() {
218     public Thread newThread(Runnable r) {
219     return new Thread(r);
220     }
221     };
222    
223 dl 1.8 /**
224     * The default rejected execution handler
225     */
226 tim 1.10 private static final RejectedExecutionHandler defaultHandler =
227 dl 1.2 new AbortPolicy();
228    
229     /**
230 dl 1.17 * Invoke the rejected execution handler for the given command.
231 dl 1.13 */
232     void reject(Runnable command) {
233     handler.rejectedExecution(command, this);
234     }
235    
236     /**
237 dl 1.2 * Create and return a new thread running firstTask as its first
238     * task. Call only while holding mainLock
239 dl 1.8 * @param firstTask the task the new thread should run first (or
240     * null if none)
241     * @return the new thread
242 dl 1.2 */
243     private Thread addThread(Runnable firstTask) {
244     Worker w = new Worker(firstTask);
245     Thread t = threadFactory.newThread(w);
246     w.thread = t;
247     workers.add(w);
248     int nt = ++poolSize;
249     if (nt > largestPoolSize)
250     largestPoolSize = nt;
251     return t;
252     }
253    
254 dl 1.16
255 dl 1.15
256 dl 1.2 /**
257     * Create and start a new thread running firstTask as its first
258     * task, only if fewer than corePoolSize threads are running.
259 dl 1.8 * @param firstTask the task the new thread should run first (or
260     * null if none)
261 dl 1.2 * @return true if successful.
262     */
263 dl 1.16 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
264 dl 1.2 Thread t = null;
265     mainLock.lock();
266     try {
267 tim 1.10 if (poolSize < corePoolSize)
268 dl 1.8 t = addThread(firstTask);
269 tim 1.14 } finally {
270 dl 1.2 mainLock.unlock();
271     }
272     if (t == null)
273     return false;
274     t.start();
275     return true;
276     }
277    
278     /**
279     * Create and start a new thread only if fewer than maximumPoolSize
280     * threads are running. The new thread runs as its first task the
281     * next task in queue, or if there is none, the given task.
282 dl 1.8 * @param firstTask the task the new thread should run first (or
283     * null if none)
284 dl 1.2 * @return null on failure, else the first task to be run by new thread.
285     */
286 dl 1.8 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
287 dl 1.2 Thread t = null;
288     Runnable next = null;
289     mainLock.lock();
290     try {
291     if (poolSize < maximumPoolSize) {
292     next = workQueue.poll();
293     if (next == null)
294 dl 1.8 next = firstTask;
295 dl 1.2 t = addThread(next);
296     }
297 tim 1.14 } finally {
298 dl 1.2 mainLock.unlock();
299     }
300     if (t == null)
301     return null;
302     t.start();
303     return next;
304     }
305    
306    
307     /**
308     * Get the next task for a worker thread to run.
309 dl 1.8 * @return the task
310     * @throws InterruptedException if interrupted while waiting for task
311 dl 1.2 */
312     private Runnable getTask() throws InterruptedException {
313     for (;;) {
314 dl 1.16 switch(runState) {
315     case RUNNING: {
316     if (poolSize <= corePoolSize) // untimed wait if core
317     return workQueue.take();
318    
319     long timeout = keepAliveTime;
320     if (timeout <= 0) // die immediately for 0 timeout
321     return null;
322     Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
323     if (r != null)
324     return r;
325     if (poolSize > corePoolSize) // timed out
326     return null;
327     // else, after timeout, pool shrank so shouldn't die, so retry
328     break;
329     }
330    
331     case SHUTDOWN: {
332     // Help drain queue
333     Runnable r = workQueue.poll();
334     if (r != null)
335     return r;
336    
337     // Check if can terminate
338     if (workQueue.isEmpty()) {
339     interruptIdleWorkers();
340     return null;
341     }
342    
343     // There could still be delayed tasks in queue.
344     // Wait for one, re-checking state upon interruption
345     try {
346     return workQueue.take();
347     }
348     catch(InterruptedException ignore) {
349     }
350     break;
351     }
352    
353     case STOP:
354 dl 1.2 return null;
355 dl 1.16 default:
356     assert false;
357     }
358     }
359     }
360    
361     /**
362     * Wake up all threads that might be waiting for tasks.
363     */
364     void interruptIdleWorkers() {
365     mainLock.lock();
366     try {
367     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
368     it.next().interruptIfIdle();
369     } finally {
370     mainLock.unlock();
371 dl 1.2 }
372     }
373    
374     /**
375     * Perform bookkeeping for a terminated worker thread.
376 tim 1.10 * @param w the worker
377 dl 1.2 */
378     private void workerDone(Worker w) {
379     mainLock.lock();
380     try {
381     completedTaskCount += w.completedTasks;
382     workers.remove(w);
383 tim 1.10 if (--poolSize > 0)
384 dl 1.2 return;
385    
386 dl 1.16 // Else, this is the last thread. Deal with potential shutdown.
387    
388     int state = runState;
389     assert state != TERMINATED;
390 tim 1.10
391 dl 1.16 if (state != STOP) {
392     // If there are queued tasks but no threads, create
393     // replacement.
394 dl 1.2 Runnable r = workQueue.poll();
395     if (r != null) {
396     addThread(r).start();
397     return;
398     }
399 dl 1.16
400     // If there are some (presumably delayed) tasks but
401     // none pollable, create an idle replacement to wait.
402     if (!workQueue.isEmpty()) {
403     addThread(null).start();
404     return;
405     }
406    
407     // Otherwise, we can exit without replacement
408     if (state == RUNNING)
409     return;
410 dl 1.2 }
411    
412 dl 1.16 // Either state is STOP, or state is SHUTDOWN and there is
413     // no work to do. So we can terminate.
414     runState = TERMINATED;
415 dl 1.2 termination.signalAll();
416 dl 1.16 // fall through to call terminated() outside of lock.
417 tim 1.14 } finally {
418 dl 1.2 mainLock.unlock();
419     }
420    
421 dl 1.16 assert runState == TERMINATED;
422     terminated();
423 dl 1.2 }
424    
425     /**
426 tim 1.10 * Worker threads
427 dl 1.2 */
428     private class Worker implements Runnable {
429    
430     /**
431     * The runLock is acquired and released surrounding each task
432     * execution. It mainly protects against interrupts that are
433     * intended to cancel the worker thread from instead
434     * interrupting the task being run.
435     */
436     private final ReentrantLock runLock = new ReentrantLock();
437    
438     /**
439     * Initial task to run before entering run loop
440     */
441     private Runnable firstTask;
442    
443     /**
444     * Per thread completed task counter; accumulated
445     * into completedTaskCount upon termination.
446     */
447     volatile long completedTasks;
448    
449     /**
450     * Thread this worker is running in. Acts as a final field,
451     * but cannot be set until thread is created.
452     */
453     Thread thread;
454    
455     Worker(Runnable firstTask) {
456     this.firstTask = firstTask;
457     }
458    
459     boolean isActive() {
460     return runLock.isLocked();
461     }
462    
463     /**
464     * Interrupt thread if not running a task
465 tim 1.10 */
466 dl 1.2 void interruptIfIdle() {
467     if (runLock.tryLock()) {
468     try {
469     thread.interrupt();
470 tim 1.14 } finally {
471 dl 1.2 runLock.unlock();
472     }
473     }
474     }
475    
476     /**
477     * Cause thread to die even if running a task.
478 tim 1.10 */
479 dl 1.2 void interruptNow() {
480     thread.interrupt();
481     }
482    
483     /**
484     * Run a single task between before/after methods.
485     */
486     private void runTask(Runnable task) {
487     runLock.lock();
488     try {
489     // Abort now if immediate cancel. Otherwise, we have
490     // committed to run this task.
491 dl 1.16 if (runState == STOP)
492 dl 1.2 return;
493    
494     Thread.interrupted(); // clear interrupt status on entry
495     boolean ran = false;
496     beforeExecute(thread, task);
497     try {
498     task.run();
499     ran = true;
500     afterExecute(task, null);
501     ++completedTasks;
502 tim 1.14 } catch(RuntimeException ex) {
503 dl 1.2 if (!ran)
504     afterExecute(task, ex);
505 dl 1.17 // Else the exception occurred within
506 dl 1.2 // afterExecute itself in which case we don't
507     // want to call it again.
508     throw ex;
509     }
510 tim 1.14 } finally {
511 dl 1.2 runLock.unlock();
512     }
513     }
514    
515     /**
516     * Main run loop
517     */
518     public void run() {
519     try {
520     for (;;) {
521     Runnable task;
522     if (firstTask != null) {
523     task = firstTask;
524     firstTask = null;
525 tim 1.14 } else {
526 dl 1.2 task = getTask();
527     if (task == null)
528     break;
529     }
530     runTask(task);
531     task = null; // unnecessary but can help GC
532     }
533 tim 1.14 } catch(InterruptedException ie) {
534 dl 1.2 // fall through
535 tim 1.14 } finally {
536 dl 1.2 workerDone(this);
537     }
538     }
539     }
540 tim 1.1
541 dl 1.17 // Public methods
542    
543 tim 1.1 /**
544 dl 1.17 * Creates a new <tt>ThreadPoolExecutor</tt> with the given
545     * initial parameters. It may be more convenient to use one of
546 dholmes 1.18 * the {@link Executors} factory methods instead of this general
547 dl 1.17 * purpose constructor.
548 tim 1.1 *
549 dl 1.2 * @param corePoolSize the number of threads to keep in the
550 tim 1.1 * pool, even if they are idle.
551 dl 1.2 * @param maximumPoolSize the maximum number of threads to allow in the
552 tim 1.1 * pool.
553     * @param keepAliveTime when the number of threads is greater than
554 dl 1.2 * the core, this is the maximum time that excess idle threads
555 tim 1.1 * will wait for new tasks before terminating.
556 dl 1.2 * @param unit the time unit for the keepAliveTime
557 tim 1.1 * argument.
558     * @param workQueue the queue to use for holding tasks before they
559     * are executed. This queue will hold only the <tt>Runnable</tt>
560     * tasks submitted by the <tt>execute</tt> method.
561 dl 1.2 * @throws IllegalArgumentException if corePoolSize or
562     * keepAliveTime is less than zero, if maximumPoolSize is less than
563     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
564 tim 1.1 * @throws NullPointerException if <tt>workQueue</tt> is null
565     */
566 dl 1.2 public ThreadPoolExecutor(int corePoolSize,
567     int maximumPoolSize,
568 tim 1.1 long keepAliveTime,
569 dl 1.2 TimeUnit unit,
570     BlockingQueue<Runnable> workQueue) {
571 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
572 dl 1.2 defaultThreadFactory, defaultHandler);
573     }
574 tim 1.1
575 dl 1.2 /**
576     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
577     * parameters.
578     *
579     * @param corePoolSize the number of threads to keep in the
580     * pool, even if they are idle.
581     * @param maximumPoolSize the maximum number of threads to allow in the
582     * pool.
583     * @param keepAliveTime when the number of threads is greater than
584     * the core, this is the maximum time that excess idle threads
585     * will wait for new tasks before terminating.
586     * @param unit the time unit for the keepAliveTime
587     * argument.
588     * @param workQueue the queue to use for holding tasks before they
589     * are executed. This queue will hold only the <tt>Runnable</tt>
590     * tasks submitted by the <tt>execute</tt> method.
591     * @param threadFactory the factory to use when the executor
592 tim 1.10 * creates a new thread.
593 dl 1.2 * @throws IllegalArgumentException if corePoolSize or
594     * keepAliveTime is less than zero, if maximumPoolSize is less than
595     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
596 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
597 dl 1.2 * or <tt>threadFactory</tt> are null.
598     */
599     public ThreadPoolExecutor(int corePoolSize,
600     int maximumPoolSize,
601     long keepAliveTime,
602     TimeUnit unit,
603     BlockingQueue<Runnable> workQueue,
604     ThreadFactory threadFactory) {
605 tim 1.1
606 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
607 dl 1.2 threadFactory, defaultHandler);
608     }
609 tim 1.1
610 dl 1.2 /**
611     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
612     * parameters.
613     *
614     * @param corePoolSize the number of threads to keep in the
615     * pool, even if they are idle.
616     * @param maximumPoolSize the maximum number of threads to allow in the
617     * pool.
618     * @param keepAliveTime when the number of threads is greater than
619     * the core, this is the maximum time that excess idle threads
620     * will wait for new tasks before terminating.
621     * @param unit the time unit for the keepAliveTime
622     * argument.
623     * @param workQueue the queue to use for holding tasks before they
624     * are executed. This queue will hold only the <tt>Runnable</tt>
625     * tasks submitted by the <tt>execute</tt> method.
626     * @param handler the handler to use when execution is blocked
627     * because the thread bounds and queue capacities are reached.
628     * @throws IllegalArgumentException if corePoolSize or
629     * keepAliveTime is less than zero, if maximumPoolSize is less than
630     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
631 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
632 dl 1.2 * or <tt>handler</tt> are null.
633     */
634     public ThreadPoolExecutor(int corePoolSize,
635     int maximumPoolSize,
636     long keepAliveTime,
637     TimeUnit unit,
638     BlockingQueue<Runnable> workQueue,
639     RejectedExecutionHandler handler) {
640 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
641 dl 1.2 defaultThreadFactory, handler);
642     }
643 tim 1.1
644 dl 1.2 /**
645     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
646     * parameters.
647     *
648     * @param corePoolSize the number of threads to keep in the
649     * pool, even if they are idle.
650     * @param maximumPoolSize the maximum number of threads to allow in the
651     * pool.
652     * @param keepAliveTime when the number of threads is greater than
653     * the core, this is the maximum time that excess idle threads
654     * will wait for new tasks before terminating.
655     * @param unit the time unit for the keepAliveTime
656     * argument.
657     * @param workQueue the queue to use for holding tasks before they
658     * are executed. This queue will hold only the <tt>Runnable</tt>
659     * tasks submitted by the <tt>execute</tt> method.
660     * @param threadFactory the factory to use when the executor
661 tim 1.10 * creates a new thread.
662 dl 1.2 * @param handler the handler to use when execution is blocked
663     * because the thread bounds and queue capacities are reached.
664     * @throws IllegalArgumentException if corePoolSize or
665     * keepAliveTime is less than zero, if maximumPoolSize is less than
666     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
667 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
668 dl 1.2 * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
669     */
670     public ThreadPoolExecutor(int corePoolSize,
671     int maximumPoolSize,
672     long keepAliveTime,
673     TimeUnit unit,
674     BlockingQueue<Runnable> workQueue,
675     ThreadFactory threadFactory,
676     RejectedExecutionHandler handler) {
677 tim 1.10 if (corePoolSize < 0 ||
678 dl 1.2 maximumPoolSize <= 0 ||
679 tim 1.10 maximumPoolSize < corePoolSize ||
680 dl 1.2 keepAliveTime < 0)
681     throw new IllegalArgumentException();
682     if (workQueue == null || threadFactory == null || handler == null)
683     throw new NullPointerException();
684     this.corePoolSize = corePoolSize;
685     this.maximumPoolSize = maximumPoolSize;
686     this.workQueue = workQueue;
687     this.keepAliveTime = unit.toNanos(keepAliveTime);
688     this.threadFactory = threadFactory;
689     this.handler = handler;
690 tim 1.1 }
691    
692 dl 1.2
693     /**
694     * Executes the given task sometime in the future. The task
695     * may execute in a new thread or in an existing pooled thread.
696     *
697     * If the task cannot be submitted for execution, either because this
698     * executor has been shut down or because its capacity has been reached,
699 tim 1.10 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
700 dl 1.2 *
701     * @param command the task to execute
702     * @throws RejectedExecutionException at discretion of
703 dl 1.8 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
704     * for execution
705 dl 1.2 */
706 tim 1.10 public void execute(Runnable command) {
707 dl 1.2 for (;;) {
708 dl 1.16 if (runState != RUNNING) {
709 dl 1.13 reject(command);
710 dl 1.2 return;
711     }
712     if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
713     return;
714     if (workQueue.offer(command))
715     return;
716     Runnable r = addIfUnderMaximumPoolSize(command);
717     if (r == command)
718     return;
719     if (r == null) {
720 dl 1.13 reject(command);
721 dl 1.2 return;
722     }
723     // else retry
724     }
725 tim 1.1 }
726 dl 1.4
727 dl 1.2 public void shutdown() {
728     mainLock.lock();
729     try {
730 dl 1.16 if (runState == RUNNING) // don't override shutdownNow
731     runState = SHUTDOWN;
732 dl 1.2 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
733     it.next().interruptIfIdle();
734 tim 1.14 } finally {
735 dl 1.2 mainLock.unlock();
736     }
737 tim 1.1 }
738    
739 dl 1.16
740 dl 1.2 public List shutdownNow() {
741     mainLock.lock();
742     try {
743 dl 1.16 if (runState != TERMINATED)
744     runState = STOP;
745 dl 1.2 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
746     it.next().interruptNow();
747 tim 1.14 } finally {
748 dl 1.2 mainLock.unlock();
749     }
750     return Arrays.asList(workQueue.toArray());
751 tim 1.1 }
752    
753 dl 1.2 public boolean isShutdown() {
754 dl 1.16 return runState != RUNNING;
755     }
756    
757     /**
758     * Return true if this executor is in the process of terminating
759     * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
760     * completely terminated. This method may be useful for
761     * debugging. A return of <tt>true</tt> reported a sufficient
762     * period after shutdown may indicate that submitted tasks have
763     * ignored or suppressed interruption, causing this executor not
764     * to properly terminate.
765     * @return true if terminating but not yet terminated.
766     */
767     public boolean isTerminating() {
768     return runState == STOP;
769 tim 1.1 }
770    
771 dl 1.2 public boolean isTerminated() {
772 dl 1.16 return runState == TERMINATED;
773 dl 1.2 }
774 tim 1.1
775 dl 1.2 public boolean awaitTermination(long timeout, TimeUnit unit)
776     throws InterruptedException {
777     mainLock.lock();
778     try {
779     return termination.await(timeout, unit);
780 tim 1.14 } finally {
781 dl 1.2 mainLock.unlock();
782     }
783 dl 1.15 }
784    
785     /**
786     * Invokes <tt>shutdown</tt> when this executor is no longer
787     * referenced.
788     */
789     protected void finalize() {
790     shutdown();
791 dl 1.2 }
792 tim 1.10
793 dl 1.2 /**
794     * Sets the thread factory used to create new threads.
795     *
796     * @param threadFactory the new thread factory
797 tim 1.11 * @see #getThreadFactory
798 dl 1.2 */
799     public void setThreadFactory(ThreadFactory threadFactory) {
800     this.threadFactory = threadFactory;
801 tim 1.1 }
802    
803 dl 1.2 /**
804     * Returns the thread factory used to create new threads.
805     *
806     * @return the current thread factory
807 tim 1.11 * @see #setThreadFactory
808 dl 1.2 */
809     public ThreadFactory getThreadFactory() {
810     return threadFactory;
811 tim 1.1 }
812    
813 dl 1.2 /**
814     * Sets a new handler for unexecutable tasks.
815     *
816     * @param handler the new handler
817 tim 1.11 * @see #getRejectedExecutionHandler
818 dl 1.2 */
819     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
820     this.handler = handler;
821     }
822 tim 1.1
823 dl 1.2 /**
824     * Returns the current handler for unexecutable tasks.
825     *
826     * @return the current handler
827 tim 1.11 * @see #setRejectedExecutionHandler
828 dl 1.2 */
829     public RejectedExecutionHandler getRejectedExecutionHandler() {
830     return handler;
831 tim 1.1 }
832    
833 dl 1.2 /**
834 dl 1.17 * Returns the task queue used by this executor. Access to the
835     * task queue is intended primarily for debugging and monitoring.
836     * This queue may be in active use. Retrieving the task queue
837 dl 1.2 * does not prevent queued tasks from executing.
838     *
839     * @return the task queue
840     */
841     public BlockingQueue<Runnable> getQueue() {
842     return workQueue;
843 tim 1.1 }
844 dl 1.4
845     /**
846     * Removes this task from internal queue if it is present, thus
847     * causing it not to be run if it has not already started. This
848     * method may be useful as one part of a cancellation scheme.
849 tim 1.10 *
850 dl 1.8 * @param task the task to remove
851     * @return true if the task was removed
852 dl 1.4 */
853 dl 1.5 public boolean remove(Runnable task) {
854 dl 1.4 return getQueue().remove(task);
855     }
856    
857 dl 1.7
858     /**
859 dl 1.16 * Tries to remove from the work queue all {@link Cancellable}
860     * tasks that have been cancelled. This method can be useful as a
861     * storage reclamation operation that has no other impact on
862     * functionality. Cancelled tasks are never executed, but may
863     * accumulate in work queues until worker threads can actively
864     * remove them. Invoking this method instead tries to remove them now.
865     * However, this method may fail to remove all such tasks in
866     * the presence of interference by other threads.
867 dl 1.7 */
868    
869     public void purge() {
870 dl 1.16 // Fail if we encounter interference during traversal
871     try {
872     Iterator<Runnable> it = getQueue().iterator();
873     while (it.hasNext()) {
874     Runnable r = it.next();
875     if (r instanceof Cancellable) {
876     Cancellable c = (Cancellable)r;
877     if (c.isCancelled())
878     it.remove();
879     }
880 dl 1.7 }
881     }
882 dl 1.16 catch(ConcurrentModificationException ex) {
883     return;
884     }
885 dl 1.7 }
886 tim 1.1
887     /**
888 dl 1.2 * Sets the core number of threads. This overrides any value set
889     * in the constructor. If the new value is smaller than the
890     * current value, excess existing threads will be terminated when
891     * they next become idle.
892 tim 1.1 *
893 dl 1.2 * @param corePoolSize the new core size
894 tim 1.10 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
895 dl 1.8 * is less than zero
896 tim 1.11 * @see #getCorePoolSize
897 tim 1.1 */
898 dl 1.2 public void setCorePoolSize(int corePoolSize) {
899     if (corePoolSize < 0)
900     throw new IllegalArgumentException();
901     mainLock.lock();
902     try {
903     int extra = this.corePoolSize - corePoolSize;
904     this.corePoolSize = corePoolSize;
905     if (extra > 0 && poolSize > corePoolSize) {
906     Iterator<Worker> it = workers.iterator();
907 tim 1.10 while (it.hasNext() &&
908     extra > 0 &&
909 dl 1.2 poolSize > corePoolSize &&
910     workQueue.remainingCapacity() == 0) {
911     it.next().interruptIfIdle();
912     --extra;
913     }
914     }
915 tim 1.10
916 tim 1.14 } finally {
917 dl 1.2 mainLock.unlock();
918     }
919     }
920 tim 1.1
921     /**
922 dl 1.2 * Returns the core number of threads.
923 tim 1.1 *
924 dl 1.2 * @return the core number of threads
925 tim 1.11 * @see #setCorePoolSize
926 tim 1.1 */
927 tim 1.10 public int getCorePoolSize() {
928 dl 1.2 return corePoolSize;
929 dl 1.16 }
930    
931     /**
932     * Starts a core thread, causing it to idly wait for work. This
933     * overrides the default policy of starting core threads only when
934     * new tasks are executed. This method will return <tt>false</tt>
935     * if all core threads have already been started.
936     * @return true if a thread was started
937     */
938     public boolean prestartCoreThread() {
939     return addIfUnderCorePoolSize(null);
940     }
941    
942     /**
943     * Starts all core threads, causing them to idly wait for work. This
944     * overrides the default policy of starting core threads only when
945     * new tasks are executed.
946     * @return the number of threads started.
947     */
948     public int prestartAllCoreThreads() {
949     int n = 0;
950     while (addIfUnderCorePoolSize(null))
951     ++n;
952     return n;
953 dl 1.2 }
954 tim 1.1
955     /**
956     * Sets the maximum allowed number of threads. This overrides any
957 dl 1.2 * value set in the constructor. If the new value is smaller than
958     * the current value, excess existing threads will be
959     * terminated when they next become idle.
960 tim 1.1 *
961 dl 1.2 * @param maximumPoolSize the new maximum
962     * @throws IllegalArgumentException if maximumPoolSize is less than or
963     * equal to zero, or less than the {@link #getCorePoolSize core pool size}
964 tim 1.11 * @see #getMaximumPoolSize
965 dl 1.2 */
966     public void setMaximumPoolSize(int maximumPoolSize) {
967     if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
968     throw new IllegalArgumentException();
969     mainLock.lock();
970     try {
971     int extra = this.maximumPoolSize - maximumPoolSize;
972     this.maximumPoolSize = maximumPoolSize;
973     if (extra > 0 && poolSize > maximumPoolSize) {
974     Iterator<Worker> it = workers.iterator();
975 tim 1.10 while (it.hasNext() &&
976     extra > 0 &&
977 dl 1.2 poolSize > maximumPoolSize) {
978     it.next().interruptIfIdle();
979     --extra;
980     }
981     }
982 tim 1.14 } finally {
983 dl 1.2 mainLock.unlock();
984     }
985     }
986 tim 1.1
987     /**
988     * Returns the maximum allowed number of threads.
989     *
990 dl 1.2 * @return the maximum allowed number of threads
991 tim 1.11 * @see #setMaximumPoolSize
992 tim 1.1 */
993 tim 1.10 public int getMaximumPoolSize() {
994 dl 1.2 return maximumPoolSize;
995     }
996 tim 1.1
997     /**
998     * Sets the time limit for which threads may remain idle before
999 dl 1.2 * being terminated. If there are more than the core number of
1000 tim 1.1 * threads currently in the pool, after waiting this amount of
1001     * time without processing a task, excess threads will be
1002     * terminated. This overrides any value set in the constructor.
1003     * @param time the time to wait. A time value of zero will cause
1004     * excess threads to terminate immediately after executing tasks.
1005 dl 1.2 * @param unit the time unit of the time argument
1006 dl 1.17 * @throws IllegalArgumentException if time is less than zero
1007 tim 1.11 * @see #getKeepAliveTime
1008 tim 1.1 */
1009 dl 1.2 public void setKeepAliveTime(long time, TimeUnit unit) {
1010     if (time < 0)
1011     throw new IllegalArgumentException();
1012     this.keepAliveTime = unit.toNanos(time);
1013     }
1014 tim 1.1
1015     /**
1016     * Returns the thread keep-alive time, which is the amount of time
1017 dl 1.2 * that threads in excess of the core pool size may remain
1018 tim 1.10 * idle before being terminated.
1019 tim 1.1 *
1020 dl 1.2 * @param unit the desired time unit of the result
1021 tim 1.1 * @return the time limit
1022 tim 1.11 * @see #setKeepAliveTime
1023 tim 1.1 */
1024 tim 1.10 public long getKeepAliveTime(TimeUnit unit) {
1025 dl 1.2 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1026     }
1027 tim 1.1
1028     /* Statistics */
1029    
1030     /**
1031     * Returns the current number of threads in the pool.
1032     *
1033     * @return the number of threads
1034     */
1035 tim 1.10 public int getPoolSize() {
1036 dl 1.2 return poolSize;
1037     }
1038 tim 1.1
1039     /**
1040 dl 1.2 * Returns the approximate number of threads that are actively
1041 tim 1.1 * executing tasks.
1042     *
1043     * @return the number of threads
1044     */
1045 tim 1.10 public int getActiveCount() {
1046 dl 1.2 mainLock.lock();
1047     try {
1048     int n = 0;
1049     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1050     if (it.next().isActive())
1051     ++n;
1052     }
1053     return n;
1054 tim 1.14 } finally {
1055 dl 1.2 mainLock.unlock();
1056     }
1057     }
1058 tim 1.1
1059     /**
1060 dl 1.2 * Returns the largest number of threads that have ever
1061     * simultaneously been in the pool.
1062 tim 1.1 *
1063     * @return the number of threads
1064     */
1065 tim 1.10 public int getLargestPoolSize() {
1066 dl 1.2 mainLock.lock();
1067     try {
1068     return largestPoolSize;
1069 tim 1.14 } finally {
1070 dl 1.2 mainLock.unlock();
1071     }
1072     }
1073 tim 1.1
1074     /**
1075 dl 1.2 * Returns the approximate total number of tasks that have been
1076     * scheduled for execution. Because the states of tasks and
1077     * threads may change dynamically during computation, the returned
1078 dl 1.17 * value is only an approximation, but one that does not ever
1079     * decrease across successive calls.
1080 tim 1.1 *
1081     * @return the number of tasks
1082     */
1083 tim 1.10 public long getTaskCount() {
1084 dl 1.2 mainLock.lock();
1085     try {
1086     long n = completedTaskCount;
1087     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1088     Worker w = it.next();
1089     n += w.completedTasks;
1090     if (w.isActive())
1091     ++n;
1092     }
1093     return n + workQueue.size();
1094 tim 1.14 } finally {
1095 dl 1.2 mainLock.unlock();
1096     }
1097     }
1098 tim 1.1
1099     /**
1100 dl 1.2 * Returns the approximate total number of tasks that have
1101     * completed execution. Because the states of tasks and threads
1102     * may change dynamically during computation, the returned value
1103 dl 1.17 * is only an approximation, but one that does not ever decrease
1104     * across successive calls.
1105 tim 1.1 *
1106     * @return the number of tasks
1107     */
1108 tim 1.10 public long getCompletedTaskCount() {
1109 dl 1.2 mainLock.lock();
1110     try {
1111     long n = completedTaskCount;
1112 tim 1.10 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1113 dl 1.2 n += it.next().completedTasks;
1114     return n;
1115 tim 1.14 } finally {
1116 dl 1.2 mainLock.unlock();
1117     }
1118     }
1119 tim 1.1
1120     /**
1121 dl 1.17 * Method invoked prior to executing the given Runnable in the
1122     * given thread. This method may be used to re-initialize
1123     * ThreadLocals, or to perform logging. Note: To properly nest
1124     * multiple overridings, subclasses should generally invoke
1125 dl 1.5 * <tt>super.beforeExecute</tt> at the end of this method.
1126 tim 1.1 *
1127 dl 1.2 * @param t the thread that will run task r.
1128     * @param r the task that will be executed.
1129 tim 1.1 */
1130 dl 1.2 protected void beforeExecute(Thread t, Runnable r) { }
1131 tim 1.1
1132     /**
1133 dl 1.2 * Method invoked upon completion of execution of the given
1134     * Runnable. If non-null, the Throwable is the uncaught exception
1135 dl 1.5 * that caused execution to terminate abruptly. Note: To properly
1136     * nest multiple overridings, subclasses should generally invoke
1137     * <tt>super.afterExecute</tt> at the beginning of this method.
1138 tim 1.1 *
1139 dl 1.2 * @param r the runnable that has completed.
1140     * @param t the exception that caused termination, or null if
1141     * execution completed normally.
1142 tim 1.1 */
1143 dl 1.2 protected void afterExecute(Runnable r, Throwable t) { }
1144 tim 1.1
1145 dl 1.2 /**
1146     * Method invoked when the Executor has terminated. Default
1147 dl 1.17 * implementation does nothing. Note: To properly nest multiple
1148     * overridings, subclasses should generally invoke
1149     * <tt>super.terminated</tt> within this method.
1150 dl 1.2 */
1151     protected void terminated() { }
1152 tim 1.1
1153     /**
1154 dl 1.17 * A handler for unexecutable tasks that runs these tasks directly
1155     * in the calling thread of the <tt>execute</tt> method, unless the executor
1156     * has been shut down. This is the default <tt>RejectedExecutionHandler</tt>.
1157 tim 1.1 */
1158 dl 1.2 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1159 tim 1.1
1160     /**
1161     * Constructs a <tt>CallerRunsPolicy</tt>.
1162     */
1163     public CallerRunsPolicy() { }
1164    
1165 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1166     if (!e.isShutdown()) {
1167 tim 1.1 r.run();
1168     }
1169     }
1170     }
1171    
1172     /**
1173 dl 1.8 * A handler for unexecutable tasks that throws a
1174     * <tt>RejectedExecutionException</tt>.
1175 tim 1.1 */
1176 dl 1.2 public static class AbortPolicy implements RejectedExecutionHandler {
1177 tim 1.1
1178     /**
1179     * Constructs an <tt>AbortPolicy</tt>.
1180     */
1181     public AbortPolicy() { }
1182    
1183 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1184     throw new RejectedExecutionException();
1185 tim 1.1 }
1186     }
1187    
1188     /**
1189     * A handler for unexecutable tasks that waits until the task can be
1190     * submitted for execution.
1191     */
1192 dl 1.2 public static class WaitPolicy implements RejectedExecutionHandler {
1193 tim 1.1 /**
1194     * Constructs a <tt>WaitPolicy</tt>.
1195     */
1196     public WaitPolicy() { }
1197    
1198 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1199     if (!e.isShutdown()) {
1200     try {
1201     e.getQueue().put(r);
1202 tim 1.14 } catch (InterruptedException ie) {
1203 dl 1.2 Thread.currentThread().interrupt();
1204     throw new RejectedExecutionException(ie);
1205     }
1206 tim 1.1 }
1207     }
1208     }
1209    
1210     /**
1211     * A handler for unexecutable tasks that silently discards these tasks.
1212     */
1213 dl 1.2 public static class DiscardPolicy implements RejectedExecutionHandler {
1214 tim 1.1
1215     /**
1216     * Constructs a <tt>DiscardPolicy</tt>.
1217     */
1218     public DiscardPolicy() { }
1219    
1220 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1221 tim 1.1 }
1222     }
1223    
1224     /**
1225 dl 1.8 * A handler for unexecutable tasks that discards the oldest
1226     * unhandled request and then retries <tt>execute</tt>.
1227 tim 1.1 */
1228 dl 1.2 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1229 tim 1.1 /**
1230 dl 1.2 * Constructs a <tt>DiscardOldestPolicy</tt>.
1231 tim 1.1 */
1232     public DiscardOldestPolicy() { }
1233    
1234 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1235     if (!e.isShutdown()) {
1236     e.getQueue().poll();
1237     e.execute(r);
1238 tim 1.1 }
1239     }
1240     }
1241     }
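
Usage sketch (not part of the file above). The extension hooks and statistics methods documented in this listing can be exercised roughly as follows. This is a minimal illustration written against the released java.util.concurrent API, which may differ in detail from this revision. The class CountingPool and its methods started(), finished() and runDemo() are hypothetical names introduced only for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical subclass, for illustration only: counts hook invocations. */
final class CountingPool extends ThreadPoolExecutor {
    private final AtomicInteger started = new AtomicInteger();
    private final AtomicInteger finished = new AtomicInteger();

    CountingPool(int corePoolSize, int maximumPoolSize) {
        super(corePoolSize, maximumPoolSize, 1L, TimeUnit.SECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        started.incrementAndGet();
        // As the Javadoc advises, call super at the end of beforeExecute.
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // As the Javadoc advises, call super at the beginning of afterExecute.
        super.afterExecute(r, t);
        finished.incrementAndGet();
    }

    int started()  { return started.get(); }
    int finished() { return finished.get(); }

    /**
     * Prestarts the core threads, runs the given number of no-op tasks,
     * shuts the pool down, and returns
     * { threads prestarted, beforeExecute calls, afterExecute calls,
     *   getCompletedTaskCount() }.
     */
    static int[] runDemo(int tasks) {
        CountingPool pool = new CountingPool(2, 4);
        int prestarted = pool.prestartAllCoreThreads();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { /* no-op task */ });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return new int[] {
            prestarted, pool.started(), pool.finished(),
            (int) pool.getCompletedTaskCount()
        };
    }
}
```

The counters are read only after awaitTermination has returned true, because the statistics methods are documented as approximate while tasks are still running.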