ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.15
Committed: Sat Aug 9 19:55:30 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.14: +12 -3 lines
Log Message:
Add finalize

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.2 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.9 import java.util.concurrent.locks.*;
9 dl 1.2 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.2 * An {@link ExecutorService} that executes each submitted task on one
13 tim 1.1 * of several pooled threads.
14     *
15     * <p>Thread pools address two different problems at the same time:
16     * they usually provide improved performance when executing large
17     * numbers of asynchronous tasks, due to reduced per-task invocation
18     * overhead, and they provide a means of bounding and managing the
19     * resources, including threads, consumed in executing a collection of
20     * tasks.
21     *
22     * <p>This class is very configurable and can be configured to create
23     * a new thread for each task, or even to execute tasks sequentially
24     * in a single thread, in addition to its most common configuration,
25     * which reuses a pool of threads.
26     *
27     * <p>To be useful across a wide range of contexts, this class
28     * provides many adjustable parameters and extensibility hooks.
29     * However, programmers are urged to use the more convenient factory
30     * methods <tt>newCachedThreadPool</tt> (unbounded thread pool, with
31     * automatic thread reclamation), <tt>newFixedThreadPool</tt> (fixed
32 dl 1.2 * size thread pool), <tt>newSingleThreadPoolExecutor</tt> (single
33 tim 1.1 * background thread for execution of tasks), and
34     * <tt>newThreadPerTaskExeceutor</tt> (execute each task in a new
35     * thread), that preconfigure settings for the most common usage
36     * scenarios.
37     *
38     * <p>This class also maintain some basic statistics, such as the
39 dl 1.2 * number of completed tasks, that may be useful for monitoring and
40     * tuning executors.
41 tim 1.1 *
42     * <h3>Tuning guide</h3>
43     * <dl>
44 dl 1.2 *
45     * <dt>Core and maximum pool size</dt>
46     *
47     * <dd>A ThreadPoolExecutor will automatically adjust the pool size
48     * according to the bounds set by corePoolSize and maximumPoolSize.
49     * When a new task is submitted, and fewer than corePoolSize threads
50     * are running, a new thread is created to handle the request, even if
51     * other worker threads are idle. If there are more than the
52     * corePoolSize but less than maximumPoolSize threads running, a new
53     * thread will be created only if the queue is full. By setting
54     * corePoolSize and maximumPoolSize the same, you create a fixed-size
55     * thread pool.</dd>
56     *
57 tim 1.10 * <dt>Keep-alive</dt>
58 dl 1.2 *
59     * <dd>The keepAliveTime determines what happens to idle threads. If
60     * the pool currently has more than the core number of threads, excess
61     * threads will be terminated if they have been idle for more than the
62     * keepAliveTime.</dd>
63     *
64 tim 1.10 * <dt>Queueing</dt>
65     *
66 dl 1.2 * <dd>You are free to specify the queuing mechanism used to handle
67 dl 1.6 * submitted tasks. A good default is to use queueless synchronous
68     * channels to to hand off work to threads. This is a safe,
69     * conservative policy that avoids lockups when handling sets of
70     * requests that might have internal dependencies. Using an unbounded
71     * queue (for example a LinkedBlockingQueue) which will cause new
72     * tasks to be queued in cases where all corePoolSize threads are
73     * busy, so no more that corePoolSize threads will be craated. This
74     * may be appropriate when each task is completely independent of
75     * others, so tasks cannot affect each others execution. For example,
76     * in an http server. When given a choice, this pool always prefers
77     * adding a new thread rather than queueing if there are currently
78     * fewer than the current getCorePoolSize threads running, but
79     * otherwise always prefers queuing a request rather than adding a new
80     * thread.
81 tim 1.1 *
82     * <p>While queuing can be useful in smoothing out transient bursts of
83     * requests, especially in socket-based services, it is not very well
84     * behaved when commands continue to arrive on average faster than
85 tim 1.10 * they can be processed.
86 tim 1.1 *
87     * Queue sizes and maximum pool sizes can often be traded off for each
88     * other. Using large queues and small pools minimizes CPU usage, OS
89     * resources, and context-switching overhead, but can lead to
90     * artifically low throughput. If tasks frequently block (for example
91     * if they are I/O bound), a JVM and underlying OS may be able to
92     * schedule time for more threads than you otherwise allow. Use of
93     * small queues or queueless handoffs generally requires larger pool
94     * sizes, which keeps CPUs busier but may encounter unacceptable
95     * scheduling overhead, which also decreases throughput.
96     * </dd>
97 dl 1.2 *
98 tim 1.1 * <dt>Creating new threads</dt>
99 dl 1.2 *
100     * <dd>New threads are created using a ThreadFactory. By default,
101     * threads are created simply with the new Thread(Runnable)
102     * constructor, but by supplying a different ThreadFactory, you can
103     * alter the thread's name, thread group, priority, daemon status,
104     * etc. </dd>
105     *
106 tim 1.1 * <dt>Before and after intercepts</dt>
107 dl 1.2 *
108     * <dd>This class has overridable methods that which are called before
109     * and after execution of each task. These can be used to manipulate
110     * the execution environment (for example, reinitializing
111     * ThreadLocals), gather statistics, or perform logging. </dd>
112     *
113 tim 1.1 * <dt>Blocked execution</dt>
114 dl 1.2 *
115     * <dd>There are a number of factors which can bound the number of
116     * tasks which can execute at once, including the maximum pool size
117     * and the queuing mechanism used. If the executor determines that a
118     * task cannot be executed because it has been refused by the queue
119     * and no threads are available, or because the executor has been shut
120     * down, the RejectedExecutionHandler's rejectedExecution method is
121     * invoked. </dd>
122     *
123 tim 1.1 * <dt>Termination</dt>
124 dl 1.2 *
125     * <dd>ThreadPoolExecutor supports two shutdown options, immediate and
126     * graceful. In an immediate shutdown, any threads currently
127     * executing are interrupted, and any tasks not yet begun are returned
128     * from the shutdownNow call. In a graceful shutdown, all queued
129     * tasks are allowed to run, but new tasks may not be submitted.
130 tim 1.1 * </dd>
131 dl 1.2 *
132 tim 1.1 * </dl>
133     *
134     * @since 1.5
135 dl 1.2 * @see RejectedExecutionHandler
136 tim 1.1 * @see Executors
137     * @see ThreadFactory
138     *
139     * @spec JSR-166
140 dl 1.15 * @revised $Date: 2003/08/08 20:05:07 $
141     * @editor $Author: tim $
142 dl 1.8 * @author Doug Lea
143 tim 1.1 */
144 dl 1.2 public class ThreadPoolExecutor implements ExecutorService {
145     /**
146     * Queue used for holding tasks and handing off to worker threads.
147 tim 1.10 */
148 dl 1.2 private final BlockingQueue<Runnable> workQueue;
149    
150     /**
151     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
152     * workers set.
153 tim 1.10 */
154 dl 1.2 private final ReentrantLock mainLock = new ReentrantLock();
155    
156     /**
157     * Wait condition to support awaitTermination
158 tim 1.10 */
159 dl 1.2 private final Condition termination = mainLock.newCondition();
160    
161     /**
162     * Set containing all worker threads in pool.
163 tim 1.10 */
164 dl 1.2 private final Set<Worker> workers = new HashSet<Worker>();
165    
166     /**
167     * Timeout in nanosecods for idle threads waiting for work.
168     * Threads use this timeout only when there are more than
169     * corePoolSize present. Otherwise they wait forever for new work.
170 tim 1.10 */
171 dl 1.2 private volatile long keepAliveTime;
172    
173     /**
174     * Core pool size, updated only while holding mainLock,
175     * but volatile to allow concurrent readability even
176     * during updates.
177 tim 1.10 */
178 dl 1.2 private volatile int corePoolSize;
179    
180     /**
181     * Maximum pool size, updated only while holding mainLock
182     * but volatile to allow concurrent readability even
183     * during updates.
184 tim 1.10 */
185 dl 1.2 private volatile int maximumPoolSize;
186    
187     /**
188     * Current pool size, updated only while holding mainLock
189     * but volatile to allow concurrent readability even
190     * during updates.
191 tim 1.10 */
192 dl 1.2 private volatile int poolSize;
193    
194     /**
195     * Shutdown status, becomes (and remains) nonzero when shutdown called.
196 tim 1.10 */
197 dl 1.2 private volatile int shutdownStatus;
198    
199     // Special values for status
200 dl 1.8 /** Normal, not-shutdown mode */
201 dl 1.2 private static final int NOT_SHUTDOWN = 0;
202 dl 1.8 /** Controlled shutdown mode */
203 dl 1.2 private static final int SHUTDOWN_WHEN_IDLE = 1;
204 dl 1.8 /*8 Immediate shutdown mode */
205 tim 1.10 private static final int SHUTDOWN_NOW = 2;
206 dl 1.2
207     /**
208     * Latch that becomes true when all threads terminate after shutdown.
209 tim 1.10 */
210 dl 1.2 private volatile boolean isTerminated;
211    
212     /**
213     * Handler called when saturated or shutdown in execute.
214 tim 1.10 */
215 dl 1.2 private volatile RejectedExecutionHandler handler = defaultHandler;
216    
217     /**
218     * Factory for new threads.
219 tim 1.10 */
220 dl 1.2 private volatile ThreadFactory threadFactory = defaultThreadFactory;
221    
222     /**
223     * Tracks largest attained pool size.
224 tim 1.10 */
225 dl 1.2 private int largestPoolSize;
226    
227     /**
228     * Counter for completed tasks. Updated only on termination of
229     * worker threads.
230 tim 1.10 */
231 dl 1.2 private long completedTaskCount;
232    
233 dl 1.8 /**
234 tim 1.10 * The default thread facotry
235 dl 1.8 */
236 tim 1.10 private static final ThreadFactory defaultThreadFactory =
237 dl 1.2 new ThreadFactory() {
238     public Thread newThread(Runnable r) {
239     return new Thread(r);
240     }
241     };
242    
243 dl 1.8 /**
244     * The default rejectect execution handler
245     */
246 tim 1.10 private static final RejectedExecutionHandler defaultHandler =
247 dl 1.2 new AbortPolicy();
248    
249     /**
250 dl 1.13 * Invoke the rejected execution handler for the give command.
251     */
252     void reject(Runnable command) {
253     handler.rejectedExecution(command, this);
254     }
255    
256    
257     /**
258 dl 1.2 * Create and return a new thread running firstTask as its first
259     * task. Call only while holding mainLock
260 dl 1.8 * @param firstTask the task the new thread should run first (or
261     * null if none)
262     * @return the new thread
263 dl 1.2 */
264     private Thread addThread(Runnable firstTask) {
265     Worker w = new Worker(firstTask);
266     Thread t = threadFactory.newThread(w);
267     w.thread = t;
268     workers.add(w);
269     int nt = ++poolSize;
270     if (nt > largestPoolSize)
271     largestPoolSize = nt;
272     return t;
273     }
274    
275 dl 1.15 // addIfUnderCorePoolSize is non-private; accessible to ScheduledExecutor
276    
277 dl 1.2 /**
278     * Create and start a new thread running firstTask as its first
279     * task, only if less than corePoolSize threads are running.
280 dl 1.8 * @param firstTask the task the new thread should run first (or
281     * null if none)
282 dl 1.2 * @return true if successful.
283     */
284 dl 1.13 boolean addIfUnderCorePoolSize(Runnable firstTask) {
285 dl 1.2 Thread t = null;
286     mainLock.lock();
287     try {
288 tim 1.10 if (poolSize < corePoolSize)
289 dl 1.8 t = addThread(firstTask);
290 tim 1.14 } finally {
291 dl 1.2 mainLock.unlock();
292     }
293     if (t == null)
294     return false;
295     t.start();
296     return true;
297     }
298    
299     /**
300     * Create and start a new thread only if less than maximumPoolSize
301     * threads are running. The new thread runs as its first task the
302     * next task in queue, or if there is none, the given task.
303 dl 1.8 * @param firstTask the task the new thread should run first (or
304     * null if none)
305 dl 1.2 * @return null on failure, else the first task to be run by new thread.
306     */
307 dl 1.8 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
308 dl 1.2 Thread t = null;
309     Runnable next = null;
310     mainLock.lock();
311     try {
312     if (poolSize < maximumPoolSize) {
313     next = workQueue.poll();
314     if (next == null)
315 dl 1.8 next = firstTask;
316 dl 1.2 t = addThread(next);
317     }
318 tim 1.14 } finally {
319 dl 1.2 mainLock.unlock();
320     }
321     if (t == null)
322     return null;
323     t.start();
324     return next;
325     }
326    
327    
328     /**
329     * Get the next task for a worker thread to run.
330 dl 1.8 * @return the task
331     * @throws InterruptedException if interrupted while waiting for task
332 dl 1.2 */
333     private Runnable getTask() throws InterruptedException {
334     for (;;) {
335     int stat = shutdownStatus;
336     if (stat == SHUTDOWN_NOW)
337     return null;
338     if (stat == SHUTDOWN_WHEN_IDLE) // help drain queue before dying
339     return workQueue.poll();
340     if (poolSize <= corePoolSize) // untimed wait if core
341     return workQueue.take();
342 dl 1.12 long timeout = keepAliveTime;
343     if (timeout <= 0) // must die immediately for 0 timeout
344     return null;
345 dl 1.2 Runnable task = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
346     if (task != null)
347     return task;
348     if (poolSize > corePoolSize) // timed out
349     return null;
350     // else, after timeout, pool shrank so shouldn't die, so retry
351     }
352     }
353    
354     /**
355     * Perform bookkeeping for a terminated worker thread.
356 tim 1.10 * @param w the worker
357 dl 1.2 */
358     private void workerDone(Worker w) {
359     boolean allDone = false;
360     mainLock.lock();
361     try {
362     completedTaskCount += w.completedTasks;
363     workers.remove(w);
364    
365 tim 1.10 if (--poolSize > 0)
366 dl 1.2 return;
367    
368     // If this was last thread, deal with potential shutdown
369     int stat = shutdownStatus;
370 tim 1.10
371 dl 1.2 // If there are queued tasks but no threads, create replacement.
372     if (stat != SHUTDOWN_NOW) {
373     Runnable r = workQueue.poll();
374     if (r != null) {
375     addThread(r).start();
376     return;
377     }
378     }
379    
380     // if no tasks and not shutdown, can exit without replacement
381 tim 1.10 if (stat == NOT_SHUTDOWN)
382 dl 1.2 return;
383    
384     allDone = true;
385     isTerminated = true;
386     termination.signalAll();
387 tim 1.14 } finally {
388 dl 1.2 mainLock.unlock();
389     }
390    
391     if (allDone) // call outside lock
392     terminated();
393     }
394    
395     /**
396 tim 1.10 * Worker threads
397 dl 1.2 */
398     private class Worker implements Runnable {
399    
400     /**
401     * The runLock is acquired and released surrounding each task
402     * execution. It mainly protects against interrupts that are
403     * intended to cancel the worker thread from instead
404     * interrupting the task being run.
405     */
406     private final ReentrantLock runLock = new ReentrantLock();
407    
408     /**
409     * Initial task to run before entering run loop
410     */
411     private Runnable firstTask;
412    
413     /**
414     * Per thread completed task counter; accumulated
415     * into completedTaskCount upon termination.
416     */
417     volatile long completedTasks;
418    
419     /**
420     * Thread this worker is running in. Acts as a final field,
421     * but cannot be set until thread is created.
422     */
423     Thread thread;
424    
425     Worker(Runnable firstTask) {
426     this.firstTask = firstTask;
427     }
428    
429     boolean isActive() {
430     return runLock.isLocked();
431     }
432    
433     /**
434     * Interrupt thread if not running a task
435 tim 1.10 */
436 dl 1.2 void interruptIfIdle() {
437     if (runLock.tryLock()) {
438     try {
439     thread.interrupt();
440 tim 1.14 } finally {
441 dl 1.2 runLock.unlock();
442     }
443     }
444     }
445    
446     /**
447     * Cause thread to die even if running a task.
448 tim 1.10 */
449 dl 1.2 void interruptNow() {
450     thread.interrupt();
451     }
452    
453     /**
454     * Run a single task between before/after methods.
455     */
456     private void runTask(Runnable task) {
457     runLock.lock();
458     try {
459     // Abort now if immediate cancel. Otherwise, we have
460     // committed to run this task.
461     if (shutdownStatus == SHUTDOWN_NOW)
462     return;
463    
464     Thread.interrupted(); // clear interrupt status on entry
465     boolean ran = false;
466     beforeExecute(thread, task);
467     try {
468     task.run();
469     ran = true;
470     afterExecute(task, null);
471     ++completedTasks;
472 tim 1.14 } catch(RuntimeException ex) {
473 dl 1.2 if (!ran)
474     afterExecute(task, ex);
475     // else the exception occurred within
476     // afterExecute itself in which case we don't
477     // want to call it again.
478     throw ex;
479     }
480 tim 1.14 } finally {
481 dl 1.2 runLock.unlock();
482     }
483     }
484    
485     /**
486     * Main run loop
487     */
488     public void run() {
489     try {
490     for (;;) {
491     Runnable task;
492     if (firstTask != null) {
493     task = firstTask;
494     firstTask = null;
495 tim 1.14 } else {
496 dl 1.2 task = getTask();
497     if (task == null)
498     break;
499     }
500     runTask(task);
501     task = null; // unnecessary but can help GC
502     }
503 tim 1.14 } catch(InterruptedException ie) {
504 dl 1.2 // fall through
505 tim 1.14 } finally {
506 dl 1.2 workerDone(this);
507     }
508     }
509     }
510 tim 1.1
511     /**
512     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
513     * parameters. It may be more convenient to use one of the factory
514     * methods instead of this general purpose constructor.
515     *
516 dl 1.2 * @param corePoolSize the number of threads to keep in the
517 tim 1.1 * pool, even if they are idle.
518 dl 1.2 * @param maximumPoolSize the maximum number of threads to allow in the
519 tim 1.1 * pool.
520     * @param keepAliveTime when the number of threads is greater than
521 dl 1.2 * the core, this is the maximum time that excess idle threads
522 tim 1.1 * will wait for new tasks before terminating.
523 dl 1.2 * @param unit the time unit for the keepAliveTime
524 tim 1.1 * argument.
525     * @param workQueue the queue to use for holding tasks before the
526     * are executed. This queue will hold only the <tt>Runnable</tt>
527     * tasks submitted by the <tt>execute</tt> method.
528 dl 1.2 * @throws IllegalArgumentException if corePoolSize, or
529     * keepAliveTime less than zero, or if maximumPoolSize less than or
530     * equal to zero, or if corePoolSize greater than maximumPoolSize.
531 tim 1.1 * @throws NullPointerException if <tt>workQueue</tt> is null
532     */
533 dl 1.2 public ThreadPoolExecutor(int corePoolSize,
534     int maximumPoolSize,
535 tim 1.1 long keepAliveTime,
536 dl 1.2 TimeUnit unit,
537     BlockingQueue<Runnable> workQueue) {
538 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
539 dl 1.2 defaultThreadFactory, defaultHandler);
540     }
541 tim 1.1
542 dl 1.2 /**
543     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
544     * parameters.
545     *
546     * @param corePoolSize the number of threads to keep in the
547     * pool, even if they are idle.
548     * @param maximumPoolSize the maximum number of threads to allow in the
549     * pool.
550     * @param keepAliveTime when the number of threads is greater than
551     * the core, this is the maximum time that excess idle threads
552     * will wait for new tasks before terminating.
553     * @param unit the time unit for the keepAliveTime
554     * argument.
555     * @param workQueue the queue to use for holding tasks before the
556     * are executed. This queue will hold only the <tt>Runnable</tt>
557     * tasks submitted by the <tt>execute</tt> method.
558     * @param threadFactory the factory to use when the executor
559 tim 1.10 * creates a new thread.
560 dl 1.2 * @throws IllegalArgumentException if corePoolSize, or
561     * keepAliveTime less than zero, or if maximumPoolSize less than or
562     * equal to zero, or if corePoolSize greater than maximumPoolSize.
563 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
564 dl 1.2 * or <tt>threadFactory</tt> are null.
565     */
566     public ThreadPoolExecutor(int corePoolSize,
567     int maximumPoolSize,
568     long keepAliveTime,
569     TimeUnit unit,
570     BlockingQueue<Runnable> workQueue,
571     ThreadFactory threadFactory) {
572 tim 1.1
573 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
574 dl 1.2 threadFactory, defaultHandler);
575     }
576 tim 1.1
577 dl 1.2 /**
578     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
579     * parameters.
580     *
581     * @param corePoolSize the number of threads to keep in the
582     * pool, even if they are idle.
583     * @param maximumPoolSize the maximum number of threads to allow in the
584     * pool.
585     * @param keepAliveTime when the number of threads is greater than
586     * the core, this is the maximum time that excess idle threads
587     * will wait for new tasks before terminating.
588     * @param unit the time unit for the keepAliveTime
589     * argument.
590     * @param workQueue the queue to use for holding tasks before the
591     * are executed. This queue will hold only the <tt>Runnable</tt>
592     * tasks submitted by the <tt>execute</tt> method.
593     * @param handler the handler to use when execution is blocked
594     * because the thread bounds and queue capacities are reached.
595     * @throws IllegalArgumentException if corePoolSize, or
596     * keepAliveTime less than zero, or if maximumPoolSize less than or
597     * equal to zero, or if corePoolSize greater than maximumPoolSize.
598 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
599 dl 1.2 * or <tt>handler</tt> are null.
600     */
601     public ThreadPoolExecutor(int corePoolSize,
602     int maximumPoolSize,
603     long keepAliveTime,
604     TimeUnit unit,
605     BlockingQueue<Runnable> workQueue,
606     RejectedExecutionHandler handler) {
607 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
608 dl 1.2 defaultThreadFactory, handler);
609     }
610 tim 1.1
611 dl 1.2 /**
612     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
613     * parameters.
614     *
615     * @param corePoolSize the number of threads to keep in the
616     * pool, even if they are idle.
617     * @param maximumPoolSize the maximum number of threads to allow in the
618     * pool.
619     * @param keepAliveTime when the number of threads is greater than
620     * the core, this is the maximum time that excess idle threads
621     * will wait for new tasks before terminating.
622     * @param unit the time unit for the keepAliveTime
623     * argument.
624     * @param workQueue the queue to use for holding tasks before the
625     * are executed. This queue will hold only the <tt>Runnable</tt>
626     * tasks submitted by the <tt>execute</tt> method.
627     * @param threadFactory the factory to use when the executor
628 tim 1.10 * creates a new thread.
629 dl 1.2 * @param handler the handler to use when execution is blocked
630     * because the thread bounds and queue capacities are reached.
631     * @throws IllegalArgumentException if corePoolSize, or
632     * keepAliveTime less than zero, or if maximumPoolSize less than or
633     * equal to zero, or if corePoolSize greater than maximumPoolSize.
634 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
635 dl 1.2 * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
636     */
637     public ThreadPoolExecutor(int corePoolSize,
638     int maximumPoolSize,
639     long keepAliveTime,
640     TimeUnit unit,
641     BlockingQueue<Runnable> workQueue,
642     ThreadFactory threadFactory,
643     RejectedExecutionHandler handler) {
644 tim 1.10 if (corePoolSize < 0 ||
645 dl 1.2 maximumPoolSize <= 0 ||
646 tim 1.10 maximumPoolSize < corePoolSize ||
647 dl 1.2 keepAliveTime < 0)
648     throw new IllegalArgumentException();
649     if (workQueue == null || threadFactory == null || handler == null)
650     throw new NullPointerException();
651     this.corePoolSize = corePoolSize;
652     this.maximumPoolSize = maximumPoolSize;
653     this.workQueue = workQueue;
654     this.keepAliveTime = unit.toNanos(keepAliveTime);
655     this.threadFactory = threadFactory;
656     this.handler = handler;
657 tim 1.1 }
658    
659 dl 1.2
660     /**
661     * Executes the given task sometime in the future. The task
662     * may execute in a new thread or in an existing pooled thread.
663     *
664     * If the task cannot be submitted for execution, either because this
665     * executor has been shutdown or because its capacity has been reached,
666 tim 1.10 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
667 dl 1.2 *
668     * @param command the task to execute
669     * @throws RejectedExecutionException at discretion of
670 dl 1.8 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
671     * for execution
672 dl 1.2 */
673 tim 1.10 public void execute(Runnable command) {
674 dl 1.2 for (;;) {
675     if (shutdownStatus != NOT_SHUTDOWN) {
676 dl 1.13 reject(command);
677 dl 1.2 return;
678     }
679     if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
680     return;
681     if (workQueue.offer(command))
682     return;
683     Runnable r = addIfUnderMaximumPoolSize(command);
684     if (r == command)
685     return;
686     if (r == null) {
687 dl 1.13 reject(command);
688 dl 1.2 return;
689     }
690     // else retry
691     }
692 tim 1.1 }
693 dl 1.4
694 dl 1.2 public void shutdown() {
695     mainLock.lock();
696     try {
697     if (shutdownStatus == NOT_SHUTDOWN) // don't override shutdownNow
698     shutdownStatus = SHUTDOWN_WHEN_IDLE;
699 tim 1.1
700 dl 1.2 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
701     it.next().interruptIfIdle();
702 tim 1.14 } finally {
703 dl 1.2 mainLock.unlock();
704     }
705 tim 1.1 }
706    
707 dl 1.2 public List shutdownNow() {
708     mainLock.lock();
709     try {
710     shutdownStatus = SHUTDOWN_NOW;
711     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
712     it.next().interruptNow();
713 tim 1.14 } finally {
714 dl 1.2 mainLock.unlock();
715     }
716     return Arrays.asList(workQueue.toArray());
717 tim 1.1 }
718    
719 dl 1.2 public boolean isShutdown() {
720     return shutdownStatus != NOT_SHUTDOWN;
721 tim 1.1 }
722    
723 dl 1.2 public boolean isTerminated() {
724     return isTerminated;
725     }
726 tim 1.1
727 dl 1.2 public boolean awaitTermination(long timeout, TimeUnit unit)
728     throws InterruptedException {
729     mainLock.lock();
730     try {
731     return termination.await(timeout, unit);
732 tim 1.14 } finally {
733 dl 1.2 mainLock.unlock();
734     }
735 dl 1.15 }
736    
737     /**
738     * Invokes <tt>shutdown</tt> when this executor is no longer
739     * referenced.
740     */
741     protected void finalize() {
742     shutdown();
743 dl 1.2 }
744 tim 1.10
745 dl 1.2 /**
746     * Sets the thread factory used to create new threads.
747     *
748     * @param threadFactory the new thread factory
749 tim 1.11 * @see #getThreadFactory
750 dl 1.2 */
751     public void setThreadFactory(ThreadFactory threadFactory) {
752     this.threadFactory = threadFactory;
753 tim 1.1 }
754    
755 dl 1.2 /**
756     * Returns the thread factory used to create new threads.
757     *
758     * @return the current thread factory
759 tim 1.11 * @see #setThreadFactory
760 dl 1.2 */
761     public ThreadFactory getThreadFactory() {
762     return threadFactory;
763 tim 1.1 }
764    
765 dl 1.2 /**
766     * Sets a new handler for unexecutable tasks.
767     *
768     * @param handler the new handler
769 tim 1.11 * @see #getRejectedExecutionHandler
770 dl 1.2 */
771     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
772     this.handler = handler;
773     }
774 tim 1.1
775 dl 1.2 /**
776     * Returns the current handler for unexecutable tasks.
777     *
778     * @return the current handler
779 tim 1.11 * @see #setRejectedExecutionHandler
780 dl 1.2 */
781     public RejectedExecutionHandler getRejectedExecutionHandler() {
782     return handler;
783 tim 1.1 }
784    
785 dl 1.2 /**
786     * Returns the task queue used by this executor. Note that
787     * this queue may be in active use. Retrieveing the task queue
788     * does not prevent queued tasks from executing.
789     *
790     * @return the task queue
791     */
792     public BlockingQueue<Runnable> getQueue() {
793     return workQueue;
794 tim 1.1 }
795 dl 1.4
796     /**
797     * Removes this task from internal queue if it is present, thus
798     * causing it not to be run if it has not already started. This
799     * method may be useful as one part of a cancellation scheme.
800 tim 1.10 *
801 dl 1.8 * @param task the task to remove
802     * @return true if the task was removed
803 dl 1.4 */
804 dl 1.5 public boolean remove(Runnable task) {
805 dl 1.4 return getQueue().remove(task);
806     }
807    
808 dl 1.7
809     /**
810 tim 1.10 * Removes from the work queue all {@link Cancellable} tasks
811 dl 1.7 * that have been cancelled. This method can be useful as a
812     * storage reclamation operation, that has no other impact
813     * on functionality. Cancelled tasks are never executed, but
814     * may accumulate in work queues until worker threads can
815     * actively remove them. Invoking this method ensures that they
816     * are instead removed now.
817     */
818    
819     public void purge() {
820     Iterator<Runnable> it = getQueue().iterator();
821     while (it.hasNext()) {
822     Runnable r = it.next();
823     if (r instanceof Cancellable) {
824     Cancellable c = (Cancellable)r;
825 tim 1.10 if (c.isCancelled())
826 dl 1.7 it.remove();
827     }
828     }
829     }
830 tim 1.1
831     /**
832 dl 1.2 * Sets the core number of threads. This overrides any value set
833     * in the constructor. If the new value is smaller than the
834     * current value, excess existing threads will be terminated when
835     * they next become idle.
836 tim 1.1 *
837 dl 1.2 * @param corePoolSize the new core size
838 tim 1.10 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
839 dl 1.8 * less than zero
840 tim 1.11 * @see #getCorePoolSize
841 tim 1.1 */
842 dl 1.2 public void setCorePoolSize(int corePoolSize) {
843     if (corePoolSize < 0)
844     throw new IllegalArgumentException();
845     mainLock.lock();
846     try {
847     int extra = this.corePoolSize - corePoolSize;
848     this.corePoolSize = corePoolSize;
849     if (extra > 0 && poolSize > corePoolSize) {
850     Iterator<Worker> it = workers.iterator();
851 tim 1.10 while (it.hasNext() &&
852     extra > 0 &&
853 dl 1.2 poolSize > corePoolSize &&
854     workQueue.remainingCapacity() == 0) {
855     it.next().interruptIfIdle();
856     --extra;
857     }
858     }
859 tim 1.10
860 tim 1.14 } finally {
861 dl 1.2 mainLock.unlock();
862     }
863     }
864 tim 1.1
865     /**
866 dl 1.2 * Returns the core number of threads.
867 tim 1.1 *
868 dl 1.2 * @return the core number of threads
869 tim 1.11 * @see #setCorePoolSize
870 tim 1.1 */
871 tim 1.10 public int getCorePoolSize() {
872 dl 1.2 return corePoolSize;
873     }
874 tim 1.1
875     /**
876     * Sets the maximum allowed number of threads. This overrides any
877 dl 1.2 * value set in the constructor. If the new value is smaller than
878     * the current value, excess existing threads will be
879     * terminated when they next become idle.
880 tim 1.1 *
881 dl 1.2 * @param maximumPoolSize the new maximum
882     * @throws IllegalArgumentException if maximumPoolSize less than zero or
883     * the {@link #getCorePoolSize core pool size}
884 tim 1.11 * @see #getMaximumPoolSize
885 dl 1.2 */
886     public void setMaximumPoolSize(int maximumPoolSize) {
887     if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
888     throw new IllegalArgumentException();
889     mainLock.lock();
890     try {
891     int extra = this.maximumPoolSize - maximumPoolSize;
892     this.maximumPoolSize = maximumPoolSize;
893     if (extra > 0 && poolSize > maximumPoolSize) {
894     Iterator<Worker> it = workers.iterator();
895 tim 1.10 while (it.hasNext() &&
896     extra > 0 &&
897 dl 1.2 poolSize > maximumPoolSize) {
898     it.next().interruptIfIdle();
899     --extra;
900     }
901     }
902 tim 1.14 } finally {
903 dl 1.2 mainLock.unlock();
904     }
905     }
906 tim 1.1
907     /**
908     * Returns the maximum allowed number of threads.
909     *
910 dl 1.2 * @return the maximum allowed number of threads
911 tim 1.11 * @see #setMaximumPoolSize
912 tim 1.1 */
913 tim 1.10 public int getMaximumPoolSize() {
914 dl 1.2 return maximumPoolSize;
915     }
916 tim 1.1
917     /**
918     * Sets the time limit for which threads may remain idle before
919 dl 1.2 * being terminated. If there are more than the core number of
920 tim 1.1 * threads currently in the pool, after waiting this amount of
921     * time without processing a task, excess threads will be
922     * terminated. This overrides any value set in the constructor.
923     * @param time the time to wait. A time value of zero will cause
924     * excess threads to terminate immediately after executing tasks.
925 dl 1.2 * @param unit the time unit of the time argument
926 tim 1.1 * @throws IllegalArgumentException if msecs less than zero
927 tim 1.11 * @see #getKeepAliveTime
928 tim 1.1 */
929 dl 1.2 public void setKeepAliveTime(long time, TimeUnit unit) {
930     if (time < 0)
931     throw new IllegalArgumentException();
932     this.keepAliveTime = unit.toNanos(time);
933     }
934 tim 1.1
935     /**
936     * Returns the thread keep-alive time, which is the amount of time
937 dl 1.2 * which threads in excess of the core pool size may remain
938 tim 1.10 * idle before being terminated.
939 tim 1.1 *
940 dl 1.2 * @param unit the desired time unit of the result
941 tim 1.1 * @return the time limit
942 tim 1.11 * @see #setKeepAliveTime
943 tim 1.1 */
944 tim 1.10 public long getKeepAliveTime(TimeUnit unit) {
945 dl 1.2 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
946     }
947 tim 1.1
948     /* Statistics */
949    
950     /**
951     * Returns the current number of threads in the pool.
952     *
953     * @return the number of threads
954     */
955 tim 1.10 public int getPoolSize() {
956 dl 1.2 return poolSize;
957     }
958 tim 1.1
959     /**
960 dl 1.2 * Returns the approximate number of threads that are actively
961 tim 1.1 * executing tasks.
962     *
963     * @return the number of threads
964     */
965 tim 1.10 public int getActiveCount() {
966 dl 1.2 mainLock.lock();
967     try {
968     int n = 0;
969     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
970     if (it.next().isActive())
971     ++n;
972     }
973     return n;
974 tim 1.14 } finally {
975 dl 1.2 mainLock.unlock();
976     }
977     }
978 tim 1.1
979     /**
980 dl 1.2 * Returns the largest number of threads that have ever
981     * simultaneously been in the pool.
982 tim 1.1 *
983     * @return the number of threads
984     */
985 tim 1.10 public int getLargestPoolSize() {
986 dl 1.2 mainLock.lock();
987     try {
988     return largestPoolSize;
989 tim 1.14 } finally {
990 dl 1.2 mainLock.unlock();
991     }
992     }
993 tim 1.1
994     /**
995 dl 1.2 * Returns the approximate total number of tasks that have been
996     * scheduled for execution. Because the states of tasks and
997     * threads may change dynamically during computation, the returned
998     * value is only an approximation.
999 tim 1.1 *
1000     * @return the number of tasks
1001     */
1002 tim 1.10 public long getTaskCount() {
1003 dl 1.2 mainLock.lock();
1004     try {
1005     long n = completedTaskCount;
1006     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1007     Worker w = it.next();
1008     n += w.completedTasks;
1009     if (w.isActive())
1010     ++n;
1011     }
1012     return n + workQueue.size();
1013 tim 1.14 } finally {
1014 dl 1.2 mainLock.unlock();
1015     }
1016     }
1017 tim 1.1
1018     /**
1019 dl 1.2 * Returns the approximate total number of tasks that have
1020     * completed execution. Because the states of tasks and threads
1021     * may change dynamically during computation, the returned value
1022     * is only an approximation.
1023 tim 1.1 *
1024     * @return the number of tasks
1025     */
1026 tim 1.10 public long getCompletedTaskCount() {
1027 dl 1.2 mainLock.lock();
1028     try {
1029     long n = completedTaskCount;
1030 tim 1.10 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1031 dl 1.2 n += it.next().completedTasks;
1032     return n;
1033 tim 1.14 } finally {
1034 dl 1.2 mainLock.unlock();
1035     }
1036     }
1037 tim 1.1
1038     /**
1039 dl 1.2 * Method invoked prior to executing the given Runnable in given
1040     * thread. This method may be used to re-initialize ThreadLocals,
1041 dl 1.5 * or to perform logging. Note: To properly nest multiple
1042     * overridings, subclasses should generally invoke
1043     * <tt>super.beforeExecute</tt> at the end of this method.
1044 tim 1.1 *
1045 dl 1.2 * @param t the thread that will run task r.
1046     * @param r the task that will be executed.
1047 tim 1.1 */
1048 dl 1.2 protected void beforeExecute(Thread t, Runnable r) { }
1049 tim 1.1
1050     /**
1051 dl 1.2 * Method invoked upon completion of execution of the given
1052     * Runnable. If non-null, the Throwable is the uncaught exception
1053 dl 1.5 * that caused execution to terminate abruptly. Note: To properly
1054     * nest multiple overridings, subclasses should generally invoke
1055     * <tt>super.afterExecute</tt> at the beginning of this method.
1056 tim 1.1 *
1057 dl 1.2 * @param r the runnable that has completed.
1058     * @param t the exception that cause termination, or null if
1059     * execution completed normally.
1060 tim 1.1 */
1061 dl 1.2 protected void afterExecute(Runnable r, Throwable t) { }
1062 tim 1.1
1063 dl 1.2 /**
1064     * Method invoked when the Executor has terminated. Default
1065     * implementation does nothing.
1066     */
1067     protected void terminated() { }
1068 tim 1.1
1069     /**
1070     * A handler for unexecutable tasks that runs these tasks directly in the
1071     * calling thread of the <tt>execute</tt> method. This is the default
1072 dl 1.2 * <tt>RejectedExecutionHandler</tt>.
1073 tim 1.1 */
1074 dl 1.2 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1075 tim 1.1
1076     /**
1077     * Constructs a <tt>CallerRunsPolicy</tt>.
1078     */
1079     public CallerRunsPolicy() { }
1080    
1081 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1082     if (!e.isShutdown()) {
1083 tim 1.1 r.run();
1084     }
1085     }
1086     }
1087    
1088     /**
1089 dl 1.8 * A handler for unexecutable tasks that throws a
1090     * <tt>RejectedExecutionException</tt>.
1091 tim 1.1 */
1092 dl 1.2 public static class AbortPolicy implements RejectedExecutionHandler {
1093 tim 1.1
1094     /**
1095     * Constructs a <tt>AbortPolicy</tt>.
1096     */
1097     public AbortPolicy() { }
1098    
1099 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1100     throw new RejectedExecutionException();
1101 tim 1.1 }
1102     }
1103    
1104     /**
1105     * A handler for unexecutable tasks that waits until the task can be
1106     * submitted for execution.
1107     */
1108 dl 1.2 public static class WaitPolicy implements RejectedExecutionHandler {
1109 tim 1.1 /**
1110     * Constructs a <tt>WaitPolicy</tt>.
1111     */
1112     public WaitPolicy() { }
1113    
1114 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1115     if (!e.isShutdown()) {
1116     try {
1117     e.getQueue().put(r);
1118 tim 1.14 } catch (InterruptedException ie) {
1119 dl 1.2 Thread.currentThread().interrupt();
1120     throw new RejectedExecutionException(ie);
1121     }
1122 tim 1.1 }
1123     }
1124     }
1125    
1126     /**
1127     * A handler for unexecutable tasks that silently discards these tasks.
1128     */
1129 dl 1.2 public static class DiscardPolicy implements RejectedExecutionHandler {
1130 tim 1.1
1131     /**
1132     * Constructs <tt>DiscardPolicy</tt>.
1133     */
1134     public DiscardPolicy() { }
1135    
1136 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1137 tim 1.1 }
1138     }
1139    
1140     /**
1141 dl 1.8 * A handler for unexecutable tasks that discards the oldest
1142     * unhandled request.
1143 tim 1.1 */
1144 dl 1.2 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1145 tim 1.1 /**
1146 dl 1.2 * Constructs a <tt>DiscardOldestPolicy</tt> for the given executor.
1147 tim 1.1 */
1148     public DiscardOldestPolicy() { }
1149    
1150 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1151     if (!e.isShutdown()) {
1152     e.getQueue().poll();
1153     e.execute(r);
1154 tim 1.1 }
1155     }
1156     }
1157     }