ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.17
Committed: Sun Aug 24 23:32:25 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.16: +83 -74 lines
Log Message:
Kill ScheduledExecutor Date methods; Documentation clarifications

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.2 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.9 import java.util.concurrent.locks.*;
9 dl 1.2 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.17 * An {@link ExecutorService} that executes each submitted task using
13     * one of possibly several pooled threads.
14 tim 1.1 *
15 dl 1.17 * <p>Thread pools address two different problems: they usually
16     * provide improved performance when executing large numbers of
17     * asynchronous tasks, due to reduced per-task invocation overhead,
18     * and they provide a means of bounding and managing the resources,
19     * including threads, consumed when executing a collection of tasks.
20     *
21     * <p>This class is designed to ge highly tunable.
22 tim 1.1 *
23     * <p>To be useful across a wide range of contexts, this class
24 dl 1.17 * provides many adjustable parameters and extensibility hooks. For
25     * example, it can be configured to create a new thread for each task,
26     * or even to execute tasks sequentially in a single thread, in
27     * addition to its most common configuration, which reuses a pool of
28     * threads. However, programmers are urged to use the more convenient
29     * factory methods <tt>newCachedThreadPool</tt> (unbounded thread
30     * pool, with automatic thread reclamation),
31     * <tt>newFixedThreadPool</tt> (fixed size thread pool),
32     * <tt>newSingleThreadPoolExecutor</tt> (single background thread for
33     * execution of tasks), and <tt>newThreadPerTaskExeceutor</tt>
34     * (execute each task in a new thread), that preconfigure settings for
35     * the most common usage scenarios.
36     *
37     * <p>Each <tt>ThreadPoolExecutor</tt> also maintains some basic
38     * statistics, such as the number of completed tasks, that may be
39     * useful for monitoring and tuning executors.
40 tim 1.1 *
41     * <h3>Tuning guide</h3>
42     * <dl>
43 dl 1.2 *
44     * <dt>Core and maximum pool size</dt>
45     *
46     * <dd>A ThreadPoolExecutor will automatically adjust the pool size
47     * according to the bounds set by corePoolSize and maximumPoolSize.
48     * When a new task is submitted, and fewer than corePoolSize threads
49     * are running, a new thread is created to handle the request, even if
50     * other worker threads are idle. If there are more than the
51     * corePoolSize but less than maximumPoolSize threads running, a new
52     * thread will be created only if the queue is full. By setting
53     * corePoolSize and maximumPoolSize the same, you create a fixed-size
54 dl 1.17 * thread pool. By default, even core threads are only created and
55     * started when needed by new tasks, but this can be overridden
56     * dynamically using method <tt>prestartCoreThread</tt>.
57     * </dd>
58 dl 1.2 *
59 tim 1.10 * <dt>Keep-alive</dt>
60 dl 1.2 *
61     * <dd>The keepAliveTime determines what happens to idle threads. If
62     * the pool currently has more than the core number of threads, excess
63     * threads will be terminated if they have been idle for more than the
64     * keepAliveTime.</dd>
65     *
66 tim 1.10 * <dt>Queueing</dt>
67     *
68 dl 1.2 * <dd>You are free to specify the queuing mechanism used to handle
69 dl 1.17 * submitted tasks. A good default is to use a zero-capacity
70     * <tt>SynchronousQueue</tt> to to hand off work to threads. This is
71     * a safe, conservative policy that avoids lockups when handling sets
72     * of requests that might have internal dependencies. Using an
73     * unbounded queue (for example a <tt>LinkedBlockingQueue</tt>) will
74     * cause new tasks to be queued in cases where all corePoolSize
75     * threads are busy, so no more than corePoolSize threads will be
76     * craated. This may be appropriate when each task is completely
77     * independent of others, so tasks cannot affect each others
78     * execution. For example, in a web page server. When given a choice,
79     * this pool always prefers adding a new thread rather than queueing
80     * if there are currently fewer than the current getCorePoolSize
81     * threads running, but otherwise always prefers queuing a request
82     * rather than adding a new thread.
83 tim 1.1 *
84     * <p>While queuing can be useful in smoothing out transient bursts of
85     * requests, especially in socket-based services, it is not very well
86     * behaved when commands continue to arrive on average faster than
87 dl 1.17 * they can be processed. Queue sizes and maximum pool sizes can
88     * often be traded off for each other. Using large queues and small
89     * pools minimizes CPU usage, OS resources, and context-switching
90     * overhead, but can lead to artifically low throughput. If tasks
91     * frequently block (for example if they are I/O bound), a system may
92     * be able to schedule time for more threads than you otherwise
93     * allow. Use of small queues or queueless handoffs generally requires
94     * larger pool sizes, which keeps CPUs busier but may encounter
95     * unacceptable scheduling overhead, which also decreases throughput.
96 tim 1.1 * </dd>
97 dl 1.2 *
98 tim 1.1 * <dt>Creating new threads</dt>
99 dl 1.2 *
100     * <dd>New threads are created using a ThreadFactory. By default,
101     * threads are created simply with the new Thread(Runnable)
102     * constructor, but by supplying a different ThreadFactory, you can
103     * alter the thread's name, thread group, priority, daemon status,
104     * etc. </dd>
105     *
106 tim 1.1 * <dt>Before and after intercepts</dt>
107 dl 1.2 *
108     * <dd>This class has overridable methods that which are called before
109     * and after execution of each task. These can be used to manipulate
110     * the execution environment (for example, reinitializing
111     * ThreadLocals), gather statistics, or perform logging. </dd>
112     *
113 dl 1.17 * <dt>Rejected tasks</dt>
114 dl 1.2 *
115     * <dd>There are a number of factors which can bound the number of
116     * tasks which can execute at once, including the maximum pool size
117     * and the queuing mechanism used. If the executor determines that a
118     * task cannot be executed because it has been refused by the queue
119     * and no threads are available, or because the executor has been shut
120     * down, the RejectedExecutionHandler's rejectedExecution method is
121 dl 1.17 * invoked. The default (<tt>AbortPolicy</tt>) handler throws a runtime
122     * {@link RejectedExecutionException} upon rejection. </dd>
123 dl 1.2 *
124 tim 1.1 * <dt>Termination</dt>
125 dl 1.2 *
126     * <dd>ThreadPoolExecutor supports two shutdown options, immediate and
127     * graceful. In an immediate shutdown, any threads currently
128     * executing are interrupted, and any tasks not yet begun are returned
129     * from the shutdownNow call. In a graceful shutdown, all queued
130     * tasks are allowed to run, but new tasks may not be submitted.
131 tim 1.1 * </dd>
132 dl 1.2 *
133 tim 1.1 * </dl>
134     *
135     * @since 1.5
136 dl 1.2 * @see RejectedExecutionHandler
137 tim 1.1 * @see Executors
138     * @see ThreadFactory
139     *
140     * @spec JSR-166
141 dl 1.17 * @revised $Date: 2003/08/14 15:34:04 $
142 dl 1.16 * @editor $Author: dl $
143 dl 1.8 * @author Doug Lea
144 tim 1.1 */
145 dl 1.2 public class ThreadPoolExecutor implements ExecutorService {
146     /**
147     * Queue used for holding tasks and handing off to worker threads.
148 tim 1.10 */
149 dl 1.2 private final BlockingQueue<Runnable> workQueue;
150    
151     /**
152     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
153     * workers set.
154 tim 1.10 */
155 dl 1.2 private final ReentrantLock mainLock = new ReentrantLock();
156    
157     /**
158     * Wait condition to support awaitTermination
159 tim 1.10 */
160 dl 1.2 private final Condition termination = mainLock.newCondition();
161    
162     /**
163     * Set containing all worker threads in pool.
164 tim 1.10 */
165 dl 1.17 private final HashSet<Worker> workers = new HashSet<Worker>();
166 dl 1.2
167     /**
168     * Timeout in nanosecods for idle threads waiting for work.
169     * Threads use this timeout only when there are more than
170     * corePoolSize present. Otherwise they wait forever for new work.
171 tim 1.10 */
172 dl 1.2 private volatile long keepAliveTime;
173    
174     /**
175     * Core pool size, updated only while holding mainLock,
176     * but volatile to allow concurrent readability even
177     * during updates.
178 tim 1.10 */
179 dl 1.2 private volatile int corePoolSize;
180    
181     /**
182     * Maximum pool size, updated only while holding mainLock
183     * but volatile to allow concurrent readability even
184     * during updates.
185 tim 1.10 */
186 dl 1.2 private volatile int maximumPoolSize;
187    
188     /**
189     * Current pool size, updated only while holding mainLock
190     * but volatile to allow concurrent readability even
191     * during updates.
192 tim 1.10 */
193 dl 1.2 private volatile int poolSize;
194    
195     /**
196 dl 1.16 * Lifecycle state
197 tim 1.10 */
198 dl 1.16 private volatile int runState;
199 dl 1.2
200 dl 1.16 // Special values for runState
201 dl 1.8 /** Normal, not-shutdown mode */
202 dl 1.16 private static final int RUNNING = 0;
203 dl 1.8 /** Controlled shutdown mode */
204 dl 1.16 private static final int SHUTDOWN = 1;
205     /** Immediate shutdown mode */
206     private static final int STOP = 2;
207     /** Final state */
208     private static final int TERMINATED = 3;
209 dl 1.2
210     /**
211     * Handler called when saturated or shutdown in execute.
212 tim 1.10 */
213 dl 1.2 private volatile RejectedExecutionHandler handler = defaultHandler;
214    
215     /**
216     * Factory for new threads.
217 tim 1.10 */
218 dl 1.2 private volatile ThreadFactory threadFactory = defaultThreadFactory;
219    
220     /**
221     * Tracks largest attained pool size.
222 tim 1.10 */
223 dl 1.2 private int largestPoolSize;
224    
225     /**
226     * Counter for completed tasks. Updated only on termination of
227     * worker threads.
228 tim 1.10 */
229 dl 1.2 private long completedTaskCount;
230    
231 dl 1.8 /**
232 dl 1.16 * The default thread factory
233 dl 1.8 */
234 tim 1.10 private static final ThreadFactory defaultThreadFactory =
235 dl 1.2 new ThreadFactory() {
236     public Thread newThread(Runnable r) {
237     return new Thread(r);
238     }
239     };
240    
241 dl 1.8 /**
242     * The default rejectect execution handler
243     */
244 tim 1.10 private static final RejectedExecutionHandler defaultHandler =
245 dl 1.2 new AbortPolicy();
246    
247     /**
248 dl 1.17 * Invoke the rejected execution handler for the given command.
249 dl 1.13 */
250     void reject(Runnable command) {
251     handler.rejectedExecution(command, this);
252     }
253    
254     /**
255 dl 1.2 * Create and return a new thread running firstTask as its first
256     * task. Call only while holding mainLock
257 dl 1.8 * @param firstTask the task the new thread should run first (or
258     * null if none)
259     * @return the new thread
260 dl 1.2 */
261     private Thread addThread(Runnable firstTask) {
262     Worker w = new Worker(firstTask);
263     Thread t = threadFactory.newThread(w);
264     w.thread = t;
265     workers.add(w);
266     int nt = ++poolSize;
267     if (nt > largestPoolSize)
268     largestPoolSize = nt;
269     return t;
270     }
271    
272 dl 1.16
273 dl 1.15
274 dl 1.2 /**
275     * Create and start a new thread running firstTask as its first
276     * task, only if less than corePoolSize threads are running.
277 dl 1.8 * @param firstTask the task the new thread should run first (or
278     * null if none)
279 dl 1.2 * @return true if successful.
280     */
281 dl 1.16 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
282 dl 1.2 Thread t = null;
283     mainLock.lock();
284     try {
285 tim 1.10 if (poolSize < corePoolSize)
286 dl 1.8 t = addThread(firstTask);
287 tim 1.14 } finally {
288 dl 1.2 mainLock.unlock();
289     }
290     if (t == null)
291     return false;
292     t.start();
293     return true;
294     }
295    
296     /**
297     * Create and start a new thread only if less than maximumPoolSize
298     * threads are running. The new thread runs as its first task the
299     * next task in queue, or if there is none, the given task.
300 dl 1.8 * @param firstTask the task the new thread should run first (or
301     * null if none)
302 dl 1.2 * @return null on failure, else the first task to be run by new thread.
303     */
304 dl 1.8 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
305 dl 1.2 Thread t = null;
306     Runnable next = null;
307     mainLock.lock();
308     try {
309     if (poolSize < maximumPoolSize) {
310     next = workQueue.poll();
311     if (next == null)
312 dl 1.8 next = firstTask;
313 dl 1.2 t = addThread(next);
314     }
315 tim 1.14 } finally {
316 dl 1.2 mainLock.unlock();
317     }
318     if (t == null)
319     return null;
320     t.start();
321     return next;
322     }
323    
324    
325     /**
326     * Get the next task for a worker thread to run.
327 dl 1.8 * @return the task
328     * @throws InterruptedException if interrupted while waiting for task
329 dl 1.2 */
330     private Runnable getTask() throws InterruptedException {
331     for (;;) {
332 dl 1.16 switch(runState) {
333     case RUNNING: {
334     if (poolSize <= corePoolSize) // untimed wait if core
335     return workQueue.take();
336    
337     long timeout = keepAliveTime;
338     if (timeout <= 0) // die immediately for 0 timeout
339     return null;
340     Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
341     if (r != null)
342     return r;
343     if (poolSize > corePoolSize) // timed out
344     return null;
345     // else, after timeout, pool shrank so shouldn't die, so retry
346     break;
347     }
348    
349     case SHUTDOWN: {
350     // Help drain queue
351     Runnable r = workQueue.poll();
352     if (r != null)
353     return r;
354    
355     // Check if can terminate
356     if (workQueue.isEmpty()) {
357     interruptIdleWorkers();
358     return null;
359     }
360    
361     // There could still be delayed tasks in queue.
362     // Wait for one, re-checking state upon interruption
363     try {
364     return workQueue.take();
365     }
366     catch(InterruptedException ignore) {
367     }
368     break;
369     }
370    
371     case STOP:
372 dl 1.2 return null;
373 dl 1.16 default:
374     assert false;
375     }
376     }
377     }
378    
379     /**
380     * Wake up all threads that might be waiting for tasks.
381     */
382     void interruptIdleWorkers() {
383     mainLock.lock();
384     try {
385     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
386     it.next().interruptIfIdle();
387     } finally {
388     mainLock.unlock();
389 dl 1.2 }
390     }
391    
392     /**
393     * Perform bookkeeping for a terminated worker thread.
394 tim 1.10 * @param w the worker
395 dl 1.2 */
396     private void workerDone(Worker w) {
397     mainLock.lock();
398     try {
399     completedTaskCount += w.completedTasks;
400     workers.remove(w);
401 tim 1.10 if (--poolSize > 0)
402 dl 1.2 return;
403    
404 dl 1.16 // Else, this is the last thread. Deal with potential shutdown.
405    
406     int state = runState;
407     assert state != TERMINATED;
408 tim 1.10
409 dl 1.16 if (state != STOP) {
410     // If there are queued tasks but no threads, create
411     // replacement.
412 dl 1.2 Runnable r = workQueue.poll();
413     if (r != null) {
414     addThread(r).start();
415     return;
416     }
417 dl 1.16
418     // If there are some (presumably delayed) tasks but
419     // none pollable, create an idle replacement to wait.
420     if (!workQueue.isEmpty()) {
421     addThread(null).start();
422     return;
423     }
424    
425     // Otherwise, we can exit without replacement
426     if (state == RUNNING)
427     return;
428 dl 1.2 }
429    
430 dl 1.16 // Either state is STOP, or state is SHUTDOWN and there is
431     // no work to do. So we can terminate.
432     runState = TERMINATED;
433 dl 1.2 termination.signalAll();
434 dl 1.16 // fall through to call terminate() outside of lock.
435 tim 1.14 } finally {
436 dl 1.2 mainLock.unlock();
437     }
438    
439 dl 1.16 assert runState == TERMINATED;
440     terminated();
441 dl 1.2 }
442    
443     /**
444 tim 1.10 * Worker threads
445 dl 1.2 */
446     private class Worker implements Runnable {
447    
448     /**
449     * The runLock is acquired and released surrounding each task
450     * execution. It mainly protects against interrupts that are
451     * intended to cancel the worker thread from instead
452     * interrupting the task being run.
453     */
454     private final ReentrantLock runLock = new ReentrantLock();
455    
456     /**
457     * Initial task to run before entering run loop
458     */
459     private Runnable firstTask;
460    
461     /**
462     * Per thread completed task counter; accumulated
463     * into completedTaskCount upon termination.
464     */
465     volatile long completedTasks;
466    
467     /**
468     * Thread this worker is running in. Acts as a final field,
469     * but cannot be set until thread is created.
470     */
471     Thread thread;
472    
473     Worker(Runnable firstTask) {
474     this.firstTask = firstTask;
475     }
476    
477     boolean isActive() {
478     return runLock.isLocked();
479     }
480    
481     /**
482     * Interrupt thread if not running a task
483 tim 1.10 */
484 dl 1.2 void interruptIfIdle() {
485     if (runLock.tryLock()) {
486     try {
487     thread.interrupt();
488 tim 1.14 } finally {
489 dl 1.2 runLock.unlock();
490     }
491     }
492     }
493    
494     /**
495     * Cause thread to die even if running a task.
496 tim 1.10 */
497 dl 1.2 void interruptNow() {
498     thread.interrupt();
499     }
500    
501     /**
502     * Run a single task between before/after methods.
503     */
504     private void runTask(Runnable task) {
505     runLock.lock();
506     try {
507     // Abort now if immediate cancel. Otherwise, we have
508     // committed to run this task.
509 dl 1.16 if (runState == STOP)
510 dl 1.2 return;
511    
512     Thread.interrupted(); // clear interrupt status on entry
513     boolean ran = false;
514     beforeExecute(thread, task);
515     try {
516     task.run();
517     ran = true;
518     afterExecute(task, null);
519     ++completedTasks;
520 tim 1.14 } catch(RuntimeException ex) {
521 dl 1.2 if (!ran)
522     afterExecute(task, ex);
523 dl 1.17 // Else the exception occurred within
524 dl 1.2 // afterExecute itself in which case we don't
525     // want to call it again.
526     throw ex;
527     }
528 tim 1.14 } finally {
529 dl 1.2 runLock.unlock();
530     }
531     }
532    
533     /**
534     * Main run loop
535     */
536     public void run() {
537     try {
538     for (;;) {
539     Runnable task;
540     if (firstTask != null) {
541     task = firstTask;
542     firstTask = null;
543 tim 1.14 } else {
544 dl 1.2 task = getTask();
545     if (task == null)
546     break;
547     }
548     runTask(task);
549     task = null; // unnecessary but can help GC
550     }
551 tim 1.14 } catch(InterruptedException ie) {
552 dl 1.2 // fall through
553 tim 1.14 } finally {
554 dl 1.2 workerDone(this);
555     }
556     }
557     }
558 tim 1.1
559 dl 1.17 // Public methods
560    
561 tim 1.1 /**
562 dl 1.17 * Creates a new <tt>ThreadPoolExecutor</tt> with the given
563     * initial parameters. It may be more convenient to use one of
564     * the {@link @Executors} factory methods instead of this general
565     * purpose constructor.
566 tim 1.1 *
567 dl 1.2 * @param corePoolSize the number of threads to keep in the
568 tim 1.1 * pool, even if they are idle.
569 dl 1.2 * @param maximumPoolSize the maximum number of threads to allow in the
570 tim 1.1 * pool.
571     * @param keepAliveTime when the number of threads is greater than
572 dl 1.2 * the core, this is the maximum time that excess idle threads
573 tim 1.1 * will wait for new tasks before terminating.
574 dl 1.2 * @param unit the time unit for the keepAliveTime
575 tim 1.1 * argument.
576     * @param workQueue the queue to use for holding tasks before the
577     * are executed. This queue will hold only the <tt>Runnable</tt>
578     * tasks submitted by the <tt>execute</tt> method.
579 dl 1.2 * @throws IllegalArgumentException if corePoolSize, or
580     * keepAliveTime less than zero, or if maximumPoolSize less than or
581     * equal to zero, or if corePoolSize greater than maximumPoolSize.
582 tim 1.1 * @throws NullPointerException if <tt>workQueue</tt> is null
583     */
584 dl 1.2 public ThreadPoolExecutor(int corePoolSize,
585     int maximumPoolSize,
586 tim 1.1 long keepAliveTime,
587 dl 1.2 TimeUnit unit,
588     BlockingQueue<Runnable> workQueue) {
589 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
590 dl 1.2 defaultThreadFactory, defaultHandler);
591     }
592 tim 1.1
593 dl 1.2 /**
594     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
595     * parameters.
596     *
597     * @param corePoolSize the number of threads to keep in the
598     * pool, even if they are idle.
599     * @param maximumPoolSize the maximum number of threads to allow in the
600     * pool.
601     * @param keepAliveTime when the number of threads is greater than
602     * the core, this is the maximum time that excess idle threads
603     * will wait for new tasks before terminating.
604     * @param unit the time unit for the keepAliveTime
605     * argument.
606     * @param workQueue the queue to use for holding tasks before the
607     * are executed. This queue will hold only the <tt>Runnable</tt>
608     * tasks submitted by the <tt>execute</tt> method.
609     * @param threadFactory the factory to use when the executor
610 tim 1.10 * creates a new thread.
611 dl 1.2 * @throws IllegalArgumentException if corePoolSize, or
612     * keepAliveTime less than zero, or if maximumPoolSize less than or
613     * equal to zero, or if corePoolSize greater than maximumPoolSize.
614 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
615 dl 1.2 * or <tt>threadFactory</tt> are null.
616     */
617     public ThreadPoolExecutor(int corePoolSize,
618     int maximumPoolSize,
619     long keepAliveTime,
620     TimeUnit unit,
621     BlockingQueue<Runnable> workQueue,
622     ThreadFactory threadFactory) {
623 tim 1.1
624 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
625 dl 1.2 threadFactory, defaultHandler);
626     }
627 tim 1.1
628 dl 1.2 /**
629     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
630     * parameters.
631     *
632     * @param corePoolSize the number of threads to keep in the
633     * pool, even if they are idle.
634     * @param maximumPoolSize the maximum number of threads to allow in the
635     * pool.
636     * @param keepAliveTime when the number of threads is greater than
637     * the core, this is the maximum time that excess idle threads
638     * will wait for new tasks before terminating.
639     * @param unit the time unit for the keepAliveTime
640     * argument.
641     * @param workQueue the queue to use for holding tasks before the
642     * are executed. This queue will hold only the <tt>Runnable</tt>
643     * tasks submitted by the <tt>execute</tt> method.
644     * @param handler the handler to use when execution is blocked
645     * because the thread bounds and queue capacities are reached.
646     * @throws IllegalArgumentException if corePoolSize, or
647     * keepAliveTime less than zero, or if maximumPoolSize less than or
648     * equal to zero, or if corePoolSize greater than maximumPoolSize.
649 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
650 dl 1.2 * or <tt>handler</tt> are null.
651     */
652     public ThreadPoolExecutor(int corePoolSize,
653     int maximumPoolSize,
654     long keepAliveTime,
655     TimeUnit unit,
656     BlockingQueue<Runnable> workQueue,
657     RejectedExecutionHandler handler) {
658 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
659 dl 1.2 defaultThreadFactory, handler);
660     }
661 tim 1.1
662 dl 1.2 /**
663     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
664     * parameters.
665     *
666     * @param corePoolSize the number of threads to keep in the
667     * pool, even if they are idle.
668     * @param maximumPoolSize the maximum number of threads to allow in the
669     * pool.
670     * @param keepAliveTime when the number of threads is greater than
671     * the core, this is the maximum time that excess idle threads
672     * will wait for new tasks before terminating.
673     * @param unit the time unit for the keepAliveTime
674     * argument.
675     * @param workQueue the queue to use for holding tasks before the
676     * are executed. This queue will hold only the <tt>Runnable</tt>
677     * tasks submitted by the <tt>execute</tt> method.
678     * @param threadFactory the factory to use when the executor
679 tim 1.10 * creates a new thread.
680 dl 1.2 * @param handler the handler to use when execution is blocked
681     * because the thread bounds and queue capacities are reached.
682     * @throws IllegalArgumentException if corePoolSize, or
683     * keepAliveTime less than zero, or if maximumPoolSize less than or
684     * equal to zero, or if corePoolSize greater than maximumPoolSize.
685 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
686 dl 1.2 * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
687     */
688     public ThreadPoolExecutor(int corePoolSize,
689     int maximumPoolSize,
690     long keepAliveTime,
691     TimeUnit unit,
692     BlockingQueue<Runnable> workQueue,
693     ThreadFactory threadFactory,
694     RejectedExecutionHandler handler) {
695 tim 1.10 if (corePoolSize < 0 ||
696 dl 1.2 maximumPoolSize <= 0 ||
697 tim 1.10 maximumPoolSize < corePoolSize ||
698 dl 1.2 keepAliveTime < 0)
699     throw new IllegalArgumentException();
700     if (workQueue == null || threadFactory == null || handler == null)
701     throw new NullPointerException();
702     this.corePoolSize = corePoolSize;
703     this.maximumPoolSize = maximumPoolSize;
704     this.workQueue = workQueue;
705     this.keepAliveTime = unit.toNanos(keepAliveTime);
706     this.threadFactory = threadFactory;
707     this.handler = handler;
708 tim 1.1 }
709    
710 dl 1.2
711     /**
712     * Executes the given task sometime in the future. The task
713     * may execute in a new thread or in an existing pooled thread.
714     *
715     * If the task cannot be submitted for execution, either because this
716     * executor has been shutdown or because its capacity has been reached,
717 tim 1.10 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
718 dl 1.2 *
719     * @param command the task to execute
720     * @throws RejectedExecutionException at discretion of
721 dl 1.8 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
722     * for execution
723 dl 1.2 */
724 tim 1.10 public void execute(Runnable command) {
725 dl 1.2 for (;;) {
726 dl 1.16 if (runState != RUNNING) {
727 dl 1.13 reject(command);
728 dl 1.2 return;
729     }
730     if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
731     return;
732     if (workQueue.offer(command))
733     return;
734     Runnable r = addIfUnderMaximumPoolSize(command);
735     if (r == command)
736     return;
737     if (r == null) {
738 dl 1.13 reject(command);
739 dl 1.2 return;
740     }
741     // else retry
742     }
743 tim 1.1 }
744 dl 1.4
745 dl 1.2 public void shutdown() {
746     mainLock.lock();
747     try {
748 dl 1.16 if (runState == RUNNING) // don't override shutdownNow
749     runState = SHUTDOWN;
750 dl 1.2 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
751     it.next().interruptIfIdle();
752 tim 1.14 } finally {
753 dl 1.2 mainLock.unlock();
754     }
755 tim 1.1 }
756    
757 dl 1.16
758 dl 1.2 public List shutdownNow() {
759     mainLock.lock();
760     try {
761 dl 1.16 if (runState != TERMINATED)
762     runState = STOP;
763 dl 1.2 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
764     it.next().interruptNow();
765 tim 1.14 } finally {
766 dl 1.2 mainLock.unlock();
767     }
768     return Arrays.asList(workQueue.toArray());
769 tim 1.1 }
770    
771 dl 1.2 public boolean isShutdown() {
772 dl 1.16 return runState != RUNNING;
773     }
774    
775     /**
776     * Return true if this executor is in the process of terminating
777     * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
778     * completely terminated. This method may be useful for
779     * debugging. A return of <tt>true</tt> reported a sufficient
780     * period after shutdown may indicate that submitted tasks have
781     * ignored or suppressed interruption, causing this executor not
782     * to properly terminate.
783     * @return true if terminating but not yet terminated.
784     */
785     public boolean isTerminating() {
786     return runState == STOP;
787 tim 1.1 }
788    
789 dl 1.2 public boolean isTerminated() {
790 dl 1.16 return runState == TERMINATED;
791 dl 1.2 }
792 tim 1.1
793 dl 1.2 public boolean awaitTermination(long timeout, TimeUnit unit)
794     throws InterruptedException {
795     mainLock.lock();
796     try {
797     return termination.await(timeout, unit);
798 tim 1.14 } finally {
799 dl 1.2 mainLock.unlock();
800     }
801 dl 1.15 }
802    
803     /**
804     * Invokes <tt>shutdown</tt> when this executor is no longer
805     * referenced.
806     */
807     protected void finalize() {
808     shutdown();
809 dl 1.2 }
810 tim 1.10
811 dl 1.2 /**
812     * Sets the thread factory used to create new threads.
813     *
814     * @param threadFactory the new thread factory
815 tim 1.11 * @see #getThreadFactory
816 dl 1.2 */
817     public void setThreadFactory(ThreadFactory threadFactory) {
818     this.threadFactory = threadFactory;
819 tim 1.1 }
820    
821 dl 1.2 /**
822     * Returns the thread factory used to create new threads.
823     *
824     * @return the current thread factory
825 tim 1.11 * @see #setThreadFactory
826 dl 1.2 */
827     public ThreadFactory getThreadFactory() {
828     return threadFactory;
829 tim 1.1 }
830    
831 dl 1.2 /**
832     * Sets a new handler for unexecutable tasks.
833     *
834     * @param handler the new handler
835 tim 1.11 * @see #getRejectedExecutionHandler
836 dl 1.2 */
837     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
838     this.handler = handler;
839     }
840 tim 1.1
841 dl 1.2 /**
842     * Returns the current handler for unexecutable tasks.
843     *
844     * @return the current handler
845 tim 1.11 * @see #setRejectedExecutionHandler
846 dl 1.2 */
847     public RejectedExecutionHandler getRejectedExecutionHandler() {
848     return handler;
849 tim 1.1 }
850    
851 dl 1.2 /**
852 dl 1.17 * Returns the task queue used by this executor. Access to the
853     * task queue is intended primarily for debugging and monitoring.
854     * This queue may be in active use. Retrieveing the task queue
855 dl 1.2 * does not prevent queued tasks from executing.
856     *
857     * @return the task queue
858     */
859     public BlockingQueue<Runnable> getQueue() {
860     return workQueue;
861 tim 1.1 }
862 dl 1.4
863     /**
864     * Removes this task from internal queue if it is present, thus
865     * causing it not to be run if it has not already started. This
866     * method may be useful as one part of a cancellation scheme.
867 tim 1.10 *
868 dl 1.8 * @param task the task to remove
869     * @return true if the task was removed
870 dl 1.4 */
871 dl 1.5 public boolean remove(Runnable task) {
872 dl 1.4 return getQueue().remove(task);
873     }
874    
875 dl 1.7
876     /**
877 dl 1.16 * Tries to remove from the work queue all {@link Cancellable}
878     * tasks that have been cancelled. This method can be useful as a
879     * storage reclamation operation, that has no other impact on
880     * functionality. Cancelled tasks are never executed, but may
881     * accumulate in work queues until worker threads can actively
882     * remove them. Invoking this method instead tries to remove them now.
883     * However, this method may fail to remove all such tasks in
884     * the presence of interference by other threads.
885 dl 1.7 */
886    
887     public void purge() {
888 dl 1.16 // Fail if we encounter interference during traversal
889     try {
890     Iterator<Runnable> it = getQueue().iterator();
891     while (it.hasNext()) {
892     Runnable r = it.next();
893     if (r instanceof Cancellable) {
894     Cancellable c = (Cancellable)r;
895     if (c.isCancelled())
896     it.remove();
897     }
898 dl 1.7 }
899     }
900 dl 1.16 catch(ConcurrentModificationException ex) {
901     return;
902     }
903 dl 1.7 }
904 tim 1.1
905     /**
906 dl 1.2 * Sets the core number of threads. This overrides any value set
907     * in the constructor. If the new value is smaller than the
908     * current value, excess existing threads will be terminated when
909     * they next become idle.
910 tim 1.1 *
911 dl 1.2 * @param corePoolSize the new core size
912 tim 1.10 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
913 dl 1.8 * less than zero
914 tim 1.11 * @see #getCorePoolSize
915 tim 1.1 */
916 dl 1.2 public void setCorePoolSize(int corePoolSize) {
917     if (corePoolSize < 0)
918     throw new IllegalArgumentException();
919     mainLock.lock();
920     try {
921     int extra = this.corePoolSize - corePoolSize;
922     this.corePoolSize = corePoolSize;
923     if (extra > 0 && poolSize > corePoolSize) {
924     Iterator<Worker> it = workers.iterator();
925 tim 1.10 while (it.hasNext() &&
926     extra > 0 &&
927 dl 1.2 poolSize > corePoolSize &&
928     workQueue.remainingCapacity() == 0) {
929     it.next().interruptIfIdle();
930     --extra;
931     }
932     }
933 tim 1.10
934 tim 1.14 } finally {
935 dl 1.2 mainLock.unlock();
936     }
937     }
938 tim 1.1
939     /**
940 dl 1.2 * Returns the core number of threads.
941 tim 1.1 *
942 dl 1.2 * @return the core number of threads
943 tim 1.11 * @see #setCorePoolSize
944 tim 1.1 */
945 tim 1.10 public int getCorePoolSize() {
946 dl 1.2 return corePoolSize;
947 dl 1.16 }
948    
949     /**
950     * Start a core thread, causing it to idly wait for work. This
951     * overrides the default policy of starting core threads only when
952     * new tasks are executed. This method will return <tt>false</tt>
953     * if all core threads have already been started.
954     * @return true if a thread was started
955     */
956     public boolean prestartCoreThread() {
957     return addIfUnderCorePoolSize(null);
958     }
959    
960     /**
961     * Start all core threads, causing them to idly wait for work. This
962     * overrides the default policy of starting core threads only when
963     * new tasks are executed.
964     * @return the number of threads started.
965     */
966     public int prestartAllCoreThreads() {
967     int n = 0;
968     while (addIfUnderCorePoolSize(null))
969     ++n;
970     return n;
971 dl 1.2 }
972 tim 1.1
973     /**
974     * Sets the maximum allowed number of threads. This overrides any
975 dl 1.2 * value set in the constructor. If the new value is smaller than
976     * the current value, excess existing threads will be
977     * terminated when they next become idle.
978 tim 1.1 *
979 dl 1.2 * @param maximumPoolSize the new maximum
980     * @throws IllegalArgumentException if maximumPoolSize less than zero or
981     * the {@link #getCorePoolSize core pool size}
982 tim 1.11 * @see #getMaximumPoolSize
983 dl 1.2 */
984     public void setMaximumPoolSize(int maximumPoolSize) {
985     if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
986     throw new IllegalArgumentException();
987     mainLock.lock();
988     try {
989     int extra = this.maximumPoolSize - maximumPoolSize;
990     this.maximumPoolSize = maximumPoolSize;
991     if (extra > 0 && poolSize > maximumPoolSize) {
992     Iterator<Worker> it = workers.iterator();
993 tim 1.10 while (it.hasNext() &&
994     extra > 0 &&
995 dl 1.2 poolSize > maximumPoolSize) {
996     it.next().interruptIfIdle();
997     --extra;
998     }
999     }
1000 tim 1.14 } finally {
1001 dl 1.2 mainLock.unlock();
1002     }
1003     }
1004 tim 1.1
1005     /**
1006     * Returns the maximum allowed number of threads.
1007     *
1008 dl 1.2 * @return the maximum allowed number of threads
1009 tim 1.11 * @see #setMaximumPoolSize
1010 tim 1.1 */
1011 tim 1.10 public int getMaximumPoolSize() {
1012 dl 1.2 return maximumPoolSize;
1013     }
1014 tim 1.1
1015     /**
1016     * Sets the time limit for which threads may remain idle before
1017 dl 1.2 * being terminated. If there are more than the core number of
1018 tim 1.1 * threads currently in the pool, after waiting this amount of
1019     * time without processing a task, excess threads will be
1020     * terminated. This overrides any value set in the constructor.
1021     * @param time the time to wait. A time value of zero will cause
1022     * excess threads to terminate immediately after executing tasks.
1023 dl 1.2 * @param unit the time unit of the time argument
1024 dl 1.17 * @throws IllegalArgumentException if time less than zero
1025 tim 1.11 * @see #getKeepAliveTime
1026 tim 1.1 */
1027 dl 1.2 public void setKeepAliveTime(long time, TimeUnit unit) {
1028     if (time < 0)
1029     throw new IllegalArgumentException();
1030     this.keepAliveTime = unit.toNanos(time);
1031     }
1032 tim 1.1
1033     /**
1034     * Returns the thread keep-alive time, which is the amount of time
1035 dl 1.2 * which threads in excess of the core pool size may remain
1036 tim 1.10 * idle before being terminated.
1037 tim 1.1 *
1038 dl 1.2 * @param unit the desired time unit of the result
1039 tim 1.1 * @return the time limit
1040 tim 1.11 * @see #setKeepAliveTime
1041 tim 1.1 */
1042 tim 1.10 public long getKeepAliveTime(TimeUnit unit) {
1043 dl 1.2 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1044     }
1045 tim 1.1
1046     /* Statistics */
1047    
1048     /**
1049     * Returns the current number of threads in the pool.
1050     *
1051     * @return the number of threads
1052     */
1053 tim 1.10 public int getPoolSize() {
1054 dl 1.2 return poolSize;
1055     }
1056 tim 1.1
1057     /**
1058 dl 1.2 * Returns the approximate number of threads that are actively
1059 tim 1.1 * executing tasks.
1060     *
1061     * @return the number of threads
1062     */
1063 tim 1.10 public int getActiveCount() {
1064 dl 1.2 mainLock.lock();
1065     try {
1066     int n = 0;
1067     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1068     if (it.next().isActive())
1069     ++n;
1070     }
1071     return n;
1072 tim 1.14 } finally {
1073 dl 1.2 mainLock.unlock();
1074     }
1075     }
1076 tim 1.1
1077     /**
1078 dl 1.2 * Returns the largest number of threads that have ever
1079     * simultaneously been in the pool.
1080 tim 1.1 *
1081     * @return the number of threads
1082     */
1083 tim 1.10 public int getLargestPoolSize() {
1084 dl 1.2 mainLock.lock();
1085     try {
1086     return largestPoolSize;
1087 tim 1.14 } finally {
1088 dl 1.2 mainLock.unlock();
1089     }
1090     }
1091 tim 1.1
1092     /**
1093 dl 1.2 * Returns the approximate total number of tasks that have been
1094     * scheduled for execution. Because the states of tasks and
1095     * threads may change dynamically during computation, the returned
1096 dl 1.17 * value is only an approximation, but one that does not ever
1097     * decrease across successive calls.
1098 tim 1.1 *
1099     * @return the number of tasks
1100     */
1101 tim 1.10 public long getTaskCount() {
1102 dl 1.2 mainLock.lock();
1103     try {
1104     long n = completedTaskCount;
1105     for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1106     Worker w = it.next();
1107     n += w.completedTasks;
1108     if (w.isActive())
1109     ++n;
1110     }
1111     return n + workQueue.size();
1112 tim 1.14 } finally {
1113 dl 1.2 mainLock.unlock();
1114     }
1115     }
1116 tim 1.1
1117     /**
1118 dl 1.2 * Returns the approximate total number of tasks that have
1119     * completed execution. Because the states of tasks and threads
1120     * may change dynamically during computation, the returned value
1121 dl 1.17 * is only an approximation, but one that does not ever decrease
1122     * across successive calls.
1123 tim 1.1 *
1124     * @return the number of tasks
1125     */
1126 tim 1.10 public long getCompletedTaskCount() {
1127 dl 1.2 mainLock.lock();
1128     try {
1129     long n = completedTaskCount;
1130 tim 1.10 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1131 dl 1.2 n += it.next().completedTasks;
1132     return n;
1133 tim 1.14 } finally {
1134 dl 1.2 mainLock.unlock();
1135     }
1136     }
1137 tim 1.1
1138     /**
1139 dl 1.17 * Method invoked prior to executing the given Runnable in the
1140     * given thread. This method may be used to re-initialize
1141     * ThreadLocals, or to perform logging. Note: To properly nest
1142     * multiple overridings, subclasses should generally invoke
1143 dl 1.5 * <tt>super.beforeExecute</tt> at the end of this method.
1144 tim 1.1 *
1145 dl 1.2 * @param t the thread that will run task r.
1146     * @param r the task that will be executed.
1147 tim 1.1 */
1148 dl 1.2 protected void beforeExecute(Thread t, Runnable r) { }
1149 tim 1.1
1150     /**
1151 dl 1.2 * Method invoked upon completion of execution of the given
1152     * Runnable. If non-null, the Throwable is the uncaught exception
1153 dl 1.5 * that caused execution to terminate abruptly. Note: To properly
1154     * nest multiple overridings, subclasses should generally invoke
1155     * <tt>super.afterExecute</tt> at the beginning of this method.
1156 tim 1.1 *
1157 dl 1.2 * @param r the runnable that has completed.
1158     * @param t the exception that cause termination, or null if
1159     * execution completed normally.
1160 tim 1.1 */
1161 dl 1.2 protected void afterExecute(Runnable r, Throwable t) { }
1162 tim 1.1
1163 dl 1.2 /**
1164     * Method invoked when the Executor has terminated. Default
1165 dl 1.17 * implementation does nothing. Note: To properly nest multiple
1166     * overridings, subclasses should generally invoke
1167     * <tt>super.terminated</tt> within this method.
1168 dl 1.2 */
1169     protected void terminated() { }
1170 tim 1.1
1171     /**
1172 dl 1.17 * A handler for unexecutable tasks that runs these tasks directly
1173     * in the calling thread of the <tt>execute</tt> method. This is
1174     * the default <tt>RejectedExecutionHandler</tt>.
1175 tim 1.1 */
1176 dl 1.2 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1177 tim 1.1
1178     /**
1179     * Constructs a <tt>CallerRunsPolicy</tt>.
1180     */
1181     public CallerRunsPolicy() { }
1182    
1183 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1184     if (!e.isShutdown()) {
1185 tim 1.1 r.run();
1186     }
1187     }
1188     }
1189    
1190     /**
1191 dl 1.8 * A handler for unexecutable tasks that throws a
1192     * <tt>RejectedExecutionException</tt>.
1193 tim 1.1 */
1194 dl 1.2 public static class AbortPolicy implements RejectedExecutionHandler {
1195 tim 1.1
1196     /**
1197     * Constructs a <tt>AbortPolicy</tt>.
1198     */
1199     public AbortPolicy() { }
1200    
1201 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1202     throw new RejectedExecutionException();
1203 tim 1.1 }
1204     }
1205    
1206     /**
1207     * A handler for unexecutable tasks that waits until the task can be
1208     * submitted for execution.
1209     */
1210 dl 1.2 public static class WaitPolicy implements RejectedExecutionHandler {
1211 tim 1.1 /**
1212     * Constructs a <tt>WaitPolicy</tt>.
1213     */
1214     public WaitPolicy() { }
1215    
1216 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1217     if (!e.isShutdown()) {
1218     try {
1219     e.getQueue().put(r);
1220 tim 1.14 } catch (InterruptedException ie) {
1221 dl 1.2 Thread.currentThread().interrupt();
1222     throw new RejectedExecutionException(ie);
1223     }
1224 tim 1.1 }
1225     }
1226     }
1227    
1228     /**
1229     * A handler for unexecutable tasks that silently discards these tasks.
1230     */
1231 dl 1.2 public static class DiscardPolicy implements RejectedExecutionHandler {
1232 tim 1.1
1233     /**
1234     * Constructs <tt>DiscardPolicy</tt>.
1235     */
1236     public DiscardPolicy() { }
1237    
1238 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1239 tim 1.1 }
1240     }
1241    
1242     /**
1243 dl 1.8 * A handler for unexecutable tasks that discards the oldest
1244     * unhandled request.
1245 tim 1.1 */
1246 dl 1.2 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1247 tim 1.1 /**
1248 dl 1.2 * Constructs a <tt>DiscardOldestPolicy</tt> for the given executor.
1249 tim 1.1 */
1250     public DiscardOldestPolicy() { }
1251    
1252 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1253     if (!e.isShutdown()) {
1254     e.getQueue().poll();
1255     e.execute(r);
1256 tim 1.1 }
1257     }
1258     }
1259     }