root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.12
Committed: Thu Aug 7 16:00:28 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.11: +14 -5 lines
Log Message:
ScheduledExecutor must prestart core threads
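
The log message refers to starting core threads eagerly rather than on first submission; in this revision that is the package-private `prestartCoreThreads()` shown in the listing below. As a minimal sketch of the same idea, assuming the released `java.util.concurrent` API (where the public equivalent is `prestartAllCoreThreads()`, which is not part of this revision), the class name `PrestartSketch` and the helper `demo` are hypothetical:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; uses the released JDK API, not revision 1.12.
class PrestartSketch {

    /** Returns {pool size before prestart, threads started, pool size after}. */
    static int[] demo(int coreSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, coreSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            // Threads are created lazily by default, so nothing exists yet.
            int before = pool.getPoolSize();
            // Eagerly create every core thread; each one idles waiting for work.
            int started = pool.prestartAllCoreThreads();
            int after = pool.getPoolSize();
            return new int[] { before, started, after };
        } finally {
            pool.shutdown(); // idle workers exit once shutdown is requested
        }
    }

    public static void main(String[] args) {
        int[] r = demo(3);
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

A scheduled executor benefits from this because delayed tasks go straight into the queue, so without prestarted threads nothing would be waiting to take them.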

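The class documentation in the listing below describes the order in which a submitted task is handled: a new core thread first, then the queue, then an extra thread up to maximumPoolSize, and finally the RejectedExecutionHandler. The following sketch illustrates that order against the released `java.util.concurrent` API rather than this 2003 revision; `SubmissionOrderSketch` and `demo` are hypothetical names, and the sizes (core 1, maximum 2, queue capacity 1) are chosen only to keep the trace short:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; uses the released JDK API, not revision 1.12.
class SubmissionOrderSketch {

    /** Submits four blocking tasks and logs "poolSize/queueSize" after each. */
    static String demo() {
        final CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no worker becomes idle early.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1)); // default handler aborts
        StringBuilder log = new StringBuilder();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    log.append(pool.getPoolSize()).append('/')
                       .append(pool.getQueue().size()).append(' ');
                } catch (RejectedExecutionException e) {
                    log.append("rejected"); // pool and queue are both full
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "1/0 1/1 2/1 rejected"
    }
}
```

The first task starts the core thread, the second is queued, the third finds the queue full and so starts a second thread, and the fourth is rejected because both bounds are reached.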
File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.2 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.9 import java.util.concurrent.locks.*;
9 dl 1.2 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.2 * An {@link ExecutorService} that executes each submitted task on one
13 tim 1.1 * of several pooled threads.
14     *
15     * <p>Thread pools address two different problems at the same time:
16     * they usually provide improved performance when executing large
17     * numbers of asynchronous tasks, due to reduced per-task invocation
18     * overhead, and they provide a means of bounding and managing the
19     * resources, including threads, consumed in executing a collection of
20     * tasks.
21     *
22     * <p>This class is highly configurable: it can be set up to create
23     * a new thread for each task, or even to execute tasks sequentially
24     * in a single thread, in addition to its most common configuration,
25     * which reuses a pool of threads.
26     *
27     * <p>To be useful across a wide range of contexts, this class
28     * provides many adjustable parameters and extensibility hooks.
29     * However, programmers are urged to use the more convenient factory
30     * methods <tt>newCachedThreadPool</tt> (unbounded thread pool, with
31     * automatic thread reclamation), <tt>newFixedThreadPool</tt> (fixed
32 dl 1.2 * size thread pool), <tt>newSingleThreadPoolExecutor</tt> (single
33 tim 1.1 * background thread for execution of tasks), and
34     * <tt>newThreadPerTaskExecutor</tt> (execute each task in a new
35     * thread), that preconfigure settings for the most common usage
36     * scenarios.
37     *
38     * <p>This class also maintains some basic statistics, such as the
39 dl 1.2 * number of completed tasks, that may be useful for monitoring and
40     * tuning executors.
41 tim 1.1 *
42     * <h3>Tuning guide</h3>
43     * <dl>
44 dl 1.2 *
45     * <dt>Core and maximum pool size</dt>
46     *
47     * <dd>A ThreadPoolExecutor will automatically adjust the pool size
48     * according to the bounds set by corePoolSize and maximumPoolSize.
49     * When a new task is submitted, and fewer than corePoolSize threads
50     * are running, a new thread is created to handle the request, even if
51     * other worker threads are idle. If there are more than the
52     * corePoolSize but less than maximumPoolSize threads running, a new
53     * thread will be created only if the queue is full. By setting
54     * corePoolSize and maximumPoolSize the same, you create a fixed-size
55     * thread pool.</dd>
56     *
57 tim 1.10 * <dt>Keep-alive</dt>
58 dl 1.2 *
59     * <dd>The keepAliveTime determines what happens to idle threads. If
60     * the pool currently has more than the core number of threads, excess
61     * threads will be terminated if they have been idle for more than the
62     * keepAliveTime.</dd>
63     *
64 tim 1.10 * <dt>Queueing</dt>
65     *
66 dl 1.2 * <dd>You are free to specify the queuing mechanism used to handle
67 dl 1.6 * submitted tasks. A good default is to use queueless synchronous
68     * channels to hand off work to threads. This is a safe,
69     * conservative policy that avoids lockups when handling sets of
70     * requests that might have internal dependencies. Using an unbounded
71     * queue (for example a LinkedBlockingQueue) will cause new
72     * tasks to be queued in cases where all corePoolSize threads are
73     * busy, so no more than corePoolSize threads will be created. This
74     * may be appropriate when each task is completely independent of
75     * others, so tasks cannot affect each other's execution, for example
76     * in an HTTP server. When given a choice, this pool always prefers
77     * adding a new thread rather than queueing if there are currently
78     * fewer than getCorePoolSize threads running, but
79     * otherwise always prefers queuing a request rather than adding a new
80     * thread.
81 tim 1.1 *
82     * <p>While queuing can be useful in smoothing out transient bursts of
83     * requests, especially in socket-based services, it is not very well
84     * behaved when commands continue to arrive on average faster than
85 tim 1.10 * they can be processed.
86 tim 1.1 *
87     * <p>Queue sizes and maximum pool sizes can often be traded off for each
88     * other. Using large queues and small pools minimizes CPU usage, OS
89     * resources, and context-switching overhead, but can lead to
90     * artificially low throughput. If tasks frequently block (for example
91     * if they are I/O bound), a JVM and underlying OS may be able to
92     * schedule time for more threads than you otherwise allow. Use of
93     * small queues or queueless handoffs generally requires larger pool
94     * sizes, which keeps CPUs busier but may encounter unacceptable
95     * scheduling overhead, which also decreases throughput.
96     * </dd>
97 dl 1.2 *
98 tim 1.1 * <dt>Creating new threads</dt>
99 dl 1.2 *
100     * <dd>New threads are created using a ThreadFactory. By default,
101     * threads are created simply with the new Thread(Runnable)
102     * constructor, but by supplying a different ThreadFactory, you can
103     * alter the thread's name, thread group, priority, daemon status,
104     * etc. </dd>
105     *
106 tim 1.1 * <dt>Before and after intercepts</dt>
107 dl 1.2 *
108     * <dd>This class has overridable methods that are called before
109     * and after execution of each task. These can be used to manipulate
110     * the execution environment (for example, reinitializing
111     * ThreadLocals), gather statistics, or perform logging. </dd>
112     *
113 tim 1.1 * <dt>Blocked execution</dt>
114 dl 1.2 *
115     * <dd>There are a number of factors which can bound the number of
116     * tasks which can execute at once, including the maximum pool size
117     * and the queuing mechanism used. If the executor determines that a
118     * task cannot be executed because it has been refused by the queue
119     * and no threads are available, or because the executor has been shut
120     * down, the RejectedExecutionHandler's rejectedExecution method is
121     * invoked. </dd>
122     *
123 tim 1.1 * <dt>Termination</dt>
124 dl 1.2 *
125     * <dd>ThreadPoolExecutor supports two shutdown options, immediate and
126     * graceful. In an immediate shutdown, any threads currently
127     * executing are interrupted, and any tasks not yet begun are returned
128     * from the shutdownNow call. In a graceful shutdown, all queued
129     * tasks are allowed to run, but new tasks may not be submitted.
130 tim 1.1 * </dd>
131 dl 1.2 *
132 tim 1.1 * </dl>
133     *
134     * @since 1.5
135 dl 1.2 * @see RejectedExecutionHandler
136 tim 1.1 * @see Executors
137     * @see ThreadFactory
138     *
139     * @spec JSR-166
140 dl 1.12 * @revised $Date: 2003/08/06 19:22:36 $
141 tim 1.11 * @editor $Author: tim $
142 dl 1.8 * @author Doug Lea
143 tim 1.1 */
144 dl 1.2 public class ThreadPoolExecutor implements ExecutorService {
145     /**
146     * Queue used for holding tasks and handing off to worker threads.
147 tim 1.10 */
148 dl 1.2 private final BlockingQueue<Runnable> workQueue;
149    
150     /**
151     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
152     * workers set.
153 tim 1.10 */
154 dl 1.2 private final ReentrantLock mainLock = new ReentrantLock();
155    
156     /**
157     * Wait condition to support awaitTermination
158 tim 1.10 */
159 dl 1.2 private final Condition termination = mainLock.newCondition();
160    
161     /**
162     * Set containing all worker threads in pool.
163 tim 1.10 */
164 dl 1.2 private final Set<Worker> workers = new HashSet<Worker>();
165    
166     /**
167     * Timeout in nanoseconds for idle threads waiting for work.
168     * Threads use this timeout only when there are more than
169     * corePoolSize present. Otherwise they wait forever for new work.
170 tim 1.10 */
171 dl 1.2 private volatile long keepAliveTime;
172    
173     /**
174     * Core pool size, updated only while holding mainLock,
175     * but volatile to allow concurrent readability even
176     * during updates.
177 tim 1.10 */
178 dl 1.2 private volatile int corePoolSize;
179    
180     /**
181     * Maximum pool size, updated only while holding mainLock
182     * but volatile to allow concurrent readability even
183     * during updates.
184 tim 1.10 */
185 dl 1.2 private volatile int maximumPoolSize;
186    
187     /**
188     * Current pool size, updated only while holding mainLock
189     * but volatile to allow concurrent readability even
190     * during updates.
191 tim 1.10 */
192 dl 1.2 private volatile int poolSize;
193    
194     /**
195     * Shutdown status, becomes (and remains) nonzero when shutdown called.
196 tim 1.10 */
197 dl 1.2 private volatile int shutdownStatus;
198    
199     // Special values for status
200 dl 1.8 /** Normal, not-shutdown mode */
201 dl 1.2 private static final int NOT_SHUTDOWN = 0;
202 dl 1.8 /** Controlled shutdown mode */
203 dl 1.2 private static final int SHUTDOWN_WHEN_IDLE = 1;
204 dl 1.8 /** Immediate shutdown mode */
205 tim 1.10 private static final int SHUTDOWN_NOW = 2;
206 dl 1.2
207     /**
208     * Latch that becomes true when all threads terminate after shutdown.
209 tim 1.10 */
210 dl 1.2 private volatile boolean isTerminated;
211    
212     /**
213     * Handler called when saturated or shutdown in execute.
214 tim 1.10 */
215 dl 1.2 private volatile RejectedExecutionHandler handler = defaultHandler;
216    
217     /**
218     * Factory for new threads.
219 tim 1.10 */
220 dl 1.2 private volatile ThreadFactory threadFactory = defaultThreadFactory;
221    
222     /**
223     * Tracks largest attained pool size.
224 tim 1.10 */
225 dl 1.2 private int largestPoolSize;
226    
227     /**
228     * Counter for completed tasks. Updated only on termination of
229     * worker threads.
230 tim 1.10 */
231 dl 1.2 private long completedTaskCount;
232    
233 dl 1.8 /**
234 tim 1.10 * The default thread factory
235 dl 1.8 */
236 tim 1.10 private static final ThreadFactory defaultThreadFactory =
237 dl 1.2 new ThreadFactory() {
238     public Thread newThread(Runnable r) {
239     return new Thread(r);
240     }
241     };
242    
243 dl 1.8 /**
244     * The default rejected execution handler
245     */
246 tim 1.10 private static final RejectedExecutionHandler defaultHandler =
247 dl 1.2 new AbortPolicy();
248    
249     /**
250     * Create and return a new thread running firstTask as its first
251     * task. Call only while holding mainLock
252 dl 1.8 * @param firstTask the task the new thread should run first (or
253     * null if none)
254     * @return the new thread
255 dl 1.2 */
256     private Thread addThread(Runnable firstTask) {
257     Worker w = new Worker(firstTask);
258     Thread t = threadFactory.newThread(w);
259     w.thread = t;
260     workers.add(w);
261     int nt = ++poolSize;
262     if (nt > largestPoolSize)
263     largestPoolSize = nt;
264     return t;
265     }
266    
267     /**
268     * Create and start a new thread running firstTask as its first
269     * task, only if fewer than corePoolSize threads are running.
270 dl 1.8 * @param firstTask the task the new thread should run first (or
271     * null if none)
272 dl 1.2 * @return true if successful.
273     */
274 dl 1.12 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
275 dl 1.2 Thread t = null;
276     mainLock.lock();
277     try {
278 tim 1.10 if (poolSize < corePoolSize)
279 dl 1.8 t = addThread(firstTask);
280 dl 1.2 }
281     finally {
282     mainLock.unlock();
283     }
284     if (t == null)
285     return false;
286     t.start();
287     return true;
288     }
289    
290     /**
291 dl 1.12 * Eagerly (vs by default lazily) start all core threads.
292     */
293     void prestartCoreThreads() {
294     while (addIfUnderCorePoolSize(null)) {
295     }
296     }
297    
298    
299     /**
300 dl 1.2 * Create and start a new thread only if fewer than maximumPoolSize
301     * threads are running. The new thread runs as its first task the
302     * next task in queue, or if there is none, the given task.
303 dl 1.8 * @param firstTask the task the new thread should run first (or
304     * null if none)
305 dl 1.2 * @return null on failure, else the first task to be run by new thread.
306     */
307 dl 1.8 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
308 dl 1.2 Thread t = null;
309     Runnable next = null;
310     mainLock.lock();
311     try {
312     if (poolSize < maximumPoolSize) {
313     next = workQueue.poll();
314     if (next == null)
315 dl 1.8 next = firstTask;
316 dl 1.2 t = addThread(next);
317     }
318     }
319     finally {
320     mainLock.unlock();
321     }
322     if (t == null)
323     return null;
324     t.start();
325     return next;
326     }
327    
328    
329     /**
330     * Get the next task for a worker thread to run.
331 dl 1.8 * @return the task
332     * @throws InterruptedException if interrupted while waiting for task
333 dl 1.2 */
334     private Runnable getTask() throws InterruptedException {
335     for (;;) {
336     int stat = shutdownStatus;
337     if (stat == SHUTDOWN_NOW)
338     return null;
339     if (stat == SHUTDOWN_WHEN_IDLE) // help drain queue before dying
340     return workQueue.poll();
341     if (poolSize <= corePoolSize) // untimed wait if core
342     return workQueue.take();
343 dl 1.12 long timeout = keepAliveTime;
344     if (timeout <= 0) // must die immediately for 0 timeout
345     return null;
346 dl 1.2 Runnable task = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
347     if (task != null)
348     return task;
349     if (poolSize > corePoolSize) // timed out
350     return null;
351     // else, after timeout, pool shrank so shouldn't die, so retry
352     }
353     }
354    
355     /**
356     * Perform bookkeeping for a terminated worker thread.
357 tim 1.10 * @param w the worker
358 dl 1.2 */
359     private void workerDone(Worker w) {
360     boolean allDone = false;
361     mainLock.lock();
362     try {
363     completedTaskCount += w.completedTasks;
364     workers.remove(w);
365    
366 tim 1.10 if (--poolSize > 0)
367 dl 1.2 return;
368    
369     // If this was last thread, deal with potential shutdown
370     int stat = shutdownStatus;
371 tim 1.10
372 dl 1.2 // If there are queued tasks but no threads, create replacement.
373     if (stat != SHUTDOWN_NOW) {
374     Runnable r = workQueue.poll();
375     if (r != null) {
376     addThread(r).start();
377     return;
378     }
379     }
380    
381     // if no tasks and not shutdown, can exit without replacement
382 tim 1.10 if (stat == NOT_SHUTDOWN)
383 dl 1.2 return;
384    
385     allDone = true;
386     isTerminated = true;
387     termination.signalAll();
388     }
389     finally {
390     mainLock.unlock();
391     }
392    
393     if (allDone) // call outside lock
394     terminated();
395     }
396    
397     /**
398 tim 1.10 * Worker threads
399 dl 1.2 */
400     private class Worker implements Runnable {
401    
402     /**
403     * The runLock is acquired and released surrounding each task
404     * execution. It mainly protects against interrupts that are
405     * intended to cancel the worker thread from instead
406     * interrupting the task being run.
407     */
408     private final ReentrantLock runLock = new ReentrantLock();
409    
410     /**
411     * Initial task to run before entering run loop
412     */
413     private Runnable firstTask;
414    
415     /**
416     * Per thread completed task counter; accumulated
417     * into completedTaskCount upon termination.
418     */
419     volatile long completedTasks;
420    
421     /**
422     * Thread this worker is running in. Acts as a final field,
423     * but cannot be set until thread is created.
424     */
425     Thread thread;
426    
427     Worker(Runnable firstTask) {
428     this.firstTask = firstTask;
429     }
430    
431     boolean isActive() {
432     return runLock.isLocked();
433     }
434    
435     /**
436     * Interrupt thread if not running a task
437 tim 1.10 */
438 dl 1.2 void interruptIfIdle() {
439     if (runLock.tryLock()) {
440     try {
441     thread.interrupt();
442     }
443     finally {
444     runLock.unlock();
445     }
446     }
447     }
448    
449     /**
450     * Cause thread to die even if running a task.
451 tim 1.10 */
452 dl 1.2 void interruptNow() {
453     thread.interrupt();
454     }
455    
456     /**
457     * Run a single task between before/after methods.
458     */
459     private void runTask(Runnable task) {
460     runLock.lock();
461     try {
462     // Abort now if immediate cancel. Otherwise, we have
463     // committed to run this task.
464     if (shutdownStatus == SHUTDOWN_NOW)
465     return;
466    
467     Thread.interrupted(); // clear interrupt status on entry
468     boolean ran = false;
469     beforeExecute(thread, task);
470     try {
471     task.run();
472     ran = true;
473     afterExecute(task, null);
474     ++completedTasks;
475     }
476     catch(RuntimeException ex) {
477     if (!ran)
478     afterExecute(task, ex);
479     // else the exception occurred within
480     // afterExecute itself in which case we don't
481     // want to call it again.
482     throw ex;
483     }
484     }
485     finally {
486     runLock.unlock();
487     }
488     }
489    
490     /**
491     * Main run loop
492     */
493     public void run() {
494     try {
495     for (;;) {
496     Runnable task;
497     if (firstTask != null) {
498     task = firstTask;
499     firstTask = null;
500     }
501     else {
502     task = getTask();
503     if (task == null)
504     break;
505     }
506     runTask(task);
507     task = null; // unnecessary but can help GC
508     }
509     }
510 tim 1.10 catch(InterruptedException ie) {
511 dl 1.2 // fall through
512     }
513     finally {
514     workerDone(this);
515     }
516     }
517     }
518 tim 1.1
519     /**
520     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
521     * parameters. It may be more convenient to use one of the factory
522     * methods instead of this general purpose constructor.
523     *
524 dl 1.2 * @param corePoolSize the number of threads to keep in the
525 tim 1.1 * pool, even if they are idle.
526 dl 1.2 * @param maximumPoolSize the maximum number of threads to allow in the
527 tim 1.1 * pool.
528     * @param keepAliveTime when the number of threads is greater than
529 dl 1.2 * the core, this is the maximum time that excess idle threads
530 tim 1.1 * will wait for new tasks before terminating.
531 dl 1.2 * @param unit the time unit for the keepAliveTime
532 tim 1.1 * argument.
533     * @param workQueue the queue to use for holding tasks before they
534     * are executed. This queue will hold only the <tt>Runnable</tt>
535     * tasks submitted by the <tt>execute</tt> method.
536 dl 1.2 * @throws IllegalArgumentException if corePoolSize or
537     * keepAliveTime is less than zero, or if maximumPoolSize is less than
538     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
539 tim 1.1 * @throws NullPointerException if <tt>workQueue</tt> is null
540     */
541 dl 1.2 public ThreadPoolExecutor(int corePoolSize,
542     int maximumPoolSize,
543 tim 1.1 long keepAliveTime,
544 dl 1.2 TimeUnit unit,
545     BlockingQueue<Runnable> workQueue) {
546 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
547 dl 1.2 defaultThreadFactory, defaultHandler);
548     }
549 tim 1.1
550 dl 1.2 /**
551     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
552     * parameters.
553     *
554     * @param corePoolSize the number of threads to keep in the
555     * pool, even if they are idle.
556     * @param maximumPoolSize the maximum number of threads to allow in the
557     * pool.
558     * @param keepAliveTime when the number of threads is greater than
559     * the core, this is the maximum time that excess idle threads
560     * will wait for new tasks before terminating.
561     * @param unit the time unit for the keepAliveTime
562     * argument.
563     * @param workQueue the queue to use for holding tasks before they
564     * are executed. This queue will hold only the <tt>Runnable</tt>
565     * tasks submitted by the <tt>execute</tt> method.
566     * @param threadFactory the factory to use when the executor
567 tim 1.10 * creates a new thread.
568 dl 1.2 * @throws IllegalArgumentException if corePoolSize or
569     * keepAliveTime is less than zero, or if maximumPoolSize is less than
570     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
571 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
572 dl 1.2 * or <tt>threadFactory</tt> is null.
573     */
574     public ThreadPoolExecutor(int corePoolSize,
575     int maximumPoolSize,
576     long keepAliveTime,
577     TimeUnit unit,
578     BlockingQueue<Runnable> workQueue,
579     ThreadFactory threadFactory) {
580 tim 1.1
581 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
582 dl 1.2 threadFactory, defaultHandler);
583     }
584 tim 1.1
585 dl 1.2 /**
586     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
587     * parameters.
588     *
589     * @param corePoolSize the number of threads to keep in the
590     * pool, even if they are idle.
591     * @param maximumPoolSize the maximum number of threads to allow in the
592     * pool.
593     * @param keepAliveTime when the number of threads is greater than
594     * the core, this is the maximum time that excess idle threads
595     * will wait for new tasks before terminating.
596     * @param unit the time unit for the keepAliveTime
597     * argument.
598     * @param workQueue the queue to use for holding tasks before they
599     * are executed. This queue will hold only the <tt>Runnable</tt>
600     * tasks submitted by the <tt>execute</tt> method.
601     * @param handler the handler to use when execution is blocked
602     * because the thread bounds and queue capacities are reached.
603     * @throws IllegalArgumentException if corePoolSize or
604     * keepAliveTime is less than zero, or if maximumPoolSize is less than
605     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
606 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
607 dl 1.2 * or <tt>handler</tt> is null.
608     */
609     public ThreadPoolExecutor(int corePoolSize,
610     int maximumPoolSize,
611     long keepAliveTime,
612     TimeUnit unit,
613     BlockingQueue<Runnable> workQueue,
614     RejectedExecutionHandler handler) {
615 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
616 dl 1.2 defaultThreadFactory, handler);
617     }
618 tim 1.1
619 dl 1.2 /**
620     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
621     * parameters.
622     *
623     * @param corePoolSize the number of threads to keep in the
624     * pool, even if they are idle.
625     * @param maximumPoolSize the maximum number of threads to allow in the
626     * pool.
627     * @param keepAliveTime when the number of threads is greater than
628     * the core, this is the maximum time that excess idle threads
629     * will wait for new tasks before terminating.
630     * @param unit the time unit for the keepAliveTime
631     * argument.
632     * @param workQueue the queue to use for holding tasks before they
633     * are executed. This queue will hold only the <tt>Runnable</tt>
634     * tasks submitted by the <tt>execute</tt> method.
635     * @param threadFactory the factory to use when the executor
636 tim 1.10 * creates a new thread.
637 dl 1.2 * @param handler the handler to use when execution is blocked
638     * because the thread bounds and queue capacities are reached.
639     * @throws IllegalArgumentException if corePoolSize or
640     * keepAliveTime is less than zero, or if maximumPoolSize is less than
641     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
642 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
643 dl 1.2 * or <tt>threadFactory</tt> or <tt>handler</tt> is null.
644     */
645     public ThreadPoolExecutor(int corePoolSize,
646     int maximumPoolSize,
647     long keepAliveTime,
648     TimeUnit unit,
649     BlockingQueue<Runnable> workQueue,
650     ThreadFactory threadFactory,
651     RejectedExecutionHandler handler) {
652 tim 1.10 if (corePoolSize < 0 ||
653 dl 1.2 maximumPoolSize <= 0 ||
654 tim 1.10 maximumPoolSize < corePoolSize ||
655 dl 1.2 keepAliveTime < 0)
656     throw new IllegalArgumentException();
657     if (workQueue == null || threadFactory == null || handler == null)
658     throw new NullPointerException();
659     this.corePoolSize = corePoolSize;
660     this.maximumPoolSize = maximumPoolSize;
661     this.workQueue = workQueue;
662     this.keepAliveTime = unit.toNanos(keepAliveTime);
663     this.threadFactory = threadFactory;
664     this.handler = handler;
665 tim 1.1 }
666    
667 dl 1.2
668     /**
669     * Executes the given task sometime in the future. The task
670     * may execute in a new thread or in an existing pooled thread.
671     *
672     * If the task cannot be submitted for execution, either because this
673     * executor has been shutdown or because its capacity has been reached,
674 tim 1.10 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
675 dl 1.2 *
676     * @param command the task to execute
677     * @throws RejectedExecutionException at discretion of
678 dl 1.8 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
679     * for execution
680 dl 1.2 */
681 tim 1.10 public void execute(Runnable command) {
682 dl 1.2 for (;;) {
683     if (shutdownStatus != NOT_SHUTDOWN) {
684     handler.rejectedExecution(command, this);
685     return;
686     }
687     if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
688     return;
689     if (workQueue.offer(command))
690     return;
691     Runnable r = addIfUnderMaximumPoolSize(command);
692     if (r == command)
693     return;
694     if (r == null) {
695     handler.rejectedExecution(command, this);
696     return;
697     }
698     // else retry
699     }
700 tim 1.1 }
701 dl 1.4
702 dl 1.2 public void shutdown() {
703     mainLock.lock();
704     try {
705     if (shutdownStatus == NOT_SHUTDOWN) // don't override shutdownNow
706     shutdownStatus = SHUTDOWN_WHEN_IDLE;
707 tim 1.1
708 dl 1.2 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
709     it.next().interruptIfIdle();
710     }
711     finally {
712     mainLock.unlock();
713     }
714 tim 1.1 }
715    
716 dl 1.2 public List shutdownNow() {
717     mainLock.lock();
718     try {
719     shutdownStatus = SHUTDOWN_NOW;
720     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
721     it.next().interruptNow();
722     }
723     finally {
724     mainLock.unlock();
725     }
726     return Arrays.asList(workQueue.toArray());
727 tim 1.1 }
728    
729 dl 1.2 public boolean isShutdown() {
730     return shutdownStatus != NOT_SHUTDOWN;
731 tim 1.1 }
732    
733 dl 1.2 public boolean isTerminated() {
734     return isTerminated;
735     }
736 tim 1.1
737 dl 1.2 public boolean awaitTermination(long timeout, TimeUnit unit)
738     throws InterruptedException {
739     mainLock.lock();
740     try {
741     return isTerminated || termination.await(timeout, unit); // skip the wait if already terminated
742     }
743     finally {
744     mainLock.unlock();
745     }
746     }
747 tim 1.10
748 dl 1.2 /**
749     * Sets the thread factory used to create new threads.
750     *
751     * @param threadFactory the new thread factory
752 tim 1.11 * @see #getThreadFactory
753 dl 1.2 */
754     public void setThreadFactory(ThreadFactory threadFactory) {
755     this.threadFactory = threadFactory;
756 tim 1.1 }
757    
758 dl 1.2 /**
759     * Returns the thread factory used to create new threads.
760     *
761     * @return the current thread factory
762 tim 1.11 * @see #setThreadFactory
763 dl 1.2 */
764     public ThreadFactory getThreadFactory() {
765     return threadFactory;
766 tim 1.1 }
767    
768 dl 1.2 /**
769     * Sets a new handler for unexecutable tasks.
770     *
771     * @param handler the new handler
772 tim 1.11 * @see #getRejectedExecutionHandler
773 dl 1.2 */
774     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
775     this.handler = handler;
776     }
777 tim 1.1
778 dl 1.2 /**
779     * Returns the current handler for unexecutable tasks.
780     *
781     * @return the current handler
782 tim 1.11 * @see #setRejectedExecutionHandler
783 dl 1.2 */
784     public RejectedExecutionHandler getRejectedExecutionHandler() {
785     return handler;
786 tim 1.1 }
787    
788 dl 1.2 /**
789     * Returns the task queue used by this executor. Note that
790     * this queue may be in active use. Retrieving the task queue
791     * does not prevent queued tasks from executing.
792     *
793     * @return the task queue
794     */
795     public BlockingQueue<Runnable> getQueue() {
796     return workQueue;
797 tim 1.1 }
798 dl 1.4
799     /**
800     * Removes this task from internal queue if it is present, thus
801     * causing it not to be run if it has not already started. This
802     * method may be useful as one part of a cancellation scheme.
803 tim 1.10 *
804 dl 1.8 * @param task the task to remove
805     * @return true if the task was removed
806 dl 1.4 */
807 dl 1.5 public boolean remove(Runnable task) {
808 dl 1.4 return getQueue().remove(task);
809     }
810    
811 dl 1.7
812     /**
813 tim 1.10 * Removes from the work queue all {@link Cancellable} tasks
814 dl 1.7 * that have been cancelled. This method can be useful as a
815     * storage reclamation operation, that has no other impact
816     * on functionality. Cancelled tasks are never executed, but
817     * may accumulate in work queues until worker threads can
818     * actively remove them. Invoking this method ensures that they
819     * are instead removed now.
820     */
821    
822     public void purge() {
823     Iterator<Runnable> it = getQueue().iterator();
824     while (it.hasNext()) {
825     Runnable r = it.next();
826     if (r instanceof Cancellable) {
827     Cancellable c = (Cancellable)r;
828 tim 1.10 if (c.isCancelled())
829 dl 1.7 it.remove();
830     }
831     }
832     }
833 tim 1.1
834     /**
835 dl 1.2 * Sets the core number of threads. This overrides any value set
836     * in the constructor. If the new value is smaller than the
837     * current value, excess existing threads will be terminated when
838     * they next become idle.
839 tim 1.1 *
840 dl 1.2 * @param corePoolSize the new core size
841 tim 1.10 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
842 dl 1.8 * is less than zero
843 tim 1.11 * @see #getCorePoolSize
844 tim 1.1 */
845 dl 1.2 public void setCorePoolSize(int corePoolSize) {
846     if (corePoolSize < 0)
847     throw new IllegalArgumentException();
848     mainLock.lock();
849     try {
850     int extra = this.corePoolSize - corePoolSize;
851     this.corePoolSize = corePoolSize;
852     if (extra > 0 && poolSize > corePoolSize) {
853     Iterator<Worker> it = workers.iterator();
854 tim 1.10 while (it.hasNext() &&
855     extra > 0 &&
856 dl 1.2 poolSize > corePoolSize &&
857     workQueue.remainingCapacity() == 0) {
858     it.next().interruptIfIdle();
859     --extra;
860     }
861     }
862 tim 1.10
863 dl 1.2 }
864     finally {
865     mainLock.unlock();
866     }
867     }
868 tim 1.1
869     /**
870 dl 1.2 * Returns the core number of threads.
871 tim 1.1 *
872 dl 1.2 * @return the core number of threads
873 tim 1.11 * @see #setCorePoolSize
874 tim 1.1 */
875 tim 1.10 public int getCorePoolSize() {
876 dl 1.2 return corePoolSize;
877     }
878 tim 1.1
879     /**
880     * Sets the maximum allowed number of threads. This overrides any
881 dl 1.2 * value set in the constructor. If the new value is smaller than
882     * the current value, excess existing threads will be
883     * terminated when they next become idle.
884 tim 1.1 *
885 dl 1.2 * @param maximumPoolSize the new maximum
886     * @throws IllegalArgumentException if maximumPoolSize is less than or
887     * equal to zero, or less than the {@link #getCorePoolSize core pool size}
888 tim 1.11 * @see #getMaximumPoolSize
889 dl 1.2 */
890     public void setMaximumPoolSize(int maximumPoolSize) {
891     if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
892     throw new IllegalArgumentException();
893     mainLock.lock();
894     try {
895     int extra = this.maximumPoolSize - maximumPoolSize;
896     this.maximumPoolSize = maximumPoolSize;
897     if (extra > 0 && poolSize > maximumPoolSize) {
898     Iterator<Worker> it = workers.iterator();
899 tim 1.10 while (it.hasNext() &&
900     extra > 0 &&
901 dl 1.2 poolSize > maximumPoolSize) {
902     it.next().interruptIfIdle();
903     --extra;
904     }
905     }
906     }
907     finally {
908     mainLock.unlock();
909     }
910     }
911 tim 1.1
912     /**
913     * Returns the maximum allowed number of threads.
914     *
915 dl 1.2 * @return the maximum allowed number of threads
916 tim 1.11 * @see #setMaximumPoolSize
917 tim 1.1 */
918 tim 1.10 public int getMaximumPoolSize() {
919 dl 1.2 return maximumPoolSize;
920     }
921 tim 1.1
922     /**
923     * Sets the time limit for which threads may remain idle before
924 dl 1.2 * being terminated. If there are more than the core number of
925 tim 1.1 * threads currently in the pool, after waiting this amount of
926     * time without processing a task, excess threads will be
927     * terminated. This overrides any value set in the constructor.
928     * @param time the time to wait. A time value of zero will cause
929     * excess threads to terminate immediately after executing tasks.
930 dl 1.2 * @param unit the time unit of the time argument
931 tim 1.1 * @throws IllegalArgumentException if time is less than zero
932 tim 1.11 * @see #getKeepAliveTime
933 tim 1.1 */
934 dl 1.2 public void setKeepAliveTime(long time, TimeUnit unit) {
935     if (time < 0)
936     throw new IllegalArgumentException();
937     this.keepAliveTime = unit.toNanos(time);
938     }
939 tim 1.1
940     /**
941     * Returns the thread keep-alive time, which is the amount of time
942 dl 1.2 * which threads in excess of the core pool size may remain
943 tim 1.10 * idle before being terminated.
944 tim 1.1 *
945 dl 1.2 * @param unit the desired time unit of the result
946 tim 1.1 * @return the time limit
947 tim 1.11 * @see #setKeepAliveTime
948 tim 1.1 */
949 tim 1.10 public long getKeepAliveTime(TimeUnit unit) {
950 dl 1.2 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
951     }
952 tim 1.1
953     /* Statistics */
954    
955     /**
956     * Returns the current number of threads in the pool.
957     *
958     * @return the number of threads
959     */
960 tim 1.10 public int getPoolSize() {
961 dl 1.2 return poolSize;
962     }
963 tim 1.1
964     /**
965 dl 1.2 * Returns the approximate number of threads that are actively
966 tim 1.1 * executing tasks.
967     *
968     * @return the number of threads
969     */
970 tim 1.10 public int getActiveCount() {
971 dl 1.2 mainLock.lock();
972     try {
973     int n = 0;
974     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
975     if (it.next().isActive())
976     ++n;
977     }
978     return n;
979     }
980     finally {
981     mainLock.unlock();
982     }
983     }
984 tim 1.1
985     /**
986 dl 1.2 * Returns the largest number of threads that have ever
987     * simultaneously been in the pool.
988 tim 1.1 *
989     * @return the number of threads
990     */
991 tim 1.10 public int getLargestPoolSize() {
992 dl 1.2 mainLock.lock();
993     try {
994     return largestPoolSize;
995     }
996     finally {
997     mainLock.unlock();
998     }
999     }
1000 tim 1.1
1001     /**
1002 dl 1.2 * Returns the approximate total number of tasks that have been
1003     * scheduled for execution. Because the states of tasks and
1004     * threads may change dynamically during computation, the returned
1005     * value is only an approximation.
1006 tim 1.1 *
1007     * @return the number of tasks
1008     */
1009 tim 1.10 public long getTaskCount() {
1010 dl 1.2 mainLock.lock();
1011     try {
1012     long n = completedTaskCount;
1013     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1014     Worker w = it.next();
1015     n += w.completedTasks;
1016     if (w.isActive())
1017     ++n;
1018     }
1019     return n + workQueue.size();
1020     }
1021     finally {
1022     mainLock.unlock();
1023     }
1024     }
1025 tim 1.1
1026     /**
1027 dl 1.2 * Returns the approximate total number of tasks that have
1028     * completed execution. Because the states of tasks and threads
1029     * may change dynamically during computation, the returned value
1030     * is only an approximation.
1031 tim 1.1 *
1032     * @return the number of tasks
1033     */
1034 tim 1.10 public long getCompletedTaskCount() {
1035 dl 1.2 mainLock.lock();
1036     try {
1037     long n = completedTaskCount;
1038 tim 1.10 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1039 dl 1.2 n += it.next().completedTasks;
1040     return n;
1041     }
1042     finally {
1043     mainLock.unlock();
1044     }
1045     }
1046 tim 1.1
1047     /**
1048 dl 1.2 * Method invoked prior to executing the given Runnable in given
1049     * thread. This method may be used to re-initialize ThreadLocals,
1050 dl 1.5 * or to perform logging. Note: To properly nest multiple
1051     * overridings, subclasses should generally invoke
1052     * <tt>super.beforeExecute</tt> at the end of this method.
1053 tim 1.1 *
1054 dl 1.2 * @param t the thread that will run task r.
1055     * @param r the task that will be executed.
1056 tim 1.1 */
1057 dl 1.2 protected void beforeExecute(Thread t, Runnable r) { }
1058 tim 1.1
1059     /**
1060 dl 1.2 * Method invoked upon completion of execution of the given
1061     * Runnable. If non-null, the Throwable is the uncaught exception
1062 dl 1.5 * that caused execution to terminate abruptly. Note: To properly
1063     * nest multiple overridings, subclasses should generally invoke
1064     * <tt>super.afterExecute</tt> at the beginning of this method.
1065 tim 1.1 *
1066 dl 1.2 * @param r the runnable that has completed.
1067     * @param t the exception that caused termination, or null if
1068     * execution completed normally.
1069 tim 1.1 */
1070 dl 1.2 protected void afterExecute(Runnable r, Throwable t) { }
1071 tim 1.1
1072 dl 1.2 /**
1073     * Method invoked when the Executor has terminated. Default
1074     * implementation does nothing.
1075     */
1076     protected void terminated() { }
1077 tim 1.1
1078     /**
1079     * A handler for unexecutable tasks that runs these tasks directly in the
1080     * calling thread of the <tt>execute</tt> method. This is the default
1081 dl 1.2 * <tt>RejectedExecutionHandler</tt>.
1082 tim 1.1 */
1083 dl 1.2 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1084 tim 1.1
1085     /**
1086     * Constructs a <tt>CallerRunsPolicy</tt>.
1087     */
1088     public CallerRunsPolicy() { }
1089    
1090 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1091     if (!e.isShutdown()) {
1092 tim 1.1 r.run();
1093     }
1094     }
1095     }
1096    
1097     /**
1098 dl 1.8 * A handler for unexecutable tasks that throws a
1099     * <tt>RejectedExecutionException</tt>.
1100 tim 1.1 */
1101 dl 1.2 public static class AbortPolicy implements RejectedExecutionHandler {
1102 tim 1.1
1103     /**
1104     * Constructs an <tt>AbortPolicy</tt>.
1105     */
1106     public AbortPolicy() { }
1107    
1108 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1109     throw new RejectedExecutionException();
1110 tim 1.1 }
1111     }
1112    
1113     /**
1114     * A handler for unexecutable tasks that waits until the task can be
1115     * submitted for execution.
1116     */
1117 dl 1.2 public static class WaitPolicy implements RejectedExecutionHandler {
1118 tim 1.1 /**
1119     * Constructs a <tt>WaitPolicy</tt>.
1120     */
1121     public WaitPolicy() { }
1122    
1123 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1124     if (!e.isShutdown()) {
1125     try {
1126     e.getQueue().put(r);
1127     }
1128     catch (InterruptedException ie) {
1129     Thread.currentThread().interrupt();
1130     throw new RejectedExecutionException(ie);
1131     }
1132 tim 1.1 }
1133     }
1134     }
1135    
1136     /**
1137     * A handler for unexecutable tasks that silently discards these tasks.
1138     */
1139 dl 1.2 public static class DiscardPolicy implements RejectedExecutionHandler {
1140 tim 1.1
1141     /**
1142     * Constructs a <tt>DiscardPolicy</tt>.
1143     */
1144     public DiscardPolicy() { }
1145    
1146 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1147 tim 1.1 }
1148     }
1149    
1150     /**
1151 dl 1.8 * A handler for unexecutable tasks that discards the oldest
1152     * unhandled request.
1153 tim 1.1 */
1154 dl 1.2 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1155 tim 1.1 /**
1156 dl 1.2 * Constructs a <tt>DiscardOldestPolicy</tt>.
1157 tim 1.1 */
1158     public DiscardOldestPolicy() { }
1159    
1160 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1161     if (!e.isShutdown()) {
1162     e.getQueue().poll();
1163     e.execute(r);
1164 tim 1.1 }
1165     }
1166     }
1167     }
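
The hook methods, statistics accessors, and rejection handlers documented above might be combined roughly as in the following sketch. It is written against the released java.util.concurrent.ThreadPoolExecutor, whose API may differ from this revision (for instance, WaitPolicy is not part of the released class). The names HookedPoolDemo, CountingPool, and runDemo are hypothetical and exist only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HookedPoolDemo {

    /** Hypothetical subclass that counts invocations of the two hook methods. */
    static class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger started = new AtomicInteger();
        final AtomicInteger finished = new AtomicInteger();

        CountingPool(int core, int max, int queueCapacity) {
            // Bounded queue plus CallerRunsPolicy: when the queue is full and
            // the pool is at its maximum size, the submitting thread runs the task.
            super(core, max, 1L, TimeUnit.SECONDS,
                  new ArrayBlockingQueue<Runnable>(queueCapacity),
                  new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            started.incrementAndGet();
            super.beforeExecute(t, r);   // super call at the end, as the javadoc advises
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);    // super call at the beginning, as the javadoc advises
            finished.incrementAndGet();
        }
    }

    /** Returns { tasks run, beforeExecute calls, afterExecute calls, completed task count }. */
    static long[] runDemo(int tasks) throws InterruptedException {
        CountingPool pool = new CountingPool(1, 2, 1);
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(ran::incrementAndGet);
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("pool did not terminate in time");
        }
        return new long[] {
            ran.get(), pool.started.get(), pool.finished.get(),
            pool.getCompletedTaskCount()
        };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] c = runDemo(20);
        System.out.println("ran=" + c[0] + " before=" + c[1]
                           + " after=" + c[2] + " completed=" + c[3]);
    }
}
```

Tasks that the caller runs under CallerRunsPolicy bypass the worker threads, so the hooks are not invoked for them. The hook counts can therefore be smaller than the number of submitted tasks, and the exact split varies from run to run.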