root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.11
Committed: Wed Aug 6 19:22:36 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
CVS Tags: JSR166_CR1
Changes since 1.10: +12 -2 lines
Log Message:
Added missing @see tags

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.2 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.9 import java.util.concurrent.locks.*;
9 dl 1.2 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.2 * An {@link ExecutorService} that executes each submitted task on one
13 tim 1.1 * of several pooled threads.
14     *
15     * <p>Thread pools address two different problems at the same time:
16     * they usually provide improved performance when executing large
17     * numbers of asynchronous tasks, due to reduced per-task invocation
18     * overhead, and they provide a means of bounding and managing the
19     * resources, including threads, consumed in executing a collection of
20     * tasks.
21     *
22     * <p>This class is highly configurable: it can create a new
23     * thread for each task, or execute tasks sequentially in a single
24     * thread, in addition to its most common configuration, which
25     * reuses a pool of threads.
26     *
27     * <p>To be useful across a wide range of contexts, this class
28     * provides many adjustable parameters and extensibility hooks.
29     * However, programmers are urged to use the more convenient factory
30     * methods <tt>newCachedThreadPool</tt> (unbounded thread pool, with
31     * automatic thread reclamation), <tt>newFixedThreadPool</tt> (fixed
32 dl 1.2 * size thread pool), <tt>newSingleThreadPoolExecutor</tt> (single
33 tim 1.1 * background thread for execution of tasks), and
34     * <tt>newThreadPerTaskExecutor</tt> (execute each task in a new
35     * thread), that preconfigure settings for the most common usage
36     * scenarios.
37     *
38     * <p>This class also maintains some basic statistics, such as the
39 dl 1.2 * number of completed tasks, that may be useful for monitoring and
40     * tuning executors.
41 tim 1.1 *
42     * <h3>Tuning guide</h3>
43     * <dl>
44 dl 1.2 *
45     * <dt>Core and maximum pool size</dt>
46     *
47     * <dd>A ThreadPoolExecutor will automatically adjust the pool size
48     * according to the bounds set by corePoolSize and maximumPoolSize.
49     * When a new task is submitted, and fewer than corePoolSize threads
50     * are running, a new thread is created to handle the request, even if
51     * other worker threads are idle. If there are more than
52     * corePoolSize but fewer than maximumPoolSize threads running, a new
53     * thread will be created only if the queue is full. By setting
54     * corePoolSize and maximumPoolSize the same, you create a fixed-size
55     * thread pool.</dd>
56     *
57 tim 1.10 * <dt>Keep-alive</dt>
58 dl 1.2 *
59     * <dd>The keepAliveTime determines what happens to idle threads. If
60     * the pool currently has more than the core number of threads, excess
61     * threads will be terminated if they have been idle for more than the
62     * keepAliveTime.</dd>
63     *
64 tim 1.10 * <dt>Queueing</dt>
65     *
66 dl 1.2 * <dd>You are free to specify the queuing mechanism used to handle
67 dl 1.6 * submitted tasks. A good default is to use queueless synchronous
68     * channels to hand off work to threads. This is a safe,
69     * conservative policy that avoids lockups when handling sets of
70     * requests that might have internal dependencies. Using an unbounded
71     * queue (for example a LinkedBlockingQueue) will cause new
72     * tasks to be queued in cases where all corePoolSize threads are
73     * busy, so no more than corePoolSize threads will be created. This
74     * may be appropriate when each task is completely independent of
75     * others, so tasks cannot affect each other's execution, for example
76     * in an HTTP server. When given a choice, this pool always prefers
77     * adding a new thread rather than queueing if there are currently
78     * fewer than the current getCorePoolSize threads running, but
79     * otherwise always prefers queuing a request rather than adding a new
80     * thread.
81 tim 1.1 *
82     * <p>While queuing can be useful in smoothing out transient bursts of
83     * requests, especially in socket-based services, it is not very well
84     * behaved when commands continue to arrive on average faster than
85 tim 1.10 * they can be processed.
86 tim 1.1 *
87     * <p>Queue sizes and maximum pool sizes can often be traded off for each
88     * other. Using large queues and small pools minimizes CPU usage, OS
89     * resources, and context-switching overhead, but can lead to
90     * artificially low throughput. If tasks frequently block (for example
91     * if they are I/O bound), a JVM and underlying OS may be able to
92     * schedule time for more threads than you otherwise allow. Use of
93     * small queues or queueless handoffs generally requires larger pool
94     * sizes, which keeps CPUs busier but may encounter unacceptable
95     * scheduling overhead, which also decreases throughput.
96     * </dd>
97 dl 1.2 *
98 tim 1.1 * <dt>Creating new threads</dt>
99 dl 1.2 *
100     * <dd>New threads are created using a ThreadFactory. By default,
101     * threads are created simply with the new Thread(Runnable)
102     * constructor, but by supplying a different ThreadFactory, you can
103     * alter the thread's name, thread group, priority, daemon status,
104     * etc. </dd>
105     *
106 tim 1.1 * <dt>Before and after intercepts</dt>
107 dl 1.2 *
108     * <dd>This class has overridable methods that are called before
109     * and after execution of each task. These can be used to manipulate
110     * the execution environment (for example, reinitializing
111     * ThreadLocals), gather statistics, or perform logging. </dd>
112     *
113 tim 1.1 * <dt>Blocked execution</dt>
114 dl 1.2 *
115     * <dd>There are a number of factors which can bound the number of
116     * tasks which can execute at once, including the maximum pool size
117     * and the queuing mechanism used. If the executor determines that a
118     * task cannot be executed because it has been refused by the queue
119     * and no threads are available, or because the executor has been shut
120     * down, the RejectedExecutionHandler's rejectedExecution method is
121     * invoked. </dd>
122     *
123 tim 1.1 * <dt>Termination</dt>
124 dl 1.2 *
125     * <dd>ThreadPoolExecutor supports two shutdown options, immediate and
126     * graceful. In an immediate shutdown, any threads currently
127     * executing are interrupted, and any tasks not yet begun are returned
128     * from the shutdownNow call. In a graceful shutdown, all queued
129     * tasks are allowed to run, but new tasks may not be submitted.
130 tim 1.1 * </dd>
131 dl 1.2 *
132 tim 1.1 * </dl>
133     *
134     * @since 1.5
135 dl 1.2 * @see RejectedExecutionHandler
136 tim 1.1 * @see Executors
137     * @see ThreadFactory
138     *
139     * @spec JSR-166
140 tim 1.11 * @revised $Date: 2003/07/31 20:32:00 $
141     * @editor $Author: tim $
142 dl 1.8 * @author Doug Lea
143 tim 1.1 */
144 dl 1.2 public class ThreadPoolExecutor implements ExecutorService {
145     /**
146     * Queue used for holding tasks and handing off to worker threads.
147 tim 1.10 */
148 dl 1.2 private final BlockingQueue<Runnable> workQueue;
149    
150     /**
151     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
152     * workers set.
153 tim 1.10 */
154 dl 1.2 private final ReentrantLock mainLock = new ReentrantLock();
155    
156     /**
157     * Wait condition to support awaitTermination
158 tim 1.10 */
159 dl 1.2 private final Condition termination = mainLock.newCondition();
160    
161     /**
162     * Set containing all worker threads in pool.
163 tim 1.10 */
164 dl 1.2 private final Set<Worker> workers = new HashSet<Worker>();
165    
166     /**
167     * Timeout in nanoseconds for idle threads waiting for work.
168     * Threads use this timeout only when there are more than
169     * corePoolSize present. Otherwise they wait forever for new work.
170 tim 1.10 */
171 dl 1.2 private volatile long keepAliveTime;
172    
173     /**
174     * Core pool size, updated only while holding mainLock,
175     * but volatile to allow concurrent readability even
176     * during updates.
177 tim 1.10 */
178 dl 1.2 private volatile int corePoolSize;
179    
180     /**
181     * Maximum pool size, updated only while holding mainLock
182     * but volatile to allow concurrent readability even
183     * during updates.
184 tim 1.10 */
185 dl 1.2 private volatile int maximumPoolSize;
186    
187     /**
188     * Current pool size, updated only while holding mainLock
189     * but volatile to allow concurrent readability even
190     * during updates.
191 tim 1.10 */
192 dl 1.2 private volatile int poolSize;
193    
194     /**
195     * Shutdown status, becomes (and remains) nonzero when shutdown called.
196 tim 1.10 */
197 dl 1.2 private volatile int shutdownStatus;
198    
199     // Special values for status
200 dl 1.8 /** Normal, not-shutdown mode */
201 dl 1.2 private static final int NOT_SHUTDOWN = 0;
202 dl 1.8 /** Controlled shutdown mode */
203 dl 1.2 private static final int SHUTDOWN_WHEN_IDLE = 1;
204 dl 1.8 /** Immediate shutdown mode */
205 tim 1.10 private static final int SHUTDOWN_NOW = 2;
206 dl 1.2
207     /**
208     * Latch that becomes true when all threads terminate after shutdown.
209 tim 1.10 */
210 dl 1.2 private volatile boolean isTerminated;
211    
212     /**
213     * Handler called when saturated or shutdown in execute.
214 tim 1.10 */
215 dl 1.2 private volatile RejectedExecutionHandler handler = defaultHandler;
216    
217     /**
218     * Factory for new threads.
219 tim 1.10 */
220 dl 1.2 private volatile ThreadFactory threadFactory = defaultThreadFactory;
221    
222     /**
223     * Tracks largest attained pool size.
224 tim 1.10 */
225 dl 1.2 private int largestPoolSize;
226    
227     /**
228     * Counter for completed tasks. Updated only on termination of
229     * worker threads.
230 tim 1.10 */
231 dl 1.2 private long completedTaskCount;
232    
233 dl 1.8 /**
234 tim 1.10 * The default thread factory
235 dl 1.8 */
236 tim 1.10 private static final ThreadFactory defaultThreadFactory =
237 dl 1.2 new ThreadFactory() {
238     public Thread newThread(Runnable r) {
239     return new Thread(r);
240     }
241     };
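
The tuning guide notes that supplying a different ThreadFactory lets you alter a thread's name, priority, daemon status, and so on. The following is a minimal sketch of such a factory, written against the released `java.util.concurrent` API rather than this pre-release revision. The class name `NamedDaemonThreadFactory` and its naming scheme are hypothetical and used only for illustration.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: the class name and the "prefix-N" naming scheme
// are illustrative assumptions, not part of the listing above.
final class NamedDaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger next = new AtomicInteger(1);

    NamedDaemonThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Give each worker a recognizable name for logs and thread dumps.
        Thread t = new Thread(r, prefix + "-" + next.getAndIncrement());
        t.setDaemon(true);                    // do not keep the JVM alive
        t.setPriority(Thread.NORM_PRIORITY);  // set explicitly instead of inheriting
        return t;
    }
}
```

Such a factory could be passed to one of the constructors that accept a ThreadFactory, or installed later with setThreadFactory.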
242    
243 dl 1.8 /**
244     * The default rejected execution handler
245     */
246 tim 1.10 private static final RejectedExecutionHandler defaultHandler =
247 dl 1.2 new AbortPolicy();
248    
249     /**
250     * Create and return a new thread running firstTask as its first
251     * task. Call only while holding mainLock
252 dl 1.8 * @param firstTask the task the new thread should run first (or
253     * null if none)
254     * @return the new thread
255 dl 1.2 */
256     private Thread addThread(Runnable firstTask) {
257     Worker w = new Worker(firstTask);
258     Thread t = threadFactory.newThread(w);
259     w.thread = t;
260     workers.add(w);
261     int nt = ++poolSize;
262     if (nt > largestPoolSize)
263     largestPoolSize = nt;
264     return t;
265     }
266    
267     /**
268     * Create and start a new thread running firstTask as its first
269     * task, only if less than corePoolSize threads are running.
270 dl 1.8 * @param firstTask the task the new thread should run first (or
271     * null if none)
272 dl 1.2 * @return true if successful.
273     */
274 dl 1.8 boolean addIfUnderCorePoolSize(Runnable firstTask) {
275 dl 1.2 Thread t = null;
276     mainLock.lock();
277     try {
278 tim 1.10 if (poolSize < corePoolSize)
279 dl 1.8 t = addThread(firstTask);
280 dl 1.2 }
281     finally {
282     mainLock.unlock();
283     }
284     if (t == null)
285     return false;
286     t.start();
287     return true;
288     }
289    
290     /**
291     * Create and start a new thread only if less than maximumPoolSize
292     * threads are running. The new thread runs as its first task the
293     * next task in queue, or if there is none, the given task.
294 dl 1.8 * @param firstTask the task the new thread should run first (or
295     * null if none)
296 dl 1.2 * @return null on failure, else the first task to be run by new thread.
297     */
298 dl 1.8 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
299 dl 1.2 Thread t = null;
300     Runnable next = null;
301     mainLock.lock();
302     try {
303     if (poolSize < maximumPoolSize) {
304     next = workQueue.poll();
305     if (next == null)
306 dl 1.8 next = firstTask;
307 dl 1.2 t = addThread(next);
308     }
309     }
310     finally {
311     mainLock.unlock();
312     }
313     if (t == null)
314     return null;
315     t.start();
316     return next;
317     }
318    
319    
320     /**
321     * Get the next task for a worker thread to run.
322 dl 1.8 * @return the task
323     * @throws InterruptedException if interrupted while waiting for task
324 dl 1.2 */
325     private Runnable getTask() throws InterruptedException {
326     for (;;) {
327     int stat = shutdownStatus;
328     if (stat == SHUTDOWN_NOW)
329     return null;
330     long timeout = keepAliveTime;
331     if (timeout <= 0) // must die immediately for 0 timeout
332     return null;
333     if (stat == SHUTDOWN_WHEN_IDLE) // help drain queue before dying
334     return workQueue.poll();
335     if (poolSize <= corePoolSize) // untimed wait if core
336     return workQueue.take();
337     Runnable task = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
338     if (task != null)
339     return task;
340     if (poolSize > corePoolSize) // timed out
341     return null;
342     // else, after timeout, pool shrank so shouldn't die, so retry
343     }
344     }
345    
346     /**
347     * Perform bookkeeping for a terminated worker thread.
348 tim 1.10 * @param w the worker
349 dl 1.2 */
350     private void workerDone(Worker w) {
351     boolean allDone = false;
352     mainLock.lock();
353     try {
354     completedTaskCount += w.completedTasks;
355     workers.remove(w);
356    
357 tim 1.10 if (--poolSize > 0)
358 dl 1.2 return;
359    
360     // If this was last thread, deal with potential shutdown
361     int stat = shutdownStatus;
362 tim 1.10
363 dl 1.2 // If there are queued tasks but no threads, create replacement.
364     if (stat != SHUTDOWN_NOW) {
365     Runnable r = workQueue.poll();
366     if (r != null) {
367     addThread(r).start();
368     return;
369     }
370     }
371    
372     // if no tasks and not shutdown, can exit without replacement
373 tim 1.10 if (stat == NOT_SHUTDOWN)
374 dl 1.2 return;
375    
376     allDone = true;
377     isTerminated = true;
378     termination.signalAll();
379     }
380     finally {
381     mainLock.unlock();
382     }
383    
384     if (allDone) // call outside lock
385     terminated();
386     }
387    
388     /**
389 tim 1.10 * Worker threads
390 dl 1.2 */
391     private class Worker implements Runnable {
392    
393     /**
394     * The runLock is acquired and released surrounding each task
395     * execution. It mainly protects against interrupts that are
396     * intended to cancel the worker thread from instead
397     * interrupting the task being run.
398     */
399     private final ReentrantLock runLock = new ReentrantLock();
400    
401     /**
402     * Initial task to run before entering run loop
403     */
404     private Runnable firstTask;
405    
406     /**
407     * Per thread completed task counter; accumulated
408     * into completedTaskCount upon termination.
409     */
410     volatile long completedTasks;
411    
412     /**
413     * Thread this worker is running in. Acts as a final field,
414     * but cannot be set until thread is created.
415     */
416     Thread thread;
417    
418     Worker(Runnable firstTask) {
419     this.firstTask = firstTask;
420     }
421    
422     boolean isActive() {
423     return runLock.isLocked();
424     }
425    
426     /**
427     * Interrupt thread if not running a task
428 tim 1.10 */
429 dl 1.2 void interruptIfIdle() {
430     if (runLock.tryLock()) {
431     try {
432     thread.interrupt();
433     }
434     finally {
435     runLock.unlock();
436     }
437     }
438     }
439    
440     /**
441     * Cause thread to die even if running a task.
442 tim 1.10 */
443 dl 1.2 void interruptNow() {
444     thread.interrupt();
445     }
446    
447     /**
448     * Run a single task between before/after methods.
449     */
450     private void runTask(Runnable task) {
451     runLock.lock();
452     try {
453     // Abort now if immediate cancel. Otherwise, we have
454     // committed to run this task.
455     if (shutdownStatus == SHUTDOWN_NOW)
456     return;
457    
458     Thread.interrupted(); // clear interrupt status on entry
459     boolean ran = false;
460     beforeExecute(thread, task);
461     try {
462     task.run();
463     ran = true;
464     afterExecute(task, null);
465     ++completedTasks;
466     }
467     catch(RuntimeException ex) {
468     if (!ran)
469     afterExecute(task, ex);
470     // else the exception occurred within
471     // afterExecute itself in which case we don't
472     // want to call it again.
473     throw ex;
474     }
475     }
476     finally {
477     runLock.unlock();
478     }
479     }
480    
481     /**
482     * Main run loop
483     */
484     public void run() {
485     try {
486     for (;;) {
487     Runnable task;
488     if (firstTask != null) {
489     task = firstTask;
490     firstTask = null;
491     }
492     else {
493     task = getTask();
494     if (task == null)
495     break;
496     }
497     runTask(task);
498     task = null; // unnecessary but can help GC
499     }
500     }
501 tim 1.10 catch(InterruptedException ie) {
502 dl 1.2 // fall through
503     }
504     finally {
505     workerDone(this);
506     }
507     }
508     }
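
The worker's runTask method above brackets every task with beforeExecute and afterExecute. As a rough illustration of how a subclass might use those hooks to gather statistics, here is a sketch written against the released `java.util.concurrent.ThreadPoolExecutor`, whose hook signatures match the calls in this listing. The class `CountingExecutor`, its counters, and the helper `runThree` are hypothetical names introduced for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass: counts tasks as they start, finish, and fail.
class CountingExecutor extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    CountingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.SECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        finished.incrementAndGet();
        if (thrown != null)
            failed.incrementAndGet();   // task ended with an exception
    }

    // Runs three trivial tasks and returns {started, finished, failed}.
    static int[] runThree() {
        CountingExecutor pool = new CountingExecutor(2);
        for (int i = 0; i < 3; i++)
            pool.execute(() -> { });
        pool.shutdown();                // graceful: queued tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {
            pool.started.get(), pool.finished.get(), pool.failed.get() };
    }
}
```

The counters are atomic because the hooks run on the worker threads, not on the thread that reads the totals.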
509 tim 1.1
510     /**
511     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
512     * parameters. It may be more convenient to use one of the factory
513     * methods instead of this general purpose constructor.
514     *
515 dl 1.2 * @param corePoolSize the number of threads to keep in the
516 tim 1.1 * pool, even if they are idle.
517 dl 1.2 * @param maximumPoolSize the maximum number of threads to allow in the
518 tim 1.1 * pool.
519     * @param keepAliveTime when the number of threads is greater than
520 dl 1.2 * the core, this is the maximum time that excess idle threads
521 tim 1.1 * will wait for new tasks before terminating.
522 dl 1.2 * @param unit the time unit for the keepAliveTime
523 tim 1.1 * argument.
524     * @param workQueue the queue to use for holding tasks before they
525     * are executed. This queue will hold only the <tt>Runnable</tt>
526     * tasks submitted by the <tt>execute</tt> method.
527 dl 1.2 * @throws IllegalArgumentException if corePoolSize, or
528     * keepAliveTime less than zero, or if maximumPoolSize less than or
529     * equal to zero, or if corePoolSize greater than maximumPoolSize.
530 tim 1.1 * @throws NullPointerException if <tt>workQueue</tt> is null
531     */
532 dl 1.2 public ThreadPoolExecutor(int corePoolSize,
533     int maximumPoolSize,
534 tim 1.1 long keepAliveTime,
535 dl 1.2 TimeUnit unit,
536     BlockingQueue<Runnable> workQueue) {
537 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
538 dl 1.2 defaultThreadFactory, defaultHandler);
539     }
540 tim 1.1
541 dl 1.2 /**
542     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
543     * parameters.
544     *
545     * @param corePoolSize the number of threads to keep in the
546     * pool, even if they are idle.
547     * @param maximumPoolSize the maximum number of threads to allow in the
548     * pool.
549     * @param keepAliveTime when the number of threads is greater than
550     * the core, this is the maximum time that excess idle threads
551     * will wait for new tasks before terminating.
552     * @param unit the time unit for the keepAliveTime
553     * argument.
554     * @param workQueue the queue to use for holding tasks before they
555     * are executed. This queue will hold only the <tt>Runnable</tt>
556     * tasks submitted by the <tt>execute</tt> method.
557     * @param threadFactory the factory to use when the executor
558 tim 1.10 * creates a new thread.
559 dl 1.2 * @throws IllegalArgumentException if corePoolSize, or
560     * keepAliveTime less than zero, or if maximumPoolSize less than or
561     * equal to zero, or if corePoolSize greater than maximumPoolSize.
562 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
563 dl 1.2 * or <tt>threadFactory</tt> is null.
564     */
565     public ThreadPoolExecutor(int corePoolSize,
566     int maximumPoolSize,
567     long keepAliveTime,
568     TimeUnit unit,
569     BlockingQueue<Runnable> workQueue,
570     ThreadFactory threadFactory) {
571 tim 1.1
572 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
573 dl 1.2 threadFactory, defaultHandler);
574     }
575 tim 1.1
576 dl 1.2 /**
577     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
578     * parameters.
579     *
580     * @param corePoolSize the number of threads to keep in the
581     * pool, even if they are idle.
582     * @param maximumPoolSize the maximum number of threads to allow in the
583     * pool.
584     * @param keepAliveTime when the number of threads is greater than
585     * the core, this is the maximum time that excess idle threads
586     * will wait for new tasks before terminating.
587     * @param unit the time unit for the keepAliveTime
588     * argument.
589     * @param workQueue the queue to use for holding tasks before they
590     * are executed. This queue will hold only the <tt>Runnable</tt>
591     * tasks submitted by the <tt>execute</tt> method.
592     * @param handler the handler to use when execution is blocked
593     * because the thread bounds and queue capacities are reached.
594     * @throws IllegalArgumentException if corePoolSize, or
595     * keepAliveTime less than zero, or if maximumPoolSize less than or
596     * equal to zero, or if corePoolSize greater than maximumPoolSize.
597 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
598 dl 1.2 * or <tt>handler</tt> is null.
599     */
600     public ThreadPoolExecutor(int corePoolSize,
601     int maximumPoolSize,
602     long keepAliveTime,
603     TimeUnit unit,
604     BlockingQueue<Runnable> workQueue,
605     RejectedExecutionHandler handler) {
606 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
607 dl 1.2 defaultThreadFactory, handler);
608     }
609 tim 1.1
610 dl 1.2 /**
611     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
612     * parameters.
613     *
614     * @param corePoolSize the number of threads to keep in the
615     * pool, even if they are idle.
616     * @param maximumPoolSize the maximum number of threads to allow in the
617     * pool.
618     * @param keepAliveTime when the number of threads is greater than
619     * the core, this is the maximum time that excess idle threads
620     * will wait for new tasks before terminating.
621     * @param unit the time unit for the keepAliveTime
622     * argument.
623     * @param workQueue the queue to use for holding tasks before they
624     * are executed. This queue will hold only the <tt>Runnable</tt>
625     * tasks submitted by the <tt>execute</tt> method.
626     * @param threadFactory the factory to use when the executor
627 tim 1.10 * creates a new thread.
628 dl 1.2 * @param handler the handler to use when execution is blocked
629     * because the thread bounds and queue capacities are reached.
630     * @throws IllegalArgumentException if corePoolSize, or
631     * keepAliveTime less than zero, or if maximumPoolSize less than or
632     * equal to zero, or if corePoolSize greater than maximumPoolSize.
633 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
634 dl 1.2 * or <tt>threadFactory</tt> or <tt>handler</tt> is null.
635     */
636     public ThreadPoolExecutor(int corePoolSize,
637     int maximumPoolSize,
638     long keepAliveTime,
639     TimeUnit unit,
640     BlockingQueue<Runnable> workQueue,
641     ThreadFactory threadFactory,
642     RejectedExecutionHandler handler) {
643 tim 1.10 if (corePoolSize < 0 ||
644 dl 1.2 maximumPoolSize <= 0 ||
645 tim 1.10 maximumPoolSize < corePoolSize ||
646 dl 1.2 keepAliveTime < 0)
647     throw new IllegalArgumentException();
648     if (workQueue == null || threadFactory == null || handler == null)
649     throw new NullPointerException();
650     this.corePoolSize = corePoolSize;
651     this.maximumPoolSize = maximumPoolSize;
652     this.workQueue = workQueue;
653     this.keepAliveTime = unit.toNanos(keepAliveTime);
654     this.threadFactory = threadFactory;
655     this.handler = handler;
656 tim 1.1 }
657    
658 dl 1.2
659     /**
660     * Executes the given task sometime in the future. The task
661     * may execute in a new thread or in an existing pooled thread.
662     *
663     * If the task cannot be submitted for execution, either because this
664     * executor has been shutdown or because its capacity has been reached,
665 tim 1.10 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
666 dl 1.2 *
667     * @param command the task to execute
668     * @throws RejectedExecutionException at discretion of
669 dl 1.8 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
670     * for execution
671 dl 1.2 */
672 tim 1.10 public void execute(Runnable command) {
673 dl 1.2 for (;;) {
674     if (shutdownStatus != NOT_SHUTDOWN) {
675     handler.rejectedExecution(command, this);
676     return;
677     }
678     if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
679     return;
680     if (workQueue.offer(command))
681     return;
682     Runnable r = addIfUnderMaximumPoolSize(command);
683     if (r == command)
684     return;
685     if (r == null) {
686     handler.rejectedExecution(command, this);
687     return;
688     }
689     // else retry
690     }
691 tim 1.1 }
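
The execute method above tries three things in order: start a core thread, queue the task, then start a thread up to the maximum, and it rejects the task only when all three fail. The sketch below makes that order observable using the released `java.util.concurrent.ThreadPoolExecutor`, which is assumed here to follow the same policy as this revision. The class `ExecuteOrderDemo` and the method `observe` are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: core size 1, maximum size 2, queue capacity 1.
class ExecuteOrderDemo {
    // Returns {pool size after task 1, queue size after task 2,
    //          pool size after task 3, 1 if task 4 was rejected}.
    static int[] observe() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();        // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        int[] seen = new int[4];
        try {
            pool.execute(blocker);              // below core size: new thread
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);              // core busy: task is queued
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);              // queue full: second thread
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);          // queue full, pool at maximum
            } catch (RejectedExecutionException e) {
                seen[3] = 1;                    // default handler aborts
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return seen;
    }
}
```

With a bounded queue, the maximum pool size only comes into play once the queue is full, which is why large queues with small pools can hold throughput down.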
692 dl 1.4
693 dl 1.2 public void shutdown() {
694     mainLock.lock();
695     try {
696     if (shutdownStatus == NOT_SHUTDOWN) // don't override shutdownNow
697     shutdownStatus = SHUTDOWN_WHEN_IDLE;
698 tim 1.1
699 dl 1.2 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
700     it.next().interruptIfIdle();
701     }
702     finally {
703     mainLock.unlock();
704     }
705 tim 1.1 }
706    
707 dl 1.2 public List shutdownNow() {
708     mainLock.lock();
709     try {
710     shutdownStatus = SHUTDOWN_NOW;
711     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
712     it.next().interruptNow();
713     }
714     finally {
715     mainLock.unlock();
716     }
717     return Arrays.asList(workQueue.toArray());
718 tim 1.1 }
719    
720 dl 1.2 public boolean isShutdown() {
721     return shutdownStatus != NOT_SHUTDOWN;
722 tim 1.1 }
723    
724 dl 1.2 public boolean isTerminated() {
725     return isTerminated;
726     }
727 tim 1.1
728 dl 1.2 public boolean awaitTermination(long timeout, TimeUnit unit)
729     throws InterruptedException {
730     mainLock.lock();
731     try {
732     return isTerminated || termination.await(timeout, unit);
733     }
734     finally {
735     mainLock.unlock();
736     }
737     }
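
The three methods above (shutdown, shutdownNow, and awaitTermination) are typically used together. The following sketch shows an immediate shutdown using the released `java.util.concurrent` API: the running task is interrupted and the tasks that never started are handed back to the caller. The class `ShutdownDemo`, the method `stopNow`, and the 60-second sleep standing in for real work are assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for an immediate shutdown.
class ShutdownDemo {
    // Returns {number of tasks that never started, 1 if the pool terminated}.
    static int[] stopNow() {
        CountDownLatch started = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);   // stands in for long-running work
            } catch (InterruptedException e) {
                // interrupted by shutdownNow: return promptly
            }
        });
        pool.execute(() -> { });        // waits in the queue
        pool.execute(() -> { });        // waits in the queue
        try {
            started.await();            // make sure the first task is running
            List<Runnable> unstarted = pool.shutdownNow();
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] { unstarted.size(), terminated ? 1 : 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            throw new IllegalStateException(e);
        }
    }
}
```

An immediate shutdown only works promptly if tasks respond to interruption, as the sleeping task does here. For a graceful shutdown, call shutdown() instead and let the queued tasks run before awaiting termination.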
738 tim 1.10
739 dl 1.2 /**
740     * Sets the thread factory used to create new threads.
741     *
742     * @param threadFactory the new thread factory
743 tim 1.11 * @see #getThreadFactory
744 dl 1.2 */
745     public void setThreadFactory(ThreadFactory threadFactory) {
746     this.threadFactory = threadFactory;
747 tim 1.1 }
748    
749 dl 1.2 /**
750     * Returns the thread factory used to create new threads.
751     *
752     * @return the current thread factory
753 tim 1.11 * @see #setThreadFactory
754 dl 1.2 */
755     public ThreadFactory getThreadFactory() {
756     return threadFactory;
757 tim 1.1 }
758    
759 dl 1.2 /**
760     * Sets a new handler for unexecutable tasks.
761     *
762     * @param handler the new handler
763 tim 1.11 * @see #getRejectedExecutionHandler
764 dl 1.2 */
765     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
766     this.handler = handler;
767     }
768 tim 1.1
769 dl 1.2 /**
770     * Returns the current handler for unexecutable tasks.
771     *
772     * @return the current handler
773 tim 1.11 * @see #setRejectedExecutionHandler
774 dl 1.2 */
775     public RejectedExecutionHandler getRejectedExecutionHandler() {
776     return handler;
777 tim 1.1 }
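
As the "Blocked execution" entry in the tuning guide explains, a task that the queue refuses while no thread can be added is passed to the RejectedExecutionHandler. Below is a small sketch of a custom handler that counts rejections instead of throwing, written against the released `java.util.concurrent` API. The class `RejectionDemo` and the method `countRejected` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: one thread, queueless handoff, counting handler.
class RejectionDemo {
    static int countRejected() {
        AtomicInteger rejected = new AtomicInteger();
        // The handler runs in the thread that called execute.
        RejectedExecutionHandler counting =
                (task, executor) -> rejected.incrementAndGet();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), counting);
        try {
            pool.execute(() -> {
                try {
                    release.await();    // occupy the only thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });    // no idle thread, no queue slot
            pool.execute(() -> { });    // rejected for the same reason
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected.get();
    }
}
```

A handler like this silently drops work, so in practice it would usually log the rejected task or apply back-pressure rather than only count it.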
778    
779 dl 1.2 /**
780     * Returns the task queue used by this executor. Note that
781     * this queue may be in active use. Retrieving the task queue
782     * does not prevent queued tasks from executing.
783     *
784     * @return the task queue
785     */
786     public BlockingQueue<Runnable> getQueue() {
787     return workQueue;
788 tim 1.1 }
789 dl 1.4
790     /**
791     * Removes this task from internal queue if it is present, thus
792     * causing it not to be run if it has not already started. This
793     * method may be useful as one part of a cancellation scheme.
794 tim 1.10 *
795 dl 1.8 * @param task the task to remove
796     * @return true if the task was removed
797 dl 1.4 */
798 dl 1.5 public boolean remove(Runnable task) {
799 dl 1.4 return getQueue().remove(task);
800     }
801    
802 dl 1.7
803     /**
804 tim 1.10 * Removes from the work queue all {@link Cancellable} tasks
805 dl 1.7 * that have been cancelled. This method can be useful as a
806     * storage reclamation operation, that has no other impact
807     * on functionality. Cancelled tasks are never executed, but
808     * may accumulate in work queues until worker threads can
809     * actively remove them. Invoking this method ensures that they
810     * are instead removed now.
811     */
812    
813     public void purge() {
814     Iterator<Runnable> it = getQueue().iterator();
815     while (it.hasNext()) {
816     Runnable r = it.next();
817     if (r instanceof Cancellable) {
818     Cancellable c = (Cancellable)r;
819 tim 1.10 if (c.isCancelled())
820 dl 1.7 it.remove();
821     }
822     }
823     }
824 tim 1.1
825     /**
826 dl 1.2 * Sets the core number of threads. This overrides any value set
827     * in the constructor. If the new value is smaller than the
828     * current value, excess existing threads will be terminated when
829     * they next become idle.
830 tim 1.1 *
831 dl 1.2 * @param corePoolSize the new core size
832 tim 1.10 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
833 dl 1.8 * less than zero
834 tim 1.11 * @see #getCorePoolSize
835 tim 1.1 */
836 dl 1.2 public void setCorePoolSize(int corePoolSize) {
837     if (corePoolSize < 0)
838     throw new IllegalArgumentException();
839     mainLock.lock();
840     try {
841     int extra = this.corePoolSize - corePoolSize;
842     this.corePoolSize = corePoolSize;
843     if (extra > 0 && poolSize > corePoolSize) {
844     Iterator<Worker> it = workers.iterator();
845 tim 1.10 while (it.hasNext() &&
846     extra > 0 &&
847 dl 1.2 poolSize > corePoolSize &&
848     workQueue.remainingCapacity() == 0) {
849     it.next().interruptIfIdle();
850     --extra;
851     }
852     }
853 tim 1.10
854 dl 1.2 }
855     finally {
856     mainLock.unlock();
857     }
858     }
859 tim 1.1
860     /**
861 dl 1.2 * Returns the core number of threads.
862 tim 1.1 *
863 dl 1.2 * @return the core number of threads
864 tim 1.11 * @see #setCorePoolSize
865 tim 1.1 */
866 tim 1.10 public int getCorePoolSize() {
867 dl 1.2 return corePoolSize;
868     }
869 tim 1.1
870     /**
871     * Sets the maximum allowed number of threads. This overrides any
872 dl 1.2 * value set in the constructor. If the new value is smaller than
873     * the current value, excess existing threads will be
874     * terminated when they next become idle.
875 tim 1.1 *
876 dl 1.2 * @param maximumPoolSize the new maximum
877     * @throws IllegalArgumentException if maximumPoolSize is less than or equal to
878     * zero, or less than the {@link #getCorePoolSize core pool size}
879 tim 1.11 * @see #getMaximumPoolSize
880 dl 1.2 */
881     public void setMaximumPoolSize(int maximumPoolSize) {
882     if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
883     throw new IllegalArgumentException();
884     mainLock.lock();
885     try {
886     int extra = this.maximumPoolSize - maximumPoolSize;
887     this.maximumPoolSize = maximumPoolSize;
888     if (extra > 0 && poolSize > maximumPoolSize) {
889     Iterator<Worker> it = workers.iterator();
890 tim 1.10 while (it.hasNext() &&
891     extra > 0 &&
892 dl 1.2 poolSize > maximumPoolSize) {
893     it.next().interruptIfIdle();
894     --extra;
895     }
896     }
897     }
898     finally {
899     mainLock.unlock();
900     }
901     }
902 tim 1.1
903     /**
904     * Returns the maximum allowed number of threads.
905     *
906 dl 1.2 * @return the maximum allowed number of threads
907 tim 1.11 * @see #setMaximumPoolSize
908 tim 1.1 */
909 tim 1.10 public int getMaximumPoolSize() {
910 dl 1.2 return maximumPoolSize;
911     }
912 tim 1.1
913     /**
914     * Sets the time limit for which threads may remain idle before
915 dl 1.2 * being terminated. If there are more than the core number of
916 tim 1.1 * threads currently in the pool, after waiting this amount of
917     * time without processing a task, excess threads will be
918     * terminated. This overrides any value set in the constructor.
919     * @param time the time to wait. A time value of zero will cause
920     * excess threads to terminate immediately after executing tasks.
921 dl 1.2 * @param unit the time unit of the time argument
922 tim 1.1 * @throws IllegalArgumentException if <tt>time</tt> is less than zero
923 tim 1.11 * @see #getKeepAliveTime
924 tim 1.1 */
925 dl 1.2 public void setKeepAliveTime(long time, TimeUnit unit) {
926     if (time < 0)
927     throw new IllegalArgumentException();
928     this.keepAliveTime = unit.toNanos(time);
929     }
930 tim 1.1
931     /**
932     * Returns the thread keep-alive time, which is the amount of time
933 dl 1.2 * that threads in excess of the core pool size may remain
934 tim 1.10 * idle before being terminated.
935 tim 1.1 *
936 dl 1.2 * @param unit the desired time unit of the result
937 tim 1.1 * @return the time limit
938 tim 1.11 * @see #setKeepAliveTime
939 tim 1.1 */
940 tim 1.10 public long getKeepAliveTime(TimeUnit unit) {
941 dl 1.2 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
942     }
943 tim 1.1
944     /* Statistics */
945    
946     /**
947     * Returns the current number of threads in the pool.
948     *
949     * @return the number of threads
950     */
951 tim 1.10 public int getPoolSize() {
952 dl 1.2 return poolSize;
953     }
954 tim 1.1
955     /**
956 dl 1.2 * Returns the approximate number of threads that are actively
957 tim 1.1 * executing tasks.
958     *
959     * @return the number of threads
960     */
961 tim 1.10 public int getActiveCount() {
962 dl 1.2 mainLock.lock();
963     try {
964     int n = 0;
965     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
966     if (it.next().isActive())
967     ++n;
968     }
969     return n;
970     }
971     finally {
972     mainLock.unlock();
973     }
974     }
975 tim 1.1
976     /**
977 dl 1.2 * Returns the largest number of threads that have ever
978     * simultaneously been in the pool.
979 tim 1.1 *
980     * @return the number of threads
981     */
982 tim 1.10 public int getLargestPoolSize() {
983 dl 1.2 mainLock.lock();
984     try {
985     return largestPoolSize;
986     }
987     finally {
988     mainLock.unlock();
989     }
990     }
991 tim 1.1
992     /**
993 dl 1.2 * Returns the approximate total number of tasks that have been
994     * scheduled for execution. Because the states of tasks and
995     * threads may change dynamically during computation, the returned
996     * value is only an approximation.
997 tim 1.1 *
998     * @return the number of tasks
999     */
1000 tim 1.10 public long getTaskCount() {
1001 dl 1.2 mainLock.lock();
1002     try {
1003     long n = completedTaskCount;
1004     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1005     Worker w = it.next();
1006     n += w.completedTasks;
1007     if (w.isActive())
1008     ++n;
1009     }
1010     return n + workQueue.size();
1011     }
1012     finally {
1013     mainLock.unlock();
1014     }
1015     }
1016 tim 1.1
1017     /**
1018 dl 1.2 * Returns the approximate total number of tasks that have
1019     * completed execution. Because the states of tasks and threads
1020     * may change dynamically during computation, the returned value
1021     * is only an approximation.
1022 tim 1.1 *
1023     * @return the number of tasks
1024     */
1025 tim 1.10 public long getCompletedTaskCount() {
1026 dl 1.2 mainLock.lock();
1027     try {
1028     long n = completedTaskCount;
1029 tim 1.10 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1030 dl 1.2 n += it.next().completedTasks;
1031     return n;
1032     }
1033     finally {
1034     mainLock.unlock();
1035     }
1036     }
1037 tim 1.1
1038     /**
1039 dl 1.2 * Method invoked prior to executing the given Runnable in the given
1040     * thread. This method may be used to re-initialize ThreadLocals,
1041 dl 1.5 * or to perform logging. Note: To properly nest multiple
1042     * overridings, subclasses should generally invoke
1043     * <tt>super.beforeExecute</tt> at the end of this method.
1044 tim 1.1 *
1045 dl 1.2 * @param t the thread that will run task r.
1046     * @param r the task that will be executed.
1047 tim 1.1 */
1048 dl 1.2 protected void beforeExecute(Thread t, Runnable r) { }
1049 tim 1.1
1050     /**
1051 dl 1.2 * Method invoked upon completion of execution of the given
1052     * Runnable. If non-null, the Throwable is the uncaught exception
1053 dl 1.5 * that caused execution to terminate abruptly. Note: To properly
1054     * nest multiple overridings, subclasses should generally invoke
1055     * <tt>super.afterExecute</tt> at the beginning of this method.
1056 tim 1.1 *
1057 dl 1.2 * @param r the runnable that has completed.
1058     * @param t the exception that caused termination, or null if
1059     * execution completed normally.
1060 tim 1.1 */
1061 dl 1.2 protected void afterExecute(Runnable r, Throwable t) { }
1062 tim 1.1
1063 dl 1.2 /**
1064     * Method invoked when the Executor has terminated. Default
1065     * implementation does nothing.
1066     */
1067     protected void terminated() { }
1068 tim 1.1
1069     /**
1070     * A handler for unexecutable tasks that runs these tasks directly in the
1071     * calling thread of the <tt>execute</tt> method. This is the default
1072 dl 1.2 * <tt>RejectedExecutionHandler</tt>.
1073 tim 1.1 */
1074 dl 1.2 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1075 tim 1.1
1076     /**
1077     * Constructs a <tt>CallerRunsPolicy</tt>.
1078     */
1079     public CallerRunsPolicy() { }
1080    
1081 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1082     if (!e.isShutdown()) {
1083 tim 1.1 r.run();
1084     }
1085     }
1086     }
1087    
1088     /**
1089 dl 1.8 * A handler for unexecutable tasks that throws a
1090     * <tt>RejectedExecutionException</tt>.
1091 tim 1.1 */
1092 dl 1.2 public static class AbortPolicy implements RejectedExecutionHandler {
1093 tim 1.1
1094     /**
1095     * Constructs an <tt>AbortPolicy</tt>.
1096     */
1097     public AbortPolicy() { }
1098    
1099 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1100     throw new RejectedExecutionException();
1101 tim 1.1 }
1102     }
1103    
1104     /**
1105     * A handler for unexecutable tasks that waits until the task can be
1106     * submitted for execution.
1107     */
1108 dl 1.2 public static class WaitPolicy implements RejectedExecutionHandler {
1109 tim 1.1 /**
1110     * Constructs a <tt>WaitPolicy</tt>.
1111     */
1112     public WaitPolicy() { }
1113    
1114 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1115     if (!e.isShutdown()) {
1116     try {
1117     e.getQueue().put(r);
1118     }
1119     catch (InterruptedException ie) {
1120     Thread.currentThread().interrupt();
1121     throw new RejectedExecutionException(ie);
1122     }
1123 tim 1.1 }
1124     }
1125     }
1126    
1127     /**
1128     * A handler for unexecutable tasks that silently discards these tasks.
1129     */
1130 dl 1.2 public static class DiscardPolicy implements RejectedExecutionHandler {
1131 tim 1.1
1132     /**
1133     * Constructs a <tt>DiscardPolicy</tt>.
1134     */
1135     public DiscardPolicy() { }
1136    
1137 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1138 tim 1.1 }
1139     }
1140    
1141     /**
1142 dl 1.8 * A handler for unexecutable tasks that discards the oldest
1143     * unhandled request and then retries <tt>execute</tt>.
1144 tim 1.1 */
1145 dl 1.2 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1146 tim 1.1 /**
1147 dl 1.2 * Constructs a <tt>DiscardOldestPolicy</tt>.
1148 tim 1.1 */
1149     public DiscardOldestPolicy() { }
1150    
1151 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1152     if (!e.isShutdown()) {
1153     e.getQueue().poll();
1154     e.execute(r);
1155 tim 1.1 }
1156     }
1157     }
1158     }
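
The tuning methods, statistics accessors, extension hooks, and rejection handlers listed above can be exercised together. The following is a minimal sketch, not part of this revision: it assumes the java.util.concurrent API as later released in the JDK, which may differ in places from this draft. The names PoolTuningSketch, CountingPool, and runTasks are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolTuningSketch {

    /** Hypothetical subclass that counts how often each hook is invoked. */
    static class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger started = new AtomicInteger();
        final AtomicInteger finished = new AtomicInteger();

        CountingPool(int core, int max, int queueCapacity) {
            // Small bounded queue so that some submissions may be rejected;
            // CallerRunsPolicy then runs those tasks in the submitting thread.
            super(core, max, 1, TimeUnit.SECONDS,
                  new ArrayBlockingQueue<Runnable>(queueCapacity),
                  new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            started.incrementAndGet();
            super.beforeExecute(t, r);   // super is invoked at the end
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);    // super is invoked at the beginning
            finished.incrementAndGet();
        }
    }

    /**
     * Submits taskCount trivial tasks, adjusts the pool while it runs, and
     * returns { tasksRun, corePoolSize, maximumPoolSize, keepAliveMillis }.
     */
    static long[] runTasks(int taskCount) {
        CountingPool pool = new CountingPool(1, 2, 2);
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> ran.incrementAndGet());
        }
        // Raise the maximum before the core size so that core <= maximum
        // holds at every step.
        pool.setMaximumPoolSize(4);
        pool.setCorePoolSize(2);
        pool.setKeepAliveTime(500, TimeUnit.MILLISECONDS);

        pool.shutdown();                 // queued tasks still run to completion
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new long[] {
            ran.get(),
            pool.getCorePoolSize(),
            pool.getMaximumPoolSize(),
            pool.getKeepAliveTime(TimeUnit.MILLISECONDS)
        };
    }

    public static void main(String[] args) {
        long[] s = runTasks(8);
        System.out.println("tasks run: " + s[0] + ", core: " + s[1]
                + ", max: " + s[2] + ", keep-alive ms: " + s[3]);
    }
}
```

Tasks that CallerRunsPolicy runs in the submitting thread bypass the worker threads, so the hook counters in CountingPool may be lower than the number of tasks submitted.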