root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.9
Committed: Tue Jul 8 00:46:35 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_2
Changes since 1.8: +2 -2 lines
Log Message:
Locks in subpackage; fairness params added

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.2 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.9 import java.util.concurrent.locks.*;
9 dl 1.2 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.2 * An {@link ExecutorService} that executes each submitted task on one
13 tim 1.1 * of several pooled threads.
14     *
15     * <p>Thread pools address two different problems at the same time:
16     * they usually provide improved performance when executing large
17     * numbers of asynchronous tasks, due to reduced per-task invocation
18     * overhead, and they provide a means of bounding and managing the
19     * resources, including threads, consumed in executing a collection of
20     * tasks.
21     *
22     * <p>This class is highly configurable: it can be set up to create
23     * a new thread for each task, or even to execute tasks sequentially
24     * in a single thread, in addition to its most common configuration,
25     * which reuses a pool of threads.
26     *
27     * <p>To be useful across a wide range of contexts, this class
28     * provides many adjustable parameters and extensibility hooks.
29     * However, programmers are urged to use the more convenient factory
30     * methods <tt>newCachedThreadPool</tt> (unbounded thread pool, with
31     * automatic thread reclamation), <tt>newFixedThreadPool</tt> (fixed
32 dl 1.2 * size thread pool), <tt>newSingleThreadPoolExecutor</tt> (single
33 tim 1.1 * background thread for execution of tasks), and
34     * <tt>newThreadPerTaskExecutor</tt> (execute each task in a new
35     * thread), that preconfigure settings for the most common usage
36     * scenarios.
37     *
38     * <p>This class also maintains some basic statistics, such as the
39 dl 1.2 * number of completed tasks, that may be useful for monitoring and
40     * tuning executors.
41 tim 1.1 *
42     * <h3>Tuning guide</h3>
43     * <dl>
44 dl 1.2 *
45     * <dt>Core and maximum pool size</dt>
46     *
47     * <dd>A ThreadPoolExecutor will automatically adjust the pool size
48     * according to the bounds set by corePoolSize and maximumPoolSize.
49     * When a new task is submitted, and fewer than corePoolSize threads
50     * are running, a new thread is created to handle the request, even if
51     * other worker threads are idle. If there are more than
52     * corePoolSize but fewer than maximumPoolSize threads running, a new
53     * thread will be created only if the queue is full. By setting
54     * corePoolSize and maximumPoolSize the same, you create a fixed-size
55     * thread pool.</dd>
56     *
57     * <dt>Keep-alive</dt>
58     *
59     * <dd>The keepAliveTime determines what happens to idle threads. If
60     * the pool currently has more than the core number of threads, excess
61     * threads will be terminated if they have been idle for more than the
62     * keepAliveTime.</dd>
63     *
64     * <dt>Queueing</dt>
65     *
66     * <dd>You are free to specify the queuing mechanism used to handle
67 dl 1.6 * submitted tasks. A good default is to use queueless synchronous
68     * channels to hand off work to threads. This is a safe,
69     * conservative policy that avoids lockups when handling sets of
70     * requests that might have internal dependencies. Using an unbounded
71     * queue (for example a LinkedBlockingQueue) will cause new
72     * tasks to be queued in cases where all corePoolSize threads are
73     * busy, so no more than corePoolSize threads will be created. This
74     * may be appropriate when each task is completely independent of
75     * others, so tasks cannot affect each other's execution, for example
76     * in an HTTP server. When given a choice, this pool always prefers
77     * adding a new thread rather than queueing if there are currently
78     * fewer than the current getCorePoolSize threads running, but
79     * otherwise always prefers queuing a request rather than adding a new
80     * thread.
81 tim 1.1 *
82     * <p>While queuing can be useful in smoothing out transient bursts of
83     * requests, especially in socket-based services, it is not very well
84     * behaved when commands continue to arrive on average faster than
85 dl 1.2 * they can be processed.
86 tim 1.1 *
87     * <p>Queue sizes and maximum pool sizes can often be traded off for each
88     * other. Using large queues and small pools minimizes CPU usage, OS
89     * resources, and context-switching overhead, but can lead to
90     * artificially low throughput. If tasks frequently block (for example
91     * if they are I/O bound), a JVM and underlying OS may be able to
92     * schedule time for more threads than you otherwise allow. Use of
93     * small queues or queueless handoffs generally requires larger pool
94     * sizes, which keeps CPUs busier but may encounter unacceptable
95     * scheduling overhead, which also decreases throughput.
96     * </dd>
97 dl 1.2 *
98 tim 1.1 * <dt>Creating new threads</dt>
99 dl 1.2 *
100     * <dd>New threads are created using a ThreadFactory. By default,
101     * threads are created simply with the new Thread(Runnable)
102     * constructor, but by supplying a different ThreadFactory, you can
103     * alter the thread's name, thread group, priority, daemon status,
104     * etc. </dd>
105     *
106 tim 1.1 * <dt>Before and after intercepts</dt>
107 dl 1.2 *
108     * <dd>This class has overridable methods that are called before
109     * and after execution of each task. These can be used to manipulate
110     * the execution environment (for example, reinitializing
111     * ThreadLocals), gather statistics, or perform logging. </dd>
112     *
113 tim 1.1 * <dt>Blocked execution</dt>
114 dl 1.2 *
115     * <dd>There are a number of factors which can bound the number of
116     * tasks which can execute at once, including the maximum pool size
117     * and the queuing mechanism used. If the executor determines that a
118     * task cannot be executed because it has been refused by the queue
119     * and no threads are available, or because the executor has been shut
120     * down, the RejectedExecutionHandler's rejectedExecution method is
121     * invoked. </dd>
122     *
123 tim 1.1 * <dt>Termination</dt>
124 dl 1.2 *
125     * <dd>ThreadPoolExecutor supports two shutdown options, immediate and
126     * graceful. In an immediate shutdown, any threads currently
127     * executing are interrupted, and any tasks not yet begun are returned
128     * from the shutdownNow call. In a graceful shutdown, all queued
129     * tasks are allowed to run, but new tasks may not be submitted.
130 tim 1.1 * </dd>
131 dl 1.2 *
132 tim 1.1 * </dl>
133     *
134     * @since 1.5
135 dl 1.2 * @see RejectedExecutionHandler
136 tim 1.1 * @see Executors
137     * @see ThreadFactory
138     *
139     * @spec JSR-166
140 dl 1.9 * @revised $Date: 2003/06/24 14:34:49 $
141 dl 1.3 * @editor $Author: dl $
142 dl 1.8 * @author Doug Lea
143 tim 1.1 */
144 dl 1.2 public class ThreadPoolExecutor implements ExecutorService {
145     /**
146     * Queue used for holding tasks and handing off to worker threads.
147     */
148     private final BlockingQueue<Runnable> workQueue;
149    
150     /**
151     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
152     * workers set.
153     */
154     private final ReentrantLock mainLock = new ReentrantLock();
155    
156     /**
157     * Wait condition to support awaitTermination
158     */
159     private final Condition termination = mainLock.newCondition();
160    
161     /**
162     * Set containing all worker threads in pool.
163     */
164     private final Set<Worker> workers = new HashSet<Worker>();
165    
166     /**
167     * Timeout in nanoseconds for idle threads waiting for work.
168     * Threads use this timeout only when there are more than
169     * corePoolSize present. Otherwise they wait forever for new work.
170     */
171     private volatile long keepAliveTime;
172    
173     /**
174     * Core pool size, updated only while holding mainLock,
175     * but volatile to allow concurrent readability even
176     * during updates.
177     */
178     private volatile int corePoolSize;
179    
180     /**
181     * Maximum pool size, updated only while holding mainLock
182     * but volatile to allow concurrent readability even
183     * during updates.
184     */
185     private volatile int maximumPoolSize;
186    
187     /**
188     * Current pool size, updated only while holding mainLock
189     * but volatile to allow concurrent readability even
190     * during updates.
191     */
192     private volatile int poolSize;
193    
194     /**
195     * Shutdown status, becomes (and remains) nonzero when shutdown called.
196     */
197     private volatile int shutdownStatus;
198    
199     // Special values for status
200 dl 1.8 /** Normal, not-shutdown mode */
201 dl 1.2 private static final int NOT_SHUTDOWN = 0;
202 dl 1.8 /** Controlled shutdown mode */
203 dl 1.2 private static final int SHUTDOWN_WHEN_IDLE = 1;
204 dl 1.8 /** Immediate shutdown mode */
205 dl 1.2 private static final int SHUTDOWN_NOW = 2;
206    
207     /**
208     * Latch that becomes true when all threads terminate after shutdown.
209     */
210     private volatile boolean isTerminated;
211    
212     /**
213     * Handler called when saturated or shutdown in execute.
214     */
215     private volatile RejectedExecutionHandler handler = defaultHandler;
216    
217     /**
218     * Factory for new threads.
219     */
220     private volatile ThreadFactory threadFactory = defaultThreadFactory;
221    
222     /**
223     * Tracks largest attained pool size.
224     */
225     private int largestPoolSize;
226    
227     /**
228     * Counter for completed tasks. Updated only on termination of
229     * worker threads.
230     */
231     private long completedTaskCount;
232    
233 dl 1.8 /**
234     * The default thread factory
235     */
236 dl 1.2 private static final ThreadFactory defaultThreadFactory =
237     new ThreadFactory() {
238     public Thread newThread(Runnable r) {
239     return new Thread(r);
240     }
241     };
242    
243 dl 1.8 /**
244     * The default rejected execution handler
245     */
246 dl 1.2 private static final RejectedExecutionHandler defaultHandler =
247     new AbortPolicy();
248    
249     /**
250     * Create and return a new thread running firstTask as its first
251     * task. Call only while holding mainLock
252 dl 1.8 * @param firstTask the task the new thread should run first (or
253     * null if none)
254     * @return the new thread
255 dl 1.2 */
256     private Thread addThread(Runnable firstTask) {
257     Worker w = new Worker(firstTask);
258     Thread t = threadFactory.newThread(w);
259     w.thread = t;
260     workers.add(w);
261     int nt = ++poolSize;
262     if (nt > largestPoolSize)
263     largestPoolSize = nt;
264     return t;
265     }
266    
267     /**
268     * Create and start a new thread running firstTask as its first
269     * task, only if less than corePoolSize threads are running.
270 dl 1.8 * @param firstTask the task the new thread should run first (or
271     * null if none)
272 dl 1.2 * @return true if successful.
273     */
274 dl 1.8 boolean addIfUnderCorePoolSize(Runnable firstTask) {
275 dl 1.2 Thread t = null;
276     mainLock.lock();
277     try {
278     if (poolSize < corePoolSize)
279 dl 1.8 t = addThread(firstTask);
280 dl 1.2 }
281     finally {
282     mainLock.unlock();
283     }
284     if (t == null)
285     return false;
286     t.start();
287     return true;
288     }
289    
290     /**
291     * Create and start a new thread only if less than maximumPoolSize
292     * threads are running. The new thread runs as its first task the
293     * next task in queue, or if there is none, the given task.
294 dl 1.8 * @param firstTask the task the new thread should run first (or
295     * null if none)
296 dl 1.2 * @return null on failure, else the first task to be run by new thread.
297     */
298 dl 1.8 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
299 dl 1.2 Thread t = null;
300     Runnable next = null;
301     mainLock.lock();
302     try {
303     if (poolSize < maximumPoolSize) {
304     next = workQueue.poll();
305     if (next == null)
306 dl 1.8 next = firstTask;
307 dl 1.2 t = addThread(next);
308     }
309     }
310     finally {
311     mainLock.unlock();
312     }
313     if (t == null)
314     return null;
315     t.start();
316     return next;
317     }
318    
319    
320     /**
321     * Get the next task for a worker thread to run.
322 dl 1.8 * @return the task
323     * @throws InterruptedException if interrupted while waiting for task
324 dl 1.2 */
325     private Runnable getTask() throws InterruptedException {
326     for (;;) {
327     int stat = shutdownStatus;
328     if (stat == SHUTDOWN_NOW)
329     return null;
330     long timeout = keepAliveTime;
331     if (timeout <= 0) // must die immediately for 0 timeout
332     return null;
333     if (stat == SHUTDOWN_WHEN_IDLE) // help drain queue before dying
334     return workQueue.poll();
335     if (poolSize <= corePoolSize) // untimed wait if core
336     return workQueue.take();
337     Runnable task = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
338     if (task != null)
339     return task;
340     if (poolSize > corePoolSize) // timed out
341     return null;
342     // else, after timeout, pool shrank so shouldn't die, so retry
343     }
344     }
345    
346     /**
347     * Perform bookkeeping for a terminated worker thread.
348 dl 1.8 * @param w the worker
349 dl 1.2 */
350     private void workerDone(Worker w) {
351     boolean allDone = false;
352     mainLock.lock();
353     try {
354     completedTaskCount += w.completedTasks;
355     workers.remove(w);
356    
357     if (--poolSize > 0)
358     return;
359    
360     // If this was last thread, deal with potential shutdown
361     int stat = shutdownStatus;
362    
363     // If there are queued tasks but no threads, create replacement.
364     if (stat != SHUTDOWN_NOW) {
365     Runnable r = workQueue.poll();
366     if (r != null) {
367     addThread(r).start();
368     return;
369     }
370     }
371    
372     // if no tasks and not shutdown, can exit without replacement
373     if (stat == NOT_SHUTDOWN)
374     return;
375    
376     allDone = true;
377     isTerminated = true;
378     termination.signalAll();
379     }
380     finally {
381     mainLock.unlock();
382     }
383    
384     if (allDone) // call outside lock
385     terminated();
386     }
387    
388     /**
389     * Worker threads
390     */
391     private class Worker implements Runnable {
392    
393     /**
394     * The runLock is acquired and released surrounding each task
395     * execution. It mainly protects against interrupts that are
396     * intended to cancel the worker thread from instead
397     * interrupting the task being run.
398     */
399     private final ReentrantLock runLock = new ReentrantLock();
400    
401     /**
402     * Initial task to run before entering run loop
403     */
404     private Runnable firstTask;
405    
406     /**
407     * Per thread completed task counter; accumulated
408     * into completedTaskCount upon termination.
409     */
410     volatile long completedTasks;
411    
412     /**
413     * Thread this worker is running in. Acts as a final field,
414     * but cannot be set until thread is created.
415     */
416     Thread thread;
417    
418     Worker(Runnable firstTask) {
419     this.firstTask = firstTask;
420     }
421    
422     boolean isActive() {
423     return runLock.isLocked();
424     }
425    
426     /**
427     * Interrupt thread if not running a task
428     */
429     void interruptIfIdle() {
430     if (runLock.tryLock()) {
431     try {
432     thread.interrupt();
433     }
434     finally {
435     runLock.unlock();
436     }
437     }
438     }
439    
440     /**
441     * Cause thread to die even if running a task.
442     */
443     void interruptNow() {
444     thread.interrupt();
445     }
446    
447     /**
448     * Run a single task between before/after methods.
449     */
450     private void runTask(Runnable task) {
451     runLock.lock();
452     try {
453     // Abort now if immediate cancel. Otherwise, we have
454     // committed to run this task.
455     if (shutdownStatus == SHUTDOWN_NOW)
456     return;
457    
458     Thread.interrupted(); // clear interrupt status on entry
459     boolean ran = false;
460     beforeExecute(thread, task);
461     try {
462     task.run();
463     ran = true;
464     afterExecute(task, null);
465     ++completedTasks;
466     }
467     catch(RuntimeException ex) {
468     if (!ran)
469     afterExecute(task, ex);
470     // else the exception occurred within
471     // afterExecute itself in which case we don't
472     // want to call it again.
473     throw ex;
474     }
475     }
476     finally {
477     runLock.unlock();
478     }
479     }
480    
481     /**
482     * Main run loop
483     */
484     public void run() {
485     try {
486     for (;;) {
487     Runnable task;
488     if (firstTask != null) {
489     task = firstTask;
490     firstTask = null;
491     }
492     else {
493     task = getTask();
494     if (task == null)
495     break;
496     }
497     runTask(task);
498     task = null; // unnecessary but can help GC
499     }
500     }
501     catch(InterruptedException ie) {
502     // fall through
503     }
504     finally {
505     workerDone(this);
506     }
507     }
508     }
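
The before/after hooks that runTask calls around each task can be illustrated with a small subclass that counts task starts and completions. This is a minimal sketch, not part of revision 1.9: it assumes the hook signatures of the later released java.util.concurrent API (protected beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable)), which may differ from this preliminary revision, and the names CountingPool and runTasks are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass: counts how often the hooks fire.
final class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    CountingPool(int threads) {
        // core == max gives a fixed-size pool; the queue is unbounded
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();   // runs in the worker thread, before the task
    }

    @Override protected void afterExecute(Runnable r, Throwable t) {
        finished.incrementAndGet();  // runs in the worker thread, after the task
        super.afterExecute(r, t);
    }

    /** Runs n empty tasks and returns {started, finished}. */
    static int[] runTasks(int n) {
        CountingPool pool = new CountingPool(2);
        for (int i = 0; i < n; i++)
            pool.execute(() -> { });
        pool.shutdown();             // graceful: queued tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.finished.get() };
    }
}
```

Both counters are atomic because the hooks run concurrently in different worker threads.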
509 tim 1.1
510     /**
511     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
512     * parameters. It may be more convenient to use one of the factory
513     * methods instead of this general purpose constructor.
514     *
515 dl 1.2 * @param corePoolSize the number of threads to keep in the
516 tim 1.1 * pool, even if they are idle.
517 dl 1.2 * @param maximumPoolSize the maximum number of threads to allow in the
518 tim 1.1 * pool.
519     * @param keepAliveTime when the number of threads is greater than
520 dl 1.2 * the core, this is the maximum time that excess idle threads
521 tim 1.1 * will wait for new tasks before terminating.
522 dl 1.2 * @param unit the time unit for the keepAliveTime
523 tim 1.1 * argument.
524     * @param workQueue the queue to use for holding tasks before they
525     * are executed. This queue will hold only the <tt>Runnable</tt>
526     * tasks submitted by the <tt>execute</tt> method.
527 dl 1.2 * @throws IllegalArgumentException if corePoolSize, or
528     * keepAliveTime less than zero, or if maximumPoolSize less than or
529     * equal to zero, or if corePoolSize greater than maximumPoolSize.
530 tim 1.1 * @throws NullPointerException if <tt>workQueue</tt> is null
531     */
532 dl 1.2 public ThreadPoolExecutor(int corePoolSize,
533     int maximumPoolSize,
534 tim 1.1 long keepAliveTime,
535 dl 1.2 TimeUnit unit,
536     BlockingQueue<Runnable> workQueue) {
537     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
538     defaultThreadFactory, defaultHandler);
539     }
540 tim 1.1
541 dl 1.2 /**
542     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
543     * parameters.
544     *
545     * @param corePoolSize the number of threads to keep in the
546     * pool, even if they are idle.
547     * @param maximumPoolSize the maximum number of threads to allow in the
548     * pool.
549     * @param keepAliveTime when the number of threads is greater than
550     * the core, this is the maximum time that excess idle threads
551     * will wait for new tasks before terminating.
552     * @param unit the time unit for the keepAliveTime
553     * argument.
554     * @param workQueue the queue to use for holding tasks before they
555     * are executed. This queue will hold only the <tt>Runnable</tt>
556     * tasks submitted by the <tt>execute</tt> method.
557     * @param threadFactory the factory to use when the executor
558     * creates a new thread.
559     * @throws IllegalArgumentException if corePoolSize, or
560     * keepAliveTime less than zero, or if maximumPoolSize less than or
561     * equal to zero, or if corePoolSize greater than maximumPoolSize.
562     * @throws NullPointerException if <tt>workQueue</tt>
563     * or <tt>threadFactory</tt> are null.
564     */
565     public ThreadPoolExecutor(int corePoolSize,
566     int maximumPoolSize,
567     long keepAliveTime,
568     TimeUnit unit,
569     BlockingQueue<Runnable> workQueue,
570     ThreadFactory threadFactory) {
571 tim 1.1
572 dl 1.2 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
573     threadFactory, defaultHandler);
574     }
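
A custom ThreadFactory passed to this constructor can control thread names and daemon status. The following is a minimal sketch, assuming the ThreadFactory interface and six-argument constructor as they appear in the released java.util.concurrent API; the class NamedThreadFactory, the "worker" prefix, and the helper nameOfFirstWorker are hypothetical and not part of this file.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical factory: names threads "<prefix>-<n>" and marks them daemon.
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger sequence = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + sequence.getAndIncrement());
        t.setDaemon(true);
        return t;
    }

    /** Runs one task in a single-thread pool and returns the worker's name. */
    static String nameOfFirstWorker() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new NamedThreadFactory("worker"));
        AtomicReference<String> seen = new AtomicReference<>();
        pool.execute(() -> seen.set(Thread.currentThread().getName()));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }
}
```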
575 tim 1.1
576 dl 1.2 /**
577     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
578     * parameters.
579     *
580     * @param corePoolSize the number of threads to keep in the
581     * pool, even if they are idle.
582     * @param maximumPoolSize the maximum number of threads to allow in the
583     * pool.
584     * @param keepAliveTime when the number of threads is greater than
585     * the core, this is the maximum time that excess idle threads
586     * will wait for new tasks before terminating.
587     * @param unit the time unit for the keepAliveTime
588     * argument.
589     * @param workQueue the queue to use for holding tasks before they
590     * are executed. This queue will hold only the <tt>Runnable</tt>
591     * tasks submitted by the <tt>execute</tt> method.
592     * @param handler the handler to use when execution is blocked
593     * because the thread bounds and queue capacities are reached.
594     * @throws IllegalArgumentException if corePoolSize, or
595     * keepAliveTime less than zero, or if maximumPoolSize less than or
596     * equal to zero, or if corePoolSize greater than maximumPoolSize.
597     * @throws NullPointerException if <tt>workQueue</tt>
598     * or <tt>handler</tt> are null.
599     */
600     public ThreadPoolExecutor(int corePoolSize,
601     int maximumPoolSize,
602     long keepAliveTime,
603     TimeUnit unit,
604     BlockingQueue<Runnable> workQueue,
605     RejectedExecutionHandler handler) {
606     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
607     defaultThreadFactory, handler);
608     }
609 tim 1.1
610 dl 1.2 /**
611     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
612     * parameters.
613     *
614     * @param corePoolSize the number of threads to keep in the
615     * pool, even if they are idle.
616     * @param maximumPoolSize the maximum number of threads to allow in the
617     * pool.
618     * @param keepAliveTime when the number of threads is greater than
619     * the core, this is the maximum time that excess idle threads
620     * will wait for new tasks before terminating.
621     * @param unit the time unit for the keepAliveTime
622     * argument.
623     * @param workQueue the queue to use for holding tasks before they
624     * are executed. This queue will hold only the <tt>Runnable</tt>
625     * tasks submitted by the <tt>execute</tt> method.
626     * @param threadFactory the factory to use when the executor
627     * creates a new thread.
628     * @param handler the handler to use when execution is blocked
629     * because the thread bounds and queue capacities are reached.
630     * @throws IllegalArgumentException if corePoolSize, or
631     * keepAliveTime less than zero, or if maximumPoolSize less than or
632     * equal to zero, or if corePoolSize greater than maximumPoolSize.
633     * @throws NullPointerException if <tt>workQueue</tt>
634     * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
635     */
636     public ThreadPoolExecutor(int corePoolSize,
637     int maximumPoolSize,
638     long keepAliveTime,
639     TimeUnit unit,
640     BlockingQueue<Runnable> workQueue,
641     ThreadFactory threadFactory,
642     RejectedExecutionHandler handler) {
643     if (corePoolSize < 0 ||
644     maximumPoolSize <= 0 ||
645     maximumPoolSize < corePoolSize ||
646     keepAliveTime < 0)
647     throw new IllegalArgumentException();
648     if (workQueue == null || threadFactory == null || handler == null)
649     throw new NullPointerException();
650     this.corePoolSize = corePoolSize;
651     this.maximumPoolSize = maximumPoolSize;
652     this.workQueue = workQueue;
653     this.keepAliveTime = unit.toNanos(keepAliveTime);
654     this.threadFactory = threadFactory;
655     this.handler = handler;
656 tim 1.1 }
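
How corePoolSize, maximumPoolSize, and a bounded queue interact can be seen with a deliberately tiny pool. This is a hedged sketch written against the released java.util.concurrent API (which may differ in detail from this preliminary revision); the class PoolSizingSketch and its method observe are hypothetical. With core size 1, maximum size 2, and a queue of capacity 1, the first task starts a thread, the second is queued, the third forces a second thread, and the fourth is rejected by the default abort policy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolSizingSketch {
    /** Returns {poolSize, queuedTasks, rejected (0 or 1)} after four submissions. */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until released, so no worker becomes idle early.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            pool.execute(blocker);   // below core size: starts thread 1
            pool.execute(blocker);   // core thread busy: task is queued
            pool.execute(blocker);   // queue full: starts thread 2
            try {
                pool.execute(blocker); // queue full and pool at maximum
            } catch (RejectedExecutionException e) {
                rejected = 1;
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected };
        } finally {
            release.countDown();     // let the blocked tasks finish
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Replacing the bounded queue with an unbounded LinkedBlockingQueue would keep the pool at one thread, since offers to the queue would never fail.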
657    
658 dl 1.2
659     /**
660     * Executes the given task sometime in the future. The task
661     * may execute in a new thread or in an existing pooled thread.
662     *
663     * If the task cannot be submitted for execution, either because this
664     * executor has been shutdown or because its capacity has been reached,
665     * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
666     *
667     * @param command the task to execute
668     * @throws RejectedExecutionException at discretion of
669 dl 1.8 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
670     * for execution
671 dl 1.2 */
672     public void execute(Runnable command) {
673     for (;;) {
674     if (shutdownStatus != NOT_SHUTDOWN) {
675     handler.rejectedExecution(command, this);
676     return;
677     }
678     if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
679     return;
680     if (workQueue.offer(command))
681     return;
682     Runnable r = addIfUnderMaximumPoolSize(command);
683     if (r == command)
684     return;
685     if (r == null) {
686     handler.rejectedExecution(command, this);
687     return;
688     }
689     // else retry
690     }
691 tim 1.1 }
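
The execute method above hands a task to the RejectedExecutionHandler whenever the executor has been shut down or is saturated. As a minimal sketch, assuming the handler interface of the released API (a single method rejectedExecution(Runnable, ThreadPoolExecutor)), a handler can count rejections instead of throwing; the class RejectionSketch and its method are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RejectionSketch {
    /** Submits two tasks after shutdown and returns how many were rejected. */
    static int rejectedAfterShutdown() {
        AtomicInteger rejected = new AtomicInteger();
        // Counts the refusal and silently drops the task.
        RejectedExecutionHandler counting =
            (task, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), counting);
        pool.shutdown();             // no new tasks are accepted from here on
        pool.execute(() -> { });     // handed to the handler
        pool.execute(() -> { });     // handed to the handler
        return rejected.get();
    }
}
```

Because the handler runs in the submitting thread, the count is already up to date when execute returns.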
692 dl 1.4
693 dl 1.2 public void shutdown() {
694     mainLock.lock();
695     try {
696     if (shutdownStatus == NOT_SHUTDOWN) // don't override shutdownNow
697     shutdownStatus = SHUTDOWN_WHEN_IDLE;
698 tim 1.1
699 dl 1.2 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
700     it.next().interruptIfIdle();
701     }
702     finally {
703     mainLock.unlock();
704     }
705 tim 1.1 }
706    
707 dl 1.2 public List shutdownNow() {
708     mainLock.lock();
709     try {
710     shutdownStatus = SHUTDOWN_NOW;
711     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
712     it.next().interruptNow();
713     }
714     finally {
715     mainLock.unlock();
716     }
717     return Arrays.asList(workQueue.toArray());
718 tim 1.1 }
719    
720 dl 1.2 public boolean isShutdown() {
721     return shutdownStatus != NOT_SHUTDOWN;
722 tim 1.1 }
723    
724 dl 1.2 public boolean isTerminated() {
725     return isTerminated;
726     }
727 tim 1.1
728 dl 1.2 public boolean awaitTermination(long timeout, TimeUnit unit)
729     throws InterruptedException {
730     mainLock.lock();
731     try {
732     return termination.await(timeout, unit);
733     }
734     finally {
735     mainLock.unlock();
736     }
737     }
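
The difference between the two shutdown modes can be sketched with a single-thread pool: shutdownNow interrupts the running task and returns the tasks that never started, and awaitTermination then waits for the worker to exit. This example is written against the released java.util.concurrent API and is only an illustration; ShutdownSketch and drainedByShutdownNow are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ShutdownSketch {
    /** Returns the number of queued tasks handed back by shutdownNow, or -1 if interrupted. */
    static int drainedByShutdownNow() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stands in for long-running work
            } catch (InterruptedException e) {
                // interrupted by shutdownNow: finish immediately
            }
        });
        pool.execute(() -> { });      // queued behind the long task
        pool.execute(() -> { });      // queued behind the long task
        try {
            started.await();          // make sure the first task is running
            List<Runnable> neverRun = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return neverRun.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            return -1;
        }
    }
}
```

Calling shutdown instead of shutdownNow would let both queued tasks run, but only after the long task completes on its own.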
738    
739     /**
740     * Sets the thread factory used to create new threads.
741     *
742     * @param threadFactory the new thread factory
743     */
744     public void setThreadFactory(ThreadFactory threadFactory) {
745     this.threadFactory = threadFactory;
746 tim 1.1 }
747    
748 dl 1.2 /**
749     * Returns the thread factory used to create new threads.
750     *
751     * @return the current thread factory
752     */
753     public ThreadFactory getThreadFactory() {
754     return threadFactory;
755 tim 1.1 }
756    
757 dl 1.2 /**
758     * Sets a new handler for unexecutable tasks.
759     *
760     * @param handler the new handler
761     */
762     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
763     this.handler = handler;
764     }
765 tim 1.1
766 dl 1.2 /**
767     * Returns the current handler for unexecutable tasks.
768     *
769     * @return the current handler
770     */
771     public RejectedExecutionHandler getRejectedExecutionHandler() {
772     return handler;
773 tim 1.1 }
774    
775 dl 1.2 /**
776     * Returns the task queue used by this executor. Note that
777     * this queue may be in active use. Retrieving the task queue
778     * does not prevent queued tasks from executing.
779     *
780     * @return the task queue
781     */
782     public BlockingQueue<Runnable> getQueue() {
783     return workQueue;
784 tim 1.1 }
785 dl 1.4
786     /**
787     * Removes this task from internal queue if it is present, thus
788     * causing it not to be run if it has not already started. This
789     * method may be useful as one part of a cancellation scheme.
790     *
791 dl 1.8 * @param task the task to remove
792     * @return true if the task was removed
793 dl 1.4 */
794 dl 1.5 public boolean remove(Runnable task) {
795 dl 1.4 return getQueue().remove(task);
796     }
797    
798 dl 1.7
799     /**
800     * Removes from the work queue all {@link Cancellable} tasks
801     * that have been cancelled. This method can be useful as a
802     * storage reclamation operation, that has no other impact
803     * on functionality. Cancelled tasks are never executed, but
804     * may accumulate in work queues until worker threads can
805     * actively remove them. Invoking this method ensures that they
806     * are instead removed now.
807     */
808    
809     public void purge() {
810     Iterator<Runnable> it = getQueue().iterator();
811     while (it.hasNext()) {
812     Runnable r = it.next();
813     if (r instanceof Cancellable) {
814     Cancellable c = (Cancellable)r;
815     if (c.isCancelled())
816     it.remove();
817     }
818     }
819     }
820 tim 1.1
821     /**
822 dl 1.2 * Sets the core number of threads. This overrides any value set
823     * in the constructor. If the new value is smaller than the
824     * current value, excess existing threads will be terminated when
825     * they next become idle.
826 tim 1.1 *
827 dl 1.2 * @param corePoolSize the new core size
828 dl 1.8 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
829     * less than zero
830 tim 1.1 */
831 dl 1.2 public void setCorePoolSize(int corePoolSize) {
832     if (corePoolSize < 0)
833     throw new IllegalArgumentException();
834     mainLock.lock();
835     try {
836     int extra = this.corePoolSize - corePoolSize;
837     this.corePoolSize = corePoolSize;
838     if (extra > 0 && poolSize > corePoolSize) {
839     Iterator<Worker> it = workers.iterator();
840     while (it.hasNext() &&
841     extra > 0 &&
842     poolSize > corePoolSize &&
843     workQueue.remainingCapacity() == 0) {
844     it.next().interruptIfIdle();
845     --extra;
846     }
847     }
848    
849     }
850     finally {
851     mainLock.unlock();
852     }
853     }
854 tim 1.1
855     /**
856 dl 1.2 * Returns the core number of threads.
857 tim 1.1 *
858 dl 1.2 * @return the core number of threads
859 tim 1.1 */
860 dl 1.2 public int getCorePoolSize() {
861     return corePoolSize;
862     }
863 tim 1.1
    /**
     * Sets the maximum allowed number of threads. This overrides any
     * value set in the constructor. If the new value is smaller than
     * the current value, excess existing threads will be
     * terminated when they next become idle.
     *
     * @param maximumPoolSize the new maximum
     * @throws IllegalArgumentException if <tt>maximumPoolSize</tt> is
     * less than or equal to zero, or less than the
     * {@link #getCorePoolSize core pool size}
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        mainLock.lock();
        try {
            int extra = this.maximumPoolSize - maximumPoolSize;
            this.maximumPoolSize = maximumPoolSize;
            if (extra > 0 && poolSize > maximumPoolSize) {
                Iterator<Worker> it = workers.iterator();
                while (it.hasNext() &&
                       extra > 0 &&
                       poolSize > maximumPoolSize) {
                    it.next().interruptIfIdle();
                    --extra;
                }
            }
        }
        finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the maximum allowed number of threads.
     *
     * @return the maximum allowed number of threads
     */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /**
     * Sets the time limit for which threads may remain idle before
     * being terminated. If there are more than the core number of
     * threads currently in the pool, after waiting this amount of
     * time without processing a task, excess threads will be
     * terminated. This overrides any value set in the constructor.
     *
     * @param time the time to wait. A time value of zero will cause
     * excess threads to terminate immediately after executing tasks.
     * @param unit the time unit of the time argument
     * @throws IllegalArgumentException if <tt>time</tt> is less than zero
     */
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        this.keepAliveTime = unit.toNanos(time);
    }

    /**
     * Returns the thread keep-alive time, which is the amount of time
     * that threads in excess of the core pool size may remain
     * idle before being terminated.
     *
     * @param unit the desired time unit of the result
     * @return the time limit
     */
    public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

    /* Statistics */

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize() {
        return poolSize;
    }

    /**
     * Returns the approximate number of threads that are actively
     * executing tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount() {
        mainLock.lock();
        try {
            int n = 0;
            for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
                if (it.next().isActive())
                    ++n;
            }
            return n;
        }
        finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the largest number of threads that have ever
     * simultaneously been in the pool.
     *
     * @return the number of threads
     */
    public int getLargestPoolSize() {
        mainLock.lock();
        try {
            return largestPoolSize;
        }
        finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation.
     *
     * @return the number of tasks
     */
    public long getTaskCount() {
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
                Worker w = it.next();
                n += w.completedTasks;
                if (w.isActive())
                    ++n;
            }
            return n + workQueue.size();
        }
        finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation.
     *
     * @return the number of tasks
     */
    public long getCompletedTaskCount() {
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
                n += it.next().completedTasks;
            return n;
        }
        finally {
            mainLock.unlock();
        }
    }

    /**
     * Method invoked prior to executing the given Runnable in the given
     * thread. This method may be used to re-initialize ThreadLocals,
     * or to perform logging. Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     * <tt>super.beforeExecute</tt> at the end of this method.
     *
     * @param t the thread that will run task r.
     * @param r the task that will be executed.
     */
    protected void beforeExecute(Thread t, Runnable r) { }

    /**
     * Method invoked upon completion of execution of the given
     * Runnable. If non-null, the Throwable is the uncaught exception
     * that caused execution to terminate abruptly. Note: To properly
     * nest multiple overridings, subclasses should generally invoke
     * <tt>super.afterExecute</tt> at the beginning of this method.
     *
     * @param r the runnable that has completed.
     * @param t the exception that caused termination, or null if
     * execution completed normally.
     */
    protected void afterExecute(Runnable r, Throwable t) { }

    /**
     * Method invoked when the Executor has terminated. Default
     * implementation does nothing.
     */
    protected void terminated() { }

    /**
     * A handler for unexecutable tasks that runs these tasks directly in the
     * calling thread of the <tt>execute</tt> method, unless the executor
     * has been shut down, in which case the task is discarded. This is
     * the default <tt>RejectedExecutionHandler</tt>.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {

        /**
         * Constructs a <tt>CallerRunsPolicy</tt>.
         */
        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for unexecutable tasks that throws a
     * <tt>RejectedExecutionException</tt>.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {

        /**
         * Constructs an <tt>AbortPolicy</tt>.
         */
        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException();
        }
    }

    /**
     * A handler for unexecutable tasks that waits until the task can be
     * submitted for execution.
     */
    public static class WaitPolicy implements RejectedExecutionHandler {
        /**
         * Constructs a <tt>WaitPolicy</tt>.
         */
        public WaitPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                try {
                    e.getQueue().put(r);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException(ie);
                }
            }
        }
    }

    /**
     * A handler for unexecutable tasks that silently discards these tasks.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {

        /**
         * Constructs a <tt>DiscardPolicy</tt>.
         */
        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for unexecutable tasks that discards the oldest
     * unhandled request and then retries <tt>execute</tt>, unless the
     * executor has been shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Constructs a <tt>DiscardOldestPolicy</tt>.
         */
        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}
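
A usage sketch for the tuning methods, extension hooks, and rejection handlers listed above. It assumes the java.util.concurrent API as shipped in a current JDK (Java 8 or later), not this preliminary revision, so handlers that exist only here (such as WaitPolicy) are left out. The names PoolTuningSketch, CountingPool, newTunedPool, and runTasks are hypothetical and introduced only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolTuningSketch {

    /** Hypothetical subclass that counts invocations of the three hooks. */
    static class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger started = new AtomicInteger();
        final AtomicInteger finished = new AtomicInteger();
        final AtomicInteger terminations = new AtomicInteger();

        CountingPool(int core, int max, int queueCapacity) {
            // Bounded queue plus CallerRunsPolicy: when the pool is saturated,
            // the submitting thread runs the rejected task itself.
            super(core, max, 30, TimeUnit.SECONDS,
                  new ArrayBlockingQueue<Runnable>(queueCapacity),
                  new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            started.incrementAndGet();
            super.beforeExecute(t, r);   // super call at the end
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);    // super call at the beginning
            finished.incrementAndGet();
        }

        @Override
        protected void terminated() {
            super.terminated();
            terminations.incrementAndGet();
        }
    }

    /** Builds a pool, then overrides the constructor settings before use. */
    static CountingPool newTunedPool() {
        CountingPool pool = new CountingPool(2, 4, 8);
        pool.setMaximumPoolSize(3);   // must be positive and >= the core size
        pool.setCorePoolSize(1);      // must be non-negative
        pool.setKeepAliveTime(500, TimeUnit.MILLISECONDS);
        return pool;
    }

    /** Submits trivial tasks, shuts the pool down, and returns how many ran. */
    static int runTasks(CountingPool pool, int taskCount) {
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < taskCount; i++)
            pool.execute(ran::incrementAndGet);
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate");
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return ran.get();
    }

    public static void main(String[] args) {
        CountingPool pool = newTunedPool();
        int ran = runTasks(pool, 20);
        System.out.println("ran=" + ran
            + " viaWorkers=" + pool.getCompletedTaskCount()
            + " largestPoolSize=" + pool.getLargestPoolSize());
    }
}
```

With CallerRunsPolicy, a task rejected because the pool is saturated runs in the submitting thread and bypasses beforeExecute and afterExecute, so the hook counters track getCompletedTaskCount() rather than the number of submissions.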